From ce67b8e9f6c2ef5ee0846faf1d6aa799e12c29cd Mon Sep 17 00:00:00 2001 From: Eric F Date: Wed, 10 Jun 2026 13:33:56 -0400 Subject: [PATCH] CitrineOS deployment + Asset sync + Scheduling service + Traefik integration --- config/docker-compose-citrineos.yml | 29 ++-- scripts/asset_sync.py | 185 ++++++++++++++++++++++ scripts/fm_scheduling_service.py | 235 ++++++++++++++++++++++++++++ 3 files changed, 439 insertions(+), 10 deletions(-) create mode 100644 scripts/asset_sync.py create mode 100644 scripts/fm_scheduling_service.py diff --git a/config/docker-compose-citrineos.yml b/config/docker-compose-citrineos.yml index d976296..133f947 100644 --- a/config/docker-compose-citrineos.yml +++ b/config/docker-compose-citrineos.yml @@ -24,6 +24,12 @@ services: condition: service_healthy ports: - "8081:8080" + labels: + - "traefik.enable=true" + - "traefik.http.routers.citrineos.rule=Host(`citrineos.digitribe.fr`)" + - "traefik.http.routers.citrineos.entrypoints=websecure" + - "traefik.http.routers.citrineos.tls.certresolver=letsencrypt" + - "traefik.http.services.citrineos.loadbalancer.server.port=8080" volumes: - citrineos-data:/data healthcheck: @@ -32,9 +38,8 @@ services: timeout: 10s retries: 5 networks: - cariflex-internal: - aliases: - - citrineos-server + - traefik-public + - cariflex-internal cariflex-citrineos-db: image: postgis/postgis:16-3.5 @@ -52,9 +57,7 @@ services: timeout: 10s retries: 5 networks: - cariflex-internal: - aliases: - - citrineos-db + - cariflex-internal cariflex-amqp: image: rabbitmq:3-management @@ -63,6 +66,12 @@ services: environment: RABBITMQ_DEFAULT_USER: guest RABBITMQ_DEFAULT_PASS: guest + labels: + - "traefik.enable=true" + - "traefik.http.routers.rabbitmq.rule=Host(`amqp.digitribe.fr`)" + - "traefik.http.routers.rabbitmq.entrypoints=websecure" + - "traefik.http.routers.rabbitmq.tls.certresolver=letsencrypt" + - "traefik.http.services.rabbitmq.loadbalancer.server.port=15672" volumes: - citrineos-amqp-data:/var/lib/rabbitmq healthcheck: @@ -72,10 +81,8 @@ services: retries: 10 start_period: 30s networks: - cariflex-internal: - aliases: - - amqp-broker - - cariflex-amqp + - traefik-public + - cariflex-internal volumes: citrineos-data: @@ -86,6 +93,8 @@ volumes: driver: local networks: + traefik-public: + external: true cariflex-internal: name: config_cariflex-internal external: true diff --git a/scripts/asset_sync.py b/scripts/asset_sync.py new file mode 100644 index 0000000..a28f323 --- /dev/null +++ b/scripts/asset_sync.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +""" +Cariflex - Asset Synchronization Service +Syncs EV charging assets between FlexMeasures and CitrineOS. +Ensures consistency of: count, power, location, connector config. +""" + +import json, logging, os, re +from datetime import datetime, timezone +import requests + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("cariflex-asset-sync") + +FM_HOST = os.getenv("FM_HOST", "https://cariflex.digitribe.fr") +FM_EMAIL = os.getenv("FM_EMAIL", "admin@digitribe.fr") +FM_PWD = os.getenv("FM_PWD", "Digitribe972") +CITRINEOS_URL = os.getenv("CITRINEOS_URL", "http://cariflex-citrineos-server:8080") + +fm_session = None + + +def fm_login(): + global fm_session + try: + s = requests.Session(); s.verify = False + r = s.get(f"{FM_HOST}/login", timeout=15) + m = re.search(r"csrf_token[^>]*value=[\"\\']([^\"\\']+)", r.text) + if m: + r = s.post(f"{FM_HOST}/login", data={"email":FM_EMAIL,"password":FM_PWD,"csrf_token":m.group(1),"remember":"y"}, allow_redirects=True, timeout=15) + if "dashboard" in r.url or r.status_code == 200: + fm_session = s + return True + except Exception as e: + logger.error(f"FM login: {e}") + return False + + +def get_fm_ev_assets(): + """Get all EV charging assets from FlexMeasures.""" + try: + # Get assets of type 'evse' or similar + r = fm_session.get(f"{FM_HOST}/api/v3_0/generic_assets", timeout=30) + if r.status_code == 200: + assets = r.json() + ev_assets = [a for a in assets if 'ev' in a.get('name', '').lower() or 'charge' in a.get('name', '').lower()] + return ev_assets + except Exception as e: + logger.error(f"FM assets: {e}") + return [] + + +def get_fm_ev_sensors(): + """Get all EV-related sensors from FlexMeasures.""" + try: + r = fm_session.get(f"{FM_HOST}/api/v3_0/sensors", timeout=30) + if r.status_code == 200: + sensors = r.json() + ev_sensors = [s for s in sensors if s.get('id') in range(61, 71)] + return ev_sensors + except Exception as e: + logger.error(f"FM sensors: {e}") + return [] + + +def get_citrineos_charge_points(): + """Get all charge points from CitrineOS.""" + try: + r = requests.get(f"{CITRINEOS_URL}/api/v1/ocpp-charge-points", timeout=10) + if r.status_code == 200: + return r.json() + except Exception as e: + logger.error(f"CitrineOS CPs: {e}") + return [] + + +def create_fm_ev_asset(cp_id, connector_id, max_power_kw, lat, lon): + """Create an EV asset in FlexMeasures matching a CitrineOS charge point.""" + try: + # Create asset + asset_data = { + "name": f"EVSE_{cp_id}", + "asset_type_name": "evse", + "latitude": lat, + "longitude": lon, + "attributes": { + "charge_point_id": cp_id, + "connector_id": connector_id, + "max_power_kw": max_power_kw, + "ocpp_version": "2.0.1", + "source": "citrineos" + } + } + r = fm_session.post(f"{FM_HOST}/api/v3_0/generic_assets", json=asset_data, timeout=30) + if r.status_code == 201: + asset = r.json() + logger.info(f"Created FM asset: {asset['name']} (id={asset['id']})") + + # Create sensor for this asset + sensor_data = { + "name": f"evse_{cp_id}_power", + "unit": "kW", + "event_resolution": "PT15M", + "generic_asset_id": asset["id"], + "attributes": { + "charge_point_id": cp_id, + "connector_id": connector_id, + "max_power_kw": max_power_kw + } + } + r2 = fm_session.post(f"{FM_HOST}/api/v3_0/sensors", json=sensor_data, timeout=30) + if r2.status_code == 201: + sensor = r2.json() + logger.info(f"Created FM sensor: {sensor['name']} (id={sensor['id']})") + return asset, sensor + except Exception as e: + logger.error(f"Create FM asset: {e}") + return None, None + + +def sync_assets(): + """Synchronize assets between FlexMeasures and CitrineOS.""" + logger.info("Starting asset synchronization...") + + fm_assets = get_fm_ev_assets() + fm_sensors = get_fm_ev_sensors() + cps = get_citrineos_charge_points() + + logger.info(f"FM assets: {len(fm_assets)}, FM sensors: {len(fm_sensors)}, CPs: {len(cps)}") + + # Build mapping + fm_cp_ids = set() + for asset in fm_assets: + cp_id = asset.get("attributes", {}).get("charge_point_id") + if cp_id: + fm_cp_ids.add(cp_id) + + citrineos_cp_ids = set() + for cp in cps: + cp_id = cp.get("id") + if cp_id: + citrineos_cp_ids.add(cp_id) + + # Find mismatches + only_in_fm = fm_cp_ids - citrineos_cp_ids + only_in_citrineos = citrineos_cp_ids - fm_cp_ids + in_both = fm_cp_ids & citrineos_cp_ids + + logger.info(f"In both: {len(in_both)}, Only FM: {len(only_in_fm)}, Only CitrineOS: {len(only_in_citrineos)}") + + # Report mismatches + if only_in_fm: + logger.warning(f"Assets only in FM: {only_in_fm}") + if only_in_citrineos: + logger.warning(f"CPs only in CitrineOS: {only_in_citrineos}") + + # Sync: create FM assets for CPs that don't have them + for cp in cps: + cp_id = cp.get("id") + if cp_id not in fm_cp_ids: + connector_id = cp.get("connectorId", 1) + max_power = cp.get("maxPowerKw", 22) + lat = cp.get("latitude", 14.6415) # Martinique default + lon = cp.get("longitude", -61.0242) + create_fm_ev_asset(cp_id, connector_id, max_power, lat, lon) + + return { + "fm_assets": len(fm_assets), + "fm_sensors": len(fm_sensors), + "citrineos_cps": len(cps), + "synced": len(in_both), + "only_fm": len(only_in_fm), + "only_citrineos": len(only_in_citrineos) + } + + +def main(): + logger.info("Cariflex Asset Synchronization starting") + fm_login() + result = sync_assets() + logger.info(f"Sync result: {result}") + + +if __name__ == "__main__": + main() diff --git a/scripts/fm_scheduling_service.py b/scripts/fm_scheduling_service.py new file mode 100644 index 0000000..eed2599 --- /dev/null +++ b/scripts/fm_scheduling_service.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +""" +Cariflex - FlexMeasures Scheduling Service +Creates EV charging schedules based on OpenADR price signals. +Sends schedules to CitrineOS via OCPP profiles. +""" + +import json, logging, os, re, sys +from datetime import datetime, timezone, timedelta +import requests + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("cariflex-scheduling") + +# Config +FM_HOST = os.getenv("FM_HOST", "https://cariflex.digitribe.fr") +FM_EMAIL = os.getenv("FM_EMAIL", "admin@digitribe.fr") +FM_PWD = os.getenv("FM_PWD", "Digitribe972") +CITRINEOS_URL = os.getenv("CITRINEOS_URL", "http://cariflex-citrineos-server:8080") + +# EV Charge Point -> FM SensorID -> OCPP connector +EVSE_CONFIG = { + # sensor_id: (charge_point_id, connector_id, max_power_kw) + 61: ("CP001", 1, 22), + 62: ("CP002", 1, 22), + 63: ("CP003", 1, 22), + 64: ("CP004", 1, 22), + 65: ("CP005", 1, 22), + 66: ("CP006", 1, 11), + 67: ("CP007", 1, 11), + 68: ("CP008", 1, 11), + 69: ("CP009", 1, 11), + 70: ("CP010", 1, 11), +} + +fm_session = None + + +def fm_login(): + global fm_session + try: + s = requests.Session(); s.verify = False + r = s.get(f"{FM_HOST}/login", timeout=15) + m = re.search(r"csrf_token[^>]*value=[\"\\']([^\"\\']+)", r.text) + if m: + r = s.post(f"{FM_HOST}/login", data={"email":FM_EMAIL,"password":FM_PWD,"csrf_token":m.group(1),"remember":"y"}, allow_redirects=True, timeout=15) + if "dashboard" in r.url or r.status_code == 200: + fm_session = s + return True + except Exception as e: + logger.error(f"FM login: {e}") + return False + + +def get_price_forecast(): + """Get latest price forecast from FM (sensor 84 - OpenADR prices).""" + if not fm_session: + return None + try: + r = fm_session.get(f"{FM_HOST}/api/v3_0/sensors/84/data?limit=48", timeout=30) + if r.status_code == 200: + return r.json() + except Exception as e: + logger.error(f"Price forecast: {e}") + return None + + +def get_load_control_signal(): + """Get latest load control signal from FM (sensor 86).""" + if not fm_session: + return None + try: + r = fm_session.get(f"{FM_HOST}/api/v3_0/sensors/86/data?limit=1", timeout=30) + if r.status_code == 200: + data = r.json() + if data and len(data) > 0: + return float(data[-1].get("event_value", 0)) + except Exception as e: + logger.error(f"Load control: {e}") + return 0 + + +def get_ev_soc(sensor_id): + """Get current SOC for an EV sensor.""" + try: + r = fm_session.get(f"{FM_HOST}/api/v3_0/sensors/{sensor_id}/data?limit=1", timeout=30) + if r.status_code == 200: + data = r.json() + if data and len(data) > 0: + return float(data[-1].get("event_value", 0)) + except Exception as e: + logger.error(f"EV SOC: {e}") + return 50.0 # Default 50% + + +def calculate_charging_schedule(prices, load_control, current_soc, max_power_kw): + """ + Calculate optimal charging schedule based on prices and load control. + + Returns list of (timestamp, power_kw) tuples. + """ + schedule = [] + now = datetime.now(timezone.utc) + + # Sort price periods by cheapest first + sorted_prices = sorted(prices, key=lambda x: x.get("event_value", 0)) + + # Load control factor: 0=normal, 0.5=reduce, 1=cut + lc_factor = 1.0 - load_control # 1.0 = full, 0.5 = half, 0 = cut + + # SOC target: charge to 80% during cheap hours + target_soc = 80 + soc_needed = max(0, target_soc - current_soc) + + if soc_needed <= 0: + return schedule # Already charged + + # Calculate how many hours needed at max power + hours_needed = (soc_needed * 0.6) / max_power_kw # Rough estimate (60% of 100kWh battery) + + # Select cheapest hours + selected_hours = sorted_prices[:int(hours_needed) + 1] + + for price_point in selected_hours: + ts = price_point.get("start", now.isoformat()) + price = price_point.get("event_value", 0) + + # Calculate power based on price and load control + if price < 50: # Cheap + power = max_power_kw * lc_factor + elif price < 100: # Medium + power = (max_power_kw * 0.5) * lc_factor + else: # Expensive + power = 0 # Don't charge during expensive hours + + if power > 0: + schedule.append((ts, power)) + + return schedule + + +def send_ocpp_charging_profile(charge_point_id, connector_id, power_limit_kw, start_time): + """Send charging profile to EVSE via CitrineOS (OCPP).""" + try: + payload = { + "connectorId": connector_id, + "csChargingProfiles": { + "chargingProfileId": 1, + "stackLevel": 0, + "chargingProfilePurpose": "TxDefaultProfile", + "chargingProfileKind": "Absolute", + "chargingSchedule": { + "startSchedule": start_time, + "chargingRateUnit": "W", + "chargingSchedulePeriod": [ + { + "startPeriod": 0, + "limit": int(power_limit_kw * 1000) # kW -> W + }, + { + "startPeriod": 3600, # After 1 hour + "limit": int(power_limit_kw * 1000) + } + ] + } + } + } + + # CitrineOS uses OCPP 2.0.1 API + r = requests.put( + f"{CITRINEOS_URL}/api/v1/ocpp-charge-points/{charge_point_id}/set-charging-profile", + json=payload, timeout=10 + ) + + logger.info(f"OCPP profile -> {charge_point_id}: {power_limit_kw}kW @ {start_time} -> {r.status_code}") + return r.status_code in [200, 201, 204] + except Exception as e: + logger.error(f"OCPP profile error: {e}") + return False + + +def run_scheduling(): + """Main scheduling loop: OpenADR prices → FM schedules → OCPP profiles.""" + logger.info("Running scheduling cycle...") + + # 1. Get price forecast from OpenADR + prices = get_price_forecast() + if not prices: + logger.warning("No price data available") + return + + logger.info(f"Price forecast: {len(prices)} periods available") + + # 2. Get load control signal + load_control = get_load_control_signal() + logger.info(f"Load control signal: {load_control}") + + # 3. For each EVSE, calculate optimal schedule + for sensor_id, (cp_id, connector_id, max_power) in EVSE_CONFIG.items(): + # Get current SOC + current_soc = get_ev_soc(sensor_id) + + # Calculate schedule + schedule = calculate_charging_schedule(prices, load_control, current_soc, max_power) + + if schedule: + # Send to CitrineOS via OCPP + for ts, power in schedule: + send_ocpp_charging_profile(cp_id, connector_id, power, ts) + logger.info(f"Schedule {cp_id}: {power} kW at {ts}") + else: + logger.info(f"No charging needed for {cp_id} (SOC: {current_soc}%)") + + +async def main(): + logger.info("Cariflex Scheduling Service starting") + logger.info(f"FM: {FM_HOST}") + logger.info(f"CitrineOS: {CITRINEOS_URL}") + logger.info(f"EVSEs: {len(EVSE_CONFIG)}") + + fm_login() + + # Run scheduling every 5 minutes + while True: + try: + run_scheduling() + except Exception as e: + logger.error(f"Scheduling error: {e}") + fm_login() + + await asyncio.sleep(300) # 5 minutes + + +if __name__ == "__main__": + asyncio.run(main())