Files
cariflex/scripts/cariflex_s2_cem.py
Eric F 1d4a9cf8aa Add S2 CEM script, integrate Cariflex logo in FlexMeasures UI
- S2 CEM (Customer Energy Manager) script created
- Cariflex logo integrated in FlexMeasures UI
- Logo configured via FLEXMEASURES_MENU_LOGO_PATH
- S2 CEM handles DSO flex requests, aggregates resources via FlexMeasures
2026-06-08 11:17:03 -04:00

240 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
Cariflex S2 CEM (Customer Energy Manager)
Bridge between FlexMeasures (RM) and S2/OpenADR (DSO side).
"""
import asyncio
import json
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional
from flexmeasures_client import FlexMeasuresClient
# Configure the root logger at import time so INFO-level messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-wide logger used by the class and main() below.
logger = logging.getLogger("cariflex-s2-cem")
# S2 message types (from s2_python / flexmeasures_client.s2)
# The S2 import is optional: S2_AVAILABLE records whether it succeeded, and a
# warning is logged when it did not.
# NOTE(review): nothing in the visible part of this file reads S2_AVAILABLE,
# ReceptionStatus or RevokeObject -- confirm whether they are still needed.
try:
    from flexmeasures_client.s2 import ReceptionStatus, RevokeObject
    S2_AVAILABLE = True
except ImportError:
    S2_AVAILABLE = False
    logger.warning("S2 module not available, using basic mode")
class CariflexS2CEM:
    """
    S2 Customer Energy Manager for Cariflex.

    Architecture:
    - DSO sends flex requests via OpenADR 2.0b / S2
    - CEM receives requests, aggregates resources via FlexMeasures
    - FlexMeasures (RM) generates schedules for each asset
    - CEM sends responses back to DSO
    """

    # Number of assets per group that get_available_flexibility() queries.
    # The measured total is extrapolated from this sample to the whole group.
    FLEX_SAMPLE_SIZE = 3

    def __init__(
        self,
        fm_host: str = "flexmeasures.digitribe.fr",
        fm_email: str = "admin@digitribe.fr",
        # SECURITY NOTE(review): a credential is committed here as a default
        # value. It is kept only so existing no-argument callers keep working;
        # it should be rotated and supplied from the environment or a secret
        # store instead.
        fm_password: str = "Digitribe972",
        ssl: bool = True
    ):
        """
        Create the FlexMeasures client and the static asset tables.

        Args:
            fm_host: FlexMeasures host name passed to the client.
            fm_email: Account e-mail passed to the client.
            fm_password: Account password passed to the client.
            ssl: Passed through to the client's ``ssl`` option.
        """
        self.fm_client = FlexMeasuresClient(
            email=fm_email,
            password=fm_password,
            host=fm_host,
            ssl=ssl,
            request_timeout=60.0
        )
        # Asset groups by type for S2 aggregation
        self.asset_groups = {
            "pv": list(range(41, 51)),          # 10 PV assets
            "battery": list(range(51, 61)),     # 10 Battery assets
            "ev_charger": list(range(61, 71)),  # 10 EV Charger assets
            "ev_v2g": list(range(71, 81)),      # 10 EV V2G assets
        }
        # Flexibility capacities per group (kW)
        self.flex_capacity = {
            "pv": 50,           # 10 * 5kW
            "battery": 500,     # 10 * 50kW
            "ev_charger": 220,  # 10 * 22kW
            "ev_v2g": 110,      # 10 * 11kW (bidirectional)
        }

    async def connect(self):
        """
        Connect to FlexMeasures by listing its sensors.

        Returns:
            The number of sensors returned by the client.
        """
        sensors = await self.fm_client.get_sensors(parse_json_fields=False)
        logger.info(f"✅ Connected to FlexMeasures - {len(sensors)} sensors")
        return len(sensors)

    async def get_available_flexibility(self) -> dict:
        """
        Query FlexMeasures for available flexibility from all asset groups.

        For speed only the first ``FLEX_SAMPLE_SIZE`` assets of each group are
        queried. Each sampled asset contributes ``max(values) - min(values)``
        of its one-hour schedule, and the sum is scaled up to the full group
        size. Assets whose query fails, or that return no values, contribute
        zero; failures are logged instead of being silently dropped.

        Returns:
            Mapping of group name to a dict with ``available_kw`` (the static
            capacity from ``flex_capacity``) and ``flexibility_kw`` (the
            extrapolated estimate, rounded to two decimals).
        """
        flex = {}
        for group_name, asset_ids in self.asset_groups.items():
            sample = asset_ids[:self.FLEX_SAMPLE_SIZE]
            total_flex = 0.0
            for asset_id in sample:
                try:
                    # Get latest schedule/forecast
                    schedule = await self.fm_client.get_schedule(
                        asset_id=asset_id,
                        start=datetime.now(timezone.utc).isoformat(),
                        duration="PT1H"
                    )
                    values = schedule.get("values", []) if schedule else []
                    if values:
                        total_flex += max(values) - min(values)
                except Exception as exc:
                    # Best effort: one failing asset must not abort the whole
                    # survey, but the failure has to be visible in the logs.
                    logger.warning(
                        "Could not read schedule for asset %s in group %s: %s",
                        asset_id, group_name, exc,
                    )
            # Scale by the number of assets actually sampled. Dividing by a
            # fixed 3 under-reports groups that have fewer than 3 assets.
            scale = len(asset_ids) / len(sample) if sample else 0.0
            flex[group_name] = {
                "available_kw": self.flex_capacity.get(group_name, 0),
                "flexibility_kw": round(total_flex * scale, 2),
            }
        return flex

    async def handle_dso_flex_request(
        self,
        group: str,
        power_kw: float,
        start: str,
        duration: str = "PT1H"
    ) -> dict:
        """
        Handle a DSO flexibility request via S2.

        The request is rejected up front when the group is unknown (or empty)
        or when ``abs(power_kw)`` exceeds the group's static capacity.
        Otherwise the power is split evenly over the group's assets and a
        schedule is triggered for each one. Per-asset failures are recorded
        in the result list rather than raised.

        Args:
            group: Asset group (pv, battery, ev_charger, ev_v2g)
            power_kw: Requested power adjustment (positive=reduce, negative=increase)
            start: ISO timestamp
            duration: ISO duration

        Returns:
            Response dict. Rejections carry ``status`` and ``reason``. Otherwise
            the dict carries ``status`` ("accepted" when at least one asset was
            scheduled, else "rejected"), ``group``, ``power_kw``,
            ``assets_scheduled``, ``total_assets`` and per-asset ``results``.
        """
        asset_ids = self.asset_groups.get(group, [])
        if not asset_ids:
            return {"status": "rejected", "reason": f"Unknown group: {group}"}
        available = self.flex_capacity.get(group, 0)
        if abs(power_kw) > available:
            return {
                "status": "rejected",
                "reason": f"Requested {power_kw}kW exceeds available {available}kW"
            }
        # Distribute power adjustment across assets
        per_asset = power_kw / len(asset_ids)
        # NOTE(review): abs() discards the sign of the request, so "reduce"
        # and "increase" produce the same constraint -- confirm intent.
        capacity = f"{abs(per_asset)}kW"
        is_battery = "bat" in group
        is_ev = "ev" in group
        # NOTE(review): soc-min / soc-max look like flex-model fields, not
        # flex-context fields -- verify against the FlexMeasures API.
        flex_context = {
            "consumption-capacity": capacity,
            "production-capacity": capacity,
            "soc-min": "10kWh" if is_battery or is_ev else "0kWh",
            "soc-max": "100kWh" if is_battery else "75kWh" if is_ev else "0kWh",
        }
        results = []
        for asset_id in asset_ids:
            try:
                # Trigger schedule with flex constraint
                await self.fm_client.trigger_schedule(
                    asset_id=asset_id,
                    start=start,
                    duration=duration,
                    flex_context=dict(flex_context),
                )
                results.append({"asset_id": asset_id, "status": "scheduled"})
            except Exception as e:
                results.append({"asset_id": asset_id, "status": "error", "error": str(e)})
        accepted = sum(1 for r in results if r["status"] == "scheduled")
        return {
            "status": "accepted" if accepted > 0 else "rejected",
            "group": group,
            "power_kw": power_kw,
            "assets_scheduled": accepted,
            "total_assets": len(asset_ids),
            "results": results
        }

    async def aggregate_forecast(self, start: str, duration: str = "PT24H") -> dict:
        """
        Aggregate power forecast for all assets (S2 PowerForecast message).

        Each asset's schedule is fetched and the ``values`` series are summed
        position by position within a group. Series shorter than the longest
        one simply stop contributing. Assets whose query fails are logged and
        skipped; groups with no usable schedule are left out of the result.

        Args:
            start: ISO timestamp passed to the schedule query.
            duration: ISO duration passed to the schedule query.

        Returns:
            Dict with ``start``, ``duration`` and ``groups``, where each group
            maps to ``assets`` (number of schedules summed) and
            ``total_power_kw`` (summed series, rounded to two decimals).
        """
        forecast = {
            "start": start,
            "duration": duration,
            "groups": {}
        }
        for group_name, asset_ids in self.asset_groups.items():
            series = []
            for asset_id in asset_ids:
                try:
                    schedule = await self.fm_client.get_schedule(
                        asset_id=asset_id,
                        start=start,
                        duration=duration
                    )
                except Exception as exc:
                    # Best effort: skip this asset but leave a trace.
                    logger.warning(
                        "Could not read schedule for asset %s in group %s: %s",
                        asset_id, group_name, exc,
                    )
                    continue
                if schedule:
                    series.append(schedule.get("values", []))
            if not series:
                continue
            # Sum values across all assets in group
            summed = [0.0] * max(len(values) for values in series)
            for values in series:
                for i, v in enumerate(values):
                    summed[i] += v
            forecast["groups"][group_name] = {
                "assets": len(series),
                "total_power_kw": [round(v, 2) for v in summed]
            }
        return forecast

    async def close(self):
        """Close FlexMeasures connection."""
        await self.fm_client.close()
async def main():
    """
    Test S2 CEM.

    Builds a CEM with its default settings, connects, logs the sensor count
    and the estimated flexibility per group, and always closes the client.
    Any failure is logged with its traceback and not re-raised.
    """
    cem = CariflexS2CEM()
    try:
        # Connect
        n_sensors = await cem.connect()
        logger.info(f"Connected with {n_sensors} sensors")
        # Get available flexibility
        flex = await cem.get_available_flexibility()
        logger.info(f"Available flexibility: {json.dumps(flex, indent=2)}")
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error() with
        # only str(e) would discard at this top-level boundary.
        logger.exception(f"Error: {e}")
    finally:
        await cem.close()
# Run the CEM smoke test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())