Files
smart-city-digital-twin-mar…/simulator.py

821 lines
36 KiB
Python

#!/usr/bin/env python3
"""
Smart City IoT Simulator — Martinique (14.6°N, 61.2°W)
=======================================================
Publie vers MULTIPLES brokers MQTT + context brokers NGSI-LD.
Brokers MQTT:
- EMQX: emqx_emqx_1:1883 (sans auth)
- Mosquitto: mosquitto-traefik:1883 (bunker/bunker)
- BunkerM: bunkerm_bunkerm_1:1900 (TLS, bunker/bunker)
- OpenRemote: openremote-manager-1:1883 (admin/Digitribe972)
Context Brokers REST:
- Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
- Stellio: stellio-api-gateway:8080 (NGSI-LD)
- FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)
Variables d'environnement:
PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s)
BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France)
ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
ENABLE_FROST=1 : activer FROST-Server (défaut: 1)
"""
import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse
import paho.mqtt.client as mqtt
import urllib.request, urllib.error
from datetime import datetime, timezone
from typing import Any
# =============================================================================
# Configuration
# =============================================================================
BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "10"))
ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1") == "1"
ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1"
ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1"
OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972")
OR_REALM = os.environ.get("OR_REALM", "smartcity")
OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token
FROST_URL = os.environ.get("FROST_URL", "http://192.168.208.3:8080/FROST-Server/v1.1")
SENSOR_COUNTS = {
"traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")),
"airquality": int(os.environ.get("SENSOR_COUNT_airquality", "2")),
"parking": int(os.environ.get("SENSOR_COUNT_parking", "2")),
"noise": int(os.environ.get("SENSOR_COUNT_noise", "1")),
"weather": int(os.environ.get("SENSOR_COUNT_weather", "1")),
"light": int(os.environ.get("SENSOR_COUNT_light", "1")),
}
# Si SENSOR_COUNT est défini, multiplier les counts de façon proportionnelle
_total_default = sum(SENSOR_COUNTS.values())
if "SENSOR_COUNT" in os.environ:
target = int(os.environ["SENSOR_COUNT"])
ratio = target / _total_default
for k in SENSOR_COUNTS:
SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio))
# =============================================================================
# Localisation des capteurs Martinique
# =============================================================================
SENSOR_LOCATIONS: dict[str, list[dict]] = {}
SENSOR_NAMES: dict[str, list[str]] = {
"traffic": ["Carrefour Central", "Avenue des Caraïbes", "Boulevard Pasteur",
"Rue des Flamboyants", "Place de la République"],
"airquality": ["Quartier Bonde", "Port de Fort-de-France", "Château Denis",
"Lamentin Aéroport", "Schoelcher Village"],
"parking": ["Parking Rivière-Saleé", "Parking Cluny", "Parking Média",
"Parking Grand-Camp", "Parking Dillon"],
"noise": ["Rue des Arts", "Marché Central", "Université Fort-de-France",
"Stade de Dillon", "Place du Champs de Mars"],
"weather": ["Station Météo Lamentin", "Station Schoelcher",
"Station Ajoupa-Bouillon", "Station Le François", "Station Le Robert"],
"light": ["Eclairage Rue des Mouettes", "Candela Boulevard",
"Lumiere Rue des Acacias", "Feux Signalisation Centre", "Eclairage Port"],
}
def _gen_locs(stype: str, count: int) -> list[dict]:
locs = []
for i in range(count):
lat = BASE_LAT + random.uniform(-0.05, 0.05)
lon = BASE_LON + random.uniform(-0.05, 0.05)
names = SENSOR_NAMES.get(stype, [stype])
locs.append({
"lat": round(lat, 6),
"lon": round(lon, 6),
"name": names[i % len(names)],
})
return locs
for stype, count in SENSOR_COUNTS.items():
SENSOR_LOCATIONS[stype] = _gen_locs(stype, count)
# Ranges par type
SENSOR_RANGES: dict[str, dict] = {
"traffic": {"vehicle_count":(10,150),"average_speed_kmh":(10,80),
"congestion_level":(0,5),"occupancy_percent":(0,100)},
"airquality": {"pm25_ugm3":(5,80),"pm10_ugm3":(10,150),"no2_ugm3":(5,60),
"o3_ugm3":(20,120),"co_mgm3":(0.1,5.0),
"temperature_celsius":(20,35),"humidity_percent":(40,95)},
"parking": {"total_spots":(50,500),"available_spots":(0,500),
"occupancy_percent":(0,100),"turnover_per_hour":(5,50)},
"noise": {"noise_level_db":(40,95),"peak_db":(60,110)},
"weather": {"temperature_celsius":(22,34),"humidity_percent":(50,95),
"wind_speed_kmh":(0,50),"pressure_hpa":(1005,1025),
"rain_mm":(0,20),"uv_index":(0,11)},
"light": {"brightness_lux":(0,100000),"power_consumption_w":(0,500)},
}
NOISE_CATEGORIES = ["quiet","moderate","loud","very_loud"]
LIGHT_STATUSES = ["on","off","dimmed","auto"]
# =============================================================================
# Capteurs déclarés
# =============================================================================
SENSORS: dict[str, dict] = {}
counter = 0
for stype, locs in SENSOR_LOCATIONS.items():
for loc in locs:
sid = f"{stype}_{counter:03d}"
SENSORS[sid] = {"type": stype, "lat": loc["lat"], "lon": loc["lon"], "name": loc["name"]}
counter += 1
# =============================================================================
# Payload NGSI-LD pour Orion-LD / Stellio
# =============================================================================
# Contextes NGSI-LD : core + Smart Data Models
# https://smartdatamodels.org pour les @context officiels
STELLIO_CONTEXT_URL = "http://172.29.0.5:8085"
ORION_CONTEXT = [
f"{STELLIO_CONTEXT_URL}/ngsi-ld-core.jsonld",
"https://raw.githubusercontent.com/smart-data-models/dataModel.Environment/master/context.jsonld",
"https://raw.githubusercontent.com/smart-data-models/dataModel.Transportation/master/context.jsonld",
"https://raw.githubusercontent.com/smart-data-models/dataModel.Parking/master/context.jsonld",
"https://raw.githubusercontent.com/smart-data-models/dataModel.Weather/master/context.jsonld",
"https://raw.githubusercontent.com/smart-data-models/dataModel.Device/master/context.jsonld",
]
# Mapping sensor type → Smart Data Model type NGSI-LD
SMART_MODEL_MAPPING = {
"airquality": "AirQualityObserved",
"traffic": "TrafficFlowObserved",
"parking": "OffStreetParking",
"noise": "NoiseLevelObserved",
"weather": "WeatherObserved",
"light": "Device",
}
FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}
# Cache FROST : éviter de recréer Thing/Datastream
_frost_cache: dict[str, tuple[str, str]] = {} # (sid, field) -> (thing_id, ds_id)
def _ngsi_payload(sid: str, sensor: dict) -> dict:
"""Construit un payload NGSI-LD avec Smart Data Models officiels."""
stype = sensor["type"]
model_type = SMART_MODEL_MAPPING.get(stype, "Device")
now = datetime.now(timezone.utc).isoformat()
# Attributs communs à tous les modèles
payload = {
"@context": ORION_CONTEXT,
"id": f"urn:ngsi-ld:{model_type}:{sid}",
"type": model_type,
"dateObserved": {"type": "Property", "value": now},
"location": {"type": "GeoProperty",
"value": {"type": "Point",
"coordinates": [sensor["lon"], sensor["lat"]]}},
"name": {"type": "Property", "value": sensor["name"]},
"batteryLevel": {"type": "Property", "value": random.randint(60, 100)},
}
# Attributs spécifiques par type de modèle
ranges = SENSOR_RANGES.get(stype, {})
props = {}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
props[field] = {"type": "Property", "value": round(random.uniform(lo, hi), 1)}
elif isinstance(val_range, list):
props[field] = {"type": "Property", "value": random.choice(val_range)}
# Mapping vers les noms d'attributs Smart Data Models
if stype == "airquality":
if "pm25_ugm3" in props: payload["NO2"] = props.pop("pm25_ugm3") # Simplifié
if "pm10_ugm3" in props: payload["PM10"] = props.pop("pm10_ugm3")
if "no2_ugm3" in props: payload["NO2"] = props.pop("no2_ugm3")
if "o3_ugm3" in props: payload["O3"] = props.pop("o3_ugm3")
if "co_mgm3" in props: payload["CO"] = props.pop("co_mgm3")
if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius")
if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent")
elif stype == "traffic":
if "vehicle_count" in props: payload["vehicleCount"] = props.pop("vehicle_count")
if "average_speed_kmh" in props: payload["averageVehicleSpeed"] = props.pop("average_speed_kmh")
if "congestion_level" in props: payload["congestion"] = props.pop("congestion_level")
if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent")
elif stype == "parking":
if "available_spots" in props: payload["availableSpotNumber"] = props.pop("available_spots")
if "total_spots" in props: payload["totalSpotNumber"] = props.pop("total_spots")
if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent")
if "turnover_per_hour" in props: payload["turnover"] = props.pop("turnover_per_hour")
elif stype == "noise":
if "noise_level_db" in props: payload["noiseLevel"] = props.pop("noise_level_db")
if "peak_db" in props: payload["noisePeak"] = props.pop("peak_db")
payload["noiseCategory"] = {"type": "Property", "value": random.choice(NOISE_CATEGORIES)}
elif stype == "weather":
if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius")
if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent")
if "rain_mm" in props: payload["rainfall"] = props.pop("rain_mm")
if "uv_index" in props: payload["uvIndex"] = props.pop("uv_index")
if "wind_speed_kmh" in props: payload["windSpeed"] = props.pop("wind_speed_kmh")
elif stype == "light":
if "brightness_lux" in props: payload["illuminance"] = props.pop("brightness_lux")
if "power_consumption_w" in props: payload["power"] = props.pop("power_consumption_w")
payload["status"] = {"type": "Property", "value": random.choice(LIGHT_STATUSES)}
return payload
def _frost_payload(sid: str, sensor: dict) -> dict:
"""Construit un payload SensorThings pour FROST-Server."""
stype = sensor["type"]
ranges = SENSOR_RANGES.get(stype, {})
datastreams = []
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)) and isinstance(hi, (int, float)):
val = round(random.uniform(lo, hi), 1)
unit = "http://www.qudt.org/vocab/unit#DegreeCelsius"
obs_prop = {
"name": f"{field} Observation",
"description": f"Observation of {field}",
"definition": unit,
}
sensor_data = {
"name": f"Sensor {sid} {field}",
"description": f"Sensor {sid} measuring {field}",
"encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0",
"metadata": {"unit": unit},
}
ds = {
"name": f"Datastream {stype}/{field}",
"description": f"Datastream for {stype} sensor {sid} - {field}",
"observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
"unitOfMeasurement": {"name": field, "symbol": "", "definition": unit},
"Sensor": sensor_data,
"ObservedProperty": obs_prop,
}
datastreams.append((field, ds, val))
thing_payload = {
"name": f"Thing_{sid}",
"description": f"Smart City {stype} sensor in Martinique",
"properties": {"sensorType": stype, "region": "Martinique"},
}
return thing_payload, datastreams
# =============================================================================
# HTTP helper
# =============================================================================
def _http_post(url: str, data: dict, headers: dict) -> str:
"""POST et retourne 'ok' ou 'created' (ou '' si échec)."""
try:
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
if resp.status == 204:
return 'created' # No Content — succès
if resp.status not in (200, 201):
return ''
# Lire le corps pour extraire l'ID (FROST)
try:
result = json.loads(resp.read())
if '@iot.selfLink' in result:
link = result['@iot.selfLink']
return link.split('(')[1].rstrip(')')
if '@iot.id' in result:
return str(result['@iot.id'])
except Exception:
pass
location = resp.headers.get('Location', '')
if location:
return location.split('(')[1].rstrip(')') if '(' in location else ''
return 'created'
except urllib.error.HTTPError as e:
# Lire le corps de l'erreur pour debug
try:
err_body = e.read().decode()[:200]
except Exception:
err_body = str(e)
print(f" ⚠️ HTTP POST {url}{e.code}: {err_body}")
return ''
except Exception as e:
print(f" ⚠️ HTTP POST {url}{e}")
return ''
def _http_put(url: str, data: dict, headers: dict) -> bool:
try:
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="PUT")
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
if e.code == 409:
return True # Already exists - that's fine
print(f" ⚠️ HTTP PUT {url}{e}")
return False
except Exception as e:
print(f" ⚠️ HTTP PUT {url}{e}")
return False
# =============================================================================
# MQTT Client multi-broker
# =============================================================================
class MultiMQTT:
def __init__(self):
self.clients: dict[str, mqtt.Client] = {}
self.ok: dict[str, bool] = {}
self._lock = threading.Lock()
self._setup()
def _mk_client(self, name: str, host: str, port: int,
tls: bool = False, user: str = "", pwd: str = "",
ws: bool = False) -> mqtt.Client:
cid = f"smartcity-sim-{name}-{os.getpid()}"
c = mqtt.Client(client_id=cid, protocol=mqtt.MQTTv311)
if user:
c.username_pw_set(user, pwd)
if tls:
c.tls_set(cert_reqs=ssl.CERT_NONE)
c.tls_insecure_set(True)
if ws:
c.ws_set(b"/mqtt")
c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc)
c.on_disconnect = lambda _c, _, __: self._on_disconnect(name)
try:
c.connect(host, port, keepalive=30)
c.loop_start()
except Exception as e:
print(f"[MQTT] ❌ {name} @ {host}:{port}{e}")
self.ok[name] = False
return c
def _on_connect(self, name: str, rc: int):
with self._lock:
if rc == 0:
self.ok[name] = True
print(f"[MQTT] ✅ {name} connecté")
else:
self.ok[name] = False
print(f"[MQTT] ❌ {name} rc={rc}")
def _on_disconnect(self, name: str):
with self._lock:
self.ok[name] = False
print(f"[MQTT] ⚠️ {name} déconnecté")
def _setup(self):
# Garder que EMQX et Mosquitto (MQTT fonctionnels)
# BunkerM via HTTP API (port 2000) au lieu de MQTT/TLS
brokers = [
("EMQX", "emqx_emqx_1", 1883, False, "", ""),
("Mosquitto", "mosquitto-traefik", 1883, False, "bunker", "bunker"),
]
print("[MQTT] 🔌 Connexion aux brokers...")
for name, host, port, tls, user, pwd in brokers:
c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd)
self.clients[name] = c
self.ok[name] = False
time.sleep(3) # Attend les connexions
def publish(self, topic: str, payload: str) -> dict[str, bool]:
results = {}
with self._lock:
for name, client in self.clients.items():
if self.ok.get(name, False):
try:
r = client.publish(topic, payload, qos=1)
results[name] = (r.rc == mqtt.MQTT_ERR_SUCCESS)
except Exception:
results[name] = False
else:
results[name] = False
return results
def stop(self):
for name, c in self.clients.items():
try:
c.loop_stop()
c.disconnect()
except Exception:
pass
# =============================================================================
# URLs de base (résolues au démarrage)
# =============================================================================
ORION_HOST = "fiware-gis-quickstart-orion-1"
ORION_IP = ""
try:
import socket
ORION_IP = socket.gethostbyname(ORION_HOST)
except:
pass
ORION_URL = f"http://{ORION_IP or ORION_HOST}:1026" if ORION_IP else "http://fiware-gis-quickstart-orion-1:1026"
STELLIO_URL = "http://stellio-api-gateway:8080"
# Configuration OpenRemote (URLs dynamiques)
OR_URL = os.environ.get("OR_URL", "http://openremote-manager-1:8080") # Hostname Docker interne
OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity
OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", f"http://openremote-keycloak-1:8080/auth/realms/{OR_TOKEN_REALM}/protocol/openid-connect/token")
OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour
def publish_stellio(sid: str, sensor: dict) -> bool:
"""Publie sur Stellio (gère le 409)."""
entity = _ngsi_payload(sid, sensor)
url = f"{STELLIO_URL}/ngsi-ld/v1/entities" # Sans options=upsert
headers = {
"Content-Type": "application/ld+json",
"Accept": "application/ld+json",
}
try:
body = json.dumps(entity).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
print(f" 🏢 Stellio: ✅ (HTTP {resp.status})")
return True
except urllib.error.HTTPError as e:
if e.code == 409: # Already exists, do update with PUT
try:
entity_id = urllib.parse.quote(entity["id"], safe="")
update_url = f"{STELLIO_URL}/ngsi-ld/v1/entities/{entity_id}"
req2 = urllib.request.Request(update_url, data=body, headers=headers, method="PUT")
with urllib.request.urlopen(req2, timeout=8) as resp2:
print(f" 🏢 Stellio: ✅ (HTTP {resp2.status} updated)")
return True
except Exception as e2:
print(f" ⚠️ Stellio update failed: {e2}")
return False
try:
err = e.read().decode()[:300]
except Exception:
err = str(e)
print(f" ⚠️ Stellio → {e.code}: {err}")
return False
except Exception as e:
print(f" ⚠️ Stellio → {e}")
return False
def publish_orion(sid: str, sensor: dict) -> bool:
"""Publie sur Orion-LD (POST create, PATCH update)."""
import socket
entity = _ngsi_payload(sid, sensor)
if not hasattr(publish_orion, "orion_ip"):
try:
publish_orion.orion_ip = socket.gethostbyname("fiware-gis-quickstart-orion-1")
except Exception:
publish_orion.orion_ip = "192.168.192.20"
base = f"http://{publish_orion.orion_ip}:1026/ngsi-ld/v1"
# 1. Essayer de créer (POST)
try:
body = json.dumps(entity).encode()
req = urllib.request.Request(f"{base}/entities", data=body,
headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
print(f" 🌐 Orion-LD: ✅ (HTTP {resp.status} created)")
return True
except urllib.error.HTTPError as e:
if e.code != 409:
print(f" ⚠️ Orion-LD → {e.code}: {e.read().decode()[:200]}")
return False
# 2. Déjà existant (409) → PATCH sur les attributs
try:
eid = urllib.parse.quote(entity['id'], safe='')
patch_url = f"{base}/entities/{eid}/attrs"
req2 = urllib.request.Request(patch_url, data=body,
headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="PATCH")
with urllib.request.urlopen(req2, timeout=8) as resp2:
print(f" 🌐 Orion-LD: ✅ (HTTP {resp2.status} updated)")
return True
except Exception as e2:
print(f" ⚠️ Orion-LD → update failed: {e2}")
return False
except Exception as e:
print(f" ⚠️ Orion-LD → {e}")
return False
def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool:
"""Publie sur BunkerM via HTTP API (port 2000) avec session."""
import base64, http.cookiejar, urllib.request, json
from datetime import datetime, timezone
host = "bunkerm_bunkerm_1:2000"
login_url = f"http://{host}/login"
data_url = f"http://{host}/api/sensors/data"
# 1. Cookie jar pour maintenir la session
cj = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
# 2. Authentification (Basic) pour obtenir le cookie de session
creds = base64.b64encode(b"bunker:bunker").decode()
auth_header = {"Authorization": f"Basic {creds}"}
try:
# GET sur /login pour initialiser la session
req_login = urllib.request.Request(login_url, headers=auth_header)
opener.open(req_login, timeout=5)
except Exception as e:
print(f" ⚠️ BunkerM login → {e}")
return False
# 3. Préparer le payload
payload = {
"sensor_id": sid,
"sensor_type": sensor["type"],
"timestamp": datetime.now(timezone.utc).isoformat(),
}
for k, v in values.items():
if k not in payload:
payload[k] = v
# 4. POST avec le cookie de session
data = json.dumps(payload).encode()
req = urllib.request.Request(
data_url,
data=data,
headers={**auth_header, "Content-Type": "application/json"},
method="POST"
)
try:
with opener.open(req, timeout=5) as resp:
print(f" ✅ BunkerM: HTTP {resp.status}")
return resp.status in (200, 201, 204)
except Exception as e:
print(f" ⚠️ BunkerM POST → {e}")
return False
def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
"""Crée le Thing (1 par capteur) + Datastreams, puis POST l'Observation."""
# Cache : {sid: (thing_id, {field: ds_id})}
if sid in _frost_cache:
thing_id, ds_map = _frost_cache[sid]
if field in ds_map:
ds_id = ds_map[field]
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
obs = {
"resultTime": datetime.now(timezone.utc).isoformat(),
"result": value,
"FeatureOfInterest": {
"name": f"Location {sid}",
"description": f"Feature of interest for sensor {sid}",
"encodingType": "application/vnd.geo+json",
"feature": {
"type": "Point",
"coordinates": [sensor.get("lon", -61.0), sensor.get("lat", 14.6)]
}
}
}
if _http_post(obs_url, obs, FROST_HEADERS):
print(f" ✅ FROST Observation {sid}/{field} → OK (cached)")
return True
else:
print(f" ⚠️ FROST Observation {sid}/{field} → échec")
return False
# Premier appel pour ce capteur : créer Thing + tous les Datastreams
thing_payload, datastreams = _frost_payload(sid, sensor)
print(f" 📊 FROST: POST Thing {sid}...")
tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS)
if not tid:
print(f" ⚠️ FROST Thing {sid} → échec création")
return False
print(f" ✅ FROST Thing {sid} créé (ID: {tid})")
# Créer les Datastreams
ds_map = {}
for f, ds, _ in datastreams:
ds["Thing"] = {"@iot.id": tid}
print(f" 📊 FROST: POST Datastream {sid}/{f}...")
ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS)
if ds_id:
print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})")
ds_map[f] = ds_id
else:
print(f" ⚠️ FROST Datastream {sid}/{f} → échec")
_frost_cache[sid] = (tid, ds_map)
# Poster l'observation pour le field actuel
if field in ds_map:
ds_id = ds_map[field]
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value}
if _http_post(obs_url, obs, FROST_HEADERS):
print(f" ✅ FROST Observation {sid}/{field} → OK")
return True
return False
# =============================================================================
# OpenRemote REST
# =============================================================================
_or_token_cache = {"token": "", "expires": 0}
def _get_or_token() -> str:
"""Obtain an OpenRemote token via password grant (admin-cli, directAccessGrants enabled)."""
import time, urllib.parse
if _or_token_cache["token"] and _or_token_cache["expires"] > time.time() + 60:
return _or_token_cache["token"]
try:
# Use password grant with openremote client in the target realm (smartcity)
data = urllib.parse.urlencode({
"grant_type": "password",
"username": os.environ.get("OR_ADMIN_USER", "admin"),
"password": os.environ.get("OR_ADMIN_PASS", "Digitribe972"),
"client_id": os.environ.get("OR_CLIENT_ID", "openremote")
}).encode()
# Token URL uses OR_REALM (smartcity) not OR_TOKEN_REALM
token_url = f"http://openremote-keycloak-1:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token"
req = urllib.request.Request(
token_url,
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
with urllib.request.urlopen(req, timeout=5) as r:
token_data = json.loads(r.read().decode())
_or_token_cache["token"] = token_data["access_token"]
_or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300) - 60
return _or_token_cache["token"]
except Exception as e:
print(f" ⚠️ OpenRemote token → {e}")
return ""
def _or_put(asset_id: str, payload: dict) -> bool:
"""PUT update sur un asset OpenRemote (avec version)."""
token = _get_or_token()
if not token:
return False
try:
body = json.dumps(payload).encode()
url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
req = urllib.request.Request(url, data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"If-Match": str(payload.get("version", 1)),
},
method="PUT")
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
print(f" ⚠️ OR PUT {asset_id} → HTTP {e.code}")
return False
except Exception as e:
print(f" ⚠️ OR PUT {asset_id}{e}")
return False
def publish_openremote(sid: str, sensor: dict, values: dict) -> bool:
"""Met à jour les attributs d'un asset OpenRemote via REST."""
# Mapping sid → asset ID (créés manuellement dans OR)
ASSET_MAP = {
"traffic_000": "7b5c5670d1b84865ba3ac7", # Traffic Fort-de-France Centre
"traffic_001": "557f6e993b994d3cb81017", # Traffic Fort-de-France North
"traffic_002": "cb81dfd2d2dc4d25adc9c2", # Traffic Fort-de-France South
"airquality_000": "a51c982a2d3e451898b978", # Air Quality Fort-de-France
"parking_000": "b8f0df19e0af47b386ebb9", # Parking Fort-de-France Centre
"noise_000": "9035103d1866454fb7e451", # Noise Fort-de-France Centre
"weather_000": "b9de80905ac640f488ab27", # Weather Lamentin Airport
"light_000": "ee7823a41e594851ba202f", # Light Fort-de-France
}
asset_id = ASSET_MAP.get(sid)
if not asset_id:
return False
# Construire les attributs à jour
now = datetime.now(timezone.utc).isoformat()
attrs = {
"timestamp": {"type": "DateTime", "value": now, "timestamp": now},
"battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": now},
}
# Mapper les valeurs du payload vers les attributs OR
field_map = {
"temperature_celsius": "temperature",
"humidity_percent": "humidity",
"noise_level_db": "noiseLevel",
"pm25_ugm3": "airQuality",
"vehicle_count": "trafficFlow",
"available_spots": "parking",
"brightness_lux": "light",
"flood": "flood",
}
for field, val in values.items():
attr_name = field_map.get(field, field)
attrs[attr_name] = {
"type": "Number",
"value": val,
"timestamp": now,
"unit": "µg/m³" if "ugm3" in field else ("°C" if "celsius" in field else ("%" if "percent" in field or "humidity" in field else ("dB" if "db" in field else ""))),
}
payload = {
"id": asset_id,
"name": sensor["name"],
"type": "IOTSensor",
"realm": OR_REALM,
"attributes": attrs,
}
return _or_put(asset_id, payload)
# =============================================================================
def main():
print("╔══════════════════════════════════════════════════╗")
print("║ Smart City Simulator — Martinique ║")
print("╚══════════════════════════════════════════════════╝")
print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s")
print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
mqtt_client = MultiMQTT()
running = True
def signal_handler(*_):
nonlocal running
running = False
print("\n[SIM] 🛑 Arrêt...")
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
iteration = 0
while running:
iteration += 1
ts = datetime.now().strftime("%H:%M:%S")
print(f"\n[SIM] ⏱️ It #{iteration}{ts}")
for sid, sensor in SENSORS.items():
stype = sensor["type"]
topic = f"city/sensors/{stype}/{sid}"
# --- Payload MQTT ---
ranges = SENSOR_RANGES.get(stype, {})
payload_mqtt = {
"id": sid,
"type": stype,
"name": sensor["name"],
"lat": sensor["lat"],
"lon": sensor["lon"],
"timestamp": datetime.now(timezone.utc).isoformat(),
"battery_level": random.randint(60, 100),
}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
payload_mqtt[field] = round(random.uniform(lo, hi), 1)
elif isinstance(val_range, list):
payload_mqtt[field] = random.choice(val_range)
msg = json.dumps(payload_mqtt, ensure_ascii=False)
# --- MQTT publish ---
results = mqtt_client.publish(topic, msg)
ok_mqtt = [n for n, r in results.items() if r]
if ok_mqtt:
print(f" 📤 {topic}{','.join(ok_mqtt)}")
# Extraire les valeurs pour OpenRemote
or_values = {}
for field, val_range in ranges.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
or_values[field] = round(random.uniform(lo, hi), 1)
# --- OpenRemote REST ---
if ENABLE_OPENREMOTE:
ok_or = publish_openremote(sid, sensor, or_values)
print(f" 🏠 OpenRemote: {'' if ok_or else '⚠️ skipped'}")
# --- Orion-LD ---
if ENABLE_ORION:
ok_or = publish_orion(sid, sensor)
print(f" 🌐 Orion-LD: {'' if ok_or else '⚠️ skipped'}")
# --- Stellio ---
if ENABLE_STELLIO:
ok_st = publish_stellio(sid, sensor)
print(f" 🏢 Stellio: {'' if ok_st else ''}")
# --- FROST ---
if ENABLE_FROST:
ranges2 = SENSOR_RANGES.get(stype, {})
for field, val_range in ranges2.items():
if isinstance(val_range, tuple) and len(val_range) == 2:
lo, hi = val_range
if isinstance(lo, (int, float)):
val = round(random.uniform(lo, hi), 1)
ok_fr = publish_frost(sid, sensor, field, val)
print(f" 📊 FROST: {'' if ok_fr else ''}")
# --- BunkerM HTTP ---
if os.getenv("BUNKERM_HTTP", "0") == "1":
ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt)
print(f" 📦 BunkerM: {'' if ok_bunkerm else ''}")
print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}")
try:
time.sleep(INTERVAL)
except KeyboardInterrupt:
break
mqtt_client.stop()
print("[SIM] ✅ Arrêté proprement.")
if __name__ == "__main__":
main()