Files
smart-city-digital-twin-mar…/simulator.py
Eric FELIXINE ad31e2289f fix: replace random coords with fixed Martinique locations (no more sea sensors)
- Replace random.uniform(±0.02°) with FIXED_LOCATIONS dict keyed by type+name
- All 30 named sensor locations mapped to real Martinique coordinates on land
- Coordonnées Martinique: 14.4°N–14.88°N, -61.25°W–-60.85°W
- OpenRemote DB: UPDATE all IOTSensor assets with wrong coords (PostgreSQL jsonb_set)
- All 34 sensor instances now validated as TERRE (100% on land)

Fixed sensors: traffic, airquality, parking, noise, weather, light
2026-05-05 21:24:29 -04:00

1106 lines
49 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Smart City IoT Simulator — Martinique (14.6°N, 61.2°W)
=======================================================
Publie vers MULTIPLES brokers MQTT + context brokers NGSI-LD.
Brokers MQTT:
- EMQX: emqx_emqx_1:1883 (sans auth)
- Mosquitto: mosquitto-traefik:1883 (bunker/bunker)
- BunkerM: bunkerm_bunkerm_1:1900 (TLS, bunker/bunker)
- OpenRemote: openremote-manager-1:1883 (admin/Digitribe972)
Context Brokers REST:
- Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
- Stellio: stellio-api-gateway:8080 (NGSI-LD)
- FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)
Streaming Platforms:
- Pulsar: smart-city-pulsar:8080 (HTTP REST Producer API)
- Redpanda: smart-city-redpanda:8082 (Kafka REST Proxy)
Time-Series DB:
- InfluxDB v2: smart-city-influxdb:8086 (via influxdb-client)
Variables d'environnement:
PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s)
BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France)
ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
ENABLE_FROST=1 : activer FROST-Server (défaut: 1)
ENABLE_INFLUX=1 : activer InfluxDB v2 (défaut: 1)
ENABLE_PULSAR=1 : activer Apache Pulsar (défaut: 1)
ENABLE_REDPANDA=1 : activer Redpanda Kafka (défaut: 1)
"""
import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse
import paho.mqtt.client as mqtt
import urllib.request, urllib.error
from datetime import datetime, timezone
from typing import Any
# InfluxDB support
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
# =============================================================================
# Configuration des brokers MQTT
# Configuration des brokers MQTT
# Par défaut localhost (simulateur tourne sur l'hôte)
EMQX_HOST = os.environ.get("EMQX_HOST", "localhost")
EMQX_PORT = int(os.environ.get("EMQX_PORT", "11883"))
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "localhost")
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))
BUNKERM_HOST = os.environ.get("BUNKERM_HOST", "mqtt.digitribe.fr")
BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900"))
# =============================================================================
# Configuration
# =============================================================================
BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "1")) # 1s pour temps réel
ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1").lower() in ("1", "true", "yes", "on")
ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1"
ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1"
OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972")
OR_REALM = os.environ.get("OR_REALM", "smartcity")
OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token
FROST_URL = os.environ.get("FROST_URL", "http://localhost:8090/FROST-Server/v1.1") # Exposer frost_http-web-1:8080 -> host:8086
# Pulsar config (HTTP REST — pulsar-admin + producer REST API)
ENABLE_PULSAR = os.environ.get("ENABLE_PULSAR", "1").lower() in ("1", "true", "yes", "on")
PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar")
PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "8080"))
PULSAR_BASE = f"http://{PULSAR_HOST}:{PULSAR_PORT}"
# Redpanda / Kafka config (REST Proxy HTTP)
ENABLE_REDPANDA = os.environ.get("ENABLE_REDPANDA", "1").lower() in ("1", "true", "yes", "on")
REDPANDA_HOST = os.environ.get("REDPANDA_HOST", "smart-city-redpanda")
REDPANDA_PORT = int(os.environ.get("REDPANDA_PORT", "8082"))
REDPANDA_BASE = f"http://{REDPANDA_HOST}:{REDPANDA_PORT}"
# InfluxDB config
ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1").lower() in ("1", "true", "yes", "on")
INFLUX_URL = os.environ.get("INFLUX_URL", "http://smart-city-influxdb:8086") # InfluxDB v2 sur smartcity-shared
INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "iot_data")
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-secret-admin-token")
# Initialize InfluxDB client
_influx_client = None
_influx_write_api = None
if ENABLE_INFLUX:
try:
_influx_client = influxdb_client.InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
_influx_write_api = _influx_client.write_api(write_options=SYNCHRONOUS)
print(f"[INFLUX] ✅ Connected to {INFLUX_URL}")
except Exception as e:
print(f"[INFLUX] ❌ Connection failed: {e}")
SENSOR_COUNTS = {
"traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")),
"airquality": int(os.environ.get("SENSOR_COUNT_airquality", "2")),
"parking": int(os.environ.get("SENSOR_COUNT_parking", "2")),
"noise": int(os.environ.get("SENSOR_COUNT_noise", "1")),
"weather": int(os.environ.get("SENSOR_COUNT_weather", "1")),
"light": int(os.environ.get("SENSOR_COUNT_light", "1")),
}
# Si SENSOR_COUNT est défini, multiplier les counts de façon proportionnelle
_total_default = sum(SENSOR_COUNTS.values())
if "SENSOR_COUNT" in os.environ:
target = int(os.environ["SENSOR_COUNT"])
ratio = target / _total_default
for k in SENSOR_COUNTS:
SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio))
# =============================================================================
# Localisation des capteurs Martinique — Coordonnées FIXES sur terre ferme
# IMPORTANT : ±0.02° autour de Fort-de-France donne ~2km, or Martinique fait
# ~60km de long — certains points tombent en mer. Solution : coordonnées fixes.
# =============================================================================
# Coordonnées réelles Martinique (terre ferme uniquement)
# Martinique : 14.4°N14.9°N, -61.23°W-60.8°W
FIXED_LOCATIONS: dict[str, dict[str, tuple[float, float]]] = {
"traffic": {
# Fort-de-France — grands axes
"Carrefour Central": (14.6036, -61.1783), # Place du Palais, centre-ville
"Avenue des Caraïbes": (14.6100, -61.1850), # Route de Schoelcher (N1)
"Boulevard Pasteur": (14.6150, -61.1700), # Boulevard Pasteur, nord FdF
"Rue des Flamboyants": (14.5970, -61.1900), # Zone Industrielle, Lamentin
"Place de la République": (14.6000, -61.2100), # Centre administratif, sud FdF
},
"airquality": {
# Points de mesure qualité de l'air sur terre
"Quartier Bonde": (14.6050, -61.1750), # Bonde, sud-est FdF
"Port de Fort-de-France": (14.5980, -61.2250), # Zone portuaire, bord de mer (OK, port ≠ mer)
"Château Denis": (14.6200, -61.1550), # Château Denis, nord montagne
"Lamentin Aéroport": (14.5950, -61.1700), # Aéroport, Lamentin
"Schoelcher Village": (14.7400, -61.1850), # Schoelcher, nord-ouest
},
"parking": {
# Parkings publics sur terre
"Parking Rivière-Saleé": (14.5820, -61.2050), # Rivière-Salée (sud)
"Parking Cluny": (14.6050, -61.1750), # Cluny, FdF
"Parking Média": (14.6000, -61.1850), # Quartier Média, FdF
"Parking Grand-Camp": (14.6100, -61.1700), # Grand-Camp, Lamentin
"Parking Dillon": (14.6200, -61.1650), # Dillon, nord FdF
},
"noise": {
# Zones urbainesbruyantes
"Rue des Arts": (14.6020, -61.1800), # Rue des Arts, centre FdF
"Marché Central": (14.6000, -61.2100), # Marché Central, FdF
"Université Fort-de-France": (14.6400, -61.1600), # Campus Schoe, nord
"Stade de Dillon": (14.6250, -61.1600), # Stade Dillon, nord
"Place du Champs de Mars": (14.6030, -61.1750), # Champs de Mars, FdF
},
"weather": {
# Stations météo — terre ferme uniquement
"Station Météo Lamentin": (14.5950, -61.1650), # Aéroport Lamentin
"Station Schoelcher": (14.7350, -61.1800), # Schoelcher, NW
"Station Ajoupa-Bouillon": (14.8100, -61.0500), # Ajoupa-Bouillon, nord (interieur)
"Station Le François": (14.6150, -60.9000), # Le François, côte atlantique est
"Station Le Robert": (14.6800, -60.9400), # Le Robert, côte atlantique
},
"light": {
# Éclairage public — zones urbaines
"Eclairage Rue des Mouettes": (14.6050, -61.1800), # Rue des Mouettes, FdF
"Candela Boulevard": (14.6150, -61.1700), # Boulevard Pasteur
"Lumiere Rue des Acacias": (14.6000, -61.1850), # Rue des Acacias, FdF
"Feux Signalisation Centre": (14.6030, -61.1780), # Carrefours centraux
"Eclairage Port": (14.5980, -61.2250), # Zone portuaire
},
}
def _build_locs(stype: str, count: int) -> list[dict]:
"""Construit la liste des capteurs avec coordonnées fixes (sur terre)."""
locs = []
names = list(FIXED_LOCATIONS.get(stype, {stype: (BASE_LAT, BASE_LON)}).keys())
# Répéter les noms si count > len(names)
for i in range(count):
name = names[i % len(names)]
coords = FIXED_LOCATIONS.get(stype, {}).get(name, (BASE_LAT, BASE_LON))
locs.append({
"lat": round(coords[0], 6),
"lon": round(coords[1], 6),
"name": name,
})
return locs
SENSOR_LOCATIONS: dict[str, list[dict]] = {}
for stype, count in SENSOR_COUNTS.items():
SENSOR_LOCATIONS[stype] = _build_locs(stype, count)
# Ranges par type
SENSOR_RANGES: dict[str, dict] = {
"traffic": {"vehicle_count":(10,150),"average_speed_kmh":(10,80),
"congestion_level":(0,5),"occupancy_percent":(0,100)},
"airquality": {"pm25_ugm3":(5,80),"pm10_ugm3":(10,150),"no2_ugm3":(5,60),
"o3_ugm3":(20,120),"co_mgm3":(0.1,5.0),
"temperature_celsius":(20,35),"humidity_percent":(40,95)},
"parking": {"total_spots":(50,500),"available_spots":(0,500),
"occupancy_percent":(0,100),"turnover_per_hour":(5,50)},
"noise": {"noise_level_db":(40,95),"peak_db":(60,110)},
"weather": {"temperature_celsius":(22,34),"humidity_percent":(50,95),
"wind_speed_kmh":(0,50),"pressure_hpa":(1005,1025),
"rain_mm":(0,20),"uv_index":(0,11)},
"light": {"brightness_lux":(0,100000),"power_consumption_w":(0,500)},
}
NOISE_CATEGORIES = ["quiet","moderate","loud","very_loud"]
LIGHT_STATUSES = ["on","off","dimmed","auto"]
# =============================================================================
# Capteurs déclarés
# =============================================================================
SENSORS: dict[str, dict] = {}
counter = 0
for stype, locs in SENSOR_LOCATIONS.items():
for loc in locs:
sid = f"{stype}_{counter:03d}"
SENSORS[sid] = {"type": stype, "lat": loc["lat"], "lon": loc["lon"], "name": loc["name"]}
counter += 1
# =============================================================================
# Payload NGSI-LD pour Orion-LD / Stellio
# =============================================================================
# Contextes NGSI-LD : core + Smart Data Models
# https://smartdatamodels.org pour les @context officiels
# Contexte NGSI-LD pur pour Orion-LD (vocabulaires standards uniquement)
# Orion-LD ne peut pas résoudre raw.githubusercontent.com — utiliser uri.etsi.org uniquement
ORION_CONTEXT = [
"https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
]
# Mapping sensor type → Smart Data Model type NGSI-LD
SMART_MODEL_MAPPING = {
"airquality": "AirQualityObserved",
"traffic": "TrafficFlowObserved",
"parking": "OffStreetParking",
"noise": "NoiseLevelObserved",
"weather": "WeatherObserved",
"light": "Device",
}
FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}
# Cache FROST : éviter de recréer Thing/Datastream
_frost_cache: dict[str, tuple[str, str]] = {} # (sid, field) -> (thing_id, ds_id)
# Contexte NGSI-LD pur pour Stellio et Orion-LD (vocabulaires standards uniquement)
# Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/
# On n'utilise PAS les vocabulaires smartdatamodels.org distants (inaccessibles depuis les containers)
# Les types d'entité Smart Data Models (AirQualityObserved, etc.) sont reconnus par leur nom
# Les propriétés spécifiques sont stockées telles quelles (vocabulaire libre)
STELLIO_INLINE_CONTEXT = [
"https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
]
def _ngsi_payload(sid: str, sensor: dict, context: list | dict = ORION_CONTEXT, source: str = "simulator", topic: str = "") -> dict:
"""Construit un payload NGSI-LD avec Smart Data Models officiels."""
stype = sensor["type"]
model_type = SMART_MODEL_MAPPING.get(stype, "Device")
now = datetime.now(timezone.utc).isoformat()
# Attributs communs à tous les modèles
payload = {
"@context": context,
"id": f"urn:ngsi-ld:{model_type}:{sid}",
"type": model_type,
"dateObserved": {"type": "Property", "value": now},
"location": {"type": "GeoProperty",
"value": {"type": "Point",
"coordinates": [sensor["lon"], sensor["lat"]]}},
"name": {"type": "Property", "value": sensor["name"]},
"batteryLevel": {"type": "Property", "value": random.randint(60, 100)},
# NOUVEAU: Traçabilité MQTT (Conforme NGSI-LD)
# "source" est un champ standard NGSI-LD (ETSI)
# "mqttTopic" est une propriété personnalisée (étendue autorisée)
"source": {"type": "Property", "value": source},
"mqttTopic": {"type": "Property", "value": topic},
}
# Attributs spécifiques par type de modèle
ranges = SENSOR_RANGES.get(stype, {})
props = {}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
props[field] = {"type": "Property", "value": round(random.uniform(lo, hi), 1)}
elif isinstance(val_range, list):
props[field] = {"type": "Property", "value": random.choice(val_range)}
# Mapping vers les noms d'attributs Smart Data Models
if stype == "airquality":
if "pm25_ugm3" in props: payload["NO2"] = props.pop("pm25_ugm3") # Simplifié
if "pm10_ugm3" in props: payload["PM10"] = props.pop("pm10_ugm3")
if "no2_ugm3" in props: payload["NO2"] = props.pop("no2_ugm3")
if "o3_ugm3" in props: payload["O3"] = props.pop("o3_ugm3")
if "co_mgm3" in props: payload["CO"] = props.pop("co_mgm3")
if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius")
if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent")
elif stype == "traffic":
if "vehicle_count" in props: payload["vehicleCount"] = props.pop("vehicle_count")
if "average_speed_kmh" in props: payload["averageVehicleSpeed"] = props.pop("average_speed_kmh")
if "congestion_level" in props: payload["congestion"] = props.pop("congestion_level")
if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent")
elif stype == "parking":
if "available_spots" in props: payload["availableSpotNumber"] = props.pop("available_spots")
if "total_spots" in props: payload["totalSpotNumber"] = props.pop("total_spots")
if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent")
if "turnover_per_hour" in props: payload["turnover"] = props.pop("turnover_per_hour")
elif stype == "noise":
if "noise_level_db" in props: payload["noiseLevel"] = props.pop("noise_level_db")
if "peak_db" in props: payload["noisePeak"] = props.pop("peak_db")
payload["noiseCategory"] = {"type": "Property", "value": random.choice(NOISE_CATEGORIES)}
elif stype == "weather":
if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius")
if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent")
if "rain_mm" in props: payload["rainfall"] = props.pop("rain_mm")
if "uv_index" in props: payload["uvIndex"] = props.pop("uv_index")
if "wind_speed_kmh" in props: payload["windSpeed"] = props.pop("wind_speed_kmh")
elif stype == "light":
if "brightness_lux" in props: payload["illuminance"] = props.pop("brightness_lux")
if "power_consumption_w" in props: payload["power"] = props.pop("power_consumption_w")
payload["status"] = {"type": "Property", "value": random.choice(LIGHT_STATUSES)}
return payload
def _frost_payload(sid: str, sensor: dict, source: str = "simulator", topic: str = "") -> dict:
"""Construit un payload SensorThings pour FROST-Server."""
stype = sensor["type"]
ranges = SENSOR_RANGES.get(stype, {})
datastreams = []
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)) and isinstance(hi, (int, float)):
val = round(random.uniform(lo, hi), 1)
unit = "http://www.qudt.org/vocab/unit#DegreeCelsius"
obs_prop = {
"name": f"{field} Observation",
"description": f"Observation of {field}",
"definition": unit,
}
sensor_data = {
"name": f"Sensor {sid} {field}",
"description": f"Sensor {sid} measuring {field}",
"encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0",
"metadata": {"unit": unit},
}
ds = {
"name": f"Datastream {stype}/{field}",
"description": f"Datastream for {stype} sensor {sid} - {field}",
"observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
"unitOfMeasurement": {"name": field, "symbol": "", "definition": unit},
"Sensor": sensor_data,
"ObservedProperty": obs_prop,
}
datastreams.append((field, ds, val))
thing_payload = {
"name": f"Thing_{sid}",
"description": f"Smart City {stype} sensor in Martinique",
"properties": {
"sensorType": stype,
"region": "Martinique",
"source": source, # Traçabilité
"mqttTopic": topic # Traçabilité
},
}
return thing_payload, datastreams
# =============================================================================
# HTTP helper
# =============================================================================
def _http_post(url: str, data: dict, headers: dict) -> str:
"""POST et retourne 'ok' ou 'created' (ou '' si échec)."""
try:
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
if resp.status == 204:
return 'created' # No Content — succès
if resp.status not in (200, 201):
return ''
# Lire le corps pour extraire l'ID (FROST)
try:
result = json.loads(resp.read())
if '@iot.selfLink' in result:
link = result['@iot.selfLink']
return link.split('(')[1].rstrip(')')
if '@iot.id' in result:
return str(result['@iot.id'])
except Exception:
pass
location = resp.headers.get('Location', '')
if location:
return location.split('(')[1].rstrip(')') if '(' in location else ''
return 'created'
except urllib.error.HTTPError as e:
# Lire le corps de l'erreur pour debug
try:
err_body = e.read().decode()[:200]
except Exception:
err_body = str(e)
print(f" ⚠️ HTTP POST {url}{e.code}: {err_body}")
return ''
except Exception as e:
print(f" ⚠️ HTTP POST {url}{e}")
return ''
def _http_put(url: str, data: dict, headers: dict) -> bool:
try:
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="PUT")
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
if e.code == 409:
return True # Already exists - that's fine
print(f" ⚠️ HTTP PUT {url}{e}")
return False
except Exception as e:
print(f" ⚠️ HTTP PUT {url}{e}")
return False
# =============================================================================
# MQTT Client multi-broker
# =============================================================================
class MultiMQTT:
def __init__(self):
self.clients: dict[str, mqtt.Client] = {}
self.ok: dict[str, bool] = {}
self._lock = threading.Lock()
self._setup()
def _mk_client(self, name: str, host: str, port: int,
tls: bool = False, user: str = "", pwd: str = "",
ws: bool = False) -> mqtt.Client:
cid = f"smartcity-sim-{name}-{os.getpid()}"
c = mqtt.Client(client_id=cid, protocol=mqtt.MQTTv311)
if user:
c.username_pw_set(user, pwd)
if tls:
c.tls_set(cert_reqs=ssl.CERT_NONE)
c.tls_insecure_set(True)
if ws:
c.ws_set(b"/mqtt")
c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc)
c.on_disconnect = lambda _c, _, __: self._on_disconnect(name)
try:
c.connect(host, port, keepalive=30)
c.loop_start()
except Exception as e:
print(f"[MQTT] ❌ {name} @ {host}:{port}{e}")
self.ok[name] = False
return c
def _on_connect(self, name: str, rc: int):
with self._lock:
if rc == 0:
self.ok[name] = True
print(f"[MQTT] ✅ {name} connecté")
else:
self.ok[name] = False
print(f"[MQTT] ❌ {name} rc={rc}")
def _on_disconnect(self, name: str):
with self._lock:
self.ok[name] = False
print(f"[MQTT] ⚠️ {name} déconnecté")
def _setup(self):
# Utiliser les variables d'environnement pour les brokers
brokers = [
("EMQX", EMQX_HOST, EMQX_PORT, False, "", ""),
("Mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "bunker", "bunker"),
("BunkerM", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker"), # Port 1900 = MQTT simple, pas TLS
]
print("[MQTT] 🔌 Connexion aux brokers...")
for name, host, port, tls, user, pwd in brokers:
c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd)
self.clients[name] = c
self.ok[name] = False
time.sleep(3) # Attend les connexions
def publish(self, topic: str, payload: str) -> dict[str, bool]:
results = {}
with self._lock:
for name, client in self.clients.items():
if self.ok.get(name, False):
try:
r = client.publish(topic, payload, qos=1)
results[name] = (r.rc == mqtt.MQTT_ERR_SUCCESS)
except Exception:
results[name] = False
else:
results[name] = False
return results
def stop(self):
for name, c in self.clients.items():
try:
c.loop_stop()
c.disconnect()
except Exception:
pass
# =============================================================================
# URLs de base (résolues au démarrage)
# =============================================================================
ORION_HOST = "localhost"
ORION_PORT = "2026"
ORION_URL = f"http://{ORION_HOST}:{ORION_PORT}"
STELLIO_URL = os.environ.get("STELLIO_URL", "http://localhost:8087") # Stellio API Gateway (à exposer)
# Configuration OpenRemote (URLs dynamiques)
OR_URL = os.environ.get("OR_URL", "http://localhost:8080") # OpenRemote Manager (Traefik)
OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity
OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", "http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token")
OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour
STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default")
def publish_stellio(sid: str, sensor: dict) -> bool:
"""Publie sur Stellio via Traefik (gère le 409)."""
# Topic MQTT correspondant (pour traçabilité)
stype = sensor["type"]
topic = f"city/sensors/{stype}/{sid}"
entity = _ngsi_payload(sid, sensor, context=STELLIO_INLINE_CONTEXT, source="simulator", topic=topic)
# Stellio a besoin du @context pour résoudre les vocabulaires NGSI-LD
# (uri.etsi.org résolu depuis le JAR embarqué)
url = f"{STELLIO_URL}/ngsi-ld/v1/entities"
headers = {
"Content-Type": "application/ld+json",
"Accept": "application/ld+json",
"NGSILD-Tenant": STELLIO_TENANT,
}
try:
body = json.dumps(entity).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
print(f" 🏢 Stellio: ✅ (HTTP {resp.status})")
return True
except urllib.error.HTTPError as e:
if e.code == 409: # Already exists, do update with PUT
try:
entity_id = urllib.parse.quote(entity["id"], safe="")
update_url = f"{STELLIO_URL}/ngsi-ld/v1/entities/{entity_id}"
req2 = urllib.request.Request(update_url, data=body, headers=headers, method="PUT")
with urllib.request.urlopen(req2, timeout=8) as resp2:
print(f" 🏢 Stellio: ✅ (HTTP {resp2.status} updated)")
return True
except Exception as e2:
print(f" ⚠️ Stellio update failed: {e2}")
return False
try:
err = e.read().decode()[:300]
except Exception:
err = str(e)
print(f" ⚠️ Stellio → {e.code}: {err}")
return False
except Exception as e:
print(f" ⚠️ Stellio → {e}")
return False
def publish_orion(sid: str, sensor: dict) -> bool:
"""Publie sur Orion-LD (POST create, PATCH update)."""
# Topic MQTT correspondant (pour traçabilité)
stype = sensor["type"]
topic = f"city/sensors/{stype}/{sid}"
entity = _ngsi_payload(sid, sensor, source="simulator", topic=topic)
# Orion-LD est exposé sur localhost:2026 (hôte)
base = "http://localhost:2026/ngsi-ld/v1"
# 1. Essayer de créer (POST)
try:
body = json.dumps(entity).encode()
req = urllib.request.Request(f"{base}/entities", data=body,
headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
print(f" 🌐 Orion-LD: ✅ (HTTP {resp.status} created)")
return True
except urllib.error.HTTPError as e:
if e.code != 409:
print(f" ⚠️ Orion-LD → {e.code}: {e.read().decode()[:200]}")
return False
# 409 = déjà existant → PATCH
# 2. Déjà existant (409) → PATCH sur les attributs (avec @context complet requis par Orion-LD)
try:
# Orion-LD exige @context même dans le PATCH
eid = urllib.parse.quote(entity['id'], safe='')
patch_url = f"{base}/entities/{eid}/attrs"
req2 = urllib.request.Request(patch_url, data=body,
headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="PATCH")
with urllib.request.urlopen(req2, timeout=8) as resp2:
print(f" 🌐 Orion-LD: ✅ (HTTP {resp2.status} updated)")
return True
except Exception as e2:
print(f" ⚠️ Orion-LD PATCH failed: {e2}")
return False
def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool:
"""Publie sur BunkerM via HTTP API (port 2000) avec session."""
import base64, http.cookiejar, urllib.request, json
from datetime import datetime, timezone
host = "bunkerm_bunkerm_1:2000"
login_url = f"http://{host}/login"
data_url = f"http://{host}/api/sensors/data"
# 1. Cookie jar pour maintenir la session
cj = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
# 2. Authentification (Basic) pour obtenir le cookie de session
creds = base64.b64encode(b"bunker:bunker").decode()
auth_header = {"Authorization": f"Basic {creds}"}
try:
# GET sur /login pour initialiser la session
req_login = urllib.request.Request(login_url, headers=auth_header)
opener.open(req_login, timeout=5)
except Exception as e:
print(f" ⚠️ BunkerM login → {e}")
return False
# 3. Préparer le payload
payload = {
"sensor_id": sid,
"sensor_type": sensor["type"],
"timestamp": datetime.now(timezone.utc).isoformat(),
}
for k, v in values.items():
if k not in payload:
payload[k] = v
# 4. POST avec le cookie de session
data = json.dumps(payload).encode()
req = urllib.request.Request(
data_url,
data=data,
headers={**auth_header, "Content-Type": "application/json"},
method="POST"
)
try:
with opener.open(req, timeout=5) as resp:
print(f" ✅ BunkerM: HTTP {resp.status}")
return resp.status in (200, 201, 204)
except Exception as e:
print(f" ⚠️ BunkerM POST → {e}")
return False
def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
"""Crée le Thing (1 par capteur) + Datastreams, puis POST l'Observation."""
# Cache : {sid: (thing_id, {field: ds_id})}
if sid in _frost_cache:
thing_id, ds_map = _frost_cache[sid]
if field in ds_map:
ds_id = ds_map[field]
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
obs = {
"resultTime": datetime.now(timezone.utc).isoformat(),
"result": value,
"FeatureOfInterest": {
"name": f"Location {sid}",
"description": f"Feature of interest for sensor {sid}",
"encodingType": "application/vnd.geo+json",
"feature": {
"type": "Point",
"coordinates": [sensor.get("lon", -61.0), sensor.get("lat", 14.6)]
}
}
}
if _http_post(obs_url, obs, FROST_HEADERS):
print(f" ✅ FROST Observation {sid}/{field} → OK (cached)")
return True
else:
print(f" ⚠️ FROST Observation {sid}/{field} → échec")
return False
# Premier appel pour ce capteur : créer Thing + tous les Datastreams
# Topic MQTT pour traçabilité
stype = sensor["type"]
topic = f"city/sensors/{stype}/{sid}"
thing_payload, datastreams = _frost_payload(sid, sensor, source="simulator", topic=topic)
print(f" 📊 FROST: POST Thing {sid}...")
tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS)
if not tid:
print(f" ⚠️ FROST Thing {sid} → échec création")
return False
print(f" ✅ FROST Thing {sid} créé (ID: {tid})")
# Créer les Datastreams
ds_map = {}
for f, ds, _ in datastreams:
ds["Thing"] = {"@iot.id": tid}
print(f" 📊 FROST: POST Datastream {sid}/{f}...")
ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS)
if ds_id:
print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})")
ds_map[f] = ds_id
else:
print(f" ⚠️ FROST Datastream {sid}/{f} → échec")
_frost_cache[sid] = (tid, ds_map)
# Poster l'observation pour le field actuel
if field in ds_map:
ds_id = ds_map[field]
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value}
if _http_post(obs_url, obs, FROST_HEADERS):
print(f" ✅ FROST Observation {sid}/{field} → OK")
return True
return False
# =============================================================================
# OpenRemote REST
# =============================================================================
_or_token_cache = {"token": "", "expires": 0}
def _get_or_token() -> str:
"""Obtain an OpenRemote token via password grant (admin user)."""
import time, urllib.parse
if _or_token_cache["token"] and _or_token_cache["expires"] > time.time() + 60:
return _or_token_cache["token"]
try:
# Use password grant with admin user (full rights)
token_url = f"http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token"
client_id = os.environ.get("OR_CLIENT_ID", "openremote")
client_secret = os.environ.get("OR_CLIENT_SECRET", "QVTnyObwXdpQ0Vuc60kFSonidK49FiXb")
data = urllib.parse.urlencode({
"grant_type": "password",
"username": os.environ.get("OR_ADMIN_USER", "admin"),
"password": os.environ.get("OR_ADMIN_PASS", "Digitribe972"),
"client_id": client_id,
"client_secret": client_secret
}).encode()
req = urllib.request.Request(
token_url,
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
with urllib.request.urlopen(req, timeout=5) as r:
token_data = json.loads(r.read().decode())
_or_token_cache["token"] = token_data["access_token"]
_or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300) - 60
return _or_token_cache["token"]
except Exception as e:
print(f" ⚠️ OpenRemote token → {e}")
return ""
def _or_put(asset_id: str, payload: dict) -> bool:
"""PUT update sur un asset OpenRemote (avec version)."""
token = _get_or_token()
if not token:
return False
try:
body = json.dumps(payload).encode()
url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
req = urllib.request.Request(url, data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"If-Match": str(payload.get("version", 1)),
},
method="PUT")
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
print(f" ⚠️ OR PUT {asset_id} → HTTP {e.code}")
return False
except Exception as e:
print(f" ⚠️ OR PUT {asset_id}{e}")
return False
def publish_openremote(sid: str, sensor: dict, values: dict) -> bool:
"""Met à jour les attributs d'un asset OpenRemote via REST."""
# Mapping sid → asset ID (créés manuellement dans OR)
ASSET_MAP = {
"traffic_000": "7b5c5670d1b84865ba3ac7", # Traffic Fort-de-France Centre
"traffic_001": "557f6e993b994d3cb81017", # Traffic Fort-de-France North
"traffic_002": "cb81dfd2d2dc4d25adc9c2", # Traffic Fort-de-France South
"airquality_000": "a51c982a2d3e451898b978", # Air Quality Fort-de-France
"parking_000": "b8f0df19e0af47b386ebb9", # Parking Fort-de-France Centre
"noise_000": "9035103d1866454fb7e451", # Noise Fort-de-France Centre
"weather_000": "b9de80905ac640f488ab27", # Weather Lamentin Airport
"light_000": "ee7823a41e594851ba202f", # Light Fort-de-France
}
asset_id = ASSET_MAP.get(sid)
if not asset_id:
return False
# Construire les attributs à jour
now = datetime.now(timezone.utc).isoformat()
attrs = {
"timestamp": {"type": "DateTime", "value": now, "timestamp": now},
"battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": now},
}
# Mapper les valeurs du payload vers les attributs OR
field_map = {
"temperature_celsius": "temperature",
"humidity_percent": "humidity",
"noise_level_db": "noiseLevel",
"pm25_ugm3": "airQuality",
"vehicle_count": "trafficFlow",
"available_spots": "parking",
"brightness_lux": "light",
"flood": "flood",
}
for field, val in values.items():
attr_name = field_map.get(field, field)
attrs[attr_name] = {
"type": "Number",
"value": val,
"timestamp": now,
"unit": "µg/m³" if "ugm3" in field else ("°C" if "celsius" in field else ("%" if "percent" in field or "humidity" in field else ("dB" if "db" in field else ""))),
}
payload = {
"id": asset_id,
"name": sensor["name"],
"type": "IOTSensor",
"realm": OR_REALM,
"attributes": attrs,
}
return _or_put(asset_id, payload)
# =============================================================================
# Pulsar — HTTP REST Producer
# API: POST http://host:8080/admin/v2/persistent/public/default/{topic}/produce
# Payload: {"messages": [{"payload": "<base64>", "properties": {...}}]}
# Topics auto-créés par le premier message (Pulsar standalone)
# =============================================================================
_pulsar_session = None
def _get_pulsar_session():
global _pulsar_session
if _pulsar_session is None:
import urllib.request
_pulsar_session = urllib.request
return _pulsar_session
def _init_pulsar() -> bool:
"""Teste la connectivité Pulsar au démarrage."""
try:
import urllib.request
req = urllib.request.Request(f"{PULSAR_BASE}/admin/v2/clusters")
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status == 200:
print(f"[PULSAR] ✅ Connected to {PULSAR_BASE}")
return True
except Exception as e:
print(f"[PULSAR] ⚠️ Cannot reach {PULSAR_BASE}: {e}")
return False
def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool:
"""Publie un message sur Pulsar via le client Python (port binaire 6650)."""
stype = sensor["type"]
topic = f"persistent://public/default/smartcity-{stype}"
try:
import pulsar
# Utiliser le client Pulsar binaire (socket 6650)
client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650")
producer = client.create_producer(topic)
body = json.dumps(payload, ensure_ascii=False).encode()
producer.send(body, properties={"sensor_id": sid, "source": "simulator"})
client.close()
return True
except Exception as e:
print(f" ⚠️ Pulsar → {e}")
return False
# =============================================================================
# Redpanda / Kafka — HTTP REST Proxy
# API: POST http://host:8082/topics/{topic}
# Payload: {"records": [{"value": "<base64>"}]}
# Topics auto-créés par le premier message (Redpanda)
# =============================================================================
_redpanda_session = None
def _get_redpanda_session():
global _redpanda_session
if _redpanda_session is None:
import urllib.request
_redpanda_session = urllib.request
return _redpanda_session
def _init_redpanda() -> bool:
"""Teste la connectivité Redpanda au démarrage."""
try:
import urllib.request
req = urllib.request.Request(f"{REDPANDA_BASE}/v1/status/alive")
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status == 200:
print(f"[REDPANDA] ✅ Connected to {REDPANDA_BASE}")
return True
except Exception as e:
print(f"[REDPANDA] ⚠️ Cannot reach {REDPANDA_BASE}: {e}")
return False
def publish_redpanda(sid: str, sensor: dict, payload: dict) -> bool:
"""Publie un message sur Redpanda/Kafka via le REST Proxy."""
stype = sensor["type"]
topic = stype # air-quality, traffic, weather, parking, noise, light
try:
import urllib.request, base64
body = json.dumps(payload, ensure_ascii=False)
b64 = base64.b64encode(body.encode()).decode()
record = {
"records": [{"value": b64}]
}
url = f"{REDPANDA_BASE}/topics/{topic}"
req = urllib.request.Request(
url,
data=json.dumps(record).encode(),
headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=8) as resp:
return resp.status in (200, 201, 204)
except urllib.error.HTTPError as e:
print(f" ⚠️ Redpanda → {e.code}")
return False
except Exception as e:
print(f" ⚠️ Redpanda → {e}")
return False
def publish_influx(sid: str, sensor: dict, values: dict) -> bool:
"""Write sensor data to InfluxDB (async, non-blocking)."""
if not _influx_write_api:
return False
def _write_async():
try:
stype = sensor["type"]
lat = sensor.get("lat", BASE_LAT)
lon = sensor.get("lon", BASE_LON)
points = []
for field, value in values.items():
if isinstance(value, (int, float)):
p = influxdb_client.Point(stype)\
.tag("sensor_id", sid)\
.tag("location", sensor.get("name", sid))\
.field(field, float(value))\
.field("lat", float(lat))\
.field("lon", float(lon))
points.append(p)
if points:
_influx_write_api.write(bucket=INFLUX_BUCKET, record=points)
print(f" 📈 InfluxDB: {len(points)} points written")
except Exception as e:
print(f" ⚠️ InfluxDB → {e}")
# Exécution asynchrone (non-bloquante)
t = threading.Thread(target=_write_async, daemon=True)
t.start()
return True
def main():
print("╔══════════════════════════════════════════════════╗")
print("║ Smart City Simulator — Martinique ║")
print("╚══════════════════════════════════════════════════╝")
print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s")
print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}")
# Init connectivity checks
if ENABLE_PULSAR:
_init_pulsar()
# Test immédiat
print(f" 🌪️ DEBUG: Test Pulsar direct...", flush=True)
test_payload = {"type": "test", "value": 123}
test_result = publish_pulsar("test_001", {"type": "air-quality"}, test_payload)
print(f" 🌪️ DEBUG: Test Pulsar result: {test_result}", flush=True)
if ENABLE_REDPANDA:
_init_redpanda()
mqtt_client = MultiMQTT()
running = True
def signal_handler(*_):
nonlocal running
running = False
print("\n[SIM] 🛑 Arrêt...")
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
iteration = 0
while running:
iteration += 1
ts = datetime.now().strftime("%H:%M:%S")
print(f"\n[SIM] ⏱️ It #{iteration}{ts}")
for sid, sensor in SENSORS.items():
stype = sensor["type"]
topic = f"city/sensors/{stype}/{sid}"
# --- Payload MQTT ---
ranges = SENSOR_RANGES.get(stype, {})
payload_mqtt = {
"id": sid,
"type": stype,
"name": sensor["name"],
"lat": sensor["lat"],
"lon": sensor["lon"],
"timestamp": datetime.now(timezone.utc).isoformat(),
"battery_level": random.randint(60, 100),
}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
payload_mqtt[field] = round(random.uniform(lo, hi), 1)
elif isinstance(val_range, list):
payload_mqtt[field] = random.choice(val_range)
msg = json.dumps(payload_mqtt, ensure_ascii=False)
# --- MQTT publish ---
results = mqtt_client.publish(topic, msg)
ok_mqtt = [n for n, r in results.items() if r]
if ok_mqtt:
print(f" 📤 {topic}{','.join(ok_mqtt)}")
# Extraire les valeurs pour OpenRemote
or_values = {}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
or_values[field] = round(random.uniform(lo, hi), 1)
# --- OpenRemote REST ---
if ENABLE_OPENREMOTE:
ok_or = publish_openremote(sid, sensor, or_values)
print(f" 🏠 OpenRemote: {'' if ok_or else '⚠️ skipped'}")
# --- Orion-LD ---
if ENABLE_ORION:
ok_or = publish_orion(sid, sensor)
print(f" 🌐 Orion-LD: {'' if ok_or else '⚠️ skipped'}")
# --- Stellio ---
if ENABLE_STELLIO:
ok_st = publish_stellio(sid, sensor)
print(f" 🏢 Stellio: {'' if ok_st else ''}")
# --- FROST ---
if ENABLE_FROST:
ranges2 = SENSOR_RANGES.get(stype, {})
for field, val_range in ranges2.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
val = round(random.uniform(lo, hi), 1)
ok_fr = publish_frost(sid, sensor, field, val)
print(f" 📊 FROST: {'' if ok_fr else ''}")
# --- InfluxDB ---
if ENABLE_INFLUX:
influx_vals = {}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
influx_vals[field] = round(random.uniform(lo, hi), 1)
ok_influx = publish_influx(sid, sensor, influx_vals)
print(f" 📈 InfluxDB: {'' if ok_influx else ''}")
# --- Pulsar (HTTP REST) ---
if ENABLE_PULSAR:
print(f" 🌪️ DEBUG: calling publish_pulsar for {sid}, payload_mqtt exists: {bool(locals().get('payload_mqtt'))}", flush=True)
ok_pulsar = publish_pulsar(sid, sensor, payload_mqtt)
print(f" 🌪️ Pulsar: {'' if ok_pulsar else ''}")
# --- Redpanda (Kafka REST Proxy) ---
if ENABLE_REDPANDA:
ok_redpanda = publish_redpanda(sid, sensor, payload_mqtt)
print(f" 🐟 Redpanda: {'' if ok_redpanda else ''}")
# --- BunkerM HTTP ---
if os.getenv("BUNKERM_HTTP", "0") == "1":
ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt)
print(f" 📦 BunkerM: {'' if ok_bunkerm else ''}")
print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}")
try:
time.sleep(INTERVAL)
except KeyboardInterrupt:
break
mqtt_client.stop()
print("[SIM] ✅ Arrêté proprement.")
if __name__ == "__main__":
main()