#!/usr/bin/env python3 """ Cariflex S2 CEM (Customer Energy Manager) Bridge between FlexMeasures (RM) and S2/OpenADR (DSO side). """ import asyncio import json import logging from datetime import datetime, timezone, timedelta from typing import Optional from flexmeasures_client import FlexMeasuresClient logging.basicConfig(level=logging.INFO) logger = logging.getLogger("cariflex-s2-cem") # S2 message types (from s2_python / flexmeasures_client.s2) try: from flexmeasures_client.s2 import ReceptionStatus, RevokeObject S2_AVAILABLE = True except ImportError: S2_AVAILABLE = False logger.warning("S2 module not available, using basic mode") class CariflexS2CEM: """ S2 Customer Energy Manager for Cariflex. Architecture: - DSO sends flex requests via OpenADR 2.0b / S2 - CEM receives requests, aggregates resources via FlexMeasures - FlexMeasures (RM) generates schedules for each asset - CEM sends responses back to DSO """ def __init__( self, fm_host: str = "flexmeasures.digitribe.fr", fm_email: str = "admin@digitribe.fr", fm_password: str = "Digitribe972", ssl: bool = True ): self.fm_client = FlexMeasuresClient( email=fm_email, password=fm_password, host=fm_host, ssl=ssl, request_timeout=60.0 ) # Asset groups by type for S2 aggregation self.asset_groups = { "pv": list(range(41, 51)), # 10 PV assets "battery": list(range(51, 61)), # 10 Battery assets "ev_charger": list(range(61, 71)), # 10 EV Charger assets "ev_v2g": list(range(71, 81)), # 10 EV V2G assets } # Flexibility capacities per group (kW) self.flex_capacity = { "pv": 50, # 10 * 5kW "battery": 500, # 10 * 50kW "ev_charger": 220, # 10 * 22kW "ev_v2g": 110, # 10 * 11kW (bidirectional) } async def connect(self): """Connect to FlexMeasures.""" sensors = await self.fm_client.get_sensors(parse_json_fields=False) logger.info(f"✅ Connected to FlexMeasures - {len(sensors)} sensors") return len(sensors) async def get_available_flexibility(self) -> dict: """ Query FlexMeasures for available flexibility from all assets. Returns flexibility per group in kW. """ flex = {} for group_name, asset_ids in self.asset_groups.items(): total_flex = 0.0 for asset_id in asset_ids[:3]: # Sample first 3 for speed try: # Get latest schedule/forecast schedule = await self.fm_client.get_schedule( asset_id=asset_id, start=datetime.now(timezone.utc).isoformat(), duration="PT1H" ) if schedule: # Extract flexibility from schedule values = schedule.get("values", []) if values: total_flex += max(values) - min(values) except Exception: pass # Estimate full group flexibility flex[group_name] = { "available_kw": self.flex_capacity.get(group_name, 0), "flexibility_kw": round(total_flex * (len(asset_ids) / 3), 2), } return flex async def handle_dso_flex_request( self, group: str, power_kw: float, start: str, duration: str = "PT1H" ) -> dict: """ Handle a DSO flexibility request via S2. Args: group: Asset group (pv, battery, ev_charger, ev_v2g) power_kw: Requested power adjustment (positive=reduce, negative=increase) start: ISO timestamp duration: ISO duration Returns: Response dict with acceptance status """ asset_ids = self.asset_groups.get(group, []) if not asset_ids: return {"status": "rejected", "reason": f"Unknown group: {group}"} available = self.flex_capacity.get(group, 0) if abs(power_kw) > available: return { "status": "rejected", "reason": f"Requested {power_kw}kW exceeds available {available}kW" } # Distribute power adjustment across assets per_asset = power_kw / len(asset_ids) results = [] for asset_id in asset_ids: try: # Trigger schedule with flex constraint result = await self.fm_client.trigger_schedule( asset_id=asset_id, start=start, duration=duration, flex_context={ "consumption-capacity": f"{abs(per_asset)}kW", "production-capacity": f"{abs(per_asset)}kW", "soc-min": "10kWh" if "bat" in group or "ev" in group else "0kWh", "soc-max": "100kWh" if "bat" in group else "75kWh" if "ev" in group else "0kWh", } ) results.append({"asset_id": asset_id, "status": "scheduled"}) except Exception as e: results.append({"asset_id": asset_id, "status": "error", "error": str(e)}) accepted = sum(1 for r in results if r["status"] == "scheduled") return { "status": "accepted" if accepted > 0 else "rejected", "group": group, "power_kw": power_kw, "assets_scheduled": accepted, "total_assets": len(asset_ids), "results": results } async def aggregate_forecast(self, start: str, duration: str = "PT24H") -> dict: """ Aggregate power forecast for all assets (S2 PowerForecast message). """ forecast = { "start": start, "duration": duration, "groups": {} } for group_name, asset_ids in self.asset_groups.items(): group_forecast = [] for asset_id in asset_ids: try: schedule = await self.fm_client.get_schedule( asset_id=asset_id, start=start, duration=duration ) if schedule: group_forecast.append({ "asset_id": asset_id, "values": schedule.get("values", []) }) except Exception: pass if group_forecast: # Sum values across all assets in group max_len = max(len(f["values"]) for f in group_forecast) summed = [0.0] * max_len for f in group_forecast: for i, v in enumerate(f["values"]): if i < max_len: summed[i] += v forecast["groups"][group_name] = { "assets": len(group_forecast), "total_power_kw": [round(v, 2) for v in summed] } return forecast async def close(self): """Close FlexMeasures connection.""" await self.fm_client.close() async def main(): """Test S2 CEM.""" cem = CariflexS2CEM() try: # Connect n_sensors = await cem.connect() logger.info(f"Connected with {n_sensors} sensors") # Get available flexibility flex = await cem.get_available_flexibility() logger.info(f"Available flexibility: {json.dumps(flex, indent=2)}") except Exception as e: logger.error(f"Error: {e}") finally: await cem.close() if __name__ == "__main__": asyncio.run(main())