#!/usr/bin/env python3 """ Cariflex - S2 Protocol Service Converts S2 messages to FlexMeasures scheduling commands. Supports FRBC (Flexibility Resource Bank Control) for batteries/EVs. """ import asyncio, json, logging, os, re from datetime import datetime, timezone, timedelta from typing import Optional import requests # S2 Protocol imports from s2python.s2_parser import S2Parser from s2python.common import FRBCInstruction, FRBCSystemDescription, FRBCActuatorStatus from s2python.common import FRBCStorageStatus, FRBCUsageForecast, FRBCFillLevelTargetProfile logging.basicConfig(level=logging.INFO) logger = logging.getLogger("cariflex-s2") FM_HOST = os.getenv("FM_HOST", "https://cariflex.digitribe.fr") FM_EMAIL = os.getenv("FM_EMAIL", "admin@digitribe.fr") FM_PWD = os.getenv("FM_PWD", "Digitribe972") # S2 Resource mapping: S2 resource_id -> FM sensor_id S2_RESOURCE_MAP = { "battery_01": 51, # Bat_01 "battery_02": 52, # Bat_02 "battery_03": 53, # Bat_03 "battery_04": 54, # Bat_04 "battery_05": 55, # Bat_05 "ev_01": 61, # EV_01 "ev_02": 62, # EV_02 "ev_03": 63, # EV_03 "ev_04": 64, # EV_04 "ev_05": 65, # EV_05 } fm_session = None def fm_login(): global fm_session try: s = requests.Session(); s.verify = False r = s.get(f"{FM_HOST}/login", timeout=15) m = re.search(r"csrf_token[^>]*value=[\"\\']([^\"\\']+)", r.text) if m: r = s.post(f"{FM_HOST}/login", data={"email":FM_EMAIL,"password":FM_PWD,"csrf_token":m.group(1),"remember":"y"}, allow_redirects=True, timeout=15) if "dashboard" in r.url or r.status_code == 200: fm_session = s return True except Exception as e: logger.error(f"FM login: {e}") return False def post_sensor_data(sensor_id, value, unit, start, duration="PT1H"): if not fm_session: return False try: r = fm_session.post(f"{FM_HOST}/api/v3_0/sensors/{sensor_id}/data", json={"values":[value],"start":start,"duration":duration,"unit":unit}, timeout=30) return r.status_code in [200,201,202] except Exception as e: logger.error(f"Post: {e}") return False def handle_frbc_instruction(instruction: FRBCInstruction): """Handle FRBC instruction from S2 protocol.""" logger.info(f"FRBC Instruction received: {instruction}") resource_id = getattr(instruction, 'resource_id', None) if not resource_id: logger.warning("No resource_id in FRBC instruction") return fm_sensor_id = S2_RESOURCE_MAP.get(resource_id) if not fm_sensor_id: logger.warning(f"Unknown S2 resource: {resource_id}") return # Extract power setpoint from instruction power_setpoint = getattr(instruction, 'power_setpoint', None) if power_setpoint is None: logger.warning("No power_setpoint in FRBC instruction") return # Convert to FM units (kW) power_kw = float(power_setpoint) now = datetime.now(timezone.utc) success = post_sensor_data(fm_sensor_id, power_kw, "kW", now.isoformat()) logger.info(f"S2 FRBC -> FM sensor {fm_sensor_id}: {power_kw} kW: {'OK' if success else 'FAIL'}") def handle_s2_message(raw_message: str): """Parse and handle an S2 message.""" try: parser = S2Parser() message = parser.parse(raw_message) if isinstance(message, FRBCInstruction): handle_frbc_instruction(message) elif isinstance(message, FRBCSystemDescription): logger.info(f"FRBC System Description: {message}") elif isinstance(message, FRBCActuatorStatus): logger.info(f"FRBC Actuator Status: {message}") else: logger.info(f"S2 message type: {type(message).__name__}") except Exception as e: logger.error(f"S2 message handling error: {e}") def generate_s2_system_description(): """Generate S2 System Description for Cariflex resources.""" resources = [] for s2_id, fm_id in S2_RESOURCE_MAP.items(): if fm_id <= 60: # Batteries resources.append({ "resource_id": s2_id, "resource_type": "battery", "min_power_kw": -50, "max_power_kw": 50, "capacity_kwh": 100, }) else: # EVs resources.append({ "resource_id": s2_id, "resource_type": "ev", "min_power_kw": -22, "max_power_kw": 22, "capacity_kwh": 60, }) return resources def main(): logger.info("Cariflex S2 Protocol Service starting") logger.info(f"FM: {FM_HOST}") logger.info(f"Resources: {len(S2_RESOURCE_MAP)} mapped") fm_login() # Generate S2 system description resources = generate_s2_system_description() logger.info(f"S2 System Description: {len(resources)} resources") # Main loop: listen for S2 messages (via OpenADR events) logger.info("S2 service ready, listening for instructions...") # For now, simulate S2 instructions from OpenADR load_control events # In production, this would listen on an S2 WebSocket/MQTT endpoint while True: try: # Check for new OpenADR events that contain S2 instructions if fm_session: r = fm_session.get(f"{FM_HOST}/api/v3_0/sensors/86/data", timeout=30) if r.status_code == 200: data = r.json() if data and len(data) > 0: latest = data[-1] value = latest.get("event_value", 0) if value > 0: # Convert load control to S2 FRBC instruction logger.info(f"Load control signal: {value}%") # TODO: Convert to proper S2 FRBC instruction except Exception as e: logger.error(f"S2 loop error: {e}") asyncio.run(asyncio.sleep(10)) if __name__ == "__main__": main()