feat: distribution service + redpanda consumer + updated flow diagram

- Add Pulsar distribution service (consumes smartcity-* → MQTT + context brokers)
- Add Redpanda → InfluxDB consumer (redpanda/consumer.py)
- Update FIXED_LOCATIONS with exact OpenRemote asset coordinates
- Fix Pulsar topics (underscore: smartcity-traffic not smartcity-traffic)
- Fix prometheus.yml endpoints (Redpanda:9644, comment inactive stacks)
- Add docker-compose.redpanda-consumer.yml
This commit is contained in:
Eric FELIXINE
2026-05-05 22:12:38 -04:00
parent 742b437ed9
commit c06acf4fe8
9 changed files with 2242 additions and 434 deletions

View File

@@ -39,6 +39,10 @@ import urllib.request, urllib.error
from datetime import datetime, timezone
from typing import Any
# Prometheus metrics
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge, Info
# InfluxDB support
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
@@ -89,6 +93,104 @@ INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "iot_data")
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-secret-admin-token")
# Prometheus metrics HTTP server
METRICS_PORT = int(os.environ.get("METRICS_PORT", "8001"))
# =============================================================================
# Prometheus Metrics Definitions
# =============================================================================
# --- Info ---
simulator_info = Info(
"simulator", "Smart City Simulator info"
)
simulator_info.info({
"version": "1.0.0",
"python_version": sys.version.split()[0],
"mqtt_brokers": "emqx,mosquitto,bunkerm",
"context_brokers": "orion_ld,stellio,frost",
})
# --- Counters ---
messages_published_total = Counter(
"simulator_messages_published_total",
"Total messages published by broker",
["broker", "sensor_type"]
)
messages_errors_total = Counter(
"simulator_messages_errors_total",
"Total publish errors",
["broker", "sensor_type", "error_type"]
)
mqtt_connection_total = Counter(
"simulator_mqtt_connection_total",
"MQTT connection attempts",
["broker", "status"] # status: success, failure
)
http_requests_total = Counter(
"simulator_http_requests_total",
"HTTP requests to REST APIs",
["broker", "method", "status_code"]
)
influx_write_total = Counter(
"simulator_influx_write_total",
"InfluxDB write operations",
["status"] # success, error
)
# --- Gauges ---
mqtt_broker_connected = Gauge(
"simulator_mqtt_broker_connected",
"MQTT broker connection status (1=connected, 0=disconnected)",
["broker"]
)
sensors_total = Gauge(
"simulator_sensors_total",
"Total number of sensors by type",
["sensor_type"]
)
up = Gauge(
"simulator_up",
"Simulator is running (1=yes, 0=no)"
)
# --- Histograms ---
publish_duration = Histogram(
"simulator_publish_duration_seconds",
"Time spent publishing a message",
["broker"],
buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5)
)
http_request_duration = Histogram(
"simulator_http_request_duration_seconds",
"HTTP request latency to REST APIs",
["broker", "method"],
buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
)
message_payload_size = Histogram(
"simulator_message_payload_size_bytes",
"Message payload size in bytes",
["broker"],
buckets=(64, 128, 256, 512, 1024, 2048, 4096, 8192)
)
# Start Prometheus HTTP server in a background thread
def _start_metrics_server():
def run():
prometheus_client.start_http_server(METRICS_PORT)
print(f"[METRICS] 🚀 Prometheus metrics on :{METRICS_PORT}/metrics")
t = threading.Thread(target=run, daemon=True)
t.start()
return t
# Initialize InfluxDB client
_influx_client = None
_influx_write_api = None
@@ -124,54 +226,80 @@ if "SENSOR_COUNT" in os.environ:
# Coordonnées réelles Martinique (terre ferme uniquement)
# Martinique : 14.4°N14.9°N, -61.23°W-60.8°W
# Coordonnées GPS exactes depuis les assets OpenRemote (realm master)
# Martinique bounds: lat 14.3714.88°N, lon 61.061.25°W
FIXED_LOCATIONS: dict[str, dict[str, tuple[float, float]]] = {
"traffic": {
# Fort-de-France — grands axes
"Carrefour Central": (14.6036, -61.1783), # Place du Palais, centre-ville
"Avenue des Caraïbes": (14.6100, -61.1850), # Route de Schoelcher (N1)
"Boulevard Pasteur": (14.6150, -61.1700), # Boulevard Pasteur, nord FdF
"Rue des Flamboyants": (14.5970, -61.1900), # Zone Industrielle, Lamentin
"Place de la République": (14.6000, -61.2100), # Centre administratif, sud FdF
# OpenRemote: "Traffic Fort-de-France Centre"
"FdF Centre": (14.6036, -61.1783),
# OpenRemote: "Traffic Fort-de-France North"
"FdF North": (14.6200, -61.1700),
# OpenRemote: "Traffic Fort-de-France South"
"FdF South": (14.5900, -61.1900),
# OpenRemote: "trafficFlow - Fort-de-France"
"FdF Centre Rue": (14.6036, -61.1783),
# OpenRemote: "Test Sensor"
"FdF Place": (14.6000, -61.2000),
},
"airquality": {
# Points de mesure qualité de l'air sur terre
"Quartier Bonde": (14.6050, -61.1750), # Bonde, sud-est FdF
"Port de Fort-de-France": (14.5980, -61.2250), # Zone portuaire, bord de mer (OK, port ≠ mer)
"Château Denis": (14.6200, -61.1550), # Château Denis, nord montagne
"Lamentin Aéroport": (14.5950, -61.1700), # Aéroport, Lamentin
"Schoelcher Village": (14.7400, -61.1850), # Schoelcher, nord-ouest
# OpenRemote: "Air Quality Fort-de-France"
"FdF Centre": (14.6036, -61.1783),
# OpenRemote: "airQuality - Fort-de-France"
"FdF Bonde": (14.6050, -61.1750),
# OpenRemote: "airQuality - Sainte-Luce"
"Sainte-Luce": (14.5950, -61.1700),
# OpenRemote: "floodLevel - Schoelcher"
"Schoelcher": (14.7400, -61.1850),
# OpenRemote: "humidity - Le Robert"
"Le Robert": (14.6800, -60.9400),
},
"parking": {
# Parkings publics sur terre
"Parking Rivière-Saleé": (14.5820, -61.2050), # Rivière-Salée (sud)
"Parking Cluny": (14.6050, -61.1750), # Cluny, FdF
"Parking Média": (14.6000, -61.1850), # Quartier Média, FdF
"Parking Grand-Camp": (14.6100, -61.1700), # Grand-Camp, Lamentin
"Parking Dillon": (14.6200, -61.1650), # Dillon, nord FdF
# OpenRemote: "Parking Fort-de-France Centre"
"FdF Centre": (14.6036, -61.1783),
# OpenRemote: "parkingAvailability - Fort-de-France"
"FdF Bonde": (14.6050, -61.1750),
# OpenRemote: "Test Sensor"
"FdF Cluny": (14.6000, -61.2000),
# OpenRemote: "Traffic Fort-de-France South"
"FdF Sud": (14.5900, -61.1900),
# OpenRemote: "Weather Lamentin Airport"
"Lamentin": (14.5950, -61.1700),
},
"noise": {
# Zones urbainesbruyantes
"Rue des Arts": (14.6020, -61.1800), # Rue des Arts, centre FdF
"Marché Central": (14.6000, -61.2100), # Marché Central, FdF
"Université Fort-de-France": (14.6400, -61.1600), # Campus Schoe, nord
"Stade de Dillon": (14.6250, -61.1600), # Stade Dillon, nord
"Place du Champs de Mars": (14.6030, -61.1750), # Champs de Mars, FdF
# OpenRemote: "Noise Fort-de-France Centre"
"FdF Centre": (14.6036, -61.1783),
# OpenRemote: "Traffic Fort-de-France Centre"
"FdF Rue": (14.6036, -61.1783),
# OpenRemote: "trafficFlow - Fort-de-France"
"FdF Pasteur": (14.6200, -61.1700),
# OpenRemote: "temperature - Lamentin"
"Lamentin": (14.5950, -61.1650),
# OpenRemote: "temperature - Le Robert"
"Le Robert": (14.6776, -60.9395),
},
"weather": {
# Stations météo — terre ferme uniquement
"Station Météo Lamentin": (14.5950, -61.1650), # Aéroport Lamentin
"Station Schoelcher": (14.7350, -61.1800), # Schoelcher, NW
"Station Ajoupa-Bouillon": (14.8100, -61.0500), # Ajoupa-Bouillon, nord (interieur)
"Station Le François": (14.6150, -60.9000), # Le François, côte atlantique est
"Station Le Robert": (14.6800, -60.9400), # Le Robert, côte atlantique
# OpenRemote: "Weather Lamentin Airport"
"Lamentin": (14.5950, -61.1700),
# OpenRemote: "temperature - Lamentin"
"Lamentin Ville": (14.5950, -61.1650),
# OpenRemote: "temperature - Le Robert"
"Le Robert": (14.6776, -60.9395),
# OpenRemote: "humidity - Le Robert"
"Le Robert Hum": (14.6800, -60.9400),
# OpenRemote: "floodLevel - Schoelcher"
"Schoelcher": (14.7400, -61.1850),
},
"light": {
# Éclairage public — zones urbaines
"Eclairage Rue des Mouettes": (14.6050, -61.1800), # Rue des Mouettes, FdF
"Candela Boulevard": (14.6150, -61.1700), # Boulevard Pasteur
"Lumiere Rue des Acacias": (14.6000, -61.1850), # Rue des Acacias, FdF
"Feux Signalisation Centre": (14.6030, -61.1780), # Carrefours centraux
"Eclairage Port": (14.5980, -61.2250), # Zone portuaire
# OpenRemote: "Light Fort-de-France"
"FdF Centre": (14.6036, -61.1783),
# OpenRemote: "lightIntensity - Fort-de-France"
"FdF Bonde": (14.6050, -61.1800),
# OpenRemote: "Traffic Fort-de-France North"
"FdF North": (14.6200, -61.1700),
# OpenRemote: "Traffic Fort-de-France South"
"FdF South": (14.5900, -61.1900),
# OpenRemote: "airQuality - Sainte-Luce"
"Sainte-Luce": (14.5950, -61.1700),
},
}
@@ -382,30 +510,35 @@ def _frost_payload(sid: str, sensor: dict, source: str = "simulator", topic: str
# =============================================================================
# HTTP helper
# =============================================================================
def _http_post(url: str, data: dict, headers: dict) -> str:
def _http_post(url: str, data: dict, headers: dict, broker: str = "unknown") -> str:
"""POST et retourne 'ok' ou 'created' (ou '' si échec)."""
try:
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
if resp.status == 204:
return 'created' # No Content — succès
if resp.status not in (200, 201):
return ''
# Lire le corps pour extraire l'ID (FROST)
with http_request_duration.labels(broker=broker, method="POST").time():
try:
result = json.loads(resp.read())
if '@iot.selfLink' in result:
link = result['@iot.selfLink']
return link.split('(')[1].rstrip(')')
if '@iot.id' in result:
return str(result['@iot.id'])
with urllib.request.urlopen(req, timeout=8) as resp:
http_requests_total.labels(broker=broker, method="POST", status_code=str(resp.status)).inc()
if resp.status == 204:
return 'created' # No Content — succès
if resp.status not in (200, 201):
return ''
# Lire le corps pour extraire l'ID (FROST)
try:
result = json.loads(resp.read())
if '@iot.selfLink' in result:
link = result['@iot.selfLink']
return link.split('(')[1].rstrip(')')
if '@iot.id' in result:
return str(result['@iot.id'])
except Exception:
pass
location = resp.headers.get('Location', '')
if location:
return location.split('(')[1].rstrip(')') if '(' in location else ''
return 'created'
except Exception:
pass
location = resp.headers.get('Location', '')
if location:
return location.split('(')[1].rstrip(')') if '(' in location else ''
return 'created'
except urllib.error.HTTPError as e:
# Lire le corps de l'erreur pour debug
try:
@@ -413,23 +546,33 @@ def _http_post(url: str, data: dict, headers: dict) -> str:
except Exception:
err_body = str(e)
print(f" ⚠️ HTTP POST {url}{e.code}: {err_body}")
http_requests_total.labels(broker=broker, method="POST", status_code=str(e.code)).inc()
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc()
return ''
except Exception as e:
http_requests_total.labels(broker=broker, method="POST", status_code="exception").inc()
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc()
print(f" ⚠️ HTTP POST {url}{e}")
return ''
def _http_put(url: str, data: dict, headers: dict) -> bool:
def _http_put(url: str, data: dict, headers: dict, broker: str = "unknown") -> bool:
try:
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="PUT")
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 204)
with http_request_duration.labels(broker=broker, method="PUT").time():
with urllib.request.urlopen(req, timeout=5) as resp:
http_requests_total.labels(broker=broker, method="PUT", status_code=str(resp.status)).inc()
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
http_requests_total.labels(broker=broker, method="PUT", status_code=str(e.code)).inc()
if e.code == 409:
return True # Already exists - that's fine
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="http_error").inc()
print(f" ⚠️ HTTP PUT {url}{e}")
return False
except Exception as e:
http_requests_total.labels(broker=broker, method="PUT", status_code="exception").inc()
messages_errors_total.labels(broker=broker, sensor_type="http", error_type="exception").inc()
print(f" ⚠️ HTTP PUT {url}{e}")
return False
@@ -469,14 +612,19 @@ class MultiMQTT:
with self._lock:
if rc == 0:
self.ok[name] = True
mqtt_broker_connected.labels(broker=name).set(1)
mqtt_connection_total.labels(broker=name, status="success").inc()
print(f"[MQTT] ✅ {name} connecté")
else:
self.ok[name] = False
mqtt_broker_connected.labels(broker=name).set(0)
mqtt_connection_total.labels(broker=name, status="failure").inc()
print(f"[MQTT] ❌ {name} rc={rc}")
def _on_disconnect(self, name: str):
with self._lock:
self.ok[name] = False
mqtt_broker_connected.labels(broker=name).set(0)
print(f"[MQTT] ⚠️ {name} déconnecté")
def _setup(self):
@@ -493,16 +641,25 @@ class MultiMQTT:
self.ok[name] = False
time.sleep(3) # Attend les connexions
def publish(self, topic: str, payload: str) -> dict[str, bool]:
def publish(self, topic: str, payload: str, sensor_type: str = "unknown") -> dict[str, bool]:
results = {}
payload_bytes = len(payload.encode())
with self._lock:
for name, client in self.clients.items():
if self.ok.get(name, False):
try:
r = client.publish(topic, payload, qos=1)
results[name] = (r.rc == mqtt.MQTT_ERR_SUCCESS)
except Exception:
results[name] = False
with publish_duration.labels(broker=name).time():
try:
r = client.publish(topic, payload, qos=1)
success = (r.rc == mqtt.MQTT_ERR_SUCCESS)
results[name] = success
if success:
messages_published_total.labels(broker=name, sensor_type=sensor_type).inc()
message_payload_size.labels(broker=name).observe(payload_bytes)
else:
messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="mqtt_rc").inc()
except Exception:
results[name] = False
messages_errors_total.labels(broker=name, sensor_type=sensor_type, error_type="exception").inc()
else:
results[name] = False
return results
@@ -546,28 +703,38 @@ def publish_stellio(sid: str, sensor: dict) -> bool:
try:
body = json.dumps(entity).encode()
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
print(f" 🏢 Stellio: ✅ (HTTP {resp.status})")
return True
with http_request_duration.labels(broker="stellio", method="POST").time():
with urllib.request.urlopen(req, timeout=8) as resp:
http_requests_total.labels(broker="stellio", method="POST", status_code=str(resp.status)).inc()
print(f" 🏢 Stellio: ✅ (HTTP {resp.status})")
return True
except urllib.error.HTTPError as e:
http_requests_total.labels(broker="stellio", method="POST", status_code=str(e.code)).inc()
if e.code == 409: # Already exists, do update with PUT
try:
entity_id = urllib.parse.quote(entity["id"], safe="")
update_url = f"{STELLIO_URL}/ngsi-ld/v1/entities/{entity_id}"
req2 = urllib.request.Request(update_url, data=body, headers=headers, method="PUT")
with urllib.request.urlopen(req2, timeout=8) as resp2:
print(f" 🏢 Stellio: ✅ (HTTP {resp2.status} updated)")
return True
with http_request_duration.labels(broker="stellio", method="PUT").time():
with urllib.request.urlopen(req2, timeout=8) as resp2:
http_requests_total.labels(broker="stellio", method="PUT", status_code=str(resp2.status)).inc()
print(f" 🏢 Stellio: ✅ (HTTP {resp2.status} updated)")
return True
except Exception as e2:
http_requests_total.labels(broker="stellio", method="PUT", status_code="error").inc()
messages_errors_total.labels(broker="stellio", sensor_type=stype, error_type="http_error").inc()
print(f" ⚠️ Stellio update failed: {e2}")
return False
try:
err = e.read().decode()[:300]
except Exception:
err = str(e)
messages_errors_total.labels(broker="stellio", sensor_type=stype, error_type="http_error").inc()
print(f" ⚠️ Stellio → {e.code}: {err}")
return False
except Exception as e:
http_requests_total.labels(broker="stellio", method="POST", status_code="exception").inc()
messages_errors_total.labels(broker="stellio", sensor_type=stype, error_type="exception").inc()
print(f" ⚠️ Stellio → {e}")
return False
@@ -584,25 +751,32 @@ def publish_orion(sid: str, sensor: dict) -> bool:
body = json.dumps(entity).encode()
req = urllib.request.Request(f"{base}/entities", data=body,
headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="POST")
with urllib.request.urlopen(req, timeout=8) as resp:
print(f" 🌐 Orion-LD: ✅ (HTTP {resp.status} created)")
return True
with http_request_duration.labels(broker="orion_ld", method="POST").time():
with urllib.request.urlopen(req, timeout=8) as resp:
http_requests_total.labels(broker="orion_ld", method="POST", status_code=str(resp.status)).inc()
print(f" 🌐 Orion-LD: ✅ (HTTP {resp.status} created)")
return True
except urllib.error.HTTPError as e:
http_requests_total.labels(broker="orion_ld", method="POST", status_code=str(e.code)).inc()
if e.code != 409:
messages_errors_total.labels(broker="orion_ld", sensor_type=stype, error_type="http_error").inc()
print(f" ⚠️ Orion-LD → {e.code}: {e.read().decode()[:200]}")
return False
# 409 = déjà existant → PATCH
# 2. Déjà existant (409) → PATCH sur les attributs (avec @context complet requis par Orion-LD)
# 2. Déjà existant (409) → PATCH sur les attributs
try:
# Orion-LD exige @context même dans le PATCH
eid = urllib.parse.quote(entity['id'], safe='')
patch_url = f"{base}/entities/{eid}/attrs"
req2 = urllib.request.Request(patch_url, data=body,
headers={"Content-Type": "application/ld+json", "Accept": "application/ld+json"}, method="PATCH")
with urllib.request.urlopen(req2, timeout=8) as resp2:
print(f" 🌐 Orion-LD: ✅ (HTTP {resp2.status} updated)")
return True
with http_request_duration.labels(broker="orion_ld", method="PATCH").time():
with urllib.request.urlopen(req2, timeout=8) as resp2:
http_requests_total.labels(broker="orion_ld", method="PATCH", status_code=str(resp2.status)).inc()
print(f" 🌐 Orion-LD: ✅ (HTTP {resp2.status} updated)")
return True
except Exception as e2:
http_requests_total.labels(broker="orion_ld", method="PATCH", status_code="error").inc()
messages_errors_total.labels(broker="orion_ld", sensor_type=stype, error_type="http_error").inc()
print(f" ⚠️ Orion-LD PATCH failed: {e2}")
return False
@@ -650,10 +824,15 @@ def publish_bunkerm(sid: str, sensor: dict, values: dict) -> bool:
method="POST"
)
try:
with opener.open(req, timeout=5) as resp:
print(f" ✅ BunkerM: HTTP {resp.status}")
return resp.status in (200, 201, 204)
with http_request_duration.labels(broker="bunkerm", method="POST").time():
with opener.open(req, timeout=5) as resp:
http_requests_total.labels(broker="bunkerm", method="POST", status_code=str(resp.status)).inc()
messages_published_total.labels(broker="bunkerm", sensor_type=sensor["type"]).inc()
print(f" ✅ BunkerM: HTTP {resp.status}")
return resp.status in (200, 201, 204)
except Exception as e:
http_requests_total.labels(broker="bunkerm", method="POST", status_code="exception").inc()
messages_errors_total.labels(broker="bunkerm", sensor_type=sensor["type"], error_type="exception").inc()
print(f" ⚠️ BunkerM POST → {e}")
return False
@@ -678,7 +857,7 @@ def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
}
}
}
if _http_post(obs_url, obs, FROST_HEADERS):
if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"):
print(f" ✅ FROST Observation {sid}/{field} → OK (cached)")
return True
else:
@@ -691,7 +870,7 @@ def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
topic = f"city/sensors/{stype}/{sid}"
thing_payload, datastreams = _frost_payload(sid, sensor, source="simulator", topic=topic)
print(f" 📊 FROST: POST Thing {sid}...")
tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS)
tid = _http_post(f"{FROST_URL}/Things", thing_payload, FROST_HEADERS, broker="frost")
if not tid:
print(f" ⚠️ FROST Thing {sid} → échec création")
return False
@@ -702,7 +881,7 @@ def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
for f, ds, _ in datastreams:
ds["Thing"] = {"@iot.id": tid}
print(f" 📊 FROST: POST Datastream {sid}/{f}...")
ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS)
ds_id = _http_post(f"{FROST_URL}/Datastreams", ds, FROST_HEADERS, broker="frost")
if ds_id:
print(f" ✅ FROST Datastream {sid}/{f} créé (ID: {ds_id})")
ds_map[f] = ds_id
@@ -716,7 +895,7 @@ def publish_frost(sid: str, sensor: dict, field: str, value: float) -> bool:
ds_id = ds_map[field]
obs_url = f"{FROST_URL}/Datastreams({ds_id})/Observations"
obs = {"resultTime": datetime.now(timezone.utc).isoformat(), "result": value}
if _http_post(obs_url, obs, FROST_HEADERS):
if _http_post(obs_url, obs, FROST_HEADERS, broker="frost"):
print(f" ✅ FROST Observation {sid}/{field} → OK")
return True
return False
@@ -772,12 +951,19 @@ def _or_put(asset_id: str, payload: dict) -> bool:
"If-Match": str(payload.get("version", 1)),
},
method="PUT")
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 204)
with http_request_duration.labels(broker="openremote", method="PUT").time():
with urllib.request.urlopen(req, timeout=5) as resp:
http_requests_total.labels(broker="openremote", method="PUT", status_code=str(resp.status)).inc()
messages_published_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown")).inc()
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
http_requests_total.labels(broker="openremote", method="PUT", status_code=str(e.code)).inc()
messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="http_error").inc()
print(f" ⚠️ OR PUT {asset_id} → HTTP {e.code}")
return False
except Exception as e:
http_requests_total.labels(broker="openremote", method="PUT", status_code="exception").inc()
messages_errors_total.labels(broker="openremote", sensor_type=payload.get("type", "unknown"), error_type="exception").inc()
print(f" ⚠️ OR PUT {asset_id}{e}")
return False
@@ -862,17 +1048,20 @@ def _init_pulsar() -> bool:
def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool:
"""Publie un message sur Pulsar via le client Python (port binaire 6650)."""
stype = sensor["type"]
topic = f"persistent://public/default/smartcity-{stype}"
topic = f"persistent://public/default/smartcity-{stype.replace('-','')}"
try:
import pulsar
# Utiliser le client Pulsar binaire (socket 6650)
client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650")
producer = client.create_producer(topic)
body = json.dumps(payload, ensure_ascii=False).encode()
producer.send(body, properties={"sensor_id": sid, "source": "simulator"})
client.close()
with publish_duration.labels(broker="pulsar").time():
client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650")
producer = client.create_producer(topic)
body = json.dumps(payload, ensure_ascii=False).encode()
producer.send(body, properties={"sensor_id": sid, "source": "simulator"})
client.close()
messages_published_total.labels(broker="pulsar", sensor_type=stype).inc()
message_payload_size.labels(broker="pulsar").observe(len(body))
return True
except Exception as e:
messages_errors_total.labels(broker="pulsar", sensor_type=stype, error_type="exception").inc()
print(f" ⚠️ Pulsar → {e}")
return False
@@ -922,18 +1111,27 @@ def publish_redpanda(sid: str, sensor: dict, payload: dict) -> bool:
headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=8) as resp:
return resp.status in (200, 201, 204)
with http_request_duration.labels(broker="redpanda", method="POST").time():
with urllib.request.urlopen(req, timeout=8) as resp:
http_requests_total.labels(broker="redpanda", method="POST", status_code=str(resp.status)).inc()
messages_published_total.labels(broker="redpanda", sensor_type=stype).inc()
message_payload_size.labels(broker="redpanda").observe(len(body.encode()))
return resp.status in (200, 201, 204)
except urllib.error.HTTPError as e:
http_requests_total.labels(broker="redpanda", method="POST", status_code=str(e.code)).inc()
messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="http_error").inc()
print(f" ⚠️ Redpanda → {e.code}")
return False
except Exception as e:
http_requests_total.labels(broker="redpanda", method="POST", status_code="exception").inc()
messages_errors_total.labels(broker="redpanda", sensor_type=stype, error_type="exception").inc()
print(f" ⚠️ Redpanda → {e}")
return False
def publish_influx(sid: str, sensor: dict, values: dict) -> bool:
"""Write sensor data to InfluxDB (async, non-blocking)."""
if not _influx_write_api:
influx_write_total.labels(status="skipped").inc()
return False
def _write_async():
@@ -955,8 +1153,10 @@ def publish_influx(sid: str, sensor: dict, values: dict) -> bool:
if points:
_influx_write_api.write(bucket=INFLUX_BUCKET, record=points)
influx_write_total.labels(status="success").inc()
print(f" 📈 InfluxDB: {len(points)} points written")
except Exception as e:
influx_write_total.labels(status="error").inc()
print(f" ⚠️ InfluxDB → {e}")
# Exécution asynchrone (non-bloquante)
@@ -972,6 +1172,14 @@ def main():
print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}")
# --- Démarrer le serveur Prometheus ---
_start_metrics_server()
# --- Configurer les gauges ---
for stype, count in SENSOR_COUNTS.items():
sensors_total.labels(sensor_type=stype).set(count)
up.set(1)
# Init connectivity checks
if ENABLE_PULSAR:
_init_pulsar()
@@ -989,6 +1197,7 @@ def main():
def signal_handler(*_):
nonlocal running
running = False
up.set(0)
print("\n[SIM] 🛑 Arrêt...")
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
@@ -1025,7 +1234,7 @@ def main():
msg = json.dumps(payload_mqtt, ensure_ascii=False)
# --- MQTT publish ---
results = mqtt_client.publish(topic, msg)
results = mqtt_client.publish(topic, msg, sensor_type=stype)
ok_mqtt = [n for n, r in results.items() if r]
if ok_mqtt:
print(f" 📤 {topic}{','.join(ok_mqtt)}")