Files
smart-city-digital-twin-mar…/pulsar/distribution.py
Eric FELIXINE 98954e86fb fix: Redpanda start.sh + FROST direct simulator + Prometheus config
- Redpanda : correction start.sh (v24.3.14)
- FROST : ENABLE_FROST=true dans simulator (test direct)
- Pulsar : distribution.py mis à jour (mais ConnectError)
- Prometheus : config ajoutée (prometheus.yml)
- Grafana : datasources prêtes
2026-05-05 11:29:07 -04:00

307 lines
12 KiB
Python

#!/usr/bin/env python3
"""Pulsar Consumer → Republish to MQTT/FIWARE Brokers
Architecture: Simulator → Pulsar → Distribution Service → Brokers (MQTT, NGSI-LD)
"""
import pulsar
import json
import time
import urllib.request
import paho.mqtt.client as mqtt
import os
PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar")
PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "6650"))
# MQTT Brokers
EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1")
EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883"))
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "mosquitto-traefik")
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))
# NGSI-LD Brokers
ORION_URL = os.environ.get("ORION_URL", "http://fiware-gis-quickstart-orion-1:1026")
STELLIO_URL = os.environ.get("STELLIO_URL", "http://stellio-api-gateway:8080")
# OGC SensorThings
FROST_URL = os.environ.get("FROST_URL", "http://frost-api-8090:8080/FROST-Server/v1.1")
# Cache des Datastreams FROST créés
_frost_datastreams = {}
def ensure_frost_datastream(sensor_type, sensor_name):
"""Crée un Datastream FROST s'il n'existe pas, retourne l'@iot.id"""
cache_key = f"{sensor_type}_{sensor_name}"
if cache_key in _frost_datastreams:
return _frost_datastreams[cache_key]
try:
# Vérifier si le Datastream existe déjà
req = urllib.request.Request(
f"{FROST_URL}/Datastreams?$filter=name eq '{sensor_name}'",
headers={"Accept": "application/json"}
)
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read().decode())
if data.get("value"):
ds_id = data["value"][0]["@iot.id"]
_frost_datastreams[cache_key] = ds_id
return ds_id
except Exception:
pass # Pas trouvé, on va créer
# Créer le Datastream
try:
# 1. Créer ou récupérer Thing
thing_id = ensure_frost_thing("SmartCity Martinique")
# 2. Créer ou récupérer Sensor
sensor_id = ensure_frost_sensor(sensor_type)
# 3. Créer ou récupérer ObservedProperty
obsprop_id = ensure_frost_observed_property(sensor_type)
# 4. Créer Datastream
datastream = {
"name": sensor_name,
"description": f"Observations for {sensor_name}",
"observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
"unitOfMeasurement": {"name": "Units", "symbol": "u", "definition": "http://www.opengis.net/def/uom/UCUM/"},
"Thing": {"@iot.id": thing_id},
"Sensor": {"@iot.id": sensor_id},
"ObservedProperty": {"@iot.id": obsprop_id}
}
req = urllib.request.Request(
f"{FROST_URL}/Datastreams",
data=json.dumps(datastream).encode(),
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status in (201, 200):
# Récupérer l'ID depuis le header Location
location = resp.headers.get("Location", "")
if location:
ds_id = location.split("(")[-1].rstrip(")")
else:
# Fallback : requête GET
ds_id = ensure_frost_datastream(sensor_type, sensor_name) # Retry to get ID
_frost_datastreams[cache_key] = ds_id
return ds_id
except Exception as e:
print(f" ⚠️ FROST Create Datastream → {e}")
return None
def ensure_frost_thing(name):
"""Crée ou récupére un Thing"""
try:
req = urllib.request.Request(f"{FROST_URL}/Things?$filter=name eq '{name}'")
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read().decode())
if data.get("value"):
return data["value"][0]["@iot.id"]
# Créer
thing = {"name": name, "description": "Smart City Digital Twin Martinique"}
req = urllib.request.Request(
f"{FROST_URL}/Things",
data=json.dumps(thing).encode(),
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status in (201, 200):
return resp.headers.get("Location", "").split("(")[-1].rstrip(")")
except Exception as e:
print(f" ⚠️ FROST Thing → {e}")
return "1"
def ensure_frost_sensor(sensor_type):
"""Crée ou récupére un Sensor"""
try:
req = urllib.request.Request(f"{FROST_URL}/Sensors?$filter=name eq '{sensor_type}'")
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read().decode())
if data.get("value"):
return data["value"][0]["@iot.id"]
sensor = {"name": sensor_type, "description": f"Sensor for {sensor_type}"}
req = urllib.request.Request(
f"{FROST_URL}/Sensors",
data=json.dumps(sensor).encode(),
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status in (201, 200):
return resp.headers.get("Location", "").split("(")[-1].rstrip(")")
except Exception as e:
print(f" ⚠️ FROST Sensor → {e}")
return "1"
def ensure_frost_observed_property(sensor_type):
"""Crée ou récupére un ObservedProperty"""
prop_map = {
"traffic": ("Traffic Flow", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
"airquality": ("Air Quality", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
"parking": ("Parking Occupancy", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
"noise": ("Noise Level", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
"weather": ("Weather", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
"light": ("Light Intensity", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement")
}
name, definition = prop_map.get(sensor_type, (sensor_type, "http://example.org"))
try:
req = urllib.request.Request(f"{FROST_URL}/ObservedProperties?$filter=name eq '{name}'")
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read().decode())
if data.get("value"):
return data["value"][0]["@iot.id"]
prop = {"name": name, "definition": definition, "description": f"Observed property for {sensor_type}"}
req = urllib.request.Request(
f"{FROST_URL}/ObservedProperties",
data=json.dumps(prop).encode(),
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status in (201, 200):
return resp.headers.get("Location", "").split("(")[-1].rstrip(")")
except Exception as e:
print(f" ⚠️ FROST ObservedProperty → {e}")
return "1"
def publish_mqtt(payload_dict, host, port):
"""Publish to MQTT broker"""
try:
client = mqtt.Client()
client.connect(host, port, 60)
topic = f"city/sensors/{payload_dict.get('type', 'unknown')}/{payload_dict.get('id', 'unknown')}"
client.publish(topic, json.dumps(payload_dict), qos=1)
client.disconnect()
return True
except Exception as e:
print(f" ⚠️ MQTT {host}:{port}{e}")
return False
def publish_ngsi_ld(payload_dict, broker_url):
"""Publish to NGSI-LD broker (Orion-LD or Stellio)"""
try:
data = json.dumps(payload_dict).encode()
req = urllib.request.Request(
f"{broker_url}/ngsi-ld/v1/entities",
data=data,
headers={"Content-Type": "application/ld+json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 201, 204)
except urllib.error.HTTPError as e:
if e.code == 409: # Already exists, try update
try:
entity_id = payload_dict.get("id", "")
req = urllib.request.Request(
f"{broker_url}/ngsi-ld/v1/entities/{entity_id}",
data=data,
headers={"Content-Type": "application/ld+json"},
method="PUT"
)
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 204)
except Exception:
return False
return False
except Exception as e:
print(f" ⚠️ NGSI-LD {broker_url}{e}")
return False
def publish_frost(payload_dict):
"""Publish to FROST Server (OGC SensorThings)"""
try:
sensor_type = payload_dict.get("type", "unknown")
sensor_name = payload_dict.get("name", sensor_type)
# S'assurer que le Datastream existe
ds_id = ensure_frost_datastream(sensor_type, sensor_name)
if not ds_id:
print(f" ⚠️ FROST → No Datastream for {sensor_name}")
return False
# Convert to SensorThings format
st_payload = {
"result": payload_dict.get("value", payload_dict.get("temperature_celsius", 0)),
"phenomenonTime": payload_dict.get("timestamp", ""),
"resultTime": payload_dict.get("timestamp", ""),
"Datastream": {"@iot.id": ds_id}
}
data = json.dumps(st_payload).encode()
req = urllib.request.Request(
f"{FROST_URL}/Observations",
data=data,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 201, 204)
except Exception as e:
print(f" ⚠️ FROST → {e}")
return False
def main():
print("[DISTRIB] Starting Pulsar → Brokers distribution service...")
client = pulsar.Client(f"pulsar://{PULSAR_HOST}:{PULSAR_PORT}")
topics = [
"persistent://public/default/smartcity-traffic",
"persistent://public/default/smartcity-airquality",
"persistent://public/default/smartcity-parking",
"persistent://public/default/smartcity-noise",
"persistent://public/default/smartcity-weather",
"persistent://public/default/smartcity-light"
]
consumers = []
for topic in topics:
try:
cons = client.subscribe(topic, subscription_name="smartcity-distribution")
consumers.append((topic, cons))
print(f"[DISTRIB] ✅ Subscribed to {topic}")
except Exception as e:
print(f"[DISTRIB] ❌ Failed to subscribe to {topic}: {e}")
if not consumers:
print("[DISTRIB] ❌ No topics subscribed, exiting")
return
print(f"[DISTRIB] ✅ Listening on {len(consumers)} topics...")
while True:
for topic, consumer in consumers:
try:
msg = consumer.receive(timeout_millis=1000)
if msg:
data = json.loads(msg.data().decode())
print(f"[DISTRIB] {topic.split('/')[-1]} → Brokers")
# Republish to MQTT brokers
publish_mqtt(data, EMQX_HOST, EMQX_PORT)
publish_mqtt(data, MOSQUITTO_HOST, MOSQUITTO_PORT)
# Republish to NGSI-LD brokers
publish_ngsi_ld(data, ORION_URL)
publish_ngsi_ld(data, STELLIO_URL)
# Republish to FROST (OGC SensorThings)
publish_frost(data)
consumer.acknowledge(msg)
except Exception as e:
if "timeout" not in str(e).lower():
print(f"[DISTRIB] ⚠️ Error: {e}")
time.sleep(0.1)
time.sleep(1)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n[DISTRIB] Stopping...")