#!/usr/bin/env python3
"""
Cariflex - EV Charging Session Simulator

Simulates EV charging sessions for a fixed set of charge points and posts the
resulting power readings to FlexMeasures (FM). Charging power reacts to a
price signal and a load-control signal read back from FM. A CitrineOS
meter-value payload builder is provided on the session object, but this file
does not send it anywhere.
"""
import asyncio
import json
import logging
import os
import random
import re
from datetime import datetime, timezone, timedelta

import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("cariflex-ev-simulator")

FM_HOST = os.getenv("FM_HOST", "https://cariflex.digitribe.fr")
FM_EMAIL = os.getenv("FM_EMAIL", "admin@digitribe.fr")
# NOTE(review): a credential is baked in as the fallback value. It is kept so
# existing deployments keep working, but it should be rotated and supplied
# through the FM_PWD environment variable only.
FM_PWD = os.getenv("FM_PWD", "Digitribe972")
CITRINEOS_URL = os.getenv("CITRINEOS_URL", "http://cariflex-citrineos-server:8080")

# TLS certificate verification for FM requests. Disabled by default to keep
# the historical behaviour; set FM_VERIFY_TLS=true to turn verification on.
FM_VERIFY_TLS = os.getenv("FM_VERIFY_TLS", "false").strip().lower() in ("1", "true", "yes")

# Seconds to wait on any single HTTP call before giving up.
HTTP_TIMEOUT = 30

# Length of one simulation step. Drives the energy integration, the duration
# of the posted FM events, and the sleep between cycles.
INTERVAL_MINUTES = 5
FM_DURATION = f"PT{INTERVAL_MINUTES}M"

# FM sensors holding the control signals, and fallbacks when they are unreadable.
PRICE_SENSOR_ID = 84
LOAD_CONTROL_SENSOR_ID = 86
DEFAULT_PRICE = 80.0
DEFAULT_LOAD_CONTROL = 0.0

# EVSE configuration: sensor_id -> {cp_id, max_power (kW), location}
EVSE_CONFIG = {
    61: {"cp_id": "CP001", "max_power": 22, "location": "Fort-de-France Centre"},
    62: {"cp_id": "CP002", "max_power": 22, "location": "Lamentin"},
    63: {"cp_id": "CP003", "max_power": 22, "location": "Sainte-Marie"},
    64: {"cp_id": "CP004", "max_power": 22, "location": "Ducos"},
    65: {"cp_id": "CP005", "max_power": 22, "location": "Le Robert"},
    66: {"cp_id": "CP006", "max_power": 11, "location": "Fort-de-France Sud"},
    67: {"cp_id": "CP007", "max_power": 11, "location": "Lamentin Nord"},
    68: {"cp_id": "CP008", "max_power": 11, "location": "Sainte-Marie Nord"},
    69: {"cp_id": "CP009", "max_power": 11, "location": "Ducos Sud"},
    70: {"cp_id": "CP010", "max_power": 11, "location": "Le Robert Nord"},
}

# Session state for each EVSE, keyed by FM sensor id.
sessions = {}


class ChargingSession:
    """Represents a single EV charging session on one charge point."""

    def __init__(self, sensor_id, cp_id, max_power, location):
        """Create a session with a random vehicle, battery size and SOC window.

        Args:
            sensor_id: FM sensor id the power readings are posted to.
            cp_id: Charge point identifier (e.g. "CP001").
            max_power: Maximum charging power in kW.
            location: Human-readable site name.
        """
        self.sensor_id = sensor_id
        self.cp_id = cp_id
        self.max_power = max_power
        self.location = location
        self.start_time = datetime.now(timezone.utc)
        # Derive the id from the UTC start time so both agree on one clock.
        self.session_id = f"SESS-{cp_id}-{self.start_time.strftime('%Y%m%d%H%M%S')}"
        self.vehicle_id = f"VEH-{random.randint(1000, 9999)}"
        self.soc_start = random.uniform(10, 40)  # Start SOC 10-40%
        self.soc_target = random.uniform(70, 90)  # Target SOC 70-90%
        self.soc_current = self.soc_start
        self.energy_capacity = random.choice([40, 50, 60, 75, 100])  # kWh battery
        self.status = "Charging"  # Charging, Paused, Completed
        self.current_power = 0.0
        self.total_energy = 0.0

    def update(self, price, load_control, interval_minutes=INTERVAL_MINUTES):
        """Advance the session by one interval.

        Args:
            price: Current price signal; thresholds at 50 / 100 / 150 select
                full, 70 %, 30 % or zero charging power.
            load_control: Curtailment fraction. Values outside [0, 1] are
                clamped so power can neither go negative nor exceed
                ``max_power``.
            interval_minutes: Length of the simulated step in minutes.

        Returns:
            False if the session was already completed (nothing changed),
            True otherwise.
        """
        if self.status == "Completed":
            return False

        # Fraction of power still allowed after curtailment.
        allowed = 1 - min(max(load_control, 0.0), 1.0)

        if price > 150:
            # Very expensive - pause charging
            self.current_power = 0.0
            self.status = "Paused"
        elif price > 100:
            # Expensive - reduce power
            self.current_power = self.max_power * 0.3 * allowed
            self.status = "Charging"
        elif price > 50:
            # Medium - normal charging
            self.current_power = self.max_power * 0.7 * allowed
            self.status = "Charging"
        else:
            # Cheap - full power
            self.current_power = self.max_power * allowed
            self.status = "Charging"

        if self.current_power > 0:
            # kW * hours = kWh delivered during this step.
            energy_added = self.current_power * (interval_minutes / 60)
            self.total_energy += energy_added
            soc_increase = (energy_added / self.energy_capacity) * 100
            self.soc_current = min(self.soc_current + soc_increase, self.soc_target)

        # Check if charging complete
        if self.soc_current >= self.soc_target:
            self.status = "Completed"
            self.current_power = 0.0

        return True

    def to_fm_data(self):
        """Return the current power reading as a dict of FM post arguments."""
        return {
            "sensor_id": self.sensor_id,
            "value": round(self.current_power, 2),
            "unit": "kW",
            "start": datetime.now(timezone.utc).isoformat(),
            "duration": FM_DURATION,
        }

    def to_citrineos_data(self):
        """Return power (W), SoC (%) and energy (Wh) as a meter-value payload."""
        return {
            "connectorId": 1,
            "transactionId": self.session_id,
            "meterValue": [{
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "sampledValue": [
                    {"value": str(int(self.current_power * 1000)),
                     "measurand": "Power.Active.Import", "unit": "W"},
                    {"value": str(int(self.soc_current)),
                     "measurand": "SoC", "unit": "%"},
                    {"value": str(int(self.total_energy * 1000)),
                     "measurand": "Energy.Active.Import.Register", "unit": "Wh"},
                ],
            }],
        }


# Authenticated requests.Session for FM, or None until a login succeeds.
fm_session = None


def fm_login():
    """Log in to FM through its web login form and store the session.

    Fetches the login page, extracts the CSRF token, posts the credentials and
    treats a redirect to a URL containing "dashboard" as success.

    Returns:
        True if the login succeeded and ``fm_session`` was replaced, else
        False. Never raises: this is also called from the simulation loop's
        error handler, so every failure is logged instead.
    """
    global fm_session
    try:
        s = requests.Session()
        # SECURITY: verification is off unless FM_VERIFY_TLS is enabled.
        s.verify = FM_VERIFY_TLS
        r = s.get(f"{FM_HOST}/login", timeout=HTTP_TIMEOUT)
        m = re.search(r'csrf_token[^>]*value="([^"]+)"', r.text)
        if not m:
            logger.warning("FM login: no CSRF token found on login page")
            return False
        r = s.post(
            f"{FM_HOST}/login",
            data={
                "email": FM_EMAIL,
                "password": FM_PWD,
                "csrf_token": m.group(1),
                "remember": "y",
            },
            allow_redirects=True,
            timeout=HTTP_TIMEOUT,
        )
        if "dashboard" in r.url:
            fm_session = s
            return True
        logger.warning("FM login: credentials rejected (landed on %s)", r.url)
    except Exception as e:  # best-effort boundary: must not raise
        logger.error("FM login: %s", e)
    return False


def post_fm_data(sensor_id, value, unit, start, duration="PT5M"):
    """Post sensor data to FlexMeasures.

    Args:
        sensor_id: FM sensor id to write to.
        value: Single reading for the event.
        unit: Unit string of the reading (e.g. "kW").
        start: ISO 8601 start timestamp of the event.
        duration: ISO 8601 duration of the event.

    Returns:
        True if FM answered 200, 201 or 202; False otherwise, including when
        there is no logged-in session or the request failed.
    """
    if not fm_session:
        return False
    try:
        r = fm_session.post(
            f"{FM_HOST}/api/v3_0/sensors/{sensor_id}/data",
            json={"values": [value], "start": start, "duration": duration, "unit": unit},
            timeout=HTTP_TIMEOUT,
        )
        return r.status_code in (200, 201, 202)
    except Exception as e:  # best-effort boundary
        logger.error("FM post: %s", e)
        return False


def _latest_value(payload, default):
    """Pick the most recent numeric reading out of a decoded FM response.

    Accepts a list of event dicts carrying ``event_value`` (the shape this
    script originally expected) and, additionally, a dict with a ``values``
    list, taking its last non-null entry. Anything else yields ``default``.
    """
    if isinstance(payload, dict):
        for candidate in reversed(payload.get("values") or []):
            if candidate is not None:
                return float(candidate)
        return default
    if isinstance(payload, list) and payload and isinstance(payload[-1], dict):
        candidate = payload[-1].get("event_value")
        if candidate is not None:
            return float(candidate)
    return default


def _fetch_latest(sensor_id, default, label):
    """Read the latest value of an FM sensor, falling back to ``default``.

    ``label`` prefixes the error log line so failures stay attributable.
    """
    if not fm_session:
        return default
    try:
        r = fm_session.get(
            f"{FM_HOST}/api/v3_0/sensors/{sensor_id}/data?limit=1",
            timeout=HTTP_TIMEOUT,
        )
        if r.status_code == 200:
            return _latest_value(r.json(), default)
    except Exception as e:  # best-effort boundary
        logger.error("%s: %s", label, e)
    return default


def get_price():
    """Get current price from FM (sensor 84 - OpenADR); 80.0 if unavailable."""
    return _fetch_latest(PRICE_SENSOR_ID, DEFAULT_PRICE, "Price")


def get_load_control():
    """Get current load control signal from FM (sensor 86); 0.0 if unavailable."""
    return _fetch_latest(LOAD_CONTROL_SENSOR_ID, DEFAULT_LOAD_CONTROL, "Load control")


def _start_session(sensor_id, config):
    """Create a session for one EVSE and register it in ``sessions``."""
    session = ChargingSession(
        sensor_id, config["cp_id"], config["max_power"], config["location"]
    )
    sessions[sensor_id] = session
    return session


async def simulate_sessions():
    """Main simulation loop.

    Seeds roughly 60 % of the EVSEs with an active session, then every
    interval: reads price and load control, advances each session, posts its
    power to FM, drops completed sessions and starts new ones on idle EVSEs
    with a 10 % chance each. Runs until cancelled.

    NOTE: the FM calls use blocking ``requests`` and therefore block the event
    loop while in flight; this coroutine is the only task started here.
    """
    logger.info("EV Charging Session Simulator starting")
    fm_login()

    # Initialize sessions for each EVSE
    for sensor_id, config in EVSE_CONFIG.items():
        # Randomly start with some sessions active
        if random.random() < 0.6:  # 60% chance of active session
            _start_session(sensor_id, config)
            logger.info(f"Started session on {config['cp_id']} ({config['location']})")

    logger.info(f"Initial sessions: {len(sessions)}")

    while True:
        try:
            # A failed login does not raise, so retry here; otherwise the
            # simulator would silently post nothing for its whole lifetime.
            if fm_session is None:
                fm_login()

            price = get_price()
            load_control = get_load_control()

            completed_sessions = []
            failed_posts = 0

            for sensor_id, session in sessions.items():
                session.update(price, load_control)

                # Post to FM (a completed session reports its final 0 kW)
                data = session.to_fm_data()
                posted = post_fm_data(
                    data["sensor_id"], data["value"], data["unit"],
                    data["start"], data["duration"],
                )
                if not posted:
                    failed_posts += 1

                if session.status == "Completed":
                    completed_sessions.append(sensor_id)
                    logger.info(f"Session completed: {session.cp_id} - SOC: {session.soc_current:.1f}%, Energy: {session.total_energy:.1f} kWh")

            if failed_posts:
                logger.warning("FM post failed for %d of %d sessions", failed_posts, len(sessions))

            # Remove completed sessions (after iteration, never during it)
            for sensor_id in completed_sessions:
                del sessions[sensor_id]

            # Start new sessions for idle EVSEs
            for sensor_id, config in EVSE_CONFIG.items():
                if sensor_id not in sessions and random.random() < 0.1:  # 10% chance per cycle
                    new_session = _start_session(sensor_id, config)
                    logger.info(f"New session on {config['cp_id']} - Vehicle: {new_session.vehicle_id}")

            # Log status
            charging = sum(1 for s in sessions.values() if s.status == "Charging")
            paused = sum(1 for s in sessions.values() if s.status == "Paused")
            logger.info(f"Sessions: {len(sessions)} total, {charging} charging, {paused} paused | Price: {price} EUR/MWh | Load control: {load_control}")

        except Exception as e:  # top-level boundary: keep the simulator alive
            logger.error(f"Simulation error: {e}")
            fm_login()

        await asyncio.sleep(INTERVAL_MINUTES * 60)


if __name__ == "__main__":
    asyncio.run(simulate_sessions())