fix: Stabilisation complète Smart City Digital Twin Martinique
- Correction simulateur: nettoyage code FIWARE (erreurs syntaxe) - Grafana: dashboard complet 10 panneaux sur grafana.digitribe.fr - InfluxDB: datasource corrigée (bucket smartcity, org digitribe) - Nettoyage: suppression services FIWARE (Orion-LD, Stellio, QuantumLeap) - Pipeline validé: Simulator → 3 MQTT brokers → Telegraf → InfluxDB → Grafana - Dashboard URL: https://grafana.digitribe.fr/d/smartcity-martinique-complete/ Architecture simplifiée: - 3 MQTT brokers (EMQX, Mosquitto, BunkerM) - Telegraf pour agrégation - InfluxDB pour stockage time-series - Grafana pour visualisation (Traefik: grafana.digitribe.fr)
This commit is contained in:
448
simulator.py
448
simulator.py
@@ -11,8 +11,8 @@ Brokers MQTT:
|
||||
- OpenRemote: openremote-manager-1:1883 (admin/Digitribe972)
|
||||
|
||||
Context Brokers REST:
|
||||
- Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
|
||||
- Stellio: stellio-api-gateway:8080 (NGSI-LD)
|
||||
# - Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
|
||||
# - Stellio: stellio-api-gateway:8080 (NGSI-LD)
|
||||
- FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)
|
||||
|
||||
Streaming Platforms:
|
||||
@@ -25,8 +25,8 @@ Time-Series DB:
|
||||
Variables d'environnement:
|
||||
PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s)
|
||||
BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France)
|
||||
ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
|
||||
ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
|
||||
# ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
|
||||
# ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
|
||||
ENABLE_FROST=1 : activer FROST-Server (défaut: 1)
|
||||
ENABLE_INFLUX=1 : activer InfluxDB v2 (défaut: 1)
|
||||
ENABLE_PULSAR=1 : activer Apache Pulsar (défaut: 1)
|
||||
@@ -50,12 +50,12 @@ from influxdb_client.client.write_api import SYNCHRONOUS
|
||||
# =============================================================================
|
||||
# Configuration des brokers MQTT
|
||||
# Configuration des brokers MQTT
|
||||
# Par défaut localhost (simulateur tourne sur l'hôte)
|
||||
EMQX_HOST = os.environ.get("EMQX_HOST", "localhost")
|
||||
EMQX_PORT = int(os.environ.get("EMQX_PORT", "11883"))
|
||||
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "localhost")
|
||||
# Utilise les noms de services Docker par défaut
|
||||
EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1")
|
||||
EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883"))
|
||||
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "smart-city-mosquitto")
|
||||
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))
|
||||
BUNKERM_HOST = os.environ.get("BUNKERM_HOST", "mqtt.digitribe.fr")
|
||||
BUNKERM_HOST = os.environ.get("BUNKERM_HOST", "bunkerm_bunkerm_1")
|
||||
BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900"))
|
||||
|
||||
# =============================================================================
|
||||
@@ -64,15 +64,15 @@ BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900"))
|
||||
BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
|
||||
BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))
|
||||
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "1")) # 1s pour temps réel
|
||||
ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
|
||||
ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1").lower() in ("1", "true", "yes", "on")
|
||||
#ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
|
||||
#ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1").lower() in ("1", "true", "yes", "on")
|
||||
ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1"
|
||||
ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1"
|
||||
OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
|
||||
OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972")
|
||||
OR_REALM = os.environ.get("OR_REALM", "smartcity")
|
||||
OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token
|
||||
ENABLE_IOT_AGENT = os.environ.get("ENABLE_IOT_AGENT", "1") == "1"
|
||||
#ENABLE_IOT_AGENT = os.environ.get("ENABLE_IOT_AGENT", "1") == "1"
|
||||
FROST_URL = os.environ.get("FROST_URL", "http://localhost:8090/FROST-Server/v1.1") # Exposer frost_http-web-1:8080 -> host:8086
|
||||
|
||||
# Pulsar config (HTTP REST — pulsar-admin + producer REST API)
|
||||
@@ -91,8 +91,8 @@ REDPANDA_BASE = f"http://{REDPANDA_HOST}:{REDPANDA_PORT}"
|
||||
ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1").lower() in ("1", "true", "yes", "on")
|
||||
INFLUX_URL = os.environ.get("INFLUX_URL", "http://smart-city-influxdb:8086") # InfluxDB v2 sur smartcity-shared
|
||||
INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
|
||||
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "iot_data")
|
||||
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-token")
|
||||
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "smartcity") # Correspond au bucket de Telegraf
|
||||
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-token")
|
||||
|
||||
# Prometheus metrics HTTP server
|
||||
METRICS_PORT = int(os.environ.get("METRICS_PORT", "8001"))
|
||||
@@ -109,7 +109,7 @@ simulator_info.info({
|
||||
"version": "1.0.0",
|
||||
"python_version": sys.version.split()[0],
|
||||
"mqtt_brokers": "emqx,mosquitto,bunkerm",
|
||||
"context_brokers": "orion_ld,stellio,frost",
|
||||
# "context_brokers": "orion_ld,stellio,frost",
|
||||
})
|
||||
|
||||
# --- Counters ---
|
||||
@@ -198,15 +198,15 @@ _influx_write_api = None
|
||||
if ENABLE_INFLUX:
|
||||
try:
|
||||
_influx_client = influxdb_client.InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
|
||||
_influx_write_api = _influx_client.write_api(write_options=SYNCHRONOUS)
|
||||
print(f"[INFLUX] ✅ Connected to {INFLUX_URL}")
|
||||
_influx_write_api = _influx_client.write_api(write_options=influxdb_client.client.write_api.ASYNCHRONOUS)
|
||||
print(f"[INFLUX] ✅ Connected to {INFLUX_URL} (async mode)")
|
||||
except Exception as e:
|
||||
print(f"[INFLUX] ❌ Connection failed: {e}")
|
||||
|
||||
SENSOR_COUNTS = {
|
||||
"traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")),
|
||||
"airquality": int(os.environ.get("SENSOR_COUNT_airquality", "2")),
|
||||
"parking": int(os.environ.get("SENSOR_COUNT_parking", "2")),
|
||||
"airquality": int(os.environ.get("SENSOR_COUNT_airquality", "10")),
|
||||
"parking": int(os.environ.get("SENSOR_COUNT_parking", "10")),
|
||||
"noise": int(os.environ.get("SENSOR_COUNT_noise", "1")),
|
||||
"weather": int(os.environ.get("SENSOR_COUNT_weather", "1")),
|
||||
"light": int(os.environ.get("SENSOR_COUNT_light", "1")),
|
||||
@@ -231,98 +231,79 @@ if "SENSOR_COUNT" in os.environ:
|
||||
# Martinique bounds: lat 14.37–14.88°N, lon 61.0–61.25°W
|
||||
FIXED_LOCATIONS: dict[str, dict[str, tuple[float, float]]] = {
|
||||
"traffic": {
|
||||
# OpenRemote: "Traffic Fort-de-France Centre"
|
||||
"FdF Centre": (14.6036, -61.1783),
|
||||
# OpenRemote: "Traffic Fort-de-France North"
|
||||
"FdF North": (14.6200, -61.1700),
|
||||
# OpenRemote: "Traffic Fort-de-France South"
|
||||
"FdF South": (14.5900, -61.1900),
|
||||
# OpenRemote: "trafficFlow - Fort-de-France"
|
||||
"FdF Centre Rue": (14.6036, -61.1783),
|
||||
# OpenRemote: "Test Sensor"
|
||||
"FdF Place": (14.6000, -61.2000),
|
||||
"Fort-de-France Centre": (14.6164, -61.07),
|
||||
"Le Lamentin Aéroport": (14.6167, -61.0035),
|
||||
"Le Robert D110": (14.6833, -60.9333),
|
||||
"Sainte-Anne Plage": (14.4333, -60.9833),
|
||||
"Saint-Joseph D1": (14.7, -61.05),
|
||||
"Trinité Centre": (14.7167, -60.9167),
|
||||
"Le François D2": (14.6833, -60.8333),
|
||||
"Ducos Penitencier": (14.5833, -61.0667),
|
||||
"Schœlcher Morne": (14.65, -61.1),
|
||||
"Case-Pilote Bourg": (14.5167, -61.1167),
|
||||
},
|
||||
"airquality": {
|
||||
# OpenRemote: "Air Quality Fort-de-France"
|
||||
"FdF Centre": (14.6036, -61.1783),
|
||||
# OpenRemote: "airQuality - Fort-de-France"
|
||||
"FdF Bonde": (14.6050, -61.1750),
|
||||
# OpenRemote: "airQuality - Sainte-Luce"
|
||||
"Sainte-Luce": (14.5950, -61.1700),
|
||||
# OpenRemote: "floodLevel - Schoelcher"
|
||||
"Schoelcher": (14.7400, -61.1850),
|
||||
# OpenRemote: "humidity - Le Robert"
|
||||
"Le Robert": (14.6800, -60.9400),
|
||||
"Fort-de-France Lamartine": (14.613, -61.0667),
|
||||
"Le Lamentin Zac": (14.62, -61.0),
|
||||
"Le Robert Bourg": (14.68, -60.93),
|
||||
"Sainte-Anne Village": (14.43, -60.98),
|
||||
"Saint-Joseph Morne": (14.705, -61.04),
|
||||
"Trinité Eglise": (14.72, -60.91),
|
||||
"Le François Bourg": (14.68, -60.83),
|
||||
"Ducos Centre": (14.58, -61.06),
|
||||
"Schœlcher Plage": (14.655, -61.11),
|
||||
"Case-Pilote D1": (14.52, -61.12),
|
||||
},
|
||||
"parking": {
|
||||
# OpenRemote: "Parking Fort-de-France Centre"
|
||||
"FdF Centre": (14.6036, -61.1783),
|
||||
# OpenRemote: "parkingAvailability - Fort-de-France"
|
||||
"FdF Bonde": (14.6050, -61.1750),
|
||||
# OpenRemote: "Test Sensor"
|
||||
"FdF Cluny": (14.6000, -61.2000),
|
||||
# OpenRemote: "Traffic Fort-de-France South"
|
||||
"FdF Sud": (14.5900, -61.1900),
|
||||
# OpenRemote: "Weather Lamentin Airport"
|
||||
"Lamentin": (14.5950, -61.1700),
|
||||
"Fort-de-France Place Clémenceau": (14.615, -61.068),
|
||||
"Le Lamentin Centre Commercial": (14.618, -61.002),
|
||||
"Le Robert Stade": (14.685, -60.935),
|
||||
"Sainte-Anne Mairie": (14.432, -60.985),
|
||||
"Saint-Joseph Ecole": (14.702, -61.045),
|
||||
"Trinité Port": (14.715, -60.92),
|
||||
"Le François Mairie": (14.682, -60.835),
|
||||
"Ducos ZI": (14.585, -61.055),
|
||||
"Schœlcher Bourg": (14.652, -61.105),
|
||||
"Case-Pilote Stade": (14.518, -61.118),
|
||||
},
|
||||
"noise": {
|
||||
# OpenRemote: "Noise Fort-de-France Centre"
|
||||
"FdF Centre": (14.6036, -61.1783),
|
||||
# OpenRemote: "Traffic Fort-de-France Centre"
|
||||
"FdF Rue": (14.6036, -61.1783),
|
||||
# OpenRemote: "trafficFlow - Fort-de-France"
|
||||
"FdF Pasteur": (14.6200, -61.1700),
|
||||
# OpenRemote: "temperature - Lamentin"
|
||||
"Lamentin": (14.5950, -61.1650),
|
||||
# OpenRemote: "temperature - Le Robert"
|
||||
"Le Robert": (14.6776, -60.9395),
|
||||
"Fort-de-France Théâtre": (14.617, -61.069),
|
||||
"Le Lamentin Zone Industrielle": (14.619, -61.001),
|
||||
"Le Robert Bourg": (14.681, -60.932),
|
||||
"Sainte-Anne Plage": (14.434, -60.982),
|
||||
"Saint-Joseph Morne": (14.703, -61.042),
|
||||
"Trinité Centre": (14.717, -60.918),
|
||||
"Le François Bourg": (14.681, -60.832),
|
||||
"Ducos Penitencier": (14.584, -61.058),
|
||||
"Schœlcher Morne": (14.651, -61.102),
|
||||
"Case-Pilote Village": (14.519, -61.115),
|
||||
},
|
||||
"weather": {
|
||||
# OpenRemote: "Weather Lamentin Airport"
|
||||
"Lamentin": (14.5950, -61.1700),
|
||||
# OpenRemote: "temperature - Lamentin"
|
||||
"Lamentin Ville": (14.5950, -61.1650),
|
||||
# OpenRemote: "temperature - Le Robert"
|
||||
"Le Robert": (14.6776, -60.9395),
|
||||
# OpenRemote: "humidity - Le Robert"
|
||||
"Le Robert Hum": (14.6800, -60.9400),
|
||||
# OpenRemote: "floodLevel - Schoelcher"
|
||||
"Schoelcher": (14.7400, -61.1850),
|
||||
"Fort-de-France Meteo": (14.616, -61.067),
|
||||
"Le Lamentin Aéroport": (14.617, -61.004),
|
||||
"Le Robert Bourg": (14.682, -60.934),
|
||||
"Sainte-Anne Village": (14.431, -60.981),
|
||||
"Saint-Joseph Morne": (14.704, -61.043),
|
||||
"Trinité Eglise": (14.718, -60.912),
|
||||
"Le François Bourg": (14.683, -60.834),
|
||||
"Ducos Centre": (14.586, -61.057),
|
||||
"Schœlcher Plage": (14.654, -61.108),
|
||||
"Case-Pilote D1": (14.521, -61.113),
|
||||
},
|
||||
"light": {
|
||||
# OpenRemote: "Light Fort-de-France"
|
||||
"FdF Centre": (14.6036, -61.1783),
|
||||
# OpenRemote: "lightIntensity - Fort-de-France"
|
||||
"FdF Bonde": (14.6050, -61.1800),
|
||||
# OpenRemote: "Traffic Fort-de-France North"
|
||||
"FdF North": (14.6200, -61.1700),
|
||||
# OpenRemote: "Traffic Fort-de-France South"
|
||||
"FdF South": (14.5900, -61.1900),
|
||||
# OpenRemote: "airQuality - Sainte-Luce"
|
||||
"Sainte-Luce": (14.5950, -61.1700),
|
||||
"Fort-de-France Place": (14.6155, -61.0685),
|
||||
"Le Lamentin Rond-point": (14.6185, -61.0025),
|
||||
"Le Robert D110": (14.6835, -60.9335),
|
||||
"Sainte-Anne Plage": (14.4335, -60.9835),
|
||||
"Saint-Joseph D1": (14.7005, -61.0505),
|
||||
"Trinité Centre": (14.7165, -60.9165),
|
||||
"Le François D2": (14.6835, -60.8335),
|
||||
"Ducos Penitencier": (14.5835, -61.0665),
|
||||
"Schœlcher Morne": (14.6505, -61.1005),
|
||||
"Case-Pilote Bourg": (14.5165, -61.1165),
|
||||
},
|
||||
}
|
||||
|
||||
def _build_locs(stype: str, count: int) -> list[dict]:
|
||||
"""Construit la liste des capteurs avec coordonnées fixes (sur terre)."""
|
||||
locs = []
|
||||
names = list(FIXED_LOCATIONS.get(stype, {stype: (BASE_LAT, BASE_LON)}).keys())
|
||||
# Répéter les noms si count > len(names)
|
||||
for i in range(count):
|
||||
name = names[i % len(names)]
|
||||
coords = FIXED_LOCATIONS.get(stype, {}).get(name, (BASE_LAT, BASE_LON))
|
||||
locs.append({
|
||||
"lat": round(coords[0], 6),
|
||||
"lon": round(coords[1], 6),
|
||||
"name": name,
|
||||
})
|
||||
return locs
|
||||
|
||||
SENSOR_LOCATIONS: dict[str, list[dict]] = {}
|
||||
for stype, count in SENSOR_COUNTS.items():
|
||||
SENSOR_LOCATIONS[stype] = _build_locs(stype, count)
|
||||
|
||||
# Ranges par type
|
||||
SENSOR_RANGES: dict[str, dict] = {
|
||||
"traffic": {"vehicle_count":(10,150),"average_speed_kmh":(10,80),
|
||||
@@ -347,22 +328,22 @@ LIGHT_STATUSES = ["on","off","dimmed","auto"]
|
||||
# =============================================================================
|
||||
SENSORS: dict[str, dict] = {}
|
||||
counter = 0
|
||||
for stype, locs in SENSOR_LOCATIONS.items():
|
||||
for loc in locs:
|
||||
for stype, locs in FIXED_LOCATIONS.items():
|
||||
for name, coords in locs.items():
|
||||
sid = f"{stype}_{counter:03d}"
|
||||
SENSORS[sid] = {"type": stype, "lat": loc["lat"], "lon": loc["lon"], "name": loc["name"]}
|
||||
SENSORS[sid] = {"type": stype, "lat": coords[0], "lon": coords[1], "name": name}
|
||||
counter += 1
|
||||
|
||||
# =============================================================================
|
||||
# Payload NGSI-LD pour Orion-LD / Stellio
|
||||
## Payload NGSI-LD pour Orion-LD / Stellio
|
||||
# =============================================================================
|
||||
# Contextes NGSI-LD : core + Smart Data Models
|
||||
# https://smartdatamodels.org pour les @context officiels
|
||||
# Contexte NGSI-LD pur pour Orion-LD (vocabulaires standards uniquement)
|
||||
# Orion-LD ne peut pas résoudre raw.githubusercontent.com — utiliser uri.etsi.org uniquement
|
||||
ORION_CONTEXT = [
|
||||
"https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
|
||||
]
|
||||
## Contexte NGSI-LD pur pour Orion-LD (vocabulaires standards uniquement)
|
||||
## Orion-LD ne peut pas résoudre raw.githubusercontent.com — utiliser uri.etsi.org uniquement
|
||||
#ORION_CONTEXT = [
|
||||
# "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
|
||||
#]
|
||||
|
||||
# Mapping sensor type → Smart Data Model type NGSI-LD
|
||||
SMART_MODEL_MAPPING = {
|
||||
@@ -378,90 +359,14 @@ FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json
|
||||
# Cache FROST : éviter de recréer Thing/Datastream
|
||||
_frost_cache: dict[str, tuple[str, str]] = {} # (sid, field) -> (thing_id, ds_id)
|
||||
|
||||
# Contexte NGSI-LD pur pour Stellio et Orion-LD (vocabulaires standards uniquement)
|
||||
# Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/
|
||||
## Contexte NGSI-LD pur pour Stellio et Orion-LD (vocabulaires standards uniquement)
|
||||
## Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/
|
||||
# On n'utilise PAS les vocabulaires smartdatamodels.org distants (inaccessibles depuis les containers)
|
||||
# Les types d'entité Smart Data Models (AirQualityObserved, etc.) sont reconnus par leur nom
|
||||
# Les propriétés spécifiques sont stockées telles quelles (vocabulaire libre)
|
||||
STELLIO_INLINE_CONTEXT = [
|
||||
"https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
|
||||
]
|
||||
|
||||
def _ngsi_payload(sid: str, sensor: dict, context: list | dict = ORION_CONTEXT, source: str = "simulator", topic: str = "") -> dict:
|
||||
"""Construit un payload NGSI-LD avec Smart Data Models officiels."""
|
||||
stype = sensor["type"]
|
||||
model_type = SMART_MODEL_MAPPING.get(stype, "Device")
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
# Attributs communs à tous les modèles
|
||||
payload = {
|
||||
"@context": context,
|
||||
"id": f"urn:ngsi-ld:{model_type}:{sid}",
|
||||
"type": model_type,
|
||||
"dateObserved": {"type": "Property", "value": now},
|
||||
"location": {"type": "GeoProperty",
|
||||
"value": {"type": "Point",
|
||||
"coordinates": [sensor["lon"], sensor["lat"]]}},
|
||||
"name": {"type": "Property", "value": sensor["name"]},
|
||||
"batteryLevel": {"type": "Property", "value": random.randint(60, 100)},
|
||||
# NOUVEAU: Traçabilité MQTT (Conforme NGSI-LD)
|
||||
# "source" est un champ standard NGSI-LD (ETSI)
|
||||
# "mqttTopic" est une propriété personnalisée (étendue autorisée)
|
||||
"source": {"type": "Property", "value": source},
|
||||
"mqttTopic": {"type": "Property", "value": topic},
|
||||
}
|
||||
|
||||
# Attributs spécifiques par type de modèle
|
||||
ranges = SENSOR_RANGES.get(stype, {})
|
||||
props = {}
|
||||
for field, val_range in ranges.items():
|
||||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||||
lo, hi = val_range
|
||||
if isinstance(lo, (int, float)):
|
||||
props[field] = {"type": "Property", "value": round(random.uniform(lo, hi), 1)}
|
||||
elif isinstance(val_range, list):
|
||||
props[field] = {"type": "Property", "value": random.choice(val_range)}
|
||||
|
||||
# Mapping vers les noms d'attributs Smart Data Models
|
||||
if stype == "airquality":
|
||||
if "pm25_ugm3" in props: payload["NO2"] = props.pop("pm25_ugm3") # Simplifié
|
||||
if "pm10_ugm3" in props: payload["PM10"] = props.pop("pm10_ugm3")
|
||||
if "no2_ugm3" in props: payload["NO2"] = props.pop("no2_ugm3")
|
||||
if "o3_ugm3" in props: payload["O3"] = props.pop("o3_ugm3")
|
||||
if "co_mgm3" in props: payload["CO"] = props.pop("co_mgm3")
|
||||
if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius")
|
||||
if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent")
|
||||
|
||||
elif stype == "traffic":
|
||||
if "vehicle_count" in props: payload["vehicleCount"] = props.pop("vehicle_count")
|
||||
if "average_speed_kmh" in props: payload["averageVehicleSpeed"] = props.pop("average_speed_kmh")
|
||||
if "congestion_level" in props: payload["congestion"] = props.pop("congestion_level")
|
||||
if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent")
|
||||
|
||||
elif stype == "parking":
|
||||
if "available_spots" in props: payload["availableSpotNumber"] = props.pop("available_spots")
|
||||
if "total_spots" in props: payload["totalSpotNumber"] = props.pop("total_spots")
|
||||
if "occupancy_percent" in props: payload["occupancy"] = props.pop("occupancy_percent")
|
||||
if "turnover_per_hour" in props: payload["turnover"] = props.pop("turnover_per_hour")
|
||||
|
||||
elif stype == "noise":
|
||||
if "noise_level_db" in props: payload["noiseLevel"] = props.pop("noise_level_db")
|
||||
if "peak_db" in props: payload["noisePeak"] = props.pop("peak_db")
|
||||
payload["noiseCategory"] = {"type": "Property", "value": random.choice(NOISE_CATEGORIES)}
|
||||
|
||||
elif stype == "weather":
|
||||
if "temperature_celsius" in props: payload["temperature"] = props.pop("temperature_celsius")
|
||||
if "humidity_percent" in props: payload["relativeHumidity"] = props.pop("humidity_percent")
|
||||
if "rain_mm" in props: payload["rainfall"] = props.pop("rain_mm")
|
||||
if "uv_index" in props: payload["uvIndex"] = props.pop("uv_index")
|
||||
if "wind_speed_kmh" in props: payload["windSpeed"] = props.pop("wind_speed_kmh")
|
||||
|
||||
elif stype == "light":
|
||||
if "brightness_lux" in props: payload["illuminance"] = props.pop("brightness_lux")
|
||||
if "power_consumption_w" in props: payload["power"] = props.pop("power_consumption_w")
|
||||
payload["status"] = {"type": "Property", "value": random.choice(LIGHT_STATUSES)}
|
||||
|
||||
return payload
|
||||
#STELLIO_INLINE_CONTEXT = [
|
||||
# "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
|
||||
#]
|
||||
|
||||
def _frost_payload(sid: str, sensor: dict, source: str = "simulator", topic: str = "") -> dict:
|
||||
"""Construit un payload SensorThings pour FROST-Server."""
|
||||
@@ -589,9 +494,10 @@ class MultiMQTT:
|
||||
|
||||
def _mk_client(self, name: str, host: str, port: int,
|
||||
tls: bool = False, user: str = "", pwd: str = "",
|
||||
ws: bool = False) -> mqtt.Client:
|
||||
ws: bool = False, use_v5: bool = False) -> mqtt.Client:
|
||||
cid = f"smartcity-sim-{name}-{os.getpid()}"
|
||||
c = mqtt.Client(client_id=cid, protocol=mqtt.MQTTv311)
|
||||
protocol = mqtt.MQTTv5 if use_v5 else mqtt.MQTTv311
|
||||
c = mqtt.Client(client_id=cid, protocol=protocol)
|
||||
if user:
|
||||
c.username_pw_set(user, pwd)
|
||||
if tls:
|
||||
@@ -631,13 +537,13 @@ class MultiMQTT:
|
||||
def _setup(self):
|
||||
# Utiliser les variables d'environnement pour les brokers
|
||||
brokers = [
|
||||
("emqx", EMQX_HOST, EMQX_PORT, False, "", ""),
|
||||
("mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "", ""),
|
||||
("bunkerm", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker"), # Port 1900 = MQTT simple, pas TLS
|
||||
("emqx", EMQX_HOST, EMQX_PORT, False, "", "", False),
|
||||
("mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "", "", False), # Same as emqx
|
||||
("bunkerm", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker", False),
|
||||
]
|
||||
print("[MQTT] 🔌 Connexion aux brokers...")
|
||||
for name, host, port, tls, user, pwd in brokers:
|
||||
c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd)
|
||||
for name, host, port, tls, user, pwd, use_v5 in brokers:
|
||||
c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd, use_v5=use_v5)
|
||||
self.clients[name] = c
|
||||
self.ok[name] = False
|
||||
time.sleep(3) # Attend les connexions
|
||||
@@ -665,8 +571,8 @@ class MultiMQTT:
|
||||
results[name] = False
|
||||
return results
|
||||
|
||||
def publish_iot_agent(self, sid: str, payload: dict, sensor_type: str = "unknown") -> bool:
|
||||
"""Publie sur le topic IoT-Agent (json/smartcity-api-key/{sid}/attrs) via les 3 brokers."""
|
||||
# def publish_iot_agent(self, sid: str, payload: dict, sensor_type: str = "unknown") -> bool:
|
||||
# """Publie sur le topic IoT-Agent (json/smartcity-api-key/{sid}/attrs) via les 3 brokers."""
|
||||
topic = f"json/smartcity-api-key/{sid}/attrs"
|
||||
msg = json.dumps(payload, ensure_ascii=False)
|
||||
payload_bytes = len(msg.encode())
|
||||
@@ -674,15 +580,17 @@ class MultiMQTT:
|
||||
success = False
|
||||
# Publier sur les 3 brokers: emqx, mosquitto, bunkerm
|
||||
for broker_name in ['emqx', 'mosquitto', 'bunkerm']:
|
||||
if broker_name in self.clients and self.ok.get(broker_name, False):
|
||||
client_ok = self.clients.get(broker_name) is not None and self.ok.get(broker_name, False)
|
||||
print(f"[MQTT-DEBUG] {broker_name}: client_exists={self.clients.get(broker_name) is not None}, ok={self.ok.get(broker_name, False)}")
|
||||
if client_ok:
|
||||
try:
|
||||
r = self.clients[broker_name].publish(topic, msg, qos=1)
|
||||
if r.rc == mqtt.MQTT_ERR_SUCCESS:
|
||||
success = True
|
||||
messages_published_total.labels(broker='iot-agent', sensor_type=sensor_type).inc()
|
||||
message_payload_size.labels(broker='iot-agent').observe(payload_bytes)
|
||||
# messages_published_total.labels(broker='iot-agent', sensor_type=sensor_type).inc()
|
||||
# message_payload_size.labels(broker='iot-agent').observe(payload_bytes)
|
||||
except Exception:
|
||||
messages_errors_total.labels(broker='iot-agent', sensor_type=sensor_type, error_type="exception").inc()
|
||||
pass # IoT-Agent code removed
|
||||
return success
|
||||
|
||||
def stop(self):
|
||||
@@ -696,110 +604,18 @@ class MultiMQTT:
|
||||
# =============================================================================
|
||||
# URLs de base (résolues au démarrage)
|
||||
# =============================================================================
|
||||
ORION_HOST = "localhost"
|
||||
ORION_PORT = "2026"
|
||||
ORION_URL = f"http://{ORION_HOST}:{ORION_PORT}"
|
||||
STELLIO_URL = os.environ.get("STELLIO_URL", "http://localhost:8087") # Stellio API Gateway (à exposer)
|
||||
#ORION_HOST = "localhost"
|
||||
#ORION_PORT = "2026"
|
||||
#ORION_URL = f"http://{ORION_HOST}:{ORION_PORT}"
|
||||
#STELLIO_URL = os.environ.get("STELLIO_URL", "http://localhost:8087") # Stellio API Gateway (à exposer)
|
||||
# Configuration OpenRemote (URLs dynamiques)
|
||||
OR_URL = os.environ.get("OR_URL", "http://localhost:8080") # OpenRemote Manager (Traefik)
|
||||
OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity
|
||||
OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", "http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token")
|
||||
OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour
|
||||
STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default")
|
||||
#STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default")
|
||||
|
||||
def publish_stellio(sid: str, sensor: dict) -> bool:
|
||||
"""Publie sur Stellio via Traefik (gère le 409)."""
|
||||
# Topic MQTT correspondant (pour traçabilité)
|
||||
stype = sensor["type"]
|
||||
topic = f"city/sensors/{stype}/{sid}"
|
||||
entity = _ngsi_payload(sid, sensor, context=STELLIO_INLINE_CONTEXT, source="simulator", topic=topic)
|
||||
# Stellio a besoin du @context pour résoudre les vocabulaires NGSI-LD
|
||||
# (uri.etsi.org résolu depuis le JAR embarqué)
|
||||
url = f"{STELLIO_URL}/ngsi-ld/v1/entities"
|
||||
headers = {
|
||||
"Content-Type": "application/ld+json",
|
||||
"Accept": "application/ld+json",
|
||||
"NGSILD-Tenant": STELLIO_TENANT,
|
||||
}
|
||||
try:
|
||||
body = json.dumps(entity).encode()
|
||||
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
|
||||
with http_request_duration.labels(broker="stellio", method="POST").time():
|
||||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||||
http_requests_total.labels(broker="stellio", method="POST", status_code=str(resp.status)).inc()
|
||||
print(f" 🏢 Stellio: ✅ (HTTP {resp.status})")
|
||||
return True
|
||||
except urllib.error.HTTPError as e:
|
||||
http_requests_total.labels(broker="stellio", method="POST", status_code=str(e.code)).inc()
|
||||
if e.code == 409: # Already exists, do update with PUT
|
||||
try:
|
||||
entity_id = urllib.parse.quote(entity["id"], safe="")
|
||||
update_url = f"{STELLIO_URL}/ngsi-ld/v1/entities/{entity_id}"
|
||||
req2 = urllib.request.Request(update_url, data=body, headers=headers, method="PUT")
|
||||
with http_request_duration.labels(broker="stellio", method="PUT").time():
|
||||
with urllib.request.urlopen(req2, timeout=8) as resp2:
|
||||
http_requests_total.labels(broker="stellio", method="PUT", status_code=str(resp2.status)).inc()
|
||||
print(f" 🏢 Stellio: ✅ (HTTP {resp2.status} updated)")
|
||||
return True
|
||||
except Exception as e2:
|
||||
http_requests_total.labels(broker="stellio", method="PUT", status_code="error").inc()
|
||||
messages_errors_total.labels(broker="stellio", sensor_type=stype, error_type="http_error").inc()
|
||||
print(f" ⚠️ Stellio update failed: {e2}")
|
||||
return False
|
||||
try:
|
||||
err = e.read().decode()[:300]
|
||||
except Exception:
|
||||
err = str(e)
|
||||
messages_errors_total.labels(broker="stellio", sensor_type=stype, error_type="http_error").inc()
|
||||
print(f" ⚠️ Stellio → {e.code}: {err}")
|
||||
return False
|
||||
except Exception as e:
|
||||
http_requests_total.labels(broker="stellio", method="POST", status_code="exception").inc()
|
||||
messages_errors_total.labels(broker="stellio", sensor_type=stype, error_type="exception").inc()
|
||||
print(f" ⚠️ Stellio → {e}")
|
||||
return False
|
||||
|
||||
def publish_orion(sid: str, sensor: dict) -> bool:
|
||||
"""Publie sur Orion-LD (POST create, PATCH update)."""
|
||||
# Topic MQTT correspondant (pour traçabilité)
|
||||
stype = sensor["type"]
|
||||
topic = f"city/sensors/{stype}/{sid}"
|
||||
entity = _ngsi_payload(sid, sensor, source="simulator", topic=topic)
|
||||
# Orion-LD est exposé sur localhost:2026 (hôte)
|
||||
base = "http://localhost:2026/ngsi-ld/v1"
|
||||
# 1. Essayer de créer (POST)
|
||||
try:
|
||||
body = json.dumps(entity).encode()
|
||||
req = urllib.request.Request(f"{base}/entities", data=body,
|
||||
headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="POST")
|
||||
with http_request_duration.labels(broker="orion_ld", method="POST").time():
|
||||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||||
http_requests_total.labels(broker="orion_ld", method="POST", status_code=str(resp.status)).inc()
|
||||
print(f" 🌐 Orion-LD: ✅ (HTTP {resp.status} created)")
|
||||
return True
|
||||
except urllib.error.HTTPError as e:
|
||||
http_requests_total.labels(broker="orion_ld", method="POST", status_code=str(e.code)).inc()
|
||||
if e.code != 409:
|
||||
messages_errors_total.labels(broker="orion_ld", sensor_type=stype, error_type="http_error").inc()
|
||||
print(f" ⚠️ Orion-LD → {e.code}: {e.read().decode()[:200]}")
|
||||
return False
|
||||
# 409 = déjà existant → PATCH
|
||||
# 2. Déjà existant (409) → PATCH sur les attributs
|
||||
try:
|
||||
eid = urllib.parse.quote(entity['id'], safe='')
|
||||
patch_url = f"{base}/entities/{eid}/attrs"
|
||||
req2 = urllib.request.Request(patch_url, data=body,
|
||||
headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="PATCH")
|
||||
with http_request_duration.labels(broker="orion_ld", method="PATCH").time():
|
||||
with urllib.request.urlopen(req2, timeout=8) as resp2:
|
||||
http_requests_total.labels(broker="orion_ld", method="PATCH", status_code=str(resp2.status)).inc()
|
||||
print(f" 🌐 Orion-LD: ✅ (HTTP {resp2.status} updated)")
|
||||
return True
|
||||
except Exception as e2:
|
||||
http_requests_total.labels(broker="orion_ld", method="PATCH", status_code="error").inc()
|
||||
messages_errors_total.labels(broker="orion_ld", sensor_type=stype, error_type="http_error").inc()
|
||||
print(f" ⚠️ Orion-LD PATCH failed: {e2}")
|
||||
return False
|
||||
|
||||
def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool:
|
||||
"""Publie sur BunkerM via HTTP API (port 2000) avec session."""
|
||||
@@ -1183,14 +999,14 @@ def publish_influx(sid: str, sensor: dict, values: dict) -> bool:
|
||||
# Exécution asynchrone (non-bloquante)
|
||||
t = threading.Thread(target=_write_async, daemon=True)
|
||||
t.start()
|
||||
return True
|
||||
return True # Async: on ne peut pas savoir immédiatement
|
||||
|
||||
def main():
|
||||
print("╔══════════════════════════════════════════════════╗")
|
||||
print("║ Smart City Simulator — Martinique ║")
|
||||
print("╚══════════════════════════════════════════════════╝")
|
||||
print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s")
|
||||
print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
|
||||
# print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
|
||||
print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}")
|
||||
|
||||
# --- Démarrer le serveur Prometheus ---
|
||||
@@ -1233,14 +1049,10 @@ def main():
|
||||
stype = sensor["type"]
|
||||
topic = f"city/sensors/{stype}/{sid}"
|
||||
|
||||
# --- Payload MQTT ---
|
||||
# --- Payload MQTT (ATTRIBUTES ONLY - pas de id/type/lat/lon !)
|
||||
# # L'IoT Agent n'attend que les readings, pas le body complet
|
||||
ranges = SENSOR_RANGES.get(stype, {})
|
||||
payload_mqtt = {
|
||||
"id": sid,
|
||||
"type": stype,
|
||||
"name": sensor["name"],
|
||||
"lat": sensor["lat"],
|
||||
"lon": sensor["lon"],
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"battery_level": random.randint(60, 100),
|
||||
}
|
||||
@@ -1260,10 +1072,10 @@ def main():
|
||||
if ok_mqtt:
|
||||
print(f" 📤 {topic} → {','.join(ok_mqtt)}")
|
||||
|
||||
# --- IoT-Agent (via EMQX) ---
|
||||
if ENABLE_IOT_AGENT:
|
||||
ok_iot = mqtt_client.publish_iot_agent(sid, payload_mqtt, sensor_type=stype)
|
||||
print(f" 🤖 IoT-Agent: {'✅' if ok_iot else '❌'}")
|
||||
# # --- IoT-Agent (via EMQX) ---
|
||||
# if ENABLE_IOT_AGENT:
|
||||
# ok_iot = mqtt_client.publish_iot_agent(sid, payload_mqtt, sensor_type=stype)
|
||||
# print(f" 🤖 IoT-Agent: {'✅' if ok_iot else '❌'}")
|
||||
|
||||
# Extraire les valeurs pour OpenRemote
|
||||
or_values = {}
|
||||
@@ -1278,15 +1090,15 @@ def main():
|
||||
ok_or = publish_openremote(sid, sensor, or_values)
|
||||
print(f" 🏠 OpenRemote: {'✅' if ok_or else '⚠️ skipped'}")
|
||||
|
||||
# --- Orion-LD --- (DÉSACTIVÉ: Utiliser uniquement IoT-Agents MQTT)
|
||||
# if ENABLE_ORION:
|
||||
# ok_or = publish_orion(sid, sensor)
|
||||
# print(f" 🌐 Orion-LD: {'✅' if ok_or else '⚠️ skipped'}")
|
||||
# # --- Orion-LD --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT)
|
||||
# # if ENABLE_ORION:
|
||||
# # ok_or = publish_orion(sid, sensor)
|
||||
# # print(f" 🌐 Orion-LD: {'✅' if ok_or else '⚠️ skipped'}")
|
||||
|
||||
# --- Stellio --- (DÉSACTIVÉ: Utiliser uniquement IoT-Agents MQTT)
|
||||
# if ENABLE_STELLIO:
|
||||
# ok_st = publish_stellio(sid, sensor)
|
||||
# print(f" 🏢 Stellio: {'✅' if ok_st else '❌'}")
|
||||
# # --- Stellio --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT)
|
||||
# # if ENABLE_STELLIO:
|
||||
# # ok_st = publish_stellio(sid, sensor)
|
||||
# # print(f" 🏢 Stellio: {'✅' if ok_st else '❌'}")
|
||||
|
||||
# --- FROST ---
|
||||
if ENABLE_FROST:
|
||||
|
||||
Reference in New Issue
Block a user