Files
cariflex/scripts/asset_sync.py

186 lines
6.1 KiB
Python

#!/usr/bin/env python3
"""
Cariflex - Asset Synchronization Service
Syncs EV charging assets between FlexMeasures and CitrineOS.
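Keeps asset count, power, location and connector config aligned by reporting
charge points that exist on only one side and creating the missing
FlexMeasures assets; assets that already exist are not updated.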
"""
import json, logging, os, re
from datetime import datetime, timezone
import requests
# Configure root logging once at import time; every message of this script goes
# through the named logger below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("cariflex-asset-sync")
# Connection settings, each overridable through an environment variable of the
# same name.
FM_HOST = os.getenv("FM_HOST", "https://cariflex.digitribe.fr")
FM_EMAIL = os.getenv("FM_EMAIL", "admin@digitribe.fr")
# NOTE(review): the fallback below is a literal password committed to source;
# it should be supplied via the FM_PWD environment variable only and the
# exposed value rotated.
FM_PWD = os.getenv("FM_PWD", "Digitribe972")
CITRINEOS_URL = os.getenv("CITRINEOS_URL", "http://cariflex-citrineos-server:8080")
# Authenticated requests.Session for FlexMeasures; stays None until fm_login()
# succeeds and is read by the FlexMeasures helper functions.
fm_session = None
def fm_login():
    """Log in to the FlexMeasures web UI and keep the session for later calls.

    Fetches ``{FM_HOST}/login``, extracts the CSRF token from the returned
    HTML and posts ``FM_EMAIL`` / ``FM_PWD`` together with that token. When
    the login is accepted, the session is stored in the module-level
    ``fm_session``.

    Returns:
        bool: True when the login was accepted; False when no CSRF token was
        found, the response was rejected, or an exception occurred (each case
        is logged).
    """
    global fm_session
    session = None
    try:
        session = requests.Session()
        # NOTE(review): certificate verification is disabled, so the
        # credentials below are sent without authenticating the server.
        # Enable verification (or point `verify` at a CA bundle) if possible.
        session.verify = False
        login_page = session.get(f"{FM_HOST}/login", timeout=15)
        token = re.search(r"csrf_token[^>]*value=[\"\\']([^\"\\']+)", login_page.text)
        if token is None:
            logger.error(
                f"FM login: no csrf_token found on login page (HTTP {login_page.status_code})"
            )
        else:
            reply = session.post(
                f"{FM_HOST}/login",
                data={
                    "email": FM_EMAIL,
                    "password": FM_PWD,
                    "csrf_token": token.group(1),
                    "remember": "y",
                },
                allow_redirects=True,
                timeout=15,
            )
            # NOTE(review): a re-rendered login form after bad credentials may
            # also answer HTTP 200, in which case this check cannot detect a
            # rejected login - confirm against the FlexMeasures login flow.
            if "dashboard" in reply.url or reply.status_code == 200:
                fm_session = session
                return True
            logger.error(
                f"FM login: rejected (HTTP {reply.status_code}, final URL {reply.url})"
            )
    except Exception as e:
        logger.error(f"FM login: {e}")
    # Login failed: release the pooled connections of the unused session.
    if session is not None:
        session.close()
    return False
def get_fm_ev_assets():
    """Get all EV charging assets from FlexMeasures.

    Requests ``/api/v3_0/generic_assets`` through the module-level
    ``fm_session`` and keeps the assets whose name contains ``ev`` or
    ``charge`` (case-insensitive).

    Returns:
        list: The matching asset entries from the JSON response, or an empty
        list when the request fails, returns a non-200 status, or the payload
        cannot be processed (the reason is logged).
    """
    try:
        r = fm_session.get(f"{FM_HOST}/api/v3_0/generic_assets", timeout=30)
        if r.status_code != 200:
            logger.error(f"FM assets: unexpected HTTP status {r.status_code}")
            return []
        ev_assets = []
        for asset in r.json():
            # A missing or null name must not abort the whole listing.
            name = (asset.get("name") or "").lower()
            # NOTE(review): substring matching on "ev" also accepts unrelated
            # names (e.g. "device"); filtering on the asset type would be
            # stricter - confirm which field the API exposes.
            if "ev" in name or "charge" in name:
                ev_assets.append(asset)
        return ev_assets
    except Exception as e:
        logger.error(f"FM assets: {e}")
    return []
def get_fm_ev_sensors(sensor_ids=range(61, 71)):
    """Get all EV-related sensors from FlexMeasures.

    Requests ``/api/v3_0/sensors`` through the module-level ``fm_session`` and
    keeps the sensors whose ``id`` is contained in ``sensor_ids``.

    Args:
        sensor_ids: Container of sensor ids considered EV-related. Defaults to
            ``range(61, 71)`` (ids 61 to 70), the set previously hard-coded
            here. Must support repeated ``in`` checks, so pass a range, set or
            list rather than a one-shot iterator.

    Returns:
        list: The matching sensor entries from the JSON response, or an empty
        list when the request fails, returns a non-200 status, or the payload
        cannot be processed (the reason is logged).
    """
    try:
        r = fm_session.get(f"{FM_HOST}/api/v3_0/sensors", timeout=30)
        if r.status_code != 200:
            logger.error(f"FM sensors: unexpected HTTP status {r.status_code}")
            return []
        return [sensor for sensor in r.json() if sensor.get("id") in sensor_ids]
    except Exception as e:
        logger.error(f"FM sensors: {e}")
    return []
def get_citrineos_charge_points():
    """Get all charge points from CitrineOS.

    Performs an unauthenticated GET on
    ``{CITRINEOS_URL}/api/v1/ocpp-charge-points``.

    Returns:
        The decoded JSON body on HTTP 200 (callers iterate it as a list of
        charge point dicts), or an empty list when the request fails or
        answers with any other status (the reason is logged).
    """
    try:
        r = requests.get(f"{CITRINEOS_URL}/api/v1/ocpp-charge-points", timeout=10)
        if r.status_code == 200:
            return r.json()
        # Make a rejected request visible instead of reporting "no charge points".
        logger.error(f"CitrineOS CPs: unexpected HTTP status {r.status_code}")
    except Exception as e:
        logger.error(f"CitrineOS CPs: {e}")
    return []
def create_fm_ev_asset(cp_id, connector_id, max_power_kw, lat, lon):
    """Create an EV asset in FlexMeasures matching a CitrineOS charge point.

    Posts a generic asset named ``EVSE_<cp_id>`` and, once that succeeds, a
    power sensor named ``evse_<cp_id>_power`` (unit kW, 15-minute resolution)
    attached to the new asset. Both requests go through the module-level
    ``fm_session``.

    Args:
        cp_id: Charge point identifier, used in the names and attributes.
        connector_id: Connector identifier stored in the attributes.
        max_power_kw: Maximum power stored in the attributes.
        lat: Latitude of the asset.
        lon: Longitude of the asset.

    Returns:
        tuple: ``(asset, sensor)`` with the decoded JSON of both creations when
        both requests answer HTTP 201, otherwise ``(None, None)``. Every
        failure is logged; a failed sensor creation leaves the asset in place.
    """
    try:
        # Create asset
        asset_data = {
            "name": f"EVSE_{cp_id}",
            "asset_type_name": "evse",
            "latitude": lat,
            "longitude": lon,
            "attributes": {
                "charge_point_id": cp_id,
                "connector_id": connector_id,
                "max_power_kw": max_power_kw,
                "ocpp_version": "2.0.1",
                "source": "citrineos",
            },
        }
        r = fm_session.post(f"{FM_HOST}/api/v3_0/generic_assets", json=asset_data, timeout=30)
        if r.status_code != 201:
            logger.error(
                f"Create FM asset: EVSE_{cp_id} rejected (HTTP {r.status_code}): {r.text[:200]}"
            )
            return None, None
        asset = r.json()
        logger.info(f"Created FM asset: {asset['name']} (id={asset['id']})")
        # Create sensor for this asset
        sensor_data = {
            "name": f"evse_{cp_id}_power",
            "unit": "kW",
            "event_resolution": "PT15M",
            "generic_asset_id": asset["id"],
            "attributes": {
                "charge_point_id": cp_id,
                "connector_id": connector_id,
                "max_power_kw": max_power_kw,
            },
        }
        r2 = fm_session.post(f"{FM_HOST}/api/v3_0/sensors", json=sensor_data, timeout=30)
        if r2.status_code != 201:
            # The asset already exists at this point; report its id so the
            # sensor-less asset can be found and repaired.
            logger.error(
                f"Create FM sensor: evse_{cp_id}_power rejected (HTTP {r2.status_code}) "
                f"for asset id={asset['id']}: {r2.text[:200]}"
            )
            return None, None
        sensor = r2.json()
        logger.info(f"Created FM sensor: {sensor['name']} (id={sensor['id']})")
        return asset, sensor
    except Exception as e:
        logger.error(f"Create FM asset: {e}")
    return None, None
def _asset_charge_point_id(asset):
    """Return the ``charge_point_id`` attribute of a FlexMeasures asset, or None."""
    attributes = asset.get("attributes")
    if isinstance(attributes, str):
        # NOTE(review): FlexMeasures may serialize attributes as a JSON string
        # rather than an object - confirm against the API response.
        try:
            attributes = json.loads(attributes)
        except ValueError:
            return None
    if not isinstance(attributes, dict):
        return None
    return attributes.get("charge_point_id")


def sync_assets():
    """Synchronize assets between FlexMeasures and CitrineOS.

    Collects the charge point ids known to each side, logs which ids exist on
    only one side, and creates a FlexMeasures asset (with default connector,
    power and location values where CitrineOS provides none) for every
    CitrineOS charge point that has no matching FlexMeasures asset. Assets that
    exist only in FlexMeasures are reported but not modified.

    Returns:
        dict: Counters ``fm_assets``, ``fm_sensors``, ``citrineos_cps``,
        ``synced`` (ids present on both sides before creation), ``only_fm``
        and ``only_citrineos``.
    """
    logger.info("Starting asset synchronization...")
    fm_assets = get_fm_ev_assets()
    fm_sensors = get_fm_ev_sensors()
    cps = get_citrineos_charge_points()
    logger.info(f"FM assets: {len(fm_assets)}, FM sensors: {len(fm_sensors)}, CPs: {len(cps)}")
    # Build mapping
    fm_cp_ids = set()
    for asset in fm_assets:
        cp_id = _asset_charge_point_id(asset)
        if cp_id:
            fm_cp_ids.add(cp_id)
    citrineos_cp_ids = {cp.get("id") for cp in cps if cp.get("id")}
    # Find mismatches
    only_in_fm = fm_cp_ids - citrineos_cp_ids
    only_in_citrineos = citrineos_cp_ids - fm_cp_ids
    in_both = fm_cp_ids & citrineos_cp_ids
    logger.info(f"In both: {len(in_both)}, Only FM: {len(only_in_fm)}, Only CitrineOS: {len(only_in_citrineos)}")
    # Report mismatches
    if only_in_fm:
        logger.warning(f"Assets only in FM: {only_in_fm}")
    if only_in_citrineos:
        logger.warning(f"CPs only in CitrineOS: {only_in_citrineos}")
    # Sync: create FM assets for CPs that don't have them.
    # NOTE(review): an empty fm_assets list can also mean the FlexMeasures
    # listing failed, in which case every charge point looks "missing".
    handled = set(fm_cp_ids)
    for cp in cps:
        cp_id = cp.get("id")
        # Skip entries without an id (they would become "EVSE_None") and ids
        # already present in FlexMeasures or created earlier in this run.
        if not cp_id or cp_id in handled:
            continue
        handled.add(cp_id)
        connector_id = cp.get("connectorId", 1)
        max_power = cp.get("maxPowerKw", 22)
        lat = cp.get("latitude", 14.6415)  # Martinique default
        lon = cp.get("longitude", -61.0242)
        create_fm_ev_asset(cp_id, connector_id, max_power, lat, lon)
    return {
        "fm_assets": len(fm_assets),
        "fm_sensors": len(fm_sensors),
        "citrineos_cps": len(cps),
        "synced": len(in_both),
        "only_fm": len(only_in_fm),
        "only_citrineos": len(only_in_citrineos),
    }
def main():
    """Run one synchronization pass: log in to FlexMeasures, then sync assets.

    The sync is skipped when the login fails, because every FlexMeasures call
    depends on the session that ``fm_login`` establishes.
    """
    logger.info("Cariflex Asset Synchronization starting")
    if not fm_login():
        logger.error("FlexMeasures login failed; asset synchronization skipped")
        return
    result = sync_assets()
    logger.info(f"Sync result: {result}")


if __name__ == "__main__":
    main()