#!/usr/bin/env python3
"""
Cariflex - Asset Synchronization Service
Syncs EV charging assets between FlexMeasures and CitrineOS.
Ensures consistency of: count, power, location, connector config.
"""

import json
import logging
import os
import re
from datetime import datetime, timezone

import requests

# Configure logging once at import time; all messages from this service go
# through the named logger below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("cariflex-asset-sync")

# Connection settings. Each value can be overridden through the environment.
FM_HOST = os.getenv("FM_HOST", "https://cariflex.digitribe.fr")
FM_EMAIL = os.getenv("FM_EMAIL", "admin@digitribe.fr")
# SECURITY: the fallback below embeds a credential in source control. It is
# kept only so existing deployments keep working; set FM_PWD in the
# environment and rotate this password.
FM_PWD = os.getenv("FM_PWD", "Digitribe972")
CITRINEOS_URL = os.getenv("CITRINEOS_URL", "http://cariflex-citrineos-server:8080")

if "FM_PWD" not in os.environ:
    logger.warning(
        "FM_PWD is not set; falling back to the built-in default credential"
    )

# Authenticated requests.Session for FlexMeasures; populated by fm_login().
fm_session = None
def fm_login():
    """Log in to FlexMeasures through its HTML login form.

    Fetches the login page, extracts the CSRF token from the form, posts the
    credentials and, on success, stores the authenticated session in the
    module-level ``fm_session``.

    Returns:
        bool: True when the login succeeded, False otherwise. Failures are
        logged and never raised; ``fm_session`` is left untouched on failure.
    """
    global fm_session

    session = requests.Session()
    # SECURITY: certificate verification stays off by default because that is
    # what existing deployments run with. Set FM_VERIFY_TLS=1 to enable it.
    session.verify = os.getenv("FM_VERIFY_TLS", "").strip().lower() in {
        "1", "true", "yes", "on",
    }

    login_url = f"{FM_HOST}/login"
    try:
        page = session.get(login_url, timeout=15)
        page.raise_for_status()

        token = re.search(r"csrf_token[^>]*value=[\"\\']([^\"\\']+)", page.text)
        if token is None:
            logger.error("FM login: no CSRF token found on %s", login_url)
            session.close()
            return False

        resp = session.post(
            login_url,
            data={
                "email": FM_EMAIL,
                "password": FM_PWD,
                "csrf_token": token.group(1),
                "remember": "y",
            },
            allow_redirects=True,
            timeout=15,
        )

        # A rejected login re-renders the form with HTTP 200, so the status
        # code alone proves nothing. Success means we were redirected away
        # from the login page.
        final_path = resp.url.split("?", 1)[0].rstrip("/")
        if resp.ok and not final_path.endswith("/login"):
            fm_session = session
            return True

        logger.error(
            "FM login: rejected (HTTP %s, final URL %s)", resp.status_code, resp.url
        )
    except requests.RequestException as e:
        logger.error(f"FM login: {e}")

    # Do not leak the connection pool of a session that never authenticated.
    session.close()
    return False
def get_fm_ev_assets():
    """Get all EV charging assets from FlexMeasures.

    An asset counts as EV-related when its lower-cased name contains ``ev``
    or ``charge``.

    Returns:
        list: The matching asset dicts, or an empty list when not logged in,
        when the request fails, or when the response is not a JSON list.
        Errors are logged, never raised.
    """
    if fm_session is None:
        logger.error("FM assets: not logged in to FlexMeasures")
        return []

    try:
        r = fm_session.get(f"{FM_HOST}/api/v3_0/generic_assets", timeout=30)
        if r.status_code != 200:
            logger.error("FM assets: unexpected HTTP %s", r.status_code)
            return []
        assets = r.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f"FM assets: {e}")
        return []

    if not isinstance(assets, list):
        logger.error("FM assets: expected a JSON list, got %s", type(assets).__name__)
        return []

    ev_assets = []
    for asset in assets:
        if not isinstance(asset, dict):
            continue
        # A null name must not abort the whole listing.
        name = str(asset.get("name") or "").lower()
        if "ev" in name or "charge" in name:
            ev_assets.append(asset)
    return ev_assets
def get_fm_ev_sensors(sensor_ids=range(61, 71)):
    """Get all EV-related sensors from FlexMeasures.

    Args:
        sensor_ids: Container of sensor IDs regarded as EV-related. Defaults
            to IDs 61 through 70, the window this service has always used.

    Returns:
        list: Sensor dicts whose ``id`` is in ``sensor_ids``, or an empty list
        when not logged in, when the request fails, or when the response is
        not a JSON list. Errors are logged, never raised.
    """
    if fm_session is None:
        logger.error("FM sensors: not logged in to FlexMeasures")
        return []

    try:
        r = fm_session.get(f"{FM_HOST}/api/v3_0/sensors", timeout=30)
        if r.status_code != 200:
            logger.error("FM sensors: unexpected HTTP %s", r.status_code)
            return []
        sensors = r.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f"FM sensors: {e}")
        return []

    if not isinstance(sensors, list):
        logger.error("FM sensors: expected a JSON list, got %s", type(sensors).__name__)
        return []

    return [
        sensor
        for sensor in sensors
        if isinstance(sensor, dict) and sensor.get("id") in sensor_ids
    ]
def get_citrineos_charge_points():
    """Get all charge points from CitrineOS.

    Returns:
        list: The decoded JSON list of charge points, or an empty list when
        the request fails, the status is not 200, or the payload is not a
        list. Errors are logged, never raised.
    """
    try:
        r = requests.get(f"{CITRINEOS_URL}/api/v1/ocpp-charge-points", timeout=10)
        if r.status_code != 200:
            logger.error("CitrineOS CPs: unexpected HTTP %s", r.status_code)
            return []
        charge_points = r.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f"CitrineOS CPs: {e}")
        return []

    # Callers iterate the result and call .get() on each entry, so anything
    # other than a list would crash them.
    if not isinstance(charge_points, list):
        logger.error(
            "CitrineOS CPs: expected a JSON list, got %s", type(charge_points).__name__
        )
        return []
    return charge_points
def create_fm_ev_asset(cp_id, connector_id, max_power_kw, lat, lon):
    """Create an EV asset in FlexMeasures matching a CitrineOS charge point.

    Posts an ``EVSE_<cp_id>`` asset and then an ``evse_<cp_id>_power`` sensor
    (kW, 15-minute resolution) attached to it.

    Args:
        cp_id: CitrineOS charge point identifier.
        connector_id: Connector identifier stored in the attributes.
        max_power_kw: Maximum charging power stored in the attributes.
        lat: Latitude of the charge point.
        lon: Longitude of the charge point.

    Returns:
        tuple: ``(asset, sensor)`` when both were created, ``(asset, None)``
        when only the asset was created, and ``(None, None)`` when nothing
        was created. Errors are logged, never raised.
    """
    if fm_session is None:
        logger.error("Create FM asset: not logged in to FlexMeasures")
        return None, None

    # NOTE(review): payload field names are unchanged from the original;
    # confirm them against the FlexMeasures version in use.
    asset_data = {
        "name": f"EVSE_{cp_id}",
        "asset_type_name": "evse",
        "latitude": lat,
        "longitude": lon,
        "attributes": {
            "charge_point_id": cp_id,
            "connector_id": connector_id,
            "max_power_kw": max_power_kw,
            "ocpp_version": "2.0.1",
            "source": "citrineos",
        },
    }
    asset = _fm_create("generic_assets", asset_data, f"asset EVSE_{cp_id}")
    if asset is None:
        return None, None
    logger.info(f"Created FM asset: {asset.get('name')} (id={asset.get('id')})")

    asset_id = asset.get("id")
    if asset_id is None:
        logger.error("Create FM asset: response for EVSE_%s carries no id", cp_id)
        return asset, None

    sensor_data = {
        "name": f"evse_{cp_id}_power",
        "unit": "kW",
        "event_resolution": "PT15M",
        "generic_asset_id": asset_id,
        "attributes": {
            "charge_point_id": cp_id,
            "connector_id": connector_id,
            "max_power_kw": max_power_kw,
        },
    }
    sensor = _fm_create("sensors", sensor_data, f"sensor evse_{cp_id}_power")
    if sensor is None:
        # The asset exists even though its sensor does not; report it so the
        # caller does not treat the charge point as entirely missing.
        return asset, None
    logger.info(f"Created FM sensor: {sensor.get('name')} (id={sensor.get('id')})")
    return asset, sensor


def _fm_create(resource, payload, label):
    """POST ``payload`` to a FlexMeasures v3_0 collection; return the created dict or None."""
    try:
        resp = fm_session.post(
            f"{FM_HOST}/api/v3_0/{resource}", json=payload, timeout=30
        )
        if resp.status_code != 201:
            logger.error(
                "Create FM asset: %s failed with HTTP %s: %s",
                label, resp.status_code, resp.text[:200],
            )
            return None
        created = resp.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f"Create FM asset: {e}")
        return None
    if not isinstance(created, dict):
        logger.error("Create FM asset: %s returned an unexpected payload", label)
        return None
    return created
def sync_assets():
    """Synchronize assets between FlexMeasures and CitrineOS.

    Compares the charge point IDs recorded on FlexMeasures EV assets with the
    IDs reported by CitrineOS, logs the differences, and creates a
    FlexMeasures asset for every CitrineOS charge point that has none.
    Nothing is ever deleted on either side.

    Returns:
        dict: Counts under the keys ``fm_assets``, ``fm_sensors``,
        ``citrineos_cps``, ``synced``, ``only_fm``, ``only_citrineos`` (all
        measured before any creation) and ``created`` (assets created by
        this run).
    """
    logger.info("Starting asset synchronization...")

    fm_assets = get_fm_ev_assets()
    fm_sensors = get_fm_ev_sensors()
    cps = get_citrineos_charge_points()

    logger.info(f"FM assets: {len(fm_assets)}, FM sensors: {len(fm_sensors)}, CPs: {len(cps)}")

    # Build mapping
    fm_cp_ids = set()
    for asset in fm_assets:
        cp_id = _asset_attributes(asset).get("charge_point_id")
        if cp_id:
            fm_cp_ids.add(cp_id)

    citrineos_cp_ids = set()
    for cp in cps:
        cp_id = cp.get("id") if isinstance(cp, dict) else None
        if cp_id:
            citrineos_cp_ids.add(cp_id)

    # Find mismatches
    only_in_fm = fm_cp_ids - citrineos_cp_ids
    only_in_citrineos = citrineos_cp_ids - fm_cp_ids
    in_both = fm_cp_ids & citrineos_cp_ids

    logger.info(f"In both: {len(in_both)}, Only FM: {len(only_in_fm)}, Only CitrineOS: {len(only_in_citrineos)}")

    # Report mismatches
    if only_in_fm:
        logger.warning(f"Assets only in FM: {only_in_fm}")
    if only_in_citrineos:
        logger.warning(f"CPs only in CitrineOS: {only_in_citrineos}")

    # Sync: create FM assets for CPs that don't have them.
    # NOTE(review): a failed FlexMeasures listing is indistinguishable from an
    # empty one here, so a transient listing error can lead to duplicates.
    pending = set(only_in_citrineos)
    created = 0
    for cp in cps:
        if not isinstance(cp, dict):
            continue
        cp_id = cp.get("id")
        # Entries without an id are never in `pending`; discarding after the
        # first hit stops a repeated id from being created twice.
        if cp_id not in pending:
            continue
        pending.discard(cp_id)

        connector_id = cp.get("connectorId", 1)
        max_power = cp.get("maxPowerKw", 22)
        lat = cp.get("latitude", 14.6415)  # Martinique default
        lon = cp.get("longitude", -61.0242)
        asset, _sensor = create_fm_ev_asset(cp_id, connector_id, max_power, lat, lon)
        if asset is not None:
            created += 1

    return {
        "fm_assets": len(fm_assets),
        "fm_sensors": len(fm_sensors),
        "citrineos_cps": len(cps),
        "synced": len(in_both),
        "only_fm": len(only_in_fm),
        "only_citrineos": len(only_in_citrineos),
        "created": created,
    }


def _asset_attributes(asset):
    """Return an asset's ``attributes`` as a dict, decoding a JSON string if that is what was sent."""
    attrs = asset.get("attributes") if isinstance(asset, dict) else None
    if isinstance(attrs, str):
        try:
            attrs = json.loads(attrs)
        except ValueError:
            return {}
    return attrs if isinstance(attrs, dict) else {}
def main():
    """Run one synchronization pass: log in to FlexMeasures, then sync assets.

    When the login fails the pass is skipped, because every FlexMeasures call
    would fail and the comparison would report every charge point as missing.
    """
    logger.info("Cariflex Asset Synchronization starting")
    if not fm_login():
        logger.error("FlexMeasures login failed; skipping synchronization")
        return
    result = sync_assets()
    logger.info(f"Sync result: {result}")
# Run a synchronization pass only when executed as a script, not on import.
if __name__ == "__main__":
    main()