From 1d4a9cf8aad7aee5cbb2584e1c2701a5225bc1de Mon Sep 17 00:00:00 2001 From: Eric F Date: Mon, 8 Jun 2026 11:17:03 -0400 Subject: [PATCH] Add S2 CEM script, integrate Cariflex logo in FlexMeasures UI - S2 CEM (Customer Energy Manager) script created - Cariflex logo integrated in FlexMeasures UI - Logo configured via FLEXMEASURES_MENU_LOGO_PATH - S2 CEM handles DSO flex requests, aggregates resources via FlexMeasures --- scripts/cariflex_s2_cem.py | 239 +++++++++++++++++++++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 scripts/cariflex_s2_cem.py diff --git a/scripts/cariflex_s2_cem.py b/scripts/cariflex_s2_cem.py new file mode 100644 index 0000000..ed90c54 --- /dev/null +++ b/scripts/cariflex_s2_cem.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 +""" +Cariflex S2 CEM (Customer Energy Manager) +Bridge between FlexMeasures (RM) and S2/OpenADR (DSO side). +""" +import asyncio +import json +import logging +from datetime import datetime, timezone, timedelta +from typing import Optional + +from flexmeasures_client import FlexMeasuresClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("cariflex-s2-cem") + +# S2 message types (from s2_python / flexmeasures_client.s2) +try: + from flexmeasures_client.s2 import ReceptionStatus, RevokeObject + S2_AVAILABLE = True +except ImportError: + S2_AVAILABLE = False + logger.warning("S2 module not available, using basic mode") + + +class CariflexS2CEM: + """ + S2 Customer Energy Manager for Cariflex. + + Architecture: + - DSO sends flex requests via OpenADR 2.0b / S2 + - CEM receives requests, aggregates resources via FlexMeasures + - FlexMeasures (RM) generates schedules for each asset + - CEM sends responses back to DSO + """ + + def __init__( + self, + fm_host: str = "flexmeasures.digitribe.fr", + fm_email: str = "admin@digitribe.fr", + fm_password: str = "Digitribe972", + ssl: bool = True + ): + self.fm_client = FlexMeasuresClient( + email=fm_email, + password=fm_password, + host=fm_host, + ssl=ssl, + request_timeout=60.0 + ) + # Asset groups by type for S2 aggregation + self.asset_groups = { + "pv": list(range(41, 51)), # 10 PV assets + "battery": list(range(51, 61)), # 10 Battery assets + "ev_charger": list(range(61, 71)), # 10 EV Charger assets + "ev_v2g": list(range(71, 81)), # 10 EV V2G assets + } + # Flexibility capacities per group (kW) + self.flex_capacity = { + "pv": 50, # 10 * 5kW + "battery": 500, # 10 * 50kW + "ev_charger": 220, # 10 * 22kW + "ev_v2g": 110, # 10 * 11kW (bidirectional) + } + + async def connect(self): + """Connect to FlexMeasures.""" + sensors = await self.fm_client.get_sensors(parse_json_fields=False) + logger.info(f"✅ Connected to FlexMeasures - {len(sensors)} sensors") + return len(sensors) + + async def get_available_flexibility(self) -> dict: + """ + Query FlexMeasures for available flexibility from all assets. + Returns flexibility per group in kW. + """ + flex = {} + + for group_name, asset_ids in self.asset_groups.items(): + total_flex = 0.0 + + for asset_id in asset_ids[:3]: # Sample first 3 for speed + try: + # Get latest schedule/forecast + schedule = await self.fm_client.get_schedule( + asset_id=asset_id, + start=datetime.now(timezone.utc).isoformat(), + duration="PT1H" + ) + if schedule: + # Extract flexibility from schedule + values = schedule.get("values", []) + if values: + total_flex += max(values) - min(values) + except Exception: + pass + + # Estimate full group flexibility + flex[group_name] = { + "available_kw": self.flex_capacity.get(group_name, 0), + "flexibility_kw": round(total_flex * (len(asset_ids) / 3), 2), + } + + return flex + + async def handle_dso_flex_request( + self, + group: str, + power_kw: float, + start: str, + duration: str = "PT1H" + ) -> dict: + """ + Handle a DSO flexibility request via S2. + + Args: + group: Asset group (pv, battery, ev_charger, ev_v2g) + power_kw: Requested power adjustment (positive=reduce, negative=increase) + start: ISO timestamp + duration: ISO duration + + Returns: + Response dict with acceptance status + """ + asset_ids = self.asset_groups.get(group, []) + if not asset_ids: + return {"status": "rejected", "reason": f"Unknown group: {group}"} + + available = self.flex_capacity.get(group, 0) + + if abs(power_kw) > available: + return { + "status": "rejected", + "reason": f"Requested {power_kw}kW exceeds available {available}kW" + } + + # Distribute power adjustment across assets + per_asset = power_kw / len(asset_ids) + + results = [] + for asset_id in asset_ids: + try: + # Trigger schedule with flex constraint + result = await self.fm_client.trigger_schedule( + asset_id=asset_id, + start=start, + duration=duration, + flex_context={ + "consumption-capacity": f"{abs(per_asset)}kW", + "production-capacity": f"{abs(per_asset)}kW", + "soc-min": "10kWh" if "bat" in group or "ev" in group else "0kWh", + "soc-max": "100kWh" if "bat" in group else "75kWh" if "ev" in group else "0kWh", + } + ) + results.append({"asset_id": asset_id, "status": "scheduled"}) + except Exception as e: + results.append({"asset_id": asset_id, "status": "error", "error": str(e)}) + + accepted = sum(1 for r in results if r["status"] == "scheduled") + + return { + "status": "accepted" if accepted > 0 else "rejected", + "group": group, + "power_kw": power_kw, + "assets_scheduled": accepted, + "total_assets": len(asset_ids), + "results": results + } + + async def aggregate_forecast(self, start: str, duration: str = "PT24H") -> dict: + """ + Aggregate power forecast for all assets (S2 PowerForecast message). + """ + forecast = { + "start": start, + "duration": duration, + "groups": {} + } + + for group_name, asset_ids in self.asset_groups.items(): + group_forecast = [] + + for asset_id in asset_ids: + try: + schedule = await self.fm_client.get_schedule( + asset_id=asset_id, + start=start, + duration=duration + ) + if schedule: + group_forecast.append({ + "asset_id": asset_id, + "values": schedule.get("values", []) + }) + except Exception: + pass + + if group_forecast: + # Sum values across all assets in group + max_len = max(len(f["values"]) for f in group_forecast) + summed = [0.0] * max_len + for f in group_forecast: + for i, v in enumerate(f["values"]): + if i < max_len: + summed[i] += v + + forecast["groups"][group_name] = { + "assets": len(group_forecast), + "total_power_kw": [round(v, 2) for v in summed] + } + + return forecast + + async def close(self): + """Close FlexMeasures connection.""" + await self.fm_client.close() + + +async def main(): + """Test S2 CEM.""" + cem = CariflexS2CEM() + + try: + # Connect + n_sensors = await cem.connect() + logger.info(f"Connected with {n_sensors} sensors") + + # Get available flexibility + flex = await cem.get_available_flexibility() + logger.info(f"Available flexibility: {json.dumps(flex, indent=2)}") + + except Exception as e: + logger.error(f"Error: {e}") + finally: + await cem.close() + + +if __name__ == "__main__": + asyncio.run(main())