#!/usr/bin/env python3
"""
Cariflex - Asset Synchronization Service

Syncs EV charging assets between FlexMeasures and CitrineOS.
Ensures consistency of: count, power, location, connector config.

Configuration is read from the environment:

* ``FM_HOST``, ``FM_EMAIL``, ``FM_PWD`` - FlexMeasures base URL and login.
* ``FM_VERIFY_TLS`` - set to ``1``/``true``/``yes``/``on`` to verify the
  FlexMeasures TLS certificate (off by default, as before).
* ``CITRINEOS_URL`` - CitrineOS base URL.
"""
import json
import logging
import os
import re
from datetime import datetime, timezone
from urllib.parse import urlsplit

import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("cariflex-asset-sync")

FM_HOST = os.getenv("FM_HOST", "https://cariflex.digitribe.fr")
FM_EMAIL = os.getenv("FM_EMAIL", "admin@digitribe.fr")
# SECURITY: a real-looking password is embedded as the fallback. It is kept
# only so existing deployments keep working; set FM_PWD in the environment
# and rotate this credential.
FM_PWD = os.getenv("FM_PWD", "Digitribe972")
CITRINEOS_URL = os.getenv("CITRINEOS_URL", "http://cariflex-citrineos-server:8080")

# SECURITY: certificate verification stays off by default because that is
# what this script has always done; enable it wherever the host presents a
# valid certificate, since the login password travels over this connection.
FM_VERIFY_TLS = os.getenv("FM_VERIFY_TLS", "false").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}

# Sensor ids treated as EV-related by get_fm_ev_sensors().
EV_SENSOR_IDS = range(61, 71)

# Pulls the hidden CSRF token value out of the login form HTML.
_CSRF_TOKEN_RE = re.compile(r"""csrf_token[^>]*value=["']([^"']+)""")

# Authenticated requests.Session, set by fm_login(); None until then.
fm_session = None


def fm_login():
    """Log in to FlexMeasures through its HTML login form.

    On success the authenticated session is stored in the module-level
    ``fm_session``.

    Returns:
        True if the login was accepted, False otherwise (the reason is logged).
    """
    global fm_session
    session = requests.Session()
    session.verify = FM_VERIFY_TLS
    if not FM_VERIFY_TLS:
        logger.warning(
            "FM login: TLS certificate verification is disabled "
            "(set FM_VERIFY_TLS=1 to enable it)"
        )
    try:
        page = session.get(f"{FM_HOST}/login", timeout=15)
        match = _CSRF_TOKEN_RE.search(page.text)
        if match is None:
            logger.error(
                "FM login: no CSRF token found on login page (HTTP %s)",
                page.status_code,
            )
            session.close()
            return False
        resp = session.post(
            f"{FM_HOST}/login",
            data={
                "email": FM_EMAIL,
                "password": FM_PWD,
                "csrf_token": match.group(1),
                "remember": "y",
            },
            allow_redirects=True,
            timeout=15,
        )
    except requests.RequestException as e:
        logger.error("FM login: %s", e)
        session.close()
        return False

    # A rejected login re-renders the form (often with HTTP 200), so the
    # status code alone proves nothing: we must also have been redirected
    # away from the login page.
    final_path = urlsplit(resp.url).path.rstrip("/")
    if resp.ok and not final_path.endswith("/login"):
        fm_session = session
        return True

    logger.error(
        "FM login: rejected (HTTP %s, final URL %s)", resp.status_code, resp.url
    )
    session.close()
    return False


def _fm_get_list(path, what):
    """GET a FlexMeasures API path; return its JSON list, or None on failure."""
    if fm_session is None:
        logger.error("FM %s: not logged in (call fm_login() first)", what)
        return None
    try:
        r = fm_session.get(f"{FM_HOST}{path}", timeout=30)
        if r.status_code != 200:
            logger.error("FM %s: unexpected HTTP %s", what, r.status_code)
            return None
        payload = r.json()
    except (requests.RequestException, ValueError) as e:
        logger.error("FM %s: %s", what, e)
        return None
    if not isinstance(payload, list):
        logger.error(
            "FM %s: expected a JSON list, got %s", what, type(payload).__name__
        )
        return None
    return payload


def _is_ev_asset(asset):
    """Return True if the asset's name contains 'ev' or 'charge'."""
    if not isinstance(asset, dict):
        return False
    # Loose substring heuristic kept from the original selection rule.
    name = (asset.get("name") or "").lower()
    return "ev" in name or "charge" in name


def _asset_attributes(asset):
    """Return the asset's attributes as a dict ({} when absent or unusable)."""
    attrs = asset.get("attributes")
    if isinstance(attrs, str):
        # NOTE(review): some API versions appear to serialise attributes as a
        # JSON string rather than an object - confirm against the server.
        try:
            attrs = json.loads(attrs)
        except ValueError:
            return {}
    return attrs if isinstance(attrs, dict) else {}


def _fetch_fm_ev_assets():
    """Return the EV assets from FlexMeasures, or None if the listing failed."""
    # NOTE(review): the FlexMeasures v3_0 documentation lists assets under
    # /api/v3_0/assets - confirm this path against the deployed server.
    assets = _fm_get_list("/api/v3_0/generic_assets", "assets")
    if assets is None:
        return None
    return [a for a in assets if _is_ev_asset(a)]


def get_fm_ev_assets():
    """Get all EV charging assets from FlexMeasures.

    Returns:
        A list of asset dicts whose name contains 'ev' or 'charge'; an empty
        list when the request fails (the failure is logged).
    """
    return _fetch_fm_ev_assets() or []


def get_fm_ev_sensors():
    """Get all EV-related sensors from FlexMeasures.

    Returns:
        A list of sensor dicts whose id is in ``EV_SENSOR_IDS``; an empty
        list when the request fails (the failure is logged).
    """
    sensors = _fm_get_list("/api/v3_0/sensors", "sensors")
    if sensors is None:
        return []
    return [
        s for s in sensors if isinstance(s, dict) and s.get("id") in EV_SENSOR_IDS
    ]


def get_citrineos_charge_points():
    """Get all charge points from CitrineOS.

    Returns:
        The list decoded from the CitrineOS response; an empty list when the
        request fails or the payload is not a list (the failure is logged).
    """
    try:
        r = requests.get(f"{CITRINEOS_URL}/api/v1/ocpp-charge-points", timeout=10)
        if r.status_code != 200:
            logger.error("CitrineOS CPs: unexpected HTTP %s", r.status_code)
            return []
        payload = r.json()
    except (requests.RequestException, ValueError) as e:
        logger.error("CitrineOS CPs: %s", e)
        return []
    if not isinstance(payload, list):
        logger.error(
            "CitrineOS CPs: expected a JSON list, got %s", type(payload).__name__
        )
        return []
    return payload


def create_fm_ev_asset(cp_id, connector_id, max_power_kw, lat, lon):
    """Create an EV asset in FlexMeasures matching a CitrineOS charge point.

    An asset named ``EVSE_<cp_id>`` is created first, then a 15-minute kW
    power sensor attached to it.

    Args:
        cp_id: CitrineOS charge point identifier.
        connector_id: Connector identifier stored in the attributes.
        max_power_kw: Maximum power stored in the attributes.
        lat: Asset latitude.
        lon: Asset longitude.

    Returns:
        ``(asset, sensor)`` on full success, ``(asset, None)`` if the asset
        was created but its sensor was not, ``(None, None)`` if nothing was
        created. Failures are logged.
    """
    if fm_session is None:
        logger.error("Create FM asset: not logged in (call fm_login() first)")
        return None, None

    # NOTE(review): the FlexMeasures v3_0 documentation describes asset
    # creation via /api/v3_0/assets with generic_asset_type_id/account_id -
    # confirm this payload against the deployed server.
    asset_data = {
        "name": f"EVSE_{cp_id}",
        "asset_type_name": "evse",
        "latitude": lat,
        "longitude": lon,
        "attributes": {
            "charge_point_id": cp_id,
            "connector_id": connector_id,
            "max_power_kw": max_power_kw,
            "ocpp_version": "2.0.1",
            "source": "citrineos",
        },
    }
    try:
        r = fm_session.post(
            f"{FM_HOST}/api/v3_0/generic_assets", json=asset_data, timeout=30
        )
        if r.status_code != 201:
            logger.error(
                "Create FM asset: HTTP %s for %s: %s",
                r.status_code,
                asset_data["name"],
                r.text[:200],
            )
            return None, None
        asset = r.json()
    except (requests.RequestException, ValueError) as e:
        logger.error("Create FM asset: %s", e)
        return None, None

    asset_id = asset.get("id") if isinstance(asset, dict) else None
    if asset_id is None:
        logger.error("Create FM asset: response for EVSE_%s carries no id", cp_id)
        return None, None
    logger.info("Created FM asset: %s (id=%s)", asset.get("name"), asset_id)

    sensor_data = {
        "name": f"evse_{cp_id}_power",
        "unit": "kW",
        "event_resolution": "PT15M",
        "generic_asset_id": asset_id,
        "attributes": {
            "charge_point_id": cp_id,
            "connector_id": connector_id,
            "max_power_kw": max_power_kw,
        },
    }
    try:
        r2 = fm_session.post(
            f"{FM_HOST}/api/v3_0/sensors", json=sensor_data, timeout=30
        )
        if r2.status_code != 201:
            logger.error(
                "Create FM sensor: HTTP %s for %s: %s",
                r2.status_code,
                sensor_data["name"],
                r2.text[:200],
            )
            # The asset exists; report it so the caller is not misled.
            return asset, None
        sensor = r2.json()
    except (requests.RequestException, ValueError) as e:
        logger.error("Create FM sensor: %s", e)
        return asset, None

    if isinstance(sensor, dict):
        logger.info(
            "Created FM sensor: %s (id=%s)", sensor.get("name"), sensor.get("id")
        )
    return asset, sensor


def sync_assets():
    """Synchronize assets between FlexMeasures and CitrineOS.

    Compares the charge point ids known to each side, logs the mismatches
    and creates a FlexMeasures asset for every CitrineOS charge point that
    has none.

    Returns:
        A dict of counters: ``fm_assets``, ``fm_sensors``, ``citrineos_cps``,
        ``synced``, ``only_fm``, ``only_citrineos`` and ``created``.
    """
    logger.info("Starting asset synchronization...")

    fetched_assets = _fetch_fm_ev_assets()
    fm_inventory_ok = fetched_assets is not None
    fm_assets = fetched_assets or []
    fm_sensors = get_fm_ev_sensors()
    cps = get_citrineos_charge_points()

    logger.info(
        "FM assets: %s, FM sensors: %s, CPs: %s",
        len(fm_assets),
        len(fm_sensors),
        len(cps),
    )

    # Charge point ids already represented in FlexMeasures.
    fm_cp_ids = set()
    for asset in fm_assets:
        cp_id = _asset_attributes(asset).get("charge_point_id")
        if cp_id is not None and cp_id != "":
            fm_cp_ids.add(cp_id)

    # Charge points keyed by id: drops records without an id and keeps only
    # the first record of a duplicated id, so each is created at most once.
    cps_by_id = {}
    for cp in cps:
        cp_id = cp.get("id") if isinstance(cp, dict) else None
        if cp_id is None or cp_id == "":
            logger.warning("Skipping CitrineOS charge point without id: %r", cp)
            continue
        cps_by_id.setdefault(cp_id, cp)
    citrineos_cp_ids = set(cps_by_id)

    # Find mismatches
    only_in_fm = fm_cp_ids - citrineos_cp_ids
    only_in_citrineos = citrineos_cp_ids - fm_cp_ids
    in_both = fm_cp_ids & citrineos_cp_ids

    logger.info(
        "In both: %s, Only FM: %s, Only CitrineOS: %s",
        len(in_both),
        len(only_in_fm),
        len(only_in_citrineos),
    )

    # Report mismatches
    if only_in_fm:
        logger.warning("Assets only in FM: %s", only_in_fm)
    if only_in_citrineos:
        logger.warning("CPs only in CitrineOS: %s", only_in_citrineos)

    # Sync: create FM assets for CPs that don't have them
    created = 0
    if not fm_inventory_ok:
        # Without a reliable FM inventory every charge point looks new;
        # creating now would duplicate assets that already exist.
        logger.error(
            "FM asset inventory unavailable; skipping asset creation "
            "to avoid duplicates"
        )
    else:
        for cp_id, cp in cps_by_id.items():
            if cp_id in fm_cp_ids:
                continue
            asset, _sensor = create_fm_ev_asset(
                cp_id,
                cp.get("connectorId", 1),
                cp.get("maxPowerKw", 22),
                cp.get("latitude", 14.6415),  # Martinique default
                cp.get("longitude", -61.0242),
            )
            if asset is not None:
                created += 1

    return {
        "fm_assets": len(fm_assets),
        "fm_sensors": len(fm_sensors),
        "citrineos_cps": len(cps),
        "synced": len(in_both),
        "only_fm": len(only_in_fm),
        "only_citrineos": len(only_in_citrineos),
        "created": created,
    }


def main():
    """Log in to FlexMeasures and run one synchronization pass."""
    logger.info("Cariflex Asset Synchronization starting")
    if "FM_PWD" not in os.environ:
        logger.warning("FM_PWD is not set; using the built-in default password")
    if not fm_login():
        logger.error("FlexMeasures login failed; aborting synchronization")
        return
    result = sync_assets()
    logger.info("Sync result: %s", result)


if __name__ == "__main__":
    main()