From dbf8b7f5ca91f7cc5ca63c7639ebdc83e63cf22f Mon Sep 17 00:00:00 2001 From: Eric FELIXINE Date: Tue, 12 May 2026 08:18:32 -0400 Subject: [PATCH] =?UTF-8?q?docs:=20=C3=A9tat=20des=20lieux=20localisation?= =?UTF-8?q?=20capteurs=20OpenRemote?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Documentation des découvertes et corrections appliquées - Problèmes restants identifiés (connexion MQTT, topics, déconnexion) - Prochaines étapes recommandées --- LOCALISATION-CAPTEURS-STATUS.md | 65 ++ simulator.py.backup_20260512_073330 | 1232 +++++++++++++++++++++++++++ 2 files changed, 1297 insertions(+) create mode 100644 LOCALISATION-CAPTEURS-STATUS.md create mode 100644 simulator.py.backup_20260512_073330 diff --git a/LOCALISATION-CAPTEURS-STATUS.md b/LOCALISATION-CAPTEURS-STATUS.md new file mode 100644 index 00000000..3d67cacf --- /dev/null +++ b/LOCALISATION-CAPTEURS-STATUS.md @@ -0,0 +1,65 @@ +# État des lieux - Localisation des capteurs sur les maps OpenRemote + +## Problème initial +Les capteurs du simulateur n'apparaissent pas sur les maps OpenRemote (realm master et smart city martinique). + +## Découvertes + +### 1. Deux sets d'assets en BDD +- **Anciens assets** (avec suffixe `(traffic)`, `(airquality)`, etc.) : ont `agentLink` MQTT + `location` GeoJSON → ce sont les bons +- **Nouveaux assets** (sans suffixe, créés par le simulateur via REST) : sans `agentLink`, sans `location` + +### 2. Format de la location +L'attribut `location` dans OpenRemote utilise le format GeoJSON Point : +```json +{"type": "GeoJSONPoint", "value": {"type": "Point", "coordinates": [lat, lon]}} +``` + +### 3. Compteur SENSORS global +Le compteur utilisé pour générer les clés SENSORS est **global** (pas par type) : +- traffic: 0-9, airquality: 10-19, parking: 20-29, noise: 30-39, weather: 40-49, light: 50-59 + +### 4. API REST refuse les PUT sur assets avec agentLink +L'API REST d'OpenRemote refuse les mises à jour (HTTP 403) sur les assets qui ont un `agentLink` actif. C'est une protection pour éviter les conflits avec l'agent MQTT. + +### 5. Connexion MQTT au broker Artemis +Le broker Artemis d'OpenRemote nécessite un **"Service user"** avec username/password pour l'authentification MQTT (rc=5 = Not Authorized sans credentials). La documentation mentionne ce mécanisme mais ne détaille pas comment créer le service user. + +### 6. Topics MQTT pour l'API interne +La documentation indique que les topics pour publier des valeurs d'attributs sont : +- `{realm}/{clientId}/writeattributevalue/{attributeName}/{assetId}` - Payload: JSON de la valeur +- `{realm}/{clientId}/writeattribute/{attributeName}/{assetId}` - Payload: `{"value": , "timestamp": }` + +Le format `smartcity/{type}/{id}` utilisé par le simulateur est pour les agents MQTT externes, pas pour l'API MQTT interne. + +## Corrections appliquées au simulateur + +1. **ASSET_MAP mis à jour** avec les bons asset IDs (ceux avec agentLink + location) +2. **Location ajoutée dans le payload REST** (GeoJSONPoint) +3. **Topics MQTT corrigés** (index basé sur position du capteur, pas compteur global) +4. **REST désactivé** pour les assets avec agentLink (403) +5. **Connexion MQTT anonyme** au broker Artemis (rc=5 persistant) + +## Problèmes restants + +### Connexion MQTT au broker Artemis +Le broker refuse les connexions anonymes (rc=5). Il faut un "Service user" dont la création n'est pas documentée. Solutions possibles : +1. Créer un service user via l'UI OpenRemote (Manager UI → Users) +2. Modifier la configuration Artemis pour accepter les connexions anonymes +3. Utiliser un broker MQTT externe (EMQX) et configurer un agent MQTT dans OpenRemote + +### Topics MQTT +Le simulateur publie sur `smartcity/{type}/{index}` mais l'API MQTT d'OpenRemote attend `{realm}/{clientId}/writeattributevalue/{attributeName}/{assetId}`. Il faut soit : +1. Changer le format des topics dans le simulateur +2. Configurer un agent MQTT dans OpenRemote qui écoute sur `smartcity/#` + +### Déconnexion cyclique +Le broker Artemis déconnecte le client MQTT du simulateur de manière cyclique. Cause possible : keepalive trop court ou configuration du broker. + +## Prochaines étapes recommandées + +1. **Créer un service user** dans OpenRemote pour l'authentification MQTT +2. **Configurer un agent MQTT** dans OpenRemote qui écoute sur `smartcity/#` et mappe les topics vers les attributs des assets +3. **Corriger le format des topics** dans le simulateur pour utiliser le format de l'API MQTT d'OpenRemote +4. **Tester la connexion MQTT** avec les bons credentials +5. **Vérifier la localisation** sur les maps OpenRemote une fois que les agents MQTT reçoivent les données diff --git a/simulator.py.backup_20260512_073330 b/simulator.py.backup_20260512_073330 new file mode 100644 index 00000000..aa0ea754 --- /dev/null +++ b/simulator.py.backup_20260512_073330 @@ -0,0 +1,1232 @@ +#!/usr/bin/env python3 +""" +Smart City IoT Simulator — Martinique (14.6°N, 61.2°W) +======================================================= +Publie vers MULTIPLES brokers MQTT + context brokers NGSI-LD. + +Brokers MQTT: + - EMQX: emqx_emqx_1:1883 (sans auth) + - Mosquitto: mainfluxlabs-mosquitto:1883 (bunker/bunker) + - BunkerM: bunkerm_bunkerm_1:1900 (TLS, bunker/bunker) + - OpenRemote: openremote-manager-1:1883 (admin/Digitribe972) + +Context Brokers REST: +# - Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD) +# - Stellio: stellio-api-gateway:8080 (NGSI-LD) + - FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings) + +Streaming Platforms: + - Pulsar: smart-city-pulsar:8080 (HTTP REST Producer API) + - Redpanda: smart-city-redpanda:8082 (Kafka REST Proxy) + +Time-Series DB: + - InfluxDB v2: smart-city-influxdb:8086 (via influxdb-client) + +Variables d'environnement: + PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s) + BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France) +# ENABLE_ORION=1 : activer Orion-LD (défaut: 1) +# ENABLE_STELLIO=1 : activer Stellio (défaut: 1) + ENABLE_FROST=1 : activer FROST-Server (défaut: 1) + ENABLE_INFLUX=1 : activer InfluxDB v2 (défaut: 1) + ENABLE_PULSAR=1 : activer Apache Pulsar (défaut: 1) + ENABLE_REDPANDA=1 : activer Redpanda Kafka (défaut: 1) +""" + +import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse +import paho.mqtt.client as mqtt +import urllib.request, urllib.error +from datetime import datetime, timezone +from typing import Any + +# Prometheus metrics +import prometheus_client +from prometheus_client import Counter, Histogram, Gauge, Info + +# InfluxDB support +import influxdb_client +from influxdb_client.client.write_api import SYNCHRONOUS + +# ============================================================================= +# Configuration des brokers MQTT +# Configuration des brokers MQTT +# Utilise les noms de services Docker par défaut +EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1") +EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883")) +MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "smart-city-mosquitto") +MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883")) +BUNKERM_HOST = os.environ.get("BUNKERM_HOST", "bunkerm_bunkerm_1") +BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900")) + +# ============================================================================= +# Configuration +# ============================================================================= +BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091")) +BASE_LON = float(os.environ.get("BASE_LON", "-61.2155")) +INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "1")) # 1s pour temps réel +#ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1" +#ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1").lower() in ("1", "true", "yes", "on") +ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1" +ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1" +OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin") +OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972") +OR_REALM = os.environ.get("OR_REALM", "master") +OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token +#ENABLE_IOT_AGENT = os.environ.get("ENABLE_IOT_AGENT", "1") == "1" +FROST_URL = os.environ.get("FROST_URL", "http://localhost:8090/FROST-Server/v1.1") # Exposer frost_http-web-1:8080 -> host:8086 + +# Pulsar config (HTTP REST — pulsar-admin + producer REST API) +ENABLE_PULSAR = os.environ.get("ENABLE_PULSAR", "1").lower() in ("1", "true", "yes", "on") +PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar") +PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "8080")) +PULSAR_BASE = f"http://{PULSAR_HOST}:{PULSAR_PORT}" + +# Redpanda / Kafka config (REST Proxy HTTP) +ENABLE_REDPANDA = os.environ.get("ENABLE_REDPANDA", "1").lower() in ("1", "true", "yes", "on") +REDPANDA_HOST = os.environ.get("REDPANDA_HOST", "smart-city-redpanda") +REDPANDA_PORT = int(os.environ.get("REDPANDA_PORT", "8082")) +REDPANDA_BASE = f"http://{REDPANDA_HOST}:{REDPANDA_PORT}" + +# InfluxDB config +ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1").lower() in ("1", "true", "yes", "on") +INFLUX_URL = os.environ.get("INFLUX_URL", "http://smart-city-influxdb:8086") # InfluxDB v2 sur smartcity-shared +INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe") +INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "smartcity") # Correspond au bucket de Telegraf +INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-token") + +# Prometheus metrics HTTP server +METRICS_PORT = int(os.environ.get("METRICS_PORT", "8001")) + +# ============================================================================= +# Prometheus Metrics Definitions +# ============================================================================= + +# --- Info --- +simulator_info = Info( + "simulator", "Smart City Simulator info" +) +simulator_info.info({ + "version": "1.0.0", + "python_version": sys.version.split()[0], + "mqtt_brokers": "emqx,mosquitto,bunkerm", +# "context_brokers": "orion_ld,stellio,frost", +}) + +# --- Counters --- +messages_published_total = Counter( + "simulator_messages_published_total", + "Total messages published by broker", + ["broker", "sensor_type"] +) + +messages_errors_total = Counter( + "simulator_messages_errors_total", + "Total publish errors", + ["broker", "sensor_type", "error_type"] +) + +mqtt_connection_total = Counter( + "simulator_mqtt_connection_total", + "MQTT connection attempts", + ["broker", "status"] # status: success, failure +) + +http_requests_total = Counter( + "simulator_http_requests_total", + "HTTP requests to REST APIs", + ["broker", "method", "status_code"] +) + +influx_write_total = Counter( + "simulator_influx_write_total", + "InfluxDB write operations", + ["status"] # success, error +) + +# --- Gauges --- +mqtt_broker_connected = Gauge( + "simulator_mqtt_broker_connected", + "MQTT broker connection status (1=connected, 0=disconnected)", + ["broker"] +) + +sensors_total = Gauge( + "simulator_sensors_total", + "Total number of sensors by type", + ["sensor_type"] +) + +up = Gauge( + "simulator_up", + "Simulator is running (1=yes, 0=no)" +) + +# --- Histograms --- +publish_duration = Histogram( + "simulator_publish_duration_seconds", + "Time spent publishing a message", + ["broker"], + buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5) +) + +http_request_duration = Histogram( + "simulator_http_request_duration_seconds", + "HTTP request latency to REST APIs", + ["broker", "method"], + buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0) +) + +message_payload_size = Histogram( + "simulator_message_payload_size_bytes", + "Message payload size in bytes", + ["broker"], + buckets=(64, 128, 256, 512, 1024, 2048, 4096, 8192) +) + +# Start Prometheus HTTP server in a background thread +def _start_metrics_server(): + def run(): + prometheus_client.start_http_server(METRICS_PORT) + print(f"[METRICS] 🚀 Prometheus metrics on :{METRICS_PORT}/metrics") + t = threading.Thread(target=run, daemon=True) + t.start() + return t + +# Initialize InfluxDB client +_influx_client = None +_influx_write_api = None +if ENABLE_INFLUX: + try: + _influx_client = influxdb_client.InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG) + _influx_write_api = _influx_client.write_api(write_options=influxdb_client.client.write_api.ASYNCHRONOUS) + print(f"[INFLUX] ✅ Connected to {INFLUX_URL} (async mode)") + except Exception as e: + print(f"[INFLUX] ❌ Connection failed: {e}") + +SENSOR_COUNTS = { + "traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")), + "airquality": int(os.environ.get("SENSOR_COUNT_airquality", "10")), + "parking": int(os.environ.get("SENSOR_COUNT_parking", "10")), + "noise": int(os.environ.get("SENSOR_COUNT_noise", "1")), + "weather": int(os.environ.get("SENSOR_COUNT_weather", "1")), + "light": int(os.environ.get("SENSOR_COUNT_light", "1")), +} +# Si SENSOR_COUNT est défini, multiplier les counts de façon proportionnelle +_total_default = sum(SENSOR_COUNTS.values()) +if "SENSOR_COUNT" in os.environ: + target = int(os.environ["SENSOR_COUNT"]) + ratio = target / _total_default + for k in SENSOR_COUNTS: + SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio)) + +# ============================================================================= +# Localisation des capteurs Martinique — Coordonnées FIXES sur terre ferme +# IMPORTANT : ±0.02° autour de Fort-de-France donne ~2km, or Martinique fait +# ~60km de long — certains points tombent en mer. Solution : coordonnées fixes. +# ============================================================================= + +# Coordonnées réelles Martinique (terre ferme uniquement) +# Martinique : 14.4°N–14.9°N, -61.23°W–-60.8°W +# Coordonnées GPS exactes depuis les assets OpenRemote (realm master) +# Martinique bounds: lat 14.37–14.88°N, lon 61.0–61.25°W +FIXED_LOCATIONS: dict[str, dict[str, tuple[float, float]]] = { + "traffic": { + "Fort-de-France Centre": (14.6164, -61.07), + "Le Lamentin Aéroport": (14.6167, -61.0035), + "Le Robert D110": (14.6833, -60.9333), + "Sainte-Anne Plage": (14.4333, -60.9833), + "Saint-Joseph D1": (14.7, -61.05), + "Trinité Centre": (14.7167, -60.9167), + "Le François D2": (14.6833, -60.8333), + "Ducos Penitencier": (14.5833, -61.0667), + "Schœlcher Morne": (14.65, -61.1), + "Case-Pilote Bourg": (14.5167, -61.1167), + }, + "airquality": { + "Fort-de-France Lamartine": (14.613, -61.0667), + "Le Lamentin Zac": (14.62, -61.0), + "Le Robert Bourg": (14.68, -60.93), + "Sainte-Anne Village": (14.43, -60.98), + "Saint-Joseph Morne": (14.705, -61.04), + "Trinité Eglise": (14.72, -60.91), + "Le François Bourg": (14.68, -60.83), + "Ducos Centre": (14.58, -61.06), + "Schœlcher Plage": (14.655, -61.11), + "Case-Pilote D1": (14.52, -61.12), + }, + "parking": { + "Fort-de-France Place Clémenceau": (14.615, -61.068), + "Le Lamentin Centre Commercial": (14.618, -61.002), + "Le Robert Stade": (14.685, -60.935), + "Sainte-Anne Mairie": (14.432, -60.985), + "Saint-Joseph Ecole": (14.702, -61.045), + "Trinité Port": (14.715, -60.92), + "Le François Mairie": (14.682, -60.835), + "Ducos ZI": (14.585, -61.055), + "Schœlcher Bourg": (14.652, -61.105), + "Case-Pilote Stade": (14.518, -61.118), + }, + "noise": { + "Fort-de-France Théâtre": (14.617, -61.069), + "Le Lamentin Zone Industrielle": (14.619, -61.001), + "Le Robert Bourg": (14.681, -60.932), + "Sainte-Anne Plage": (14.434, -60.982), + "Saint-Joseph Morne": (14.703, -61.042), + "Trinité Centre": (14.717, -60.918), + "Le François Bourg": (14.681, -60.832), + "Ducos Penitencier": (14.584, -61.058), + "Schœlcher Morne": (14.651, -61.102), + "Case-Pilote Village": (14.519, -61.115), + }, + "weather": { + "Fort-de-France Meteo": (14.616, -61.067), + "Le Lamentin Aéroport": (14.617, -61.004), + "Le Robert Bourg": (14.682, -60.934), + "Sainte-Anne Village": (14.431, -60.981), + "Saint-Joseph Morne": (14.704, -61.043), + "Trinité Eglise": (14.718, -60.912), + "Le François Bourg": (14.683, -60.834), + "Ducos Centre": (14.586, -61.057), + "Schœlcher Plage": (14.654, -61.108), + "Case-Pilote D1": (14.521, -61.113), + }, + "light": { + "Fort-de-France Place": (14.6155, -61.0685), + "Le Lamentin Rond-point": (14.6185, -61.0025), + "Le Robert D110": (14.6835, -60.9335), + "Sainte-Anne Plage": (14.4335, -60.9835), + "Saint-Joseph D1": (14.7005, -61.0505), + "Trinité Centre": (14.7165, -60.9165), + "Le François D2": (14.6835, -60.8335), + "Ducos Penitencier": (14.5835, -61.0665), + "Schœlcher Morne": (14.6505, -61.1005), + "Case-Pilote Bourg": (14.5165, -61.1165), + }, +} + +# Ranges par type +SENSOR_RANGES: dict[str, dict] = { + "traffic": {"vehicle_count":(10,150),"average_speed_kmh":(10,80), + "congestion_level":(0,5),"occupancy_percent":(0,100)}, + "airquality": {"pm25_ugm3":(5,80),"pm10_ugm3":(10,150),"no2_ugm3":(5,60), + "o3_ugm3":(20,120),"co_mgm3":(0.1,5.0), + "temperature_celsius":(20,35),"humidity_percent":(40,95)}, + "parking": {"total_spots":(50,500),"available_spots":(0,500), + "occupancy_percent":(0,100),"turnover_per_hour":(5,50)}, + "noise": {"noise_level_db":(40,95),"peak_db":(60,110)}, + "weather": {"temperature_celsius":(22,34),"humidity_percent":(50,95), + "wind_speed_kmh":(0,50),"pressure_hpa":(1005,1025), + "rain_mm":(0,20),"uv_index":(0,11)}, + "light": {"brightness_lux":(0,100000),"power_consumption_w":(0,500)}, +} + +NOISE_CATEGORIES = ["quiet","moderate","loud","very_loud"] +LIGHT_STATUSES = ["on","off","dimmed","auto"] + +# ============================================================================= +# Capteurs déclarés +# ============================================================================= +SENSORS: dict[str, dict] = {} +counter = 0 +for stype, locs in FIXED_LOCATIONS.items(): + for name, coords in locs.items(): + sid = f"{stype}_{counter:03d}" + SENSORS[sid] = {"type": stype, "lat": coords[0], "lon": coords[1], "name": name} + counter += 1 + +# ============================================================================= +## Payload NGSI-LD pour Orion-LD / Stellio +# ============================================================================= +# Contextes NGSI-LD : core + Smart Data Models +# https://smartdatamodels.org pour les @context officiels +## Contexte NGSI-LD pur pour Orion-LD (vocabulaires standards uniquement) +## Orion-LD ne peut pas résoudre raw.githubusercontent.com — utiliser uri.etsi.org uniquement +#ORION_CONTEXT = [ +# "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", +#] + +# Mapping sensor type → Smart Data Model type NGSI-LD +SMART_MODEL_MAPPING = { + "airquality": "AirQualityObserved", + "traffic": "TrafficFlowObserved", + "parking": "OffStreetParking", + "noise": "NoiseLevelObserved", + "weather": "WeatherObserved", + "light": "Device", +} +FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"} + +# Cache FROST : éviter de recréer Thing/Datastream +_frost_cache: dict[str, tuple[str, str]] = {} # (sid, field) -> (thing_id, ds_id) + +## Contexte NGSI-LD pur pour Stellio et Orion-LD (vocabulaires standards uniquement) +## Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/ +# On n'utilise PAS les vocabulaires smartdatamodels.org distants (inaccessibles depuis les containers) +# Les types d'entité Smart Data Models (AirQualityObserved, etc.) sont reconnus par leur nom +# Les propriétés spécifiques sont stockées telles quelles (vocabulaire libre) +#STELLIO_INLINE_CONTEXT = [ +# "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", +#] + +def _frost_payload(sid: str, sensor: dict, source: str = "simulator", topic: str = "") -> dict: + """Construit un payload SensorThings pour FROST-Server.""" + stype = sensor["type"] + ranges = SENSOR_RANGES.get(stype, {}) + datastreams = [] + + for field, val_range in ranges.items(): + if isinstance(val_range, tuple) and len(val_range) == 2: + lo, hi = val_range + if isinstance(lo, (int, float)) and isinstance(hi, (int, float)): + val = round(random.uniform(lo, hi), 1) + unit = "http://www.qudt.org/vocab/unit#DegreeCelsius" + obs_prop = { + "name": f"{field} Observation", + "description": f"Observation of {field}", + "definition": unit, + } + sensor_data = { + "name": f"Sensor {sid} {field}", + "description": f"Sensor {sid} measuring {field}", + "encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0", + "metadata": {"unit": unit}, + } + ds = { + "name": f"Datastream {stype}/{field}", + "description": f"Datastream for {stype} sensor {sid} - {field}", + "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement", + "unitOfMeasurement": {"name": field, "symbol": "", "definition": unit}, + "Sensor": sensor_data, + "ObservedProperty": obs_prop, + } + datastreams.append((field, ds, val)) + + thing_payload = { + "name": f"Thing_{sid}", + "description": f"Smart City {stype} sensor in Martinique", + "properties": { + "sensorType": stype, + "region": "Martinique", + "source": source, # Traçabilité + "mqttTopic": topic # Traçabilité + }, + } + return thing_payload, datastreams + +# ============================================================================= +# HTTP helper +# ============================================================================= +def _http_post(url: str, data: dict, headers: dict, broker: str = "unknown") -> str: + """POST et retourne 'ok' ou 'created' (ou '' si échec).""" + try: + body = json.dumps(data).encode() + req = urllib.request.Request(url, data=body, headers=headers, method="POST") + with http_request_duration.labels(broker=broker, method="POST").time(): + try: + with urllib.request.urlopen(req, timeout=8) as resp: + http_requests_total.labels(broker=broker, method="POST", status_code=str(resp.status)).inc() + if resp.status == 204: + return 'created' # No Content — succès + if resp.status not in (200, 201): + return '' + # Lire le corps pour extraire l'ID (FROST) + try: + result = json.loads(resp.read()) + if '@iot.selfLink' in result: + link = result['@iot.selfLink'] + return link.split('(')[1].rstrip(')') + if '@iot.id' in result: + return str(result['@iot.id']) + except Exception: + pass + location = resp.headers.get('Location', '') + if location: + return location.split('(')[1].rstrip(')') if '(' in location else '' + return 'created' + except Exception: + pass + except urllib.error.HTTPError as e: + # Lire le corps de l'erreur pour debug + try: + err_body = e.read().decode()[:200] + except Exception: + err_body = str(e) + print(f" ⚠️ HTTP POST {url} → {e.code}: {err_body}") + http_requests_total.labels(broker=broker, method="POST", status_code=str(e.code)).inc() + messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc() + return '' + except Exception as e: + http_requests_total.labels(broker=broker, method="POST", status_code="exception").inc() + messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc() + print(f" ⚠️ HTTP POST {url} → {e}") + return '' + +def _http_put(url: str, data: dict, headers: dict, broker: str = "unknown") -> bool: + try: + body = json.dumps(data).encode() + req = urllib.request.Request(url, data=body, headers=headers, method="PUT") + with http_request_duration.labels(broker=broker, method="PUT").time(): + with urllib.request.urlopen(req, timeout=5) as resp: + http_requests_total.labels(broker=broker, method="PUT", status_code=str(resp.status)).inc() + return resp.status in (200, 204) + except urllib.error.HTTPError as e: + http_requests_total.labels(broker=broker, method="PUT", status_code=str(e.code)).inc() + if e.code == 409: + return True # Already exists - that's fine + messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc() + print(f" ⚠️ HTTP PUT {url} → {e}") + return False + except Exception as e: + http_requests_total.labels(broker=broker, method="PUT", status_code="exception").inc() + messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc() + print(f" ⚠️ HTTP PUT {url} → {e}") + return False + +# ============================================================================= +# MQTT Client multi-broker +# ============================================================================= +class MultiMQTT: + def __init__(self): + self.clients: dict[str, mqtt.Client] = {} + self.ok: dict[str, bool] = {} + self._lock = threading.Lock() + self._setup() + + def _mk_client(self, name: str, host: str, port: int, + tls: bool = False, user: str = "", pwd: str = "", + ws: bool = False, use_v5: bool = False) -> mqtt.Client: + cid = f"smartcity-sim-{name}-{os.getpid()}" + protocol = mqtt.MQTTv5 if use_v5 else mqtt.MQTTv311 + # Use Callback API v1 for compatibility with existing lambda callbacks + c = mqtt.Client(client_id=cid, protocol=protocol, callback_api_version=mqtt.CallbackAPIVersion.VERSION1) + if user: + c.username_pw_set(user, pwd) + if tls: + c.tls_set(cert_reqs=ssl.CERT_NONE) + c.tls_insecure_set(True) + if ws: + c.ws_set(b"/mqtt") + c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc) + c.on_disconnect = lambda _c, _, __: self._on_disconnect(name) + try: + c.connect(host, port, keepalive=60, clean_start=True) + c.loop_start() + except Exception as e: + print(f"[MQTT] ❌ {name} @ {host}:{port} → {e}") + self.ok[name] = False + return c + + def _on_connect(self, name: str, rc: int): + with self._lock: + if rc == 0: + self.ok[name] = True + mqtt_broker_connected.labels(broker=name).set(1) + mqtt_connection_total.labels(broker=name, status="success").inc() + print(f"[MQTT] ✅ {name} connecté") + else: + self.ok[name] = False + mqtt_broker_connected.labels(broker=name).set(0) + mqtt_connection_total.labels(broker=name, status="failure").inc() + print(f"[MQTT] ❌ {name} rc={rc}") + + def _on_disconnect(self, name: str): + with self._lock: + self.ok[name] = False + mqtt_broker_connected.labels(broker=name).set(0) + print(f"[MQTT] ⚠️ {name} déconnecté") + + def _setup(self): + # Utiliser les variables d'environnement pour les brokers + mqtt_version = os.environ.get("MQTT_PROTOCOL_VERSION", "311") + use_v5 = (mqtt_version == "5") + print(f"[MQTT] Version du protocole: {'v5.0' if use_v5 else 'v3.1.1'} (MQTT_PROTOCOL_VERSION={mqtt_version})") + + # Check which brokers are enabled + enable_emqx = os.environ.get("ENABLE_EMQX", "1") == "1" + enable_mosquitto = os.environ.get("ENABLE_MOSQUITTO", "1") == "1" + enable_bunkerm = os.environ.get("ENABLE_BUNKER", "1") == "1" + + brokers = [] + if enable_emqx: + brokers.append(("emqx", EMQX_HOST, EMQX_PORT, False, "", "", use_v5)) + if enable_mosquitto: + brokers.append(("mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "", "", use_v5)) + if enable_bunkerm: + brokers.append(("bunkerm", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker", use_v5)) + + # OpenRemote MQTT broker (pour agents MQTT créés dans OpenRemote) + # Le broker Artemis d'OR accepte les connexions anonymes (pas de credentials) + if ENABLE_OPENREMOTE: + brokers.append(("openremote", "openremote_manager_1", 1883, False, "", "", use_v5)) + + print(f"[MQTT] 🔌 Connexion aux brokers (EMQX={enable_emqx}, Mosquitto={enable_mosquitto}, BunkerM={enable_bunkerm})...") + print("[MQTT] 🔌 Connexion aux brokers...") + for name, host, port, tls, user, pwd, use_v5 in brokers: + c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd, use_v5=use_v5) + self.clients[name] = c + self.ok[name] = False + # Attendre que tous les brokers soient connectés (Artemis peut être lent) + for _ in range(20): + time.sleep(1) + if all(self.ok.values()): + break + print(f"[MQTT] État des connexions: {dict(self.ok)}") + + def publish(self, topic: str, payload: str, sensor_type: str = "unknown") -> dict[str, bool]: + results = {} + payload_bytes = len(payload.encode()) + with self._lock: + for name, client in self.clients.items(): + if self.ok.get(name, False): + with publish_duration.labels(broker=name).time(): + try: + r = client.publish(topic, payload, qos=1) + success = (r.rc == mqtt.MQTT_ERR_SUCCESS) + results[name] = success + if success: + messages_published_total.labels(broker=name, sensor_type=sensor_type).inc() + message_payload_size.labels(broker=name).observe(payload_bytes) + else: + messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="mqtt_rc").inc() + except Exception: + results[name] = False + messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="exception").inc() + else: + results[name] = False + return results + +# def publish_iot_agent(self, sid: str, payload: dict, sensor_type: str = "unknown") -> bool: +# """Publie sur le topic IoT-Agent (json/smartcity-api-key/{sid}/attrs) via les 3 brokers.""" + topic = f"json/smartcity-api-key/{sid}/attrs" + msg = json.dumps(payload, ensure_ascii=False) + payload_bytes = len(msg.encode()) + + success = False + # Publier sur les 3 brokers: emqx, mosquitto, bunkerm + for broker_name in ['emqx', 'mosquitto', 'bunkerm']: + client_ok = self.clients.get(broker_name) is not None and self.ok.get(broker_name, False) + print(f"[MQTT-DEBUG] {broker_name}: client_exists={self.clients.get(broker_name) is not None}, ok={self.ok.get(broker_name, False)}") + if client_ok: + try: + r = self.clients[broker_name].publish(topic, msg, qos=1) + if r.rc == mqtt.MQTT_ERR_SUCCESS: + success = True +# messages_published_total.labels(broker='iot-agent', sensor_type=sensor_type).inc() +# message_payload_size.labels(broker='iot-agent').observe(payload_bytes) + except Exception: + pass # IoT-Agent code removed + return success + + def stop(self): + for name, c in self.clients.items(): + try: + c.loop_stop() + c.disconnect() + except Exception: + pass + +# ============================================================================= +# URLs de base (résolues au démarrage) +# ============================================================================= +#ORION_HOST = "localhost" +#ORION_PORT = "2026" +#ORION_URL = f"http://{ORION_HOST}:{ORION_PORT}" +#STELLIO_URL = os.environ.get("STELLIO_URL", "http://localhost:8087") # Stellio API Gateway (à exposer) +# Configuration OpenRemote (URLs dynamiques) +OR_URL = os.environ.get("OR_URL", "http://localhost:8080") # OpenRemote Manager (Traefik) +OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity +OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", "http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token") +OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour +#STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default") + + + +def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool: + """Publie sur BunkerM via HTTP API (port 2000) avec session.""" + import base64, http.cookiejar, urllib.request, json + from datetime import datetime, timezone + + host = "bunkerm_bunkerm_1:2000" + login_url = f"http://{host}/login" + data_url = f"http://{host}/api/sensors/data" + + # 1. Cookie jar pour maintenir la session + cj = http.cookiejar.CookieJar() + opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj)) + + # 2. Authentification (Basic) pour obtenir le cookie de session + creds = base64.b64encode(b"bunker:bunker").decode() + auth_header = {"Authorization": f"Basic {creds}"} + + try: + # GET sur /login pour initialiser la session + req_login = urllib.request.Request(login_url, headers=auth_header) + opener.open(req_login, timeout=5) + except Exception as e: + print(f" ⚠️ BunkerM login → {e}") + return False + + # 3. Préparer le payload + payload = { + "sensor_id": sid, + "sensor_type": sensor["type"], + "timestamp": datetime.now(timezone.utc).isoformat(), + } + for k, v in values.items(): + if k not in payload: + payload[k] = v + + # 4. POST avec le cookie de session + data = json.dumps(payload).encode() + req = urllib.request.Request( + data_url, + data=data, + headers={**auth_header, "Content-Type": "application/json"}, + method="POST" + ) + try: + with http_request_duration.labels(broker="bunkerm", method="POST").time(): + with opener.open(req, timeout=5) as resp: + http_requests_total.labels(broker="bunkerm", method="POST", status_code=str(resp.status)).inc() + messages_published_total.labels(broker="bunkerm", sensor_type=sensor["type"]).inc() + print(f" ✅ BunkerM: HTTP {resp.status}") + return resp.status in (200, 201, 204) + except Exception as e: + http_requests_total.labels(broker="bunkerm", method="POST", status_code="exception").inc() + messages_errors_total.labels(broker="bunkerm", sensor_type=sensor["type"], error_type="exception").inc() + print(f" ⚠️ BunkerM POST → {e}") + return False + +def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool: + """Crée le Thing (1 par capteur) + Datastreams, puis POST l'Observation.""" + # Cache : {sid: (thing_id, {field: ds_id})} + if sid in _frost_cache: + thing_id, ds_map = _frost_cache[sid] + if field in ds_map: + ds_id = ds_map[field] + obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations" + obs = { + "resultTime": datetime.now(timezone.utc).isoformat(), + "result": value, + "FeatureOfInterest": { + "name": f"Location {sid}", + "description": f"Feature of interest for sensor {sid}", + "encodingType": "application/vnd.geo+json", + "feature": { + "type": "Point", + "coordinates": [sensor.get("lon", -61.0), sensor.get("lat", 14.6)] + } + } + } + if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"): + print(f" ✅ FROST Observation {sid}/{field} → OK (cached)") + return True + else: + print(f" ⚠️ FROST Observation {sid}/{field} → échec") + return False + + # Premier appel pour ce capteur : créer Thing + tous les Datastreams + # Topic MQTT pour traçabilité + stype = sensor["type"] + topic = f"city/sensors/{stype}/{sid}" + thing_payload, datastreams = _frost_payload(sid, sensor, source="simulator", topic=topic) + print(f" 📊 FROST: POST Thing {sid}...") + tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS, broker="frost") + if not tid: + print(f" ⚠️ FROST Thing {sid} → échec création") + return False + print(f" ✅ FROST Thing {sid} créé (ID: {tid})") + + # Créer les Datastreams + ds_map = {} + for f, ds, _ in datastreams: + ds["Thing"] = {"@iot.id": tid} + print(f" 📊 FROST: POST Datastream {sid}/{f}...") + ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS, broker="frost") + if ds_id: + print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})") + ds_map[f] = ds_id + else: + print(f" ⚠️ FROST Datastream {sid}/{f} → échec") + + _frost_cache[sid] = (tid, ds_map) + + # Poster l'observation pour le field actuel + if field in ds_map: + ds_id = ds_map[field] + obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations" + obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value} + if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"): + print(f" ✅ FROST Observation {sid}/{field} → OK") + return True + return False + +# ============================================================================= +# OpenRemote REST +# ============================================================================= +_or_token_cache = {"token": "", "expires": 0} + +def _get_or_token() -> str: + """Obtain an OpenRemote token via password grant (admin user).""" + import time, urllib.parse + if _or_token_cache["token"] and _or_token_cache["expires"] > time.time() + 60: + return _or_token_cache["token"] + try: + # Use password grant with admin user (full rights) + token_url = f"http://openremote-keycloak-1:8080/auth/realms/{OR_TOKEN_REALM}/protocol/openid-connect/token" + client_id = os.environ.get("OR_CLIENT_ID", "openremote") + client_secret = os.environ.get("OR_CLIENT_SECRET", "0oQjzTfiEELYmj5jFwT4iIuWUDtQDvVa") + data = urllib.parse.urlencode({ + "grant_type": "password", + "username": os.environ.get("OR_ADMIN_USER", "admin"), + "password": os.environ.get("OR_ADMIN_PASS", "Digitribe972"), + "client_id": client_id, + "client_secret": client_secret + }).encode() + req = urllib.request.Request( + token_url, + data=data, + headers={"Content-Type": "application/x-www-form-urlencoded"} + ) + with urllib.request.urlopen(req, timeout=5) as r: + token_data = json.loads(r.read().decode()) + _or_token_cache["token"] = token_data["access_token"] + _or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300) - 60 + return _or_token_cache["token"] + except Exception as e: + print(f" ⚠️ OpenRemote token → {e}") + return "" + +def _or_put(asset_id: str, payload: dict) -> bool: + """PUT update sur un asset OpenRemote (avec version).""" + token = _get_or_token() + if not token: + return False + try: + # Récupérer la version actuelle de l'asset + get_url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}" + get_req = urllib.request.Request(get_url, headers={"Authorization": f"Bearer {token}"}) + version = 1 + try: + with urllib.request.urlopen(get_req, timeout=5) as resp: + asset_data = json.loads(resp.read().decode()) + version = asset_data.get("version", 1) + except: + pass # Si GET échoue, utiliser version=1 + + # Ajouter la version au payload + payload["version"] = version + body = json.dumps(payload).encode() + url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}" + req = urllib.request.Request(url, data=body, + headers={ + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + "If-Match": str(version), + }, + method="PUT") + with http_request_duration.labels(broker="openremote", method="PUT").time(): + with urllib.request.urlopen(req, timeout=5) as resp: + http_requests_total.labels(broker="openremote", method="PUT", status_code=str(resp.status)).inc() + messages_published_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown")).inc() + return resp.status in (200, 204) + except urllib.error.HTTPError as e: + http_requests_total.labels(broker="openremote", method="PUT", status_code=str(e.code)).inc() + messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="http_error").inc() + print(f" ⚠️ OR PUT {asset_id} → HTTP {e.code}") + return False + except Exception as e: + http_requests_total.labels(broker="openremote", method="PUT", status_code="exception").inc() + messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="exception").inc() + print(f" ⚠️ OR PUT {asset_id} → {e}") + return False + +def publish_openremote(sid: str, sensor: dict, values: dict) -> bool: + """Met à jour les attributs d'un asset OpenRemote via REST.""" + # Mapping sid → asset ID (realm master) — assets avec agentLink + location + # Note: le compteur SENSORS est global (traffic=0-9, airquality=10-19, parking=20-29, noise=30-39, weather=40-49, light=50-59) + ASSET_MAP = { + "traffic_000": "429858caca3341f56fbf65", + "traffic_001": "301218322f5aaca9d6d168", + "traffic_002": "bd35fe2a90133118b9b004", + "traffic_003": "da59ec9301c4efd3fd55c4", + "traffic_004": "834f4b7b9df848f5c5c2d8", + "airquality_010": "0f922351a9894bc0144c94", + "airquality_011": "4f83219bbee703b3e0a255", + "airquality_012": "381cc31ab83dd66ed4be37", + "airquality_013": "808b73c22ecd19589a33be", + "airquality_014": "03c18679226329183b44b6", + "parking_020": "0ee6689f5c0499643d48eb", + "parking_021": "8fb6b2d0601d98b47a4172", + "parking_022": "0c00bda9e5075d12d59694", + "parking_023": "ae981dc9d155d1313b9acf", + "parking_024": "96020cc5aef95c5fda7bb4", + "noise_030": "0be31930e45d2eb5c12ccd", + "noise_031": "1802e76e3432d5eda1deb7", + "noise_032": "08edb6518750d50644afe3", + "noise_033": "93d09bfac36d2ed95fc858", + "noise_034": "7942726d84d2bd29de1e5d", + "weather_040": "9942f881ab6df375d8d9fa", + "weather_041": "5400fdf5c51a4fe4f5a89c", + "weather_042": "1a3bf32aa5208892e68965", + "weather_043": "d3725f922f96085f2df3f7", + "weather_044": "13be192a8c23dd8fdceada", + "light_050": "1f4302946b1a4a1ded23f6", + "light_051": "35e6ef027ed9a157ad8780", + "light_052": "526538589aa981bdc77ce9", + "light_053": "d4a6ac7f34d64e581937c0", + "light_054": "40bbe989be2ae5b2a98b30", + } + asset_id = ASSET_MAP.get(sid) + if not asset_id: + return False + # Construire les attributs à jour + now = datetime.now(timezone.utc).isoformat() + attrs = { + "timestamp": {"type": "DateTime", "value": now, "timestamp": now}, + "battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": now}, + } + # Mapper les valeurs du payload vers les attributs OR + field_map = { + "temperature_celsius": "temperature", + "humidity_percent": "humidity", + "noise_level_db": "noiseLevel", + "pm25_ugm3": "airQuality", + "vehicle_count": "trafficFlow", + "available_spots": "parking", + "brightness_lux": "light", + "flood": "flood", + } + for field, val in values.items(): + attr_name = field_map.get(field, field) + attrs[attr_name] = { + "type": "Number", + "value": val, + "timestamp": now, + "unit": "µg/m³" if "ugm3" in field else ("°C" if "celsius" in field else ("%" if "percent" in field or "humidity" in field else ("dB" if "db" in field else ""))), + } + + # Ajouter la location du capteur (GeoJSON Point) + lat = sensor.get("lat", 0) + lon = sensor.get("lon", 0) + attrs["location"] = { + "type": "GeoJSONPoint", + "value": {"type": "Point", "coordinates": [lat, lon]}, + "timestamp": now, + } + + payload = { + "id": asset_id, + "name": sensor["name"], + "type": "IOTSensor", + "realm": OR_REALM, + "attributes": attrs, + } + return _or_put(asset_id, payload) + +# ============================================================================= +# Pulsar — HTTP REST Producer +# API: POST http://host:8080/admin/v2/persistent/public/default/{topic}/produce +# Payload: {"messages": [{"payload": "", "properties": {...}}]} +# Topics auto-créés par le premier message (Pulsar standalone) +# ============================================================================= +_pulsar_session = None + +def _get_pulsar_session(): + global _pulsar_session + if _pulsar_session is None: + import urllib.request + _pulsar_session = urllib.request + return _pulsar_session + +def _init_pulsar() -> bool: + """Teste la connectivité Pulsar au démarrage.""" + try: + import urllib.request + req = urllib.request.Request(f"{PULSAR_BASE}/admin/v2/clusters") + with urllib.request.urlopen(req, timeout=5) as resp: + if resp.status == 200: + print(f"[PULSAR] ✅ Connected to {PULSAR_BASE}") + return True + except Exception as e: + print(f"[PULSAR] ⚠️ Cannot reach {PULSAR_BASE}: {e}") + return False + +def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool: + """Publie un message sur Pulsar via le client Python (port binaire 6650).""" + stype = sensor["type"] + topic = f"persistent://public/default/smartcity-{stype.replace('-','')}" + try: + import pulsar + with publish_duration.labels(broker="pulsar").time(): + client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650") + producer = client.create_producer(topic) + body = json.dumps(payload, ensure_ascii=False).encode() + producer.send(body, properties={"sensor_id": sid, "source": "simulator"}) + client.close() + messages_published_total.labels(broker="pulsar", sensor_type=stype).inc() + message_payload_size.labels(broker="pulsar").observe(len(body)) + return True + except Exception as e: + messages_errors_total.labels(broker="pulsar", sensor_type=stype, error_type="exception").inc() + print(f" ⚠️ Pulsar → {e}") + return False + +# ============================================================================= +# Redpanda / Kafka — HTTP REST Proxy +# API: POST http://host:8082/topics/{topic} +# Payload: {"records": [{"value": ""}]} +# Topics auto-créés par le premier message (Redpanda) +# ============================================================================= +_redpanda_session = None + +def _get_redpanda_session(): + global _redpanda_session + if _redpanda_session is None: + import urllib.request + _redpanda_session = urllib.request + return _redpanda_session + +def _init_redpanda() -> bool: + """Teste la connectivité Redpanda au démarrage.""" + try: + import urllib.request + req = urllib.request.Request(f"{REDPANDA_BASE}/v1/status/alive") + with urllib.request.urlopen(req, timeout=5) as resp: + if resp.status == 200: + print(f"[REDPANDA] ✅ Connected to {REDPANDA_BASE}") + return True + except Exception as e: + print(f"[REDPANDA] ⚠️ Cannot reach {REDPANDA_BASE}: {e}") + return False + +def publish_redpanda(sid: str, sensor: dict, payload: dict) -> bool: + """Publie un message sur Redpanda/Kafka via le REST Proxy.""" + stype = sensor["type"] + topic = stype # air-quality, traffic, weather, parking, noise, light + try: + import urllib.request, base64 + body = json.dumps(payload, ensure_ascii=False) + b64 = base64.b64encode(body.encode()).decode() + record = { + "records": [{"value": b64}] + } + url = f"{REDPANDA_BASE}/topics/{topic}" + req = urllib.request.Request( + url, + data=json.dumps(record).encode(), + headers={"Content-Type": "application/vnd.kafka.json.v2+json"}, + method="POST" + ) + with http_request_duration.labels(broker="redpanda", method="POST").time(): + with urllib.request.urlopen(req, timeout=8) as resp: + http_requests_total.labels(broker="redpanda", method="POST", status_code=str(resp.status)).inc() + messages_published_total.labels(broker="redpanda", sensor_type=stype).inc() + message_payload_size.labels(broker="redpanda").observe(len(body.encode())) + return resp.status in (200, 201, 204) + except urllib.error.HTTPError as e: + http_requests_total.labels(broker="redpanda", method="POST", status_code=str(e.code)).inc() + messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="http_error").inc() + print(f" ⚠️ Redpanda → {e.code}") + return False + except Exception as e: + http_requests_total.labels(broker="redpanda", method="POST", status_code="exception").inc() + messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="exception").inc() + print(f" ⚠️ Redpanda → {e}") + return False + +def publish_influx(sid: str, sensor: dict, values: dict) -> bool: + """Write sensor data to InfluxDB (async, non-blocking).""" + if not _influx_write_api: + influx_write_total.labels(status="skipped").inc() + return False + + def _write_async(): + try: + stype = sensor["type"] + lat = sensor.get("lat", BASE_LAT) + lon = sensor.get("lon", BASE_LON) + + points = [] + for field, value in values.items(): + if isinstance(value, (int, float)): + p = influxdb_client.Point(stype)\ + .tag("sensor_id", sid)\ + .tag("location", sensor.get("name", sid))\ + .field(field, float(value))\ + .field("lat", float(lat))\ + .field("lon", float(lon)) + points.append(p) + + if points: + _influx_write_api.write(bucket=INFLUX_BUCKET, record=points) + influx_write_total.labels(status="success").inc() + print(f" 📈 InfluxDB: {len(points)} points written") + except Exception as e: + influx_write_total.labels(status="error").inc() + print(f" ⚠️ InfluxDB → {e}") + + # Exécution asynchrone (non-bloquante) + t = threading.Thread(target=_write_async, daemon=True) + t.start() + return True # Async: on ne peut pas savoir immédiatement + +def main(): + print("╔══════════════════════════════════════════════════╗") + print("║ Smart City Simulator — Martinique ║") + print("╚══════════════════════════════════════════════════╝") + print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s") +# print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}") + print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}") + + # --- Démarrer le serveur Prometheus --- + _start_metrics_server() + + # --- Configurer les gauges --- + for stype, count in SENSOR_COUNTS.items(): + sensors_total.labels(sensor_type=stype).set(count) + up.set(1) + + # Init connectivity checks + if ENABLE_PULSAR: + _init_pulsar() + # Test immédiat + print(f" 🌪️ DEBUG: Test Pulsar direct...", flush=True) + test_payload = {"type": "test", "value": 123} + test_result = publish_pulsar("test_001", {"type": "air-quality"}, test_payload) + print(f" 🌪️ DEBUG: Test Pulsar result: {test_result}", flush=True) + if ENABLE_REDPANDA: + _init_redpanda() + + mqtt_client = MultiMQTT() + + running = True + def signal_handler(*_): + nonlocal running + running = False + up.set(0) + print("\n[SIM] 🛑 Arrêt...") + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + iteration = 0 + while running: + iteration += 1 + ts = datetime.now().strftime("%H:%M:%S") + print(f"\n[SIM] ⏱️ It #{iteration} — {ts}") + + for sid, sensor in SENSORS.items(): + stype = sensor["type"] + # Utiliser l'index du capteur dans FIXED_LOCATIONS (1-5) pour correspondre aux agents OpenRemote + # Le sid est formaté comme {stype}_{counter:03d} avec counter global + # On extrait l'index du capteur depuis le sid (derniers chiffres) + sensor_num = int(sid.split("_")[1]) + # Calculer l'index 1-5 basé sur la position dans le type + # traffic: 0-9, airquality: 10-19, parking: 20-29, noise: 30-39, weather: 40-49, light: 50-59 + type_offsets = {"traffic": 0, "airquality": 10, "parking": 20, "noise": 30, "weather": 40, "light": 50} + type_offset = type_offsets.get(stype, 0) + sensor_index = sensor_num - type_offset + 1 # 1-indexed + topic = f"smartcity/{stype}/{sensor_index}" + + # --- Payload MQTT (ATTRIBUTES ONLY - pas de id/type/lat/lon !) +# # L'IoT Agent n'attend que les readings, pas le body complet + ranges = SENSOR_RANGES.get(stype, {}) + payload_mqtt = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "battery_level": random.randint(60, 100), + } + for field, val_range in ranges.items(): + if isinstance(val_range, tuple) and len(val_range) == 2: + lo, hi = val_range + if isinstance(lo, (int, float)): + payload_mqtt[field] = round(random.uniform(lo, hi), 1) + elif isinstance(val_range, list): + payload_mqtt[field] = random.choice(val_range) + + msg = json.dumps(payload_mqtt, ensure_ascii=False) + + # --- MQTT publish --- + results = mqtt_client.publish(topic, msg, sensor_type=stype) + ok_mqtt = [n for n, r in results.items() if r] + if ok_mqtt: + print(f" 📤 {topic} → {','.join(ok_mqtt)}") + +# # --- IoT-Agent (via EMQX) --- +# if ENABLE_IOT_AGENT: +# ok_iot = mqtt_client.publish_iot_agent(sid, payload_mqtt, sensor_type=stype) +# print(f" 🤖 IoT-Agent: {'✅' if ok_iot else '❌'}") + + # Extraire les valeurs pour OpenRemote + or_values = {} + for field, val_range in ranges.items(): + if isinstance(val_range, tuple) and len(val_range) == 2: + lo, hi = val_range + if isinstance(lo, (int, float)): + or_values[field] = round(random.uniform(lo, hi), 1) + + # --- OpenRemote REST --- (DÉSACTIVÉ: les agents MQTT gèrent les mises à jour) + # Le REST échoue en 403 sur les assets avec agentLink + # if ENABLE_OPENREMOTE: + # ok_or = publish_openremote(sid, sensor, or_values) + # print(f" 🏠 OpenRemote: {'✅' if ok_or else '⚠️ skipped'}") + +# # --- Orion-LD --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT) +# # if ENABLE_ORION: +# # ok_or = publish_orion(sid, sensor) +# # print(f" 🌐 Orion-LD: {'✅' if ok_or else '⚠️ skipped'}") + +# # --- Stellio --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT) +# # if ENABLE_STELLIO: +# # ok_st = publish_stellio(sid, sensor) +# # print(f" 🏢 Stellio: {'✅' if ok_st else '❌'}") + + # --- FROST --- + if ENABLE_FROST: + ranges2 = SENSOR_RANGES.get(stype, {}) + for field, val_range in ranges2.items(): + if isinstance(val_range, tuple) and len(val_range) == 2: + lo, hi = val_range + if isinstance(lo, (int, float)): + val = round(random.uniform(lo, hi), 1) + ok_fr = publish_frost(sid, sensor, field, val) + print(f" 📊 FROST: {'✅' if ok_fr else '❌'}") + + # --- InfluxDB --- + if ENABLE_INFLUX: + influx_vals = {} + for field, val_range in ranges.items(): + if isinstance(val_range, tuple) and len(val_range) == 2: + lo, hi = val_range + if isinstance(lo, (int, float)): + influx_vals[field] = round(random.uniform(lo, hi), 1) + ok_influx = publish_influx(sid, sensor, influx_vals) + print(f" 📈 InfluxDB: {'✅' if ok_influx else '❌'}") + + # --- Pulsar (HTTP REST) --- + if ENABLE_PULSAR: + print(f" 🌪️ DEBUG: calling publish_pulsar for {sid}, payload_mqtt exists: {bool(locals().get('payload_mqtt'))}", flush=True) + ok_pulsar = publish_pulsar(sid, sensor, payload_mqtt) + print(f" 🌪️ Pulsar: {'✅' if ok_pulsar else '❌'}") + + # --- Redpanda (Kafka REST Proxy) --- + if ENABLE_REDPANDA: + ok_redpanda = publish_redpanda(sid, sensor, payload_mqtt) + print(f" 🐟 Redpanda: {'✅' if ok_redpanda else '❌'}") + + # --- BunkerM HTTP --- + if os.getenv("BUNKERM_HTTP", "0") == "1": + ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt) + print(f" 📦 BunkerM: {'✅' if ok_bunkerm else '❌'}") + + print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}") + + try: + time.sleep(INTERVAL) + except KeyboardInterrupt: + break + + mqtt_client.stop() + print("[SIM] ✅ Arrêté proprement.") + +if __name__ == "__main__": + main()