#!/usr/bin/env python3 """ Smart City IoT Simulator — Martinique (14.6°N, 61.2°W) ======================================================= Publie vers MULTIPLES brokers MQTT + context brokers NGSI-LD. Brokers MQTT: - EMQX: emqx_emqx_1:1883 (sans auth) - Mosquitto: mainfluxlabs-mosquitto:1883 (bunker/bunker) - BunkerM: bunkerm_bunkerm_1:1900 (TLS, bunker/bunker) - OpenRemote: openremote-manager-1:1883 (admin/Digitribe972) Context Brokers REST: # - Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD) # - Stellio: stellio-api-gateway:8080 (NGSI-LD) - FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings) Streaming Platforms: - Pulsar: smart-city-pulsar:8080 (HTTP REST Producer API) - Redpanda: smart-city-redpanda:8082 (Kafka REST Proxy) Time-Series DB: - InfluxDB v2: smart-city-influxdb:8086 (via influxdb-client) Variables d'environnement: PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s) BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France) # ENABLE_ORION=1 : activer Orion-LD (défaut: 1) # ENABLE_STELLIO=1 : activer Stellio (défaut: 1) ENABLE_FROST=1 : activer FROST-Server (défaut: 1) ENABLE_INFLUX=1 : activer InfluxDB v2 (défaut: 1) ENABLE_PULSAR=1 : activer Apache Pulsar (défaut: 1) ENABLE_REDPANDA=1 : activer Redpanda Kafka (défaut: 1) """ import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse import paho.mqtt.client as mqtt import urllib.request, urllib.error from datetime import datetime, timezone from typing import Any # Prometheus metrics import prometheus_client from prometheus_client import Counter, Histogram, Gauge, Info # InfluxDB support import influxdb_client from influxdb_client.client.write_api import SYNCHRONOUS # ============================================================================= # Configuration des brokers MQTT # Configuration des brokers MQTT # Utilise les noms de services Docker par défaut EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1") EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883")) MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "smart-city-mosquitto") MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883")) BUNKERM_HOST = os.environ.get("BUNKERM_HOST", "bunkerm-bunkerm-1") BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900")) # ============================================================================= # Configuration # ============================================================================= BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091")) BASE_LON = float(os.environ.get("BASE_LON", "-61.2155")) INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "1")) # 1s pour temps réel #ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1" #ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1").lower() in ("1", "true", "yes", "on") ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1" ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1" OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin") OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972") OR_REALM = os.environ.get("OR_REALM", "master") OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token #ENABLE_IOT_AGENT = os.environ.get("ENABLE_IOT_AGENT", "1") == "1" FROST_URL = os.environ.get("FROST_URL", "http://localhost:8090/FROST-Server/v1.1") # Exposer frost_http-web-1:8080 -> host:8086 # Pulsar config (HTTP REST — pulsar-admin + producer REST API) ENABLE_PULSAR = os.environ.get("ENABLE_PULSAR", "1").lower() in ("1", "true", "yes", "on") PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar") PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "8080")) PULSAR_BASE = f"http://{PULSAR_HOST}:{PULSAR_PORT}" # Redpanda / Kafka config (REST Proxy HTTP) ENABLE_REDPANDA = os.environ.get("ENABLE_REDPANDA", "1").lower() in ("1", "true", "yes", "on") REDPANDA_HOST = os.environ.get("REDPANDA_HOST", "smart-city-redpanda") REDPANDA_PORT = int(os.environ.get("REDPANDA_PORT", "8082")) REDPANDA_BASE = f"http://{REDPANDA_HOST}:{REDPANDA_PORT}" # InfluxDB config ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1").lower() in ("1", "true", "yes", "on") INFLUX_URL = os.environ.get("INFLUX_URL", "http://smart-city-influxdb:8086") # InfluxDB v2 sur smartcity-shared INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe") INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "smartcity") # Correspond au bucket de Telegraf INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-token") # Prometheus metrics HTTP server METRICS_PORT = int(os.environ.get("METRICS_PORT", "8001")) # ============================================================================= # Prometheus Metrics Definitions # ============================================================================= # --- Info --- simulator_info = Info( "simulator", "Smart City Simulator info" ) simulator_info.info({ "version": "1.0.0", "python_version": sys.version.split()[0], "mqtt_brokers": "emqx,mosquitto,bunkerm", # "context_brokers": "orion_ld,stellio,frost", }) # --- Counters --- messages_published_total = Counter( "simulator_messages_published_total", "Total messages published by broker", ["broker", "sensor_type"] ) messages_errors_total = Counter( "simulator_messages_errors_total", "Total publish errors", ["broker", "sensor_type", "error_type"] ) mqtt_connection_total = Counter( "simulator_mqtt_connection_total", "MQTT connection attempts", ["broker", "status"] # status: success, failure ) http_requests_total = Counter( "simulator_http_requests_total", "HTTP requests to REST APIs", ["broker", "method", "status_code"] ) influx_write_total = Counter( "simulator_influx_write_total", "InfluxDB write operations", ["status"] # success, error ) # --- Gauges --- mqtt_broker_connected = Gauge( "simulator_mqtt_broker_connected", "MQTT broker connection status (1=connected, 0=disconnected)", ["broker"] ) sensors_total = Gauge( "simulator_sensors_total", "Total number of sensors by type", ["sensor_type"] ) up = Gauge( "simulator_up", "Simulator is running (1=yes, 0=no)" ) # --- Histograms --- publish_duration = Histogram( "simulator_publish_duration_seconds", "Time spent publishing a message", ["broker"], buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5) ) http_request_duration = Histogram( "simulator_http_request_duration_seconds", "HTTP request latency to REST APIs", ["broker", "method"], buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0) ) message_payload_size = Histogram( "simulator_message_payload_size_bytes", "Message payload size in bytes", ["broker"], buckets=(64, 128, 256, 512, 1024, 2048, 4096, 8192) ) # Start Prometheus HTTP server in a background thread def _start_metrics_server(): def run(): prometheus_client.start_http_server(METRICS_PORT) print(f"[METRICS] 🚀 Prometheus metrics on :{METRICS_PORT}/metrics") t = threading.Thread(target=run, daemon=True) t.start() return t # Initialize InfluxDB client _influx_client = None _influx_write_api = None if ENABLE_INFLUX: try: _influx_client = influxdb_client.InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG) _influx_write_api = _influx_client.write_api(write_options=influxdb_client.client.write_api.ASYNCHRONOUS) print(f"[INFLUX] ✅ Connected to {INFLUX_URL} (async mode)") except Exception as e: print(f"[INFLUX] ❌ Connection failed: {e}") SENSOR_COUNTS = { "traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")), "airquality": int(os.environ.get("SENSOR_COUNT_airquality", "10")), "parking": int(os.environ.get("SENSOR_COUNT_parking", "10")), "noise": int(os.environ.get("SENSOR_COUNT_noise", "1")), "weather": int(os.environ.get("SENSOR_COUNT_weather", "1")), "light": int(os.environ.get("SENSOR_COUNT_light", "1")), } # Si SENSOR_COUNT est défini, multiplier les counts de façon proportionnelle _total_default = sum(SENSOR_COUNTS.values()) if "SENSOR_COUNT" in os.environ: target = int(os.environ["SENSOR_COUNT"]) ratio = target / _total_default for k in SENSOR_COUNTS: SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio)) # ============================================================================= # Localisation des capteurs Martinique — Coordonnées FIXES sur terre ferme # IMPORTANT : ±0.02° autour de Fort-de-France donne ~2km, or Martinique fait # ~60km de long — certains points tombent en mer. Solution : coordonnées fixes. # ============================================================================= # Coordonnées réelles Martinique (terre ferme uniquement) # Martinique : 14.4°N–14.9°N, -61.23°W–-60.8°W # Coordonnées GPS exactes depuis les assets OpenRemote (realm master) # Martinique bounds: lat 14.37–14.88°N, lon 61.0–61.25°W FIXED_LOCATIONS: dict[str, dict[str, tuple[float, float]]] = { "traffic": { "Fort-de-France Centre": (14.6164, -61.07), "Le Lamentin Aéroport": (14.6167, -61.0035), "Le Robert D110": (14.6833, -60.9333), "Sainte-Anne Plage": (14.4333, -60.9833), "Saint-Joseph D1": (14.7, -61.05), "Trinité Centre": (14.7167, -60.9167), "Le François D2": (14.6833, -60.8333), "Ducos Penitencier": (14.5833, -61.0667), "Schœlcher Morne": (14.65, -61.1), "Case-Pilote Bourg": (14.5167, -61.1167), }, "airquality": { "Fort-de-France Lamartine": (14.613, -61.0667), "Le Lamentin Zac": (14.62, -61.0), "Le Robert Bourg": (14.68, -60.93), "Sainte-Anne Village": (14.43, -60.98), "Saint-Joseph Morne": (14.705, -61.04), "Trinité Eglise": (14.72, -60.91), "Le François Bourg": (14.68, -60.83), "Ducos Centre": (14.58, -61.06), "Schœlcher Plage": (14.655, -61.11), "Case-Pilote D1": (14.52, -61.12), }, "parking": { "Fort-de-France Place Clémenceau": (14.615, -61.068), "Le Lamentin Centre Commercial": (14.618, -61.002), "Le Robert Stade": (14.685, -60.935), "Sainte-Anne Mairie": (14.432, -60.985), "Saint-Joseph Ecole": (14.702, -61.045), "Trinité Port": (14.715, -60.92), "Le François Mairie": (14.682, -60.835), "Ducos ZI": (14.585, -61.055), "Schœlcher Bourg": (14.652, -61.105), "Case-Pilote Stade": (14.518, -61.118), }, "noise": { "Fort-de-France Théâtre": (14.617, -61.069), "Le Lamentin Zone Industrielle": (14.619, -61.001), "Le Robert Bourg": (14.681, -60.932), "Sainte-Anne Plage": (14.434, -60.982), "Saint-Joseph Morne": (14.703, -61.042), "Trinité Centre": (14.717, -60.918), "Le François Bourg": (14.681, -60.832), "Ducos Penitencier": (14.584, -61.058), "Schœlcher Morne": (14.651, -61.102), "Case-Pilote Village": (14.519, -61.115), }, "weather": { "Fort-de-France Meteo": (14.616, -61.067), "Le Lamentin Aéroport": (14.617, -61.004), "Le Robert Bourg": (14.682, -60.934), "Sainte-Anne Village": (14.431, -60.981), "Saint-Joseph Morne": (14.704, -61.043), "Trinité Eglise": (14.718, -60.912), "Le François Bourg": (14.683, -60.834), "Ducos Centre": (14.586, -61.057), "Schœlcher Plage": (14.654, -61.108), "Case-Pilote D1": (14.521, -61.113), }, "light": { "Fort-de-France Place": (14.6155, -61.0685), "Le Lamentin Rond-point": (14.6185, -61.0025), "Le Robert D110": (14.6835, -60.9335), "Sainte-Anne Plage": (14.4335, -60.9835), "Saint-Joseph D1": (14.7005, -61.0505), "Trinité Centre": (14.7165, -60.9165), "Le François D2": (14.6835, -60.8335), "Ducos Penitencier": (14.5835, -61.0665), "Schœlcher Morne": (14.6505, -61.1005), "Case-Pilote Bourg": (14.5165, -61.1165), }, } # Ranges par type SENSOR_RANGES: dict[str, dict] = { "traffic": {"vehicle_count":(10,150),"average_speed_kmh":(10,80), "congestion_level":(0,5),"occupancy_percent":(0,100)}, "airquality": {"pm25_ugm3":(5,80),"pm10_ugm3":(10,150),"no2_ugm3":(5,60), "o3_ugm3":(20,120),"co_mgm3":(0.1,5.0), "temperature_celsius":(20,35),"humidity_percent":(40,95)}, "parking": {"total_spots":(50,500),"available_spots":(0,500), "occupancy_percent":(0,100),"turnover_per_hour":(5,50)}, "noise": {"noise_level_db":(40,95),"peak_db":(60,110)}, "weather": {"temperature_celsius":(22,34),"humidity_percent":(50,95), "wind_speed_kmh":(0,50),"pressure_hpa":(1005,1025), "rain_mm":(0,20),"uv_index":(0,11)}, "light": {"brightness_lux":(0,100000),"power_consumption_w":(0,500)}, } NOISE_CATEGORIES = ["quiet","moderate","loud","very_loud"] LIGHT_STATUSES = ["on","off","dimmed","auto"] # ============================================================================= # Capteurs déclarés # ============================================================================= SENSORS: dict[str, dict] = {} counter = 0 for stype, locs in FIXED_LOCATIONS.items(): for name, coords in locs.items(): sid = f"{stype}_{counter:03d}" SENSORS[sid] = {"type": stype, "lat": coords[0], "lon": coords[1], "name": name} counter += 1 # ============================================================================= ## Payload NGSI-LD pour Orion-LD / Stellio # ============================================================================= # Contextes NGSI-LD : core + Smart Data Models # https://smartdatamodels.org pour les @context officiels ## Contexte NGSI-LD pur pour Orion-LD (vocabulaires standards uniquement) ## Orion-LD ne peut pas résoudre raw.githubusercontent.com — utiliser uri.etsi.org uniquement #ORION_CONTEXT = [ # "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", #] # Mapping sensor type → Smart Data Model type NGSI-LD SMART_MODEL_MAPPING = { "airquality": "AirQualityObserved", "traffic": "TrafficFlowObserved", "parking": "OffStreetParking", "noise": "NoiseLevelObserved", "weather": "WeatherObserved", "light": "Device", } FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"} # Cache FROST : éviter de recréer Thing/Datastream _frost_cache: dict[str, tuple[str, str]] = {} # (sid, field) -> (thing_id, ds_id) ## Contexte NGSI-LD pur pour Stellio et Orion-LD (vocabulaires standards uniquement) ## Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/ # On n'utilise PAS les vocabulaires smartdatamodels.org distants (inaccessibles depuis les containers) # Les types d'entité Smart Data Models (AirQualityObserved, etc.) sont reconnus par leur nom # Les propriétés spécifiques sont stockées telles quelles (vocabulaire libre) #STELLIO_INLINE_CONTEXT = [ # "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", #] def _frost_payload(sid: str, sensor: dict, source: str = "simulator", topic: str = "") -> dict: """Construit un payload SensorThings pour FROST-Server.""" stype = sensor["type"] ranges = SENSOR_RANGES.get(stype, {}) datastreams = [] for field, val_range in ranges.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)) and isinstance(hi, (int, float)): val = round(random.uniform(lo, hi), 1) unit = "http://www.qudt.org/vocab/unit#DegreeCelsius" obs_prop = { "name": f"{field} Observation", "description": f"Observation of {field}", "definition": unit, } sensor_data = { "name": f"Sensor {sid} {field}", "description": f"Sensor {sid} measuring {field}", "encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0", "metadata": {"unit": unit}, } ds = { "name": f"Datastream {stype}/{field}", "description": f"Datastream for {stype} sensor {sid} - {field}", "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement", "unitOfMeasurement": {"name": field, "symbol": "", "definition": unit}, "Sensor": sensor_data, "ObservedProperty": obs_prop, } datastreams.append((field, ds, val)) thing_payload = { "name": f"Thing_{sid}", "description": f"Smart City {stype} sensor in Martinique", "properties": { "sensorType": stype, "region": "Martinique", "source": source, # Traçabilité "mqttTopic": topic # Traçabilité }, } return thing_payload, datastreams # ============================================================================= # HTTP helper # ============================================================================= def _http_post(url: str, data: dict, headers: dict, broker: str = "unknown") -> str: """POST et retourne 'ok' ou 'created' (ou '' si échec).""" try: body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, headers=headers, method="POST") with http_request_duration.labels(broker=broker, method="POST").time(): try: with urllib.request.urlopen(req, timeout=8) as resp: http_requests_total.labels(broker=broker, method="POST", status_code=str(resp.status)).inc() if resp.status == 204: return 'created' # No Content — succès if resp.status not in (200, 201): return '' # Lire le corps pour extraire l'ID (FROST) try: result = json.loads(resp.read()) if '@iot.selfLink' in result: link = result['@iot.selfLink'] return link.split('(')[1].rstrip(')') if '@iot.id' in result: return str(result['@iot.id']) except Exception: pass location = resp.headers.get('Location', '') if location: return location.split('(')[1].rstrip(')') if '(' in location else '' return 'created' except Exception: pass except urllib.error.HTTPError as e: # Lire le corps de l'erreur pour debug try: err_body = e.read().decode()[:200] except Exception: err_body = str(e) print(f" ⚠️ HTTP POST {url} → {e.code}: {err_body}") http_requests_total.labels(broker=broker, method="POST", status_code=str(e.code)).inc() messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc() return '' except Exception as e: http_requests_total.labels(broker=broker, method="POST", status_code="exception").inc() messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc() print(f" ⚠️ HTTP POST {url} → {e}") return '' def _http_put(url: str, data: dict, headers: dict, broker: str = "unknown") -> bool: try: body = json.dumps(data).encode() req = urllib.request.Request(url, data=body, headers=headers, method="PUT") with http_request_duration.labels(broker=broker, method="PUT").time(): with urllib.request.urlopen(req, timeout=5) as resp: http_requests_total.labels(broker=broker, method="PUT", status_code=str(resp.status)).inc() return resp.status in (200, 204) except urllib.error.HTTPError as e: http_requests_total.labels(broker=broker, method="PUT", status_code=str(e.code)).inc() if e.code == 409: return True # Already exists - that's fine messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc() print(f" ⚠️ HTTP PUT {url} → {e}") return False except Exception as e: http_requests_total.labels(broker=broker, method="PUT", status_code="exception").inc() messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc() print(f" ⚠️ HTTP PUT {url} → {e}") return False # ============================================================================= # MQTT Client multi-broker # ============================================================================= class MultiMQTT: def __init__(self): self.clients: dict[str, mqtt.Client] = {} self.ok: dict[str, bool] = {} self._lock = threading.Lock() self._setup() def _mk_client(self, name: str, host: str, port: int, tls: bool = False, user: str = "", pwd: str = "", ws: bool = False, use_v5: bool = False) -> mqtt.Client: cid = f"smartcity-sim-{name}-{os.getpid()}" protocol = mqtt.MQTTv5 if use_v5 else mqtt.MQTTv311 # Use Callback API v1 for compatibility with existing lambda callbacks c = mqtt.Client(client_id=cid, protocol=protocol, callback_api_version=mqtt.CallbackAPIVersion.VERSION1) if user: c.username_pw_set(user, pwd) if tls: c.tls_set(cert_reqs=ssl.CERT_NONE) c.tls_insecure_set(True) if ws: c.ws_set(b"/mqtt") c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc) c.on_disconnect = lambda _c, _, __: self._on_disconnect(name) try: c.connect(host, port, keepalive=120) c.loop_start() except Exception as e: print(f"[MQTT] ❌ {name} @ {host}:{port} → {e}") self.ok[name] = False return c def _on_connect(self, name: str, rc: int): with self._lock: if rc == 0: self.ok[name] = True mqtt_broker_connected.labels(broker=name).set(1) mqtt_connection_total.labels(broker=name, status="success").inc() print(f"[MQTT] ✅ {name} connecté") else: self.ok[name] = False mqtt_broker_connected.labels(broker=name).set(0) mqtt_connection_total.labels(broker=name, status="failure").inc() print(f"[MQTT] ❌ {name} rc={rc}") def _on_disconnect(self, name: str): with self._lock: self.ok[name] = False mqtt_broker_connected.labels(broker=name).set(0) print(f"[MQTT] ⚠️ {name} déconnecté") def _setup(self): # Utiliser les variables d'environnement pour les brokers mqtt_version = os.environ.get("MQTT_PROTOCOL_VERSION", "311") use_v5 = (mqtt_version == "5") print(f"[MQTT] Version du protocole: {'v5.0' if use_v5 else 'v3.1.1'} (MQTT_PROTOCOL_VERSION={mqtt_version})") # Check which brokers are enabled enable_emqx = os.environ.get("ENABLE_EMQX", "1") == "1" enable_mosquitto = os.environ.get("ENABLE_MOSQUITTO", "1") == "1" enable_bunkerm = os.environ.get("ENABLE_BUNKER", "1") == "1" brokers = [] if enable_emqx: brokers.append(("emqx", EMQX_HOST, EMQX_PORT, False, "", "", use_v5)) if enable_mosquitto: brokers.append(("mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "", "", use_v5)) if enable_bunkerm: brokers.append(("bunkerm", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker", use_v5)) # OpenRemote MQTT broker (pour agents MQTT créés dans OpenRemote) # Le broker Artemis d'OR nécessite MQTTv5 pour l'auth anonyme if ENABLE_OPENREMOTE: brokers.append(("openremote", "openremote_manager_1", 1883, False, "", "", True)) print(f"[MQTT] 🔌 Connexion aux brokers (EMQX={enable_emqx}, Mosquitto={enable_mosquitto}, BunkerM={enable_bunkerm})...") print("[MQTT] 🔌 Connexion aux brokers...") for name, host, port, tls, user, pwd, use_v5 in brokers: c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd, use_v5=use_v5) self.clients[name] = c self.ok[name] = False # Attendre que tous les brokers soient connectés (Artemis peut être lent) for _ in range(20): time.sleep(1) if all(self.ok.values()): break print(f"[MQTT] État des connexions: {dict(self.ok)}") def publish(self, topic: str, payload: str, sensor_type: str = "unknown") -> dict[str, bool]: results = {} payload_bytes = len(payload.encode()) with self._lock: for name, client in self.clients.items(): if self.ok.get(name, False): with publish_duration.labels(broker=name).time(): try: r = client.publish(topic, payload, qos=1) success = (r.rc == mqtt.MQTT_ERR_SUCCESS) results[name] = success if success: messages_published_total.labels(broker=name, sensor_type=sensor_type).inc() message_payload_size.labels(broker=name).observe(payload_bytes) else: messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="mqtt_rc").inc() except Exception: results[name] = False messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="exception").inc() else: results[name] = False return results # def publish_iot_agent(self, sid: str, payload: dict, sensor_type: str = "unknown") -> bool: # """Publie sur le topic IoT-Agent (json/smartcity-api-key/{sid}/attrs) via les 3 brokers.""" topic = f"json/smartcity-api-key/{sid}/attrs" msg = json.dumps(payload, ensure_ascii=False) payload_bytes = len(msg.encode()) success = False # Publier sur les 3 brokers: emqx, mosquitto, bunkerm for broker_name in ['emqx', 'mosquitto', 'bunkerm']: client_ok = self.clients.get(broker_name) is not None and self.ok.get(broker_name, False) print(f"[MQTT-DEBUG] {broker_name}: client_exists={self.clients.get(broker_name) is not None}, ok={self.ok.get(broker_name, False)}") if client_ok: try: r = self.clients[broker_name].publish(topic, msg, qos=1) if r.rc == mqtt.MQTT_ERR_SUCCESS: success = True # messages_published_total.labels(broker='iot-agent', sensor_type=sensor_type).inc() # message_payload_size.labels(broker='iot-agent').observe(payload_bytes) except Exception: pass # IoT-Agent code removed return success def stop(self): for name, c in self.clients.items(): try: c.loop_stop() c.disconnect() except Exception: pass # ============================================================================= # URLs de base (résolues au démarrage) # ============================================================================= #ORION_HOST = "localhost" #ORION_PORT = "2026" #ORION_URL = f"http://{ORION_HOST}:{ORION_PORT}" #STELLIO_URL = os.environ.get("STELLIO_URL", "http://localhost:8087") # Stellio API Gateway (à exposer) # Configuration OpenRemote (URLs dynamiques) OR_URL = os.environ.get("OR_URL", "http://localhost:8080") # OpenRemote Manager (Traefik) OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", "http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token") OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour #STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default") def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool: """Publie sur BunkerM via HTTP API (port 2000) avec session.""" import base64, http.cookiejar, urllib.request, json from datetime import datetime, timezone host = "bunkerm_bunkerm_1:2000" login_url = f"http://{host}/login" data_url = f"http://{host}/api/sensors/data" # 1. Cookie jar pour maintenir la session cj = http.cookiejar.CookieJar() opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj)) # 2. Authentification (Basic) pour obtenir le cookie de session creds = base64.b64encode(b"bunker:bunker").decode() auth_header = {"Authorization": f"Basic {creds}"} try: # GET sur /login pour initialiser la session req_login = urllib.request.Request(login_url, headers=auth_header) opener.open(req_login, timeout=5) except Exception as e: print(f" ⚠️ BunkerM login → {e}") return False # 3. Préparer le payload payload = { "sensor_id": sid, "sensor_type": sensor["type"], "timestamp": datetime.now(timezone.utc).isoformat(), } for k, v in values.items(): if k not in payload: payload[k] = v # 4. POST avec le cookie de session data = json.dumps(payload).encode() req = urllib.request.Request( data_url, data=data, headers={**auth_header, "Content-Type": "application/json"}, method="POST" ) try: with http_request_duration.labels(broker="bunkerm", method="POST").time(): with opener.open(req, timeout=5) as resp: http_requests_total.labels(broker="bunkerm", method="POST", status_code=str(resp.status)).inc() messages_published_total.labels(broker="bunkerm", sensor_type=sensor["type"]).inc() print(f" ✅ BunkerM: HTTP {resp.status}") return resp.status in (200, 201, 204) except Exception as e: http_requests_total.labels(broker="bunkerm", method="POST", status_code="exception").inc() messages_errors_total.labels(broker="bunkerm", sensor_type=sensor["type"], error_type="exception").inc() print(f" ⚠️ BunkerM POST → {e}") return False def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool: """Crée le Thing (1 par capteur) + Datastreams, puis POST l'Observation.""" # Cache : {sid: (thing_id, {field: ds_id})} if sid in _frost_cache: thing_id, ds_map = _frost_cache[sid] if field in ds_map: ds_id = ds_map[field] obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations" obs = { "resultTime": datetime.now(timezone.utc).isoformat(), "result": value, "FeatureOfInterest": { "name": f"Location {sid}", "description": f"Feature of interest for sensor {sid}", "encodingType": "application/vnd.geo+json", "feature": { "type": "Point", "coordinates": [sensor.get("lon", -61.0), sensor.get("lat", 14.6)] } } } if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"): print(f" ✅ FROST Observation {sid}/{field} → OK (cached)") return True else: print(f" ⚠️ FROST Observation {sid}/{field} → échec") return False # Premier appel pour ce capteur : créer Thing + tous les Datastreams # Topic MQTT pour traçabilité stype = sensor["type"] topic = f"city/sensors/{stype}/{sid}" thing_payload, datastreams = _frost_payload(sid, sensor, source="simulator", topic=topic) print(f" 📊 FROST: POST Thing {sid}...") tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS, broker="frost") if not tid: print(f" ⚠️ FROST Thing {sid} → échec création") return False print(f" ✅ FROST Thing {sid} créé (ID: {tid})") # Créer les Datastreams ds_map = {} for f, ds, _ in datastreams: ds["Thing"] = {"@iot.id": tid} print(f" 📊 FROST: POST Datastream {sid}/{f}...") ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS, broker="frost") if ds_id: print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})") ds_map[f] = ds_id else: print(f" ⚠️ FROST Datastream {sid}/{f} → échec") _frost_cache[sid] = (tid, ds_map) # Poster l'observation pour le field actuel if field in ds_map: ds_id = ds_map[field] obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations" obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value} if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"): print(f" ✅ FROST Observation {sid}/{field} → OK") return True return False # ============================================================================= # OpenRemote REST # ============================================================================= _or_token_cache = {"token": "", "expires": 0} def _get_or_token() -> str: """Obtain an OpenRemote token via password grant (admin user).""" import time, urllib.parse if _or_token_cache["token"] and _or_token_cache["expires"] > time.time() + 60: return _or_token_cache["token"] try: # Use password grant with admin user (full rights) token_url = f"http://openremote-keycloak-1:8080/auth/realms/{OR_TOKEN_REALM}/protocol/openid-connect/token" client_id = os.environ.get("OR_CLIENT_ID", "openremote") client_secret = os.environ.get("OR_CLIENT_SECRET", "0oQjzTfiEELYmj5jFwT4iIuWUDtQDvVa") data = urllib.parse.urlencode({ "grant_type": "password", "username": os.environ.get("OR_ADMIN_USER", "admin"), "password": os.environ.get("OR_ADMIN_PASS", "Digitribe972"), "client_id": client_id, "client_secret": client_secret }).encode() req = urllib.request.Request( token_url, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"} ) with urllib.request.urlopen(req, timeout=5) as r: token_data = json.loads(r.read().decode()) _or_token_cache["token"] = token_data["access_token"] _or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300) - 60 return _or_token_cache["token"] except Exception as e: print(f" ⚠️ OpenRemote token → {e}") return "" def _or_put(asset_id: str, payload: dict) -> bool: """PUT update sur un asset OpenRemote (sans If-Match pour éviter 403).""" token = _get_or_token() if not token: return False try: body = json.dumps(payload).encode() url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}" req = urllib.request.Request(url, data=body, headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json", }, method="PUT") with http_request_duration.labels(broker="openremote", method="PUT").time(): with urllib.request.urlopen(req, timeout=5) as resp: http_requests_total.labels(broker="openremote", method="PUT", status_code=str(resp.status)).inc() messages_published_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown")).inc() return resp.status in (200, 204) except urllib.error.HTTPError as e: http_requests_total.labels(broker="openremote", method="PUT", status_code=str(e.code)).inc() messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="http_error").inc() print(f" ⚠️ OR PUT {asset_id} → HTTP {e.code}") return False except Exception as e: http_requests_total.labels(broker="openremote", method="PUT", status_code="exception").inc() messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="exception").inc() print(f" ⚠️ OR PUT {asset_id} → {e}") return False def publish_openremote(sid: str, sensor: dict, values: dict) -> bool: """Met à jour les attributs d'un asset OpenRemote via REST.""" # Mapping sid → asset ID (realm master) — assets avec agentLink + location # Note: le compteur SENSORS est global (traffic=0-9, airquality=10-19, parking=20-29, noise=30-39, weather=40-49, light=50-59) ASSET_MAP = { "traffic_000": "429858caca3341f56fbf65", "traffic_001": "301218322f5aaca9d6d168", "traffic_002": "bd35fe2a90133118b9b004", "traffic_003": "da59ec9301c4efd3fd55c4", "traffic_004": "834f4b7b9df848f5c5c2d8", "airquality_010": "0f922351a9894bc0144c94", "airquality_011": "4f83219bbee703b3e0a255", "airquality_012": "381cc31ab83dd66ed4be37", "airquality_013": "808b73c22ecd19589a33be", "airquality_014": "03c18679226329183b44b6", "parking_020": "0ee6689f5c0499643d48eb", "parking_021": "8fb6b2d0601d98b47a4172", "parking_022": "0c00bda9e5075d12d59694", "parking_023": "ae981dc9d155d1313b9acf", "parking_024": "96020cc5aef95c5fda7bb4", "noise_030": "0be31930e45d2eb5c12ccd", "noise_031": "1802e76e3432d5eda1deb7", "noise_032": "08edb6518750d50644afe3", "noise_033": "93d09bfac36d2ed95fc858", "noise_034": "7942726d84d2bd29de1e5d", "weather_040": "9942f881ab6df375d8d9fa", "weather_041": "5400fdf5c51a4fe4f5a89c", "weather_042": "1a3bf32aa5208892e68965", "weather_043": "d3725f922f96085f2df3f7", "weather_044": "13be192a8c23dd8fdceada", "light_050": "1f4302946b1a4a1ded23f6", "light_051": "35e6ef027ed9a157ad8780", "light_052": "526538589aa981bdc77ce9", "light_053": "d4a6ac7f34d64e581937c0", "light_054": "40bbe989be2ae5b2a98b30", } asset_id = ASSET_MAP.get(sid) if not asset_id: return False # Construire les attributs à jour now = datetime.now(timezone.utc).isoformat() attrs = { "timestamp": {"type": "DateTime", "value": now, "timestamp": now}, "battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": now}, } # Mapper les valeurs du payload vers les attributs OR field_map = { "temperature_celsius": "temperature", "humidity_percent": "humidity", "noise_level_db": "noiseLevel", "pm25_ugm3": "airQuality", "vehicle_count": "trafficFlow", "available_spots": "parking", "brightness_lux": "light", "flood": "flood", } for field, val in values.items(): attr_name = field_map.get(field, field) attrs[attr_name] = { "type": "Number", "value": val, "timestamp": now, "unit": "µg/m³" if "ugm3" in field else ("°C" if "celsius" in field else ("%" if "percent" in field or "humidity" in field else ("dB" if "db" in field else ""))), } # Ajouter la location du capteur (GeoJSON Point) lat = sensor.get("lat", 0) lon = sensor.get("lon", 0) attrs["location"] = { "type": "GeoJSONPoint", "value": {"type": "Point", "coordinates": [lat, lon]}, "timestamp": now, } payload = { "id": asset_id, "name": sensor["name"], "type": "IOTSensor", "realm": OR_REALM, "attributes": attrs, } return _or_put(asset_id, payload) # ============================================================================= # Pulsar — HTTP REST Producer # API: POST http://host:8080/admin/v2/persistent/public/default/{topic}/produce # Payload: {"messages": [{"payload": "", "properties": {...}}]} # Topics auto-créés par le premier message (Pulsar standalone) # ============================================================================= _pulsar_session = None def _get_pulsar_session(): global _pulsar_session if _pulsar_session is None: import urllib.request _pulsar_session = urllib.request return _pulsar_session def _init_pulsar() -> bool: """Teste la connectivité Pulsar au démarrage.""" try: import urllib.request req = urllib.request.Request(f"{PULSAR_BASE}/admin/v2/clusters") with urllib.request.urlopen(req, timeout=5) as resp: if resp.status == 200: print(f"[PULSAR] ✅ Connected to {PULSAR_BASE}") return True except Exception as e: print(f"[PULSAR] ⚠️ Cannot reach {PULSAR_BASE}: {e}") return False def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool: """Publie un message sur Pulsar via le client Python (port binaire 6650).""" stype = sensor["type"] topic = f"persistent://public/default/smartcity-{stype.replace('-','')}" try: import pulsar with publish_duration.labels(broker="pulsar").time(): client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650") producer = client.create_producer(topic) body = json.dumps(payload, ensure_ascii=False).encode() producer.send(body, properties={"sensor_id": sid, "source": "simulator"}) client.close() messages_published_total.labels(broker="pulsar", sensor_type=stype).inc() message_payload_size.labels(broker="pulsar").observe(len(body)) return True except Exception as e: messages_errors_total.labels(broker="pulsar", sensor_type=stype, error_type="exception").inc() print(f" ⚠️ Pulsar → {e}") return False # ============================================================================= # Redpanda / Kafka — HTTP REST Proxy # API: POST http://host:8082/topics/{topic} # Payload: {"records": [{"value": ""}]} # Topics auto-créés par le premier message (Redpanda) # ============================================================================= _redpanda_session = None def _get_redpanda_session(): global _redpanda_session if _redpanda_session is None: import urllib.request _redpanda_session = urllib.request return _redpanda_session def _init_redpanda() -> bool: """Teste la connectivité Redpanda au démarrage.""" try: import urllib.request req = urllib.request.Request(f"{REDPANDA_BASE}/v1/status/alive") with urllib.request.urlopen(req, timeout=5) as resp: if resp.status == 200: print(f"[REDPANDA] ✅ Connected to {REDPANDA_BASE}") return True except Exception as e: print(f"[REDPANDA] ⚠️ Cannot reach {REDPANDA_BASE}: {e}") return False def publish_redpanda(sid: str, sensor: dict, payload: dict) -> bool: """Publie un message sur Redpanda/Kafka via le REST Proxy.""" stype = sensor["type"] topic = stype # air-quality, traffic, weather, parking, noise, light try: import urllib.request, base64 body = json.dumps(payload, ensure_ascii=False) b64 = base64.b64encode(body.encode()).decode() record = { "records": [{"value": b64}] } url = f"{REDPANDA_BASE}/topics/{topic}" req = urllib.request.Request( url, data=json.dumps(record).encode(), headers={"Content-Type": "application/vnd.kafka.json.v2+json"}, method="POST" ) with http_request_duration.labels(broker="redpanda", method="POST").time(): with urllib.request.urlopen(req, timeout=8) as resp: http_requests_total.labels(broker="redpanda", method="POST", status_code=str(resp.status)).inc() messages_published_total.labels(broker="redpanda", sensor_type=stype).inc() message_payload_size.labels(broker="redpanda").observe(len(body.encode())) return resp.status in (200, 201, 204) except urllib.error.HTTPError as e: http_requests_total.labels(broker="redpanda", method="POST", status_code=str(e.code)).inc() messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="http_error").inc() print(f" ⚠️ Redpanda → {e.code}") return False except Exception as e: http_requests_total.labels(broker="redpanda", method="POST", status_code="exception").inc() messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="exception").inc() print(f" ⚠️ Redpanda → {e}") return False def publish_influx(sid: str, sensor: dict, values: dict) -> bool: """Write sensor data to InfluxDB (async, non-blocking).""" if not _influx_write_api: influx_write_total.labels(status="skipped").inc() return False def _write_async(): try: stype = sensor["type"] lat = sensor.get("lat", BASE_LAT) lon = sensor.get("lon", BASE_LON) points = [] for field, value in values.items(): if isinstance(value, (int, float)): p = influxdb_client.Point(stype)\ .tag("sensor_id", sid)\ .tag("location", sensor.get("name", sid))\ .field(field, float(value))\ .field("lat", float(lat))\ .field("lon", float(lon)) points.append(p) if points: _influx_write_api.write(bucket=INFLUX_BUCKET, record=points) influx_write_total.labels(status="success").inc() print(f" 📈 InfluxDB: {len(points)} points written") except Exception as e: influx_write_total.labels(status="error").inc() print(f" ⚠️ InfluxDB → {e}") # Exécution asynchrone (non-bloquante) t = threading.Thread(target=_write_async, daemon=True) t.start() return True # Async: on ne peut pas savoir immédiatement def main(): print("╔══════════════════════════════════════════════════╗") print("║ Smart City Simulator — Martinique ║") print("╚══════════════════════════════════════════════════╝") print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s") # print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}") print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}") # --- Démarrer le serveur Prometheus --- _start_metrics_server() # --- Configurer les gauges --- for stype, count in SENSOR_COUNTS.items(): sensors_total.labels(sensor_type=stype).set(count) up.set(1) # Init connectivity checks if ENABLE_PULSAR: _init_pulsar() # Test immédiat print(f" 🌪️ DEBUG: Test Pulsar direct...", flush=True) test_payload = {"type": "test", "value": 123} test_result = publish_pulsar("test_001", {"type": "air-quality"}, test_payload) print(f" 🌪️ DEBUG: Test Pulsar result: {test_result}", flush=True) if ENABLE_REDPANDA: _init_redpanda() mqtt_client = MultiMQTT() running = True def signal_handler(*_): nonlocal running running = False up.set(0) print("\n[SIM] 🛑 Arrêt...") signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) iteration = 0 while running: iteration += 1 ts = datetime.now().strftime("%H:%M:%S") print(f"\n[SIM] ⏱️ It #{iteration} — {ts}") for sid, sensor in SENSORS.items(): stype = sensor["type"] # Utiliser l'index du capteur dans FIXED_LOCATIONS (1-5) pour correspondre aux agents OpenRemote # Le sid est formaté comme {stype}_{counter:03d} avec counter global # On extrait l'index du capteur depuis le sid (derniers chiffres) sensor_num = int(sid.split("_")[1]) # Calculer l'index 1-5 basé sur la position dans le type # traffic: 0-9, airquality: 10-19, parking: 20-29, noise: 30-39, weather: 40-49, light: 50-59 type_offsets = {"traffic": 0, "airquality": 10, "parking": 20, "noise": 30, "weather": 40, "light": 50} type_offset = type_offsets.get(stype, 0) sensor_index = sensor_num - type_offset + 1 # 1-indexed topic = f"smartcity/{stype}/{sensor_index}" # --- Payload MQTT (ATTRIBUTES ONLY - pas de id/type/lat/lon !) # # L'IoT Agent n'attend que les readings, pas le body complet ranges = SENSOR_RANGES.get(stype, {}) payload_mqtt = { "timestamp": datetime.now(timezone.utc).isoformat(), "battery_level": random.randint(60, 100), } for field, val_range in ranges.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)): payload_mqtt[field] = round(random.uniform(lo, hi), 1) elif isinstance(val_range, list): payload_mqtt[field] = random.choice(val_range) msg = json.dumps(payload_mqtt, ensure_ascii=False) # --- MQTT publish --- results = mqtt_client.publish(topic, msg, sensor_type=stype) ok_mqtt = [n for n, r in results.items() if r] if ok_mqtt: print(f" 📤 {topic} → {','.join(ok_mqtt)}") # # --- IoT-Agent (via EMQX) --- # if ENABLE_IOT_AGENT: # ok_iot = mqtt_client.publish_iot_agent(sid, payload_mqtt, sensor_type=stype) # print(f" 🤖 IoT-Agent: {'✅' if ok_iot else '❌'}") # Extraire les valeurs pour OpenRemote or_values = {} for field, val_range in ranges.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)): or_values[field] = round(random.uniform(lo, hi), 1) # --- OpenRemote REST --- if ENABLE_OPENREMOTE: ok_or = publish_openremote(sid, sensor, or_values) print(f" 🏠 OpenRemote: {'✅' if ok_or else '⚠️ skipped'}") # # --- Orion-LD --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT) # # if ENABLE_ORION: # # ok_or = publish_orion(sid, sensor) # # print(f" 🌐 Orion-LD: {'✅' if ok_or else '⚠️ skipped'}") # # --- Stellio --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT) # # if ENABLE_STELLIO: # # ok_st = publish_stellio(sid, sensor) # # print(f" 🏢 Stellio: {'✅' if ok_st else '❌'}") # --- FROST --- if ENABLE_FROST: ranges2 = SENSOR_RANGES.get(stype, {}) for field, val_range in ranges2.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)): val = round(random.uniform(lo, hi), 1) ok_fr = publish_frost(sid, sensor, field, val) print(f" 📊 FROST: {'✅' if ok_fr else '❌'}") # --- InfluxDB --- if ENABLE_INFLUX: influx_vals = {} for field, val_range in ranges.items(): if isinstance(val_range, tuple) and len(val_range) == 2: lo, hi = val_range if isinstance(lo, (int, float)): influx_vals[field] = round(random.uniform(lo, hi), 1) ok_influx = publish_influx(sid, sensor, influx_vals) print(f" 📈 InfluxDB: {'✅' if ok_influx else '❌'}") # --- Pulsar (HTTP REST) --- if ENABLE_PULSAR: print(f" 🌪️ DEBUG: calling publish_pulsar for {sid}, payload_mqtt exists: {bool(locals().get('payload_mqtt'))}", flush=True) ok_pulsar = publish_pulsar(sid, sensor, payload_mqtt) print(f" 🌪️ Pulsar: {'✅' if ok_pulsar else '❌'}") # --- Redpanda (Kafka REST Proxy) --- if ENABLE_REDPANDA: ok_redpanda = publish_redpanda(sid, sensor, payload_mqtt) print(f" 🐟 Redpanda: {'✅' if ok_redpanda else '❌'}") # --- BunkerM HTTP --- if os.getenv("BUNKERM_HTTP", "0") == "1": ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt) print(f" 📦 BunkerM: {'✅' if ok_bunkerm else '❌'}") print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}") try: time.sleep(INTERVAL) except KeyboardInterrupt: break mqtt_client.stop() print("[SIM] ✅ Arrêté proprement.") if __name__ == "__main__": main()