- Token InfluxDB corrigé dans simulator.py (my-super-token) - Bucket iot_data créé dans InfluxDB - CrateDB-Stellio ports sécurisés (suppression exposition publique) - Healthchecks MongoDB/Mosquitto corrigés - Nettoyage container digital-twin-grafana
1341 lines
61 KiB
Python
1341 lines
61 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Smart City IoT Simulator — Martinique (14.6°N, 61.2°W)
|
||
=======================================================
|
||
Publie vers MULTIPLES brokers MQTT + context brokers NGSI-LD.
|
||
|
||
Brokers MQTT:
|
||
- EMQX: emqx_emqx_1:1883 (sans auth)
|
||
- Mosquitto: mainfluxlabs-mosquitto:1883 (bunker/bunker)
|
||
- BunkerM: bunkerm_bunkerm_1:1900 (MQTT simple sans TLS, bunker/bunker)
|
||
- OpenRemote: openremote-manager-1:1883 (admin/Digitribe972)
|
||
|
||
Context Brokers REST:
|
||
- Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
|
||
- Stellio: stellio-api-gateway:8080 (NGSI-LD)
|
||
- FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)
|
||
|
||
Streaming Platforms:
|
||
- Pulsar: smart-city-pulsar:8080 (HTTP REST Producer API)
|
||
- Redpanda: smart-city-redpanda:8082 (Kafka REST Proxy)
|
||
|
||
Time-Series DB:
|
||
- InfluxDB v2: smart-city-influxdb:8086 (via influxdb-client)
|
||
|
||
Variables d'environnement:
|
||
PUBLISH_INTERVAL_SEC : intervalle de publication en secondes (défaut: 1s)
|
||
BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France)
|
||
ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
|
||
ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
|
||
ENABLE_FROST=1 : activer FROST-Server (défaut: 1)
|
||
ENABLE_INFLUX=1 : activer InfluxDB v2 (défaut: 1)
|
||
ENABLE_PULSAR=1 : activer Apache Pulsar (défaut: 1)
|
||
ENABLE_REDPANDA=1 : activer Redpanda Kafka (défaut: 1)
|
||
"""
|
||
|
||
import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse
|
||
import paho.mqtt.client as mqtt
|
||
import urllib.request, urllib.error
|
||
from datetime import datetime, timezone
|
||
from typing import Any
|
||
|
||
# Prometheus metrics
|
||
import prometheus_client
|
||
from prometheus_client import Counter, Histogram, Gauge, Info
|
||
|
||
# InfluxDB support
|
||
import influxdb_client
|
||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||
|
||
# =============================================================================
# MQTT broker endpoints
# =============================================================================
# Every value can be overridden through the environment. The EMQX and
# Mosquitto defaults point at localhost (the simulator is expected to run on
# the host); the BunkerM default points at mqtt.digitribe.fr.
EMQX_HOST = os.getenv("EMQX_HOST", "localhost")
EMQX_PORT = int(os.getenv("EMQX_PORT", "11883"))

MOSQUITTO_HOST = os.getenv("MOSQUITTO_HOST", "localhost")
MOSQUITTO_PORT = int(os.getenv("MOSQUITTO_PORT", "1883"))

BUNKERM_HOST = os.getenv("BUNKERM_HOST", "mqtt.digitribe.fr")
BUNKERM_PORT = int(os.getenv("BUNKERM_PORT", "1900"))
|
||
|
||
# =============================================================================
# General configuration
# =============================================================================
# Values accepted (case-insensitively) as "enabled" for every ENABLE_* flag.
_TRUTHY_VALUES = ("1", "true", "yes", "on")


def _env_flag(name: str, default: str = "1") -> bool:
    """Return True when environment variable *name* holds a truthy value.

    All ENABLE_* switches go through this helper so they share one parsing
    rule ("1", "true", "yes", "on", case-insensitive). The plain "1" that
    some flags previously required is still accepted.
    """
    return os.environ.get(name, default).lower() in _TRUTHY_VALUES


# Base coordinates used as a fallback sensor position.
BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))

# Publication interval in seconds (1 s by default, for near-real-time output).
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "1"))

# Feature switches for the individual back ends.
ENABLE_ORION = _env_flag("ENABLE_ORION")
ENABLE_STELLIO = _env_flag("ENABLE_STELLIO")
ENABLE_FROST = _env_flag("ENABLE_FROST")
ENABLE_OPENREMOTE = _env_flag("ENABLE_OPENREMOTE")
ENABLE_IOT_AGENT = _env_flag("ENABLE_IOT_AGENT")

# OpenRemote credentials and realms.
# NOTE(review): a real-looking password is committed as the default below;
# consider requiring OR_ADMIN_PASS from the environment instead.
OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972")
OR_REALM = os.environ.get("OR_REALM", "smartcity")
OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master")  # realm used to obtain the token

# FROST-Server (SensorThings) base URL.
# NOTE(review): the original note mentioned exposing frost_http-web-1:8080 on
# host port 8086, but the default below uses 8090 — confirm the published port.
FROST_URL = os.environ.get("FROST_URL", "http://localhost:8090/FROST-Server/v1.1")
|
||
|
||
# Apache Pulsar settings (reached over HTTP REST: admin + producer REST API).
ENABLE_PULSAR = os.getenv("ENABLE_PULSAR", "1").lower() in ("1", "true", "yes", "on")
PULSAR_HOST = os.getenv("PULSAR_HOST", "smart-city-pulsar")
PULSAR_PORT = int(os.getenv("PULSAR_PORT", "8080"))
PULSAR_BASE = "http://{}:{}".format(PULSAR_HOST, PULSAR_PORT)

# Redpanda / Kafka settings (reached through the HTTP REST proxy).
ENABLE_REDPANDA = os.getenv("ENABLE_REDPANDA", "1").lower() in ("1", "true", "yes", "on")
REDPANDA_HOST = os.getenv("REDPANDA_HOST", "smart-city-redpanda")
REDPANDA_PORT = int(os.getenv("REDPANDA_PORT", "8082"))
REDPANDA_BASE = "http://{}:{}".format(REDPANDA_HOST, REDPANDA_PORT)
|
||
|
||
# InfluxDB v2 settings.
# NOTE(review): the token default is a placeholder-looking secret committed in
# source; prefer supplying INFLUX_TOKEN from the environment.
ENABLE_INFLUX = os.getenv("ENABLE_INFLUX", "1").lower() in ("1", "true", "yes", "on")
INFLUX_URL = os.getenv("INFLUX_URL", "http://smart-city-influxdb:8086")
INFLUX_ORG = os.getenv("INFLUX_ORG", "digitribe")
INFLUX_BUCKET = os.getenv("INFLUX_BUCKET", "iot_data")
INFLUX_TOKEN = os.getenv("INFLUX_TOKEN", "my-super-token")

# TCP port on which the Prometheus metrics HTTP server listens.
METRICS_PORT = int(os.getenv("METRICS_PORT", "8001"))
|
||
|
||
# =============================================================================
# Prometheus metric definitions
# =============================================================================

# --- Info ---
# Static key/value labels describing this simulator build.
simulator_info = Info("simulator", "Smart City Simulator info")
simulator_info.info(
    dict(
        version="1.0.0",
        python_version=sys.version.split()[0],
        mqtt_brokers="emqx,mosquitto,bunkerm",
        context_brokers="orion_ld,stellio,frost",
    )
)
|
||
|
||
# --- Counters ---
# Successful publications, labelled by destination broker and sensor type.
messages_published_total = Counter(
    "simulator_messages_published_total",
    "Total messages published by broker",
    labelnames=("broker", "sensor_type"),
)

# Failed publications, labelled additionally by an error category.
messages_errors_total = Counter(
    "simulator_messages_errors_total",
    "Total publish errors",
    labelnames=("broker", "sensor_type", "error_type"),
)

# MQTT connection outcomes; ``status`` is "success" or "failure".
mqtt_connection_total = Counter(
    "simulator_mqtt_connection_total",
    "MQTT connection attempts",
    labelnames=("broker", "status"),
)

# HTTP calls to the REST back ends, labelled by method and status code.
http_requests_total = Counter(
    "simulator_http_requests_total",
    "HTTP requests to REST APIs",
    labelnames=("broker", "method", "status_code"),
)

# InfluxDB writes; ``status`` is "success" or "error".
influx_write_total = Counter(
    "simulator_influx_write_total",
    "InfluxDB write operations",
    labelnames=("status",),
)
|
||
|
||
# --- Gauges ---
# Per-broker connection state: 1 when connected, 0 when disconnected.
mqtt_broker_connected = Gauge(
    "simulator_mqtt_broker_connected",
    "MQTT broker connection status (1=connected, 0=disconnected)",
    labelnames=("broker",),
)

# Number of simulated sensors for each sensor type.
sensors_total = Gauge(
    "simulator_sensors_total",
    "Total number of sensors by type",
    labelnames=("sensor_type",),
)

# Liveness flag for the simulator process (1=yes, 0=no).
up = Gauge("simulator_up", "Simulator is running (1=yes, 0=no)")
|
||
|
||
# --- Histograms ---
# Time spent in a single MQTT publish call, per broker (seconds).
publish_duration = Histogram(
    "simulator_publish_duration_seconds",
    "Time spent publishing a message",
    labelnames=("broker",),
    buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5),
)

# Latency of HTTP requests to the REST back ends (seconds).
http_request_duration = Histogram(
    "simulator_http_request_duration_seconds",
    "HTTP request latency to REST APIs",
    labelnames=("broker", "method"),
    buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0),
)

# Size of published payloads, per broker (bytes).
message_payload_size = Histogram(
    "simulator_message_payload_size_bytes",
    "Message payload size in bytes",
    labelnames=("broker",),
    buckets=(64, 128, 256, 512, 1024, 2048, 4096, 8192),
)
|
||
|
||
# Start Prometheus HTTP server in a background thread
|
||
def _start_metrics_server():
    """Start the Prometheus metrics endpoint from a daemon thread.

    The thread calls ``prometheus_client.start_http_server(METRICS_PORT)``
    and then prints a start-up message.

    Returns:
        The already-started ``threading.Thread``.
    """
    def run():
        prometheus_client.start_http_server(METRICS_PORT)
        print(f"[METRICS] 🚀 Prometheus metrics on :{METRICS_PORT}/metrics")

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    return worker
|
||
|
||
# InfluxDB client bootstrap. Both handles stay None when InfluxDB is disabled
# or when building the client raises.
# NOTE(review): the success message is printed as soon as the client objects
# are built; presumably no request has been sent to the server at that
# point — confirm before relying on it as a connectivity check.
_influx_client = None
_influx_write_api = None
if ENABLE_INFLUX:
    try:
        _influx_client = influxdb_client.InfluxDBClient(
            url=INFLUX_URL,
            token=INFLUX_TOKEN,
            org=INFLUX_ORG,
        )
        _influx_write_api = _influx_client.write_api(write_options=SYNCHRONOUS)
        print(f"[INFLUX] ✅ Connected to {INFLUX_URL}")
    except Exception as exc:
        print(f"[INFLUX] ❌ Connection failed: {exc}")
|
||
|
||
# Number of simulated sensors per type; each entry can be overridden with a
# SENSOR_COUNT_<type> environment variable.
SENSOR_COUNTS = {
    "traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")),
    "airquality": int(os.environ.get("SENSOR_COUNT_airquality", "2")),
    "parking": int(os.environ.get("SENSOR_COUNT_parking", "2")),
    "noise": int(os.environ.get("SENSOR_COUNT_noise", "1")),
    "weather": int(os.environ.get("SENSOR_COUNT_weather", "1")),
    "light": int(os.environ.get("SENSOR_COUNT_light", "1")),
}

# When SENSOR_COUNT is set, scale the per-type counts proportionally so that
# their sum approaches the requested total; every type keeps at least one
# sensor after scaling.
_total_default = sum(SENSOR_COUNTS.values())
if "SENSOR_COUNT" in os.environ:
    target = int(os.environ["SENSOR_COUNT"])
    if _total_default > 0:
        ratio = target / _total_default
        for k in SENSOR_COUNTS:
            SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio))
    else:
        # Every per-type count is 0, so there is no proportion to scale;
        # skip instead of dividing by zero.
        print("[CONFIG] ⚠️ SENSOR_COUNT ignored: all per-type sensor counts are 0")
|
||
|
||
# =============================================================================
# Sensor locations in Martinique — FIXED on-land coordinates
# =============================================================================
# Random offsets around Fort-de-France put some sensors in the sea, so each
# sensor type uses a fixed table of named positions instead.
# According to the original notes, the coordinates were copied from the
# OpenRemote assets (realm "master"); the asset each entry came from is named
# in the comment above it.
# NOTE(review): several differently named entries share identical coordinates
# (e.g. "Sainte-Luce" and "Lamentin") — verify against the OpenRemote assets.
FIXED_LOCATIONS: dict[str, dict[str, tuple[float, float]]] = {
    "traffic": {
        # OpenRemote: "Traffic Fort-de-France Centre"
        "FdF Centre": (14.6036, -61.1783),
        # OpenRemote: "Traffic Fort-de-France North"
        "FdF North": (14.6200, -61.1700),
        # OpenRemote: "Traffic Fort-de-France South"
        "FdF South": (14.5900, -61.1900),
        # OpenRemote: "trafficFlow - Fort-de-France"
        "FdF Centre Rue": (14.6036, -61.1783),
        # OpenRemote: "Test Sensor"
        "FdF Place": (14.6000, -61.2000),
    },
    "airquality": {
        # OpenRemote: "Air Quality Fort-de-France"
        "FdF Centre": (14.6036, -61.1783),
        # OpenRemote: "airQuality - Fort-de-France"
        "FdF Bonde": (14.6050, -61.1750),
        # OpenRemote: "airQuality - Sainte-Luce"
        "Sainte-Luce": (14.5950, -61.1700),
        # OpenRemote: "floodLevel - Schoelcher"
        "Schoelcher": (14.7400, -61.1850),
        # OpenRemote: "humidity - Le Robert"
        "Le Robert": (14.6800, -60.9400),
    },
    "parking": {
        # OpenRemote: "Parking Fort-de-France Centre"
        "FdF Centre": (14.6036, -61.1783),
        # OpenRemote: "parkingAvailability - Fort-de-France"
        "FdF Bonde": (14.6050, -61.1750),
        # OpenRemote: "Test Sensor"
        "FdF Cluny": (14.6000, -61.2000),
        # OpenRemote: "Traffic Fort-de-France South"
        "FdF Sud": (14.5900, -61.1900),
        # OpenRemote: "Weather Lamentin Airport"
        "Lamentin": (14.5950, -61.1700),
    },
    "noise": {
        # OpenRemote: "Noise Fort-de-France Centre"
        "FdF Centre": (14.6036, -61.1783),
        # OpenRemote: "Traffic Fort-de-France Centre"
        "FdF Rue": (14.6036, -61.1783),
        # OpenRemote: "trafficFlow - Fort-de-France"
        "FdF Pasteur": (14.6200, -61.1700),
        # OpenRemote: "temperature - Lamentin"
        "Lamentin": (14.5950, -61.1650),
        # OpenRemote: "temperature - Le Robert"
        "Le Robert": (14.6776, -60.9395),
    },
    "weather": {
        # OpenRemote: "Weather Lamentin Airport"
        "Lamentin": (14.5950, -61.1700),
        # OpenRemote: "temperature - Lamentin"
        "Lamentin Ville": (14.5950, -61.1650),
        # OpenRemote: "temperature - Le Robert"
        "Le Robert": (14.6776, -60.9395),
        # OpenRemote: "humidity - Le Robert"
        "Le Robert Hum": (14.6800, -60.9400),
        # OpenRemote: "floodLevel - Schoelcher"
        "Schoelcher": (14.7400, -61.1850),
    },
    "light": {
        # OpenRemote: "Light Fort-de-France"
        "FdF Centre": (14.6036, -61.1783),
        # OpenRemote: "lightIntensity - Fort-de-France"
        "FdF Bonde": (14.6050, -61.1800),
        # OpenRemote: "Traffic Fort-de-France North"
        "FdF North": (14.6200, -61.1700),
        # OpenRemote: "Traffic Fort-de-France South"
        "FdF South": (14.5900, -61.1900),
        # OpenRemote: "airQuality - Sainte-Luce"
        "Sainte-Luce": (14.5950, -61.1700),
    },
}
|
||
|
||
def _build_locs(stype: str, count: int) -> list[dict]:
    """Return *count* sensor positions for sensor type *stype*.

    Positions are taken from ``FIXED_LOCATIONS[stype]`` in declaration order
    and reused cyclically when *count* exceeds the number of named positions.
    A type missing from ``FIXED_LOCATIONS`` yields a single position named
    after the type, located at ``(BASE_LAT, BASE_LON)``.

    Returns:
        A list of ``{"lat", "lon", "name"}`` dicts, coordinates rounded to
        six decimals.
    """
    table = FIXED_LOCATIONS.get(stype, {})
    # Unknown type: fall back to one pseudo-location named after the type.
    labels = list(table) if stype in FIXED_LOCATIONS else [stype]

    positions = []
    for index in range(count):
        label = labels[index % len(labels)]
        lat, lon = table.get(label, (BASE_LAT, BASE_LON))
        positions.append({"lat": round(lat, 6), "lon": round(lon, 6), "name": label})
    return positions
|
||
|
||
# Resolved positions for every configured sensor, grouped by sensor type.
SENSOR_LOCATIONS: dict[str, list[dict]] = {
    stype: _build_locs(stype, count) for stype, count in SENSOR_COUNTS.items()
}
|
||
|
||
# (low, high) bounds used to draw random readings, per sensor type and field.
SENSOR_RANGES: dict[str, dict] = {
    "traffic": {
        "vehicle_count": (10, 150),
        "average_speed_kmh": (10, 80),
        "congestion_level": (0, 5),
        "occupancy_percent": (0, 100),
    },
    "airquality": {
        "pm25_ugm3": (5, 80),
        "pm10_ugm3": (10, 150),
        "no2_ugm3": (5, 60),
        "o3_ugm3": (20, 120),
        "co_mgm3": (0.1, 5.0),
        "temperature_celsius": (20, 35),
        "humidity_percent": (40, 95),
    },
    "parking": {
        "total_spots": (50, 500),
        "available_spots": (0, 500),
        "occupancy_percent": (0, 100),
        "turnover_per_hour": (5, 50),
    },
    "noise": {
        "noise_level_db": (40, 95),
        "peak_db": (60, 110),
    },
    "weather": {
        "temperature_celsius": (22, 34),
        "humidity_percent": (50, 95),
        "wind_speed_kmh": (0, 50),
        "pressure_hpa": (1005, 1025),
        "rain_mm": (0, 20),
        "uv_index": (0, 11),
    },
    "light": {
        "brightness_lux": (0, 100000),
        "power_consumption_w": (0, 500),
    },
}

# Categorical values drawn at random for noise and light sensors.
NOISE_CATEGORIES = ["quiet", "moderate", "loud", "very_loud"]
LIGHT_STATUSES = ["on", "off", "dimmed", "auto"]
|
||
|
||
# =============================================================================
# Declared sensors
# =============================================================================
# Maps a sensor id ("<type>_<NNN>") to its type, position and display name.
# The numeric suffix comes from one counter shared by all types, so it keeps
# increasing from one sensor type to the next.
SENSORS: dict[str, dict] = {}
counter = 0
for stype, locs in SENSOR_LOCATIONS.items():
    for loc in locs:
        SENSORS[f"{stype}_{counter:03d}"] = {
            "type": stype,
            "lat": loc["lat"],
            "lon": loc["lon"],
            "name": loc["name"],
        }
        counter += 1
|
||
|
||
# =============================================================================
# NGSI-LD payload settings for Orion-LD / Stellio
# =============================================================================
# Only the ETSI core context is referenced. According to the original notes,
# Orion-LD cannot resolve raw.githubusercontent.com and the remote
# smartdatamodels.org vocabularies are unreachable from the containers, so
# model-specific properties are stored under their plain names.
ORION_CONTEXT = ["https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"]

# Simulator sensor type -> Smart Data Models entity type.
SMART_MODEL_MAPPING = dict(
    airquality="AirQualityObserved",
    traffic="TrafficFlowObserved",
    parking="OffStreetParking",
    noise="NoiseLevelObserved",
    weather="WeatherObserved",
    light="Device",
)

# Headers sent with FROST-Server requests.
FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}

# FROST cache used to avoid re-creating Things/Datastreams.
# NOTE(review): the original comment describes the mapping as
# (sid, field) -> (thing_id, ds_id) while the annotation declares str keys —
# confirm against the code that fills the cache.
_frost_cache: dict[str, tuple[str, str]] = {}

# Context sent inline to Stellio: the same ETSI core context as Orion-LD,
# kept as a separate list object.
STELLIO_INLINE_CONTEXT = ["https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"]
|
||
|
||
# Simulator field name -> NGSI-LD attribute name, per sensor type. Fields of a
# type that are not listed here are generated but left out of the entity.
_NGSI_ATTRIBUTE_NAMES: dict[str, dict[str, str]] = {
    "airquality": {
        "pm25_ugm3": "PM25",
        "pm10_ugm3": "PM10",
        "no2_ugm3": "NO2",
        "o3_ugm3": "O3",
        "co_mgm3": "CO",
        "temperature_celsius": "temperature",
        "humidity_percent": "relativeHumidity",
    },
    "traffic": {
        "vehicle_count": "vehicleCount",
        "average_speed_kmh": "averageVehicleSpeed",
        "congestion_level": "congestion",
        "occupancy_percent": "occupancy",
    },
    "parking": {
        "available_spots": "availableSpotNumber",
        "total_spots": "totalSpotNumber",
        "occupancy_percent": "occupancy",
        "turnover_per_hour": "turnover",
    },
    "noise": {
        "noise_level_db": "noiseLevel",
        "peak_db": "noisePeak",
    },
    "weather": {
        "temperature_celsius": "temperature",
        "humidity_percent": "relativeHumidity",
        "rain_mm": "rainfall",
        "uv_index": "uvIndex",
        "wind_speed_kmh": "windSpeed",
        # Previously generated but never emitted.
        "pressure_hpa": "atmosphericPressure",
    },
    "light": {
        "brightness_lux": "illuminance",
        "power_consumption_w": "power",
    },
}


def _ngsi_payload(sid: str, sensor: dict, context: list | dict = ORION_CONTEXT, source: str = "simulator", topic: str = "") -> dict:
    """Build an NGSI-LD entity with random readings for one sensor.

    Args:
        sid: Sensor identifier; becomes the last segment of the entity URN.
        sensor: Sensor record providing "type", "lat", "lon" and "name".
        context: Value stored under "@context" (stored as-is, not copied).
        source: Value of the "source" traceability property.
        topic: Value of the "mqttTopic" traceability property.

    Returns:
        A dict with the common attributes (id, type, dateObserved, location,
        name, batteryLevel, source, mqttTopic) plus the type-specific
        attributes drawn from ``SENSOR_RANGES`` and renamed through
        ``_NGSI_ATTRIBUTE_NAMES``.
    """
    stype = sensor["type"]
    model_type = SMART_MODEL_MAPPING.get(stype, "Device")
    now = datetime.now(timezone.utc).isoformat()

    # Attributes shared by every entity type.
    payload = {
        "@context": context,
        "id": f"urn:ngsi-ld:{model_type}:{sid}",
        "type": model_type,
        "dateObserved": {"type": "Property", "value": now},
        # GeoJSON order: longitude first, then latitude.
        "location": {
            "type": "GeoProperty",
            "value": {"type": "Point", "coordinates": [sensor["lon"], sensor["lat"]]},
        },
        "name": {"type": "Property", "value": sensor["name"]},
        "batteryLevel": {"type": "Property", "value": random.randint(60, 100)},
        # MQTT traceability properties.
        "source": {"type": "Property", "value": source},
        "mqttTopic": {"type": "Property", "value": topic},
    }

    # Draw one random reading per configured field.
    props = {}
    for field, val_range in SENSOR_RANGES.get(stype, {}).items():
        if isinstance(val_range, tuple) and len(val_range) == 2:
            lo, hi = val_range
            if isinstance(lo, (int, float)):
                props[field] = {"type": "Property", "value": round(random.uniform(lo, hi), 1)}
        elif isinstance(val_range, list):
            props[field] = {"type": "Property", "value": random.choice(val_range)}

    # Rename to the NGSI-LD attribute names. PM2.5 now has its own "PM25"
    # attribute: it used to be written under "NO2" and was then overwritten
    # by the real NO2 reading.
    for field, attribute in _NGSI_ATTRIBUTE_NAMES.get(stype, {}).items():
        if field in props:
            payload[attribute] = props[field]

    # Categorical attributes that have no numeric range.
    if stype == "noise":
        payload["noiseCategory"] = {"type": "Property", "value": random.choice(NOISE_CATEGORIES)}
    elif stype == "light":
        payload["status"] = {"type": "Property", "value": random.choice(LIGHT_STATUSES)}

    return payload
|
||
|
||
def _frost_payload(sid: str, sensor: dict, source: str = "simulator", topic: str = "") -> tuple[dict, list[tuple[str, dict, float]]]:
    """Build the SensorThings payloads for one sensor.

    Args:
        sid: Sensor identifier, used in the generated names and descriptions.
        sensor: Sensor record; only its "type" key is read.
        source: Stored in the Thing properties for traceability.
        topic: Stored in the Thing properties as "mqttTopic".

    Returns:
        A ``(thing_payload, datastreams)`` pair. ``datastreams`` holds one
        ``(field, datastream_payload, value)`` tuple per numeric range in
        ``SENSOR_RANGES`` for the sensor type, where ``value`` is a random
        reading rounded to one decimal.
    """
    stype = sensor["type"]
    ranges = SENSOR_RANGES.get(stype, {})

    # NOTE(review): the same unit URI (degree Celsius) is attached to every
    # field, including non-temperature ones. Kept unchanged here; a per-field
    # unit table would be more accurate.
    unit = "http://www.qudt.org/vocab/unit#DegreeCelsius"

    datastreams = []
    for field, val_range in ranges.items():
        if not (isinstance(val_range, tuple) and len(val_range) == 2):
            continue
        lo, hi = val_range
        if not (isinstance(lo, (int, float)) and isinstance(hi, (int, float))):
            continue

        val = round(random.uniform(lo, hi), 1)
        obs_prop = {
            "name": f"{field} Observation",
            "description": f"Observation of {field}",
            "definition": unit,
        }
        sensor_data = {
            "name": f"Sensor {sid} {field}",
            "description": f"Sensor {sid} measuring {field}",
            "encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0",
            "metadata": {"unit": unit},
        }
        ds = {
            "name": f"Datastream {stype}/{field}",
            "description": f"Datastream for {stype} sensor {sid} - {field}",
            "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
            "unitOfMeasurement": {"name": field, "symbol": "", "definition": unit},
            "Sensor": sensor_data,
            "ObservedProperty": obs_prop,
        }
        datastreams.append((field, ds, val))

    thing_payload = {
        "name": f"Thing_{sid}",
        "description": f"Smart City {stype} sensor in Martinique",
        "properties": {
            "sensorType": stype,
            "region": "Martinique",
            "source": source,    # traceability
            "mqttTopic": topic,  # traceability
        },
    }
    return thing_payload, datastreams
|
||
|
||
# =============================================================================
|
||
# HTTP helper
|
||
# =============================================================================
|
||
def _http_post(url: str, data: dict, headers: dict, broker: str = "unknown") -> str:
    """POST *data* as JSON and return a truthy marker on success.

    Args:
        url: Target URL.
        data: JSON-serialisable request body.
        headers: HTTP headers sent with the request.
        broker: Label value used for the Prometheus metrics.

    Returns:
        - the entity id parsed from the response body ("@iot.selfLink" or
          "@iot.id") or from the "Location" header, when one is available;
        - ``'created'`` for a 204 response or a 200/201 without an id;
        - ``''`` on any failure (HTTP error, network error, other 2xx code).

    Failures are printed and counted in ``http_requests_total`` /
    ``messages_errors_total``. Earlier, an inner ``except Exception: pass``
    around ``urlopen`` swallowed them, which bypassed that handling and made
    the function return ``None``.
    """
    try:
        body = json.dumps(data).encode()
        req = urllib.request.Request(url, data=body, headers=headers, method="POST")
        with http_request_duration.labels(broker=broker, method="POST").time():
            with urllib.request.urlopen(req, timeout=8) as resp:
                http_requests_total.labels(broker=broker, method="POST", status_code=str(resp.status)).inc()
                if resp.status == 204:
                    return 'created'  # No Content — success
                if resp.status not in (200, 201):
                    return ''

                # Best effort: read the body to extract the id (FROST). An
                # unreadable or unexpected body falls through to the
                # Location header.
                try:
                    result = json.loads(resp.read())
                    if '@iot.selfLink' in result:
                        link = result['@iot.selfLink']
                        return link.split('(')[1].rstrip(')')
                    if '@iot.id' in result:
                        return str(result['@iot.id'])
                except Exception:
                    pass

                location = resp.headers.get('Location', '')
                if location:
                    return location.split('(')[1].rstrip(')') if '(' in location else ''
                return 'created'
    except urllib.error.HTTPError as e:
        # Read the error body to help debugging.
        try:
            err_body = e.read().decode()[:200]
        except Exception:
            err_body = str(e)
        print(f" ⚠️ HTTP POST {url} → {e.code}: {err_body}")
        http_requests_total.labels(broker=broker, method="POST", status_code=str(e.code)).inc()
        messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc()
        return ''
    except Exception as e:
        # Network errors, timeouts, serialisation errors, ...
        http_requests_total.labels(broker=broker, method="POST", status_code="exception").inc()
        messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc()
        print(f" ⚠️ HTTP POST {url} → {e}")
        return ''
|
||
|
||
def _http_put(url: str, data: dict, headers: dict, broker: str = "unknown") -> bool:
    """PUT *data* as JSON and report whether the call is considered successful.

    Args:
        url: Target URL.
        data: JSON-serialisable request body.
        headers: HTTP headers sent with the request.
        broker: Label value used for the Prometheus metrics.

    Returns:
        True for a 200/204 response or an HTTP 409 (treated as "already
        exists"); False for any other outcome. Failures are printed and
        counted in the Prometheus counters.
    """
    try:
        encoded = json.dumps(data).encode()
        request = urllib.request.Request(url, data=encoded, headers=headers, method="PUT")
        timer = http_request_duration.labels(broker=broker, method="PUT")
        with timer.time():
            with urllib.request.urlopen(request, timeout=5) as response:
                code = response.status
                http_requests_total.labels(broker=broker, method="PUT", status_code=str(code)).inc()
                return code in (200, 204)
    except urllib.error.HTTPError as err:
        http_requests_total.labels(broker=broker, method="PUT", status_code=str(err.code)).inc()
        if err.code != 409:
            messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc()
            print(f" ⚠️ HTTP PUT {url} → {err}")
            return False
        # 409: the resource already exists, which is fine for the caller.
        return True
    except Exception as err:
        http_requests_total.labels(broker=broker, method="PUT", status_code="exception").inc()
        messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc()
        print(f" ⚠️ HTTP PUT {url} → {err}")
        return False
|
||
|
||
# =============================================================================
|
||
# MQTT Client multi-broker
|
||
# =============================================================================
|
||
class MultiMQTT:
    """Publish the same MQTT messages to several brokers at once.

    One paho client is created per broker in ``_setup``. ``self.ok`` tracks
    which brokers are currently connected (updated from the paho callbacks),
    and ``publish`` only sends to connected brokers.
    """

    # Names under which ``_setup`` registers the brokers that carry IoT-Agent
    # traffic. They must match the keys of ``self.clients`` exactly.
    IOT_AGENT_BROKERS = ("EMQX", "Mosquitto", "BunkerM")

    def __init__(self):
        self.clients: dict[str, mqtt.Client] = {}
        self.ok: dict[str, bool] = {}
        # Guards ``self.ok`` between the paho network threads and ``publish``.
        self._lock = threading.Lock()
        self._setup()

    def _mk_client(self, name: str, host: str, port: int,
                   tls: bool = False, user: str = "", pwd: str = "",
                   ws: bool = False) -> mqtt.Client:
        """Create a client for broker *name*, start connecting and return it.

        Args:
            name: Broker name used for logs, metrics and ``self.ok``.
            host: Broker host name.
            port: Broker port.
            tls: Enable TLS (certificate verification is disabled).
            user: Optional user name; credentials are only set when non-empty.
            pwd: Password for *user*.
            ws: Use MQTT over WebSockets on path "/mqtt".

        A failing synchronous connect is logged and recorded in the metrics;
        the client is still returned so that ``stop`` can clean it up.
        """
        cid = f"smartcity-sim-{name}-{os.getpid()}"
        # WebSockets need the matching transport at construction time.
        c = mqtt.Client(client_id=cid, protocol=mqtt.MQTTv311,
                        transport="websockets" if ws else "tcp")
        if user:
            c.username_pw_set(user, pwd)
        if tls:
            # NOTE(review): certificate and host name verification are both
            # disabled — acceptable for a lab setup only.
            c.tls_set(cert_reqs=ssl.CERT_NONE)
            c.tls_insecure_set(True)
        if ws:
            # paho exposes ``ws_set_options``; there is no ``ws_set`` method.
            c.ws_set_options(path="/mqtt")
        c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc)
        c.on_disconnect = lambda _c, _, __: self._on_disconnect(name)
        try:
            c.connect(host, port, keepalive=30)
            c.loop_start()
        except Exception as e:
            print(f"[MQTT] ❌ {name} @ {host}:{port} → {e}")
            self.ok[name] = False
            # Make the failure visible in the metrics as well.
            mqtt_broker_connected.labels(broker=name).set(0)
            mqtt_connection_total.labels(broker=name, status="failure").inc()
        return c

    def _on_connect(self, name: str, rc: int):
        """Record the outcome of a connection attempt (rc == 0 means success)."""
        with self._lock:
            if rc == 0:
                self.ok[name] = True
                mqtt_broker_connected.labels(broker=name).set(1)
                mqtt_connection_total.labels(broker=name, status="success").inc()
                print(f"[MQTT] ✅ {name} connecté")
            else:
                self.ok[name] = False
                mqtt_broker_connected.labels(broker=name).set(0)
                mqtt_connection_total.labels(broker=name, status="failure").inc()
                print(f"[MQTT] ❌ {name} rc={rc}")

    def _on_disconnect(self, name: str):
        """Mark broker *name* as disconnected."""
        with self._lock:
            self.ok[name] = False
            mqtt_broker_connected.labels(broker=name).set(0)
            print(f"[MQTT] ⚠️ {name} déconnecté")

    def _setup(self):
        """Create one client per configured broker, then wait for the connections."""
        # (name, host, port, tls, user, password) — endpoints come from the environment.
        brokers = [
            ("EMQX", EMQX_HOST, EMQX_PORT, False, "", ""),
            ("Mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "bunker", "bunker"),
            ("BunkerM", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker"),  # port 1900 = plain MQTT, no TLS
        ]
        print("[MQTT] 🔌 Connexion aux brokers...")
        for name, host, port, tls, user, pwd in brokers:
            # Initialise the state BEFORE the client starts, so a connect
            # callback that fires early is not overwritten by this reset.
            self.ok[name] = False
            self.clients[name] = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd)
        time.sleep(3)  # give the connections time to come up

    def publish(self, topic: str, payload: str, sensor_type: str = "unknown") -> dict[str, bool]:
        """Publish *payload* on *topic* to every connected broker (QoS 1).

        Returns:
            A ``{broker_name: success}`` dict; disconnected brokers map to False.
        """
        results = {}
        payload_bytes = len(payload.encode())
        with self._lock:
            for name, client in self.clients.items():
                if not self.ok.get(name, False):
                    results[name] = False
                    continue
                with publish_duration.labels(broker=name).time():
                    try:
                        r = client.publish(topic, payload, qos=1)
                        success = (r.rc == mqtt.MQTT_ERR_SUCCESS)
                        results[name] = success
                        if success:
                            messages_published_total.labels(broker=name, sensor_type=sensor_type).inc()
                            message_payload_size.labels(broker=name).observe(payload_bytes)
                        else:
                            messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="mqtt_rc").inc()
                    except Exception:
                        results[name] = False
                        messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="exception").inc()
        return results

    def publish_iot_agent(self, sid: str, payload: dict, sensor_type: str = "unknown") -> bool:
        """Publish *payload* on the IoT-Agent topic of sensor *sid*.

        The message goes to ``json/smartcity-api-key/{sid}/attrs`` on every
        connected broker listed in ``IOT_AGENT_BROKERS``.

        Returns:
            True when at least one broker accepted the message.
        """
        topic = f"json/smartcity-api-key/{sid}/attrs"
        msg = json.dumps(payload, ensure_ascii=False)
        payload_bytes = len(msg.encode())

        success = False
        # The names must match the keys registered in ``_setup``; the earlier
        # lower-case names never matched, so nothing was ever published.
        for broker_name in self.IOT_AGENT_BROKERS:
            client = self.clients.get(broker_name)
            if client is None or not self.ok.get(broker_name, False):
                continue
            try:
                r = client.publish(topic, msg, qos=1)
                if r.rc == mqtt.MQTT_ERR_SUCCESS:
                    success = True
                    messages_published_total.labels(broker='iot-agent', sensor_type=sensor_type).inc()
                    message_payload_size.labels(broker='iot-agent').observe(payload_bytes)
                else:
                    messages_errors_total.labels(broker='iot-agent', sensor_type=sensor_type, error_type="mqtt_rc").inc()
            except Exception:
                messages_errors_total.labels(broker='iot-agent', sensor_type=sensor_type, error_type="exception").inc()
        return success

    def stop(self):
        """Stop the network loops and disconnect every client (best effort)."""
        for name, c in self.clients.items():
            try:
                c.loop_stop()
                c.disconnect()
            except Exception:
                # Shutdown path: a client that cannot be stopped is ignored.
                pass
|
||
|
||
# =============================================================================
# Base URLs (resolved at start-up)
# =============================================================================
# Orion-LD endpoint (fixed values, not read from the environment).
ORION_HOST = "localhost"
ORION_PORT = "2026"
ORION_URL = "http://{}:{}".format(ORION_HOST, ORION_PORT)

# Stellio API gateway and tenant.
STELLIO_URL = os.getenv("STELLIO_URL", "http://localhost:8087")
STELLIO_TENANT = os.getenv("STELLIO_TENANT", "urn:ngsi-ld:tenant:default")

# OpenRemote manager endpoints.
OR_URL = os.getenv("OR_URL", "http://localhost:8080")
OR_REALM = os.getenv("OR_REALM", "smartcity")
# NOTE(review): the default below is a plain string, so "{OR_REALM}" is NOT
# interpolated here. Presumably the consumer calls str.format on it — confirm,
# otherwise the token URL contains the literal placeholder.
OR_TOKEN_URL = os.getenv("OR_TOKEN_URL", "http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token")
OR_TOKEN_TTL = int(os.getenv("OR_TOKEN_TTL", "3600"))  # token refresh period in seconds (one hour)
|
||
|
||
def publish_stellio(sid: str, sensor: dict) -> bool:
    """Create the sensor entity in Stellio, replacing it if it already exists.

    The entity is POSTed to ``/ngsi-ld/v1/entities``. When the answer is
    HTTP 409 the same body is sent again with PUT to
    ``/ngsi-ld/v1/entities/{id}``. Each attempt updates the HTTP metrics.

    Args:
        sid: Sensor identifier.
        sensor: Sensor description; ``sensor["type"]`` must be present.

    Returns:
        True when the POST or the follow-up PUT succeeded, False otherwise.
    """
    stype = sensor["type"]
    # Matching MQTT topic, handed to the payload builder for traceability.
    mqtt_topic = f"city/sensors/{stype}/{sid}"
    # The @context is sent inline (original note: Stellio needs it to resolve
    # the NGSI-LD vocabularies, uri.etsi.org being served from its bundled JAR).
    entity = _ngsi_payload(sid, sensor, context=STELLIO_INLINE_CONTEXT, source="simulator", topic=mqtt_topic)
    create_url = f"{STELLIO_URL}/ngsi-ld/v1/entities"
    request_headers = {
        "Content-Type": "application/ld+json",
        "Accept": "application/ld+json",
        "NGSILD-Tenant": STELLIO_TENANT,
    }

    try:
        body = json.dumps(entity).encode()
        create_req = urllib.request.Request(create_url, data=body, headers=request_headers, method="POST")
        post_timer = http_request_duration.labels(broker="stellio", method="POST")
        with post_timer.time(), urllib.request.urlopen(create_req, timeout=8) as created:
            http_requests_total.labels(broker="stellio", method="POST", status_code=str(created.status)).inc()
            print(f" 🏢 Stellio: ✅ (HTTP {created.status})")
            return True
    except urllib.error.HTTPError as http_err:
        http_requests_total.labels(broker="stellio", method="POST", status_code=str(http_err.code)).inc()

        if http_err.code != 409:
            # Any status other than 409 is a plain failure: report the
            # (truncated) response body when it can be read.
            try:
                detail = http_err.read().decode()[:300]
            except Exception:
                detail = str(http_err)
            messages_errors_total.labels(broker="stellio", sensor_type=stype, error_type="http_error").inc()
            print(f" ⚠️ Stellio → {http_err.code}: {detail}")
            return False

        # 409: the entity already exists, so replace it with PUT.
        try:
            quoted_id = urllib.parse.quote(entity["id"], safe="")
            replace_req = urllib.request.Request(
                f"{STELLIO_URL}/ngsi-ld/v1/entities/{quoted_id}",
                data=body,
                headers=request_headers,
                method="PUT",
            )
            put_timer = http_request_duration.labels(broker="stellio", method="PUT")
            with put_timer.time(), urllib.request.urlopen(replace_req, timeout=8) as replaced:
                http_requests_total.labels(broker="stellio", method="PUT", status_code=str(replaced.status)).inc()
                print(f" 🏢 Stellio: ✅ (HTTP {replaced.status} updated)")
                return True
        except Exception as put_err:
            http_requests_total.labels(broker="stellio", method="PUT", status_code="error").inc()
            messages_errors_total.labels(broker="stellio", sensor_type=stype, error_type="http_error").inc()
            print(f" ⚠️ Stellio update failed: {put_err}")
            return False
    except Exception as exc:
        http_requests_total.labels(broker="stellio", method="POST", status_code="exception").inc()
        messages_errors_total.labels(broker="stellio", sensor_type=stype, error_type="exception").inc()
        print(f" ⚠️ Stellio → {exc}")
        return False
|
||
|
||
def publish_orion(sid: str, sensor: dict) -> bool:
    """Publish the sensor entity to Orion-LD (POST to create, PATCH to update).

    The entity is first POSTed to ``/ngsi-ld/v1/entities``. When Orion-LD
    answers 409 the same body is sent with PATCH to
    ``/ngsi-ld/v1/entities/{id}/attrs``.

    Args:
        sid: Sensor identifier.
        sensor: Sensor description; ``sensor["type"]`` must be present.

    Returns:
        True when the POST or the follow-up PATCH succeeded, False on any
        HTTP or transport error (errors are counted in the metrics and
        printed, never raised).
    """
    stype = sensor["type"]
    # Matching MQTT topic, handed to the payload builder for traceability.
    topic = f"city/sensors/{stype}/{sid}"
    entity = _ngsi_payload(sid, sensor, source="simulator", topic=topic)
    # Reuse the module-level Orion-LD URL instead of a second hard-coded copy.
    base = f"{ORION_URL}/ngsi-ld/v1"
    headers = {"Content-Type": "application/ld+json", "Accept": "application/ld+json"}

    # 1. Try to create the entity (POST).
    try:
        body = json.dumps(entity).encode()
        req = urllib.request.Request(f"{base}/entities", data=body, headers=headers, method="POST")
        with http_request_duration.labels(broker="orion_ld", method="POST").time():
            with urllib.request.urlopen(req, timeout=8) as resp:
                http_requests_total.labels(broker="orion_ld", method="POST", status_code=str(resp.status)).inc()
                print(f" 🌐 Orion-LD: ✅ (HTTP {resp.status} created)")
                return True
    except urllib.error.HTTPError as e:
        http_requests_total.labels(broker="orion_ld", method="POST", status_code=str(e.code)).inc()
        if e.code != 409:
            messages_errors_total.labels(broker="orion_ld", sensor_type=stype, error_type="http_error").inc()
            try:
                detail = e.read().decode()[:200]
            except Exception:
                detail = str(e)
            print(f" ⚠️ Orion-LD → {e.code}: {detail}")
            return False
        # 409 = entity already exists -> fall through to the PATCH below.
    except Exception as e:
        # Connection refused, DNS failure, timeout...: these are not
        # HTTPError instances and used to escape to the caller.
        http_requests_total.labels(broker="orion_ld", method="POST", status_code="exception").inc()
        messages_errors_total.labels(broker="orion_ld", sensor_type=stype, error_type="exception").inc()
        print(f" ⚠️ Orion-LD → {e}")
        return False

    # 2. Entity already exists (409) -> PATCH its attributes.
    try:
        eid = urllib.parse.quote(entity["id"], safe="")
        req2 = urllib.request.Request(f"{base}/entities/{eid}/attrs", data=body, headers=headers, method="PATCH")
        with http_request_duration.labels(broker="orion_ld", method="PATCH").time():
            with urllib.request.urlopen(req2, timeout=8) as resp2:
                http_requests_total.labels(broker="orion_ld", method="PATCH", status_code=str(resp2.status)).inc()
                print(f" 🌐 Orion-LD: ✅ (HTTP {resp2.status} updated)")
                return True
    except Exception as e2:
        http_requests_total.labels(broker="orion_ld", method="PATCH", status_code="error").inc()
        messages_errors_total.labels(broker="orion_ld", sensor_type=stype, error_type="http_error").inc()
        print(f" ⚠️ Orion-LD PATCH failed: {e2}")
        return False
|
||
|
||
def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool:
    """Publish one reading to BunkerM through its HTTP API (port 2000).

    A cookie-aware opener first requests ``/login`` with Basic credentials,
    then POSTs the JSON payload to ``/api/sensors/data`` with the same opener.

    Args:
        sid: Sensor identifier, sent as ``sensor_id``.
        sensor: Sensor description; ``sensor["type"]`` must be present.
        values: Extra fields merged into the payload; keys already set by
            this function (``sensor_id``, ``sensor_type``, ``timestamp``)
            are not overwritten.

    Returns:
        True when the POST answered 200, 201 or 204, False otherwise.
    """
    # Not imported at module level, hence the function-scope imports.
    import base64
    import http.cookiejar

    host = "bunkerm_bunkerm_1:2000"
    login_url = f"http://{host}/login"
    data_url = f"http://{host}/api/sensors/data"

    # 1. Cookie jar so that the session cookie survives between requests.
    cj = http.cookiejar.CookieJar()
    opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))

    # 2. Basic authentication used to obtain the session cookie.
    creds = base64.b64encode(b"bunker:bunker").decode()
    auth_header = {"Authorization": f"Basic {creds}"}

    try:
        # GET /login to initialise the session. The response is closed
        # right away: it used to be left open, leaking one connection per call.
        req_login = urllib.request.Request(login_url, headers=auth_header)
        with opener.open(req_login, timeout=5):
            pass
    except Exception as e:
        print(f" ⚠️ BunkerM login → {e}")
        return False

    # 3. Build the payload; caller values never override the fixed keys.
    payload = {
        "sensor_id": sid,
        "sensor_type": sensor["type"],
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    for k, v in values.items():
        payload.setdefault(k, v)

    # 4. POST with the session cookie.
    req = urllib.request.Request(
        data_url,
        data=json.dumps(payload).encode(),
        headers={**auth_header, "Content-Type": "application/json"},
        method="POST",
    )
    try:
        with http_request_duration.labels(broker="bunkerm", method="POST").time():
            with opener.open(req, timeout=5) as resp:
                http_requests_total.labels(broker="bunkerm", method="POST", status_code=str(resp.status)).inc()
                messages_published_total.labels(broker="bunkerm", sensor_type=sensor["type"]).inc()
                print(f" ✅ BunkerM: HTTP {resp.status}")
                return resp.status in (200, 201, 204)
    except Exception as e:
        http_requests_total.labels(broker="bunkerm", method="POST", status_code="exception").inc()
        messages_errors_total.labels(broker="bunkerm", sensor_type=sensor["type"], error_type="exception").inc()
        print(f" ⚠️ BunkerM POST → {e}")
        return False
|
||
|
||
def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
    """Post one Observation to FROST, creating the Thing/Datastreams on demand.

    ``_frost_cache`` maps ``sid`` to ``(thing_id, {field: datastream_id})``.
    The Thing is created once per sensor. Datastreams that are still missing
    from the cache (for example because their creation failed earlier) are
    created against the cached Thing rather than creating a new Thing.

    Args:
        sid: Sensor identifier.
        sensor: Sensor description; ``sensor["type"]`` must be present.
        field: Name of the observed field (one Datastream per field).
        value: Observation result.

    Returns:
        True when the Observation was accepted, False otherwise.
    """
    cached = _frost_cache.get(sid)

    # Fast path: Thing and Datastream are already known.
    if cached is not None and field in cached[1]:
        ds_id = cached[1][field]
        obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
        obs = {
            "resultTime": datetime.now(timezone.utc).isoformat(),
            "result": value,
            "FeatureOfInterest": {
                "name": f"Location {sid}",
                "description": f"Feature of interest for sensor {sid}",
                "encodingType": "application/vnd.geo+json",
                "feature": {
                    "type": "Point",
                    "coordinates": [sensor.get("lon", -61.0), sensor.get("lat", 14.6)],
                },
            },
        }
        if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"):
            print(f" ✅ FROST Observation {sid}/{field} → OK (cached)")
            return True
        print(f" ⚠️ FROST Observation {sid}/{field} → échec")
        return False

    # MQTT topic handed to the payload builder for traceability.
    stype = sensor["type"]
    topic = f"city/sensors/{stype}/{sid}"
    thing_payload, datastreams = _frost_payload(sid, sensor, source="simulator", topic=topic)

    if cached is None:
        # First call for this sensor: create the Thing.
        print(f" 📊 FROST: POST Thing {sid}...")
        tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS, broker="frost")
        if not tid:
            print(f" ⚠️ FROST Thing {sid} → échec création")
            return False
        print(f" ✅ FROST Thing {sid} créé (ID: {tid})")
        ds_map = {}
    else:
        # The Thing exists but this field has no Datastream yet: reuse the
        # Thing. Falling through to a new POST /Things here used to create
        # a duplicate Thing on every call.
        tid, ds_map = cached

    # Create the Datastreams that are not known yet.
    for f, ds, _ in datastreams:
        if f in ds_map:
            continue
        ds["Thing"] = {"@iot.id": tid}
        print(f" 📊 FROST: POST Datastream {sid}/{f}...")
        ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS, broker="frost")
        if ds_id:
            print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})")
            ds_map[f] = ds_id
        else:
            print(f" ⚠️ FROST Datastream {sid}/{f} → échec")

    _frost_cache[sid] = (tid, ds_map)

    # Post the observation for the requested field.
    if field in ds_map:
        ds_id = ds_map[field]
        obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
        obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value}
        if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"):
            print(f" ✅ FROST Observation {sid}/{field} → OK")
            return True
    return False
|
||
|
||
# =============================================================================
|
||
# OpenRemote REST
|
||
# =============================================================================
|
||
# "expires" holds the absolute expiry time (epoch seconds) of the cached token.
_or_token_cache = {"token": "", "expires": 0}


def _get_or_token() -> str:
    """Return an OpenRemote access token, fetching a new one when needed.

    The token is obtained with the password grant and cached in
    ``_or_token_cache``. A cached token is reused until 60 seconds before
    its expiry.

    Returns:
        The access token, or an empty string when it could not be obtained
        (the error is printed, never raised).
    """
    # Safety margin applied once, at lookup time. It used to be subtracted
    # both when storing and when checking, which cut the usable lifetime by
    # 120 s and disabled caching for tokens valid 120 s or less.
    refresh_margin = 60
    if _or_token_cache["token"] and _or_token_cache["expires"] - refresh_margin > time.time():
        return _or_token_cache["token"]
    try:
        # Honour the configured token endpoint. The replace() also handles
        # a value that still carries a literal "{OR_REALM}" placeholder.
        token_url = OR_TOKEN_URL.replace("{OR_REALM}", OR_REALM)
        # NOTE(review): the fallback secret and password are hard-coded
        # here; prefer supplying them through the environment.
        client_id = os.environ.get("OR_CLIENT_ID", "openremote")
        client_secret = os.environ.get("OR_CLIENT_SECRET", "QVTnyObwXdpQ0Vuc60kFSonidK49FiXb")
        data = urllib.parse.urlencode({
            "grant_type": "password",
            "username": os.environ.get("OR_ADMIN_USER", "admin"),
            "password": os.environ.get("OR_ADMIN_PASS", "Digitribe972"),
            "client_id": client_id,
            "client_secret": client_secret,
        }).encode()
        req = urllib.request.Request(
            token_url,
            data=data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        with urllib.request.urlopen(req, timeout=5) as r:
            token_data = json.loads(r.read().decode())
        _or_token_cache["token"] = token_data["access_token"]
        _or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300)
        return _or_token_cache["token"]
    except Exception as e:
        print(f" ⚠️ OpenRemote token → {e}")
        return ""
|
||
|
||
def _or_put(asset_id: str, payload: dict) -> bool:
    """Send ``payload`` with PUT to the OpenRemote asset ``asset_id``.

    The ``If-Match`` header carries ``payload["version"]`` (1 when absent).

    Returns:
        True when OpenRemote answered 200 or 204; False when no token is
        available or the request failed (failures are counted and printed).
    """
    bearer = _get_or_token()
    if not bearer:
        return False

    try:
        encoded = json.dumps(payload).encode()
        asset_url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
        put_headers = {
            "Authorization": f"Bearer {bearer}",
            "Content-Type": "application/json",
            "If-Match": str(payload.get("version", 1)),
        }
        request = urllib.request.Request(asset_url, data=encoded, headers=put_headers, method="PUT")
        timer = http_request_duration.labels(broker="openremote", method="PUT")
        with timer.time(), urllib.request.urlopen(request, timeout=5) as answer:
            http_requests_total.labels(broker="openremote", method="PUT", status_code=str(answer.status)).inc()
            messages_published_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown")).inc()
            return answer.status in (200, 204)
    except urllib.error.HTTPError as http_err:
        http_requests_total.labels(broker="openremote", method="PUT", status_code=str(http_err.code)).inc()
        messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="http_error").inc()
        print(f" ⚠️ OR PUT {asset_id} → HTTP {http_err.code}")
        return False
    except Exception as exc:
        http_requests_total.labels(broker="openremote", method="PUT", status_code="exception").inc()
        messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="exception").inc()
        print(f" ⚠️ OR PUT {asset_id} → {exc}")
        return False
|
||
|
||
def publish_openremote(sid: str, sensor: dict, values: dict) -> bool:
    """Push the latest readings of a sensor to its OpenRemote asset over REST.

    Args:
        sid: Sensor identifier; only sensors listed in the asset table
            below are published.
        sensor: Sensor description; ``sensor["name"]`` is sent as asset name.
        values: ``{field: reading}`` pairs turned into ``Number`` attributes.

    Returns:
        False when ``sid`` has no known asset, otherwise the result of
        ``_or_put``.
    """
    # sid -> OpenRemote asset id (the assets were created by hand in OpenRemote).
    known_assets = {
        "traffic_000": "7b5c5670d1b84865ba3ac7",     # Traffic Fort-de-France Centre
        "traffic_001": "557f6e993b994d3cb81017",     # Traffic Fort-de-France North
        "traffic_002": "cb81dfd2d2dc4d25adc9c2",     # Traffic Fort-de-France South
        "airquality_000": "a51c982a2d3e451898b978",  # Air Quality Fort-de-France
        "parking_000": "b8f0df19e0af47b386ebb9",     # Parking Fort-de-France Centre
        "noise_000": "9035103d1866454fb7e451",       # Noise Fort-de-France Centre
        "weather_000": "b9de80905ac640f488ab27",     # Weather Lamentin Airport
        "light_000": "ee7823a41e594851ba202f",       # Light Fort-de-France
    }
    asset_id = known_assets.get(sid)
    if not asset_id:
        return False

    def unit_for(field_name):
        # The unit is derived from substrings of the simulator field name;
        # the first matching rule wins.
        if "ugm3" in field_name:
            return "µg/m³"
        if "celsius" in field_name:
            return "°C"
        if "percent" in field_name or "humidity" in field_name:
            return "%"
        if "db" in field_name:
            return "dB"
        return ""

    # Attributes that are always sent.
    stamp = datetime.now(timezone.utc).isoformat()
    attributes = {
        "timestamp": {"type": "DateTime", "value": stamp, "timestamp": stamp},
        "battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": stamp},
    }

    # Simulator field name -> OpenRemote attribute name (unknown names are kept).
    renames = {
        "temperature_celsius": "temperature",
        "humidity_percent": "humidity",
        "noise_level_db": "noiseLevel",
        "pm25_ugm3": "airQuality",
        "vehicle_count": "trafficFlow",
        "available_spots": "parking",
        "brightness_lux": "light",
        "flood": "flood",
    }
    for field_name, reading in values.items():
        attributes[renames.get(field_name, field_name)] = {
            "type": "Number",
            "value": reading,
            "timestamp": stamp,
            "unit": unit_for(field_name),
        }

    asset = {
        "id": asset_id,
        "name": sensor["name"],
        "type": "IOTSensor",
        "realm": OR_REALM,
        "attributes": attributes,
    }
    return _or_put(asset_id, asset)
|
||
|
||
# =============================================================================
|
||
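# Pulsar
# The REST producer API described below is documentation only:
# API: POST http://host:8080/admin/v2/persistent/public/default/{topic}/produce
# Payload: {"messages": [{"payload": "<base64>", "properties": {...}}]}
# publish_pulsar() in this file sends messages with the Python client over
# the binary protocol (port 6650), not over this REST API.
# Topics are auto-created by the first message (Pulsar standalone).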
|
||
# =============================================================================
|
||
_pulsar_session = None


def _get_pulsar_session():
    """Return the lazily initialised Pulsar "session".

    The session is the ``urllib.request`` module itself, stored in the
    module-level ``_pulsar_session`` on first use.
    """
    global _pulsar_session
    if _pulsar_session is not None:
        return _pulsar_session
    import urllib.request as request_module
    _pulsar_session = request_module
    return _pulsar_session
|
||
|
||
def _init_pulsar() -> bool:
    """Check at start-up that the Pulsar admin endpoint answers.

    Returns:
        True when ``GET {PULSAR_BASE}/admin/v2/clusters`` answered 200,
        False otherwise (the error, if any, is printed).
    """
    reachable = False
    try:
        probe = urllib.request.Request(f"{PULSAR_BASE}/admin/v2/clusters")
        with urllib.request.urlopen(probe, timeout=5) as answer:
            reachable = answer.status == 200
    except Exception as exc:
        print(f"[PULSAR] ⚠️ Cannot reach {PULSAR_BASE}: {exc}")
        return False
    if reachable:
        print(f"[PULSAR] ✅ Connected to {PULSAR_BASE}")
    return reachable
|
||
|
||
def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool:
    """Publish one message to Pulsar with the Python client (binary port 6650).

    The topic is ``persistent://public/default/smartcity-<type>`` where
    ``<type>`` is the sensor type with hyphens removed.

    Args:
        sid: Sensor identifier, sent as the ``sensor_id`` message property.
        sensor: Sensor description; ``sensor["type"]`` must be present.
        payload: JSON-serialisable message body.

    Returns:
        True when the message was sent, False on any error (errors are
        counted in the metrics and printed, never raised).
    """
    stype = sensor["type"]
    topic = f"persistent://public/default/smartcity-{stype.replace('-','')}"
    try:
        import pulsar
        # Serialise first so that an invalid payload never opens a connection.
        body = json.dumps(payload, ensure_ascii=False).encode()
        with publish_duration.labels(broker="pulsar").time():
            client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650")
            try:
                producer = client.create_producer(topic)
                producer.send(body, properties={"sensor_id": sid, "source": "simulator"})
            finally:
                # Always release the client: it used to stay open whenever
                # create_producer() or send() raised.
                client.close()
        messages_published_total.labels(broker="pulsar", sensor_type=stype).inc()
        message_payload_size.labels(broker="pulsar").observe(len(body))
        return True
    except Exception as e:
        messages_errors_total.labels(broker="pulsar", sensor_type=stype, error_type="exception").inc()
        print(f" ⚠️ Pulsar → {e}")
        return False
|
||
|
||
# =============================================================================
|
||
# Redpanda / Kafka — HTTP REST Proxy
|
||
# API: POST http://host:8082/topics/{topic}
|
||
# Payload: {"records": [{"value": "<base64>"}]}
|
||
# Topics auto-créés par le premier message (Redpanda)
|
||
# =============================================================================
|
||
_redpanda_session = None


def _get_redpanda_session():
    """Return the lazily initialised Redpanda "session".

    The session is the ``urllib.request`` module itself, stored in the
    module-level ``_redpanda_session`` on first use.
    """
    global _redpanda_session
    if _redpanda_session is not None:
        return _redpanda_session
    import urllib.request as request_module
    _redpanda_session = request_module
    return _redpanda_session
|
||
|
||
def _init_redpanda() -> bool:
    """Check at start-up that the Redpanda REST endpoint answers.

    Returns:
        True when ``GET {REDPANDA_BASE}/v1/status/alive`` answered 200,
        False otherwise (the error, if any, is printed).
    """
    reachable = False
    try:
        probe = urllib.request.Request(f"{REDPANDA_BASE}/v1/status/alive")
        with urllib.request.urlopen(probe, timeout=5) as answer:
            reachable = answer.status == 200
    except Exception as exc:
        print(f"[REDPANDA] ⚠️ Cannot reach {REDPANDA_BASE}: {exc}")
        return False
    if reachable:
        print(f"[REDPANDA] ✅ Connected to {REDPANDA_BASE}")
    return reachable
|
||
|
||
def publish_redpanda(sid: str, sensor: dict, payload: dict) -> bool:
    """Publish one message to Redpanda/Kafka through the REST proxy.

    The JSON payload is base64-encoded and posted to
    ``{REDPANDA_BASE}/topics/<sensor type>``.

    Args:
        sid: Sensor identifier (not used when building the record).
        sensor: Sensor description; ``sensor["type"]`` is the topic name.
        payload: JSON-serialisable message body.

    Returns:
        True when the proxy answered 200, 201 or 204, False otherwise
        (errors are counted in the metrics and printed, never raised).
    """
    stype = sensor["type"]
    topic = stype  # air-quality, traffic, weather, parking, noise, light
    try:
        import base64
        raw = json.dumps(payload, ensure_ascii=False).encode()
        record = {
            "records": [{"value": base64.b64encode(raw).decode()}]
        }
        url = f"{REDPANDA_BASE}/topics/{topic}"
        req = urllib.request.Request(
            url,
            data=json.dumps(record).encode(),
            # The value is base64, which is the "binary" embedded format.
            # Declaring "json.v2" stored the base64 text as a JSON string
            # instead of the decoded payload bytes.
            headers={"Content-Type": "application/vnd.kafka.binary.v2+json"},
            method="POST",
        )
        with http_request_duration.labels(broker="redpanda", method="POST").time():
            with urllib.request.urlopen(req, timeout=8) as resp:
                http_requests_total.labels(broker="redpanda", method="POST", status_code=str(resp.status)).inc()
                messages_published_total.labels(broker="redpanda", sensor_type=stype).inc()
                message_payload_size.labels(broker="redpanda").observe(len(raw))
                return resp.status in (200, 201, 204)
    except urllib.error.HTTPError as e:
        http_requests_total.labels(broker="redpanda", method="POST", status_code=str(e.code)).inc()
        messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="http_error").inc()
        print(f" ⚠️ Redpanda → {e.code}")
        return False
    except Exception as e:
        http_requests_total.labels(broker="redpanda", method="POST", status_code="exception").inc()
        messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="exception").inc()
        print(f" ⚠️ Redpanda → {e}")
        return False
|
||
|
||
def publish_influx(sid: str, sensor: dict, values: dict) -> bool:
    """Write the numeric readings of a sensor to InfluxDB in a background thread.

    One point per numeric field is built, each also carrying the sensor
    ``lat``/``lon`` as fields. The write itself happens in a daemon thread.

    Returns:
        False when no write API is configured (counted as "skipped"),
        True as soon as the background write has been started; the outcome
        of the write is only visible in the metrics and the printed output.
    """
    if not _influx_write_api:
        influx_write_total.labels(status="skipped").inc()
        return False

    def _write_async():
        try:
            measurement = sensor["type"]
            latitude = sensor.get("lat", BASE_LAT)
            longitude = sensor.get("lon", BASE_LON)

            # Non-numeric readings are ignored.
            batch = [
                influxdb_client.Point(measurement)
                .tag("sensor_id", sid)
                .tag("location", sensor.get("name", sid))
                .field(name, float(reading))
                .field("lat", float(latitude))
                .field("lon", float(longitude))
                for name, reading in values.items()
                if isinstance(reading, (int, float))
            ]

            if batch:
                _influx_write_api.write(bucket=INFLUX_BUCKET, record=batch)
                influx_write_total.labels(status="success").inc()
                print(f" 📈 InfluxDB: {len(batch)} points written")
        except Exception as exc:
            influx_write_total.labels(status="error").inc()
            print(f" ⚠️ InfluxDB → {exc}")

    # Fire and forget: the caller is never blocked by the write.
    threading.Thread(target=_write_async, daemon=True).start()
    return True
|
||
|
||
def _generate_readings(ranges: dict) -> dict:
    """Draw one random reading per configured field.

    A 2-tuple ``(lo, hi)`` with a numeric ``lo`` yields a float rounded to
    one decimal; a list yields one of its elements; anything else is skipped.
    """
    readings = {}
    for field, val_range in ranges.items():
        if isinstance(val_range, tuple) and len(val_range) == 2:
            lo, hi = val_range
            if isinstance(lo, (int, float)):
                readings[field] = round(random.uniform(lo, hi), 1)
        elif isinstance(val_range, list):
            readings[field] = random.choice(val_range)
    return readings


def main():
    """Run the simulator: publish every sensor to every enabled sink in a loop.

    The loop runs until SIGINT or SIGTERM is received, sleeping ``INTERVAL``
    seconds between iterations, then stops the MQTT clients.
    """
    print("╔══════════════════════════════════════════════════╗")
    print("║ Smart City Simulator — Martinique ║")
    print("╚══════════════════════════════════════════════════╝")
    print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s")
    print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
    print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}")

    # --- Start the Prometheus metrics server ---
    _start_metrics_server()

    # --- Initialise the gauges ---
    for stype, count in SENSOR_COUNTS.items():
        sensors_total.labels(sensor_type=stype).set(count)
    up.set(1)

    # --- Connectivity checks ---
    if ENABLE_PULSAR:
        _init_pulsar()
        # Immediate smoke test of the Pulsar publisher.
        print(f" 🌪️ DEBUG: Test Pulsar direct...", flush=True)
        test_payload = {"type": "test", "value": 123}
        test_result = publish_pulsar("test_001", {"type": "air-quality"}, test_payload)
        print(f" 🌪️ DEBUG: Test Pulsar result: {test_result}", flush=True)
    if ENABLE_REDPANDA:
        _init_redpanda()

    mqtt_client = MultiMQTT()

    running = True

    def signal_handler(*_):
        nonlocal running
        running = False
        up.set(0)
        print("\n[SIM] 🛑 Arrêt...")

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    iteration = 0
    while running:
        iteration += 1
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"\n[SIM] ⏱️ It #{iteration} — {ts}")

        for sid, sensor in SENSORS.items():
            stype = sensor["type"]
            topic = f"city/sensors/{stype}/{sid}"

            # Draw the readings ONCE per sensor and iteration. Each sink
            # used to draw its own random values, so MQTT, OpenRemote,
            # FROST and InfluxDB stored different numbers for one sample.
            ranges = SENSOR_RANGES.get(stype, {})
            readings = _generate_readings(ranges)
            # Numeric readings only (fields configured with a tuple range).
            numeric_vals = {f: v for f, v in readings.items() if isinstance(ranges[f], tuple)}

            # --- MQTT payload ---
            payload_mqtt = {
                "id": sid,
                "type": stype,
                "name": sensor["name"],
                "lat": sensor["lat"],
                "lon": sensor["lon"],
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "battery_level": random.randint(60, 100),
            }
            payload_mqtt.update(readings)

            msg = json.dumps(payload_mqtt, ensure_ascii=False)

            # --- MQTT publish ---
            results = mqtt_client.publish(topic, msg, sensor_type=stype)
            ok_mqtt = [n for n, r in results.items() if r]
            if ok_mqtt:
                print(f" 📤 {topic} → {','.join(ok_mqtt)}")

            # --- IoT-Agent (over MQTT) ---
            if ENABLE_IOT_AGENT:
                ok_iot = mqtt_client.publish_iot_agent(sid, payload_mqtt, sensor_type=stype)
                print(f" 🤖 IoT-Agent: {'✅' if ok_iot else '❌'}")

            # --- OpenRemote REST ---
            if ENABLE_OPENREMOTE:
                ok_or = publish_openremote(sid, sensor, numeric_vals)
                print(f" 🏠 OpenRemote: {'✅' if ok_or else '⚠️ skipped'}")

            # --- Orion-LD --- (DISABLED: only the IoT-Agent MQTT path is used)
            # if ENABLE_ORION:
            #     ok_or = publish_orion(sid, sensor)
            #     print(f" 🌐 Orion-LD: {'✅' if ok_or else '⚠️ skipped'}")

            # --- Stellio --- (DISABLED: only the IoT-Agent MQTT path is used)
            # if ENABLE_STELLIO:
            #     ok_st = publish_stellio(sid, sensor)
            #     print(f" 🏢 Stellio: {'✅' if ok_st else '❌'}")

            # --- FROST --- (one Observation per numeric field)
            if ENABLE_FROST:
                for field, val in numeric_vals.items():
                    ok_fr = publish_frost(sid, sensor, field, val)
                    print(f" 📊 FROST: {'✅' if ok_fr else '❌'}")

            # --- InfluxDB ---
            if ENABLE_INFLUX:
                # A copy is passed because the write runs in a background thread.
                ok_influx = publish_influx(sid, sensor, dict(numeric_vals))
                print(f" 📈 InfluxDB: {'✅' if ok_influx else '❌'}")

            # --- Pulsar ---
            if ENABLE_PULSAR:
                print(f" 🌪️ DEBUG: calling publish_pulsar for {sid}, payload_mqtt exists: {bool(locals().get('payload_mqtt'))}", flush=True)
                ok_pulsar = publish_pulsar(sid, sensor, payload_mqtt)
                print(f" 🌪️ Pulsar: {'✅' if ok_pulsar else '❌'}")

            # --- Redpanda (Kafka REST proxy) ---
            if ENABLE_REDPANDA:
                ok_redpanda = publish_redpanda(sid, sensor, payload_mqtt)
                print(f" 🐟 Redpanda: {'✅' if ok_redpanda else '❌'}")

            # --- BunkerM HTTP (opt-in through BUNKERM_HTTP=1) ---
            if os.getenv("BUNKERM_HTTP", "0") == "1":
                ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt)
                print(f" 📦 BunkerM: {'✅' if ok_bunkerm else '❌'}")

        print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}")

        try:
            time.sleep(INTERVAL)
        except KeyboardInterrupt:
            break

    mqtt_client.stop()
    print("[SIM] ✅ Arrêté proprement.")


if __name__ == "__main__":
    main()
|