#!/usr/bin/env python3
"""
Smart City IoT Simulator — Martinique (14.6°N, 61.2°W)
=======================================================
Publishes simulated sensor readings to several MQTT brokers and to REST
back ends (NGSI-LD context brokers, SensorThings, OpenRemote, InfluxDB).

Reference deployment (container names):
  MQTT brokers
    - EMQX:       emqx_emqx_1:1883 (no authentication)
    - Mosquitto:  mosquitto-traefik:1883
    - BunkerM:    bunkerm_bunkerm_1:1900
    - OpenRemote: openremote-manager-1:1883
  REST back ends
    - Orion-LD:   fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
    - Stellio:    stellio-api-gateway:8080 (NGSI-LD)
    - FROST:      FROST-Server/v1.1 (SensorThings)

The defaults coded below differ from that list (for example EMQX defaults to
localhost:11883); override them through the environment.

Environment variables:
  PUBLISH_INTERVAL_SEC           publication interval in seconds (default: 10)
  BASE_LAT / BASE_LON            base coordinates (default: Fort-de-France)
  EMQX_HOST / EMQX_PORT          EMQX broker (default: localhost:11883)
  MOSQUITTO_HOST / MOSQUITTO_PORT  Mosquitto broker (default: localhost:1883)
  BUNKERM_HOST / BUNKERM_PORT    BunkerM broker (default: mqtt.digitribe.fr:1900)
  ENABLE_ORION=1                 enable Orion-LD (default: 1)
  ENABLE_STELLIO=1               enable Stellio (default: 1)
  ENABLE_FROST=1                 enable FROST-Server (default: 1)
  ENABLE_OPENREMOTE=1            enable OpenRemote (default: 1)
  ENABLE_INFLUX=1                enable InfluxDB (default: 1)
  FROST_URL                      FROST-Server base URL
  INFLUX_URL / INFLUX_ORG / INFLUX_BUCKET / INFLUX_TOKEN
  OR_ADMIN_USER / OR_ADMIN_PASS / OR_REALM / OR_TOKEN_REALM
  SENSOR_COUNT_<type>            number of simulated sensors per type
"""

import json
import os
import queue
import random
import signal
import ssl
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any

import paho.mqtt.client as mqtt

# InfluxDB support is optional: a missing package must not prevent the
# simulator from starting, in particular when ENABLE_INFLUX=0.
try:
    import influxdb_client
    from influxdb_client.client.write_api import SYNCHRONOUS
except ImportError:
    influxdb_client = None
    SYNCHRONOUS = None

# =============================================================================
# MQTT broker configuration
# Defaults target localhost (the simulator runs on the Docker host).
# =============================================================================
EMQX_HOST = os.environ.get("EMQX_HOST", "localhost")
EMQX_PORT = int(os.environ.get("EMQX_PORT", "11883"))
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "localhost")
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))
BUNKERM_HOST = os.environ.get("BUNKERM_HOST", "mqtt.digitribe.fr")
BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900"))

# =============================================================================
# General configuration
# =============================================================================
BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "10"))

# A back end is enabled only when its flag is exactly "1".
ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1") == "1"
ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1"
ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1"

# NOTE(review): real-looking credentials are used as fallback defaults here and
# for INFLUX_TOKEN below; they are kept for backward compatibility but should
# be supplied through the environment only.
OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972")
OR_REALM = os.environ.get("OR_REALM", "smartcity")
OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master")  # realm used to obtain the token

FROST_URL = os.environ.get("FROST_URL", "http://frost_http-web-1:8080/FROST-Server/v1.1")

# InfluxDB configuration
ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1") == "1"
INFLUX_URL = os.environ.get("INFLUX_URL", "http://digital-twin-influxdb:8086")
INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "iot_data")
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-secret-admin-token")

# InfluxDB client, created once at import time. Both names stay None when
# InfluxDB is disabled, the package is missing or the client cannot be built.
_influx_client = None
_influx_write_api = None
if ENABLE_INFLUX:
    if influxdb_client is None:
        print("[INFLUX] ❌ Connection failed: influxdb_client package is not installed")
    else:
        try:
            _influx_client = influxdb_client.InfluxDBClient(
                url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG
            )
            _influx_write_api = _influx_client.write_api(write_options=SYNCHRONOUS)
            print(f"[INFLUX] ✅ Connected to {INFLUX_URL}")
        except Exception as e:
            # Best effort: the simulator keeps running without InfluxDB.
            print(f"[INFLUX] ❌ Connection failed: {e}")

# Number of simulated sensors per sensor type.
SENSOR_COUNTS = {
    "traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")),
    "airquality": int(os.environ.get("SENSOR_COUNT_airquality", "2")),
    "parking": int(os.environ.get("SENSOR_COUNT_parking", "2")),
    "noise": int(os.environ.get("SENSOR_COUNT_noise", "1")),
    "weather": int(os.environ.get("SENSOR_COUNT_weather", "1")),
    "light": int(os.environ.get("SENSOR_COUNT_light", "1")),
}

# Sum of the per-type counts; used to scale them proportionally when
# SENSOR_COUNT is defined.
_total_default = sum(SENSOR_COUNTS.values())
# If SENSOR_COUNT is set, scale every per-type count proportionally so that
# the total approaches the requested value (each type keeps at least 1 sensor).
if "SENSOR_COUNT" in os.environ:
    target = int(os.environ["SENSOR_COUNT"])
    # With every per-type count at 0 there is nothing to scale proportionally;
    # skipping avoids a division by zero.
    if _total_default > 0:
        ratio = target / _total_default
        for k in SENSOR_COUNTS:
            SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio))

# =============================================================================
# Sensor locations in Martinique
# =============================================================================
SENSOR_NAMES: dict[str, list[str]] = {
    "traffic": ["Carrefour Central", "Avenue des Caraïbes", "Boulevard Pasteur",
                "Rue des Flamboyants", "Place de la République"],
    "airquality": ["Quartier Bonde", "Port de Fort-de-France", "Château Denis",
                   "Lamentin Aéroport", "Schoelcher Village"],
    "parking": ["Parking Rivière-Saleé", "Parking Cluny", "Parking Média",
                "Parking Grand-Camp", "Parking Dillon"],
    "noise": ["Rue des Arts", "Marché Central", "Université Fort-de-France",
              "Stade de Dillon", "Place du Champs de Mars"],
    "weather": ["Station Météo Lamentin", "Station Schoelcher", "Station Ajoupa-Bouillon",
                "Station Le François", "Station Le Robert"],
    "light": ["Eclairage Rue des Mouettes", "Candela Boulevard", "Lumiere Rue des Acacias",
              "Feux Signalisation Centre", "Eclairage Port"],
}


def _gen_locs(stype: str, count: int) -> list[dict]:
    """Return ``count`` random locations for sensors of type ``stype``.

    Each location is a dict with ``lat`` and ``lon`` (drawn within ±0.05° of
    BASE_LAT / BASE_LON, rounded to 6 decimals) and ``name``. Names cycle
    through SENSOR_NAMES[stype]; the type name itself is used when the type
    has no (or an empty) name list.
    """
    names = SENSOR_NAMES.get(stype) or [stype]
    return [
        {
            "lat": round(BASE_LAT + random.uniform(-0.05, 0.05), 6),
            "lon": round(BASE_LON + random.uniform(-0.05, 0.05), 6),
            "name": names[i % len(names)],
        }
        for i in range(count)
    ]


SENSOR_LOCATIONS: dict[str, list[dict]] = {
    stype: _gen_locs(stype, count) for stype, count in SENSOR_COUNTS.items()
}

# Value ranges per sensor type: field name -> (low, high).
SENSOR_RANGES: dict[str, dict] = {
    "traffic": {"vehicle_count": (10, 150), "average_speed_kmh": (10, 80),
                "congestion_level": (0, 5), "occupancy_percent": (0, 100)},
    "airquality": {"pm25_ugm3": (5, 80), "pm10_ugm3": (10, 150), "no2_ugm3": (5, 60),
                   "o3_ugm3": (20, 120), "co_mgm3": (0.1, 5.0),
                   "temperature_celsius": (20, 35), "humidity_percent": (40, 95)},
    "parking": {"total_spots": (50, 500), "available_spots": (0, 500),
                "occupancy_percent": (0, 100), "turnover_per_hour": (5, 50)},
    "noise": {"noise_level_db": (40, 95), "peak_db": (60, 110)},
    "weather": {"temperature_celsius": (22, 34), "humidity_percent": (50, 95),
                "wind_speed_kmh": (0, 50), "pressure_hpa": (1005, 1025),
                "rain_mm": (0, 20), "uv_index": (0, 11)},
    "light": {"brightness_lux": (0, 100000), "power_consumption_w": (0, 500)},
}
NOISE_CATEGORIES = ["quiet", "moderate", "loud", "very_loud"]
LIGHT_STATUSES = ["on", "off", "dimmed", "auto"]

# =============================================================================
# Declared sensors
# =============================================================================
# Sensor ids are numbered per type ("airquality_000", "parking_000", ...).
# The OpenRemote asset map in publish_openremote is keyed that way; a single
# counter shared by all types produced ids such as "airquality_003" that never
# matched it.
SENSORS: dict[str, dict] = {}
for stype, locs in SENSOR_LOCATIONS.items():
    for index, loc in enumerate(locs):
        sid = f"{stype}_{index:03d}"
        SENSORS[sid] = {"type": stype, "lat": loc["lat"], "lon": loc["lon"],
                        "name": loc["name"]}

# =============================================================================
# NGSI-LD payload settings for Orion-LD / Stellio
# =============================================================================
# Pure NGSI-LD context for Orion-LD (standard vocabularies only).
# Orion-LD cannot resolve raw.githubusercontent.com, so only uri.etsi.org is used.
ORION_CONTEXT = [
    "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
]

# Sensor type -> Smart Data Models entity type (https://smartdatamodels.org).
SMART_MODEL_MAPPING = {
    "airquality": "AirQualityObserved",
    "traffic": "TrafficFlowObserved",
    "parking": "OffStreetParking",
    "noise": "NoiseLevelObserved",
    "weather": "WeatherObserved",
    "light": "Device",
}

FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}

# FROST cache, so Things/Datastreams are not re-created:
# sid -> (thing_id, {field: datastream_id})
_frost_cache: dict[str, tuple[str, dict[str, str]]] = {}
(vocabulaires standards uniquement) # Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/ # On n'utilise PAS les vocabulaires smartdatamodels.org distants (inaccessibles depuis les containers) # Les types d'entité Smart Data Models (AirQualityObserved, etc.) sont reconnus par leur nom # Les propriétés spécifiques sont stockées telles quelles (vocabulaire libre) STELLIO_INLINE_CONTEXT = [ "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", ] def _ngsi_payload(sid: str, sensor: dict, context: list | dict = ORION_CONTEXT) -> dict: """Construit un payload NGSI-LD avec Smart Data Models officiels.""" stype = sensor["type"] model_type = SMART_MODEL_MAPPING.get(stype, "Device") now = datetime.now(timezone.utc).isoformat() # Attributs communs à tous les modèles payload = { "@context": context, "id": f"urn:ngsi-ld:{model_type}:{sid}", "type": model_type, "dateObserved": {"type": "Property", "value": now}, "location": {"type": "GeoProperty", "value": {"type": "Point", "coordinates": [sensor["lon"], sensor["lat"]]}}, "name": {"type": "Property", "value": sensor["name"]}, "batteryLevel": {"type": "Property", "value": random.randint(60, 100)}, } # Attributs spécifiques par type de modèle ranges = SENSOR_RANGES.get(stype, {}) props = {} for field, val_range in ranges.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)): props[field] = {"type": "Property", "value": round(random.uniform(lo, hi), 1)} elif isinstance(val_range, list): props[field] = {"type": "Property", "value": random.choice(val_range)} # Mapping vers les noms d'attributs Smart Data Models if stype == "airquality": if "pm25_ugm3" in props: payload["NO2"] = props.pop("pm25_ugm3") # Simplifié if "pm10_ugm3" in props: payload["PM10"] = props.pop("pm10_ugm3") if "no2_ugm3" in props: payload["NO2"] = props.pop("no2_ugm3") if "o3_ugm3" in props: payload["O3"] = props.pop("o3_ugm3") if "co_mgm3" in props: 
payload["CO"] = props.pop("co_mgm3") if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius") if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent") elif stype == "traffic": if "vehicle_count" in props: payload["vehicleCount"] = props.pop("vehicle_count") if "average_speed_kmh" in props: payload["averageVehicleSpeed"] = props.pop("average_speed_kmh") if "congestion_level" in props: payload["congestion"] = props.pop("congestion_level") if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent") elif stype == "parking": if "available_spots" in props: payload["availableSpotNumber"] = props.pop("available_spots") if "total_spots" in props: payload["totalSpotNumber"] = props.pop("total_spots") if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent") if "turnover_per_hour" in props: payload["turnover"] = props.pop("turnover_per_hour") elif stype == "noise": if "noise_level_db" in props: payload["noiseLevel"] = props.pop("noise_level_db") if "peak_db" in props: payload["noisePeak"] = props.pop("peak_db") payload["noiseCategory"] = {"type": "Property", "value": random.choice(NOISE_CATEGORIES)} elif stype == "weather": if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius") if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent") if "rain_mm" in props: payload["rainfall"] = props.pop("rain_mm") if "uv_index" in props: payload["uvIndex"] = props.pop("uv_index") if "wind_speed_kmh" in props: payload["windSpeed"] = props.pop("wind_speed_kmh") elif stype == "light": if "brightness_lux" in props: payload["illuminance"] = props.pop("brightness_lux") if "power_consumption_w" in props: payload["power"] = props.pop("power_consumption_w") payload["status"] = {"type": "Property", "value": random.choice(LIGHT_STATUSES)} return payload def _frost_payload(sid: str, sensor: dict) -> dict: 
"""Construit un payload SensorThings pour FROST-Server.""" stype = sensor["type"] ranges = SENSOR_RANGES.get(stype, {}) datastreams = [] for field, val_range in ranges.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)) and isinstance(hi, (int, float)): val = round(random.uniform(lo, hi), 1) unit = "http://www.qudt.org/vocab/unit#DegreeCelsius" obs_prop = { "name": f"{field} Observation", "description": f"Observation of {field}", "definition": unit, } sensor_data = { "name": f"Sensor {sid} {field}", "description": f"Sensor {sid} measuring {field}", "encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0", "metadata": {"unit": unit}, } ds = { "name": f"Datastream {stype}/{field}", "description": f"Datastream for {stype} sensor {sid} - {field}", "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement", "unitOfMeasurement": {"name": field, "symbol": "", "definition": unit}, "Sensor": sensor_data, "ObservedProperty": obs_prop, } datastreams.append((field, ds, val)) thing_payload = { "name": f"Thing_{sid}", "description": f"Smart City {stype} sensor in Martinique", "properties": {"sensorType": stype, "region": "Martinique"}, } return thing_payload, datastreams # ============================================================================= # HTTP helper # ============================================================================= def _http_post(url: str, data: dict, headers: dict) -> str: """POST et retourne 'ok' ou 'created' (ou '' si échec).""" try: body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, headers=headers, method="POST") with urllib.request.urlopen(req, timeout=8) as resp: if resp.status == 204: return 'created' # No Content — succès if resp.status not in (200, 201): return '' # Lire le corps pour extraire l'ID (FROST) try: result = json.loads(resp.read()) if '@iot.selfLink' in result: link = result['@iot.selfLink'] return 
link.split('(')[1].rstrip(')') if '@iot.id' in result: return str(result['@iot.id']) except Exception: pass location = resp.headers.get('Location', '') if location: return location.split('(')[1].rstrip(')') if '(' in location else '' return 'created' except urllib.error.HTTPError as e: # Lire le corps de l'erreur pour debug try: err_body = e.read().decode()[:200] except Exception: err_body = str(e) print(f" ⚠️ HTTP POST {url} → {e.code}: {err_body}") return '' except Exception as e: print(f" ⚠️ HTTP POST {url} → {e}") return '' def _http_put(url: str, data: dict, headers: dict) -> bool: try: body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, headers=headers, method="PUT") with urllib.request.urlopen(req, timeout=5) as resp: return resp.status in (200, 204) except urllib.error.HTTPError as e: if e.code == 409: return True # Already exists - that's fine print(f" ⚠️ HTTP PUT {url} → {e}") return False except Exception as e: print(f" ⚠️ HTTP PUT {url} → {e}") return False # ============================================================================= # MQTT Client multi-broker # ============================================================================= class MultiMQTT: def __init__(self): self.clients: dict[str, mqtt.Client] = {} self.ok: dict[str, bool] = {} self._lock = threading.Lock() self._setup() def _mk_client(self, name: str, host: str, port: int, tls: bool = False, user: str = "", pwd: str = "", ws: bool = False) -> mqtt.Client: cid = f"smartcity-sim-{name}-{os.getpid()}" c = mqtt.Client(client_id=cid, protocol=mqtt.MQTTv311) if user: c.username_pw_set(user, pwd) if tls: c.tls_set(cert_reqs=ssl.CERT_NONE) c.tls_insecure_set(True) if ws: c.ws_set(b"/mqtt") c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc) c.on_disconnect = lambda _c, _, __: self._on_disconnect(name) try: c.connect(host, port, keepalive=30) c.loop_start() except Exception as e: print(f"[MQTT] ❌ {name} @ {host}:{port} → {e}") self.ok[name] = False 
return c def _on_connect(self, name: str, rc: int): with self._lock: if rc == 0: self.ok[name] = True print(f"[MQTT] ✅ {name} connecté") else: self.ok[name] = False print(f"[MQTT] ❌ {name} rc={rc}") def _on_disconnect(self, name: str): with self._lock: self.ok[name] = False print(f"[MQTT] ⚠️ {name} déconnecté") def _setup(self): # Utiliser les variables d'environnement pour les brokers brokers = [ ("EMQX", EMQX_HOST, EMQX_PORT, False, "", ""), ("Mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "bunker", "bunker"), ("BunkerM", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker"), # Port 1900 = MQTT simple, pas TLS ] print("[MQTT] 🔌 Connexion aux brokers...") for name, host, port, tls, user, pwd in brokers: c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd) self.clients[name] = c self.ok[name] = False time.sleep(3) # Attend les connexions def publish(self, topic: str, payload: str) -> dict[str, bool]: results = {} with self._lock: for name, client in self.clients.items(): if self.ok.get(name, False): try: r = client.publish(topic, payload, qos=1) results[name] = (r.rc == mqtt.MQTT_ERR_SUCCESS) except Exception: results[name] = False else: results[name] = False return results def stop(self): for name, c in self.clients.items(): try: c.loop_stop() c.disconnect() except Exception: pass # ============================================================================= # URLs de base (résolues au démarrage) # ============================================================================= ORION_HOST = "localhost" ORION_PORT = "2026" ORION_URL = f"http://{ORION_HOST}:{ORION_PORT}" STELLIO_URL = os.environ.get("STELLIO_URL", "http://localhost:8080") # Stellio API Gateway (à exposer) # Configuration OpenRemote (URLs dynamiques) OR_URL = os.environ.get("OR_URL", "http://localhost:8080") # OpenRemote Manager (Traefik) OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", 
"http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token") OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default") def publish_stellio(sid: str, sensor: dict) -> bool: """Publie sur Stellio via Traefik (gère le 409).""" entity = _ngsi_payload(sid, sensor, context=STELLIO_INLINE_CONTEXT) # Stellio a besoin du @context pour résoudre les vocabulaires NGSI-LD # (uri.etsi.org résolu depuis le JAR embarqué) url = f"{STELLIO_URL}/ngsi-ld/v1/entities" headers = { "Content-Type": "application/ld+json", "Accept": "application/ld+json", "NGSILD-Tenant": STELLIO_TENANT, } try: body = json.dumps(entity).encode() req = urllib.request.Request(url, data=body, headers=headers, method="POST") with urllib.request.urlopen(req, timeout=8) as resp: print(f" 🏢 Stellio: ✅ (HTTP {resp.status})") return True except urllib.error.HTTPError as e: if e.code == 409: # Already exists, do update with PUT try: entity_id = urllib.parse.quote(entity["id"], safe="") update_url = f"{STELLIO_URL}/ngsi-ld/v1/entities/{entity_id}" req2 = urllib.request.Request(update_url, data=body, headers=headers, method="PUT") with urllib.request.urlopen(req2, timeout=8) as resp2: print(f" 🏢 Stellio: ✅ (HTTP {resp2.status} updated)") return True except Exception as e2: print(f" ⚠️ Stellio update failed: {e2}") return False try: err = e.read().decode()[:300] except Exception: err = str(e) print(f" ⚠️ Stellio → {e.code}: {err}") return False except Exception as e: print(f" ⚠️ Stellio → {e}") return False def publish_orion(sid: str, sensor: dict) -> bool: """Publie sur Orion-LD (POST create, PATCH update).""" import socket entity = _ngsi_payload(sid, sensor) if not hasattr(publish_orion, "orion_ip"): try: publish_orion.orion_ip = socket.gethostbyname("fiware-gis-quickstart-orion-1") except Exception: publish_orion.orion_ip = "192.168.192.20" base = 
f"http://{publish_orion.orion_ip}:1026/ngsi-ld/v1" # 1. Essayer de créer (POST) try: body = json.dumps(entity).encode() req = urllib.request.Request(f"{base}/entities", data=body, headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="POST") with urllib.request.urlopen(req, timeout=8) as resp: print(f" 🌐 Orion-LD: ✅ (HTTP {resp.status} created)") return True except urllib.error.HTTPError as e: if e.code != 409: print(f" ⚠️ Orion-LD → {e.code}: {e.read().decode()[:200]}") return False # 2. Déjà existant (409) → PATCH sur les attributs try: eid = urllib.parse.quote(entity['id'], safe='') patch_url = f"{base}/entities/{eid}/attrs" req2 = urllib.request.Request(patch_url, data=body, headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="PATCH") with urllib.request.urlopen(req2, timeout=8) as resp2: print(f" 🌐 Orion-LD: ✅ (HTTP {resp2.status} updated)") return True except Exception as e2: print(f" ⚠️ Orion-LD → update failed: {e2}") return False except Exception as e: print(f" ⚠️ Orion-LD → {e}") return False def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool: """Publie sur BunkerM via HTTP API (port 2000) avec session.""" import base64, http.cookiejar, urllib.request, json from datetime import datetime, timezone host = "bunkerm_bunkerm_1:2000" login_url = f"http://{host}/login" data_url = f"http://{host}/api/sensors/data" # 1. Cookie jar pour maintenir la session cj = http.cookiejar.CookieJar() opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj)) # 2. Authentification (Basic) pour obtenir le cookie de session creds = base64.b64encode(b"bunker:bunker").decode() auth_header = {"Authorization": f"Basic {creds}"} try: # GET sur /login pour initialiser la session req_login = urllib.request.Request(login_url, headers=auth_header) opener.open(req_login, timeout=5) except Exception as e: print(f" ⚠️ BunkerM login → {e}") return False # 3. 
Préparer le payload payload = { "sensor_id": sid, "sensor_type": sensor["type"], "timestamp": datetime.now(timezone.utc).isoformat(), } for k, v in values.items(): if k not in payload: payload[k] = v # 4. POST avec le cookie de session data = json.dumps(payload).encode() req = urllib.request.Request( data_url, data=data, headers={**auth_header, "Content-Type": "application/json"}, method="POST" ) try: with opener.open(req, timeout=5) as resp: print(f" ✅ BunkerM: HTTP {resp.status}") return resp.status in (200, 201, 204) except Exception as e: print(f" ⚠️ BunkerM POST → {e}") return False def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool: """Crée le Thing (1 par capteur) + Datastreams, puis POST l'Observation.""" # Cache : {sid: (thing_id, {field: ds_id})} if sid in _frost_cache: thing_id, ds_map = _frost_cache[sid] if field in ds_map: ds_id = ds_map[field] obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations" obs = { "resultTime": datetime.now(timezone.utc).isoformat(), "result": value, "FeatureOfInterest": { "name": f"Location {sid}", "description": f"Feature of interest for sensor {sid}", "encodingType": "application/vnd.geo+json", "feature": { "type": "Point", "coordinates": [sensor.get("lon", -61.0), sensor.get("lat", 14.6)] } } } if _http_post(obs_url, obs, FROST_HEADERS): print(f" ✅ FROST Observation {sid}/{field} → OK (cached)") return True else: print(f" ⚠️ FROST Observation {sid}/{field} → échec") return False # Premier appel pour ce capteur : créer Thing + tous les Datastreams thing_payload, datastreams = _frost_payload(sid, sensor) print(f" 📊 FROST: POST Thing {sid}...") tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS) if not tid: print(f" ⚠️ FROST Thing {sid} → échec création") return False print(f" ✅ FROST Thing {sid} créé (ID: {tid})") # Créer les Datastreams ds_map = {} for f, ds, _ in datastreams: ds["Thing"] = {"@iot.id": tid} print(f" 📊 FROST: POST Datastream {sid}/{f}...") ds_id = 
_http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS) if ds_id: print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})") ds_map[f] = ds_id else: print(f" ⚠️ FROST Datastream {sid}/{f} → échec") _frost_cache[sid] = (tid, ds_map) # Poster l'observation pour le field actuel if field in ds_map: ds_id = ds_map[field] obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations" obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value} if _http_post(obs_url, obs, FROST_HEADERS): print(f" ✅ FROST Observation {sid}/{field} → OK") return True return False # ============================================================================= # OpenRemote REST # ============================================================================= _or_token_cache = {"token": "", "expires": 0} def _get_or_token() -> str: """Obtain an OpenRemote token via password grant (admin user).""" import time, urllib.parse if _or_token_cache["token"] and _or_token_cache["expires"] > time.time() + 60: return _or_token_cache["token"] try: # Use password grant with admin user (full rights) token_url = f"http://openremote-keycloak-1:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token" client_id = os.environ.get("OR_CLIENT_ID", "openremote") client_secret = os.environ.get("OR_CLIENT_SECRET", "QVTnyObwXdpQ0Vuc60kFSonidK49FiXb") data = urllib.parse.urlencode({ "grant_type": "password", "username": os.environ.get("OR_ADMIN_USER", "admin"), "password": os.environ.get("OR_ADMIN_PASS", "Digitribe972"), "client_id": client_id, "client_secret": client_secret }).encode() req = urllib.request.Request( token_url, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"} ) with urllib.request.urlopen(req, timeout=5) as r: token_data = json.loads(r.read().decode()) _or_token_cache["token"] = token_data["access_token"] _or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300) - 60 return _or_token_cache["token"] except Exception as e: print(f" ⚠️ OpenRemote 
token → {e}") return "" def _or_put(asset_id: str, payload: dict) -> bool: """PUT update sur un asset OpenRemote (avec version).""" token = _get_or_token() if not token: return False try: body = json.dumps(payload).encode() url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}" req = urllib.request.Request(url, data=body, headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json", "If-Match": str(payload.get("version", 1)), }, method="PUT") with urllib.request.urlopen(req, timeout=5) as resp: return resp.status in (200, 204) except urllib.error.HTTPError as e: print(f" ⚠️ OR PUT {asset_id} → HTTP {e.code}") return False except Exception as e: print(f" ⚠️ OR PUT {asset_id} → {e}") return False def publish_openremote(sid: str, sensor: dict, values: dict) -> bool: """Met à jour les attributs d'un asset OpenRemote via REST.""" # Mapping sid → asset ID (créés manuellement dans OR) ASSET_MAP = { "traffic_000": "7b5c5670d1b84865ba3ac7", # Traffic Fort-de-France Centre "traffic_001": "557f6e993b994d3cb81017", # Traffic Fort-de-France North "traffic_002": "cb81dfd2d2dc4d25adc9c2", # Traffic Fort-de-France South "airquality_000": "a51c982a2d3e451898b978", # Air Quality Fort-de-France "parking_000": "b8f0df19e0af47b386ebb9", # Parking Fort-de-France Centre "noise_000": "9035103d1866454fb7e451", # Noise Fort-de-France Centre "weather_000": "b9de80905ac640f488ab27", # Weather Lamentin Airport "light_000": "ee7823a41e594851ba202f", # Light Fort-de-France } asset_id = ASSET_MAP.get(sid) if not asset_id: return False # Construire les attributs à jour now = datetime.now(timezone.utc).isoformat() attrs = { "timestamp": {"type": "DateTime", "value": now, "timestamp": now}, "battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": now}, } # Mapper les valeurs du payload vers les attributs OR field_map = { "temperature_celsius": "temperature", "humidity_percent": "humidity", "noise_level_db": "noiseLevel", "pm25_ugm3": "airQuality", 
"vehicle_count": "trafficFlow", "available_spots": "parking", "brightness_lux": "light", "flood": "flood", } for field, val in values.items(): attr_name = field_map.get(field, field) attrs[attr_name] = { "type": "Number", "value": val, "timestamp": now, "unit": "µg/m³" if "ugm3" in field else ("°C" if "celsius" in field else ("%" if "percent" in field or "humidity" in field else ("dB" if "db" in field else ""))), } payload = { "id": asset_id, "name": sensor["name"], "type": "IOTSensor", "realm": OR_REALM, "attributes": attrs, } return _or_put(asset_id, payload) def publish_influx(sid: str, sensor: dict, values: dict) -> bool: """Write sensor data to InfluxDB (async, non-blocking).""" if not _influx_write_api: return False def _write_async(): try: stype = sensor["type"] lat = sensor.get("lat", BASE_LAT) lon = sensor.get("lon", BASE_LON) points = [] for field, value in values.items(): if isinstance(value, (int, float)): p = influxdb_client.Point(stype)\ .tag("sensor_id", sid)\ .tag("location", sensor.get("name", sid))\ .field(field, float(value))\ .field("lat", float(lat))\ .field("lon", float(lon)) points.append(p) if points: _influx_write_api.write(bucket=INFLUX_BUCKET, record=points) print(f" 📈 InfluxDB: {len(points)} points written") except Exception as e: print(f" ⚠️ InfluxDB → {e}") # Exécution asynchrone (non-bloquante) t = threading.Thread(target=_write_async, daemon=True) t.start() return True def main(): print("╔══════════════════════════════════════════════════╗") print("║ Smart City Simulator — Martinique ║") print("╚══════════════════════════════════════════════════╝") print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s") print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}") mqtt_client = MultiMQTT() running = True def signal_handler(*_): nonlocal running running = False print("\n[SIM] 🛑 Arrêt...") signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) iteration = 0 
while running: iteration += 1 ts = datetime.now().strftime("%H:%M:%S") print(f"\n[SIM] ⏱️ It #{iteration} — {ts}") for sid, sensor in SENSORS.items(): stype = sensor["type"] topic = f"city/sensors/{stype}/{sid}" # --- Payload MQTT --- ranges = SENSOR_RANGES.get(stype, {}) payload_mqtt = { "id": sid, "type": stype, "name": sensor["name"], "lat": sensor["lat"], "lon": sensor["lon"], "timestamp": datetime.now(timezone.utc).isoformat(), "battery_level": random.randint(60, 100), } for field, val_range in ranges.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)): payload_mqtt[field] = round(random.uniform(lo, hi), 1) elif isinstance(val_range, list): payload_mqtt[field] = random.choice(val_range) msg = json.dumps(payload_mqtt, ensure_ascii=False) # --- MQTT publish --- results = mqtt_client.publish(topic, msg) ok_mqtt = [n for n, r in results.items() if r] if ok_mqtt: print(f" 📤 {topic} → {','.join(ok_mqtt)}") # Extraire les valeurs pour OpenRemote or_values = {} for field, val_range in ranges.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)): or_values[field] = round(random.uniform(lo, hi), 1) # --- OpenRemote REST --- if ENABLE_OPENREMOTE: ok_or = publish_openremote(sid, sensor, or_values) print(f" 🏠 OpenRemote: {'✅' if ok_or else '⚠️ skipped'}") # --- Orion-LD --- if ENABLE_ORION: ok_or = publish_orion(sid, sensor) print(f" 🌐 Orion-LD: {'✅' if ok_or else '⚠️ skipped'}") # --- Stellio --- if ENABLE_STELLIO: ok_st = publish_stellio(sid, sensor) print(f" 🏢 Stellio: {'✅' if ok_st else '❌'}") # --- FROST --- if ENABLE_FROST: ranges2 = SENSOR_RANGES.get(stype, {}) for field, val_range in ranges2.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)): val = round(random.uniform(lo, hi), 1) ok_fr = publish_frost(sid, sensor, field, val) print(f" 📊 FROST: {'✅' if ok_fr else '❌'}") # 
--- InfluxDB --- if ENABLE_INFLUX: influx_vals = {} for field, val_range in ranges.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)): influx_vals[field] = round(random.uniform(lo, hi), 1) ok_influx = publish_influx(sid, sensor, influx_vals) print(f" 📈 InfluxDB: {'✅' if ok_influx else '❌'}") # --- BunkerM HTTP --- if os.getenv("BUNKERM_HTTP", "0") == "1": ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt) print(f" 📦 BunkerM: {'✅' if ok_bunkerm else '❌'}") print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}") try: time.sleep(INTERVAL) except KeyboardInterrupt: break mqtt_client.stop() print("[SIM] ✅ Arrêté proprement.") if __name__ == "__main__": main()