WIP: Orion Grafana blocked (readOnly) + move to OpenRemote MQTT
This commit is contained in:
BIN
__pycache__/simulator.cpython-313.pyc
Normal file
BIN
__pycache__/simulator.cpython-313.pyc
Normal file
Binary file not shown.
501
simulator.py.backup_20260504_141747
Normal file
501
simulator.py.backup_20260504_141747
Normal file
@@ -0,0 +1,501 @@
|
|||||||
|
1|#!/usr/bin/env python3
|
||||||
|
2|"""
|
||||||
|
3|Smart City IoT Simulator — Martinique (14.6°N, 61.2°W)
|
||||||
|
4|=======================================================
|
||||||
|
5|Publie vers MULTIPLES brokers MQTT + context brokers NGSI-LD.
|
||||||
|
6|
|
||||||
|
7|Brokers MQTT:
|
||||||
|
8| - EMQX: emqx_emqx_1:1883 (sans auth)
|
||||||
|
9| - Mosquitto: mosquitto-traefik:1883 (bunker/bunker)
|
||||||
|
10| - BunkerM: bunkerm_bunkerm_1:1900 (TLS, bunker/bunker)
|
||||||
|
11| - OpenRemote: openremote-manager-1:1883 (admin/Digitribe972)
|
||||||
|
12|
|
||||||
|
13|Context Brokers REST:
|
||||||
|
14| - Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
|
||||||
|
15| - Stellio: stellio-api-gateway:8080 (NGSI-LD)
|
||||||
|
16| - FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)
|
||||||
|
17|
|
||||||
|
18|Variables d'environnement:
|
||||||
|
19| PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s)
|
||||||
|
20| BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France)
|
||||||
|
21| ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
|
||||||
|
22| ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
|
||||||
|
23| ENABLE_FROST=1 : activer FROST-Server (défaut: 1)
|
||||||
|
24|"""
|
||||||
|
25|
|
||||||
|
26|import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse
|
||||||
|
27|import paho.mqtt.client as mqtt
|
||||||
|
28|import urllib.request, urllib.error
|
||||||
|
29|from datetime import datetime, timezone
|
||||||
|
30|from typing import Any
|
||||||
|
31|import influxdb_client
|
||||||
|
32|from influxdb_client.client.write_api import SYNCHRONOUS
|
||||||
|
33|
|
||||||
|
34|# =============================================================================
|
||||||
|
35|# Configuration
|
||||||
|
36|# =============================================================================
|
||||||
|
37|BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
|
||||||
|
38|BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))
|
||||||
|
39|INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "10"))
|
||||||
|
40|ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
|
||||||
|
41|ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1") == "1"
|
||||||
|
42|ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1"
|
||||||
|
43|ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1"
|
||||||
|
44|OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
|
||||||
|
45|OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972")
|
||||||
|
46|OR_REALM = os.environ.get("OR_REALM", "smartcity")
|
||||||
|
47|OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token
|
||||||
|
48|
|
||||||
|
49|# InfluxDB config
|
||||||
|
50|ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1") == "1"
|
||||||
|
51|INFLUX_URL = os.environ.get("INFLUX_URL", "http://digital-twin-influxdb:8086")
|
||||||
|
52|INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
|
||||||
|
53|INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "iot_data")
|
||||||
|
54|INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN",
|
||||||
|
55| "my-super-secret-admin-token")
|
||||||
|
56|
|
||||||
|
57|# Initialize InfluxDB client
|
||||||
|
58|_influx_client = None
|
||||||
|
59|_influx_write_api = None
|
||||||
|
60|if ENABLE_INFLUX:
|
||||||
|
61| try:
|
||||||
|
62| _influx_client = influxdb_client.InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
|
||||||
|
63| _influx_write_api = _influx_client.write_api(write_options=SYNCHRONOUS)
|
||||||
|
64| print(f"[INFLUX] ✅ Connected to {INFLUX_URL}")
|
||||||
|
65| except Exception as e:
|
||||||
|
66| print(f"[INFLUX] ❌ Connection failed: {e}")
|
||||||
|
FROST_URL = os.environ.get("FROST_URL", "http://frost_http-web-1:8080/FROST-Server/v1.1")
|
||||||
|
68|
|
||||||
|
69|SENSOR_COUNTS = {
|
||||||
|
70| "traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")),
|
||||||
|
71| "airquality": int(os.environ.get("SENSOR_COUNT_airquality", "2")),
|
||||||
|
72| "parking": int(os.environ.get("SENSOR_COUNT_parking", "2")),
|
||||||
|
73| "noise": int(os.environ.get("SENSOR_COUNT_noise", "1")),
|
||||||
|
74| "weather": int(os.environ.get("SENSOR_COUNT_weather", "1")),
|
||||||
|
75| "light": int(os.environ.get("SENSOR_COUNT_light", "1")),
|
||||||
|
76|}
|
||||||
|
77|# Si SENSOR_COUNT est défini, multiplier les counts de façon proportionnelle
|
||||||
|
78|_total_default = sum(SENSOR_COUNTS.values())
|
||||||
|
79|if "SENSOR_COUNT" in os.environ:
|
||||||
|
80| target = int(os.environ["SENSOR_COUNT"])
|
||||||
|
81| ratio = target / _total_default
|
||||||
|
82| for k in SENSOR_COUNTS:
|
||||||
|
83| SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio))
|
||||||
|
84|
|
||||||
|
85|# =============================================================================
|
||||||
|
86|# Localisation des capteurs Martinique
|
||||||
|
87|# =============================================================================
|
||||||
|
88|SENSOR_LOCATIONS: dict[str, list[dict]] = {}
|
||||||
|
89|SENSOR_NAMES: dict[str, list[str]] = {
|
||||||
|
90| "traffic": ["Carrefour Central", "Avenue des Caraïbes", "Boulevard Pasteur",
|
||||||
|
91| "Rue des Flamboyants", "Place de la République"],
|
||||||
|
92| "airquality": ["Quartier Bonde", "Port de Fort-de-France", "Château Denis",
|
||||||
|
93| "Lamentin Aéroport", "Schoelcher Village"],
|
||||||
|
94| "parking": ["Parking Rivière-Saleé", "Parking Cluny", "Parking Média",
|
||||||
|
95| "Parking Grand-Camp", "Parking Dillon"],
|
||||||
|
96| "noise": ["Rue des Arts", "Marché Central", "Université Fort-de-France",
|
||||||
|
97| "Stade de Dillon", "Place du Champs de Mars"],
|
||||||
|
98| "weather": ["Station Météo Lamentin", "Station Schoelcher",
|
||||||
|
99| "Station Ajoupa-Bouillon", "Station Le François", "Station Le Robert"],
|
||||||
|
100| "light": ["Eclairage Rue des Mouettes", "Candela Boulevard",
|
||||||
|
101| "Lumiere Rue des Acacias", "Feux Signalisation Centre", "Eclairage Port"],
|
||||||
|
102|}
|
||||||
|
103|
|
||||||
|
104|def _gen_locs(stype: str, count: int) -> list[dict]:
|
||||||
|
105| locs = []
|
||||||
|
106| for i in range(count):
|
||||||
|
107| lat = BASE_LAT + random.uniform(-0.05, 0.05)
|
||||||
|
108| lon = BASE_LON + random.uniform(-0.05, 0.05)
|
||||||
|
109| names = SENSOR_NAMES.get(stype, [stype])
|
||||||
|
110| locs.append({
|
||||||
|
111| "lat": round(lat, 6),
|
||||||
|
112| "lon": round(lon, 6),
|
||||||
|
113| "name": names[i % len(names)],
|
||||||
|
114| })
|
||||||
|
115| return locs
|
||||||
|
116|
|
||||||
|
117|for stype, count in SENSOR_COUNTS.items():
|
||||||
|
118| SENSOR_LOCATIONS[stype] = _gen_locs(stype, count)
|
||||||
|
119|
|
||||||
|
120|# Ranges par type
|
||||||
|
121|SENSOR_RANGES: dict[str, dict] = {
|
||||||
|
122| "traffic": {"vehicle_count":(10,150),"average_speed_kmh":(10,80),
|
||||||
|
123| "congestion_level":(0,5),"occupancy_percent":(0,100)},
|
||||||
|
124| "airquality": {"pm25_ugm3":(5,80),"pm10_ugm3":(10,150),"no2_ugm3":(5,60),
|
||||||
|
125| "o3_ugm3":(20,120),"co_mgm3":(0.1,5.0),
|
||||||
|
126| "temperature_celsius":(20,35),"humidity_percent":(40,95)},
|
||||||
|
127| "parking": {"total_spots":(50,500),"available_spots":(0,500),
|
||||||
|
128| "occupancy_percent":(0,100),"turnover_per_hour":(5,50)},
|
||||||
|
129| "noise": {"noise_level_db":(40,95),"peak_db":(60,110)},
|
||||||
|
130| "weather": {"temperature_celsius":(22,34),"humidity_percent":(50,95),
|
||||||
|
131| "wind_speed_kmh":(0,50),"pressure_hpa":(1005,1025),
|
||||||
|
132| "rain_mm":(0,20),"uv_index":(0,11)},
|
||||||
|
133| "light": {"brightness_lux":(0,100000),"power_consumption_w":(0,500)},
|
||||||
|
134|}
|
||||||
|
135|
|
||||||
|
136|NOISE_CATEGORIES = ["quiet","moderate","loud","very_loud"]
|
||||||
|
137|LIGHT_STATUSES = ["on","off","dimmed","auto"]
|
||||||
|
138|
|
||||||
|
139|# =============================================================================
|
||||||
|
140|# Capteurs déclarés
|
||||||
|
141|# =============================================================================
|
||||||
|
142|SENSORS: dict[str, dict] = {}
|
||||||
|
143|counter = 0
|
||||||
|
144|for stype, locs in SENSOR_LOCATIONS.items():
|
||||||
|
145| for loc in locs:
|
||||||
|
146| sid = f"{stype}_{counter:03d}"
|
||||||
|
147| SENSORS[sid] = {"type": stype, "lat": loc["lat"], "lon": loc["lon"], "name": loc["name"]}
|
||||||
|
148| counter += 1
|
||||||
|
149|
|
||||||
|
150|# =============================================================================
|
||||||
|
151|# Payload NGSI-LD pour Orion-LD / Stellio
|
||||||
|
152|# =============================================================================
|
||||||
|
153|# Contextes NGSI-LD : core + Smart Data Models
|
||||||
|
154|# https://smartdatamodels.org pour les @context officiels
|
||||||
|
155|# Contexte NGSI-LD pur pour Orion-LD (vocabulaires standards uniquement)
|
||||||
|
156|# Orion-LD ne peut pas résoudre raw.githubusercontent.com — utiliser uri.etsi.org uniquement
|
||||||
|
157|ORION_CONTEXT = [
|
||||||
|
158| "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
|
||||||
|
159|]
|
||||||
|
160|
|
||||||
|
161|# Mapping sensor type → Smart Data Model type NGSI-LD
|
||||||
|
162|SMART_MODEL_MAPPING = {
|
||||||
|
163| "airquality": "AirQualityObserved",
|
||||||
|
164| "traffic": "TrafficFlowObserved",
|
||||||
|
165| "parking": "OffStreetParking",
|
||||||
|
166| "noise": "NoiseLevelObserved",
|
||||||
|
167| "weather": "WeatherObserved",
|
||||||
|
168| "light": "Device",
|
||||||
|
169|}
|
||||||
|
170|FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}
|
||||||
|
171|
|
||||||
|
172|# Cache FROST : éviter de recréer Thing/Datastream
|
||||||
|
173|_frost_cache: dict[str, tuple[str, str]] = {} # (sid, field) -> (thing_id, ds_id)
|
||||||
|
174|
|
||||||
|
175|# Contexte NGSI-LD pur pour Stellio et Orion-LD (vocabulaires standards uniquement)
|
||||||
|
176|# Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/
|
||||||
|
177|# On n'utilise PAS les vocabulaires smartdatamodels.org distants (inaccessibles depuis les containers)
|
||||||
|
178|# Les types d'entité Smart Data Models (AirQualityObserved, etc.) sont reconnus par leur nom
|
||||||
|
179|# Les propriétés spécifiques sont stockées telles quelles (vocabulaire libre)
|
||||||
|
180|STELLIO_INLINE_CONTEXT = [
|
||||||
|
181| "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
|
||||||
|
182|]
|
||||||
|
183|
|
||||||
|
184|def _ngsi_payload(sid: str, sensor: dict, context: list | dict = ORION_CONTEXT) -> dict:
|
||||||
|
185| """Construit un payload NGSI-LD avec Smart Data Models officiels."""
|
||||||
|
186| stype = sensor["type"]
|
||||||
|
187| model_type = SMART_MODEL_MAPPING.get(stype, "Device")
|
||||||
|
188| now = datetime.now(timezone.utc).isoformat()
|
||||||
|
189|
|
||||||
|
190| # Attributs communs à tous les modèles
|
||||||
|
191| payload = {
|
||||||
|
192| "@context": context,
|
||||||
|
193| "id": f"urn:ngsi-ld:{model_type}:{sid}",
|
||||||
|
194| "type": model_type,
|
||||||
|
195| "dateObserved": {"type": "Property", "value": now},
|
||||||
|
196| "location": {"type": "GeoProperty",
|
||||||
|
197| "value": {"type": "Point",
|
||||||
|
198| "coordinates": [sensor["lon"], sensor["lat"]]}},
|
||||||
|
199| "name": {"type": "Property", "value": sensor["name"]},
|
||||||
|
200| "batteryLevel": {"type": "Property", "value": random.randint(60, 100)},
|
||||||
|
201| }
|
||||||
|
202|
|
||||||
|
203| # Attributs spécifiques par type de modèle
|
||||||
|
204| ranges = SENSOR_RANGES.get(stype, {})
|
||||||
|
205| props = {}
|
||||||
|
206| for field, val_range in ranges.items():
|
||||||
|
207| if isinstance(val_range, tuple) and len(val_range) == 2:
|
||||||
|
208| lo, hi = val_range
|
||||||
|
209| if isinstance(lo, (int, float)):
|
||||||
|
210| props[field] = {"type": "Property", "value": round(random.uniform(lo, hi), 1)}
|
||||||
|
211| elif isinstance(val_range, list):
|
||||||
|
212| props[field] = {"type": "Property", "value": random.choice(val_range)}
|
||||||
|
213|
|
||||||
|
214| # Mapping vers les noms d'attributs Smart Data Models
|
||||||
|
215| if stype == "airquality":
|
||||||
|
216| if "pm25_ugm3" in props: payload["NO2"] = props.pop("pm25_ugm3") # Simplifié
|
||||||
|
217| if "pm10_ugm3" in props: payload["PM10"] = props.pop("pm10_ugm3")
|
||||||
|
218| if "no2_ugm3" in props: payload["NO2"] = props.pop("no2_ugm3")
|
||||||
|
219| if "o3_ugm3" in props: payload["O3"] = props.pop("o3_ugm3")
|
||||||
|
220| if "co_mgm3" in props: payload["CO"] = props.pop("co_mgm3")
|
||||||
|
221| if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius")
|
||||||
|
222| if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent")
|
||||||
|
223|
|
||||||
|
224| elif stype == "traffic":
|
||||||
|
225| if "vehicle_count" in props: payload["vehicleCount"] = props.pop("vehicle_count")
|
||||||
|
226| if "average_speed_kmh" in props: payload["averageVehicleSpeed"] = props.pop("average_speed_kmh")
|
||||||
|
227| if "congestion_level" in props: payload["congestion"] = props.pop("congestion_level")
|
||||||
|
228| if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent")
|
||||||
|
229|
|
||||||
|
230| elif stype == "parking":
|
||||||
|
231| if "available_spots" in props: payload["availableSpotNumber"] = props.pop("available_spots")
|
||||||
|
232| if "total_spots" in props: payload["totalSpotNumber"] = props.pop("total_spots")
|
||||||
|
233| if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent")
|
||||||
|
234| if "turnover_per_hour" in props: payload["turnover"] = props.pop("turnover_per_hour")
|
||||||
|
235|
|
||||||
|
236| elif stype == "noise":
|
||||||
|
237| if "noise_level_db" in props: payload["noiseLevel"] = props.pop("noise_level_db")
|
||||||
|
238| if "peak_db" in props: payload["noisePeak"] = props.pop("peak_db")
|
||||||
|
239| payload["noiseCategory"] = {"type": "Property", "value": random.choice(NOISE_CATEGORIES)}
|
||||||
|
240|
|
||||||
|
241| elif stype == "weather":
|
||||||
|
242| if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius")
|
||||||
|
243| if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent")
|
||||||
|
244| if "rain_mm" in props: payload["rainfall"] = props.pop("rain_mm")
|
||||||
|
245| if "uv_index" in props: payload["uvIndex"] = props.pop("uv_index")
|
||||||
|
246| if "wind_speed_kmh" in props: payload["windSpeed"] = props.pop("wind_speed_kmh")
|
||||||
|
247|
|
||||||
|
248| elif stype == "light":
|
||||||
|
249| if "brightness_lux" in props: payload["illuminance"] = props.pop("brightness_lux")
|
||||||
|
250| if "power_consumption_w" in props: payload["power"] = props.pop("power_consumption_w")
|
||||||
|
251| payload["status"] = {"type": "Property", "value": random.choice(LIGHT_STATUSES)}
|
||||||
|
252|
|
||||||
|
253| return payload
|
||||||
|
254|
|
||||||
|
255|def _frost_payload(sid: str, sensor: dict) -> dict:
|
||||||
|
256| """Construit un payload SensorThings pour FROST-Server."""
|
||||||
|
257| stype = sensor["type"]
|
||||||
|
258| ranges = SENSOR_RANGES.get(stype, {})
|
||||||
|
259| datastreams = []
|
||||||
|
260|
|
||||||
|
261| for field, val_range in ranges.items():
|
||||||
|
262| if isinstance(val_range, tuple) and len(val_range) == 2:
|
||||||
|
263| lo, hi = val_range
|
||||||
|
264| if isinstance(lo, (int, float)) and isinstance(hi, (int, float)):
|
||||||
|
265| val = round(random.uniform(lo, hi), 1)
|
||||||
|
266| unit = "http://www.qudt.org/vocab/unit#DegreeCelsius"
|
||||||
|
267| obs_prop = {
|
||||||
|
268| "name": f"{field} Observation",
|
||||||
|
269| "description": f"Observation of {field}",
|
||||||
|
270| "definition": unit,
|
||||||
|
271| }
|
||||||
|
272| sensor_data = {
|
||||||
|
273| "name": f"Sensor {sid} {field}",
|
||||||
|
274| "description": f"Sensor {sid} measuring {field}",
|
||||||
|
275| "encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0",
|
||||||
|
276| "metadata": {"unit": unit},
|
||||||
|
277| }
|
||||||
|
278| ds = {
|
||||||
|
279| "name": f"Datastream {stype}/{field}",
|
||||||
|
280| "description": f"Datastream for {stype} sensor {sid} - {field}",
|
||||||
|
281| "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
|
||||||
|
282| "unitOfMeasurement": {"name": field, "symbol": "", "definition": unit},
|
||||||
|
283| "Sensor": sensor_data,
|
||||||
|
284| "ObservedProperty": obs_prop,
|
||||||
|
285| }
|
||||||
|
286| datastreams.append((field, ds, val))
|
||||||
|
287|
|
||||||
|
288| thing_payload = {
|
||||||
|
289| "name": f"Thing_{sid}",
|
||||||
|
290| "description": f"Smart City {stype} sensor in Martinique",
|
||||||
|
291| "properties": {"sensorType": stype, "region": "Martinique"},
|
||||||
|
292| }
|
||||||
|
293| return thing_payload, datastreams
|
||||||
|
294|
|
||||||
|
295|# =============================================================================
|
||||||
|
296|# HTTP helper
|
||||||
|
297|# =============================================================================
|
||||||
|
298|def _http_post(url: str, data: dict, headers: dict) -> str:
|
||||||
|
299| """POST et retourne 'ok' ou 'created' (ou '' si échec)."""
|
||||||
|
300| try:
|
||||||
|
301| body = json.dumps(data).encode()
|
||||||
|
302| req = urllib.request.Request(url, data=body, headers=headers, method="POST")
|
||||||
|
303| with urllib.request.urlopen(req, timeout=8) as resp:
|
||||||
|
304| if resp.status == 204:
|
||||||
|
305| return 'created' # No Content — succès
|
||||||
|
306| if resp.status not in (200, 201):
|
||||||
|
307| return ''
|
||||||
|
308| # Lire le corps pour extraire l'ID (FROST)
|
||||||
|
309| try:
|
||||||
|
310| result = json.loads(resp.read())
|
||||||
|
311| if '@iot.selfLink' in result:
|
||||||
|
312| link = result['@iot.selfLink']
|
||||||
|
313| return link.split('(')[1].rstrip(')')
|
||||||
|
314| if '@iot.id' in result:
|
||||||
|
315| return str(result['@iot.id'])
|
||||||
|
316| except Exception:
|
||||||
|
317| pass
|
||||||
|
318| location = resp.headers.get('Location', '')
|
||||||
|
319| if location:
|
||||||
|
320| return location.split('(')[1].rstrip(')') if '(' in location else ''
|
||||||
|
321| return 'created'
|
||||||
|
322| except urllib.error.HTTPError as e:
|
||||||
|
323| # Lire le corps de l'erreur pour debug
|
||||||
|
324| try:
|
||||||
|
325| err_body = e.read().decode()[:200]
|
||||||
|
326| except Exception:
|
||||||
|
327| err_body = str(e)
|
||||||
|
328| print(f" ⚠️ HTTP POST {url} → {e.code}: {err_body}")
|
||||||
|
329| return ''
|
||||||
|
330| except Exception as e:
|
||||||
|
331| print(f" ⚠️ HTTP POST {url} → {e}")
|
||||||
|
332| return ''
|
||||||
|
333|
|
||||||
|
334|def _http_put(url: str, data: dict, headers: dict) -> bool:
|
||||||
|
335| try:
|
||||||
|
336| body = json.dumps(data).encode()
|
||||||
|
337| req = urllib.request.Request(url, data=body, headers=headers, method="PUT")
|
||||||
|
338| with urllib.request.urlopen(req, timeout=5) as resp:
|
||||||
|
339| return resp.status in (200, 204)
|
||||||
|
340| except urllib.error.HTTPError as e:
|
||||||
|
341| if e.code == 409:
|
||||||
|
342| return True # Already exists - that's fine
|
||||||
|
343| print(f" ⚠️ HTTP PUT {url} → {e}")
|
||||||
|
344| return False
|
||||||
|
345| except Exception as e:
|
||||||
|
346| print(f" ⚠️ HTTP PUT {url} → {e}")
|
||||||
|
347| return False
|
||||||
|
348|
|
||||||
|
349|# =============================================================================
|
||||||
|
350|# MQTT Client multi-broker
|
||||||
|
351|# =============================================================================
|
||||||
|
352|class MultiMQTT:
|
||||||
|
353| def __init__(self):
|
||||||
|
354| self.clients: dict[str, mqtt.Client] = {}
|
||||||
|
355| self.ok: dict[str, bool] = {}
|
||||||
|
356| self._lock = threading.Lock()
|
||||||
|
357| self._setup()
|
||||||
|
358|
|
||||||
|
359| def _mk_client(self, name: str, host: str, port: int,
|
||||||
|
360| tls: bool = False, user: str = "", pwd: str = "",
|
||||||
|
361| ws: bool = False) -> mqtt.Client:
|
||||||
|
362| cid = f"smartcity-sim-{name}-{os.getpid()}"
|
||||||
|
363| c = mqtt.Client(client_id=cid, protocol=mqtt.MQTTv311)
|
||||||
|
364| if user:
|
||||||
|
365| c.username_pw_set(user, pwd)
|
||||||
|
366| if tls:
|
||||||
|
367| c.tls_set(cert_reqs=ssl.CERT_NONE)
|
||||||
|
368| c.tls_insecure_set(True)
|
||||||
|
369| if ws:
|
||||||
|
370| c.ws_set(b"/mqtt")
|
||||||
|
371| c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc)
|
||||||
|
372| c.on_disconnect = lambda _c, _, __: self._on_disconnect(name)
|
||||||
|
373| try:
|
||||||
|
374| c.connect(host, port, keepalive=30)
|
||||||
|
375| c.loop_start()
|
||||||
|
376| except Exception as e:
|
||||||
|
377| print(f"[MQTT] ❌ {name} @ {host}:{port} → {e}")
|
||||||
|
378| self.ok[name] = False
|
||||||
|
379| return c
|
||||||
|
380|
|
||||||
|
381| def _on_connect(self, name: str, rc: int):
|
||||||
|
382| with self._lock:
|
||||||
|
383| if rc == 0:
|
||||||
|
384| self.ok[name] = True
|
||||||
|
385| print(f"[MQTT] ✅ {name} connecté")
|
||||||
|
386| else:
|
||||||
|
387| self.ok[name] = False
|
||||||
|
388| print(f"[MQTT] ❌ {name} rc={rc}")
|
||||||
|
389|
|
||||||
|
390| def _on_disconnect(self, name: str):
|
||||||
|
391| with self._lock:
|
||||||
|
392| self.ok[name] = False
|
||||||
|
393| print(f"[MQTT] ⚠️ {name} déconnecté")
|
||||||
|
394|
|
||||||
|
395| def _setup(self):
|
||||||
|
396| # Garder que EMQX et Mosquitto (MQTT fonctionnels)
|
||||||
|
397| # BunkerM via HTTP API (port 2000) au lieu de MQTT/TLS
|
||||||
|
398| brokers = [
|
||||||
|
399| ("EMQX", "emqx_emqx_1", 1883, False, "", ""),
|
||||||
|
400| ("Mosquitto", "mosquitto-traefik", 1883, False, "bunker", "bunker"),
|
||||||
|
401| ]
|
||||||
|
402| print("[MQTT] 🔌 Connexion aux brokers...")
|
||||||
|
403| for name, host, port, tls, user, pwd in brokers:
|
||||||
|
404| c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd)
|
||||||
|
405| self.clients[name] = c
|
||||||
|
406| self.ok[name] = False
|
||||||
|
407| time.sleep(3) # Attend les connexions
|
||||||
|
408|
|
||||||
|
409| def publish(self, topic: str, payload: str) -> dict[str, bool]:
|
||||||
|
410| results = {}
|
||||||
|
411| with self._lock:
|
||||||
|
412| for name, client in self.clients.items():
|
||||||
|
413| if self.ok.get(name, False):
|
||||||
|
414| try:
|
||||||
|
415| r = client.publish(topic, payload, qos=1)
|
||||||
|
416| results[name] = (r.rc == mqtt.MQTT_ERR_SUCCESS)
|
||||||
|
417| except Exception:
|
||||||
|
418| results[name] = False
|
||||||
|
419| else:
|
||||||
|
420| results[name] = False
|
||||||
|
421| return results
|
||||||
|
422|
|
||||||
|
423| def stop(self):
|
||||||
|
424| for name, c in self.clients.items():
|
||||||
|
425| try:
|
||||||
|
426| c.loop_stop()
|
||||||
|
427| c.disconnect()
|
||||||
|
428| except Exception:
|
||||||
|
429| pass
|
||||||
|
430|
|
||||||
|
431|# =============================================================================
|
||||||
|
432|# URLs de base (résolues au démarrage)
|
||||||
|
433|# =============================================================================
|
||||||
|
434|ORION_HOST = "fiware-gis-quickstart-orion-1"
|
||||||
|
435|ORION_IP = ""
|
||||||
|
436|try:
|
||||||
|
437| import socket
|
||||||
|
438| ORION_IP = socket.gethostbyname(ORION_HOST)
|
||||||
|
439|except:
|
||||||
|
440| pass
|
||||||
|
441|ORION_URL = f"http://{ORION_IP or ORION_HOST}:1026" if ORION_IP else "http://fiware-gis-quickstart-orion-1:1026"
|
||||||
|
442|STELLIO_URL = os.environ.get("STELLIO_URL", "http://stellio-api-gateway:8080")
|
||||||
|
443|# Configuration OpenRemote (URLs dynamiques)
|
||||||
|
444|OR_URL = os.environ.get("OR_URL", "http://openremote-manager-1:8080") # Hostname Docker interne
|
||||||
|
445|OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity
|
||||||
|
446|OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", f"http://openremote-keycloak-1:8080/auth/realms/{OR_TOKEN_REALM}/protocol/openid-connect/token")
|
||||||
|
447|OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour
|
||||||
|
448|STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default")
|
||||||
|
449|
|
||||||
|
450|def publish_stellio(sid: str, sensor: dict) -> bool:
|
||||||
|
451| """Publie sur Stellio via Traefik (gère le 409)."""
|
||||||
|
452| entity = _ngsi_payload(sid, sensor, context=STELLIO_INLINE_CONTEXT)
|
||||||
|
453| # Stellio a besoin du @context pour résoudre les vocabulaires NGSI-LD
|
||||||
|
454| # (uri.etsi.org résolu depuis le JAR embarqué)
|
||||||
|
455| url = f"{STELLIO_URL}/ngsi-ld/v1/entities"
|
||||||
|
456| headers = {
|
||||||
|
457| "Content-Type": "application/ld+json",
|
||||||
|
458| "Accept": "application/ld+json",
|
||||||
|
459| "NGSILD-Tenant": STELLIO_TENANT,
|
||||||
|
460| }
|
||||||
|
461| try:
|
||||||
|
462| body = json.dumps(entity).encode()
|
||||||
|
463| req = urllib.request.Request(url, data=body, headers=headers, method="POST")
|
||||||
|
464| with urllib.request.urlopen(req, timeout=8) as resp:
|
||||||
|
465| print(f" 🏢 Stellio: ✅ (HTTP {resp.status})")
|
||||||
|
466| return True
|
||||||
|
467| except urllib.error.HTTPError as e:
|
||||||
|
468| if e.code == 409: # Already exists, do update with PUT
|
||||||
|
469| try:
|
||||||
|
470| entity_id = urllib.parse.quote(entity["id"], safe="")
|
||||||
|
471| update_url = f"{STELLIO_URL}/ngsi-ld/v1/entities/{entity_id}"
|
||||||
|
472| req2 = urllib.request.Request(update_url, data=body, headers=headers, method="PUT")
|
||||||
|
473| with urllib.request.urlopen(req2, timeout=8) as resp2:
|
||||||
|
474| print(f" 🏢 Stellio: ✅ (HTTP {resp2.status} updated)")
|
||||||
|
475| return True
|
||||||
|
476| except Exception as e2:
|
||||||
|
477| print(f" ⚠️ Stellio update failed: {e2}")
|
||||||
|
478| return False
|
||||||
|
479| try:
|
||||||
|
480| err = e.read().decode()[:300]
|
||||||
|
481| except Exception:
|
||||||
|
482| err = str(e)
|
||||||
|
483| print(f" ⚠️ Stellio → {e.code}: {err}")
|
||||||
|
484| return False
|
||||||
|
485| except Exception as e:
|
||||||
|
486| print(f" ⚠️ Stellio → {e}")
|
||||||
|
487| return False
|
||||||
|
488|
|
||||||
|
489|def publish_orion(sid: str, sensor: dict) -> bool:
|
||||||
|
490| """Publie sur Orion-LD (POST create, PATCH update)."""
|
||||||
|
491| import socket
|
||||||
|
492| entity = _ngsi_payload(sid, sensor)
|
||||||
|
493| if not hasattr(publish_orion, "orion_ip"):
|
||||||
|
494| try:
|
||||||
|
495| publish_orion.orion_ip = socket.gethostbyname("fiware-gis-quickstart-orion-1")
|
||||||
|
496| except Exception:
|
||||||
|
497| publish_orion.orion_ip = "192.168.192.20"
|
||||||
|
498| base = f"http://{publish_orion.orion_ip}:1026/ngsi-ld/v1"
|
||||||
|
499| # 1. Essayer de créer (POST)
|
||||||
|
500| try:
|
||||||
|
501|
|
||||||
Reference in New Issue
Block a user