748 lines
32 KiB
Python
748 lines
32 KiB
Python
#!/usr/bin/env python3
|
|
"""
Smart City IoT Simulator — Martinique (14.6°N, 61.2°W)
=======================================================
Publishes simulated sensor readings to several MQTT brokers, NGSI-LD context
brokers and other REST back ends.

MQTT brokers:
  - EMQX:       emqx_emqx_1:1883 (no auth)
  - Mosquitto:  mosquitto-traefik:1883 (bunker/bunker)
  - BunkerM:    bunkerm_bunkerm_1:1900 (TLS, bunker/bunker) — not connected
                over MQTT by this script; it is reached through its HTTP API
                on port 2000 when BUNKERM_HTTP=1
  - OpenRemote: openremote-manager-1:1883 (admin/Digitribe972) — not connected
                over MQTT by this script; assets are updated over REST

Context brokers (REST):
  - Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
  - Stellio:  stellio-api-gateway:8080 (NGSI-LD)
  - FROST:    frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)

Environment variables:
  PUBLISH_INTERVAL_SEC  : publication interval in seconds (default: 10)
  BASE_LAT / BASE_LON   : base coordinates (default: Fort-de-France)
  ENABLE_ORION=1        : enable Orion-LD (default: 1)
  ENABLE_STELLIO=1      : enable Stellio (default: 1)
  ENABLE_FROST=1        : enable FROST-Server (default: 1)
  ENABLE_OPENREMOTE=1   : enable the OpenRemote REST updates (default: 1)
  BUNKERM_HTTP=1        : enable the BunkerM HTTP publication (default: 0)
  SENSOR_COUNT_<type>   : number of sensors of a given type
  SENSOR_COUNT          : total number of sensors; per-type counts are scaled
                          proportionally
  OR_URL / OR_REALM / OR_CLIENT_ID / OR_CLIENT_SECRET : OpenRemote access
"""
|
|
|
|
import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse
|
|
import paho.mqtt.client as mqtt
|
|
import urllib.request, urllib.error
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
# =============================================================================
|
|
# Configuration
|
|
# =============================================================================
|
|
# Base coordinates around which the simulated sensors are scattered.
BASE_LAT = float(os.getenv("BASE_LAT", "14.6091"))
BASE_LON = float(os.getenv("BASE_LON", "-61.2155"))

# Number of seconds between two publication rounds.
INTERVAL = int(os.getenv("PUBLISH_INTERVAL_SEC", "10"))

# Back-end switches: a back end is enabled only when its variable is exactly "1".
ENABLE_ORION = os.getenv("ENABLE_ORION", "1") == "1"
ENABLE_STELLIO = os.getenv("ENABLE_STELLIO", "1") == "1"
ENABLE_FROST = os.getenv("ENABLE_FROST", "1") == "1"
ENABLE_OPENREMOTE = os.getenv("ENABLE_OPENREMOTE", "1") == "1"

# OpenRemote administrator credentials and realm.
OR_ADMIN_USER = os.getenv("OR_ADMIN_USER", "admin")
OR_ADMIN_PASS = os.getenv("OR_ADMIN_PASS", "Digitribe972")
# NOTE(review): OR_REALM is assigned a second time further down in this file
# with a different default — confirm which one is intended.
OR_REALM = os.getenv("OR_REALM", "master")

# Number of sensors per type; each one can be overridden with
# SENSOR_COUNT_<type>.
SENSOR_COUNTS = {
    kind: int(os.getenv(f"SENSOR_COUNT_{kind}", default))
    for kind, default in (
        ("traffic", "3"),
        ("airquality", "2"),
        ("parking", "2"),
        ("noise", "1"),
        ("weather", "1"),
        ("light", "1"),
    )
}

# When SENSOR_COUNT is set, scale every per-type count proportionally so the
# total approaches the requested value (each type keeps at least one sensor).
_total_default = sum(SENSOR_COUNTS.values())
_requested_total = os.getenv("SENSOR_COUNT")
if _requested_total is not None:
    _scale = int(_requested_total) / _total_default
    SENSOR_COUNTS = {
        kind: max(1, int(quantity * _scale))
        for kind, quantity in SENSOR_COUNTS.items()
    }
|
|
|
|
# =============================================================================
|
|
# Localisation des capteurs Martinique
|
|
# =============================================================================
|
|
# Filled at import time with one list of {"lat", "lon", "name"} per sensor type.
SENSOR_LOCATIONS: dict[str, list[dict]] = {}

# Display names per sensor type; they are reused cyclically when a type has
# more sensors than names.
SENSOR_NAMES: dict[str, list[str]] = {
    "traffic": [
        "Carrefour Central",
        "Avenue des Caraïbes",
        "Boulevard Pasteur",
        "Rue des Flamboyants",
        "Place de la République",
    ],
    "airquality": [
        "Quartier Bonde",
        "Port de Fort-de-France",
        "Château Denis",
        "Lamentin Aéroport",
        "Schoelcher Village",
    ],
    "parking": [
        "Parking Rivière-Saleé",
        "Parking Cluny",
        "Parking Média",
        "Parking Grand-Camp",
        "Parking Dillon",
    ],
    "noise": [
        "Rue des Arts",
        "Marché Central",
        "Université Fort-de-France",
        "Stade de Dillon",
        "Place du Champs de Mars",
    ],
    "weather": [
        "Station Météo Lamentin",
        "Station Schoelcher",
        "Station Ajoupa-Bouillon",
        "Station Le François",
        "Station Le Robert",
    ],
    "light": [
        "Eclairage Rue des Mouettes",
        "Candela Boulevard",
        "Lumiere Rue des Acacias",
        "Feux Signalisation Centre",
        "Eclairage Port",
    ],
}
|
|
|
|
def _gen_locs(stype: str, count: int) -> list[dict]:
    """Return ``count`` random locations for sensors of type ``stype``.

    Each location is drawn within ±0.05° of (BASE_LAT, BASE_LON), rounded to
    six decimals, and labelled with a name from SENSOR_NAMES (names are reused
    cyclically; the type itself is the label when the type has no entry).
    """
    placed = []
    for index in range(count):
        labels = SENSOR_NAMES.get(stype, [stype])
        # Latitude is drawn before longitude.
        latitude = round(BASE_LAT + random.uniform(-0.05, 0.05), 6)
        longitude = round(BASE_LON + random.uniform(-0.05, 0.05), 6)
        placed.append(
            {"lat": latitude, "lon": longitude, "name": labels[index % len(labels)]}
        )
    return placed
|
|
|
|
# Draw the random positions once, at import time: one list per sensor type.
for stype, count in SENSOR_COUNTS.items():
    SENSOR_LOCATIONS.update({stype: _gen_locs(stype, count)})
|
|
|
|
# Ranges par type
|
|
# Value ranges per sensor type: field name -> (low, high) bounds from which a
# reading is drawn uniformly.
SENSOR_RANGES: dict[str, dict] = {
    "traffic": {
        "vehicle_count": (10, 150),
        "average_speed_kmh": (10, 80),
        "congestion_level": (0, 5),
        "occupancy_percent": (0, 100),
    },
    "airquality": {
        "pm25_ugm3": (5, 80),
        "pm10_ugm3": (10, 150),
        "no2_ugm3": (5, 60),
        "o3_ugm3": (20, 120),
        "co_mgm3": (0.1, 5.0),
        "temperature_celsius": (20, 35),
        "humidity_percent": (40, 95),
    },
    "parking": {
        "total_spots": (50, 500),
        "available_spots": (0, 500),
        "occupancy_percent": (0, 100),
        "turnover_per_hour": (5, 50),
    },
    "noise": {
        "noise_level_db": (40, 95),
        "peak_db": (60, 110),
    },
    "weather": {
        "temperature_celsius": (22, 34),
        "humidity_percent": (50, 95),
        "wind_speed_kmh": (0, 50),
        "pressure_hpa": (1005, 1025),
        "rain_mm": (0, 20),
        "uv_index": (0, 11),
    },
    "light": {
        "brightness_lux": (0, 100000),
        "power_consumption_w": (0, 500),
    },
}

# Categorical values picked at random for noise and light sensors.
NOISE_CATEGORIES = ["quiet", "moderate", "loud", "very_loud"]
LIGHT_STATUSES = ["on", "off", "dimmed", "auto"]
|
|
|
|
# =============================================================================
|
|
# Capteurs déclarés
|
|
# =============================================================================
|
|
# Registry of declared sensors: sensor id -> {"type", "lat", "lon", "name"}.
#
# Ids are numbered PER TYPE ("traffic_000", "traffic_001", "airquality_000",
# ...). A single counter shared by all types would produce "airquality_003",
# "parking_005", ... and the "<type>_000" keys used by the OpenRemote asset
# mapping in publish_openremote would then never match.
SENSORS: dict[str, dict] = {
    f"{stype}_{index:03d}": {
        "type": stype,
        "lat": loc["lat"],
        "lon": loc["lon"],
        "name": loc["name"],
    }
    for stype, locs in SENSOR_LOCATIONS.items()
    for index, loc in enumerate(locs)
}

# Kept for backward compatibility: total number of declared sensors.
counter = len(SENSORS)
|
|
|
|
# =============================================================================
|
|
# Payload NGSI-LD pour Orion-LD / Stellio
|
|
# =============================================================================
|
|
# JSON-LD @context sent with every NGSI-LD entity. Only the ETSI core context
# is used (the original note says schema.org is not valid as JSON-LD here).
ORION_CONTEXT = ["https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context-v1.8.jsonld"]

# Headers for every FROST-Server (SensorThings) request.
FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}

# FROST cache, used to avoid re-creating Things and Datastreams:
#   sensor id -> (thing_id, {field name: datastream_id})
_frost_cache: dict[str, tuple[str, dict[str, str]]] = {}
|
|
|
|
def _ngsi_payload(sid: str, sensor: dict) -> dict:
    """Build the NGSI-LD entity sent to Orion-LD and Stellio for one sensor.

    Every range declared in SENSOR_RANGES for the sensor type becomes a
    Property holding a freshly drawn value (one decimal). Noise sensors also
    get a ``noise_category``, light sensors a ``status``, and every entity a
    ``battery_level`` between 60 and 100.
    """
    stype = sensor["type"]

    def as_property(value):
        return {"type": "Property", "value": value}

    measured = {}
    for field, bounds in SENSOR_RANGES.get(stype, {}).items():
        if isinstance(bounds, list):
            measured[field] = as_property(random.choice(bounds))
            continue
        if not (isinstance(bounds, tuple) and len(bounds) == 2):
            continue
        low, high = bounds
        if isinstance(low, (int, float)) and isinstance(high, (int, float)):
            measured[field] = as_property(round(random.uniform(low, high), 1))

    if stype == "noise":
        measured["noise_category"] = as_property(random.choice(NOISE_CATEGORIES))
    if stype == "light":
        measured["status"] = as_property(random.choice(LIGHT_STATUSES))
    measured["battery_level"] = as_property(random.randint(60, 100))

    entity = {
        "@context": ORION_CONTEXT,
        "id": f"urn:ngsi-ld:Sensor:{sid}",
        "type": "Sensor",
        "name": as_property(sensor["name"]),
        "location": {
            "type": "GeoProperty",
            # GeoJSON order: longitude first, then latitude.
            "value": {"type": "Point", "coordinates": [sensor["lon"], sensor["lat"]]},
        },
        "sensorType": as_property(stype),
    }
    # Measurements are merged last, so a same-named field replaces a fixed key.
    entity.update(measured)
    return entity
|
|
|
|
def _frost_payload(sid: str, sensor: dict) -> tuple[dict, list[tuple[str, dict, float]]]:
    """Build the SensorThings payloads sent to FROST-Server for one sensor.

    Returns:
        A ``(thing_payload, datastreams)`` pair. ``thing_payload`` is the
        Thing with its inline Location. ``datastreams`` holds one
        ``(field, datastream_payload, sample_value)`` tuple per numeric range
        declared in SENSOR_RANGES for the sensor type; the caller must add the
        ``Thing`` reference to each Datastream before posting it.
    """
    stype = sensor["type"]
    ranges = SENSOR_RANGES.get(stype, {})
    # The same placeholder definition URI is used for every field.
    unit = "https://unitsofmeasure.org/..."
    datastreams = []

    for field, val_range in ranges.items():
        if not (isinstance(val_range, tuple) and len(val_range) == 2):
            continue
        lo, hi = val_range
        if not (isinstance(lo, (int, float)) and isinstance(hi, (int, float))):
            continue
        val = round(random.uniform(lo, hi), 1)
        obs_prop = {
            "name": f"{field} Observation",
            "description": f"Observation of {field}",
            "definition": unit,
        }
        sensor_data = {
            "name": f"Sensor {sid} {field}",
            "description": f"Sensor {sid} measuring {field}",
            "encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0",
            "metadata": {"unit": unit},
        }
        ds = {
            "name": f"Datastream {stype}/{field}",
            "description": f"Datastream for {stype} sensor {sid} - {field}",
            "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
            "unitOfMeasurement": {"name": field, "symbol": "", "definition": unit},
            "Sensor": sensor_data,
            "ObservedProperty": obs_prop,
        }
        datastreams.append((field, ds, val))

    thing_payload = {
        "name": f"Thing_{sid}",
        "description": f"Smart City {stype} sensor in Martinique",
        "properties": {"sensorType": stype, "region": "Martinique"},
        "Locations": [{
            "name": sensor["name"],
            "description": f"Location of {stype} sensor {sid}",
            "encodingType": "application/vnd.geo+json",
            # GeoJSON order: longitude first, then latitude.
            "location": {"type": "Point", "coordinates": [sensor["lon"], sensor["lat"]]},
        }],
    }
    return thing_payload, datastreams
|
|
|
|
# =============================================================================
|
|
# HTTP helper
|
|
# =============================================================================
|
|
def _http_post(url: str, data: dict, headers: dict) -> str:
    """POST ``data`` as JSON and return a truthy string on success.

    Returns the created entity id when it can be extracted from the response
    body (``@iot.selfLink`` / ``@iot.id``) or from the ``Location`` header,
    ``'created'`` when the request succeeded without an id, and ``''`` on any
    failure (which is also printed).
    """
    try:
        request = urllib.request.Request(
            url, data=json.dumps(data).encode(), headers=headers, method="POST"
        )
        with urllib.request.urlopen(request, timeout=8) as response:
            if response.status == 204:
                return 'created'  # No Content: success without a body
            if response.status != 200 and response.status != 201:
                return ''
            # Try the response body first (FROST may return the entity).
            try:
                parsed = json.loads(response.read())
                if '@iot.selfLink' in parsed:
                    return parsed['@iot.selfLink'].split('(')[1].rstrip(')')
                if '@iot.id' in parsed:
                    return str(parsed['@iot.id'])
            except Exception:
                # Empty or unexpected body: fall back to the Location header.
                pass
            location = response.headers.get('Location', '')
            if not location:
                return 'created'
            if '(' not in location:
                return ''
            return location.split('(')[1].rstrip(')')
    except urllib.error.HTTPError as http_err:
        # Read the beginning of the error body for debugging.
        try:
            detail = http_err.read().decode()[:200]
        except Exception:
            detail = str(http_err)
        print(f" ⚠️ HTTP POST {url} → {http_err.code}: {detail}")
        return ''
    except Exception as exc:
        print(f" ⚠️ HTTP POST {url} → {exc}")
        return ''
|
|
|
|
def _http_put(url: str, data: dict, headers: dict) -> bool:
    """PUT ``data`` as JSON; return True on HTTP 200/204 (or on a 409 conflict).

    Any other failure is printed and reported as False.
    """
    try:
        request = urllib.request.Request(
            url, data=json.dumps(data).encode(), headers=headers, method="PUT"
        )
        with urllib.request.urlopen(request, timeout=5) as response:
            return response.status in (200, 204)
    except urllib.error.HTTPError as http_err:
        if http_err.code != 409:
            print(f" ⚠️ HTTP PUT {url} → {http_err}")
            return False
        return True  # 409: already exists, which is acceptable here
    except Exception as exc:
        print(f" ⚠️ HTTP PUT {url} → {exc}")
        return False
|
|
|
|
# =============================================================================
|
|
# MQTT Client multi-broker
|
|
# =============================================================================
|
|
class MultiMQTT:
    """Publishes the same message to several MQTT brokers.

    One paho client is created per broker, each with its own background
    network loop. ``ok[name]`` records whether the broker is currently
    connected and is updated from the paho callbacks under ``_lock``.
    """

    def __init__(self):
        self.clients: dict[str, mqtt.Client] = {}
        self.ok: dict[str, bool] = {}
        self._lock = threading.Lock()
        self._setup()

    def _mk_client(self, name: str, host: str, port: int,
                   tls: bool = False, user: str = "", pwd: str = "",
                   ws: bool = False) -> mqtt.Client:
        """Create a client for one broker, connect it and start its loop.

        A connection failure is printed and leaves the broker marked as
        disconnected; the client object is returned in every case.
        """
        cid = f"smartcity-sim-{name}-{os.getpid()}"
        client_kwargs = {"client_id": cid, "protocol": mqtt.MQTTv311}
        if ws:
            # WebSocket transport must be selected when the client is built.
            client_kwargs["transport"] = "websockets"
        if hasattr(mqtt, "CallbackAPIVersion"):
            # paho-mqtt 2.x: request the 1.x callback signatures used below.
            client_kwargs["callback_api_version"] = mqtt.CallbackAPIVersion.VERSION1
        c = mqtt.Client(**client_kwargs)
        if user:
            c.username_pw_set(user, pwd)
        if tls:
            # Certificate verification is deliberately disabled.
            c.tls_set(cert_reqs=ssl.CERT_NONE)
            c.tls_insecure_set(True)
        if ws:
            c.ws_set_options(path="/mqtt")
        c.on_connect = lambda _c, _userdata, _flags, rc: self._on_connect(name, rc)
        c.on_disconnect = lambda _c, _userdata, _rc: self._on_disconnect(name)
        # Mark the broker as down BEFORE the network loop starts, so that this
        # write can never overwrite a True set by the on_connect callback.
        with self._lock:
            self.ok[name] = False
        try:
            c.connect(host, port, keepalive=30)
            c.loop_start()
        except Exception as e:
            print(f"[MQTT] ❌ {name} @ {host}:{port} → {e}")
        return c

    def _on_connect(self, name: str, rc: int):
        """Record the result of a connection attempt (rc == 0 means success)."""
        with self._lock:
            if rc == 0:
                self.ok[name] = True
                print(f"[MQTT] ✅ {name} connecté")
            else:
                self.ok[name] = False
                print(f"[MQTT] ❌ {name} rc={rc}")

    def _on_disconnect(self, name: str):
        """Mark a broker as disconnected."""
        with self._lock:
            self.ok[name] = False
            print(f"[MQTT] ⚠️ {name} déconnecté")

    def _setup(self):
        """Connect to every configured broker, then wait for the connections."""
        # Only EMQX and Mosquitto are used over MQTT; BunkerM is reached
        # through its HTTP API (port 2000) instead of MQTT/TLS.
        brokers = [
            ("EMQX", "emqx_emqx_1", 1883, False, "", ""),
            ("Mosquitto", "mosquitto-traefik", 1883, False, "bunker", "bunker"),
        ]
        print("[MQTT] 🔌 Connexion aux brokers...")
        for name, host, port, tls, user, pwd in brokers:
            self.clients[name] = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd)
        time.sleep(3)  # give the brokers time to acknowledge the connections

    def publish(self, topic: str, payload: str) -> dict[str, bool]:
        """Publish ``payload`` on ``topic`` (QoS 1) to every connected broker.

        Returns a mapping broker name -> True when the message was accepted by
        the client, False when the broker is disconnected or the call failed.
        """
        # Snapshot the connection state under the lock, publish outside it so
        # the paho callbacks are never blocked by a slow publish.
        with self._lock:
            targets = [(name, client, self.ok.get(name, False))
                       for name, client in self.clients.items()]
        results = {}
        for name, client, connected in targets:
            if not connected:
                results[name] = False
                continue
            try:
                r = client.publish(topic, payload, qos=1)
                results[name] = (r.rc == mqtt.MQTT_ERR_SUCCESS)
            except Exception:
                results[name] = False
        return results

    def stop(self):
        """Stop every network loop and disconnect (best effort)."""
        for name, c in self.clients.items():
            try:
                c.loop_stop()
                c.disconnect()
            except Exception as e:
                # Shutdown continues, but the failure is no longer silent.
                print(f"[MQTT] ⚠️ {name} arrêt → {e}")
|
|
|
|
# =============================================================================
|
|
# URLs de base (résolues au démarrage)
|
|
# =============================================================================
|
|
import socket

# Orion-LD: resolve the container name once at start-up; when resolution fails
# the host name itself is used in the URL.
ORION_HOST = "fiware-gis-quickstart-orion-1"
ORION_IP = ""
try:
    ORION_IP = socket.gethostbyname(ORION_HOST)
except OSError:
    # Name not resolvable from here: keep ORION_IP empty.
    pass
ORION_URL = f"http://{ORION_IP or ORION_HOST}:1026"

STELLIO_URL = "http://stellio-api-gateway:8080"

# FROST-Server (SensorThings) base URL used by publish_frost; the default is
# the endpoint listed in the module docstring.
FROST_URL = os.environ.get("FROST_URL", "http://frost_allinone-web-1:8080/FROST-Server/v1.1")

# OpenRemote configuration (URLs derived from the environment).
OR_URL = os.environ.get("OR_URL", "http://192.168.192.10:8080")  # direct IP (avoids DNS)
# NOTE(review): this reassigns OR_REALM, which the configuration section at the
# top of the file already set with another default; this later value wins.
OR_REALM = os.environ.get("OR_REALM", "smartcity")
OR_TOKEN_URL = f"{OR_URL}/auth/realms/{OR_REALM}/protocol/openid-connect/token"
OR_TOKEN_TTL = 3600  # intended token refresh period in seconds (one hour)
|
|
def publish_stellio(sid: str, sensor: dict) -> bool:
    """Publish one sensor entity to Stellio.

    The entity is first created with POST; when Stellio answers 409 (entity
    already exists) it is replaced with a PUT on its own URL. Returns True
    when either request succeeds; failures are printed and reported as False.
    """
    entity = _ngsi_payload(sid, sensor)
    create_url = f"{STELLIO_URL}/ngsi-ld/v1/entities"  # plain create, no options=upsert
    ld_headers = {
        "Content-Type": "application/ld+json",
        "Accept": "application/ld+json",
    }
    try:
        encoded = json.dumps(entity).encode()
        create_req = urllib.request.Request(
            create_url, data=encoded, headers=ld_headers, method="POST"
        )
        with urllib.request.urlopen(create_req, timeout=8) as created:
            print(f" 🏢 Stellio: ✅ (HTTP {created.status})")
            return True
    except urllib.error.HTTPError as http_err:
        if http_err.code != 409:
            try:
                detail = http_err.read().decode()[:300]
            except Exception:
                detail = str(http_err)
            print(f" ⚠️ Stellio → {http_err.code}: {detail}")
            return False
        # 409: the entity already exists, so replace it with PUT.
        try:
            quoted_id = urllib.parse.quote(entity["id"], safe="")
            replace_req = urllib.request.Request(
                f"{STELLIO_URL}/ngsi-ld/v1/entities/{quoted_id}",
                data=encoded, headers=ld_headers, method="PUT",
            )
            with urllib.request.urlopen(replace_req, timeout=8) as replaced:
                print(f" 🏢 Stellio: ✅ (HTTP {replaced.status} updated)")
                return True
        except Exception as update_err:
            print(f" ⚠️ Stellio update failed: {update_err}")
            return False
    except Exception as exc:
        print(f" ⚠️ Stellio → {exc}")
        return False
|
|
|
|
def publish_orion(sid: str, sensor: dict) -> bool:
    """Publish one sensor entity to Orion-LD (POST to create, PATCH to update).

    The broker address is resolved on the first call and remembered as the
    function attribute ``publish_orion.orion_ip`` (a fixed fallback IP is used
    when resolution fails). Returns True when the create or the update
    succeeds; failures are printed and reported as False.
    """
    import socket

    entity = _ngsi_payload(sid, sensor)
    if not hasattr(publish_orion, "orion_ip"):
        try:
            resolved = socket.gethostbyname("fiware-gis-quickstart-orion-1")
        except Exception:
            resolved = "192.168.192.20"
        publish_orion.orion_ip = resolved

    api_root = f"http://{publish_orion.orion_ip}:1026/ngsi-ld/v1"
    ld_headers = {"Content-Type": "application/ld+json", "Accept": "application/ld+json"}

    # 1. Try to create the entity.
    try:
        encoded = json.dumps(entity).encode()
        create_req = urllib.request.Request(
            f"{api_root}/entities", data=encoded, headers=ld_headers, method="POST"
        )
        with urllib.request.urlopen(create_req, timeout=8) as created:
            print(f" 🌐 Orion-LD: ✅ (HTTP {created.status} created)")
            return True
    except urllib.error.HTTPError as http_err:
        if http_err.code != 409:
            print(f" ⚠️ Orion-LD → {http_err.code}: {http_err.read().decode()[:200]}")
            return False
        # 2. Already exists (409): PATCH the attributes instead.
        try:
            quoted_id = urllib.parse.quote(entity['id'], safe='')
            patch_req = urllib.request.Request(
                f"{api_root}/entities/{quoted_id}/attrs",
                data=encoded, headers=ld_headers, method="PATCH",
            )
            with urllib.request.urlopen(patch_req, timeout=8) as patched:
                print(f" 🌐 Orion-LD: ✅ (HTTP {patched.status} updated)")
                return True
        except Exception as update_err:
            print(f" ⚠️ Orion-LD → update failed: {update_err}")
            return False
    except Exception as exc:
        print(f" ⚠️ Orion-LD → {exc}")
        return False
|
|
|
|
def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool:
    """Publish one reading to BunkerM through its HTTP API (port 2000).

    Each call opens a fresh cookie-backed session, requests ``/login`` with
    Basic credentials, then POSTs the reading to ``/api/sensors/data``.
    Returns True on HTTP 200/201/204; failures are printed and reported as
    False.
    """
    import base64
    import http.cookiejar
    import json
    import urllib.request
    from datetime import datetime, timezone

    endpoint = "bunkerm_bunkerm_1:2000"

    # 1. Opener with a cookie jar so the session cookie is kept between calls.
    session = urllib.request.build_opener(
        urllib.request.HTTPCookieProcessor(http.cookiejar.CookieJar())
    )

    # 2. Basic authentication header, sent with both requests.
    token = base64.b64encode(b"bunker:bunker").decode()
    basic_auth = {"Authorization": f"Basic {token}"}

    try:
        # GET on /login to initialise the session.
        session.open(
            urllib.request.Request(f"http://{endpoint}/login", headers=basic_auth),
            timeout=5,
        )
    except Exception as exc:
        print(f" ⚠️ BunkerM login → {exc}")
        return False

    # 3. Payload: identification fields first, then every value whose key does
    #    not clash with them.
    payload = {
        "sensor_id": sid,
        "sensor_type": sensor["type"],
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    extras = {key: val for key, val in values.items() if key not in payload}
    payload.update(extras)

    # 4. POST using the same session.
    post_req = urllib.request.Request(
        f"http://{endpoint}/api/sensors/data",
        data=json.dumps(payload).encode(),
        headers={**basic_auth, "Content-Type": "application/json"},
        method="POST",
    )
    try:
        with session.open(post_req, timeout=5) as response:
            print(f" ✅ BunkerM: HTTP {response.status}")
            return response.status in (200, 201, 204)
    except Exception as exc:
        print(f" ⚠️ BunkerM POST → {exc}")
        return False
|
|
|
|
def _frost_post_observation(sid: str, field: str, ds_id: str, value: float,
                            note: str = "") -> bool:
    """POST one Observation on Datastream ``ds_id`` and print the outcome."""
    obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
    obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value}
    if _http_post(obs_url, obs, FROST_HEADERS):
        print(f" ✅ FROST Observation {sid}/{field} → OK{note}")
        return True
    print(f" ⚠️ FROST Observation {sid}/{field} → échec")
    return False


def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
    """Post one Observation to FROST, creating Thing/Datastreams when needed.

    ``_frost_cache`` maps a sensor id to ``(thing_id, {field: datastream_id})``.
    The Thing and all its Datastreams are created on the first call for a
    sensor. If the Thing exists but the Datastream for ``field`` is missing
    (its creation failed earlier), only that Datastream is created again —
    the Thing is never duplicated.

    Returns:
        True when the Observation was accepted, False otherwise.
    """
    cached = sid in _frost_cache
    if cached:
        thing_id, ds_map = _frost_cache[sid]
        if field in ds_map:
            return _frost_post_observation(sid, field, ds_map[field], value, " (cached)")
        # Thing known, Datastream missing: retry that Datastream only.
        _, datastreams = _frost_payload(sid, sensor)
        pending = [entry for entry in datastreams if entry[0] == field]
    else:
        # First call for this sensor: create the Thing, then every Datastream.
        thing_payload, pending = _frost_payload(sid, sensor)
        print(f" 📊 FROST: POST Thing {sid}...")
        thing_id = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS)
        if not thing_id:
            print(f" ⚠️ FROST Thing {sid} → échec création")
            return False
        print(f" ✅ FROST Thing {sid} créé (ID: {thing_id})")
        ds_map = {}
        # Cache the Thing right away; ds_map is filled in place below.
        _frost_cache[sid] = (thing_id, ds_map)

    for ds_field, ds, _ in pending:
        ds["Thing"] = {"@iot.id": thing_id}
        print(f" 📊 FROST: POST Datastream {sid}/{ds_field}...")
        ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS)
        if ds_id:
            print(f" ✅ FROST Datastream {sid}/{ds_field} créé (ID: {ds_id})")
            ds_map[ds_field] = ds_id
        else:
            print(f" ⚠️ FROST Datastream {sid}/{ds_field} → échec")

    # Post the observation for the requested field, if its Datastream exists.
    if field not in ds_map:
        return False
    return _frost_post_observation(sid, field, ds_map[field], value)
|
|
|
|
# =============================================================================
|
|
# OpenRemote REST
|
|
# =============================================================================
|
|
# Last OpenRemote access token and its expiry time (epoch seconds).
_or_token_cache = {"token": "", "expires": 0}


def _get_or_token() -> str:
    """Return an OpenRemote access token (client-credentials grant).

    The cached token is reused while it remains valid for more than 60
    seconds; otherwise a new one is requested from OR_TOKEN_URL with
    OR_CLIENT_ID / OR_CLIENT_SECRET. Returns ``""`` (after printing the error)
    when no token can be obtained.
    """
    cached = _or_token_cache["token"]
    if cached and _or_token_cache["expires"] > time.time() + 60:
        return cached
    try:
        # Service-account login: client id + client secret.
        form = {
            "grant_type": "client_credentials",
            "client_id": os.environ.get('OR_CLIENT_ID', 'openremote'),
            "client_secret": os.environ.get('OR_CLIENT_SECRET', ''),
        }
        token_req = urllib.request.Request(
            OR_TOKEN_URL, data=urllib.parse.urlencode(form).encode()
        )
        with urllib.request.urlopen(token_req, timeout=5) as response:
            answer = json.loads(response.read())
        _or_token_cache["token"] = answer["access_token"]
        _or_token_cache["expires"] = time.time() + answer.get("expires_in", 300)
        return answer["access_token"]
    except Exception as exc:
        print(f" ⚠️ OpenRemote token → {exc}")
        return ""
|
|
|
|
def _or_put(asset_id: str, payload: dict) -> bool:
    """PUT ``payload`` on an OpenRemote asset, with its version in ``If-Match``.

    Returns True on HTTP 200/204, False when no token is available or when
    the request fails (the failure is printed).
    """
    bearer = _get_or_token()
    if not bearer:
        return False
    try:
        put_req = urllib.request.Request(
            f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}",
            data=json.dumps(payload).encode(),
            headers={
                "Authorization": f"Bearer {bearer}",
                "Content-Type": "application/json",
                # Version 1 is assumed when the payload does not carry one.
                "If-Match": str(payload.get("version", 1)),
            },
            method="PUT",
        )
        with urllib.request.urlopen(put_req, timeout=5) as response:
            return response.status in (200, 204)
    except urllib.error.HTTPError as http_err:
        print(f" ⚠️ OR PUT {asset_id} → HTTP {http_err.code}")
        return False
    except Exception as exc:
        print(f" ⚠️ OR PUT {asset_id} → {exc}")
        return False
|
|
|
|
def publish_openremote(sid: str, sensor: dict, values: dict) -> bool:
    """Update the attributes of the OpenRemote asset mapped to ``sid`` via REST.

    Sensors without an entry in the asset mapping are skipped (returns
    False). Otherwise every value becomes a ``Number`` attribute, renamed
    through the field mapping when one exists, and the asset is sent with
    ``_or_put``; its result is returned.
    """
    # sid -> asset id (assets created manually in OpenRemote).
    asset_ids = {
        "traffic_000": "7b5c5670d1b84865ba3ac7",     # Traffic Fort-de-France Centre
        "traffic_001": "557f6e993b994d3cb81017",     # Traffic Fort-de-France North
        "traffic_002": "cb81dfd2d2dc4d25adc9c2",     # Traffic Fort-de-France South
        "airquality_000": "a51c982a2d3e451898b978",  # Air Quality Fort-de-France
        "parking_000": "b8f0df19e0af47b386ebb9",     # Parking Fort-de-France Centre
        "noise_000": "9035103d1866454fb7e451",       # Noise Fort-de-France Centre
        "weather_000": "b9de80905ac640f488ab27",     # Weather Lamentin Airport
        "light_000": "ee7823a41e594851ba202f",       # Light Fort-de-France
    }
    asset_id = asset_ids.get(sid)
    if not asset_id:
        return False

    def unit_for(field_name: str) -> str:
        # First matching rule wins, in this order.
        if "ugm3" in field_name:
            return "µg/m³"
        if "celsius" in field_name:
            return "°C"
        if "percent" in field_name or "humidity" in field_name:
            return "%"
        if "db" in field_name:
            return "dB"
        return ""

    # Payload field -> OpenRemote attribute name (unlisted fields keep theirs).
    renamed = {
        "temperature_celsius": "temperature",
        "humidity_percent": "humidity",
        "noise_level_db": "noiseLevel",
        "pm25_ugm3": "airQuality",
        "vehicle_count": "trafficFlow",
        "available_spots": "parking",
        "brightness_lux": "light",
        "flood": "flood",
    }

    stamp = datetime.now(timezone.utc).isoformat()
    attributes = {
        "timestamp": {"type": "DateTime", "value": stamp, "timestamp": stamp},
        "battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": stamp},
    }
    for field, reading in values.items():
        attributes[renamed.get(field, field)] = {
            "type": "Number",
            "value": reading,
            "timestamp": stamp,
            "unit": unit_for(field),
        }

    asset = {
        "id": asset_id,
        "name": sensor["name"],
        "type": "IOTSensor",
        "realm": OR_REALM,
        "attributes": attributes,
    }
    return _or_put(asset_id, asset)
|
|
# =============================================================================
|
|
def _sample_readings(ranges: dict) -> dict[str, Any]:
    """Draw one value per field: uniform in (lo, hi) tuples, a choice in lists."""
    readings = {}
    for field, val_range in ranges.items():
        if isinstance(val_range, tuple) and len(val_range) == 2:
            lo, hi = val_range
            if isinstance(lo, (int, float)):
                readings[field] = round(random.uniform(lo, hi), 1)
        elif isinstance(val_range, list):
            readings[field] = random.choice(val_range)
    return readings


def main():
    """Run the simulator until SIGINT/SIGTERM.

    Every INTERVAL seconds, each declared sensor gets one set of readings
    which is published to the MQTT brokers and, depending on the ENABLE_*
    switches, to OpenRemote, Orion-LD, Stellio, FROST and BunkerM. The same
    readings are sent over MQTT, to OpenRemote, to FROST and to BunkerM;
    Orion-LD and Stellio entities are built by ``_ngsi_payload``, which draws
    its own values.
    """
    print("╔══════════════════════════════════════════════════╗")
    print("║ Smart City Simulator — Martinique ║")
    print("╚══════════════════════════════════════════════════╝")
    print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s")
    print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")

    mqtt_client = MultiMQTT()
    bunkerm_enabled = os.getenv("BUNKERM_HTTP", "0") == "1"

    running = True

    def signal_handler(*_):
        nonlocal running
        running = False
        print("\n[SIM] 🛑 Arrêt...")

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    iteration = 0
    while running:
        iteration += 1
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"\n[SIM] ⏱️ It #{iteration} — {ts}")

        for sid, sensor in SENSORS.items():
            if not running:
                break  # stop requested: do not start another sensor
            stype = sensor["type"]
            topic = f"city/sensors/{stype}/{sid}"

            # --- Readings: drawn once, shared by the back ends below ---
            ranges = SENSOR_RANGES.get(stype, {})
            readings = _sample_readings(ranges)
            numeric = {f: v for f, v in readings.items() if isinstance(ranges[f], tuple)}

            # --- MQTT payload ---
            payload_mqtt = {
                "id": sid,
                "type": stype,
                "name": sensor["name"],
                "lat": sensor["lat"],
                "lon": sensor["lon"],
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "battery_level": random.randint(60, 100),
            }
            payload_mqtt.update(readings)
            msg = json.dumps(payload_mqtt, ensure_ascii=False)

            # --- MQTT publish ---
            results = mqtt_client.publish(topic, msg)
            ok_mqtt = [n for n, r in results.items() if r]
            if ok_mqtt:
                print(f" 📤 {topic} → {','.join(ok_mqtt)}")

            # --- OpenRemote REST (numeric readings only) ---
            if ENABLE_OPENREMOTE:
                ok_or = publish_openremote(sid, sensor, numeric)
                print(f" 🏠 OpenRemote: {'✅' if ok_or else '⚠️ skipped'}")

            # --- Orion-LD ---
            if ENABLE_ORION:
                ok_orion = publish_orion(sid, sensor)
                print(f" 🌐 Orion-LD: {'✅' if ok_orion else '⚠️ skipped'}")

            # --- Stellio ---
            if ENABLE_STELLIO:
                ok_st = publish_stellio(sid, sensor)
                print(f" 🏢 Stellio: {'✅' if ok_st else '❌'}")

            # --- FROST: one Observation per numeric field ---
            if ENABLE_FROST:
                for field, val in numeric.items():
                    ok_fr = publish_frost(sid, sensor, field, val)
                    print(f" 📊 FROST: {'✅' if ok_fr else '❌'}")

            # --- BunkerM HTTP ---
            if bunkerm_enabled:
                ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt)
                print(f" 📦 BunkerM: {'✅' if ok_bunkerm else '❌'}")

        print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}")

        # Sleep in short slices so a stop request is honoured promptly instead
        # of after a full INTERVAL.
        deadline = time.monotonic() + INTERVAL
        try:
            while running:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(0.5, remaining))
        except KeyboardInterrupt:
            break

    mqtt_client.stop()
    print("[SIM] ✅ Arrêté proprement.")


if __name__ == "__main__":
    main()
|