Add S2 CEM script, integrate Cariflex logo in FlexMeasures UI
- S2 CEM (Customer Energy Manager) script created - Cariflex logo integrated in FlexMeasures UI - Logo configured via FLEXMEASURES_MENU_LOGO_PATH - S2 CEM handles DSO flex requests, aggregates resources via FlexMeasures
This commit is contained in:
239
scripts/cariflex_s2_cem.py
Normal file
239
scripts/cariflex_s2_cem.py
Normal file
@@ -0,0 +1,239 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Cariflex S2 CEM (Customer Energy Manager)
|
||||
Bridge between FlexMeasures (RM) and S2/OpenADR (DSO side).
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from typing import Optional
|
||||
|
||||
from flexmeasures_client import FlexMeasuresClient
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger("cariflex-s2-cem")
|
||||
|
||||
# S2 message types (from s2_python / flexmeasures_client.s2)
|
||||
try:
|
||||
from flexmeasures_client.s2 import ReceptionStatus, RevokeObject
|
||||
S2_AVAILABLE = True
|
||||
except ImportError:
|
||||
S2_AVAILABLE = False
|
||||
logger.warning("S2 module not available, using basic mode")
|
||||
|
||||
|
||||
class CariflexS2CEM:
|
||||
"""
|
||||
S2 Customer Energy Manager for Cariflex.
|
||||
|
||||
Architecture:
|
||||
- DSO sends flex requests via OpenADR 2.0b / S2
|
||||
- CEM receives requests, aggregates resources via FlexMeasures
|
||||
- FlexMeasures (RM) generates schedules for each asset
|
||||
- CEM sends responses back to DSO
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
fm_host: str = "flexmeasures.digitribe.fr",
|
||||
fm_email: str = "admin@digitribe.fr",
|
||||
fm_password: str = "Digitribe972",
|
||||
ssl: bool = True
|
||||
):
|
||||
self.fm_client = FlexMeasuresClient(
|
||||
email=fm_email,
|
||||
password=fm_password,
|
||||
host=fm_host,
|
||||
ssl=ssl,
|
||||
request_timeout=60.0
|
||||
)
|
||||
# Asset groups by type for S2 aggregation
|
||||
self.asset_groups = {
|
||||
"pv": list(range(41, 51)), # 10 PV assets
|
||||
"battery": list(range(51, 61)), # 10 Battery assets
|
||||
"ev_charger": list(range(61, 71)), # 10 EV Charger assets
|
||||
"ev_v2g": list(range(71, 81)), # 10 EV V2G assets
|
||||
}
|
||||
# Flexibility capacities per group (kW)
|
||||
self.flex_capacity = {
|
||||
"pv": 50, # 10 * 5kW
|
||||
"battery": 500, # 10 * 50kW
|
||||
"ev_charger": 220, # 10 * 22kW
|
||||
"ev_v2g": 110, # 10 * 11kW (bidirectional)
|
||||
}
|
||||
|
||||
async def connect(self):
|
||||
"""Connect to FlexMeasures."""
|
||||
sensors = await self.fm_client.get_sensors(parse_json_fields=False)
|
||||
logger.info(f"✅ Connected to FlexMeasures - {len(sensors)} sensors")
|
||||
return len(sensors)
|
||||
|
||||
async def get_available_flexibility(self) -> dict:
|
||||
"""
|
||||
Query FlexMeasures for available flexibility from all assets.
|
||||
Returns flexibility per group in kW.
|
||||
"""
|
||||
flex = {}
|
||||
|
||||
for group_name, asset_ids in self.asset_groups.items():
|
||||
total_flex = 0.0
|
||||
|
||||
for asset_id in asset_ids[:3]: # Sample first 3 for speed
|
||||
try:
|
||||
# Get latest schedule/forecast
|
||||
schedule = await self.fm_client.get_schedule(
|
||||
asset_id=asset_id,
|
||||
start=datetime.now(timezone.utc).isoformat(),
|
||||
duration="PT1H"
|
||||
)
|
||||
if schedule:
|
||||
# Extract flexibility from schedule
|
||||
values = schedule.get("values", [])
|
||||
if values:
|
||||
total_flex += max(values) - min(values)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Estimate full group flexibility
|
||||
flex[group_name] = {
|
||||
"available_kw": self.flex_capacity.get(group_name, 0),
|
||||
"flexibility_kw": round(total_flex * (len(asset_ids) / 3), 2),
|
||||
}
|
||||
|
||||
return flex
|
||||
|
||||
async def handle_dso_flex_request(
|
||||
self,
|
||||
group: str,
|
||||
power_kw: float,
|
||||
start: str,
|
||||
duration: str = "PT1H"
|
||||
) -> dict:
|
||||
"""
|
||||
Handle a DSO flexibility request via S2.
|
||||
|
||||
Args:
|
||||
group: Asset group (pv, battery, ev_charger, ev_v2g)
|
||||
power_kw: Requested power adjustment (positive=reduce, negative=increase)
|
||||
start: ISO timestamp
|
||||
duration: ISO duration
|
||||
|
||||
Returns:
|
||||
Response dict with acceptance status
|
||||
"""
|
||||
asset_ids = self.asset_groups.get(group, [])
|
||||
if not asset_ids:
|
||||
return {"status": "rejected", "reason": f"Unknown group: {group}"}
|
||||
|
||||
available = self.flex_capacity.get(group, 0)
|
||||
|
||||
if abs(power_kw) > available:
|
||||
return {
|
||||
"status": "rejected",
|
||||
"reason": f"Requested {power_kw}kW exceeds available {available}kW"
|
||||
}
|
||||
|
||||
# Distribute power adjustment across assets
|
||||
per_asset = power_kw / len(asset_ids)
|
||||
|
||||
results = []
|
||||
for asset_id in asset_ids:
|
||||
try:
|
||||
# Trigger schedule with flex constraint
|
||||
result = await self.fm_client.trigger_schedule(
|
||||
asset_id=asset_id,
|
||||
start=start,
|
||||
duration=duration,
|
||||
flex_context={
|
||||
"consumption-capacity": f"{abs(per_asset)}kW",
|
||||
"production-capacity": f"{abs(per_asset)}kW",
|
||||
"soc-min": "10kWh" if "bat" in group or "ev" in group else "0kWh",
|
||||
"soc-max": "100kWh" if "bat" in group else "75kWh" if "ev" in group else "0kWh",
|
||||
}
|
||||
)
|
||||
results.append({"asset_id": asset_id, "status": "scheduled"})
|
||||
except Exception as e:
|
||||
results.append({"asset_id": asset_id, "status": "error", "error": str(e)})
|
||||
|
||||
accepted = sum(1 for r in results if r["status"] == "scheduled")
|
||||
|
||||
return {
|
||||
"status": "accepted" if accepted > 0 else "rejected",
|
||||
"group": group,
|
||||
"power_kw": power_kw,
|
||||
"assets_scheduled": accepted,
|
||||
"total_assets": len(asset_ids),
|
||||
"results": results
|
||||
}
|
||||
|
||||
async def aggregate_forecast(self, start: str, duration: str = "PT24H") -> dict:
|
||||
"""
|
||||
Aggregate power forecast for all assets (S2 PowerForecast message).
|
||||
"""
|
||||
forecast = {
|
||||
"start": start,
|
||||
"duration": duration,
|
||||
"groups": {}
|
||||
}
|
||||
|
||||
for group_name, asset_ids in self.asset_groups.items():
|
||||
group_forecast = []
|
||||
|
||||
for asset_id in asset_ids:
|
||||
try:
|
||||
schedule = await self.fm_client.get_schedule(
|
||||
asset_id=asset_id,
|
||||
start=start,
|
||||
duration=duration
|
||||
)
|
||||
if schedule:
|
||||
group_forecast.append({
|
||||
"asset_id": asset_id,
|
||||
"values": schedule.get("values", [])
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if group_forecast:
|
||||
# Sum values across all assets in group
|
||||
max_len = max(len(f["values"]) for f in group_forecast)
|
||||
summed = [0.0] * max_len
|
||||
for f in group_forecast:
|
||||
for i, v in enumerate(f["values"]):
|
||||
if i < max_len:
|
||||
summed[i] += v
|
||||
|
||||
forecast["groups"][group_name] = {
|
||||
"assets": len(group_forecast),
|
||||
"total_power_kw": [round(v, 2) for v in summed]
|
||||
}
|
||||
|
||||
return forecast
|
||||
|
||||
async def close(self):
|
||||
"""Close FlexMeasures connection."""
|
||||
await self.fm_client.close()
|
||||
|
||||
|
||||
async def main():
|
||||
"""Test S2 CEM."""
|
||||
cem = CariflexS2CEM()
|
||||
|
||||
try:
|
||||
# Connect
|
||||
n_sensors = await cem.connect()
|
||||
logger.info(f"Connected with {n_sensors} sensors")
|
||||
|
||||
# Get available flexibility
|
||||
flex = await cem.get_available_flexibility()
|
||||
logger.info(f"Available flexibility: {json.dumps(flex, indent=2)}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
finally:
|
||||
await cem.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user