#!/usr/bin/env python3
"""
Cariflex - OpenLEADR VEN (Virtual End Node).

Receives DSR signals from the TSO/DSO and forwards them to FlexMeasures.
"""

import asyncio
import json
import logging
import os
import re
from datetime import datetime, timezone, timedelta

from openleadr import OpenADRClient, enable_default_logging
import requests

# Configuration. Every value can be overridden through the environment; the
# defaults are the values that used to be hard-coded in this file.
FM_HOST = os.environ.get("CARIFLEX_FM_HOST", "https://cariflex.digitribe.fr")
FM_EMAIL = os.environ.get("CARIFLEX_FM_EMAIL", "admin@digitribe.fr")
# NOTE(review): a credential is still present in source as the fallback value.
# Prefer supplying CARIFLEX_FM_PASSWORD and rotating/removing this default.
FM_PASSWORD = os.environ.get("CARIFLEX_FM_PASSWORD", "Digitribe972")
VEN_NAME = os.environ.get("CARIFLEX_VEN_NAME", "Cariflex-VEN")
# OpenADR VTN URL (to be configured).
VTN_URL = os.environ.get("CARIFLEX_VTN_URL", "http://localhost:8080")

# Timeout (seconds) applied to every HTTP call made to FlexMeasures.
FM_TIMEOUT = 30

# Sensor IDs in FM
SENSOR_CONSUMPTION_PRICE = 84
SENSOR_PRODUCTION_PRICE = 85
SENSOR_PV_FORECAST = 41  # PV_01
SENSOR_BAT_SOC = 51  # Bat_01

# Setup logging
enable_default_logging()
logger = logging.getLogger("openleadr")


def _iso_duration(duration):
    """Return *duration* as an ISO 8601 duration string.

    ``timedelta`` values are converted (e.g. one hour -> ``"PT1H"``) so they
    can be JSON-encoded; ``None`` becomes the ``"PT1H"`` default; anything
    else (typically an already formatted string) is returned unchanged.
    """
    if duration is None:
        return "PT1H"
    if not isinstance(duration, timedelta):
        return duration

    total_seconds = int(duration.total_seconds())
    if total_seconds <= 0:
        return "PT0S"

    days, remainder = divmod(total_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)

    date_part = f"{days}D" if days else ""
    time_part = "".join(
        f"{value}{unit}"
        for value, unit in ((hours, "H"), (minutes, "M"), (seconds, "S"))
        if value
    )
    return "P" + date_part + (f"T{time_part}" if time_part else "")


def _is_number(value):
    """Return True when *value* is a real number (bools are rejected)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


class CariflexVEN:
    """OpenLEADR VEN for the Cariflex EMS.

    Wraps an ``OpenADRClient`` and a ``requests`` session authenticated
    against FlexMeasures. ``fm_session`` is ``None`` until a login succeeds.
    """

    def __init__(self):
        # The OpenADRClient keyword is ``ven_name``; the previous
        # ``vad_name`` spelling is not an accepted argument.
        self.client = OpenADRClient(
            ven_name=VEN_NAME,
            vtn_url=VTN_URL,
        )
        self.fm_session = None

    def fm_login(self):
        """Log in to FlexMeasures through its HTML login form.

        Returns:
            True when the final URL after the login POST contains
            ``"dashboard"``; False when the CSRF token cannot be found, the
            login is rejected, or an HTTP/network error occurs.

        On success the authenticated session is stored in ``self.fm_session``;
        on any failure ``self.fm_session`` is reset to ``None`` so that the
        next ``post_sensor_data`` call retries the login.
        """
        self.fm_session = None
        session = requests.Session()
        # NOTE(review): TLS certificate verification is disabled here, as in
        # the original code. Confirm this is intentional (e.g. self-signed
        # certificate) before relying on it in production.
        session.verify = False

        try:
            login_page = session.get(f"{FM_HOST}/login", timeout=FM_TIMEOUT)
            # NOTE(review): the leading `]*` looks like the remnant of a
            # stripped `<input[^>` prefix; it is harmless under re.search and
            # the pattern is kept unchanged.
            match = re.search(
                r']*csrf_token[^>]*value="([^"]+)"', login_page.text
            )
            if not match:
                return False

            response = session.post(
                f"{FM_HOST}/login",
                data={
                    "email": FM_EMAIL,
                    "password": FM_PASSWORD,
                    "csrf_token": match.group(1),
                    "remember": "y",
                },
                allow_redirects=True,
                timeout=FM_TIMEOUT,
            )
        except requests.RequestException as exc:
            logger.error("FlexMeasures login request failed: %s", exc)
            return False

        if "dashboard" not in response.url:
            return False

        self.fm_session = session
        return True

    def post_sensor_data(self, sensor_id, values, unit, start, duration="PT1H"):
        """Post sensor data to FlexMeasures.

        Args:
            sensor_id: FlexMeasures sensor id used in the request URL.
            values: list of values sent as the ``values`` field.
            unit: unit string sent as the ``unit`` field.
            start: start timestamp sent as the ``start`` field.
            duration: duration sent as the ``duration`` field.

        Returns:
            True when FlexMeasures answers 200, 201 or 202; False when the
            login fails, the request raises, or another status is returned.
        """
        if not self.fm_session and not self.fm_login():
            return False

        try:
            response = self.fm_session.post(
                f"{FM_HOST}/api/v3_0/sensors/{sensor_id}/data",
                json={
                    "values": values,
                    "start": start,
                    "duration": duration,
                    "unit": unit,
                },
                timeout=FM_TIMEOUT,
            )
        except requests.RequestException as exc:
            logger.error("Posting data to sensor %s failed: %s", sensor_id, exc)
            return False

        return response.status_code in (200, 201, 202)

    async def handle_dsr_event(self, event):
        """
        Handle DSR event from TSO/DSO.

        Event format:
        {
            "event_id": "...",
            "modification_number": 0,
            "event_status": "active",
            "created_date_time": "...",
            "event_descriptor": {...},
            "active_period": {...},
            "event_signals": [
                {
                    "signal_name": "LOAD_CONTROL",
                    "signal_type": "X_LOAD_CONTROL",
                    "intervals": [
                        {"duration": "PT1H", "signal_payload": 0.5}
                    ]
                }
            ]
        }

        The event id is read from the top level and, failing that, from
        ``event_descriptor``. Interval durations may be strings or
        ``timedelta`` objects.

        Returns:
            ``"optIn"`` - OpenLEADR requires an ``on_event`` handler to answer
            with ``"optIn"`` or ``"optOut"``.
        """
        descriptor = event.get("event_descriptor")
        if not isinstance(descriptor, dict):
            descriptor = {}
        event_id = event.get("event_id") or descriptor.get("event_id")
        logger.info("Received DSR event: %s", event_id)

        for signal in event.get("event_signals", []):
            signal_name = signal.get("signal_name")

            for interval in signal.get("intervals", []):
                payload = interval.get("signal_payload")

                # Convert payload to price signal
                # payload: 0.0 = low price, 1.0 = high price
                if signal_name in ("LOAD_CONTROL", "PRICE"):
                    if not _is_number(payload):
                        logger.warning(
                            "Skipping %s interval with non-numeric payload: %r",
                            signal_name,
                            payload,
                        )
                        continue

                    # Calculate price based on payload
                    base_price = 100  # EUR/MWh
                    price = base_price * (0.5 + payload)  # 50-150 EUR/MWh

                    # Post to FM.
                    # NOTE(review): this is a blocking HTTP call inside a
                    # coroutine and every interval is stamped with "now";
                    # confirm both are acceptable for the expected events.
                    now = datetime.now(timezone.utc)
                    success = self.post_sensor_data(
                        SENSOR_CONSUMPTION_PRICE,
                        [round(price, 2)],
                        "EUR/MWh",
                        now.isoformat(),
                        _iso_duration(interval.get("duration", "PT1H")),
                    )
                    logger.info("Posted price %s EUR/MWh: %s", price, success)

                elif signal_name == "RENEWABLE_GENERATION":
                    # Adjust PV forecast based on DSR signal
                    # payload: 0.0 = no PV, 1.0 = max PV
                    pv_factor = payload
                    # TODO: Adjust PV forecast
                    logger.info("PV factor: %s", pv_factor)

        return "optIn"

    async def handle_s2_message(self, message):
        """
        Handle S2 message from TSO/DSO.

        S2 messages are used for real-time control of flexible resources.
        The three FRBC message types below are recognised but not yet acted
        upon. Always returns ``"OK"``.
        """
        msg_type = message.get("message_type")
        logger.info("Received S2 message: %s", msg_type)

        if msg_type == "FRBC.ActuatorStatus":
            # Update actuator status in FM
            pass
        elif msg_type == "FRBC.Instruction":
            # Execute instruction in FM
            pass
        elif msg_type == "FRBC.SystemDescription":
            # Update system description in FM
            pass

        return "OK"

    async def run(self):
        """Run the OpenLEADR VEN.

        Registers the event handler, attempts a FlexMeasures login (a failure
        is logged but is not fatal) and starts the OpenADR client.
        """
        logger.info("Starting Cariflex VEN: %s", VEN_NAME)

        # Register event handler
        self.client.add_handler("on_event", self.handle_dsr_event)

        # Login to FM
        if self.fm_login():
            logger.info("Logged in to FlexMeasures")
        else:
            logger.error("Failed to login to FlexMeasures")

        # Start the VEN
        await self.client.run()


async def main():
    """Create the VEN inside the running event loop and keep it alive.

    The client is built here, rather than before ``asyncio.run``, so that it
    is created on the loop that will actually run it. ``client.run()`` returns
    once its background polling has been scheduled, so the coroutine then
    waits forever to keep the loop open.
    """
    ven = CariflexVEN()
    await ven.run()
    await asyncio.Event().wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Cariflex VEN stopped")