Files
smart-city-digital-twin-mar…/simulator.py

924 lines
41 KiB
Python

#!/usr/bin/env python3
"""
Smart City IoT Simulator — Martinique (14.6°N, 61.2°W)
=======================================================
Publie vers MULTIPLES brokers MQTT + context brokers NGSI-LD.
Brokers MQTT:
- EMQX: emqx_emqx_1:1883 (sans auth)
- Mosquitto: mosquitto-traefik:1883 (bunker/bunker)
- BunkerM: bunkerm_bunkerm_1:1900 (TLS, bunker/bunker)
- OpenRemote: openremote-manager-1:1883 (admin/Digitribe972)
Context Brokers REST:
- Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
- Stellio: stellio-api-gateway:8080 (NGSI-LD)
- FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)
Variables d'environnement:
PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s)
BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France)
ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
ENABLE_FROST=1 : activer FROST-Server (défaut: 1)
"""
import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse
import paho.mqtt.client as mqtt
import urllib.request, urllib.error
from datetime import datetime, timezone
from typing import Any
# InfluxDB support
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
# =============================================================================
# Configuration des brokers MQTT
# Configuration des brokers MQTT
# Par défaut localhost (simulateur tourne sur l'hôte)
EMQX_HOST = os.environ.get("EMQX_HOST", "localhost")
EMQX_PORT = int(os.environ.get("EMQX_PORT", "11883"))
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "localhost")
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))
BUNKERM_HOST = os.environ.get("BUNKERM_HOST", "mqtt.digitribe.fr")
BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900"))
# =============================================================================
# Configuration
# =============================================================================
BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "10"))
ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1") == "1"
ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1"
ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1"
OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972")
OR_REALM = os.environ.get("OR_REALM", "smartcity")
OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token
FROST_URL = os.environ.get("FROST_URL", "http://localhost:8086/FROST-Server/v1.1") # Exposer frost_http-web-1:8080 -> host:8086
# InfluxDB config
ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1") == "1"
INFLUX_URL = os.environ.get("INFLUX_URL", "http://digital-twin-influxdb:8086")
INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "iot_data")
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-secret-admin-token")
# Initialize InfluxDB client
_influx_client = None
_influx_write_api = None
if ENABLE_INFLUX:
try:
_influx_client = influxdb_client.InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
_influx_write_api = _influx_client.write_api(write_options=SYNCHRONOUS)
print(f"[INFLUX] ✅ Connected to {INFLUX_URL}")
except Exception as e:
print(f"[INFLUX] ❌ Connection failed: {e}")
SENSOR_COUNTS = {
"traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")),
"airquality": int(os.environ.get("SENSOR_COUNT_airquality", "2")),
"parking": int(os.environ.get("SENSOR_COUNT_parking", "2")),
"noise": int(os.environ.get("SENSOR_COUNT_noise", "1")),
"weather": int(os.environ.get("SENSOR_COUNT_weather", "1")),
"light": int(os.environ.get("SENSOR_COUNT_light", "1")),
}
# Si SENSOR_COUNT est défini, multiplier les counts de façon proportionnelle
_total_default = sum(SENSOR_COUNTS.values())
if "SENSOR_COUNT" in os.environ:
target = int(os.environ["SENSOR_COUNT"])
ratio = target / _total_default
for k in SENSOR_COUNTS:
SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio))
# =============================================================================
# Localisation des capteurs Martinique
# =============================================================================
SENSOR_LOCATIONS: dict[str, list[dict]] = {}
SENSOR_NAMES: dict[str, list[str]] = {
"traffic": ["Carrefour Central", "Avenue des Caraïbes", "Boulevard Pasteur",
"Rue des Flamboyants", "Place de la République"],
"airquality": ["Quartier Bonde", "Port de Fort-de-France", "Château Denis",
"Lamentin Aéroport", "Schoelcher Village"],
"parking": ["Parking Rivière-Saleé", "Parking Cluny", "Parking Média",
"Parking Grand-Camp", "Parking Dillon"],
"noise": ["Rue des Arts", "Marché Central", "Université Fort-de-France",
"Stade de Dillon", "Place du Champs de Mars"],
"weather": ["Station Météo Lamentin", "Station Schoelcher",
"Station Ajoupa-Bouillon", "Station Le François", "Station Le Robert"],
"light": ["Eclairage Rue des Mouettes", "Candela Boulevard",
"Lumiere Rue des Acacias", "Feux Signalisation Centre", "Eclairage Port"],
}
def _gen_locs(stype: str, count: int) -> list[dict]:
locs = []
for i in range(count):
lat = BASE_LAT + random.uniform(-0.05, 0.05)
lon = BASE_LON + random.uniform(-0.05, 0.05)
names = SENSOR_NAMES.get(stype, [stype])
locs.append({
"lat": round(lat, 6),
"lon": round(lon, 6),
"name": names[i % len(names)],
})
return locs
for stype, count in SENSOR_COUNTS.items():
SENSOR_LOCATIONS[stype] = _gen_locs(stype, count)
# Ranges par type
SENSOR_RANGES: dict[str, dict] = {
"traffic": {"vehicle_count":(10,150),"average_speed_kmh":(10,80),
"congestion_level":(0,5),"occupancy_percent":(0,100)},
"airquality": {"pm25_ugm3":(5,80),"pm10_ugm3":(10,150),"no2_ugm3":(5,60),
"o3_ugm3":(20,120),"co_mgm3":(0.1,5.0),
"temperature_celsius":(20,35),"humidity_percent":(40,95)},
"parking": {"total_spots":(50,500),"available_spots":(0,500),
"occupancy_percent":(0,100),"turnover_per_hour":(5,50)},
"noise": {"noise_level_db":(40,95),"peak_db":(60,110)},
"weather": {"temperature_celsius":(22,34),"humidity_percent":(50,95),
"wind_speed_kmh":(0,50),"pressure_hpa":(1005,1025),
"rain_mm":(0,20),"uv_index":(0,11)},
"light": {"brightness_lux":(0,100000),"power_consumption_w":(0,500)},
}
NOISE_CATEGORIES = ["quiet","moderate","loud","very_loud"]
LIGHT_STATUSES = ["on","off","dimmed","auto"]
# =============================================================================
# Capteurs déclarés
# =============================================================================
SENSORS: dict[str, dict] = {}
counter = 0
for stype, locs in SENSOR_LOCATIONS.items():
for loc in locs:
sid = f"{stype}_{counter:03d}"
SENSORS[sid] = {"type": stype, "lat": loc["lat"], "lon": loc["lon"], "name": loc["name"]}
counter += 1
# =============================================================================
# Payload NGSI-LD pour Orion-LD / Stellio
# =============================================================================
# Contextes NGSI-LD : core + Smart Data Models
# https://smartdatamodels.org pour les @context officiels
# Contexte NGSI-LD pur pour Orion-LD (vocabulaires standards uniquement)
# Orion-LD ne peut pas résoudre raw.githubusercontent.com — utiliser uri.etsi.org uniquement
ORION_CONTEXT = [
"https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
]
# Mapping sensor type → Smart Data Model type NGSI-LD
SMART_MODEL_MAPPING = {
"airquality": "AirQualityObserved",
"traffic": "TrafficFlowObserved",
"parking": "OffStreetParking",
"noise": "NoiseLevelObserved",
"weather": "WeatherObserved",
"light": "Device",
}
FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}
# Cache FROST : éviter de recréer Thing/Datastream
_frost_cache: dict[str, tuple[str, str]] = {} # (sid, field) -> (thing_id, ds_id)
# Contexte NGSI-LD pur pour Stellio et Orion-LD (vocabulaires standards uniquement)
# Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/
# On n'utilise PAS les vocabulaires smartdatamodels.org distants (inaccessibles depuis les containers)
# Les types d'entité Smart Data Models (AirQualityObserved, etc.) sont reconnus par leur nom
# Les propriétés spécifiques sont stockées telles quelles (vocabulaire libre)
STELLIO_INLINE_CONTEXT = [
"https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
]
def _ngsi_payload(sid: str, sensor: dict, context: list | dict = ORION_CONTEXT, source: str = "simulator", topic: str = "") -> dict:
"""Construit un payload NGSI-LD avec Smart Data Models officiels."""
stype = sensor["type"]
model_type = SMART_MODEL_MAPPING.get(stype, "Device")
now = datetime.now(timezone.utc).isoformat()
# Attributs communs à tous les modèles
payload = {
"@context": context,
"id": f"urn:ngsi-ld:{model_type}:{sid}",
"type": model_type,
"dateObserved": {"type": "Property", "value": now},
"location": {"type": "GeoProperty",
"value": {"type": "Point",
"coordinates": [sensor["lon"], sensor["lat"]]}},
"name": {"type": "Property", "value": sensor["name"]},
"batteryLevel": {"type": "Property", "value": random.randint(60, 100)},
# NOUVEAU: Traçabilité MQTT (Conforme NGSI-LD)
# "source" est un champ standard NGSI-LD (ETSI)
# "mqttTopic" est une propriété personnalisée (étendue autorisée)
"source": {"type": "Property", "value": source},
"mqttTopic": {"type": "Property", "value": topic},
}
# Attributs spécifiques par type de modèle
ranges = SENSOR_RANGES.get(stype, {})
props = {}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
props[field] = {"type": "Property", "value": round(random.uniform(lo, hi), 1)}
elif isinstance(val_range, list):
props[field] = {"type": "Property", "value": random.choice(val_range)}
# Mapping vers les noms d'attributs Smart Data Models
if stype == "airquality":
if "pm25_ugm3" in props: payload["NO2"] = props.pop("pm25_ugm3") # Simplifié
if "pm10_ugm3" in props: payload["PM10"] = props.pop("pm10_ugm3")
if "no2_ugm3" in props: payload["NO2"] = props.pop("no2_ugm3")
if "o3_ugm3" in props: payload["O3"] = props.pop("o3_ugm3")
if "co_mgm3" in props: payload["CO"] = props.pop("co_mgm3")
if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius")
if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent")
elif stype == "traffic":
if "vehicle_count" in props: payload["vehicleCount"] = props.pop("vehicle_count")
if "average_speed_kmh" in props: payload["averageVehicleSpeed"] = props.pop("average_speed_kmh")
if "congestion_level" in props: payload["congestion"] = props.pop("congestion_level")
if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent")
elif stype == "parking":
if "available_spots" in props: payload["availableSpotNumber"] = props.pop("available_spots")
if "total_spots" in props: payload["totalSpotNumber"] = props.pop("total_spots")
if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent")
if "turnover_per_hour" in props: payload["turnover"] = props.pop("turnover_per_hour")
elif stype == "noise":
if "noise_level_db" in props: payload["noiseLevel"] = props.pop("noise_level_db")
if "peak_db" in props: payload["noisePeak"] = props.pop("peak_db")
payload["noiseCategory"] = {"type": "Property", "value": random.choice(NOISE_CATEGORIES)}
elif stype == "weather":
if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius")
if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent")
if "rain_mm" in props: payload["rainfall"] = props.pop("rain_mm")
if "uv_index" in props: payload["uvIndex"] = props.pop("uv_index")
if "wind_speed_kmh" in props: payload["windSpeed"] = props.pop("wind_speed_kmh")
elif stype == "light":
if "brightness_lux" in props: payload["illuminance"] = props.pop("brightness_lux")
if "power_consumption_w" in props: payload["power"] = props.pop("power_consumption_w")
payload["status"] = {"type": "Property", "value": random.choice(LIGHT_STATUSES)}
return payload
def _frost_payload(sid: str, sensor: dict, source: str = "simulator", topic: str = "") -> dict:
"""Construit un payload SensorThings pour FROST-Server."""
stype = sensor["type"]
ranges = SENSOR_RANGES.get(stype, {})
datastreams = []
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)) and isinstance(hi, (int, float)):
val = round(random.uniform(lo, hi), 1)
unit = "http://www.qudt.org/vocab/unit#DegreeCelsius"
obs_prop = {
"name": f"{field} Observation",
"description": f"Observation of {field}",
"definition": unit,
}
sensor_data = {
"name": f"Sensor {sid} {field}",
"description": f"Sensor {sid} measuring {field}",
"encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0",
"metadata": {"unit": unit},
}
ds = {
"name": f"Datastream {stype}/{field}",
"description": f"Datastream for {stype} sensor {sid} - {field}",
"observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
"unitOfMeasurement": {"name": field, "symbol": "", "definition": unit},
"Sensor": sensor_data,
"ObservedProperty": obs_prop,
}
datastreams.append((field, ds, val))
thing_payload = {
"name": f"Thing_{sid}",
"description": f"Smart City {stype} sensor in Martinique",
"properties": {
"sensorType": stype,
"region": "Martinique",
"source": source, # Traçabilité
"mqttTopic": topic # Traçabilité
},
}
return thing_payload, datastreams
# =============================================================================
# HTTP helper
# =============================================================================
def _http_post(url: str, data: dict, headers: dict) -> str:
"""POST et retourne 'ok' ou 'created' (ou '' si échec)."""
try:
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
if resp.status == 204:
return 'created' # No Content — succès
if resp.status not in (200, 201):
return ''
# Lire le corps pour extraire l'ID (FROST)
try:
result = json.loads(resp.read())
if '@iot.selfLink' in result:
link = result['@iot.selfLink']
return link.split('(')[1].rstrip(')')
if '@iot.id' in result:
return str(result['@iot.id'])
except Exception:
pass
location = resp.headers.get('Location', '')
if location:
return location.split('(')[1].rstrip(')') if '(' in location else ''
return 'created'
except urllib.error.HTTPError as e:
# Lire le corps de l'erreur pour debug
try:
err_body = e.read().decode()[:200]
except Exception:
err_body = str(e)
print(f" ⚠️ HTTP POST {url}{e.code}: {err_body}")
return ''
except Exception as e:
print(f" ⚠️ HTTP POST {url}{e}")
return ''
def _http_put(url: str, data: dict, headers: dict) -> bool:
try:
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="PUT")
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
if e.code == 409:
return True # Already exists - that's fine
print(f" ⚠️ HTTP PUT {url}{e}")
return False
except Exception as e:
print(f" ⚠️ HTTP PUT {url}{e}")
return False
# =============================================================================
# MQTT Client multi-broker
# =============================================================================
class MultiMQTT:
def __init__(self):
self.clients: dict[str, mqtt.Client] = {}
self.ok: dict[str, bool] = {}
self._lock = threading.Lock()
self._setup()
def _mk_client(self, name: str, host: str, port: int,
tls: bool = False, user: str = "", pwd: str = "",
ws: bool = False) -> mqtt.Client:
cid = f"smartcity-sim-{name}-{os.getpid()}"
c = mqtt.Client(client_id=cid, protocol=mqtt.MQTTv311)
if user:
c.username_pw_set(user, pwd)
if tls:
c.tls_set(cert_reqs=ssl.CERT_NONE)
c.tls_insecure_set(True)
if ws:
c.ws_set(b"/mqtt")
c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc)
c.on_disconnect = lambda _c, _, __: self._on_disconnect(name)
try:
c.connect(host, port, keepalive=30)
c.loop_start()
except Exception as e:
print(f"[MQTT] ❌ {name} @ {host}:{port}{e}")
self.ok[name] = False
return c
def _on_connect(self, name: str, rc: int):
with self._lock:
if rc == 0:
self.ok[name] = True
print(f"[MQTT] ✅ {name} connecté")
else:
self.ok[name] = False
print(f"[MQTT] ❌ {name} rc={rc}")
def _on_disconnect(self, name: str):
with self._lock:
self.ok[name] = False
print(f"[MQTT] ⚠️ {name} déconnecté")
def _setup(self):
# Utiliser les variables d'environnement pour les brokers
brokers = [
("EMQX", EMQX_HOST, EMQX_PORT, False, "", ""),
("Mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "bunker", "bunker"),
("BunkerM", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker"), # Port 1900 = MQTT simple, pas TLS
]
print("[MQTT] 🔌 Connexion aux brokers...")
for name, host, port, tls, user, pwd in brokers:
c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd)
self.clients[name] = c
self.ok[name] = False
time.sleep(3) # Attend les connexions
def publish(self, topic: str, payload: str) -> dict[str, bool]:
results = {}
with self._lock:
for name, client in self.clients.items():
if self.ok.get(name, False):
try:
r = client.publish(topic, payload, qos=1)
results[name] = (r.rc == mqtt.MQTT_ERR_SUCCESS)
except Exception:
results[name] = False
else:
results[name] = False
return results
def stop(self):
for name, c in self.clients.items():
try:
c.loop_stop()
c.disconnect()
except Exception:
pass
# =============================================================================
# URLs de base (résolues au démarrage)
# =============================================================================
ORION_HOST = "localhost"
ORION_PORT = "2026"
ORION_URL = f"http://{ORION_HOST}:{ORION_PORT}"
STELLIO_URL = os.environ.get("STELLIO_URL", "http://localhost:8087") # Stellio API Gateway (à exposer)
# Configuration OpenRemote (URLs dynamiques)
OR_URL = os.environ.get("OR_URL", "http://localhost:8080") # OpenRemote Manager (Traefik)
OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity
OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", "http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token")
OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour
STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default")
def publish_stellio(sid: str, sensor: dict) -> bool:
"""Publie sur Stellio via Traefik (gère le 409)."""
# Topic MQTT correspondant (pour traçabilité)
stype = sensor["type"]
topic = f"city/sensors/{stype}/{sid}"
entity = _ngsi_payload(sid, sensor, context=STELLIO_INLINE_CONTEXT, source="simulator", topic=topic)
# Stellio a besoin du @context pour résoudre les vocabulaires NGSI-LD
# (uri.etsi.org résolu depuis le JAR embarqué)
url = f"{STELLIO_URL}/ngsi-ld/v1/entities"
headers = {
"Content-Type": "application/ld+json",
"Accept": "application/ld+json",
"NGSILD-Tenant": STELLIO_TENANT,
}
try:
body = json.dumps(entity).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
print(f" 🏢 Stellio: ✅ (HTTP {resp.status})")
return True
except urllib.error.HTTPError as e:
if e.code == 409: # Already exists, do update with PUT
try:
entity_id = urllib.parse.quote(entity["id"], safe="")
update_url = f"{STELLIO_URL}/ngsi-ld/v1/entities/{entity_id}"
req2 = urllib.request.Request(update_url, data=body, headers=headers, method="PUT")
with urllib.request.urlopen(req2, timeout=8) as resp2:
print(f" 🏢 Stellio: ✅ (HTTP {resp2.status} updated)")
return True
except Exception as e2:
print(f" ⚠️ Stellio update failed: {e2}")
return False
try:
err = e.read().decode()[:300]
except Exception:
err = str(e)
print(f" ⚠️ Stellio → {e.code}: {err}")
return False
except Exception as e:
print(f" ⚠️ Stellio → {e}")
return False
def publish_orion(sid: str, sensor: dict) -> bool:
"""Publie sur Orion-LD (POST create, PATCH update)."""
import socket
# Topic MQTT correspondant (pour traçabilité)
stype = sensor["type"]
topic = f"city/sensors/{stype}/{sid}"
entity = _ngsi_payload(sid, sensor, source="simulator", topic=topic)
if not hasattr(publish_orion, "orion_ip"):
try:
publish_orion.orion_ip = socket.gethostbyname("fiware-gis-quickstart-orion-1")
except Exception:
publish_orion.orion_ip = "192.168.192.20"
base = f"http://{publish_orion.orion_ip}:1026/ngsi-ld/v1"
# 1. Essayer de créer (POST)
try:
body = json.dumps(entity).encode()
req = urllib.request.Request(f"{base}/entities", data=body,
headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
print(f" 🌐 Orion-LD: ✅ (HTTP {resp.status} created)")
return True
except urllib.error.HTTPError as e:
if e.code != 409:
print(f" ⚠️ Orion-LD → {e.code}: {e.read().decode()[:200]}")
return False
# 2. Déjà existant (409) → PATCH sur les attributs
try:
eid = urllib.parse.quote(entity['id'], safe='')
patch_url = f"{base}/entities/{eid}/attrs"
req2 = urllib.request.Request(patch_url, data=body,
headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="PATCH")
with urllib.request.urlopen(req2, timeout=8) as resp2:
print(f" 🌐 Orion-LD: ✅ (HTTP {resp2.status} updated)")
return True
except Exception as e2:
print(f" ⚠️ Orion-LD → update failed: {e2}")
return False
except Exception as e:
print(f" ⚠️ Orion-LD → {e}")
return False
def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool:
"""Publie sur BunkerM via HTTP API (port 2000) avec session."""
import base64, http.cookiejar, urllib.request, json
from datetime import datetime, timezone
host = "bunkerm_bunkerm_1:2000"
login_url = f"http://{host}/login"
data_url = f"http://{host}/api/sensors/data"
# 1. Cookie jar pour maintenir la session
cj = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
# 2. Authentification (Basic) pour obtenir le cookie de session
creds = base64.b64encode(b"bunker:bunker").decode()
auth_header = {"Authorization": f"Basic {creds}"}
try:
# GET sur /login pour initialiser la session
req_login = urllib.request.Request(login_url, headers=auth_header)
opener.open(req_login, timeout=5)
except Exception as e:
print(f" ⚠️ BunkerM login → {e}")
return False
# 3. Préparer le payload
payload = {
"sensor_id": sid,
"sensor_type": sensor["type"],
"timestamp": datetime.now(timezone.utc).isoformat(),
}
for k, v in values.items():
if k not in payload:
payload[k] = v
# 4. POST avec le cookie de session
data = json.dumps(payload).encode()
req = urllib.request.Request(
data_url,
data=data,
headers={**auth_header, "Content-Type": "application/json"},
method="POST"
)
try:
with opener.open(req, timeout=5) as resp:
print(f" ✅ BunkerM: HTTP {resp.status}")
return resp.status in (200, 201, 204)
except Exception as e:
print(f" ⚠️ BunkerM POST → {e}")
return False
def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
"""Crée le Thing (1 par capteur) + Datastreams, puis POST l'Observation."""
# Cache : {sid: (thing_id, {field: ds_id})}
if sid in _frost_cache:
thing_id, ds_map = _frost_cache[sid]
if field in ds_map:
ds_id = ds_map[field]
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
obs = {
"resultTime": datetime.now(timezone.utc).isoformat(),
"result": value,
"FeatureOfInterest": {
"name": f"Location {sid}",
"description": f"Feature of interest for sensor {sid}",
"encodingType": "application/vnd.geo+json",
"feature": {
"type": "Point",
"coordinates": [sensor.get("lon", -61.0), sensor.get("lat", 14.6)]
}
}
}
if _http_post(obs_url, obs, FROST_HEADERS):
print(f" ✅ FROST Observation {sid}/{field} → OK (cached)")
return True
else:
print(f" ⚠️ FROST Observation {sid}/{field} → échec")
return False
# Premier appel pour ce capteur : créer Thing + tous les Datastreams
# Topic MQTT pour traçabilité
stype = sensor["type"]
topic = f"city/sensors/{stype}/{sid}"
thing_payload, datastreams = _frost_payload(sid, sensor, source="simulator", topic=topic)
print(f" 📊 FROST: POST Thing {sid}...")
tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS)
if not tid:
print(f" ⚠️ FROST Thing {sid} → échec création")
return False
print(f" ✅ FROST Thing {sid} créé (ID: {tid})")
# Créer les Datastreams
ds_map = {}
for f, ds, _ in datastreams:
ds["Thing"] = {"@iot.id": tid}
print(f" 📊 FROST: POST Datastream {sid}/{f}...")
ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS)
if ds_id:
print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})")
ds_map[f] = ds_id
else:
print(f" ⚠️ FROST Datastream {sid}/{f} → échec")
_frost_cache[sid] = (tid, ds_map)
# Poster l'observation pour le field actuel
if field in ds_map:
ds_id = ds_map[field]
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value}
if _http_post(obs_url, obs, FROST_HEADERS):
print(f" ✅ FROST Observation {sid}/{field} → OK")
return True
return False
# =============================================================================
# OpenRemote REST
# =============================================================================
_or_token_cache = {"token": "", "expires": 0}
def _get_or_token() -> str:
"""Obtain an OpenRemote token via password grant (admin user)."""
import time, urllib.parse
if _or_token_cache["token"] and _or_token_cache["expires"] > time.time() + 60:
return _or_token_cache["token"]
try:
# Use password grant with admin user (full rights)
token_url = f"http://openremote-keycloak-1:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token"
client_id = os.environ.get("OR_CLIENT_ID", "openremote")
client_secret = os.environ.get("OR_CLIENT_SECRET", "QVTnyObwXdpQ0Vuc60kFSonidK49FiXb")
data = urllib.parse.urlencode({
"grant_type": "password",
"username": os.environ.get("OR_ADMIN_USER", "admin"),
"password": os.environ.get("OR_ADMIN_PASS", "Digitribe972"),
"client_id": client_id,
"client_secret": client_secret
}).encode()
req = urllib.request.Request(
token_url,
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
with urllib.request.urlopen(req, timeout=5) as r:
token_data = json.loads(r.read().decode())
_or_token_cache["token"] = token_data["access_token"]
_or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300) - 60
return _or_token_cache["token"]
except Exception as e:
print(f" ⚠️ OpenRemote token → {e}")
return ""
def _or_put(asset_id: str, payload: dict) -> bool:
"""PUT update sur un asset OpenRemote (avec version)."""
token = _get_or_token()
if not token:
return False
try:
body = json.dumps(payload).encode()
url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
req = urllib.request.Request(url, data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"If-Match": str(payload.get("version", 1)),
},
method="PUT")
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
print(f" ⚠️ OR PUT {asset_id} → HTTP {e.code}")
return False
except Exception as e:
print(f" ⚠️ OR PUT {asset_id}{e}")
return False
def publish_openremote(sid: str, sensor: dict, values: dict) -> bool:
"""Met à jour les attributs d'un asset OpenRemote via REST."""
# Mapping sid → asset ID (créés manuellement dans OR)
ASSET_MAP = {
"traffic_000": "7b5c5670d1b84865ba3ac7", # Traffic Fort-de-France Centre
"traffic_001": "557f6e993b994d3cb81017", # Traffic Fort-de-France North
"traffic_002": "cb81dfd2d2dc4d25adc9c2", # Traffic Fort-de-France South
"airquality_000": "a51c982a2d3e451898b978", # Air Quality Fort-de-France
"parking_000": "b8f0df19e0af47b386ebb9", # Parking Fort-de-France Centre
"noise_000": "9035103d1866454fb7e451", # Noise Fort-de-France Centre
"weather_000": "b9de80905ac640f488ab27", # Weather Lamentin Airport
"light_000": "ee7823a41e594851ba202f", # Light Fort-de-France
}
asset_id = ASSET_MAP.get(sid)
if not asset_id:
return False
# Construire les attributs à jour
now = datetime.now(timezone.utc).isoformat()
attrs = {
"timestamp": {"type": "DateTime", "value": now, "timestamp": now},
"battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": now},
}
# Mapper les valeurs du payload vers les attributs OR
field_map = {
"temperature_celsius": "temperature",
"humidity_percent": "humidity",
"noise_level_db": "noiseLevel",
"pm25_ugm3": "airQuality",
"vehicle_count": "trafficFlow",
"available_spots": "parking",
"brightness_lux": "light",
"flood": "flood",
}
for field, val in values.items():
attr_name = field_map.get(field, field)
attrs[attr_name] = {
"type": "Number",
"value": val,
"timestamp": now,
"unit": "µg/m³" if "ugm3" in field else ("°C" if "celsius" in field else ("%" if "percent" in field or "humidity" in field else ("dB" if "db" in field else ""))),
}
payload = {
"id": asset_id,
"name": sensor["name"],
"type": "IOTSensor",
"realm": OR_REALM,
"attributes": attrs,
}
return _or_put(asset_id, payload)
def publish_influx(sid: str, sensor: dict, values: dict) -> bool:
"""Write sensor data to InfluxDB (async, non-blocking)."""
if not _influx_write_api:
return False
def _write_async():
try:
stype = sensor["type"]
lat = sensor.get("lat", BASE_LAT)
lon = sensor.get("lon", BASE_LON)
points = []
for field, value in values.items():
if isinstance(value, (int, float)):
p = influxdb_client.Point(stype)\
.tag("sensor_id", sid)\
.tag("location", sensor.get("name", sid))\
.field(field, float(value))\
.field("lat", float(lat))\
.field("lon", float(lon))
points.append(p)
if points:
_influx_write_api.write(bucket=INFLUX_BUCKET, record=points)
print(f" 📈 InfluxDB: {len(points)} points written")
except Exception as e:
print(f" ⚠️ InfluxDB → {e}")
# Exécution asynchrone (non-bloquante)
t = threading.Thread(target=_write_async, daemon=True)
t.start()
return True
def main():
print("╔══════════════════════════════════════════════════╗")
print("║ Smart City Simulator — Martinique ║")
print("╚══════════════════════════════════════════════════╝")
print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s")
print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
mqtt_client = MultiMQTT()
running = True
def signal_handler(*_):
nonlocal running
running = False
print("\n[SIM] 🛑 Arrêt...")
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
iteration = 0
while running:
iteration += 1
ts = datetime.now().strftime("%H:%M:%S")
print(f"\n[SIM] ⏱️ It #{iteration}{ts}")
for sid, sensor in SENSORS.items():
stype = sensor["type"]
topic = f"city/sensors/{stype}/{sid}"
# --- Payload MQTT ---
ranges = SENSOR_RANGES.get(stype, {})
payload_mqtt = {
"id": sid,
"type": stype,
"name": sensor["name"],
"lat": sensor["lat"],
"lon": sensor["lon"],
"timestamp": datetime.now(timezone.utc).isoformat(),
"battery_level": random.randint(60, 100),
}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
payload_mqtt[field] = round(random.uniform(lo, hi), 1)
elif isinstance(val_range, list):
payload_mqtt[field] = random.choice(val_range)
msg = json.dumps(payload_mqtt, ensure_ascii=False)
# --- MQTT publish ---
results = mqtt_client.publish(topic, msg)
ok_mqtt = [n for n, r in results.items() if r]
if ok_mqtt:
print(f" 📤 {topic}{','.join(ok_mqtt)}")
# Extraire les valeurs pour OpenRemote
or_values = {}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
or_values[field] = round(random.uniform(lo, hi), 1)
# --- OpenRemote REST ---
if ENABLE_OPENREMOTE:
ok_or = publish_openremote(sid, sensor, or_values)
print(f" 🏠 OpenRemote: {'' if ok_or else '⚠️ skipped'}")
# --- Orion-LD ---
if ENABLE_ORION:
ok_or = publish_orion(sid, sensor)
print(f" 🌐 Orion-LD: {'' if ok_or else '⚠️ skipped'}")
# --- Stellio ---
if ENABLE_STELLIO:
ok_st = publish_stellio(sid, sensor)
print(f" 🏢 Stellio: {'' if ok_st else ''}")
# --- FROST ---
if ENABLE_FROST:
ranges2 = SENSOR_RANGES.get(stype, {})
for field, val_range in ranges2.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
val = round(random.uniform(lo, hi), 1)
ok_fr = publish_frost(sid, sensor, field, val)
print(f" 📊 FROST: {'' if ok_fr else ''}")
# --- InfluxDB ---
if ENABLE_INFLUX:
influx_vals = {}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
influx_vals[field] = round(random.uniform(lo, hi), 1)
ok_influx = publish_influx(sid, sensor, influx_vals)
print(f" 📈 InfluxDB: {'' if ok_influx else ''}")
# --- BunkerM HTTP ---
if os.getenv("BUNKERM_HTTP", "0") == "1":
ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt)
print(f" 📦 BunkerM: {'' if ok_bunkerm else ''}")
print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}")
try:
time.sleep(INTERVAL)
except KeyboardInterrupt:
break
mqtt_client.stop()
print("[SIM] ✅ Arrêté proprement.")
if __name__ == "__main__":
main()