- ChirpStack opérationnel (port 8080/8090, gateway bridge UDP 1700) - The Things Stack opérationnel (port 1885/1884, gateway UDP 1701) - Fichages de configuration créés - Docker-compose corrigés (réseaux smartcity-shared) - Désactivation agentLink sur 35 assets du simulateur - Correction _or_put: suppression If-Match header (403) - realm smartcity identifié pour les assets du simulateur
1219 lines
55 KiB
Python
1219 lines
55 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Smart City IoT Simulator — Martinique (14.6°N, 61.2°W)
|
||
=======================================================
|
||
Publie vers MULTIPLES brokers MQTT + context brokers NGSI-LD.
|
||
|
||
Brokers MQTT:
|
||
- EMQX: emqx_emqx_1:1883 (sans auth)
|
||
- Mosquitto: mainfluxlabs-mosquitto:1883 (bunker/bunker)
|
||
- BunkerM: bunkerm_bunkerm_1:1900 (TLS, bunker/bunker)
|
||
- OpenRemote: openremote-manager-1:1883 (admin/Digitribe972)
|
||
|
||
Context Brokers REST:
|
||
# - Orion-LD: fiware-gis-quickstart-orion-1:1026 (NGSI-LD)
|
||
# - Stellio: stellio-api-gateway:8080 (NGSI-LD)
|
||
- FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)
|
||
|
||
Streaming Platforms:
|
||
- Pulsar: smart-city-pulsar:8080 (HTTP REST Producer API)
|
||
- Redpanda: smart-city-redpanda:8082 (Kafka REST Proxy)
|
||
|
||
Time-Series DB:
|
||
- InfluxDB v2: smart-city-influxdb:8086 (via influxdb-client)
|
||
|
||
Variables d'environnement:
|
||
PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s)
|
||
BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France)
|
||
# ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
|
||
# ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
|
||
ENABLE_FROST=1 : activer FROST-Server (défaut: 1)
|
||
ENABLE_INFLUX=1 : activer InfluxDB v2 (défaut: 1)
|
||
ENABLE_PULSAR=1 : activer Apache Pulsar (défaut: 1)
|
||
ENABLE_REDPANDA=1 : activer Redpanda Kafka (défaut: 1)
|
||
"""
|
||
|
||
import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse
|
||
import paho.mqtt.client as mqtt
|
||
import urllib.request, urllib.error
|
||
from datetime import datetime, timezone
|
||
from typing import Any
|
||
|
||
# Prometheus metrics
|
||
import prometheus_client
|
||
from prometheus_client import Counter, Histogram, Gauge, Info
|
||
|
||
# InfluxDB support
|
||
import influxdb_client
|
||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||
|
||
# =============================================================================
|
||
# Configuration des brokers MQTT
|
||
# Configuration des brokers MQTT
|
||
# Utilise les noms de services Docker par défaut
|
||
EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1")
|
||
EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883"))
|
||
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "smart-city-mosquitto")
|
||
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))
|
||
BUNKERM_HOST = os.environ.get("BUNKERM_HOST", "bunkerm_bunkerm_1")
|
||
BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900"))
|
||
|
||
# =============================================================================
|
||
# Configuration
|
||
# =============================================================================
|
||
BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
|
||
BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))
|
||
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "1")) # 1s pour temps réel
|
||
#ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
|
||
#ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1").lower() in ("1", "true", "yes", "on")
|
||
ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1"
|
||
ENABLE_OPENREMOTE = os.environ.get("ENABLE_OPENREMOTE", "1") == "1"
|
||
OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
|
||
OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "Digitribe972")
|
||
OR_REALM = os.environ.get("OR_REALM", "master")
|
||
OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token
|
||
#ENABLE_IOT_AGENT = os.environ.get("ENABLE_IOT_AGENT", "1") == "1"
|
||
FROST_URL = os.environ.get("FROST_URL", "http://localhost:8090/FROST-Server/v1.1") # Exposer frost_http-web-1:8080 -> host:8086
|
||
|
||
# Pulsar config (HTTP REST — pulsar-admin + producer REST API)
|
||
ENABLE_PULSAR = os.environ.get("ENABLE_PULSAR", "1").lower() in ("1", "true", "yes", "on")
|
||
PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar")
|
||
PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "8080"))
|
||
PULSAR_BASE = f"http://{PULSAR_HOST}:{PULSAR_PORT}"
|
||
|
||
# Redpanda / Kafka config (REST Proxy HTTP)
|
||
ENABLE_REDPANDA = os.environ.get("ENABLE_REDPANDA", "1").lower() in ("1", "true", "yes", "on")
|
||
REDPANDA_HOST = os.environ.get("REDPANDA_HOST", "smart-city-redpanda")
|
||
REDPANDA_PORT = int(os.environ.get("REDPANDA_PORT", "8082"))
|
||
REDPANDA_BASE = f"http://{REDPANDA_HOST}:{REDPANDA_PORT}"
|
||
|
||
# InfluxDB config
|
||
ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1").lower() in ("1", "true", "yes", "on")
|
||
INFLUX_URL = os.environ.get("INFLUX_URL", "http://smart-city-influxdb:8086") # InfluxDB v2 sur smartcity-shared
|
||
INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
|
||
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "smartcity") # Correspond au bucket de Telegraf
|
||
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-token")
|
||
|
||
# Prometheus metrics HTTP server
|
||
METRICS_PORT = int(os.environ.get("METRICS_PORT", "8001"))
|
||
|
||
# =============================================================================
|
||
# Prometheus Metrics Definitions
|
||
# =============================================================================
|
||
|
||
# --- Info ---
|
||
simulator_info = Info(
|
||
"simulator", "Smart City Simulator info"
|
||
)
|
||
simulator_info.info({
|
||
"version": "1.0.0",
|
||
"python_version": sys.version.split()[0],
|
||
"mqtt_brokers": "emqx,mosquitto,bunkerm",
|
||
# "context_brokers": "orion_ld,stellio,frost",
|
||
})
|
||
|
||
# --- Counters ---
|
||
messages_published_total = Counter(
|
||
"simulator_messages_published_total",
|
||
"Total messages published by broker",
|
||
["broker", "sensor_type"]
|
||
)
|
||
|
||
messages_errors_total = Counter(
|
||
"simulator_messages_errors_total",
|
||
"Total publish errors",
|
||
["broker", "sensor_type", "error_type"]
|
||
)
|
||
|
||
mqtt_connection_total = Counter(
|
||
"simulator_mqtt_connection_total",
|
||
"MQTT connection attempts",
|
||
["broker", "status"] # status: success, failure
|
||
)
|
||
|
||
http_requests_total = Counter(
|
||
"simulator_http_requests_total",
|
||
"HTTP requests to REST APIs",
|
||
["broker", "method", "status_code"]
|
||
)
|
||
|
||
influx_write_total = Counter(
|
||
"simulator_influx_write_total",
|
||
"InfluxDB write operations",
|
||
["status"] # success, error
|
||
)
|
||
|
||
# --- Gauges ---
|
||
mqtt_broker_connected = Gauge(
|
||
"simulator_mqtt_broker_connected",
|
||
"MQTT broker connection status (1=connected, 0=disconnected)",
|
||
["broker"]
|
||
)
|
||
|
||
sensors_total = Gauge(
|
||
"simulator_sensors_total",
|
||
"Total number of sensors by type",
|
||
["sensor_type"]
|
||
)
|
||
|
||
up = Gauge(
|
||
"simulator_up",
|
||
"Simulator is running (1=yes, 0=no)"
|
||
)
|
||
|
||
# --- Histograms ---
|
||
publish_duration = Histogram(
|
||
"simulator_publish_duration_seconds",
|
||
"Time spent publishing a message",
|
||
["broker"],
|
||
buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5)
|
||
)
|
||
|
||
http_request_duration = Histogram(
|
||
"simulator_http_request_duration_seconds",
|
||
"HTTP request latency to REST APIs",
|
||
["broker", "method"],
|
||
buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
|
||
)
|
||
|
||
message_payload_size = Histogram(
|
||
"simulator_message_payload_size_bytes",
|
||
"Message payload size in bytes",
|
||
["broker"],
|
||
buckets=(64, 128, 256, 512, 1024, 2048, 4096, 8192)
|
||
)
|
||
|
||
# Start Prometheus HTTP server in a background thread
|
||
def _start_metrics_server():
|
||
def run():
|
||
prometheus_client.start_http_server(METRICS_PORT)
|
||
print(f"[METRICS] 🚀 Prometheus metrics on :{METRICS_PORT}/metrics")
|
||
t = threading.Thread(target=run, daemon=True)
|
||
t.start()
|
||
return t
|
||
|
||
# Initialize InfluxDB client
|
||
_influx_client = None
|
||
_influx_write_api = None
|
||
if ENABLE_INFLUX:
|
||
try:
|
||
_influx_client = influxdb_client.InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
|
||
_influx_write_api = _influx_client.write_api(write_options=influxdb_client.client.write_api.ASYNCHRONOUS)
|
||
print(f"[INFLUX] ✅ Connected to {INFLUX_URL} (async mode)")
|
||
except Exception as e:
|
||
print(f"[INFLUX] ❌ Connection failed: {e}")
|
||
|
||
SENSOR_COUNTS = {
|
||
"traffic": int(os.environ.get("SENSOR_COUNT_traffic", "3")),
|
||
"airquality": int(os.environ.get("SENSOR_COUNT_airquality", "10")),
|
||
"parking": int(os.environ.get("SENSOR_COUNT_parking", "10")),
|
||
"noise": int(os.environ.get("SENSOR_COUNT_noise", "1")),
|
||
"weather": int(os.environ.get("SENSOR_COUNT_weather", "1")),
|
||
"light": int(os.environ.get("SENSOR_COUNT_light", "1")),
|
||
}
|
||
# Si SENSOR_COUNT est défini, multiplier les counts de façon proportionnelle
|
||
_total_default = sum(SENSOR_COUNTS.values())
|
||
if "SENSOR_COUNT" in os.environ:
|
||
target = int(os.environ["SENSOR_COUNT"])
|
||
ratio = target / _total_default
|
||
for k in SENSOR_COUNTS:
|
||
SENSOR_COUNTS[k] = max(1, int(SENSOR_COUNTS[k] * ratio))
|
||
|
||
# =============================================================================
|
||
# Localisation des capteurs Martinique — Coordonnées FIXES sur terre ferme
|
||
# IMPORTANT : ±0.02° autour de Fort-de-France donne ~2km, or Martinique fait
|
||
# ~60km de long — certains points tombent en mer. Solution : coordonnées fixes.
|
||
# =============================================================================
|
||
|
||
# Coordonnées réelles Martinique (terre ferme uniquement)
|
||
# Martinique : 14.4°N–14.9°N, -61.23°W–-60.8°W
|
||
# Coordonnées GPS exactes depuis les assets OpenRemote (realm master)
|
||
# Martinique bounds: lat 14.37–14.88°N, lon 61.0–61.25°W
|
||
FIXED_LOCATIONS: dict[str, dict[str, tuple[float, float]]] = {
|
||
"traffic": {
|
||
"Fort-de-France Centre": (14.6164, -61.07),
|
||
"Le Lamentin Aéroport": (14.6167, -61.0035),
|
||
"Le Robert D110": (14.6833, -60.9333),
|
||
"Sainte-Anne Plage": (14.4333, -60.9833),
|
||
"Saint-Joseph D1": (14.7, -61.05),
|
||
"Trinité Centre": (14.7167, -60.9167),
|
||
"Le François D2": (14.6833, -60.8333),
|
||
"Ducos Penitencier": (14.5833, -61.0667),
|
||
"Schœlcher Morne": (14.65, -61.1),
|
||
"Case-Pilote Bourg": (14.5167, -61.1167),
|
||
},
|
||
"airquality": {
|
||
"Fort-de-France Lamartine": (14.613, -61.0667),
|
||
"Le Lamentin Zac": (14.62, -61.0),
|
||
"Le Robert Bourg": (14.68, -60.93),
|
||
"Sainte-Anne Village": (14.43, -60.98),
|
||
"Saint-Joseph Morne": (14.705, -61.04),
|
||
"Trinité Eglise": (14.72, -60.91),
|
||
"Le François Bourg": (14.68, -60.83),
|
||
"Ducos Centre": (14.58, -61.06),
|
||
"Schœlcher Plage": (14.655, -61.11),
|
||
"Case-Pilote D1": (14.52, -61.12),
|
||
},
|
||
"parking": {
|
||
"Fort-de-France Place Clémenceau": (14.615, -61.068),
|
||
"Le Lamentin Centre Commercial": (14.618, -61.002),
|
||
"Le Robert Stade": (14.685, -60.935),
|
||
"Sainte-Anne Mairie": (14.432, -60.985),
|
||
"Saint-Joseph Ecole": (14.702, -61.045),
|
||
"Trinité Port": (14.715, -60.92),
|
||
"Le François Mairie": (14.682, -60.835),
|
||
"Ducos ZI": (14.585, -61.055),
|
||
"Schœlcher Bourg": (14.652, -61.105),
|
||
"Case-Pilote Stade": (14.518, -61.118),
|
||
},
|
||
"noise": {
|
||
"Fort-de-France Théâtre": (14.617, -61.069),
|
||
"Le Lamentin Zone Industrielle": (14.619, -61.001),
|
||
"Le Robert Bourg": (14.681, -60.932),
|
||
"Sainte-Anne Plage": (14.434, -60.982),
|
||
"Saint-Joseph Morne": (14.703, -61.042),
|
||
"Trinité Centre": (14.717, -60.918),
|
||
"Le François Bourg": (14.681, -60.832),
|
||
"Ducos Penitencier": (14.584, -61.058),
|
||
"Schœlcher Morne": (14.651, -61.102),
|
||
"Case-Pilote Village": (14.519, -61.115),
|
||
},
|
||
"weather": {
|
||
"Fort-de-France Meteo": (14.616, -61.067),
|
||
"Le Lamentin Aéroport": (14.617, -61.004),
|
||
"Le Robert Bourg": (14.682, -60.934),
|
||
"Sainte-Anne Village": (14.431, -60.981),
|
||
"Saint-Joseph Morne": (14.704, -61.043),
|
||
"Trinité Eglise": (14.718, -60.912),
|
||
"Le François Bourg": (14.683, -60.834),
|
||
"Ducos Centre": (14.586, -61.057),
|
||
"Schœlcher Plage": (14.654, -61.108),
|
||
"Case-Pilote D1": (14.521, -61.113),
|
||
},
|
||
"light": {
|
||
"Fort-de-France Place": (14.6155, -61.0685),
|
||
"Le Lamentin Rond-point": (14.6185, -61.0025),
|
||
"Le Robert D110": (14.6835, -60.9335),
|
||
"Sainte-Anne Plage": (14.4335, -60.9835),
|
||
"Saint-Joseph D1": (14.7005, -61.0505),
|
||
"Trinité Centre": (14.7165, -60.9165),
|
||
"Le François D2": (14.6835, -60.8335),
|
||
"Ducos Penitencier": (14.5835, -61.0665),
|
||
"Schœlcher Morne": (14.6505, -61.1005),
|
||
"Case-Pilote Bourg": (14.5165, -61.1165),
|
||
},
|
||
}
|
||
|
||
# Ranges par type
|
||
SENSOR_RANGES: dict[str, dict] = {
|
||
"traffic": {"vehicle_count":(10,150),"average_speed_kmh":(10,80),
|
||
"congestion_level":(0,5),"occupancy_percent":(0,100)},
|
||
"airquality": {"pm25_ugm3":(5,80),"pm10_ugm3":(10,150),"no2_ugm3":(5,60),
|
||
"o3_ugm3":(20,120),"co_mgm3":(0.1,5.0),
|
||
"temperature_celsius":(20,35),"humidity_percent":(40,95)},
|
||
"parking": {"total_spots":(50,500),"available_spots":(0,500),
|
||
"occupancy_percent":(0,100),"turnover_per_hour":(5,50)},
|
||
"noise": {"noise_level_db":(40,95),"peak_db":(60,110)},
|
||
"weather": {"temperature_celsius":(22,34),"humidity_percent":(50,95),
|
||
"wind_speed_kmh":(0,50),"pressure_hpa":(1005,1025),
|
||
"rain_mm":(0,20),"uv_index":(0,11)},
|
||
"light": {"brightness_lux":(0,100000),"power_consumption_w":(0,500)},
|
||
}
|
||
|
||
NOISE_CATEGORIES = ["quiet","moderate","loud","very_loud"]
|
||
LIGHT_STATUSES = ["on","off","dimmed","auto"]
|
||
|
||
# =============================================================================
|
||
# Capteurs déclarés
|
||
# =============================================================================
|
||
SENSORS: dict[str, dict] = {}
|
||
counter = 0
|
||
for stype, locs in FIXED_LOCATIONS.items():
|
||
for name, coords in locs.items():
|
||
sid = f"{stype}_{counter:03d}"
|
||
SENSORS[sid] = {"type": stype, "lat": coords[0], "lon": coords[1], "name": name}
|
||
counter += 1
|
||
|
||
# =============================================================================
|
||
## Payload NGSI-LD pour Orion-LD / Stellio
|
||
# =============================================================================
|
||
# Contextes NGSI-LD : core + Smart Data Models
|
||
# https://smartdatamodels.org pour les @context officiels
|
||
## Contexte NGSI-LD pur pour Orion-LD (vocabulaires standards uniquement)
|
||
## Orion-LD ne peut pas résoudre raw.githubusercontent.com — utiliser uri.etsi.org uniquement
|
||
#ORION_CONTEXT = [
|
||
# "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
|
||
#]
|
||
|
||
# Mapping sensor type → Smart Data Model type NGSI-LD
|
||
SMART_MODEL_MAPPING = {
|
||
"airquality": "AirQualityObserved",
|
||
"traffic": "TrafficFlowObserved",
|
||
"parking": "OffStreetParking",
|
||
"noise": "NoiseLevelObserved",
|
||
"weather": "WeatherObserved",
|
||
"light": "Device",
|
||
}
|
||
FROST_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}
|
||
|
||
# Cache FROST : éviter de recréer Thing/Datastream
|
||
_frost_cache: dict[str, tuple[str, str]] = {} # (sid, field) -> (thing_id, ds_id)
|
||
|
||
## Contexte NGSI-LD pur pour Stellio et Orion-LD (vocabulaires standards uniquement)
|
||
## Stellio et Orion-LD embarquent le contexte core NGSI-LD : https://uri.etsi.org/ngsi-ld/
|
||
# On n'utilise PAS les vocabulaires smartdatamodels.org distants (inaccessibles depuis les containers)
|
||
# Les types d'entité Smart Data Models (AirQualityObserved, etc.) sont reconnus par leur nom
|
||
# Les propriétés spécifiques sont stockées telles quelles (vocabulaire libre)
|
||
#STELLIO_INLINE_CONTEXT = [
|
||
# "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
|
||
#]
|
||
|
||
def _frost_payload(sid: str, sensor: dict, source: str = "simulator", topic: str = "") -> dict:
|
||
"""Construit un payload SensorThings pour FROST-Server."""
|
||
stype = sensor["type"]
|
||
ranges = SENSOR_RANGES.get(stype, {})
|
||
datastreams = []
|
||
|
||
for field, val_range in ranges.items():
|
||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||
lo, hi = val_range
|
||
if isinstance(lo, (int, float)) and isinstance(hi, (int, float)):
|
||
val = round(random.uniform(lo, hi), 1)
|
||
unit = "http://www.qudt.org/vocab/unit#DegreeCelsius"
|
||
obs_prop = {
|
||
"name": f"{field} Observation",
|
||
"description": f"Observation of {field}",
|
||
"definition": unit,
|
||
}
|
||
sensor_data = {
|
||
"name": f"Sensor {sid} {field}",
|
||
"description": f"Sensor {sid} measuring {field}",
|
||
"encodingType": "http://www.opengis.net/doc/IS/SensorML/2.0",
|
||
"metadata": {"unit": unit},
|
||
}
|
||
ds = {
|
||
"name": f"Datastream {stype}/{field}",
|
||
"description": f"Datastream for {stype} sensor {sid} - {field}",
|
||
"observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
|
||
"unitOfMeasurement": {"name": field, "symbol": "", "definition": unit},
|
||
"Sensor": sensor_data,
|
||
"ObservedProperty": obs_prop,
|
||
}
|
||
datastreams.append((field, ds, val))
|
||
|
||
thing_payload = {
|
||
"name": f"Thing_{sid}",
|
||
"description": f"Smart City {stype} sensor in Martinique",
|
||
"properties": {
|
||
"sensorType": stype,
|
||
"region": "Martinique",
|
||
"source": source, # Traçabilité
|
||
"mqttTopic": topic # Traçabilité
|
||
},
|
||
}
|
||
return thing_payload, datastreams
|
||
|
||
# =============================================================================
|
||
# HTTP helper
|
||
# =============================================================================
|
||
def _http_post(url: str, data: dict, headers: dict, broker: str = "unknown") -> str:
|
||
"""POST et retourne 'ok' ou 'created' (ou '' si échec)."""
|
||
try:
|
||
body = json.dumps(data).encode()
|
||
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
|
||
with http_request_duration.labels(broker=broker, method="POST").time():
|
||
try:
|
||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||
http_requests_total.labels(broker=broker, method="POST", status_code=str(resp.status)).inc()
|
||
if resp.status == 204:
|
||
return 'created' # No Content — succès
|
||
if resp.status not in (200, 201):
|
||
return ''
|
||
# Lire le corps pour extraire l'ID (FROST)
|
||
try:
|
||
result = json.loads(resp.read())
|
||
if '@iot.selfLink' in result:
|
||
link = result['@iot.selfLink']
|
||
return link.split('(')[1].rstrip(')')
|
||
if '@iot.id' in result:
|
||
return str(result['@iot.id'])
|
||
except Exception:
|
||
pass
|
||
location = resp.headers.get('Location', '')
|
||
if location:
|
||
return location.split('(')[1].rstrip(')') if '(' in location else ''
|
||
return 'created'
|
||
except Exception:
|
||
pass
|
||
except urllib.error.HTTPError as e:
|
||
# Lire le corps de l'erreur pour debug
|
||
try:
|
||
err_body = e.read().decode()[:200]
|
||
except Exception:
|
||
err_body = str(e)
|
||
print(f" ⚠️ HTTP POST {url} → {e.code}: {err_body}")
|
||
http_requests_total.labels(broker=broker, method="POST", status_code=str(e.code)).inc()
|
||
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc()
|
||
return ''
|
||
except Exception as e:
|
||
http_requests_total.labels(broker=broker, method="POST", status_code="exception").inc()
|
||
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc()
|
||
print(f" ⚠️ HTTP POST {url} → {e}")
|
||
return ''
|
||
|
||
def _http_put(url: str, data: dict, headers: dict, broker: str = "unknown") -> bool:
|
||
try:
|
||
body = json.dumps(data).encode()
|
||
req = urllib.request.Request(url, data=body, headers=headers, method="PUT")
|
||
with http_request_duration.labels(broker=broker, method="PUT").time():
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
http_requests_total.labels(broker=broker, method="PUT", status_code=str(resp.status)).inc()
|
||
return resp.status in (200, 204)
|
||
except urllib.error.HTTPError as e:
|
||
http_requests_total.labels(broker=broker, method="PUT", status_code=str(e.code)).inc()
|
||
if e.code == 409:
|
||
return True # Already exists - that's fine
|
||
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc()
|
||
print(f" ⚠️ HTTP PUT {url} → {e}")
|
||
return False
|
||
except Exception as e:
|
||
http_requests_total.labels(broker=broker, method="PUT", status_code="exception").inc()
|
||
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc()
|
||
print(f" ⚠️ HTTP PUT {url} → {e}")
|
||
return False
|
||
|
||
# =============================================================================
|
||
# MQTT Client multi-broker
|
||
# =============================================================================
|
||
class MultiMQTT:
|
||
def __init__(self):
|
||
self.clients: dict[str, mqtt.Client] = {}
|
||
self.ok: dict[str, bool] = {}
|
||
self._lock = threading.Lock()
|
||
self._setup()
|
||
|
||
def _mk_client(self, name: str, host: str, port: int,
|
||
tls: bool = False, user: str = "", pwd: str = "",
|
||
ws: bool = False, use_v5: bool = False) -> mqtt.Client:
|
||
cid = f"smartcity-sim-{name}-{os.getpid()}"
|
||
protocol = mqtt.MQTTv5 if use_v5 else mqtt.MQTTv311
|
||
# Use Callback API v1 for compatibility with existing lambda callbacks
|
||
c = mqtt.Client(client_id=cid, protocol=protocol, callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
|
||
if user:
|
||
c.username_pw_set(user, pwd)
|
||
if tls:
|
||
c.tls_set(cert_reqs=ssl.CERT_NONE)
|
||
c.tls_insecure_set(True)
|
||
if ws:
|
||
c.ws_set(b"/mqtt")
|
||
c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc)
|
||
c.on_disconnect = lambda _c, _, __: self._on_disconnect(name)
|
||
try:
|
||
c.connect(host, port, keepalive=120)
|
||
c.loop_start()
|
||
except Exception as e:
|
||
print(f"[MQTT] ❌ {name} @ {host}:{port} → {e}")
|
||
self.ok[name] = False
|
||
return c
|
||
|
||
def _on_connect(self, name: str, rc: int):
|
||
with self._lock:
|
||
if rc == 0:
|
||
self.ok[name] = True
|
||
mqtt_broker_connected.labels(broker=name).set(1)
|
||
mqtt_connection_total.labels(broker=name, status="success").inc()
|
||
print(f"[MQTT] ✅ {name} connecté")
|
||
else:
|
||
self.ok[name] = False
|
||
mqtt_broker_connected.labels(broker=name).set(0)
|
||
mqtt_connection_total.labels(broker=name, status="failure").inc()
|
||
print(f"[MQTT] ❌ {name} rc={rc}")
|
||
|
||
def _on_disconnect(self, name: str):
|
||
with self._lock:
|
||
self.ok[name] = False
|
||
mqtt_broker_connected.labels(broker=name).set(0)
|
||
print(f"[MQTT] ⚠️ {name} déconnecté")
|
||
|
||
def _setup(self):
|
||
# Utiliser les variables d'environnement pour les brokers
|
||
mqtt_version = os.environ.get("MQTT_PROTOCOL_VERSION", "311")
|
||
use_v5 = (mqtt_version == "5")
|
||
print(f"[MQTT] Version du protocole: {'v5.0' if use_v5 else 'v3.1.1'} (MQTT_PROTOCOL_VERSION={mqtt_version})")
|
||
|
||
# Check which brokers are enabled
|
||
enable_emqx = os.environ.get("ENABLE_EMQX", "1") == "1"
|
||
enable_mosquitto = os.environ.get("ENABLE_MOSQUITTO", "1") == "1"
|
||
enable_bunkerm = os.environ.get("ENABLE_BUNKER", "1") == "1"
|
||
|
||
brokers = []
|
||
if enable_emqx:
|
||
brokers.append(("emqx", EMQX_HOST, EMQX_PORT, False, "", "", use_v5))
|
||
if enable_mosquitto:
|
||
brokers.append(("mosquitto", MOSQUITTO_HOST, MOSQUITTO_PORT, False, "", "", use_v5))
|
||
if enable_bunkerm:
|
||
brokers.append(("bunkerm", BUNKERM_HOST, BUNKERM_PORT, False, "bunker", "bunker", use_v5))
|
||
|
||
# OpenRemote MQTT broker (pour agents MQTT créés dans OpenRemote)
|
||
# Le broker Artemis d'OR nécessite MQTTv5 pour l'auth anonyme
|
||
if ENABLE_OPENREMOTE:
|
||
brokers.append(("openremote", "openremote_manager_1", 1883, False, "", "", True))
|
||
|
||
print(f"[MQTT] 🔌 Connexion aux brokers (EMQX={enable_emqx}, Mosquitto={enable_mosquitto}, BunkerM={enable_bunkerm})...")
|
||
print("[MQTT] 🔌 Connexion aux brokers...")
|
||
for name, host, port, tls, user, pwd, use_v5 in brokers:
|
||
c = self._mk_client(name, host, port, tls=tls, user=user, pwd=pwd, use_v5=use_v5)
|
||
self.clients[name] = c
|
||
self.ok[name] = False
|
||
# Attendre que tous les brokers soient connectés (Artemis peut être lent)
|
||
for _ in range(20):
|
||
time.sleep(1)
|
||
if all(self.ok.values()):
|
||
break
|
||
print(f"[MQTT] État des connexions: {dict(self.ok)}")
|
||
|
||
def publish(self, topic: str, payload: str, sensor_type: str = "unknown") -> dict[str, bool]:
|
||
results = {}
|
||
payload_bytes = len(payload.encode())
|
||
with self._lock:
|
||
for name, client in self.clients.items():
|
||
if self.ok.get(name, False):
|
||
with publish_duration.labels(broker=name).time():
|
||
try:
|
||
r = client.publish(topic, payload, qos=1)
|
||
success = (r.rc == mqtt.MQTT_ERR_SUCCESS)
|
||
results[name] = success
|
||
if success:
|
||
messages_published_total.labels(broker=name, sensor_type=sensor_type).inc()
|
||
message_payload_size.labels(broker=name).observe(payload_bytes)
|
||
else:
|
||
messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="mqtt_rc").inc()
|
||
except Exception:
|
||
results[name] = False
|
||
messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="exception").inc()
|
||
else:
|
||
results[name] = False
|
||
return results
|
||
|
||
# def publish_iot_agent(self, sid: str, payload: dict, sensor_type: str = "unknown") -> bool:
|
||
# """Publie sur le topic IoT-Agent (json/smartcity-api-key/{sid}/attrs) via les 3 brokers."""
|
||
topic = f"json/smartcity-api-key/{sid}/attrs"
|
||
msg = json.dumps(payload, ensure_ascii=False)
|
||
payload_bytes = len(msg.encode())
|
||
|
||
success = False
|
||
# Publier sur les 3 brokers: emqx, mosquitto, bunkerm
|
||
for broker_name in ['emqx', 'mosquitto', 'bunkerm']:
|
||
client_ok = self.clients.get(broker_name) is not None and self.ok.get(broker_name, False)
|
||
print(f"[MQTT-DEBUG] {broker_name}: client_exists={self.clients.get(broker_name) is not None}, ok={self.ok.get(broker_name, False)}")
|
||
if client_ok:
|
||
try:
|
||
r = self.clients[broker_name].publish(topic, msg, qos=1)
|
||
if r.rc == mqtt.MQTT_ERR_SUCCESS:
|
||
success = True
|
||
# messages_published_total.labels(broker='iot-agent', sensor_type=sensor_type).inc()
|
||
# message_payload_size.labels(broker='iot-agent').observe(payload_bytes)
|
||
except Exception:
|
||
pass # IoT-Agent code removed
|
||
return success
|
||
|
||
def stop(self):
|
||
for name, c in self.clients.items():
|
||
try:
|
||
c.loop_stop()
|
||
c.disconnect()
|
||
except Exception:
|
||
pass
|
||
|
||
# =============================================================================
|
||
# URLs de base (résolues au démarrage)
|
||
# =============================================================================
|
||
#ORION_HOST = "localhost"
|
||
#ORION_PORT = "2026"
|
||
#ORION_URL = f"http://{ORION_HOST}:{ORION_PORT}"
|
||
#STELLIO_URL = os.environ.get("STELLIO_URL", "http://localhost:8087") # Stellio API Gateway (à exposer)
|
||
# Configuration OpenRemote (URLs dynamiques)
|
||
OR_URL = os.environ.get("OR_URL", "http://localhost:8080") # OpenRemote Manager (Traefik)
|
||
OR_REALM = os.environ.get("OR_REALM", "smartcity") # Default: smartcity
|
||
OR_TOKEN_URL = os.environ.get("OR_TOKEN_URL", "http://localhost:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token")
|
||
OR_TOKEN_TTL = int(os.environ.get("OR_TOKEN_TTL", "3600")) # Refresh token every hour
|
||
#STELLIO_TENANT = os.environ.get("STELLIO_TENANT", "urn:ngsi-ld:tenant:default")
|
||
|
||
|
||
|
||
def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool:
|
||
"""Publie sur BunkerM via HTTP API (port 2000) avec session."""
|
||
import base64, http.cookiejar, urllib.request, json
|
||
from datetime import datetime, timezone
|
||
|
||
host = "bunkerm_bunkerm_1:2000"
|
||
login_url = f"http://{host}/login"
|
||
data_url = f"http://{host}/api/sensors/data"
|
||
|
||
# 1. Cookie jar pour maintenir la session
|
||
cj = http.cookiejar.CookieJar()
|
||
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
|
||
|
||
# 2. Authentification (Basic) pour obtenir le cookie de session
|
||
creds = base64.b64encode(b"bunker:bunker").decode()
|
||
auth_header = {"Authorization": f"Basic {creds}"}
|
||
|
||
try:
|
||
# GET sur /login pour initialiser la session
|
||
req_login = urllib.request.Request(login_url, headers=auth_header)
|
||
opener.open(req_login, timeout=5)
|
||
except Exception as e:
|
||
print(f" ⚠️ BunkerM login → {e}")
|
||
return False
|
||
|
||
# 3. Préparer le payload
|
||
payload = {
|
||
"sensor_id": sid,
|
||
"sensor_type": sensor["type"],
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
}
|
||
for k, v in values.items():
|
||
if k not in payload:
|
||
payload[k] = v
|
||
|
||
# 4. POST avec le cookie de session
|
||
data = json.dumps(payload).encode()
|
||
req = urllib.request.Request(
|
||
data_url,
|
||
data=data,
|
||
headers={**auth_header, "Content-Type": "application/json"},
|
||
method="POST"
|
||
)
|
||
try:
|
||
with http_request_duration.labels(broker="bunkerm", method="POST").time():
|
||
with opener.open(req, timeout=5) as resp:
|
||
http_requests_total.labels(broker="bunkerm", method="POST", status_code=str(resp.status)).inc()
|
||
messages_published_total.labels(broker="bunkerm", sensor_type=sensor["type"]).inc()
|
||
print(f" ✅ BunkerM: HTTP {resp.status}")
|
||
return resp.status in (200, 201, 204)
|
||
except Exception as e:
|
||
http_requests_total.labels(broker="bunkerm", method="POST", status_code="exception").inc()
|
||
messages_errors_total.labels(broker="bunkerm", sensor_type=sensor["type"], error_type="exception").inc()
|
||
print(f" ⚠️ BunkerM POST → {e}")
|
||
return False
|
||
|
||
def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
|
||
"""Crée le Thing (1 par capteur) + Datastreams, puis POST l'Observation."""
|
||
# Cache : {sid: (thing_id, {field: ds_id})}
|
||
if sid in _frost_cache:
|
||
thing_id, ds_map = _frost_cache[sid]
|
||
if field in ds_map:
|
||
ds_id = ds_map[field]
|
||
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
|
||
obs = {
|
||
"resultTime": datetime.now(timezone.utc).isoformat(),
|
||
"result": value,
|
||
"FeatureOfInterest": {
|
||
"name": f"Location {sid}",
|
||
"description": f"Feature of interest for sensor {sid}",
|
||
"encodingType": "application/vnd.geo+json",
|
||
"feature": {
|
||
"type": "Point",
|
||
"coordinates": [sensor.get("lon", -61.0), sensor.get("lat", 14.6)]
|
||
}
|
||
}
|
||
}
|
||
if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"):
|
||
print(f" ✅ FROST Observation {sid}/{field} → OK (cached)")
|
||
return True
|
||
else:
|
||
print(f" ⚠️ FROST Observation {sid}/{field} → échec")
|
||
return False
|
||
|
||
# Premier appel pour ce capteur : créer Thing + tous les Datastreams
|
||
# Topic MQTT pour traçabilité
|
||
stype = sensor["type"]
|
||
topic = f"city/sensors/{stype}/{sid}"
|
||
thing_payload, datastreams = _frost_payload(sid, sensor, source="simulator", topic=topic)
|
||
print(f" 📊 FROST: POST Thing {sid}...")
|
||
tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS, broker="frost")
|
||
if not tid:
|
||
print(f" ⚠️ FROST Thing {sid} → échec création")
|
||
return False
|
||
print(f" ✅ FROST Thing {sid} créé (ID: {tid})")
|
||
|
||
# Créer les Datastreams
|
||
ds_map = {}
|
||
for f, ds, _ in datastreams:
|
||
ds["Thing"] = {"@iot.id": tid}
|
||
print(f" 📊 FROST: POST Datastream {sid}/{f}...")
|
||
ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS, broker="frost")
|
||
if ds_id:
|
||
print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})")
|
||
ds_map[f] = ds_id
|
||
else:
|
||
print(f" ⚠️ FROST Datastream {sid}/{f} → échec")
|
||
|
||
_frost_cache[sid] = (tid, ds_map)
|
||
|
||
# Poster l'observation pour le field actuel
|
||
if field in ds_map:
|
||
ds_id = ds_map[field]
|
||
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
|
||
obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value}
|
||
if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"):
|
||
print(f" ✅ FROST Observation {sid}/{field} → OK")
|
||
return True
|
||
return False
|
||
|
||
# =============================================================================
|
||
# OpenRemote REST
|
||
# =============================================================================
|
||
_or_token_cache = {"token": "", "expires": 0}
|
||
|
||
def _get_or_token() -> str:
|
||
"""Obtain an OpenRemote token via password grant (admin user)."""
|
||
import time, urllib.parse
|
||
if _or_token_cache["token"] and _or_token_cache["expires"] > time.time() + 60:
|
||
return _or_token_cache["token"]
|
||
try:
|
||
# Use password grant with admin user (full rights)
|
||
token_url = f"http://openremote-keycloak-1:8080/auth/realms/{OR_TOKEN_REALM}/protocol/openid-connect/token"
|
||
client_id = os.environ.get("OR_CLIENT_ID", "openremote")
|
||
client_secret = os.environ.get("OR_CLIENT_SECRET", "0oQjzTfiEELYmj5jFwT4iIuWUDtQDvVa")
|
||
data = urllib.parse.urlencode({
|
||
"grant_type": "password",
|
||
"username": os.environ.get("OR_ADMIN_USER", "admin"),
|
||
"password": os.environ.get("OR_ADMIN_PASS", "Digitribe972"),
|
||
"client_id": client_id,
|
||
"client_secret": client_secret
|
||
}).encode()
|
||
req = urllib.request.Request(
|
||
token_url,
|
||
data=data,
|
||
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
||
)
|
||
with urllib.request.urlopen(req, timeout=5) as r:
|
||
token_data = json.loads(r.read().decode())
|
||
_or_token_cache["token"] = token_data["access_token"]
|
||
_or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300) - 60
|
||
return _or_token_cache["token"]
|
||
except Exception as e:
|
||
print(f" ⚠️ OpenRemote token → {e}")
|
||
return ""
|
||
|
||
def _or_put(asset_id: str, payload: dict) -> bool:
|
||
"""PUT update sur un asset OpenRemote (sans If-Match pour éviter 403)."""
|
||
token = _get_or_token()
|
||
if not token:
|
||
return False
|
||
try:
|
||
body = json.dumps(payload).encode()
|
||
url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
|
||
req = urllib.request.Request(url, data=body,
|
||
headers={
|
||
"Authorization": f"Bearer {token}",
|
||
"Content-Type": "application/json",
|
||
},
|
||
method="PUT")
|
||
with http_request_duration.labels(broker="openremote", method="PUT").time():
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
http_requests_total.labels(broker="openremote", method="PUT", status_code=str(resp.status)).inc()
|
||
messages_published_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown")).inc()
|
||
return resp.status in (200, 204)
|
||
except urllib.error.HTTPError as e:
|
||
http_requests_total.labels(broker="openremote", method="PUT", status_code=str(e.code)).inc()
|
||
messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="http_error").inc()
|
||
print(f" ⚠️ OR PUT {asset_id} → HTTP {e.code}")
|
||
return False
|
||
except Exception as e:
|
||
http_requests_total.labels(broker="openremote", method="PUT", status_code="exception").inc()
|
||
messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="exception").inc()
|
||
print(f" ⚠️ OR PUT {asset_id} → {e}")
|
||
return False
|
||
|
||
def publish_openremote(sid: str, sensor: dict, values: dict) -> bool:
|
||
"""Met à jour les attributs d'un asset OpenRemote via REST."""
|
||
# Mapping sid → asset ID (realm master) — assets avec agentLink + location
|
||
# Note: le compteur SENSORS est global (traffic=0-9, airquality=10-19, parking=20-29, noise=30-39, weather=40-49, light=50-59)
|
||
ASSET_MAP = {
|
||
"traffic_000": "429858caca3341f56fbf65",
|
||
"traffic_001": "301218322f5aaca9d6d168",
|
||
"traffic_002": "bd35fe2a90133118b9b004",
|
||
"traffic_003": "da59ec9301c4efd3fd55c4",
|
||
"traffic_004": "834f4b7b9df848f5c5c2d8",
|
||
"airquality_010": "0f922351a9894bc0144c94",
|
||
"airquality_011": "4f83219bbee703b3e0a255",
|
||
"airquality_012": "381cc31ab83dd66ed4be37",
|
||
"airquality_013": "808b73c22ecd19589a33be",
|
||
"airquality_014": "03c18679226329183b44b6",
|
||
"parking_020": "0ee6689f5c0499643d48eb",
|
||
"parking_021": "8fb6b2d0601d98b47a4172",
|
||
"parking_022": "0c00bda9e5075d12d59694",
|
||
"parking_023": "ae981dc9d155d1313b9acf",
|
||
"parking_024": "96020cc5aef95c5fda7bb4",
|
||
"noise_030": "0be31930e45d2eb5c12ccd",
|
||
"noise_031": "1802e76e3432d5eda1deb7",
|
||
"noise_032": "08edb6518750d50644afe3",
|
||
"noise_033": "93d09bfac36d2ed95fc858",
|
||
"noise_034": "7942726d84d2bd29de1e5d",
|
||
"weather_040": "9942f881ab6df375d8d9fa",
|
||
"weather_041": "5400fdf5c51a4fe4f5a89c",
|
||
"weather_042": "1a3bf32aa5208892e68965",
|
||
"weather_043": "d3725f922f96085f2df3f7",
|
||
"weather_044": "13be192a8c23dd8fdceada",
|
||
"light_050": "1f4302946b1a4a1ded23f6",
|
||
"light_051": "35e6ef027ed9a157ad8780",
|
||
"light_052": "526538589aa981bdc77ce9",
|
||
"light_053": "d4a6ac7f34d64e581937c0",
|
||
"light_054": "40bbe989be2ae5b2a98b30",
|
||
}
|
||
asset_id = ASSET_MAP.get(sid)
|
||
if not asset_id:
|
||
return False
|
||
# Construire les attributs à jour
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
attrs = {
|
||
"timestamp": {"type": "DateTime", "value": now, "timestamp": now},
|
||
"battery_level": {"type": "Number", "value": random.randint(60, 100), "timestamp": now},
|
||
}
|
||
# Mapper les valeurs du payload vers les attributs OR
|
||
field_map = {
|
||
"temperature_celsius": "temperature",
|
||
"humidity_percent": "humidity",
|
||
"noise_level_db": "noiseLevel",
|
||
"pm25_ugm3": "airQuality",
|
||
"vehicle_count": "trafficFlow",
|
||
"available_spots": "parking",
|
||
"brightness_lux": "light",
|
||
"flood": "flood",
|
||
}
|
||
for field, val in values.items():
|
||
attr_name = field_map.get(field, field)
|
||
attrs[attr_name] = {
|
||
"type": "Number",
|
||
"value": val,
|
||
"timestamp": now,
|
||
"unit": "µg/m³" if "ugm3" in field else ("°C" if "celsius" in field else ("%" if "percent" in field or "humidity" in field else ("dB" if "db" in field else ""))),
|
||
}
|
||
|
||
# Ajouter la location du capteur (GeoJSON Point)
|
||
lat = sensor.get("lat", 0)
|
||
lon = sensor.get("lon", 0)
|
||
attrs["location"] = {
|
||
"type": "GeoJSONPoint",
|
||
"value": {"type": "Point", "coordinates": [lat, lon]},
|
||
"timestamp": now,
|
||
}
|
||
|
||
payload = {
|
||
"id": asset_id,
|
||
"name": sensor["name"],
|
||
"type": "IOTSensor",
|
||
"realm": OR_REALM,
|
||
"attributes": attrs,
|
||
}
|
||
return _or_put(asset_id, payload)
|
||
|
||
# =============================================================================
|
||
# Pulsar — HTTP REST Producer
|
||
# API: POST http://host:8080/admin/v2/persistent/public/default/{topic}/produce
|
||
# Payload: {"messages": [{"payload": "<base64>", "properties": {...}}]}
|
||
# Topics auto-créés par le premier message (Pulsar standalone)
|
||
# =============================================================================
|
||
_pulsar_session = None
|
||
|
||
def _get_pulsar_session():
|
||
global _pulsar_session
|
||
if _pulsar_session is None:
|
||
import urllib.request
|
||
_pulsar_session = urllib.request
|
||
return _pulsar_session
|
||
|
||
def _init_pulsar() -> bool:
|
||
"""Teste la connectivité Pulsar au démarrage."""
|
||
try:
|
||
import urllib.request
|
||
req = urllib.request.Request(f"{PULSAR_BASE}/admin/v2/clusters")
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
if resp.status == 200:
|
||
print(f"[PULSAR] ✅ Connected to {PULSAR_BASE}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"[PULSAR] ⚠️ Cannot reach {PULSAR_BASE}: {e}")
|
||
return False
|
||
|
||
def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool:
|
||
"""Publie un message sur Pulsar via le client Python (port binaire 6650)."""
|
||
stype = sensor["type"]
|
||
topic = f"persistent://public/default/smartcity-{stype.replace('-','')}"
|
||
try:
|
||
import pulsar
|
||
with publish_duration.labels(broker="pulsar").time():
|
||
client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650")
|
||
producer = client.create_producer(topic)
|
||
body = json.dumps(payload, ensure_ascii=False).encode()
|
||
producer.send(body, properties={"sensor_id": sid, "source": "simulator"})
|
||
client.close()
|
||
messages_published_total.labels(broker="pulsar", sensor_type=stype).inc()
|
||
message_payload_size.labels(broker="pulsar").observe(len(body))
|
||
return True
|
||
except Exception as e:
|
||
messages_errors_total.labels(broker="pulsar", sensor_type=stype, error_type="exception").inc()
|
||
print(f" ⚠️ Pulsar → {e}")
|
||
return False
|
||
|
||
# =============================================================================
|
||
# Redpanda / Kafka — HTTP REST Proxy
|
||
# API: POST http://host:8082/topics/{topic}
|
||
# Payload: {"records": [{"value": "<base64>"}]}
|
||
# Topics auto-créés par le premier message (Redpanda)
|
||
# =============================================================================
|
||
_redpanda_session = None
|
||
|
||
def _get_redpanda_session():
|
||
global _redpanda_session
|
||
if _redpanda_session is None:
|
||
import urllib.request
|
||
_redpanda_session = urllib.request
|
||
return _redpanda_session
|
||
|
||
def _init_redpanda() -> bool:
|
||
"""Teste la connectivité Redpanda au démarrage."""
|
||
try:
|
||
import urllib.request
|
||
req = urllib.request.Request(f"{REDPANDA_BASE}/v1/status/alive")
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
if resp.status == 200:
|
||
print(f"[REDPANDA] ✅ Connected to {REDPANDA_BASE}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"[REDPANDA] ⚠️ Cannot reach {REDPANDA_BASE}: {e}")
|
||
return False
|
||
|
||
def publish_redpanda(sid: str, sensor: dict, payload: dict) -> bool:
|
||
"""Publie un message sur Redpanda/Kafka via le REST Proxy."""
|
||
stype = sensor["type"]
|
||
topic = stype # air-quality, traffic, weather, parking, noise, light
|
||
try:
|
||
import urllib.request, base64
|
||
body = json.dumps(payload, ensure_ascii=False)
|
||
b64 = base64.b64encode(body.encode()).decode()
|
||
record = {
|
||
"records": [{"value": b64}]
|
||
}
|
||
url = f"{REDPANDA_BASE}/topics/{topic}"
|
||
req = urllib.request.Request(
|
||
url,
|
||
data=json.dumps(record).encode(),
|
||
headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
|
||
method="POST"
|
||
)
|
||
with http_request_duration.labels(broker="redpanda", method="POST").time():
|
||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||
http_requests_total.labels(broker="redpanda", method="POST", status_code=str(resp.status)).inc()
|
||
messages_published_total.labels(broker="redpanda", sensor_type=stype).inc()
|
||
message_payload_size.labels(broker="redpanda").observe(len(body.encode()))
|
||
return resp.status in (200, 201, 204)
|
||
except urllib.error.HTTPError as e:
|
||
http_requests_total.labels(broker="redpanda", method="POST", status_code=str(e.code)).inc()
|
||
messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="http_error").inc()
|
||
print(f" ⚠️ Redpanda → {e.code}")
|
||
return False
|
||
except Exception as e:
|
||
http_requests_total.labels(broker="redpanda", method="POST", status_code="exception").inc()
|
||
messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="exception").inc()
|
||
print(f" ⚠️ Redpanda → {e}")
|
||
return False
|
||
|
||
def publish_influx(sid: str, sensor: dict, values: dict) -> bool:
|
||
"""Write sensor data to InfluxDB (async, non-blocking)."""
|
||
if not _influx_write_api:
|
||
influx_write_total.labels(status="skipped").inc()
|
||
return False
|
||
|
||
def _write_async():
|
||
try:
|
||
stype = sensor["type"]
|
||
lat = sensor.get("lat", BASE_LAT)
|
||
lon = sensor.get("lon", BASE_LON)
|
||
|
||
points = []
|
||
for field, value in values.items():
|
||
if isinstance(value, (int, float)):
|
||
p = influxdb_client.Point(stype)\
|
||
.tag("sensor_id", sid)\
|
||
.tag("location", sensor.get("name", sid))\
|
||
.field(field, float(value))\
|
||
.field("lat", float(lat))\
|
||
.field("lon", float(lon))
|
||
points.append(p)
|
||
|
||
if points:
|
||
_influx_write_api.write(bucket=INFLUX_BUCKET, record=points)
|
||
influx_write_total.labels(status="success").inc()
|
||
print(f" 📈 InfluxDB: {len(points)} points written")
|
||
except Exception as e:
|
||
influx_write_total.labels(status="error").inc()
|
||
print(f" ⚠️ InfluxDB → {e}")
|
||
|
||
# Exécution asynchrone (non-bloquante)
|
||
t = threading.Thread(target=_write_async, daemon=True)
|
||
t.start()
|
||
return True # Async: on ne peut pas savoir immédiatement
|
||
|
||
def main():
|
||
print("╔══════════════════════════════════════════════════╗")
|
||
print("║ Smart City Simulator — Martinique ║")
|
||
print("╚══════════════════════════════════════════════════╝")
|
||
print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s")
|
||
# print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
|
||
print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}")
|
||
|
||
# --- Démarrer le serveur Prometheus ---
|
||
_start_metrics_server()
|
||
|
||
# --- Configurer les gauges ---
|
||
for stype, count in SENSOR_COUNTS.items():
|
||
sensors_total.labels(sensor_type=stype).set(count)
|
||
up.set(1)
|
||
|
||
# Init connectivity checks
|
||
if ENABLE_PULSAR:
|
||
_init_pulsar()
|
||
# Test immédiat
|
||
print(f" 🌪️ DEBUG: Test Pulsar direct...", flush=True)
|
||
test_payload = {"type": "test", "value": 123}
|
||
test_result = publish_pulsar("test_001", {"type": "air-quality"}, test_payload)
|
||
print(f" 🌪️ DEBUG: Test Pulsar result: {test_result}", flush=True)
|
||
if ENABLE_REDPANDA:
|
||
_init_redpanda()
|
||
|
||
mqtt_client = MultiMQTT()
|
||
|
||
running = True
|
||
def signal_handler(*_):
|
||
nonlocal running
|
||
running = False
|
||
up.set(0)
|
||
print("\n[SIM] 🛑 Arrêt...")
|
||
signal.signal(signal.SIGINT, signal_handler)
|
||
signal.signal(signal.SIGTERM, signal_handler)
|
||
|
||
iteration = 0
|
||
while running:
|
||
iteration += 1
|
||
ts = datetime.now().strftime("%H:%M:%S")
|
||
print(f"\n[SIM] ⏱️ It #{iteration} — {ts}")
|
||
|
||
for sid, sensor in SENSORS.items():
|
||
stype = sensor["type"]
|
||
# Utiliser l'index du capteur dans FIXED_LOCATIONS (1-5) pour correspondre aux agents OpenRemote
|
||
# Le sid est formaté comme {stype}_{counter:03d} avec counter global
|
||
# On extrait l'index du capteur depuis le sid (derniers chiffres)
|
||
sensor_num = int(sid.split("_")[1])
|
||
# Calculer l'index 1-5 basé sur la position dans le type
|
||
# traffic: 0-9, airquality: 10-19, parking: 20-29, noise: 30-39, weather: 40-49, light: 50-59
|
||
type_offsets = {"traffic": 0, "airquality": 10, "parking": 20, "noise": 30, "weather": 40, "light": 50}
|
||
type_offset = type_offsets.get(stype, 0)
|
||
sensor_index = sensor_num - type_offset + 1 # 1-indexed
|
||
topic = f"smartcity/{stype}/{sensor_index}"
|
||
|
||
# --- Payload MQTT (ATTRIBUTES ONLY - pas de id/type/lat/lon !)
|
||
# # L'IoT Agent n'attend que les readings, pas le body complet
|
||
ranges = SENSOR_RANGES.get(stype, {})
|
||
payload_mqtt = {
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
"battery_level": random.randint(60, 100),
|
||
}
|
||
for field, val_range in ranges.items():
|
||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||
lo, hi = val_range
|
||
if isinstance(lo, (int, float)):
|
||
payload_mqtt[field] = round(random.uniform(lo, hi), 1)
|
||
elif isinstance(val_range, list):
|
||
payload_mqtt[field] = random.choice(val_range)
|
||
|
||
msg = json.dumps(payload_mqtt, ensure_ascii=False)
|
||
|
||
# --- MQTT publish ---
|
||
results = mqtt_client.publish(topic, msg, sensor_type=stype)
|
||
ok_mqtt = [n for n, r in results.items() if r]
|
||
if ok_mqtt:
|
||
print(f" 📤 {topic} → {','.join(ok_mqtt)}")
|
||
|
||
# # --- IoT-Agent (via EMQX) ---
|
||
# if ENABLE_IOT_AGENT:
|
||
# ok_iot = mqtt_client.publish_iot_agent(sid, payload_mqtt, sensor_type=stype)
|
||
# print(f" 🤖 IoT-Agent: {'✅' if ok_iot else '❌'}")
|
||
|
||
# Extraire les valeurs pour OpenRemote
|
||
or_values = {}
|
||
for field, val_range in ranges.items():
|
||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||
lo, hi = val_range
|
||
if isinstance(lo, (int, float)):
|
||
or_values[field] = round(random.uniform(lo, hi), 1)
|
||
|
||
# --- OpenRemote REST --- (DÉSACTIVÉ: les agents MQTT gèrent les mises à jour)
|
||
# Le REST échoue en 403 sur les assets avec agentLink
|
||
# if ENABLE_OPENREMOTE:
|
||
# ok_or = publish_openremote(sid, sensor, or_values)
|
||
# print(f" 🏠 OpenRemote: {'✅' if ok_or else '⚠️ skipped'}")
|
||
|
||
# # --- Orion-LD --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT)
|
||
# # if ENABLE_ORION:
|
||
# # ok_or = publish_orion(sid, sensor)
|
||
# # print(f" 🌐 Orion-LD: {'✅' if ok_or else '⚠️ skipped'}")
|
||
|
||
# # --- Stellio --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT)
|
||
# # if ENABLE_STELLIO:
|
||
# # ok_st = publish_stellio(sid, sensor)
|
||
# # print(f" 🏢 Stellio: {'✅' if ok_st else '❌'}")
|
||
|
||
# --- FROST ---
|
||
if ENABLE_FROST:
|
||
ranges2 = SENSOR_RANGES.get(stype, {})
|
||
for field, val_range in ranges2.items():
|
||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||
lo, hi = val_range
|
||
if isinstance(lo, (int, float)):
|
||
val = round(random.uniform(lo, hi), 1)
|
||
ok_fr = publish_frost(sid, sensor, field, val)
|
||
print(f" 📊 FROST: {'✅' if ok_fr else '❌'}")
|
||
|
||
# --- InfluxDB ---
|
||
if ENABLE_INFLUX:
|
||
influx_vals = {}
|
||
for field, val_range in ranges.items():
|
||
if isinstance(val_range, tuple) and len(val_range) == 2:
|
||
lo, hi = val_range
|
||
if isinstance(lo, (int, float)):
|
||
influx_vals[field] = round(random.uniform(lo, hi), 1)
|
||
ok_influx = publish_influx(sid, sensor, influx_vals)
|
||
print(f" 📈 InfluxDB: {'✅' if ok_influx else '❌'}")
|
||
|
||
# --- Pulsar (HTTP REST) ---
|
||
if ENABLE_PULSAR:
|
||
print(f" 🌪️ DEBUG: calling publish_pulsar for {sid}, payload_mqtt exists: {bool(locals().get('payload_mqtt'))}", flush=True)
|
||
ok_pulsar = publish_pulsar(sid, sensor, payload_mqtt)
|
||
print(f" 🌪️ Pulsar: {'✅' if ok_pulsar else '❌'}")
|
||
|
||
# --- Redpanda (Kafka REST Proxy) ---
|
||
if ENABLE_REDPANDA:
|
||
ok_redpanda = publish_redpanda(sid, sensor, payload_mqtt)
|
||
print(f" 🐟 Redpanda: {'✅' if ok_redpanda else '❌'}")
|
||
|
||
# --- BunkerM HTTP ---
|
||
if os.getenv("BUNKERM_HTTP", "0") == "1":
|
||
ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt)
|
||
print(f" 📦 BunkerM: {'✅' if ok_bunkerm else '❌'}")
|
||
|
||
print(f"[SIM] ✅ {len(SENSORS)} capteurs | MQTT OK: {sum(mqtt_client.ok.values())}/{len(mqtt_client.clients)} | OR: {ENABLE_OPENREMOTE}")
|
||
|
||
try:
|
||
time.sleep(INTERVAL)
|
||
except KeyboardInterrupt:
|
||
break
|
||
|
||
mqtt_client.stop()
|
||
print("[SIM] ✅ Arrêté proprement.")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|