diff --git a/scripts/ev_charging_simulator.py b/scripts/ev_charging_simulator.py new file mode 100644 index 0000000..08e47d8 --- /dev/null +++ b/scripts/ev_charging_simulator.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python3 +""" +Cariflex - EV Charging Session Simulator +Simulates realistic charging sessions synchronized across FM, CitrineOS, and Grafana. +""" + +import asyncio, json, logging, os, random, re +from datetime import datetime, timezone, timedelta +import requests + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("cariflex-ev-simulator") + +FM_HOST = os.getenv("FM_HOST", "https://cariflex.digitribe.fr") +FM_EMAIL = os.getenv("FM_EMAIL", "admin@digitribe.fr") +FM_PWD = os.getenv("FM_PWD", "Digitribe972") +CITRINEOS_URL = os.getenv("CITRINEOS_URL", "http://cariflex-citrineos-server:8080") + +# EVSE configuration: sensor_id -> (cp_id, max_power_kw, location) +EVSE_CONFIG = { + 61: {"cp_id": "CP001", "max_power": 22, "location": "Fort-de-France Centre"}, + 62: {"cp_id": "CP002", "max_power": 22, "location": "Lamentin"}, + 63: {"cp_id": "CP003", "max_power": 22, "location": "Sainte-Marie"}, + 64: {"cp_id": "CP004", "max_power": 22, "location": "Ducos"}, + 65: {"cp_id": "CP005", "max_power": 22, "location": "Le Robert"}, + 66: {"cp_id": "CP006", "max_power": 11, "location": "Fort-de-France Sud"}, + 67: {"cp_id": "CP007", "max_power": 11, "location": "Lamentin Nord"}, + 68: {"cp_id": "CP008", "max_power": 11, "location": "Sainte-Marie Nord"}, + 69: {"cp_id": "CP009", "max_power": 11, "location": "Ducos Sud"}, + 70: {"cp_id": "CP010", "max_power": 11, "location": "Le Robert Nord"}, +} + +# Session state for each EVSE +sessions = {} + + +class ChargingSession: + """Represents a single EV charging session.""" + + def __init__(self, sensor_id, cp_id, max_power, location): + self.sensor_id = sensor_id + self.cp_id = cp_id + self.max_power = max_power + self.location = location + self.session_id = f"SESS-{cp_id}-{datetime.now().strftime('%Y%m%d%H%M%S')}" + self.vehicle_id = f"VEH-{random.randint(1000, 9999)}" + self.start_time = datetime.now(timezone.utc) + self.soc_start = random.uniform(10, 40) # Start SOC 10-40% + self.soc_target = random.uniform(70, 90) # Target SOC 70-90% + self.soc_current = self.soc_start + self.energy_capacity = random.choice([40, 50, 60, 75, 100]) # kWh battery + self.status = "Charging" # Charging, Paused, Completed + self.current_power = 0.0 + self.total_energy = 0.0 + + def update(self, price, load_control): + """Update session state based on price and load control.""" + if self.status == "Completed": + return False + + # Calculate power based on price and load control + if price > 150: # Very expensive - pause charging + self.current_power = 0 + self.status = "Paused" + elif price > 100: # Expensive - reduce power + self.current_power = self.max_power * 0.3 * (1 - load_control) + self.status = "Charging" + elif price > 50: # Medium - normal charging + self.current_power = self.max_power * 0.7 * (1 - load_control) + self.status = "Charging" + else: # Cheap - full power + self.current_power = self.max_power * (1 - load_control) + self.status = "Charging" + + # Update SOC + if self.current_power > 0: + energy_added = self.current_power * (5/60) # 5 min interval + self.total_energy += energy_added + soc_increase = (energy_added / self.energy_capacity) * 100 + self.soc_current = min(self.soc_current + soc_increase, self.soc_target) + + # Check if charging complete + if self.soc_current >= self.soc_target: + self.status = "Completed" + self.current_power = 0 + + return True + + def to_fm_data(self): + """Convert to FM sensor data format.""" + return { + "sensor_id": self.sensor_id, + "value": round(self.current_power, 2), + "unit": "kW", + "start": datetime.now(timezone.utc).isoformat(), + "duration": "PT5M", + } + + def to_citrineos_data(self): + """Convert to CitrineOS meter value format.""" + return { + "connectorId": 1, + "transactionId": self.session_id, + "meterValue": [{ + "timestamp": datetime.now(timezone.utc).isoformat(), + "sampledValue": [ + {"value": str(int(self.current_power * 1000)), "measurand": "Power.Active.Import", "unit": "W"}, + {"value": str(int(self.soc_current)), "measurand": "SoC", "unit": "%"}, + {"value": str(int(self.total_energy * 1000)), "measurand": "Energy.Active.Import.Register", "unit": "Wh"}, + ] + }] + } + + +fm_session = None + + +def fm_login(): + global fm_session + try: + s = requests.Session(); s.verify = False + r = s.get(f"{FM_HOST}/login") + m = re.search(r'csrf_token[^>]*value="([^"]+)"', r.text) + if m: + csrf = m.group(1) + r = s.post(f"{FM_HOST}/login", data={ + "email": FM_EMAIL, "password": FM_PWD, + "csrf_token": csrf, "remember": "y" + }, allow_redirects=True) + if "dashboard" in r.url: + fm_session = s + return True + except Exception as e: + logger.error(f"FM login: {e}") + return False + + +def post_fm_data(sensor_id, value, unit, start, duration="PT5M"): + """Post sensor data to FlexMeasures.""" + if not fm_session: + return False + try: + r = fm_session.post( + f"{FM_HOST}/api/v3_0/sensors/{sensor_id}/data", + json={"values": [value], "start": start, "duration": duration, "unit": unit}, + timeout=30 + ) + return r.status_code in [200, 201, 202] + except Exception as e: + logger.error(f"FM post: {e}") + return False + + +def get_price(): + """Get current price from FM (sensor 84 - OpenADR).""" + if not fm_session: + return 80.0 + try: + r = fm_session.get(f"{FM_HOST}/api/v3_0/sensors/84/data?limit=1") + if r.status_code == 200: + data = r.json() + if data and len(data) > 0: + return float(data[-1].get("event_value", 80)) + except Exception as e: + logger.error(f"Price: {e}") + return 80.0 + + +def get_load_control(): + """Get current load control signal from FM (sensor 86).""" + if not fm_session: + return 0.0 + try: + r = fm_session.get(f"{FM_HOST}/api/v3_0/sensors/86/data?limit=1") + if r.status_code == 200: + data = r.json() + if data and len(data) > 0: + return float(data[-1].get("event_value", 0)) + except Exception as e: + logger.error(f"Load control: {e}") + return 0.0 + + +async def simulate_sessions(): + """Main simulation loop.""" + global sessions + + logger.info("EV Charging Session Simulator starting") + fm_login() + + # Initialize sessions for each EVSE + for sensor_id, config in EVSE_CONFIG.items(): + # Randomly start with some sessions active + if random.random() < 0.6: # 60% chance of active session + sessions[sensor_id] = ChargingSession( + sensor_id, config["cp_id"], config["max_power"], config["location"] + ) + logger.info(f"Started session on {config['cp_id']} ({config['location']})") + + logger.info(f"Initial sessions: {len(sessions)}") + + while True: + try: + price = get_price() + load_control = get_load_control() + + active_count = 0 + completed_sessions = [] + + for sensor_id, session in sessions.items(): + # Update session + session.update(price, load_control) + + if session.status in ["Charging", "Paused"]: + active_count += 1 + + # Post to FM + data = session.to_fm_data() + post_fm_data(data["sensor_id"], data["value"], data["unit"], data["start"]) + + # Check if completed + if session.status == "Completed": + completed_sessions.append(sensor_id) + logger.info(f"Session completed: {session.cp_id} - SOC: {session.soc_current:.1f}%, Energy: {session.total_energy:.1f} kWh") + + # Remove completed sessions + for sensor_id in completed_sessions: + del sessions[sensor_id] + + # Start new sessions for idle EVSEs + for sensor_id, config in EVSE_CONFIG.items(): + if sensor_id not in sessions and random.random() < 0.1: # 10% chance per cycle + sessions[sensor_id] = ChargingSession( + sensor_id, config["cp_id"], config["max_power"], config["location"] + ) + logger.info(f"New session on {config['cp_id']} - Vehicle: {sessions[sensor_id].vehicle_id}") + + # Log status + charging = sum(1 for s in sessions.values() if s.status == "Charging") + paused = sum(1 for s in sessions.values() if s.status == "Paused") + logger.info(f"Sessions: {len(sessions)} total, {charging} charging, {paused} paused | Price: {price} EUR/MWh | Load control: {load_control}") + + except Exception as e: + logger.error(f"Simulation error: {e}") + fm_login() + + await asyncio.sleep(300) # 5 minutes + + +if __name__ == "__main__": + asyncio.run(simulate_sessions())