Files
smart-city-digital-twin-mar…/simulator.py.backup_20260512_073330
Eric FELIXINE dbf8b7f5ca docs: état des lieux localisation capteurs OpenRemote
- Documentation des découvertes et corrections appliquées
- Problèmes restants identifiés (connexion MQTT, topics, déconnexion)
- Prochaines étapes recommandées
2026-05-12 08:18:32 -04:00

1233 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Smart City IoT Simulator — Martinique (14.6°N, 61.2°W)
=======================================================
Publie vers MULTIPLES brokers MQTT + context brokers NGSI-LD.
Brokers MQTT:
- EMQX: emqx_emqx_1:1883 (sans auth)
- Mosquitto: mainfluxlabs-mosquitto:1883 (bunker/bunker)
- BunkerM: bunkerm_bunkerm_1:1900 (TLS, bunker/bunker)
- OpenRemote: openremote-manager-1:1883 (admin/Digitribe972)
Context Brokers REST:
# - Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
# - Stellio: stellio-api-gateway:8080 (NGSI-LD)
- FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)
Streaming Platforms:
- Pulsar: smart-city-pulsar:8080 (HTTP REST Producer API)
- Redpanda: smart-city-redpanda:8082 (Kafka REST Proxy)
Time-Series DB:
- InfluxDB v2: smart-city-influxdb:8086 (via influxdb-client)
Variables d'environnement:
PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s)
BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France)
# ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
# ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
ENABLE_FROST=1 : activer FROST-Server (défaut: 1)
ENABLE_INFLUX=1 : activer InfluxDB v2 (défaut: 1)
ENABLE_PULSAR=1 : activer Apache Pulsar (défaut: 1)
ENABLE_REDPANDA=1 : activer Redpanda Kafka (défaut: 1)
"""
import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse
import paho.mqtt.client as mqtt
import urllib.request, urllib.error
from datetime import datetime, timezone
from typing import Any
# Prometheus metrics
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge, Info
# InfluxDB support
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
# =============================================================================
# Configuration des brokers MQTT
# Configuration des brokers MQTT
# Utilise les noms de services Docker par défaut
EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1")
EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883"))
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "smart-city-mosquitto")
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))
BUNKERM_HOST = os.environ.get("BUNKERM_HOST", "bunkerm_bunkerm_1")
BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900"))
# =============================================================================
# Configuration
# =============================================================================
BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "1")) # 1s pour temps réel
#ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
#ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1").lower() in ("1", "true", "yes", "on")
ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1"
ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1"
OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972")
OR_REALM = os.environ.get("OR_REALM", "master")
OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token
#ENABLE_IOT_AGENT = os.environ.get("ENABLE_IOT_AGENT", "1") == "1"
FROST_URL = os.environ.get("FROST_URL", "http://localhost:8090/FROST-Server/v1.1") # Exposer frost_http-web-1:8080 -> host:8086
# Pulsar config (HTTP REST — pulsar-admin + producer REST API)
ENABLE_PULSAR = os.environ.get("ENABLE_PULSAR", "1").lower() in ("1", "true", "yes", "on")
PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar")
PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "8080"))
PULSAR_BASE = f"http://{PULSAR_HOST}:{PULSAR_PORT}"
# Redpanda / Kafka config (REST Proxy HTTP)
ENABLE_REDPANDA = os.environ.get("ENABLE_REDPANDA", "1").lower() in ("1", "true", "yes", "on")
REDPANDA_HOST = os.environ.get("REDPANDA_HOST", "smart-city-redpanda")
REDPANDA_PORT = int(os.environ.get("REDPANDA_PORT", "8082"))
REDPANDA_BASE = f"http://{REDPANDA_HOST}:{REDPANDA_PORT}"
# InfluxDB config
ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1").lower() in ("1", "true", "yes", "on")
INFLUX_URL = os.environ.get("INFLUX_URL", "http://smart-city-influxdb:8086") # InfluxDB v2 sur smartcity-shared
INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "smartcity") # Correspond au bucket de Telegraf
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-token")
# Prometheus metrics HTTP server
METRICS_PORT = int(os.environ.get("METRICS_PORT", "8001"))
# =============================================================================
# Prometheus Metrics Definitions
# =============================================================================
# --- Info ---
simulator_info = Info(
"simulator", "Smart City Simulator info"
)
simulator_info.info({
"version": "1.0.0",
"python_version": sys.version.split()[0],
"mqtt_brokers": "emqx,mosquitto,bunkerm",
# "context_brokers": "orion_ld,stellio,frost",
})
# --- Counters ---
messages_published_total = Counter(
"simulator_messages_published_total",
"Total messages published by broker",
["broker", "sensor_type"]
)
messages_errors_total = Counter(
"simulator_messages_errors_total",
"Total publish errors",
["broker", "sensor_type", "error_type"]
)
mqtt_connection_total = Counter(
"simulator_mqtt_connection_total",
"MQTT connection attempts",
["broker", "status"] # status: success, failure
)
http_requests_total = Counter(
"simulator_http_requests_total",
"HTTP requests to REST APIs",
["broker", "method", "status_code"]
)
influx_write_total = Counter(
"simulator_influx_write_total",
"InfluxDB write operations",
["status"] # success, error
)
# --- Gauges ---
mqtt_broker_connected = Gauge(
"simulator_mqtt_broker_connected",
"MQTT broker connection status (1=connected, 0=disconnected)",
["broker"]
)
sensors_total = Gauge(
"simulator_sensors_total",
"Total number of sensors by type",
["sensor_type"]
)
up = Gauge(
"simulator_up",
"Simulator is running (1=yes, 0=no)"
)
# --- Histograms ---
publish_duration = Histogram(
"simulator_publish_duration_seconds",
"Time spent publishing a message",
["broker"],
buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5)
)
http_request_duration = Histogram(
"simulator_http_request_duration_seconds",
"HTTP request latency to REST APIs",
["broker", "method"],
buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
)
message_payload_size = Histogram(
"simulator_message_payload_size_bytes",
"Message payload size in bytes",
["broker"],
buckets=(64, 128, 256, 512, 1024, 2048, 4096, 8192)
)
# Start Prometheus HTTP server in a background thread
def _start_metrics_server():
def run():
prometheus_client.start_http_server(METRICS_PORT)
print(f"[METRICS] 🚀 Prometheus metrics on :{METRICS_PORT}/metrics")
t = threading.Thread(target=run, daemon=True)
t.start()
return t
# Initialize InfluxDB client
_influx_client = None
_influx_write_api = None
if ENABLE_INFLUX:
try:
_influx_client = influxdb_client.InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
_influx_write_api = _influx_client.write_api(write_options=influxdb_client.client.write_api.ASYNCHRONOUS)
print(f"[INFLUX] ✅ Connected to {INFLUX_URL} (async mode)")
except Exception as e:
print(f"[INFLUX] ❌ Connection failed: {e}")
SENSOR_COUNTS = {
"traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")),
"airquality": int(os.environ.get("SENSOR_COUNT_airquality", "10")),
"parking": int(os.environ.get("SENSOR_COUNT_parking", "10")),
"noise": int(os.environ.get("SENSOR_COUNT_noise", "1")),
"weather": int(os.environ.get("SENSOR_COUNT_weather", "1")),
"light": int(os.environ.get("SENSOR_COUNT_light", "1")),
}
# Si SENSOR_COUNT est défini, multiplier les counts de façon proportionnelle
_total_default = sum(SENSOR_COUNTS.values())
if "SENSOR_COUNT" in os.environ:
target = int(os.environ["SENSOR_COUNT"])
ratio = target / _total_default
for k in SENSOR_COUNTS:
SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio))
# =============================================================================
# Localisation des capteurs Martinique — Coordonnées FIXES sur terre ferme
# IMPORTANT : ±0.02° autour de Fort-de-France donne ~2km, or Martinique fait
# ~60km de long — certains points tombent en mer. Solution : coordonnées fixes.
# =============================================================================
# Coordonnées réelles Martinique (terre ferme uniquement)
# Martinique : 14.4°N14.9°N, -61.23°W-60.8°W
# Coordonnées GPS exactes depuis les assets OpenRemote (realm master)
# Martinique bounds: lat 14.3714.88°N, lon 61.061.25°W
FIXED_LOCATIONS: dict[str, dict[str, tuple[float, float]]] = {
"traffic": {
"Fort-de-France Centre": (14.6164, -61.07),
"Le Lamentin Aéroport": (14.6167, -61.0035),
"Le Robert D110": (14.6833, -60.9333),
"Sainte-Anne Plage": (14.4333, -60.9833),
"Saint-Joseph D1": (14.7, -61.05),
"Trinité Centre": (14.7167, -60.9167),
"Le François D2": (14.6833, -60.8333),
"Ducos Penitencier": (14.5833, -61.0667),
"Schœlcher Morne": (14.65, -61.1),
"Case-Pilote Bourg": (14.5167, -61.1167),
},
"airquality": {
"Fort-de-France Lamartine": (14.613, -61.0667),
"Le Lamentin Zac": (14.62, -61.0),
"Le Robert Bourg": (14.68, -60.93),
"Sainte-Anne Village": (14.43, -60.98),
"Saint-Joseph Morne": (14.705, -61.04),
"Trinité Eglise": (14.72, -60.91),
"Le François Bourg": (14.68, -60.83),
"Ducos Centre": (14.58, -61.06),
"Schœlcher Plage": (14.655, -61.11),
"Case-Pilote D1": (14.52, -61.12),
},
"parking": {
"Fort-de-France Place Clémenceau": (14.615, -61.068),
"Le Lamentin Centre Commercial": (14.618, -61.002),
"Le Robert Stade": (14.685, -60.935),
"Sainte-Anne Mairie": (14.432, -60.985),
"Saint-Joseph Ecole": (14.702, -61.045),
"Trinité Port": (14.715, -60.92),
"Le François Mairie": (14.682, -60.835),
"Ducos ZI": (14.585, -61.055),
"Schœlcher Bourg": (14.652, -61.105),
"Case-Pilote Stade": (14.518, -61.118),
},
"noise": {
"Fort-de-France Théâtre": (14.617, -61.069),
"Le Lamentin Zone Industrielle": (14.619, -61.001),
"Le Robert Bourg": (14.681, -60.932),
"Sainte-Anne Plage": (14.434, -60.982),
"Saint-Joseph Morne": (14.703, -61.042),
"Trinité Centre": (14.717, -60.918),
"Le François Bourg": (14.681, -60.832),
"Ducos Penitencier": (14.584, -61.058),
"Schœlcher Morne": (14.651, -61.102),
"Case-Pilote Village": (14.519, -61.115),
},
"weather": {
"Fort-de-France Meteo": (14.616, -61.067),
"Le Lamentin Aéroport": (14.617, -61.004),
"Le Robert Bourg": (14.682, -60.934),
"Sainte-Anne Village": (14.431, -60.981),
"Saint-Joseph Morne": (14.704, -61.043),
"Trinité Eglise": (14.718, -60.912),
"Le François Bourg": (14.683, -60.834),
"Ducos Centre": (14.586, -61.057),
"Schœlcher Plage": (14.654, -61.108),
"Case-Pilote D1": (14.521, -61.113),
},
"light": {
"Fort-de-France Place": (14.6155, -61.0685),
"Le Lamentin Rond-point": (14.6185, -61.0025),
"Le Robert D110": (14.6835, -60.9335),
"Sainte-Anne Plage": (14.4335, -60.9835),
"Saint-Joseph D1": (14.7005, -61.0505),
"Trinité Centre": (14.7165, -60.9165),
"Le François D2": (14.6835, -60.8335),
"Ducos Penitencier": (14.5835, -61.0665),
"Schœlcher Morne": (14.6505, -61.1005),
"Case-Pilote Bourg": (14.5165, -61.1165),
},
}
# Ranges par type
SENSOR_RANGES: dict[str, dict] = {
"traffic": {"vehicle_count":(10,150),"average_speed_kmh":(10,80),
"congestion_level":(0,5),"occupancy_percent":(0,100)},
"airquality": {"pm25_ugm3":(5,80),"pm10_ugm3":(10,150),"no2_ugm3":(5,60),
"o3_ugm3":(20,120),"co_mgm3":(0.1,5.0),
"temperature_celsius":(20,35),"humidity_percent":(40,95)},
"parking": {"total_spots":(50,500),"available_spots":(0,500),
"occupancy_percent":(0,100),"turnover_per_hour":(5,50)},
"noise": {"noise_level_db":(40,95),"peak_db":(60,110)},
"weather": {"temperature_celsius":(22,34),"humidity_percent":(50,95),
"wind_speed_kmh":(0,50),"pressure_hpa":(1005,1025),
"rain_mm":(0,20),"uv_index":(0,11)},
"light": {"brightness_lux":(0,100000),"power_consumption_w":(0,500)},
}
NOISE_CATEGORIES = ["quiet","moderate","loud","very_loud"]
LIGHT_STATUSES = ["on","off","dimmed","auto"]
# =============================================================================
# Capteurs déclarés
# =============================================================================
SENSORS: dict[str, dict] = {}
counter = 0
for stype, locs in FIXED_LOCATIONS.items():
for name, coords in locs.items():
sid = f"{stype}_{counter:03d}"
SENSORS[sid] = {"type": stype, "lat": coords[0], "lon": coords[1], "name": name}
counter += 1
# =============================================================================
## Payload NGSI-LD pour Orion-LD / Stellio
# =============================================================================
# Contextes NGSI-LD : core + Smart Data Models
# https://smartdatamodels.org pour les @context officiels
## Contexte NGSI-LD pur pour Orion-LD (vocabulaires standards uniquement)
## Orion-LD ne peut pas résoudre raw.githubusercontent.com — utiliser uri.etsi.org uniquement
#ORION_CONTEXT = [
# "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
#]
# Mapping sensor type → Smart Data Model type NGSI-LD
SMART_MODEL_MAPPING = {
"airquality": "AirQualityObserved",
"traffic": "TrafficFlowObserved",
"parking": "OffStreetParking",
"noise": "NoiseLevelObserved",
"weather": "WeatherObserved",
"light": "Device",
}
FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}
# Cache FROST : éviter de recréer Thing/Datastream
_frost_cache: dict[str, tuple[str, str]] = {} # (sid, field) -> (thing_id, ds_id)
## Contexte NGSI-LD pur pour Stellio et Orion-LD (vocabulaires standards uniquement)
## Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/
# On n'utilise PAS les vocabulaires smartdatamodels.org distants (inaccessibles depuis les containers)
# Les types d'entité Smart Data Models (AirQualityObserved, etc.) sont reconnus par leur nom
# Les propriétés spécifiques sont stockées telles quelles (vocabulaire libre)
#STELLIO_INLINE_CONTEXT = [
# "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
#]
def _frost_payload(sid: str, sensor: dict, source: str = "simulator", topic: str = "") -> dict:
"""Construit un payload SensorThings pour FROST-Server."""
stype = sensor["type"]
ranges = SENSOR_RANGES.get(stype, {})
datastreams = []
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)) and isinstance(hi, (int, float)):
val = round(random.uniform(lo, hi), 1)
unit = "http://www.qudt.org/vocab/unit#DegreeCelsius"
obs_prop = {
"name": f"{field} Observation",
"description": f"Observation of {field}",
"definition": unit,
}
sensor_data = {
"name": f"Sensor {sid} {field}",
"description": f"Sensor {sid} measuring {field}",
"encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0",
"metadata": {"unit": unit},
}
ds = {
"name": f"Datastream {stype}/{field}",
"description": f"Datastream for {stype} sensor {sid} - {field}",
"observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
"unitOfMeasurement": {"name": field, "symbol": "", "definition": unit},
"Sensor": sensor_data,
"ObservedProperty": obs_prop,
}
datastreams.append((field, ds, val))
thing_payload = {
"name": f"Thing_{sid}",
"description": f"Smart City {stype} sensor in Martinique",
"properties": {
"sensorType": stype,
"region": "Martinique",
"source": source, # Traçabilité
"mqttTopic": topic # Traçabilité
},
}
return thing_payload, datastreams
# =============================================================================
# HTTP helper
# =============================================================================
def _http_post(url: str, data: dict, headers: dict, broker: str = "unknown") -> str:
"""POST et retourne 'ok' ou 'created' (ou '' si échec)."""
try:
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with http_request_duration.labels(broker=broker, method="POST").time():
try:
with urllib.request.urlopen(req, timeout=8) as resp:
http_requests_total.labels(broker=broker, method="POST", status_code=str(resp.status)).inc()
if resp.status == 204:
return 'created' # No Content — succès
if resp.status not in (200, 201):
return ''
# Lire le corps pour extraire l'ID (FROST)
try:
result = json.loads(resp.read())
if '@iot.selfLink' in result:
link = result['@iot.selfLink']
return link.split('(')[1].rstrip(')')
if '@iot.id' in result:
return str(result['@iot.id'])
except Exception:
pass
location = resp.headers.get('Location', '')
if location:
return location.split('(')[1].rstrip(')') if '(' in location else ''
return 'created'
except Exception:
pass
except urllib.error.HTTPError as e:
# Lire le corps de l'erreur pour debug
try:
err_body = e.read().decode()[:200]
except Exception:
err_body = str(e)
print(f" ⚠️ HTTP POST {url}{e.code}: {err_body}")
http_requests_total.labels(broker=broker, method="POST", status_code=str(e.code)).inc()
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc()
return ''
except Exception as e:
http_requests_total.labels(broker=broker, method="POST", status_code="exception").inc()
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc()
print(f" ⚠️ HTTP POST {url}{e}")
return ''
def _http_put(url: str, data: dict, headers: dict, broker: str = "unknown") -> bool:
try:
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="PUT")
with http_request_duration.labels(broker=broker, method="PUT").time():
with urllib.request.urlopen(req, timeout=5) as resp:
http_requests_total.labels(broker=broker, method="PUT", status_code=str(resp.status)).inc()
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
http_requests_total.labels(broker=broker, method="PUT", status_code=str(e.code)).inc()
if e.code == 409:
return True # Already exists - that's fine
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc()
print(f" ⚠️ HTTP PUT {url}{e}")
return False
except Exception as e:
http_requests_total.labels(broker=broker, method="PUT", status_code="exception").inc()
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc()
print(f" ⚠️ HTTP PUT {url}{e}")
return False
# =============================================================================
# MQTT Client multi-broker
# =============================================================================
class MultiMQTT:
def __init__(self):
self.clients: dict[str, mqtt.Client] = {}
self.ok: dict[str, bool] = {}
self._lock = threading.Lock()
self._setup()
def _mk_client(self, name: str, host: str, port: int,
tls: bool = False, user: str = "", pwd: str = "",
ws: bool = False, use_v5: bool = False) -> mqtt.Client:
cid = f"smartcity-sim-{name}-{os.getpid()}"
protocol = mqtt.MQTTv5 if use_v5 else mqtt.MQTTv311
# Use Callback API v1 for compatibility with existing lambda callbacks
c = mqtt.Client(client_id=cid, protocol=protocol, callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
if user:
c.username_pw_set(user, pwd)
if tls:
c.tls_set(cert_reqs=ssl.CERT_NONE)
c.tls_insecure_set(True)
if ws:
c.ws_set(b"/mqtt")
c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc)
c.on_disconnect = lambda _c, _, __: self._on_disconnect(name)
try:
c.connect(host, port, keepalive=60, clean_start=True)
c.loop_start()
except Exception as e:
print(f"[MQTT] ❌ {name} @ {host}:{port}{e}")
self.ok[name] = False
return c
def _on_connect(self, name: str, rc: int):
with self._lock:
if rc == 0:
self.ok[name] = True
mqtt_broker_connected.labels(broker=name).set(1)
mqtt_connection_total.labels(broker=name, status="success").inc()
print(f"[MQTT] ✅ {name} connecté")
else:
self.ok[name] = False
mqtt_broker_connected.labels(broker=name).set(0)
mqtt_connection_total.labels(broker=name, status="failure").inc()
print(f"[MQTT] ❌ {name} rc={rc}")
def _on_disconnect(self, name: str):
with self._lock:
self.ok[name] = False
mqtt_broker_connected.labels(broker=name).set(0)
print(f"[MQTT] ⚠️ {name} déconnecté")
def _setup(self):
# Utiliser les variables d'environnement pour les brokers
mqtt_version = os.environ.get("MQTT_PROTOCOL_VERSION", "311")
use_v5 = (mqtt_version == "5")
print(f"[MQTT] Version du protocole: {'v5.0' if use_v5 else 'v3.1.1'} (MQTT_PROTOCOL_VERSION={mqtt_version})")
# Check which brokers are enabled
enable_emqx = os.environ.get("ENABLE_EMQX", "1") == "1"
enable_mosquitto = os.environ.get("ENABLE_MOSQUITTO", "1") == "1"
enable_bunkerm = os.environ.get("ENABLE_BUNKER", "1") == "1"
brokers = []
if enable_emqx:
brokers.append(("emqx", EMQX_HOST, EMQX_PORT, False, "", "", use_v5))
if enable_mosquitto:
brokers.append(("mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "", "", use_v5))
if enable_bunkerm:
brokers.append(("bunkerm", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker", use_v5))
# OpenRemote MQTT broker (pour agents MQTT créés dans OpenRemote)
# Le broker Artemis d'OR accepte les connexions anonymes (pas de credentials)
if ENABLE_OPENREMOTE:
brokers.append(("openremote", "openremote_manager_1", 1883, False, "", "", use_v5))
print(f"[MQTT] 🔌 Connexion aux brokers (EMQX={enable_emqx}, Mosquitto={enable_mosquitto}, BunkerM={enable_bunkerm})...")
print("[MQTT] 🔌 Connexion aux brokers...")
for name, host, port, tls, user, pwd, use_v5 in brokers:
c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd, use_v5=use_v5)
self.clients[name] = c
self.ok[name] = False
# Attendre que tous les brokers soient connectés (Artemis peut être lent)
for _ in range(20):
time.sleep(1)
if all(self.ok.values()):
break
print(f"[MQTT] État des connexions: {dict(self.ok)}")
def publish(self, topic: str, payload: str, sensor_type: str = "unknown") -> dict[str, bool]:
results = {}
payload_bytes = len(payload.encode())
with self._lock:
for name, client in self.clients.items():
if self.ok.get(name, False):
with publish_duration.labels(broker=name).time():
try:
r = client.publish(topic, payload, qos=1)
success = (r.rc == mqtt.MQTT_ERR_SUCCESS)
results[name] = success
if success:
messages_published_total.labels(broker=name, sensor_type=sensor_type).inc()
message_payload_size.labels(broker=name).observe(payload_bytes)
else:
messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="mqtt_rc").inc()
except Exception:
results[name] = False
messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="exception").inc()
else:
results[name] = False
return results
# def publish_iot_agent(self, sid: str, payload: dict, sensor_type: str = "unknown") -> bool:
# """Publie sur le topic IoT-Agent (json/smartcity-api-key/{sid}/attrs) via les 3 brokers."""
topic = f"json/smartcity-api-key/{sid}/attrs"
msg = json.dumps(payload, ensure_ascii=False)
payload_bytes = len(msg.encode())
success = False
# Publier sur les 3 brokers: emqx, mosquitto, bunkerm
for broker_name in ['emqx', 'mosquitto', 'bunkerm']:
client_ok = self.clients.get(broker_name) is not None and self.ok.get(broker_name, False)
print(f"[MQTT-DEBUG] {broker_name}: client_exists={self.clients.get(broker_name) is not None}, ok={self.ok.get(broker_name, False)}")
if client_ok:
try:
r = self.clients[broker_name].publish(topic, msg, qos=1)
if r.rc == mqtt.MQTT_ERR_SUCCESS:
success = True
# messages_published_total.labels(broker='iot-agent', sensor_type=sensor_type).inc()
# message_payload_size.labels(broker='iot-agent').observe(payload_bytes)
except Exception:
pass # IoT-Agent code removed
return success
def stop(self):
for name, c in self.clients.items():
try:
c.loop_stop()
c.disconnect()
except Exception:
pass
# =============================================================================
# URLs de base (résolues au démarrage)
# =============================================================================
#ORION_HOST = "localhost"
#ORION_PORT = "2026"
#ORION_URL = f"http://{ORION_HOST}:{ORION_PORT}"
#STELLIO_URL = os.environ.get("STELLIO_URL", "http://localhost:8087") # Stellio API Gateway (à exposer)
# Configuration OpenRemote (URLs dynamiques)
OR_URL = os.environ.get("OR_URL", "http://localhost:8080") # OpenRemote Manager (Traefik)
OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity
OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", "http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token")
OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour
#STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default")
def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool:
"""Publie sur BunkerM via HTTP API (port 2000) avec session."""
import base64, http.cookiejar, urllib.request, json
from datetime import datetime, timezone
host = "bunkerm_bunkerm_1:2000"
login_url = f"http://{host}/login"
data_url = f"http://{host}/api/sensors/data"
# 1. Cookie jar pour maintenir la session
cj = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
# 2. Authentification (Basic) pour obtenir le cookie de session
creds = base64.b64encode(b"bunker:bunker").decode()
auth_header = {"Authorization": f"Basic {creds}"}
try:
# GET sur /login pour initialiser la session
req_login = urllib.request.Request(login_url, headers=auth_header)
opener.open(req_login, timeout=5)
except Exception as e:
print(f" ⚠️ BunkerM login → {e}")
return False
# 3. Préparer le payload
payload = {
"sensor_id": sid,
"sensor_type": sensor["type"],
"timestamp": datetime.now(timezone.utc).isoformat(),
}
for k, v in values.items():
if k not in payload:
payload[k] = v
# 4. POST avec le cookie de session
data = json.dumps(payload).encode()
req = urllib.request.Request(
data_url,
data=data,
headers={**auth_header, "Content-Type": "application/json"},
method="POST"
)
try:
with http_request_duration.labels(broker="bunkerm", method="POST").time():
with opener.open(req, timeout=5) as resp:
http_requests_total.labels(broker="bunkerm", method="POST", status_code=str(resp.status)).inc()
messages_published_total.labels(broker="bunkerm", sensor_type=sensor["type"]).inc()
print(f" ✅ BunkerM: HTTP {resp.status}")
return resp.status in (200, 201, 204)
except Exception as e:
http_requests_total.labels(broker="bunkerm", method="POST", status_code="exception").inc()
messages_errors_total.labels(broker="bunkerm", sensor_type=sensor["type"], error_type="exception").inc()
print(f" ⚠️ BunkerM POST → {e}")
return False
def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
"""Crée le Thing (1 par capteur) + Datastreams, puis POST l'Observation."""
# Cache : {sid: (thing_id, {field: ds_id})}
if sid in _frost_cache:
thing_id, ds_map = _frost_cache[sid]
if field in ds_map:
ds_id = ds_map[field]
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
obs = {
"resultTime": datetime.now(timezone.utc).isoformat(),
"result": value,
"FeatureOfInterest": {
"name": f"Location {sid}",
"description": f"Feature of interest for sensor {sid}",
"encodingType": "application/vnd.geo+json",
"feature": {
"type": "Point",
"coordinates": [sensor.get("lon", -61.0), sensor.get("lat", 14.6)]
}
}
}
if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"):
print(f" ✅ FROST Observation {sid}/{field} → OK (cached)")
return True
else:
print(f" ⚠️ FROST Observation {sid}/{field} → échec")
return False
# Premier appel pour ce capteur : créer Thing + tous les Datastreams
# Topic MQTT pour traçabilité
stype = sensor["type"]
topic = f"city/sensors/{stype}/{sid}"
thing_payload, datastreams = _frost_payload(sid, sensor, source="simulator", topic=topic)
print(f" 📊 FROST: POST Thing {sid}...")
tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS, broker="frost")
if not tid:
print(f" ⚠️ FROST Thing {sid} → échec création")
return False
print(f" ✅ FROST Thing {sid} créé (ID: {tid})")
# Créer les Datastreams
ds_map = {}
for f, ds, _ in datastreams:
ds["Thing"] = {"@iot.id": tid}
print(f" 📊 FROST: POST Datastream {sid}/{f}...")
ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS, broker="frost")
if ds_id:
print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})")
ds_map[f] = ds_id
else:
print(f" ⚠️ FROST Datastream {sid}/{f} → échec")
_frost_cache[sid] = (tid, ds_map)
# Poster l'observation pour le field actuel
if field in ds_map:
ds_id = ds_map[field]
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value}
if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"):
print(f" ✅ FROST Observation {sid}/{field} → OK")
return True
return False
# =============================================================================
# OpenRemote REST
# =============================================================================
_or_token_cache = {"token": "", "expires": 0}
def _get_or_token() -> str:
"""Obtain an OpenRemote token via password grant (admin user)."""
import time, urllib.parse
if _or_token_cache["token"] and _or_token_cache["expires"] > time.time() + 60:
return _or_token_cache["token"]
try:
# Use password grant with admin user (full rights)
token_url = f"http://openremote-keycloak-1:8080/auth/realms/{OR_TOKEN_REALM}/protocol/openid-connect/token"
client_id = os.environ.get("OR_CLIENT_ID", "openremote")
client_secret = os.environ.get("OR_CLIENT_SECRET", "0oQjzTfiEELYmj5jFwT4iIuWUDtQDvVa")
data = urllib.parse.urlencode({
"grant_type": "password",
"username": os.environ.get("OR_ADMIN_USER", "admin"),
"password": os.environ.get("OR_ADMIN_PASS", "Digitribe972"),
"client_id": client_id,
"client_secret": client_secret
}).encode()
req = urllib.request.Request(
token_url,
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
with urllib.request.urlopen(req, timeout=5) as r:
token_data = json.loads(r.read().decode())
_or_token_cache["token"] = token_data["access_token"]
_or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300) - 60
return _or_token_cache["token"]
except Exception as e:
print(f" ⚠️ OpenRemote token → {e}")
return ""
def _or_put(asset_id: str, payload: dict) -> bool:
"""PUT update sur un asset OpenRemote (avec version)."""
token = _get_or_token()
if not token:
return False
try:
# Récupérer la version actuelle de l'asset
get_url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
get_req = urllib.request.Request(get_url, headers={"Authorization": f"Bearer {token}"})
version = 1
try:
with urllib.request.urlopen(get_req, timeout=5) as resp:
asset_data = json.loads(resp.read().decode())
version = asset_data.get("version", 1)
except:
pass # Si GET échoue, utiliser version=1
# Ajouter la version au payload
payload["version"] = version
body = json.dumps(payload).encode()
url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
req = urllib.request.Request(url, data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"If-Match": str(version),
},
method="PUT")
with http_request_duration.labels(broker="openremote", method="PUT").time():
with urllib.request.urlopen(req, timeout=5) as resp:
http_requests_total.labels(broker="openremote", method="PUT", status_code=str(resp.status)).inc()
messages_published_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown")).inc()
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
http_requests_total.labels(broker="openremote", method="PUT", status_code=str(e.code)).inc()
messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="http_error").inc()
print(f" ⚠️ OR PUT {asset_id} → HTTP {e.code}")
return False
except Exception as e:
http_requests_total.labels(broker="openremote", method="PUT", status_code="exception").inc()
messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="exception").inc()
print(f" ⚠️ OR PUT {asset_id}{e}")
return False
def publish_openremote(sid: str, sensor: dict, values: dict) -> bool:
"""Met à jour les attributs d'un asset OpenRemote via REST."""
# Mapping sid → asset ID (realm master) — assets avec agentLink + location
# Note: le compteur SENSORS est global (traffic=0-9, airquality=10-19, parking=20-29, noise=30-39, weather=40-49, light=50-59)
ASSET_MAP = {
"traffic_000": "429858caca3341f56fbf65",
"traffic_001": "301218322f5aaca9d6d168",
"traffic_002": "bd35fe2a90133118b9b004",
"traffic_003": "da59ec9301c4efd3fd55c4",
"traffic_004": "834f4b7b9df848f5c5c2d8",
"airquality_010": "0f922351a9894bc0144c94",
"airquality_011": "4f83219bbee703b3e0a255",
"airquality_012": "381cc31ab83dd66ed4be37",
"airquality_013": "808b73c22ecd19589a33be",
"airquality_014": "03c18679226329183b44b6",
"parking_020": "0ee6689f5c0499643d48eb",
"parking_021": "8fb6b2d0601d98b47a4172",
"parking_022": "0c00bda9e5075d12d59694",
"parking_023": "ae981dc9d155d1313b9acf",
"parking_024": "96020cc5aef95c5fda7bb4",
"noise_030": "0be31930e45d2eb5c12ccd",
"noise_031": "1802e76e3432d5eda1deb7",
"noise_032": "08edb6518750d50644afe3",
"noise_033": "93d09bfac36d2ed95fc858",
"noise_034": "7942726d84d2bd29de1e5d",
"weather_040": "9942f881ab6df375d8d9fa",
"weather_041": "5400fdf5c51a4fe4f5a89c",
"weather_042": "1a3bf32aa5208892e68965",
"weather_043": "d3725f922f96085f2df3f7",
"weather_044": "13be192a8c23dd8fdceada",
"light_050": "1f4302946b1a4a1ded23f6",
"light_051": "35e6ef027ed9a157ad8780",
"light_052": "526538589aa981bdc77ce9",
"light_053": "d4a6ac7f34d64e581937c0",
"light_054": "40bbe989be2ae5b2a98b30",
}
asset_id = ASSET_MAP.get(sid)
if not asset_id:
return False
# Construire les attributs à jour
now = datetime.now(timezone.utc).isoformat()
attrs = {
"timestamp": {"type": "DateTime", "value": now, "timestamp": now},
"battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": now},
}
# Mapper les valeurs du payload vers les attributs OR
field_map = {
"temperature_celsius": "temperature",
"humidity_percent": "humidity",
"noise_level_db": "noiseLevel",
"pm25_ugm3": "airQuality",
"vehicle_count": "trafficFlow",
"available_spots": "parking",
"brightness_lux": "light",
"flood": "flood",
}
for field, val in values.items():
attr_name = field_map.get(field, field)
attrs[attr_name] = {
"type": "Number",
"value": val,
"timestamp": now,
"unit": "µg/m³" if "ugm3" in field else ("°C" if "celsius" in field else ("%" if "percent" in field or "humidity" in field else ("dB" if "db" in field else ""))),
}
# Ajouter la location du capteur (GeoJSON Point)
lat = sensor.get("lat", 0)
lon = sensor.get("lon", 0)
attrs["location"] = {
"type": "GeoJSONPoint",
"value": {"type": "Point", "coordinates": [lat, lon]},
"timestamp": now,
}
payload = {
"id": asset_id,
"name": sensor["name"],
"type": "IOTSensor",
"realm": OR_REALM,
"attributes": attrs,
}
return _or_put(asset_id, payload)
# =============================================================================
# Pulsar — HTTP REST Producer
# API: POST http://host:8080/admin/v2/persistent/public/default/{topic}/produce
# Payload: {"messages": [{"payload": "<base64>", "properties": {...}}]}
# Topics auto-créés par le premier message (Pulsar standalone)
# =============================================================================
_pulsar_session = None
def _get_pulsar_session():
global _pulsar_session
if _pulsar_session is None:
import urllib.request
_pulsar_session = urllib.request
return _pulsar_session
def _init_pulsar() -> bool:
"""Teste la connectivité Pulsar au démarrage."""
try:
import urllib.request
req = urllib.request.Request(f"{PULSAR_BASE}/admin/v2/clusters")
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status == 200:
print(f"[PULSAR] ✅ Connected to {PULSAR_BASE}")
return True
except Exception as e:
print(f"[PULSAR] ⚠️ Cannot reach {PULSAR_BASE}: {e}")
return False
def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool:
"""Publie un message sur Pulsar via le client Python (port binaire 6650)."""
stype = sensor["type"]
topic = f"persistent://public/default/smartcity-{stype.replace('-','')}"
try:
import pulsar
with publish_duration.labels(broker="pulsar").time():
client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650")
producer = client.create_producer(topic)
body = json.dumps(payload, ensure_ascii=False).encode()
producer.send(body, properties={"sensor_id": sid, "source": "simulator"})
client.close()
messages_published_total.labels(broker="pulsar", sensor_type=stype).inc()
message_payload_size.labels(broker="pulsar").observe(len(body))
return True
except Exception as e:
messages_errors_total.labels(broker="pulsar", sensor_type=stype, error_type="exception").inc()
print(f" ⚠️ Pulsar → {e}")
return False
# =============================================================================
# Redpanda / Kafka — HTTP REST Proxy
# API: POST http://host:8082/topics/{topic}
# Payload: {"records": [{"value": "<base64>"}]}
# Topics auto-créés par le premier message (Redpanda)
# =============================================================================
_redpanda_session = None
def _get_redpanda_session():
global _redpanda_session
if _redpanda_session is None:
import urllib.request
_redpanda_session = urllib.request
return _redpanda_session
def _init_redpanda() -> bool:
"""Teste la connectivité Redpanda au démarrage."""
try:
import urllib.request
req = urllib.request.Request(f"{REDPANDA_BASE}/v1/status/alive")
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status == 200:
print(f"[REDPANDA] ✅ Connected to {REDPANDA_BASE}")
return True
except Exception as e:
print(f"[REDPANDA] ⚠️ Cannot reach {REDPANDA_BASE}: {e}")
return False
def publish_redpanda(sid: str, sensor: dict, payload: dict) -> bool:
"""Publie un message sur Redpanda/Kafka via le REST Proxy."""
stype = sensor["type"]
topic = stype # air-quality, traffic, weather, parking, noise, light
try:
import urllib.request, base64
body = json.dumps(payload, ensure_ascii=False)
b64 = base64.b64encode(body.encode()).decode()
record = {
"records": [{"value": b64}]
}
url = f"{REDPANDA_BASE}/topics/{topic}"
req = urllib.request.Request(
url,
data=json.dumps(record).encode(),
headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
method="POST"
)
with http_request_duration.labels(broker="redpanda", method="POST").time():
with urllib.request.urlopen(req, timeout=8) as resp:
http_requests_total.labels(broker="redpanda", method="POST", status_code=str(resp.status)).inc()
messages_published_total.labels(broker="redpanda", sensor_type=stype).inc()
message_payload_size.labels(broker="redpanda").observe(len(body.encode()))
return resp.status in (200, 201, 204)
except urllib.error.HTTPError as e:
http_requests_total.labels(broker="redpanda", method="POST", status_code=str(e.code)).inc()
messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="http_error").inc()
print(f" ⚠️ Redpanda → {e.code}")
return False
except Exception as e:
http_requests_total.labels(broker="redpanda", method="POST", status_code="exception").inc()
messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="exception").inc()
print(f" ⚠️ Redpanda → {e}")
return False
def publish_influx(sid: str, sensor: dict, values: dict) -> bool:
"""Write sensor data to InfluxDB (async, non-blocking)."""
if not _influx_write_api:
influx_write_total.labels(status="skipped").inc()
return False
def _write_async():
try:
stype = sensor["type"]
lat = sensor.get("lat", BASE_LAT)
lon = sensor.get("lon", BASE_LON)
points = []
for field, value in values.items():
if isinstance(value, (int, float)):
p = influxdb_client.Point(stype)\
.tag("sensor_id", sid)\
.tag("location", sensor.get("name", sid))\
.field(field, float(value))\
.field("lat", float(lat))\
.field("lon", float(lon))
points.append(p)
if points:
_influx_write_api.write(bucket=INFLUX_BUCKET, record=points)
influx_write_total.labels(status="success").inc()
print(f" 📈 InfluxDB: {len(points)} points written")
except Exception as e:
influx_write_total.labels(status="error").inc()
print(f" ⚠️ InfluxDB → {e}")
# Exécution asynchrone (non-bloquante)
t = threading.Thread(target=_write_async, daemon=True)
t.start()
return True # Async: on ne peut pas savoir immédiatement
def main():
print("╔══════════════════════════════════════════════════╗")
print("║ Smart City Simulator — Martinique ║")
print("╚══════════════════════════════════════════════════╝")
print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s")
# print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}")
# --- Démarrer le serveur Prometheus ---
_start_metrics_server()
# --- Configurer les gauges ---
for stype, count in SENSOR_COUNTS.items():
sensors_total.labels(sensor_type=stype).set(count)
up.set(1)
# Init connectivity checks
if ENABLE_PULSAR:
_init_pulsar()
# Test immédiat
print(f" 🌪️ DEBUG: Test Pulsar direct...", flush=True)
test_payload = {"type": "test", "value": 123}
test_result = publish_pulsar("test_001", {"type": "air-quality"}, test_payload)
print(f" 🌪️ DEBUG: Test Pulsar result: {test_result}", flush=True)
if ENABLE_REDPANDA:
_init_redpanda()
mqtt_client = MultiMQTT()
running = True
def signal_handler(*_):
nonlocal running
running = False
up.set(0)
print("\n[SIM] 🛑 Arrêt...")
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
iteration = 0
while running:
iteration += 1
ts = datetime.now().strftime("%H:%M:%S")
print(f"\n[SIM] ⏱️ It #{iteration}{ts}")
for sid, sensor in SENSORS.items():
stype = sensor["type"]
# Utiliser l'index du capteur dans FIXED_LOCATIONS (1-5) pour correspondre aux agents OpenRemote
# Le sid est formaté comme {stype}_{counter:03d} avec counter global
# On extrait l'index du capteur depuis le sid (derniers chiffres)
sensor_num = int(sid.split("_")[1])
# Calculer l'index 1-5 basé sur la position dans le type
# traffic: 0-9, airquality: 10-19, parking: 20-29, noise: 30-39, weather: 40-49, light: 50-59
type_offsets = {"traffic": 0, "airquality": 10, "parking": 20, "noise": 30, "weather": 40, "light": 50}
type_offset = type_offsets.get(stype, 0)
sensor_index = sensor_num - type_offset + 1 # 1-indexed
topic = f"smartcity/{stype}/{sensor_index}"
# --- Payload MQTT (ATTRIBUTES ONLY - pas de id/type/lat/lon !)
# # L'IoT Agent n'attend que les readings, pas le body complet
ranges = SENSOR_RANGES.get(stype, {})
payload_mqtt = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"battery_level": random.randint(60, 100),
}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
payload_mqtt[field] = round(random.uniform(lo, hi), 1)
elif isinstance(val_range, list):
payload_mqtt[field] = random.choice(val_range)
msg = json.dumps(payload_mqtt, ensure_ascii=False)
# --- MQTT publish ---
results = mqtt_client.publish(topic, msg, sensor_type=stype)
ok_mqtt = [n for n, r in results.items() if r]
if ok_mqtt:
print(f" 📤 {topic}{','.join(ok_mqtt)}")
# # --- IoT-Agent (via EMQX) ---
# if ENABLE_IOT_AGENT:
# ok_iot = mqtt_client.publish_iot_agent(sid, payload_mqtt, sensor_type=stype)
# print(f" 🤖 IoT-Agent: {'✅' if ok_iot else '❌'}")
# Extraire les valeurs pour OpenRemote
or_values = {}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
or_values[field] = round(random.uniform(lo, hi), 1)
# --- OpenRemote REST --- (DÉSACTIVÉ: les agents MQTT gèrent les mises à jour)
# Le REST échoue en 403 sur les assets avec agentLink
# if ENABLE_OPENREMOTE:
# ok_or = publish_openremote(sid, sensor, or_values)
# print(f" 🏠 OpenRemote: {'✅' if ok_or else '⚠️ skipped'}")
# # --- Orion-LD --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT)
# # if ENABLE_ORION:
# # ok_or = publish_orion(sid, sensor)
# # print(f" 🌐 Orion-LD: {'✅' if ok_or else '⚠️ skipped'}")
# # --- Stellio --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT)
# # if ENABLE_STELLIO:
# # ok_st = publish_stellio(sid, sensor)
# # print(f" 🏢 Stellio: {'✅' if ok_st else '❌'}")
# --- FROST ---
if ENABLE_FROST:
ranges2 = SENSOR_RANGES.get(stype, {})
for field, val_range in ranges2.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
val = round(random.uniform(lo, hi), 1)
ok_fr = publish_frost(sid, sensor, field, val)
print(f" 📊 FROST: {'' if ok_fr else ''}")
# --- InfluxDB ---
if ENABLE_INFLUX:
influx_vals = {}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
influx_vals[field] = round(random.uniform(lo, hi), 1)
ok_influx = publish_influx(sid, sensor, influx_vals)
print(f" 📈 InfluxDB: {'' if ok_influx else ''}")
# --- Pulsar (HTTP REST) ---
if ENABLE_PULSAR:
print(f" 🌪️ DEBUG: calling publish_pulsar for {sid}, payload_mqtt exists: {bool(locals().get('payload_mqtt'))}", flush=True)
ok_pulsar = publish_pulsar(sid, sensor, payload_mqtt)
print(f" 🌪️ Pulsar: {'' if ok_pulsar else ''}")
# --- Redpanda (Kafka REST Proxy) ---
if ENABLE_REDPANDA:
ok_redpanda = publish_redpanda(sid, sensor, payload_mqtt)
print(f" 🐟 Redpanda: {'' if ok_redpanda else ''}")
# --- BunkerM HTTP ---
if os.getenv("BUNKERM_HTTP", "0") == "1":
ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt)
print(f" 📦 BunkerM: {'' if ok_bunkerm else ''}")
print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}")
try:
time.sleep(INTERVAL)
except KeyboardInterrupt:
break
mqtt_client.stop()
print("[SIM] ✅ Arrêté proprement.")
if __name__ == "__main__":
main()