From b69417b0d7f46e33a436ca705f81543bc689b19a Mon Sep 17 00:00:00 2001 From: Eric F Date: Wed, 10 Jun 2026 10:12:41 -0400 Subject: [PATCH] OpenADR VTN/VEN deployment + Grafana dashboard update --- scripts/fm_fix_and_schedule.sh | 58 +++++++++++ scripts/fm_forecast_schedule.py | 85 ++++++++++++++++ scripts/openadr_ven.py | 106 ++++++++++++++++++++ scripts/openadr_vtn.py | 91 +++++++++++++++++ scripts/openleadr_ven.py | 170 ++++++++++++++++++++++++++++++++ 5 files changed, 510 insertions(+) create mode 100644 scripts/fm_fix_and_schedule.sh create mode 100644 scripts/fm_forecast_schedule.py create mode 100644 scripts/openadr_ven.py create mode 100644 scripts/openadr_vtn.py create mode 100644 scripts/openleadr_ven.py diff --git a/scripts/fm_fix_and_schedule.sh b/scripts/fm_fix_and_schedule.sh new file mode 100644 index 0000000..2bc7101 --- /dev/null +++ b/scripts/fm_fix_and_schedule.sh @@ -0,0 +1,58 @@ +#!/bin/bash +# Cariflex - Correction des assets et lancement du scheduling + +echo "=== Correction des flex_context des assets ===" + +# Clear all flex_context for battery and EV assets +docker exec flexmeasures-db psql -U flexmeasures -d flexmeasures -c " +UPDATE generic_asset SET flex_context = '{}' WHERE id BETWEEN 51 AND 80; +UPDATE generic_asset SET flex_model = '{}' WHERE id BETWEEN 51 AND 80; +SELECT id, name, flex_context, flex_model FROM generic_asset WHERE id = 51; +" 2>&1 + +echo "" +echo "=== Lancement du scheduling Batteries ===" +for sensor_id in $(seq 51 60); do + echo " Scheduling sensor $sensor_id..." + docker exec flexmeasures-server bash -c " + cd /app && .venv/bin/flexmeasures add schedule \ + --sensor $sensor_id \ + --start \$(date -u +'%Y-%m-%dT%H:%M:%S+00:00') \ + --duration PT24H \ + --resolution PT15M \ + --soc-at-start 0.5 \ + --flex-model '{\"soc-min\": \"0.1 MWh\", \"soc-max\": \"1 MWh\", \"power-capacity\": \"0.05 MW\"}' 2>&1 + " 2>&1 | grep -E "Successfully|SAVED|Error" | head -3 +done + +echo "" +echo "=== Lancement du forecasting PV ===" +for sensor_id in $(seq 41 45); do + echo " Forecasting sensor $sensor_id..." + docker exec flexmeasures-server bash -c " + cd /app && .venv/bin/flexmeasures add forecasts \ + --sensor $sensor_id \ + --to-date \$(date -u -d '+24 hours' +'%Y-%m-%dT%H:%M:%S+00:00') 2>&1 + " 2>&1 | grep -E "Successfully|SAVED" | head -1 +done + +echo "" +echo "=== Vérification des données ===" +docker exec flexmeasures-db psql -U flexmeasures -d flexmeasures -c " +SELECT + 'forecasts' as type, + COUNT(*) as count +FROM timed_belief +WHERE source_id IN (SELECT id FROM data_source WHERE name LIKE '%forecast%') + AND event_start > NOW() - INTERVAL '1 hour' +UNION ALL +SELECT + 'schedules' as type, + COUNT(*) as count +FROM timed_belief +WHERE source_id IN (SELECT id FROM data_source WHERE name LIKE '%schedule%') + AND event_start > NOW() - INTERVAL '1 hour'; +" 2>&1 + +echo "" +echo "=== Terminé ===" diff --git a/scripts/fm_forecast_schedule.py b/scripts/fm_forecast_schedule.py new file mode 100644 index 0000000..b20f2f2 --- /dev/null +++ b/scripts/fm_forecast_schedule.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +"""Cariflex - Déclenche le forecasting et scheduling FlexMeasures.""" + +import subprocess +import json +from datetime import datetime, timezone, timedelta + +FM_HOST = "https://cariflex.digitribe.fr" + +def run_fm_cli(args): + """Run FM CLI inside the container.""" + cmd = ["docker", "exec", "flexmeasures-server", "bash", "-c", + f"cd /app && .venv/bin/flexmeasures {' '.join(args)} 2>&1"] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) + return result.stdout.strip(), result.stderr.strip(), result.returncode + +# ======================================== +# 1. Forecasting - Prévisions PV +# ======================================== +print("=== Forecasting PV ===") +for sensor_id in range(41, 51): + stdout, stderr, rc = run_fm_cli([ + "add", "forecasts", + "--sensor", str(sensor_id), + "--to-date", (datetime.now(timezone.utc) + timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%S+00:00"), + ]) + if "Successfully" in stdout or "SAVED" in stdout: + print(f" Sensor {sensor_id}: OK") + else: + print(f" Sensor {sensor_id}: {stdout[:100]}") + +# ======================================== +# 2. Forecasting - Prévisions Charge VE +# ======================================== +print("\n=== Forecasting Charge VE ===") +for sensor_id in range(61, 71): + stdout, stderr, rc = run_fm_cli([ + "add", "forecasts", + "--sensor", str(sensor_id), + "--to-date", (datetime.now(timezone.utc) + timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%S+00:00"), + ]) + if "Successfully" in stdout or "SAVED" in stdout: + print(f" Sensor {sensor_id}: OK") + else: + print(f" Sensor {sensor_id}: {stdout[:100]}") + +# ======================================== +# 3. Scheduling - Plans de charge Batteries +# ======================================== +print("\n=== Scheduling Batteries ===") +for sensor_id in range(51, 61): + stdout, stderr, rc = run_fm_cli([ + "add", "schedule", + "--sensor", str(sensor_id), + "--start", datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00"), + "--duration", "PT24H", + "--resolution", "PT15M", + "--soc-at-start", "0.5", + "--flex-model", '{"soc-min": "10 kWh", "soc-max": "100 kWh", "power-capacity": "50 kW"}', + ]) + if "Successfully" in stdout or "SAVED" in stdout: + print(f" Sensor {sensor_id}: OK") + else: + print(f" Sensor {sensor_id}: {stdout[:100]}") + +# ======================================== +# 4. Scheduling - Plans de charge V2G +# ======================================== +print("\n=== Scheduling V2G ===") +for sensor_id in range(71, 81): + stdout, stderr, rc = run_fm_cli([ + "add", "schedule", + "--sensor", str(sensor_id), + "--start", datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00"), + "--duration", "PT24H", + "--resolution", "PT15M", + "--soc-at-start", "0.5", + "--flex-model", '{"soc-min": "15 kWh", "soc-max": "75 kWh", "power-capacity": "11 kW"}', + ]) + if "Successfully" in stdout or "SAVED" in stdout: + print(f" Sensor {sensor_id}: OK") + else: + print(f" Sensor {sensor_id}: {stdout[:100]}") + +print("\n=== Terminé ===") diff --git a/scripts/openadr_ven.py b/scripts/openadr_ven.py new file mode 100644 index 0000000..6df7bf8 --- /dev/null +++ b/scripts/openadr_ven.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +""" +Cariflex - OpenADR VEN (Virtual End Node) +Reçoit les signaux DSR du VTN et les transmet à FlexMeasures. +""" + +import asyncio +import logging +from datetime import datetime, timezone, timedelta +from openleadr import OpenADRClient, enable_default_logging +import requests +import re + +enable_default_logging() +logger = logging.getLogger("openleadr") + +FM_HOST = "https://cariflex.digitribe.fr" +FM_EMAIL = "admin@digitribe.fr" +FM_PASSWORD = "Digitribe972" +VEN_ID = "Cariflex-VEN" +VTN_URL = "http://localhost:8081" + +# Sensor IDs in FM +SENSOR_CONSUMPTION_PRICE = 84 +SENSOR_LOAD_CONTROL = 86 + + +def fm_login(): + session = requests.Session() + session.verify = False + r = session.get(f"{FM_HOST}/login") + match = re.search(r']*csrf_token[^>]*value="([^"]+)"', r.text) + if match: + csrf = match.group(1) + r = session.post(f"{FM_HOST}/login", data={ + "email": FM_EMAIL, "password": FM_PASSWORD, + "csrf_token": csrf, "remember": "y" + }, allow_redirects=True) + if "dashboard" in r.url: + return session + return None + + +def post_sensor_data(session, sensor_id, value, unit, start, duration="PT1H"): + r = session.post( + f"{FM_HOST}/api/v3_0/sensors/{sensor_id}/data", + json={"values": [value], "start": start, "duration": duration, "unit": unit}, + timeout=30 + ) + return r.status_code in [200, 201, 202] + + +async def main(): + client = OpenADRClient( + ven_id=VEN_ID, + ven_name=VEN_ID, + vtn_url=VTN_URL, + ) + + logger.info(f"Starting Cariflex VEN: {VEN_ID}") + logger.info(f"Connecting to VTN: {VTN_URL}") + + # Login to FM + session = fm_login() + if session: + logger.info("Logged in to FlexMeasures") + else: + logger.error("Failed to login to FlexMeasures") + return + + # Add event handler + @client.on_event + async def handle_event(event): + logger.info(f"Received event: {event}") + + # Extract signal data + for signal in event.get("event_signals", []): + signal_name = signal.get("signal_name") + + for interval in signal.get("intervals", []): + payload = interval.get("signal_payload", 0) + dtstart = interval.get("dtstart", datetime.now(timezone.utc)) + + if signal_name == "ENERGY_PRICE": + price = round(float(payload), 2) + success = post_sensor_data( + session, SENSOR_CONSUMPTION_PRICE, price, "EUR/MWh", + dtstart.isoformat() if isinstance(dtstart, datetime) else str(dtstart) + ) + logger.info(f"Price {price} EUR/MWh: {success}") + + elif signal_name == "LOAD_CONTROL": + success = post_sensor_data( + session, SENSOR_LOAD_CONTROL, round(float(payload), 2), "%", + dtstart.isoformat() if isinstance(dtstart, datetime) else str(dtstart) + ) + logger.info(f"Load control {payload}: {success}") + + return "optIn" + + # Run the client + await client.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/scripts/openadr_vtn.py b/scripts/openadr_vtn.py new file mode 100644 index 0000000..37851da --- /dev/null +++ b/scripts/openadr_vtn.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +""" +Cariflex - OpenADR VTN (Virtual Top Node) Simulé +Envoie des signaux DSR (prix, charge/décharge) au VEN Cariflex. +""" + +import asyncio +import random +from datetime import datetime, timezone, timedelta +from openleadr import OpenADRServer, enable_default_logging +from openleadr.objects import Interval +import logging + +enable_default_logging() +logger = logging.getLogger("openleadr") + +VTN_ID = "Cariflex-VTN" +VEN_ID = "Cariflex-VEN" +PORT = 8082 # Use 8082 to avoid conflicts + + +async def ven_lookup(ven_id, *args, **kwargs): + """Look up a VEN by ID - required for auto-registration.""" + logger.info(f"VEN lookup: {ven_id}") + if ven_id == VEN_ID: + return {"ven_name": VEN_ID} + return None + + +async def main(): + server = OpenADRServer( + vtn_id=VTN_ID, + http_port=PORT, + http_host="0.0.0.0", + ven_lookup=ven_lookup, # This enables auto-registration + ) + + logger.info(f"Starting Cariflex VTN: {VTN_ID} on port {PORT}") + + # Create price event + now = datetime.now(timezone.utc) + price_intervals = [] + for h in range(24): + dt = now + timedelta(hours=h) + hour_of_day = dt.hour + if 6 <= hour_of_day <= 22: + price = round(random.uniform(80, 150), 2) + else: + price = round(random.uniform(40, 80), 2) + price_intervals.append( + Interval(dtstart=dt, duration=timedelta(hours=1), signal_payload=price) + ) + + server.add_event( + ven_id=VEN_ID, + signal_name="ENERGY_PRICE", + signal_type="price", + intervals=price_intervals, + event_id=f"price-{now.strftime('%Y%m%d%H%M%S')}", + targets=[{"ven_id": VEN_ID}], + market_context="https://cariflex.digitribe.fr/price", + ) + logger.info(f"Created price event with {len(price_intervals)} intervals") + + # Create load control event + load_intervals = [] + for h in range(4): + dt = now + timedelta(hours=h) + payload = random.choice([0.0, 0.0, 0.0, 0.5, 1.0]) + load_intervals.append( + Interval(dtstart=dt, duration=timedelta(hours=1), signal_payload=payload) + ) + + server.add_event( + ven_id=VEN_ID, + signal_name="LOAD_CONTROL", + signal_type="x-loadControlCapacity", + intervals=load_intervals, + event_id=f"load-{now.strftime('%Y%m%d%H%M%S')}", + targets=[{"ven_id": VEN_ID}], + market_context="https://cariflex.digitribe.fr/load", + ) + logger.info(f"Created load control event with {len(load_intervals)} intervals") + logger.info("Created 2 DSR events, starting server...") + + # Start server (blocks forever) + await server.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/scripts/openleadr_ven.py b/scripts/openleadr_ven.py new file mode 100644 index 0000000..4ca13ea --- /dev/null +++ b/scripts/openleadr_ven.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +""" +Cariflex - OpenLEADR VEN (Virtual End Node) +Reçoit les signaux DSR du TSO/DSO et les transmet à FlexMeasures. +""" + +import asyncio +import json +import logging +from datetime import datetime, timezone, timedelta +from openleadr import OpenADRClient, enable_default_logging +import requests +import re + +# Configuration +FM_HOST = "https://cariflex.digitribe.fr" +FM_EMAIL = "admin@digitribe.fr" +FM_PASSWORD = "Digitribe972" +VEN_NAME = "Cariflex-VEN" +VTN_URL = "http://localhost:8080" # URL du VTN OpenADR (à configurer) + +# Sensors IDs dans FM +SENSOR_CONSUMPTION_PRICE = 84 +SENSOR_PRODUCTION_PRICE = 85 +SENSOR_PV_FORECAST = 41 # PV_01 +SENSOR_BAT_SOC = 51 # Bat_01 + +# Setup logging +enable_default_logging() +logger = logging.getLogger("openleadr") + +class CariflexVEN: + """OpenLEADR VEN pour Cariflex EMS.""" + + def __init__(self): + self.client = OpenADRClient( + vad_name=VEN_NAME, + vtn_url=VTN_URL, + ) + self.fm_session = None + + def fm_login(self): + """Login to FlexMeasures.""" + self.fm_session = requests.Session() + self.fm_session.verify = False + r = self.fm_session.get(f"{FM_HOST}/login") + match = re.search(r']*csrf_token[^>]*value="([^"]+)"', r.text) + if match: + csrf = match.group(1) + r = self.fm_session.post(f"{FM_HOST}/login", data={ + "email": FM_EMAIL, "password": FM_PASSWORD, + "csrf_token": csrf, "remember": "y" + }, allow_redirects=True) + return "dashboard" in r.url + return False + + def post_sensor_data(self, sensor_id, values, unit, start, duration="PT1H"): + """Post sensor data to FlexMeasures.""" + if not self.fm_session: + if not self.fm_login(): + return False + + r = self.fm_session.post( + f"{FM_HOST}/api/v3_0/sensors/{sensor_id}/data", + json={"values": values, "start": start, "duration": duration, "unit": unit}, + timeout=30 + ) + return r.status_code in [200, 201, 202] + + async def handle_dsr_event(self, event): + """ + Handle DSR event from TSO/DSO. + + Event format: + { + "event_id": "...", + "modification_number": 0, + "event_status": "active", + "created_date_time": "...", + "event_descriptor": {...}, + "active_period": {...}, + "event_signals": [ + { + "signal_name": "LOAD_CONTROL", + "signal_type": "X_LOAD_CONTROL", + "intervals": [ + {"duration": "PT1H", "signal_payload": 0.5} + ] + } + ] + } + """ + logger.info(f"Received DSR event: {event.get('event_id')}") + + for signal in event.get("event_signals", []): + signal_name = signal.get("signal_name") + signal_type = signal.get("signal_type") + + for interval in signal.get("intervals", []): + payload = interval.get("signal_payload") + duration = interval.get("duration", "PT1H") + + # Convert payload to price signal + # payload: 0.0 = low price, 1.0 = high price + if signal_name in ["LOAD_CONTROL", "PRICE"]: + # Calculate price based on payload + base_price = 100 # EUR/MWh + price = base_price * (0.5 + payload) # 50-150 EUR/MWh + + # Post to FM + now = datetime.now(timezone.utc) + success = self.post_sensor_data( + SENSOR_CONSUMPTION_PRICE, + [round(price, 2)], + "EUR/MWh", + now.isoformat(), + duration + ) + logger.info(f"Posted price {price} EUR/MWh: {success}") + + elif signal_name == "RENEWABLE_GENERATION": + # Adjust PV forecast based on DSR signal + # payload: 0.0 = no PV, 1.0 = max PV + pv_factor = payload + # TODO: Adjust PV forecast + logger.info(f"PV factor: {pv_factor}") + + return "OK" + + async def handle_s2_message(self, message): + """ + Handle S2 message from TSO/DSO. + + S2 messages are used for real-time control of flexible resources. + """ + logger.info(f"Received S2 message: {message.get('message_type')}") + + msg_type = message.get("message_type") + + if msg_type == "FRBC.ActuatorStatus": + # Update actuator status in FM + pass + elif msg_type == "FRBC.Instruction": + # Execute instruction in FM + pass + elif msg_type == "FRBC.SystemDescription": + # Update system description in FM + pass + + return "OK" + + async def run(self): + """Run the OpenLEADR VEN.""" + logger.info(f"Starting Cariflex VEN: {VEN_NAME}") + + # Register event handler + self.client.add_handler("on_event", self.handle_dsr_event) + + # Login to FM + if self.fm_login(): + logger.info("Logged in to FlexMeasures") + else: + logger.error("Failed to login to FlexMeasures") + + # Start the VEN + await self.client.run() + +if __name__ == "__main__": + ven = CariflexVEN() + asyncio.run(ven.run())