Corrections: - Topics MQTT: index basé sur position du capteur (pas compteur global itération) - ASSET_MAP: mise à jour avec bons asset IDs (agentLink + location) - Payload REST: ajout attribut location (GeoJSONPoint) - Désactivation PUT REST sur assets avec agentLink (403 Forbidden) - MQTT OpenRemote: tentative connexion anonyme (rc=5 persistant) - Keepalive augmenté à 120s pour stabilité Note: connexion MQTT au broker Artemis d'OpenRemote échoue (rc=5 Not Authorized) Le broker nécessite une authentification spécifique non documentée. Les agents MQTT d'OpenRemote ne reçoivent donc pas les données du simulateur. La location est déjà correctement définie dans les assets en BDD.
1233 lines
56 KiB
Python
1233 lines
56 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Smart City IoT Simulator — Martinique (14.6°N, 61.2°W)
|
||
=======================================================
|
||
Publie vers MULTIPLES brokers MQTT + context brokers NGSI-LD.
|
||
|
||
Brokers MQTT:
|
||
- EMQX: emqx_emqx_1:1883 (sans auth)
|
||
- Mosquitto: mainfluxlabs-mosquitto:1883 (bunker/bunker)
|
||
- BunkerM: bunkerm_bunkerm_1:1900 (TLS, bunker/bunker)
|
||
- OpenRemote: openremote-manager-1:1883 (admin/Digitribe972)
|
||
|
||
Context Brokers REST:
|
||
# - Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
|
||
# - Stellio: stellio-api-gateway:8080 (NGSI-LD)
|
||
- FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)
|
||
|
||
Streaming Platforms:
|
||
- Pulsar: smart-city-pulsar:8080 (HTTP REST Producer API)
|
||
- Redpanda: smart-city-redpanda:8082 (Kafka REST Proxy)
|
||
|
||
Time-Series DB:
|
||
- InfluxDB v2: smart-city-influxdb:8086 (via influxdb-client)
|
||
|
||
Variables d'environnement:
|
||
PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s)
|
||
BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France)
|
||
# ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
|
||
# ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
|
||
ENABLE_FROST=1 : activer FROST-Server (défaut: 1)
|
||
ENABLE_INFLUX=1 : activer InfluxDB v2 (défaut: 1)
|
||
ENABLE_PULSAR=1 : activer Apache Pulsar (défaut: 1)
|
||
ENABLE_REDPANDA=1 : activer Redpanda Kafka (défaut: 1)
|
||
"""
|
||
|
||
import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse
|
||
import paho.mqtt.client as mqtt
|
||
import urllib.request, urllib.error
|
||
from datetime import datetime, timezone
|
||
from typing import Any
|
||
|
||
# Prometheus metrics
|
||
import prometheus_client
|
||
from prometheus_client import Counter, Histogram, Gauge, Info
|
||
|
||
# InfluxDB support
|
||
import influxdb_client
|
||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||
|
||
# =============================================================================
|
||
# Configuration des brokers MQTT
|
||
# Configuration des brokers MQTT
|
||
# Utilise les noms de services Docker par défaut
|
||
EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1")
|
||
EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883"))
|
||
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "smart-city-mosquitto")
|
||
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))
|
||
BUNKERM_HOST = os.environ.get("BUNKERM_HOST", "bunkerm_bunkerm_1")
|
||
BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900"))
|
||
|
||
# =============================================================================
|
||
# Configuration
|
||
# =============================================================================
|
||
BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
|
||
BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))
|
||
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "1")) # 1s pour temps réel
|
||
#ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
|
||
#ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1").lower() in ("1", "true", "yes", "on")
|
||
ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1"
|
||
ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1"
|
||
OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
|
||
OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972")
|
||
OR_REALM = os.environ.get("OR_REALM", "master")
|
||
OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token
|
||
#ENABLE_IOT_AGENT = os.environ.get("ENABLE_IOT_AGENT", "1") == "1"
|
||
FROST_URL = os.environ.get("FROST_URL", "http://localhost:8090/FROST-Server/v1.1") # Exposer frost_http-web-1:8080 -> host:8086
|
||
|
||
# Pulsar config (HTTP REST — pulsar-admin + producer REST API)
|
||
ENABLE_PULSAR = os.environ.get("ENABLE_PULSAR", "1").lower() in ("1", "true", "yes", "on")
|
||
PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar")
|
||
PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "8080"))
|
||
PULSAR_BASE = f"http://{PULSAR_HOST}:{PULSAR_PORT}"
|
||
|
||
# Redpanda / Kafka config (REST Proxy HTTP)
|
||
ENABLE_REDPANDA = os.environ.get("ENABLE_REDPANDA", "1").lower() in ("1", "true", "yes", "on")
|
||
REDPANDA_HOST = os.environ.get("REDPANDA_HOST", "smart-city-redpanda")
|
||
REDPANDA_PORT = int(os.environ.get("REDPANDA_PORT", "8082"))
|
||
REDPANDA_BASE = f"http://{REDPANDA_HOST}:{REDPANDA_PORT}"
|
||
|
||
# InfluxDB config
|
||
ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1").lower() in ("1", "true", "yes", "on")
|
||
INFLUX_URL = os.environ.get("INFLUX_URL", "http://smart-city-influxdb:8086") # InfluxDB v2 sur smartcity-shared
|
||
INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
|
||
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "smartcity") # Correspond au bucket de Telegraf
|
||
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-token")
|
||
|
||
# Prometheus metrics HTTP server
|
||
METRICS_PORT = int(os.environ.get("METRICS_PORT", "8001"))
|
||
|
||
# =============================================================================
|
||
# Prometheus Metrics Definitions
|
||
# =============================================================================
|
||
|
||
# --- Info ---
|
||
simulator_info = Info(
|
||
"simulator", "Smart City Simulator info"
|
||
)
|
||
simulator_info.info({
|
||
"version": "1.0.0",
|
||
"python_version": sys.version.split()[0],
|
||
"mqtt_brokers": "emqx,mosquitto,bunkerm",
|
||
# "context_brokers": "orion_ld,stellio,frost",
|
||
})
|
||
|
||
# --- Counters ---
|
||
messages_published_total = Counter(
|
||
"simulator_messages_published_total",
|
||
"Total messages published by broker",
|
||
["broker", "sensor_type"]
|
||
)
|
||
|
||
messages_errors_total = Counter(
|
||
"simulator_messages_errors_total",
|
||
"Total publish errors",
|
||
["broker", "sensor_type", "error_type"]
|
||
)
|
||
|
||
mqtt_connection_total = Counter(
|
||
"simulator_mqtt_connection_total",
|
||
"MQTT connection attempts",
|
||
["broker", "status"] # status: success, failure
|
||
)
|
||
|
||
http_requests_total = Counter(
|
||
"simulator_http_requests_total",
|
||
"HTTP requests to REST APIs",
|
||
["broker", "method", "status_code"]
|
||
)
|
||
|
||
influx_write_total = Counter(
|
||
"simulator_influx_write_total",
|
||
"InfluxDB write operations",
|
||
["status"] # success, error
|
||
)
|
||
|
||
# --- Gauges ---
|
||
mqtt_broker_connected = Gauge(
|
||
"simulator_mqtt_broker_connected",
|
||
"MQTT broker connection status (1=connected, 0=disconnected)",
|
||
["broker"]
|
||
)
|
||
|
||
sensors_total = Gauge(
|
||
"simulator_sensors_total",
|
||
"Total number of sensors by type",
|
||
["sensor_type"]
|
||
)
|
||
|
||
up = Gauge(
|
||
"simulator_up",
|
||
"Simulator is running (1=yes, 0=no)"
|
||
)
|
||
|
||
# --- Histograms ---
|
||
publish_duration = Histogram(
|
||
"simulator_publish_duration_seconds",
|
||
"Time spent publishing a message",
|
||
["broker"],
|
||
buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5)
|
||
)
|
||
|
||
http_request_duration = Histogram(
|
||
"simulator_http_request_duration_seconds",
|
||
"HTTP request latency to REST APIs",
|
||
["broker", "method"],
|
||
buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
|
||
)
|
||
|
||
message_payload_size = Histogram(
|
||
"simulator_message_payload_size_bytes",
|
||
"Message payload size in bytes",
|
||
["broker"],
|
||
buckets=(64, 128, 256, 512, 1024, 2048, 4096, 8192)
|
||
)
|
||
|
||
# Start Prometheus HTTP server in a background thread
|
||
def _start_metrics_server():
|
||
def run():
|
||
prometheus_client.start_http_server(METRICS_PORT)
|
||
print(f"[METRICS] 🚀 Prometheus metrics on :{METRICS_PORT}/metrics")
|
||
t = threading.Thread(target=run, daemon=True)
|
||
t.start()
|
||
return t
|
||
|
||
# Initialize InfluxDB client
|
||
_influx_client = None
|
||
_influx_write_api = None
|
||
if ENABLE_INFLUX:
|
||
try:
|
||
_influx_client = influxdb_client.InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
|
||
_influx_write_api = _influx_client.write_api(write_options=influxdb_client.client.write_api.ASYNCHRONOUS)
|
||
print(f"[INFLUX] ✅ Connected to {INFLUX_URL} (async mode)")
|
||
except Exception as e:
|
||
print(f"[INFLUX] ❌ Connection failed: {e}")
|
||
|
||
SENSOR_COUNTS = {
|
||
"traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")),
|
||
"airquality": int(os.environ.get("SENSOR_COUNT_airquality", "10")),
|
||
"parking": int(os.environ.get("SENSOR_COUNT_parking", "10")),
|
||
"noise": int(os.environ.get("SENSOR_COUNT_noise", "1")),
|
||
"weather": int(os.environ.get("SENSOR_COUNT_weather", "1")),
|
||
"light": int(os.environ.get("SENSOR_COUNT_light", "1")),
|
||
}
|
||
# Si SENSOR_COUNT est défini, multiplier les counts de façon proportionnelle
|
||
_total_default = sum(SENSOR_COUNTS.values())
|
||
if "SENSOR_COUNT" in os.environ:
|
||
target = int(os.environ["SENSOR_COUNT"])
|
||
ratio = target / _total_default
|
||
for k in SENSOR_COUNTS:
|
||
SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio))
|
||
|
||
# =============================================================================
|
||
# Localisation des capteurs Martinique — Coordonnées FIXES sur terre ferme
|
||
# IMPORTANT : ±0.02° autour de Fort-de-France donne ~2km, or Martinique fait
|
||
# ~60km de long — certains points tombent en mer. Solution : coordonnées fixes.
|
||
# =============================================================================
|
||
|
||
# Coordonnées réelles Martinique (terre ferme uniquement)
|
||
# Martinique : 14.4°N–14.9°N, -61.23°W–-60.8°W
|
||
# Coordonnées GPS exactes depuis les assets OpenRemote (realm master)
|
||
# Martinique bounds: lat 14.37–14.88°N, lon 61.0–61.25°W
|
||
FIXED_LOCATIONS: dict[str, dict[str, tuple[float, float]]] = {
|
||
"traffic": {
|
||
"Fort-de-France Centre": (14.6164, -61.07),
|
||
"Le Lamentin Aéroport": (14.6167, -61.0035),
|
||
"Le Robert D110": (14.6833, -60.9333),
|
||
"Sainte-Anne Plage": (14.4333, -60.9833),
|
||
"Saint-Joseph D1": (14.7, -61.05),
|
||
"Trinité Centre": (14.7167, -60.9167),
|
||
"Le François D2": (14.6833, -60.8333),
|
||
"Ducos Penitencier": (14.5833, -61.0667),
|
||
"Schœlcher Morne": (14.65, -61.1),
|
||
"Case-Pilote Bourg": (14.5167, -61.1167),
|
||
},
|
||
"airquality": {
|
||
"Fort-de-France Lamartine": (14.613, -61.0667),
|
||
"Le Lamentin Zac": (14.62, -61.0),
|
||
"Le Robert Bourg": (14.68, -60.93),
|
||
"Sainte-Anne Village": (14.43, -60.98),
|
||
"Saint-Joseph Morne": (14.705, -61.04),
|
||
"Trinité Eglise": (14.72, -60.91),
|
||
"Le François Bourg": (14.68, -60.83),
|
||
"Ducos Centre": (14.58, -61.06),
|
||
"Schœlcher Plage": (14.655, -61.11),
|
||
"Case-Pilote D1": (14.52, -61.12),
|
||
},
|
||
"parking": {
|
||
"Fort-de-France Place Clémenceau": (14.615, -61.068),
|
||
"Le Lamentin Centre Commercial": (14.618, -61.002),
|
||
"Le Robert Stade": (14.685, -60.935),
|
||
"Sainte-Anne Mairie": (14.432, -60.985),
|
||
"Saint-Joseph Ecole": (14.702, -61.045),
|
||
"Trinité Port": (14.715, -60.92),
|
||
"Le François Mairie": (14.682, -60.835),
|
||
"Ducos ZI": (14.585, -61.055),
|
||
"Schœlcher Bourg": (14.652, -61.105),
|
||
"Case-Pilote Stade": (14.518, -61.118),
|
||
},
|
||
"noise": {
|
||
"Fort-de-France Théâtre": (14.617, -61.069),
|
||
"Le Lamentin Zone Industrielle": (14.619, -61.001),
|
||
"Le Robert Bourg": (14.681, -60.932),
|
||
"Sainte-Anne Plage": (14.434, -60.982),
|
||
"Saint-Joseph Morne": (14.703, -61.042),
|
||
"Trinité Centre": (14.717, -60.918),
|
||
"Le François Bourg": (14.681, -60.832),
|
||
"Ducos Penitencier": (14.584, -61.058),
|
||
"Schœlcher Morne": (14.651, -61.102),
|
||
"Case-Pilote Village": (14.519, -61.115),
|
||
},
|
||
"weather": {
|
||
"Fort-de-France Meteo": (14.616, -61.067),
|
||
"Le Lamentin Aéroport": (14.617, -61.004),
|
||
"Le Robert Bourg": (14.682, -60.934),
|
||
"Sainte-Anne Village": (14.431, -60.981),
|
||
"Saint-Joseph Morne": (14.704, -61.043),
|
||
"Trinité Eglise": (14.718, -60.912),
|
||
"Le François Bourg": (14.683, -60.834),
|
||
"Ducos Centre": (14.586, -61.057),
|
||
"Schœlcher Plage": (14.654, -61.108),
|
||
"Case-Pilote D1": (14.521, -61.113),
|
||
},
|
||
"light": {
|
||
"Fort-de-France Place": (14.6155, -61.0685),
|
||
"Le Lamentin Rond-point": (14.6185, -61.0025),
|
||
"Le Robert D110": (14.6835, -60.9335),
|
||
"Sainte-Anne Plage": (14.4335, -60.9835),
|
||
"Saint-Joseph D1": (14.7005, -61.0505),
|
||
"Trinité Centre": (14.7165, -60.9165),
|
||
"Le François D2": (14.6835, -60.8335),
|
||
"Ducos Penitencier": (14.5835, -61.0665),
|
||
"Schœlcher Morne": (14.6505, -61.1005),
|
||
"Case-Pilote Bourg": (14.5165, -61.1165),
|
||
},
|
||
}
|
||
|
||
# Ranges par type
|
||
SENSOR_RANGES: dict[str, dict] = {
|
||
"traffic": {"vehicle_count":(10,150),"average_speed_kmh":(10,80),
|
||
"congestion_level":(0,5),"occupancy_percent":(0,100)},
|
||
"airquality": {"pm25_ugm3":(5,80),"pm10_ugm3":(10,150),"no2_ugm3":(5,60),
|
||
"o3_ugm3":(20,120),"co_mgm3":(0.1,5.0),
|
||
"temperature_celsius":(20,35),"humidity_percent":(40,95)},
|
||
"parking": {"total_spots":(50,500),"available_spots":(0,500),
|
||
"occupancy_percent":(0,100),"turnover_per_hour":(5,50)},
|
||
"noise": {"noise_level_db":(40,95),"peak_db":(60,110)},
|
||
"weather": {"temperature_celsius":(22,34),"humidity_percent":(50,95),
|
||
"wind_speed_kmh":(0,50),"pressure_hpa":(1005,1025),
|
||
"rain_mm":(0,20),"uv_index":(0,11)},
|
||
"light": {"brightness_lux":(0,100000),"power_consumption_w":(0,500)},
|
||
}
|
||
|
||
NOISE_CATEGORIES = ["quiet","moderate","loud","very_loud"]
|
||
LIGHT_STATUSES = ["on","off","dimmed","auto"]
|
||
|
||
# =============================================================================
|
||
# Capteurs déclarés
|
||
# =============================================================================
|
||
SENSORS: dict[str, dict] = {}
|
||
counter = 0
|
||
for stype, locs in FIXED_LOCATIONS.items():
|
||
for name, coords in locs.items():
|
||
sid = f"{stype}_{counter:03d}"
|
||
SENSORS[sid] = {"type": stype, "lat": coords[0], "lon": coords[1], "name": name}
|
||
counter += 1
|
||
|
||
# =============================================================================
|
||
## Payload NGSI-LD pour Orion-LD / Stellio
|
||
# =============================================================================
|
||
# Contextes NGSI-LD : core + Smart Data Models
|
||
# https://smartdatamodels.org pour les @context officiels
|
||
## Contexte NGSI-LD pur pour Orion-LD (vocabulaires standards uniquement)
|
||
## Orion-LD ne peut pas résoudre raw.githubusercontent.com — utiliser uri.etsi.org uniquement
|
||
#ORION_CONTEXT = [
|
||
# "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
|
||
#]
|
||
|
||
# Mapping sensor type → Smart Data Model type NGSI-LD
|
||
SMART_MODEL_MAPPING = {
|
||
"airquality": "AirQualityObserved",
|
||
"traffic": "TrafficFlowObserved",
|
||
"parking": "OffStreetParking",
|
||
"noise": "NoiseLevelObserved",
|
||
"weather": "WeatherObserved",
|
||
"light": "Device",
|
||
}
|
||
FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}
|
||
|
||
# Cache FROST : éviter de recréer Thing/Datastream
|
||
_frost_cache: dict[str, tuple[str, str]] = {} # (sid, field) -> (thing_id, ds_id)
|
||
|
||
## Contexte NGSI-LD pur pour Stellio et Orion-LD (vocabulaires standards uniquement)
|
||
## Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/
|
||
# On n'utilise PAS les vocabulaires smartdatamodels.org distants (inaccessibles depuis les containers)
|
||
# Les types d'entité Smart Data Models (AirQualityObserved, etc.) sont reconnus par leur nom
|
||
# Les propriétés spécifiques sont stockées telles quelles (vocabulaire libre)
|
||
#STELLIO_INLINE_CONTEXT = [
|
||
# "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
|
||
#]
|
||
|
||
def _frost_payload(sid: str, sensor: dict, source: str = "simulator", topic: str = "") -> dict:
|
||
"""Construit un payload SensorThings pour FROST-Server."""
|
||
stype = sensor["type"]
|
||
ranges = SENSOR_RANGES.get(stype, {})
|
||
datastreams = []
|
||
|
||
for field, val_range in ranges.items():
|
||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||
lo, hi = val_range
|
||
if isinstance(lo, (int, float)) and isinstance(hi, (int, float)):
|
||
val = round(random.uniform(lo, hi), 1)
|
||
unit = "http://www.qudt.org/vocab/unit#DegreeCelsius"
|
||
obs_prop = {
|
||
"name": f"{field} Observation",
|
||
"description": f"Observation of {field}",
|
||
"definition": unit,
|
||
}
|
||
sensor_data = {
|
||
"name": f"Sensor {sid} {field}",
|
||
"description": f"Sensor {sid} measuring {field}",
|
||
"encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0",
|
||
"metadata": {"unit": unit},
|
||
}
|
||
ds = {
|
||
"name": f"Datastream {stype}/{field}",
|
||
"description": f"Datastream for {stype} sensor {sid} - {field}",
|
||
"observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
|
||
"unitOfMeasurement": {"name": field, "symbol": "", "definition": unit},
|
||
"Sensor": sensor_data,
|
||
"ObservedProperty": obs_prop,
|
||
}
|
||
datastreams.append((field, ds, val))
|
||
|
||
thing_payload = {
|
||
"name": f"Thing_{sid}",
|
||
"description": f"Smart City {stype} sensor in Martinique",
|
||
"properties": {
|
||
"sensorType": stype,
|
||
"region": "Martinique",
|
||
"source": source, # Traçabilité
|
||
"mqttTopic": topic # Traçabilité
|
||
},
|
||
}
|
||
return thing_payload, datastreams
|
||
|
||
# =============================================================================
|
||
# HTTP helper
|
||
# =============================================================================
|
||
def _http_post(url: str, data: dict, headers: dict, broker: str = "unknown") -> str:
|
||
"""POST et retourne 'ok' ou 'created' (ou '' si échec)."""
|
||
try:
|
||
body = json.dumps(data).encode()
|
||
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
|
||
with http_request_duration.labels(broker=broker, method="POST").time():
|
||
try:
|
||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||
http_requests_total.labels(broker=broker, method="POST", status_code=str(resp.status)).inc()
|
||
if resp.status == 204:
|
||
return 'created' # No Content — succès
|
||
if resp.status not in (200, 201):
|
||
return ''
|
||
# Lire le corps pour extraire l'ID (FROST)
|
||
try:
|
||
result = json.loads(resp.read())
|
||
if '@iot.selfLink' in result:
|
||
link = result['@iot.selfLink']
|
||
return link.split('(')[1].rstrip(')')
|
||
if '@iot.id' in result:
|
||
return str(result['@iot.id'])
|
||
except Exception:
|
||
pass
|
||
location = resp.headers.get('Location', '')
|
||
if location:
|
||
return location.split('(')[1].rstrip(')') if '(' in location else ''
|
||
return 'created'
|
||
except Exception:
|
||
pass
|
||
except urllib.error.HTTPError as e:
|
||
# Lire le corps de l'erreur pour debug
|
||
try:
|
||
err_body = e.read().decode()[:200]
|
||
except Exception:
|
||
err_body = str(e)
|
||
print(f" ⚠️ HTTP POST {url} → {e.code}: {err_body}")
|
||
http_requests_total.labels(broker=broker, method="POST", status_code=str(e.code)).inc()
|
||
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc()
|
||
return ''
|
||
except Exception as e:
|
||
http_requests_total.labels(broker=broker, method="POST", status_code="exception").inc()
|
||
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc()
|
||
print(f" ⚠️ HTTP POST {url} → {e}")
|
||
return ''
|
||
|
||
def _http_put(url: str, data: dict, headers: dict, broker: str = "unknown") -> bool:
|
||
try:
|
||
body = json.dumps(data).encode()
|
||
req = urllib.request.Request(url, data=body, headers=headers, method="PUT")
|
||
with http_request_duration.labels(broker=broker, method="PUT").time():
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
http_requests_total.labels(broker=broker, method="PUT", status_code=str(resp.status)).inc()
|
||
return resp.status in (200, 204)
|
||
except urllib.error.HTTPError as e:
|
||
http_requests_total.labels(broker=broker, method="PUT", status_code=str(e.code)).inc()
|
||
if e.code == 409:
|
||
return True # Already exists - that's fine
|
||
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc()
|
||
print(f" ⚠️ HTTP PUT {url} → {e}")
|
||
return False
|
||
except Exception as e:
|
||
http_requests_total.labels(broker=broker, method="PUT", status_code="exception").inc()
|
||
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc()
|
||
print(f" ⚠️ HTTP PUT {url} → {e}")
|
||
return False
|
||
|
||
# =============================================================================
|
||
# MQTT Client multi-broker
|
||
# =============================================================================
|
||
class MultiMQTT:
|
||
def __init__(self):
|
||
self.clients: dict[str, mqtt.Client] = {}
|
||
self.ok: dict[str, bool] = {}
|
||
self._lock = threading.Lock()
|
||
self._setup()
|
||
|
||
def _mk_client(self, name: str, host: str, port: int,
|
||
tls: bool = False, user: str = "", pwd: str = "",
|
||
ws: bool = False, use_v5: bool = False) -> mqtt.Client:
|
||
cid = f"smartcity-sim-{name}-{os.getpid()}"
|
||
protocol = mqtt.MQTTv5 if use_v5 else mqtt.MQTTv311
|
||
# Use Callback API v1 for compatibility with existing lambda callbacks
|
||
c = mqtt.Client(client_id=cid, protocol=protocol, callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
|
||
if user:
|
||
c.username_pw_set(user, pwd)
|
||
if tls:
|
||
c.tls_set(cert_reqs=ssl.CERT_NONE)
|
||
c.tls_insecure_set(True)
|
||
if ws:
|
||
c.ws_set(b"/mqtt")
|
||
c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc)
|
||
c.on_disconnect = lambda _c, _, __: self._on_disconnect(name)
|
||
try:
|
||
c.connect(host, port, keepalive=120)
|
||
c.loop_start()
|
||
except Exception as e:
|
||
print(f"[MQTT] ❌ {name} @ {host}:{port} → {e}")
|
||
self.ok[name] = False
|
||
return c
|
||
|
||
def _on_connect(self, name: str, rc: int):
|
||
with self._lock:
|
||
if rc == 0:
|
||
self.ok[name] = True
|
||
mqtt_broker_connected.labels(broker=name).set(1)
|
||
mqtt_connection_total.labels(broker=name, status="success").inc()
|
||
print(f"[MQTT] ✅ {name} connecté")
|
||
else:
|
||
self.ok[name] = False
|
||
mqtt_broker_connected.labels(broker=name).set(0)
|
||
mqtt_connection_total.labels(broker=name, status="failure").inc()
|
||
print(f"[MQTT] ❌ {name} rc={rc}")
|
||
|
||
def _on_disconnect(self, name: str):
|
||
with self._lock:
|
||
self.ok[name] = False
|
||
mqtt_broker_connected.labels(broker=name).set(0)
|
||
print(f"[MQTT] ⚠️ {name} déconnecté")
|
||
|
||
def _setup(self):
|
||
# Utiliser les variables d'environnement pour les brokers
|
||
mqtt_version = os.environ.get("MQTT_PROTOCOL_VERSION", "311")
|
||
use_v5 = (mqtt_version == "5")
|
||
print(f"[MQTT] Version du protocole: {'v5.0' if use_v5 else 'v3.1.1'} (MQTT_PROTOCOL_VERSION={mqtt_version})")
|
||
|
||
# Check which brokers are enabled
|
||
enable_emqx = os.environ.get("ENABLE_EMQX", "1") == "1"
|
||
enable_mosquitto = os.environ.get("ENABLE_MOSQUITTO", "1") == "1"
|
||
enable_bunkerm = os.environ.get("ENABLE_BUNKER", "1") == "1"
|
||
|
||
brokers = []
|
||
if enable_emqx:
|
||
brokers.append(("emqx", EMQX_HOST, EMQX_PORT, False, "", "", use_v5))
|
||
if enable_mosquitto:
|
||
brokers.append(("mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "", "", use_v5))
|
||
if enable_bunkerm:
|
||
brokers.append(("bunkerm", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker", use_v5))
|
||
|
||
# OpenRemote MQTT broker (pour agents MQTT créés dans OpenRemote)
|
||
# Le broker Artemis d'OR nécessite MQTTv5 pour l'auth anonyme
|
||
if ENABLE_OPENREMOTE:
|
||
brokers.append(("openremote", "openremote_manager_1", 1883, False, "", "", True))
|
||
|
||
print(f"[MQTT] 🔌 Connexion aux brokers (EMQX={enable_emqx}, Mosquitto={enable_mosquitto}, BunkerM={enable_bunkerm})...")
|
||
print("[MQTT] 🔌 Connexion aux brokers...")
|
||
for name, host, port, tls, user, pwd, use_v5 in brokers:
|
||
c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd, use_v5=use_v5)
|
||
self.clients[name] = c
|
||
self.ok[name] = False
|
||
# Attendre que tous les brokers soient connectés (Artemis peut être lent)
|
||
for _ in range(20):
|
||
time.sleep(1)
|
||
if all(self.ok.values()):
|
||
break
|
||
print(f"[MQTT] État des connexions: {dict(self.ok)}")
|
||
|
||
def publish(self, topic: str, payload: str, sensor_type: str = "unknown") -> dict[str, bool]:
|
||
results = {}
|
||
payload_bytes = len(payload.encode())
|
||
with self._lock:
|
||
for name, client in self.clients.items():
|
||
if self.ok.get(name, False):
|
||
with publish_duration.labels(broker=name).time():
|
||
try:
|
||
r = client.publish(topic, payload, qos=1)
|
||
success = (r.rc == mqtt.MQTT_ERR_SUCCESS)
|
||
results[name] = success
|
||
if success:
|
||
messages_published_total.labels(broker=name, sensor_type=sensor_type).inc()
|
||
message_payload_size.labels(broker=name).observe(payload_bytes)
|
||
else:
|
||
messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="mqtt_rc").inc()
|
||
except Exception:
|
||
results[name] = False
|
||
messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="exception").inc()
|
||
else:
|
||
results[name] = False
|
||
return results
|
||
|
||
# def publish_iot_agent(self, sid: str, payload: dict, sensor_type: str = "unknown") -> bool:
|
||
# """Publie sur le topic IoT-Agent (json/smartcity-api-key/{sid}/attrs) via les 3 brokers."""
|
||
topic = f"json/smartcity-api-key/{sid}/attrs"
|
||
msg = json.dumps(payload, ensure_ascii=False)
|
||
payload_bytes = len(msg.encode())
|
||
|
||
success = False
|
||
# Publier sur les 3 brokers: emqx, mosquitto, bunkerm
|
||
for broker_name in ['emqx', 'mosquitto', 'bunkerm']:
|
||
client_ok = self.clients.get(broker_name) is not None and self.ok.get(broker_name, False)
|
||
print(f"[MQTT-DEBUG] {broker_name}: client_exists={self.clients.get(broker_name) is not None}, ok={self.ok.get(broker_name, False)}")
|
||
if client_ok:
|
||
try:
|
||
r = self.clients[broker_name].publish(topic, msg, qos=1)
|
||
if r.rc == mqtt.MQTT_ERR_SUCCESS:
|
||
success = True
|
||
# messages_published_total.labels(broker='iot-agent', sensor_type=sensor_type).inc()
|
||
# message_payload_size.labels(broker='iot-agent').observe(payload_bytes)
|
||
except Exception:
|
||
pass # IoT-Agent code removed
|
||
return success
|
||
|
||
def stop(self):
|
||
for name, c in self.clients.items():
|
||
try:
|
||
c.loop_stop()
|
||
c.disconnect()
|
||
except Exception:
|
||
pass
|
||
|
||
# =============================================================================
|
||
# URLs de base (résolues au démarrage)
|
||
# =============================================================================
|
||
#ORION_HOST = "localhost"
|
||
#ORION_PORT = "2026"
|
||
#ORION_URL = f"http://{ORION_HOST}:{ORION_PORT}"
|
||
#STELLIO_URL = os.environ.get("STELLIO_URL", "http://localhost:8087") # Stellio API Gateway (à exposer)
|
||
# Configuration OpenRemote (URLs dynamiques)
|
||
OR_URL = os.environ.get("OR_URL", "http://localhost:8080") # OpenRemote Manager (Traefik)
|
||
OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity
|
||
OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", "http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token")
|
||
OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour
|
||
#STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default")
|
||
|
||
|
||
|
||
def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool:
|
||
"""Publie sur BunkerM via HTTP API (port 2000) avec session."""
|
||
import base64, http.cookiejar, urllib.request, json
|
||
from datetime import datetime, timezone
|
||
|
||
host = "bunkerm_bunkerm_1:2000"
|
||
login_url = f"http://{host}/login"
|
||
data_url = f"http://{host}/api/sensors/data"
|
||
|
||
# 1. Cookie jar pour maintenir la session
|
||
cj = http.cookiejar.CookieJar()
|
||
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
|
||
|
||
# 2. Authentification (Basic) pour obtenir le cookie de session
|
||
creds = base64.b64encode(b"bunker:bunker").decode()
|
||
auth_header = {"Authorization": f"Basic {creds}"}
|
||
|
||
try:
|
||
# GET sur /login pour initialiser la session
|
||
req_login = urllib.request.Request(login_url, headers=auth_header)
|
||
opener.open(req_login, timeout=5)
|
||
except Exception as e:
|
||
print(f" ⚠️ BunkerM login → {e}")
|
||
return False
|
||
|
||
# 3. Préparer le payload
|
||
payload = {
|
||
"sensor_id": sid,
|
||
"sensor_type": sensor["type"],
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
}
|
||
for k, v in values.items():
|
||
if k not in payload:
|
||
payload[k] = v
|
||
|
||
# 4. POST avec le cookie de session
|
||
data = json.dumps(payload).encode()
|
||
req = urllib.request.Request(
|
||
data_url,
|
||
data=data,
|
||
headers={**auth_header, "Content-Type": "application/json"},
|
||
method="POST"
|
||
)
|
||
try:
|
||
with http_request_duration.labels(broker="bunkerm", method="POST").time():
|
||
with opener.open(req, timeout=5) as resp:
|
||
http_requests_total.labels(broker="bunkerm", method="POST", status_code=str(resp.status)).inc()
|
||
messages_published_total.labels(broker="bunkerm", sensor_type=sensor["type"]).inc()
|
||
print(f" ✅ BunkerM: HTTP {resp.status}")
|
||
return resp.status in (200, 201, 204)
|
||
except Exception as e:
|
||
http_requests_total.labels(broker="bunkerm", method="POST", status_code="exception").inc()
|
||
messages_errors_total.labels(broker="bunkerm", sensor_type=sensor["type"], error_type="exception").inc()
|
||
print(f" ⚠️ BunkerM POST → {e}")
|
||
return False
|
||
|
||
def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
|
||
"""Crée le Thing (1 par capteur) + Datastreams, puis POST l'Observation."""
|
||
# Cache : {sid: (thing_id, {field: ds_id})}
|
||
if sid in _frost_cache:
|
||
thing_id, ds_map = _frost_cache[sid]
|
||
if field in ds_map:
|
||
ds_id = ds_map[field]
|
||
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
|
||
obs = {
|
||
"resultTime": datetime.now(timezone.utc).isoformat(),
|
||
"result": value,
|
||
"FeatureOfInterest": {
|
||
"name": f"Location {sid}",
|
||
"description": f"Feature of interest for sensor {sid}",
|
||
"encodingType": "application/vnd.geo+json",
|
||
"feature": {
|
||
"type": "Point",
|
||
"coordinates": [sensor.get("lon", -61.0), sensor.get("lat", 14.6)]
|
||
}
|
||
}
|
||
}
|
||
if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"):
|
||
print(f" ✅ FROST Observation {sid}/{field} → OK (cached)")
|
||
return True
|
||
else:
|
||
print(f" ⚠️ FROST Observation {sid}/{field} → échec")
|
||
return False
|
||
|
||
# Premier appel pour ce capteur : créer Thing + tous les Datastreams
|
||
# Topic MQTT pour traçabilité
|
||
stype = sensor["type"]
|
||
topic = f"city/sensors/{stype}/{sid}"
|
||
thing_payload, datastreams = _frost_payload(sid, sensor, source="simulator", topic=topic)
|
||
print(f" 📊 FROST: POST Thing {sid}...")
|
||
tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS, broker="frost")
|
||
if not tid:
|
||
print(f" ⚠️ FROST Thing {sid} → échec création")
|
||
return False
|
||
print(f" ✅ FROST Thing {sid} créé (ID: {tid})")
|
||
|
||
# Créer les Datastreams
|
||
ds_map = {}
|
||
for f, ds, _ in datastreams:
|
||
ds["Thing"] = {"@iot.id": tid}
|
||
print(f" 📊 FROST: POST Datastream {sid}/{f}...")
|
||
ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS, broker="frost")
|
||
if ds_id:
|
||
print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})")
|
||
ds_map[f] = ds_id
|
||
else:
|
||
print(f" ⚠️ FROST Datastream {sid}/{f} → échec")
|
||
|
||
_frost_cache[sid] = (tid, ds_map)
|
||
|
||
# Poster l'observation pour le field actuel
|
||
if field in ds_map:
|
||
ds_id = ds_map[field]
|
||
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
|
||
obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value}
|
||
if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"):
|
||
print(f" ✅ FROST Observation {sid}/{field} → OK")
|
||
return True
|
||
return False
|
||
|
||
# =============================================================================
|
||
# OpenRemote REST
|
||
# =============================================================================
|
||
_or_token_cache = {"token": "", "expires": 0}
|
||
|
||
def _get_or_token() -> str:
|
||
"""Obtain an OpenRemote token via password grant (admin user)."""
|
||
import time, urllib.parse
|
||
if _or_token_cache["token"] and _or_token_cache["expires"] > time.time() + 60:
|
||
return _or_token_cache["token"]
|
||
try:
|
||
# Use password grant with admin user (full rights)
|
||
token_url = f"http://openremote-keycloak-1:8080/auth/realms/{OR_TOKEN_REALM}/protocol/openid-connect/token"
|
||
client_id = os.environ.get("OR_CLIENT_ID", "openremote")
|
||
client_secret = os.environ.get("OR_CLIENT_SECRET", "0oQjzTfiEELYmj5jFwT4iIuWUDtQDvVa")
|
||
data = urllib.parse.urlencode({
|
||
"grant_type": "password",
|
||
"username": os.environ.get("OR_ADMIN_USER", "admin"),
|
||
"password": os.environ.get("OR_ADMIN_PASS", "Digitribe972"),
|
||
"client_id": client_id,
|
||
"client_secret": client_secret
|
||
}).encode()
|
||
req = urllib.request.Request(
|
||
token_url,
|
||
data=data,
|
||
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
||
)
|
||
with urllib.request.urlopen(req, timeout=5) as r:
|
||
token_data = json.loads(r.read().decode())
|
||
_or_token_cache["token"] = token_data["access_token"]
|
||
_or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300) - 60
|
||
return _or_token_cache["token"]
|
||
except Exception as e:
|
||
print(f" ⚠️ OpenRemote token → {e}")
|
||
return ""
|
||
|
||
def _or_put(asset_id: str, payload: dict) -> bool:
|
||
"""PUT update sur un asset OpenRemote (avec version)."""
|
||
token = _get_or_token()
|
||
if not token:
|
||
return False
|
||
try:
|
||
# Récupérer la version actuelle de l'asset
|
||
get_url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
|
||
get_req = urllib.request.Request(get_url, headers={"Authorization": f"Bearer {token}"})
|
||
version = 1
|
||
try:
|
||
with urllib.request.urlopen(get_req, timeout=5) as resp:
|
||
asset_data = json.loads(resp.read().decode())
|
||
version = asset_data.get("version", 1)
|
||
except:
|
||
pass # Si GET échoue, utiliser version=1
|
||
|
||
# Ajouter la version au payload
|
||
payload["version"] = version
|
||
body = json.dumps(payload).encode()
|
||
url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
|
||
req = urllib.request.Request(url, data=body,
|
||
headers={
|
||
"Authorization": f"Bearer {token}",
|
||
"Content-Type": "application/json",
|
||
"If-Match": str(version),
|
||
},
|
||
method="PUT")
|
||
with http_request_duration.labels(broker="openremote", method="PUT").time():
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
http_requests_total.labels(broker="openremote", method="PUT", status_code=str(resp.status)).inc()
|
||
messages_published_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown")).inc()
|
||
return resp.status in (200, 204)
|
||
except urllib.error.HTTPError as e:
|
||
http_requests_total.labels(broker="openremote", method="PUT", status_code=str(e.code)).inc()
|
||
messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="http_error").inc()
|
||
print(f" ⚠️ OR PUT {asset_id} → HTTP {e.code}")
|
||
return False
|
||
except Exception as e:
|
||
http_requests_total.labels(broker="openremote", method="PUT", status_code="exception").inc()
|
||
messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="exception").inc()
|
||
print(f" ⚠️ OR PUT {asset_id} → {e}")
|
||
return False
|
||
|
||
def publish_openremote(sid: str, sensor: dict, values: dict) -> bool:
|
||
"""Met à jour les attributs d'un asset OpenRemote via REST."""
|
||
# Mapping sid → asset ID (realm master) — assets avec agentLink + location
|
||
# Note: le compteur SENSORS est global (traffic=0-9, airquality=10-19, parking=20-29, noise=30-39, weather=40-49, light=50-59)
|
||
ASSET_MAP = {
|
||
"traffic_000": "429858caca3341f56fbf65",
|
||
"traffic_001": "301218322f5aaca9d6d168",
|
||
"traffic_002": "bd35fe2a90133118b9b004",
|
||
"traffic_003": "da59ec9301c4efd3fd55c4",
|
||
"traffic_004": "834f4b7b9df848f5c5c2d8",
|
||
"airquality_010": "0f922351a9894bc0144c94",
|
||
"airquality_011": "4f83219bbee703b3e0a255",
|
||
"airquality_012": "381cc31ab83dd66ed4be37",
|
||
"airquality_013": "808b73c22ecd19589a33be",
|
||
"airquality_014": "03c18679226329183b44b6",
|
||
"parking_020": "0ee6689f5c0499643d48eb",
|
||
"parking_021": "8fb6b2d0601d98b47a4172",
|
||
"parking_022": "0c00bda9e5075d12d59694",
|
||
"parking_023": "ae981dc9d155d1313b9acf",
|
||
"parking_024": "96020cc5aef95c5fda7bb4",
|
||
"noise_030": "0be31930e45d2eb5c12ccd",
|
||
"noise_031": "1802e76e3432d5eda1deb7",
|
||
"noise_032": "08edb6518750d50644afe3",
|
||
"noise_033": "93d09bfac36d2ed95fc858",
|
||
"noise_034": "7942726d84d2bd29de1e5d",
|
||
"weather_040": "9942f881ab6df375d8d9fa",
|
||
"weather_041": "5400fdf5c51a4fe4f5a89c",
|
||
"weather_042": "1a3bf32aa5208892e68965",
|
||
"weather_043": "d3725f922f96085f2df3f7",
|
||
"weather_044": "13be192a8c23dd8fdceada",
|
||
"light_050": "1f4302946b1a4a1ded23f6",
|
||
"light_051": "35e6ef027ed9a157ad8780",
|
||
"light_052": "526538589aa981bdc77ce9",
|
||
"light_053": "d4a6ac7f34d64e581937c0",
|
||
"light_054": "40bbe989be2ae5b2a98b30",
|
||
}
|
||
asset_id = ASSET_MAP.get(sid)
|
||
if not asset_id:
|
||
return False
|
||
# Construire les attributs à jour
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
attrs = {
|
||
"timestamp": {"type": "DateTime", "value": now, "timestamp": now},
|
||
"battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": now},
|
||
}
|
||
# Mapper les valeurs du payload vers les attributs OR
|
||
field_map = {
|
||
"temperature_celsius": "temperature",
|
||
"humidity_percent": "humidity",
|
||
"noise_level_db": "noiseLevel",
|
||
"pm25_ugm3": "airQuality",
|
||
"vehicle_count": "trafficFlow",
|
||
"available_spots": "parking",
|
||
"brightness_lux": "light",
|
||
"flood": "flood",
|
||
}
|
||
for field, val in values.items():
|
||
attr_name = field_map.get(field, field)
|
||
attrs[attr_name] = {
|
||
"type": "Number",
|
||
"value": val,
|
||
"timestamp": now,
|
||
"unit": "µg/m³" if "ugm3" in field else ("°C" if "celsius" in field else ("%" if "percent" in field or "humidity" in field else ("dB" if "db" in field else ""))),
|
||
}
|
||
|
||
# Ajouter la location du capteur (GeoJSON Point)
|
||
lat = sensor.get("lat", 0)
|
||
lon = sensor.get("lon", 0)
|
||
attrs["location"] = {
|
||
"type": "GeoJSONPoint",
|
||
"value": {"type": "Point", "coordinates": [lat, lon]},
|
||
"timestamp": now,
|
||
}
|
||
|
||
payload = {
|
||
"id": asset_id,
|
||
"name": sensor["name"],
|
||
"type": "IOTSensor",
|
||
"realm": OR_REALM,
|
||
"attributes": attrs,
|
||
}
|
||
return _or_put(asset_id, payload)
|
||
|
||
# =============================================================================
|
||
# Pulsar — HTTP REST Producer
|
||
# API: POST http://host:8080/admin/v2/persistent/public/default/{topic}/produce
|
||
# Payload: {"messages": [{"payload": "<base64>", "properties": {...}}]}
|
||
# Topics auto-créés par le premier message (Pulsar standalone)
|
||
# =============================================================================
|
||
_pulsar_session = None
|
||
|
||
def _get_pulsar_session():
|
||
global _pulsar_session
|
||
if _pulsar_session is None:
|
||
import urllib.request
|
||
_pulsar_session = urllib.request
|
||
return _pulsar_session
|
||
|
||
def _init_pulsar() -> bool:
|
||
"""Teste la connectivité Pulsar au démarrage."""
|
||
try:
|
||
import urllib.request
|
||
req = urllib.request.Request(f"{PULSAR_BASE}/admin/v2/clusters")
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
if resp.status == 200:
|
||
print(f"[PULSAR] ✅ Connected to {PULSAR_BASE}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"[PULSAR] ⚠️ Cannot reach {PULSAR_BASE}: {e}")
|
||
return False
|
||
|
||
def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool:
|
||
"""Publie un message sur Pulsar via le client Python (port binaire 6650)."""
|
||
stype = sensor["type"]
|
||
topic = f"persistent://public/default/smartcity-{stype.replace('-','')}"
|
||
try:
|
||
import pulsar
|
||
with publish_duration.labels(broker="pulsar").time():
|
||
client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650")
|
||
producer = client.create_producer(topic)
|
||
body = json.dumps(payload, ensure_ascii=False).encode()
|
||
producer.send(body, properties={"sensor_id": sid, "source": "simulator"})
|
||
client.close()
|
||
messages_published_total.labels(broker="pulsar", sensor_type=stype).inc()
|
||
message_payload_size.labels(broker="pulsar").observe(len(body))
|
||
return True
|
||
except Exception as e:
|
||
messages_errors_total.labels(broker="pulsar", sensor_type=stype, error_type="exception").inc()
|
||
print(f" ⚠️ Pulsar → {e}")
|
||
return False
|
||
|
||
# =============================================================================
|
||
# Redpanda / Kafka — HTTP REST Proxy
|
||
# API: POST http://host:8082/topics/{topic}
|
||
# Payload: {"records": [{"value": "<base64>"}]}
|
||
# Topics auto-créés par le premier message (Redpanda)
|
||
# =============================================================================
|
||
_redpanda_session = None
|
||
|
||
def _get_redpanda_session():
|
||
global _redpanda_session
|
||
if _redpanda_session is None:
|
||
import urllib.request
|
||
_redpanda_session = urllib.request
|
||
return _redpanda_session
|
||
|
||
def _init_redpanda() -> bool:
|
||
"""Teste la connectivité Redpanda au démarrage."""
|
||
try:
|
||
import urllib.request
|
||
req = urllib.request.Request(f"{REDPANDA_BASE}/v1/status/alive")
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
if resp.status == 200:
|
||
print(f"[REDPANDA] ✅ Connected to {REDPANDA_BASE}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"[REDPANDA] ⚠️ Cannot reach {REDPANDA_BASE}: {e}")
|
||
return False
|
||
|
||
def publish_redpanda(sid: str, sensor: dict, payload: dict) -> bool:
|
||
"""Publie un message sur Redpanda/Kafka via le REST Proxy."""
|
||
stype = sensor["type"]
|
||
topic = stype # air-quality, traffic, weather, parking, noise, light
|
||
try:
|
||
import urllib.request, base64
|
||
body = json.dumps(payload, ensure_ascii=False)
|
||
b64 = base64.b64encode(body.encode()).decode()
|
||
record = {
|
||
"records": [{"value": b64}]
|
||
}
|
||
url = f"{REDPANDA_BASE}/topics/{topic}"
|
||
req = urllib.request.Request(
|
||
url,
|
||
data=json.dumps(record).encode(),
|
||
headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
|
||
method="POST"
|
||
)
|
||
with http_request_duration.labels(broker="redpanda", method="POST").time():
|
||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||
http_requests_total.labels(broker="redpanda", method="POST", status_code=str(resp.status)).inc()
|
||
messages_published_total.labels(broker="redpanda", sensor_type=stype).inc()
|
||
message_payload_size.labels(broker="redpanda").observe(len(body.encode()))
|
||
return resp.status in (200, 201, 204)
|
||
except urllib.error.HTTPError as e:
|
||
http_requests_total.labels(broker="redpanda", method="POST", status_code=str(e.code)).inc()
|
||
messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="http_error").inc()
|
||
print(f" ⚠️ Redpanda → {e.code}")
|
||
return False
|
||
except Exception as e:
|
||
http_requests_total.labels(broker="redpanda", method="POST", status_code="exception").inc()
|
||
messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="exception").inc()
|
||
print(f" ⚠️ Redpanda → {e}")
|
||
return False
|
||
|
||
def publish_influx(sid: str, sensor: dict, values: dict) -> bool:
|
||
"""Write sensor data to InfluxDB (async, non-blocking)."""
|
||
if not _influx_write_api:
|
||
influx_write_total.labels(status="skipped").inc()
|
||
return False
|
||
|
||
def _write_async():
|
||
try:
|
||
stype = sensor["type"]
|
||
lat = sensor.get("lat", BASE_LAT)
|
||
lon = sensor.get("lon", BASE_LON)
|
||
|
||
points = []
|
||
for field, value in values.items():
|
||
if isinstance(value, (int, float)):
|
||
p = influxdb_client.Point(stype)\
|
||
.tag("sensor_id", sid)\
|
||
.tag("location", sensor.get("name", sid))\
|
||
.field(field, float(value))\
|
||
.field("lat", float(lat))\
|
||
.field("lon", float(lon))
|
||
points.append(p)
|
||
|
||
if points:
|
||
_influx_write_api.write(bucket=INFLUX_BUCKET, record=points)
|
||
influx_write_total.labels(status="success").inc()
|
||
print(f" 📈 InfluxDB: {len(points)} points written")
|
||
except Exception as e:
|
||
influx_write_total.labels(status="error").inc()
|
||
print(f" ⚠️ InfluxDB → {e}")
|
||
|
||
# Exécution asynchrone (non-bloquante)
|
||
t = threading.Thread(target=_write_async, daemon=True)
|
||
t.start()
|
||
return True # Async: on ne peut pas savoir immédiatement
|
||
|
||
def main():
|
||
print("╔══════════════════════════════════════════════════╗")
|
||
print("║ Smart City Simulator — Martinique ║")
|
||
print("╚══════════════════════════════════════════════════╝")
|
||
print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s")
|
||
# print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
|
||
print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}")
|
||
|
||
# --- Démarrer le serveur Prometheus ---
|
||
_start_metrics_server()
|
||
|
||
# --- Configurer les gauges ---
|
||
for stype, count in SENSOR_COUNTS.items():
|
||
sensors_total.labels(sensor_type=stype).set(count)
|
||
up.set(1)
|
||
|
||
# Init connectivity checks
|
||
if ENABLE_PULSAR:
|
||
_init_pulsar()
|
||
# Test immédiat
|
||
print(f" 🌪️ DEBUG: Test Pulsar direct...", flush=True)
|
||
test_payload = {"type": "test", "value": 123}
|
||
test_result = publish_pulsar("test_001", {"type": "air-quality"}, test_payload)
|
||
print(f" 🌪️ DEBUG: Test Pulsar result: {test_result}", flush=True)
|
||
if ENABLE_REDPANDA:
|
||
_init_redpanda()
|
||
|
||
mqtt_client = MultiMQTT()
|
||
|
||
running = True
|
||
def signal_handler(*_):
|
||
nonlocal running
|
||
running = False
|
||
up.set(0)
|
||
print("\n[SIM] 🛑 Arrêt...")
|
||
signal.signal(signal.SIGINT, signal_handler)
|
||
signal.signal(signal.SIGTERM, signal_handler)
|
||
|
||
iteration = 0
|
||
while running:
|
||
iteration += 1
|
||
ts = datetime.now().strftime("%H:%M:%S")
|
||
print(f"\n[SIM] ⏱️ It #{iteration} — {ts}")
|
||
|
||
for sid, sensor in SENSORS.items():
|
||
stype = sensor["type"]
|
||
# Utiliser l'index du capteur dans FIXED_LOCATIONS (1-5) pour correspondre aux agents OpenRemote
|
||
# Le sid est formaté comme {stype}_{counter:03d} avec counter global
|
||
# On extrait l'index du capteur depuis le sid (derniers chiffres)
|
||
sensor_num = int(sid.split("_")[1])
|
||
# Calculer l'index 1-5 basé sur la position dans le type
|
||
# traffic: 0-9, airquality: 10-19, parking: 20-29, noise: 30-39, weather: 40-49, light: 50-59
|
||
type_offsets = {"traffic": 0, "airquality": 10, "parking": 20, "noise": 30, "weather": 40, "light": 50}
|
||
type_offset = type_offsets.get(stype, 0)
|
||
sensor_index = sensor_num - type_offset + 1 # 1-indexed
|
||
topic = f"smartcity/{stype}/{sensor_index}"
|
||
|
||
# --- Payload MQTT (ATTRIBUTES ONLY - pas de id/type/lat/lon !)
|
||
# # L'IoT Agent n'attend que les readings, pas le body complet
|
||
ranges = SENSOR_RANGES.get(stype, {})
|
||
payload_mqtt = {
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
"battery_level": random.randint(60, 100),
|
||
}
|
||
for field, val_range in ranges.items():
|
||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||
lo, hi = val_range
|
||
if isinstance(lo, (int, float)):
|
||
payload_mqtt[field] = round(random.uniform(lo, hi), 1)
|
||
elif isinstance(val_range, list):
|
||
payload_mqtt[field] = random.choice(val_range)
|
||
|
||
msg = json.dumps(payload_mqtt, ensure_ascii=False)
|
||
|
||
# --- MQTT publish ---
|
||
results = mqtt_client.publish(topic, msg, sensor_type=stype)
|
||
ok_mqtt = [n for n, r in results.items() if r]
|
||
if ok_mqtt:
|
||
print(f" 📤 {topic} → {','.join(ok_mqtt)}")
|
||
|
||
# # --- IoT-Agent (via EMQX) ---
|
||
# if ENABLE_IOT_AGENT:
|
||
# ok_iot = mqtt_client.publish_iot_agent(sid, payload_mqtt, sensor_type=stype)
|
||
# print(f" 🤖 IoT-Agent: {'✅' if ok_iot else '❌'}")
|
||
|
||
# Extraire les valeurs pour OpenRemote
|
||
or_values = {}
|
||
for field, val_range in ranges.items():
|
||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||
lo, hi = val_range
|
||
if isinstance(lo, (int, float)):
|
||
or_values[field] = round(random.uniform(lo, hi), 1)
|
||
|
||
# --- OpenRemote REST --- (DÉSACTIVÉ: les agents MQTT gèrent les mises à jour)
|
||
# Le REST échoue en 403 sur les assets avec agentLink
|
||
# if ENABLE_OPENREMOTE:
|
||
# ok_or = publish_openremote(sid, sensor, or_values)
|
||
# print(f" 🏠 OpenRemote: {'✅' if ok_or else '⚠️ skipped'}")
|
||
|
||
# # --- Orion-LD --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT)
|
||
# # if ENABLE_ORION:
|
||
# # ok_or = publish_orion(sid, sensor)
|
||
# # print(f" 🌐 Orion-LD: {'✅' if ok_or else '⚠️ skipped'}")
|
||
|
||
# # --- Stellio --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT)
|
||
# # if ENABLE_STELLIO:
|
||
# # ok_st = publish_stellio(sid, sensor)
|
||
# # print(f" 🏢 Stellio: {'✅' if ok_st else '❌'}")
|
||
|
||
# --- FROST ---
|
||
if ENABLE_FROST:
|
||
ranges2 = SENSOR_RANGES.get(stype, {})
|
||
for field, val_range in ranges2.items():
|
||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||
lo, hi = val_range
|
||
if isinstance(lo, (int, float)):
|
||
val = round(random.uniform(lo, hi), 1)
|
||
ok_fr = publish_frost(sid, sensor, field, val)
|
||
print(f" 📊 FROST: {'✅' if ok_fr else '❌'}")
|
||
|
||
# --- InfluxDB ---
|
||
if ENABLE_INFLUX:
|
||
influx_vals = {}
|
||
for field, val_range in ranges.items():
|
||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||
lo, hi = val_range
|
||
if isinstance(lo, (int, float)):
|
||
influx_vals[field] = round(random.uniform(lo, hi), 1)
|
||
ok_influx = publish_influx(sid, sensor, influx_vals)
|
||
print(f" 📈 InfluxDB: {'✅' if ok_influx else '❌'}")
|
||
|
||
# --- Pulsar (HTTP REST) ---
|
||
if ENABLE_PULSAR:
|
||
print(f" 🌪️ DEBUG: calling publish_pulsar for {sid}, payload_mqtt exists: {bool(locals().get('payload_mqtt'))}", flush=True)
|
||
ok_pulsar = publish_pulsar(sid, sensor, payload_mqtt)
|
||
print(f" 🌪️ Pulsar: {'✅' if ok_pulsar else '❌'}")
|
||
|
||
# --- Redpanda (Kafka REST Proxy) ---
|
||
if ENABLE_REDPANDA:
|
||
ok_redpanda = publish_redpanda(sid, sensor, payload_mqtt)
|
||
print(f" 🐟 Redpanda: {'✅' if ok_redpanda else '❌'}")
|
||
|
||
# --- BunkerM HTTP ---
|
||
if os.getenv("BUNKERM_HTTP", "0") == "1":
|
||
ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt)
|
||
print(f" 📦 BunkerM: {'✅' if ok_bunkerm else '❌'}")
|
||
|
||
print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}")
|
||
|
||
try:
|
||
time.sleep(INTERVAL)
|
||
except KeyboardInterrupt:
|
||
break
|
||
|
||
mqtt_client.stop()
|
||
print("[SIM] ✅ Arrêté proprement.")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|