- Redpanda : correction start.sh (v24.3.14) - FROST : ENABLE_FROST=true dans simulator (test direct) - Pulsar : distribution.py mis à jour (mais ConnectError) - Prometheus : config ajoutée (prometheus.yml) - Grafana : datasources prêtes
307 lines
12 KiB
Python
307 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""Pulsar Consumer → Republish to MQTT/FIWARE Brokers
|
|
Architecture: Simulator → Pulsar → Distribution Service → Brokers (MQTT, NGSI-LD)
|
|
"""
|
|
import pulsar
|
|
import json
|
|
import time
|
|
import urllib.request
|
|
import paho.mqtt.client as mqtt
|
|
import os
|
|
|
|
PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar")
|
|
PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "6650"))
|
|
|
|
# MQTT Brokers
|
|
EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1")
|
|
EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883"))
|
|
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "mosquitto-traefik")
|
|
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))
|
|
|
|
# NGSI-LD Brokers
|
|
ORION_URL = os.environ.get("ORION_URL", "http://fiware-gis-quickstart-orion-1:1026")
|
|
STELLIO_URL = os.environ.get("STELLIO_URL", "http://stellio-api-gateway:8080")
|
|
|
|
# OGC SensorThings
|
|
FROST_URL = os.environ.get("FROST_URL", "http://frost-api-8090:8080/FROST-Server/v1.1")
|
|
|
|
# Cache des Datastreams FROST créés
|
|
_frost_datastreams = {}
|
|
|
|
def ensure_frost_datastream(sensor_type, sensor_name):
|
|
"""Crée un Datastream FROST s'il n'existe pas, retourne l'@iot.id"""
|
|
cache_key = f"{sensor_type}_{sensor_name}"
|
|
if cache_key in _frost_datastreams:
|
|
return _frost_datastreams[cache_key]
|
|
|
|
try:
|
|
# Vérifier si le Datastream existe déjà
|
|
req = urllib.request.Request(
|
|
f"{FROST_URL}/Datastreams?$filter=name eq '{sensor_name}'",
|
|
headers={"Accept": "application/json"}
|
|
)
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
data = json.loads(resp.read().decode())
|
|
if data.get("value"):
|
|
ds_id = data["value"][0]["@iot.id"]
|
|
_frost_datastreams[cache_key] = ds_id
|
|
return ds_id
|
|
except Exception:
|
|
pass # Pas trouvé, on va créer
|
|
|
|
# Créer le Datastream
|
|
try:
|
|
# 1. Créer ou récupérer Thing
|
|
thing_id = ensure_frost_thing("SmartCity Martinique")
|
|
|
|
# 2. Créer ou récupérer Sensor
|
|
sensor_id = ensure_frost_sensor(sensor_type)
|
|
|
|
# 3. Créer ou récupérer ObservedProperty
|
|
obsprop_id = ensure_frost_observed_property(sensor_type)
|
|
|
|
# 4. Créer Datastream
|
|
datastream = {
|
|
"name": sensor_name,
|
|
"description": f"Observations for {sensor_name}",
|
|
"observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
|
|
"unitOfMeasurement": {"name": "Units", "symbol": "u", "definition": "http://www.opengis.net/def/uom/UCUM/"},
|
|
"Thing": {"@iot.id": thing_id},
|
|
"Sensor": {"@iot.id": sensor_id},
|
|
"ObservedProperty": {"@iot.id": obsprop_id}
|
|
}
|
|
req = urllib.request.Request(
|
|
f"{FROST_URL}/Datastreams",
|
|
data=json.dumps(datastream).encode(),
|
|
headers={"Content-Type": "application/json"},
|
|
method="POST"
|
|
)
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
if resp.status in (201, 200):
|
|
# Récupérer l'ID depuis le header Location
|
|
location = resp.headers.get("Location", "")
|
|
if location:
|
|
ds_id = location.split("(")[-1].rstrip(")")
|
|
else:
|
|
# Fallback : requête GET
|
|
ds_id = ensure_frost_datastream(sensor_type, sensor_name) # Retry to get ID
|
|
_frost_datastreams[cache_key] = ds_id
|
|
return ds_id
|
|
except Exception as e:
|
|
print(f" ⚠️ FROST Create Datastream → {e}")
|
|
return None
|
|
|
|
def ensure_frost_thing(name):
|
|
"""Crée ou récupére un Thing"""
|
|
try:
|
|
req = urllib.request.Request(f"{FROST_URL}/Things?$filter=name eq '{name}'")
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
data = json.loads(resp.read().decode())
|
|
if data.get("value"):
|
|
return data["value"][0]["@iot.id"]
|
|
# Créer
|
|
thing = {"name": name, "description": "Smart City Digital Twin Martinique"}
|
|
req = urllib.request.Request(
|
|
f"{FROST_URL}/Things",
|
|
data=json.dumps(thing).encode(),
|
|
headers={"Content-Type": "application/json"},
|
|
method="POST"
|
|
)
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
if resp.status in (201, 200):
|
|
return resp.headers.get("Location", "").split("(")[-1].rstrip(")")
|
|
except Exception as e:
|
|
print(f" ⚠️ FROST Thing → {e}")
|
|
return "1"
|
|
|
|
def ensure_frost_sensor(sensor_type):
|
|
"""Crée ou récupére un Sensor"""
|
|
try:
|
|
req = urllib.request.Request(f"{FROST_URL}/Sensors?$filter=name eq '{sensor_type}'")
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
data = json.loads(resp.read().decode())
|
|
if data.get("value"):
|
|
return data["value"][0]["@iot.id"]
|
|
sensor = {"name": sensor_type, "description": f"Sensor for {sensor_type}"}
|
|
req = urllib.request.Request(
|
|
f"{FROST_URL}/Sensors",
|
|
data=json.dumps(sensor).encode(),
|
|
headers={"Content-Type": "application/json"},
|
|
method="POST"
|
|
)
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
if resp.status in (201, 200):
|
|
return resp.headers.get("Location", "").split("(")[-1].rstrip(")")
|
|
except Exception as e:
|
|
print(f" ⚠️ FROST Sensor → {e}")
|
|
return "1"
|
|
|
|
def ensure_frost_observed_property(sensor_type):
|
|
"""Crée ou récupére un ObservedProperty"""
|
|
prop_map = {
|
|
"traffic": ("Traffic Flow", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
|
|
"airquality": ("Air Quality", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
|
|
"parking": ("Parking Occupancy", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
|
|
"noise": ("Noise Level", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
|
|
"weather": ("Weather", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
|
|
"light": ("Light Intensity", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement")
|
|
}
|
|
name, definition = prop_map.get(sensor_type, (sensor_type, "http://example.org"))
|
|
try:
|
|
req = urllib.request.Request(f"{FROST_URL}/ObservedProperties?$filter=name eq '{name}'")
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
data = json.loads(resp.read().decode())
|
|
if data.get("value"):
|
|
return data["value"][0]["@iot.id"]
|
|
prop = {"name": name, "definition": definition, "description": f"Observed property for {sensor_type}"}
|
|
req = urllib.request.Request(
|
|
f"{FROST_URL}/ObservedProperties",
|
|
data=json.dumps(prop).encode(),
|
|
headers={"Content-Type": "application/json"},
|
|
method="POST"
|
|
)
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
if resp.status in (201, 200):
|
|
return resp.headers.get("Location", "").split("(")[-1].rstrip(")")
|
|
except Exception as e:
|
|
print(f" ⚠️ FROST ObservedProperty → {e}")
|
|
return "1"
|
|
|
|
def publish_mqtt(payload_dict, host, port):
|
|
"""Publish to MQTT broker"""
|
|
try:
|
|
client = mqtt.Client()
|
|
client.connect(host, port, 60)
|
|
topic = f"city/sensors/{payload_dict.get('type', 'unknown')}/{payload_dict.get('id', 'unknown')}"
|
|
client.publish(topic, json.dumps(payload_dict), qos=1)
|
|
client.disconnect()
|
|
return True
|
|
except Exception as e:
|
|
print(f" ⚠️ MQTT {host}:{port} → {e}")
|
|
return False
|
|
|
|
def publish_ngsi_ld(payload_dict, broker_url):
|
|
"""Publish to NGSI-LD broker (Orion-LD or Stellio)"""
|
|
try:
|
|
data = json.dumps(payload_dict).encode()
|
|
req = urllib.request.Request(
|
|
f"{broker_url}/ngsi-ld/v1/entities",
|
|
data=data,
|
|
headers={"Content-Type": "application/ld+json"},
|
|
method="POST"
|
|
)
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
return resp.status in (200, 201, 204)
|
|
except urllib.error.HTTPError as e:
|
|
if e.code == 409: # Already exists, try update
|
|
try:
|
|
entity_id = payload_dict.get("id", "")
|
|
req = urllib.request.Request(
|
|
f"{broker_url}/ngsi-ld/v1/entities/{entity_id}",
|
|
data=data,
|
|
headers={"Content-Type": "application/ld+json"},
|
|
method="PUT"
|
|
)
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
return resp.status in (200, 204)
|
|
except Exception:
|
|
return False
|
|
return False
|
|
except Exception as e:
|
|
print(f" ⚠️ NGSI-LD {broker_url} → {e}")
|
|
return False
|
|
|
|
def publish_frost(payload_dict):
|
|
"""Publish to FROST Server (OGC SensorThings)"""
|
|
try:
|
|
sensor_type = payload_dict.get("type", "unknown")
|
|
sensor_name = payload_dict.get("name", sensor_type)
|
|
|
|
# S'assurer que le Datastream existe
|
|
ds_id = ensure_frost_datastream(sensor_type, sensor_name)
|
|
if not ds_id:
|
|
print(f" ⚠️ FROST → No Datastream for {sensor_name}")
|
|
return False
|
|
|
|
# Convert to SensorThings format
|
|
st_payload = {
|
|
"result": payload_dict.get("value", payload_dict.get("temperature_celsius", 0)),
|
|
"phenomenonTime": payload_dict.get("timestamp", ""),
|
|
"resultTime": payload_dict.get("timestamp", ""),
|
|
"Datastream": {"@iot.id": ds_id}
|
|
}
|
|
data = json.dumps(st_payload).encode()
|
|
req = urllib.request.Request(
|
|
f"{FROST_URL}/Observations",
|
|
data=data,
|
|
headers={"Content-Type": "application/json"},
|
|
method="POST"
|
|
)
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
return resp.status in (200, 201, 204)
|
|
except Exception as e:
|
|
print(f" ⚠️ FROST → {e}")
|
|
return False
|
|
|
|
def main():
|
|
print("[DISTRIB] Starting Pulsar → Brokers distribution service...")
|
|
|
|
client = pulsar.Client(f"pulsar://{PULSAR_HOST}:{PULSAR_PORT}")
|
|
|
|
topics = [
|
|
"persistent://public/default/smartcity-traffic",
|
|
"persistent://public/default/smartcity-airquality",
|
|
"persistent://public/default/smartcity-parking",
|
|
"persistent://public/default/smartcity-noise",
|
|
"persistent://public/default/smartcity-weather",
|
|
"persistent://public/default/smartcity-light"
|
|
]
|
|
|
|
consumers = []
|
|
for topic in topics:
|
|
try:
|
|
cons = client.subscribe(topic, subscription_name="smartcity-distribution")
|
|
consumers.append((topic, cons))
|
|
print(f"[DISTRIB] ✅ Subscribed to {topic}")
|
|
except Exception as e:
|
|
print(f"[DISTRIB] ❌ Failed to subscribe to {topic}: {e}")
|
|
|
|
if not consumers:
|
|
print("[DISTRIB] ❌ No topics subscribed, exiting")
|
|
return
|
|
|
|
print(f"[DISTRIB] ✅ Listening on {len(consumers)} topics...")
|
|
|
|
while True:
|
|
for topic, consumer in consumers:
|
|
try:
|
|
msg = consumer.receive(timeout_millis=1000)
|
|
if msg:
|
|
data = json.loads(msg.data().decode())
|
|
print(f"[DISTRIB] {topic.split('/')[-1]} → Brokers")
|
|
|
|
# Republish to MQTT brokers
|
|
publish_mqtt(data, EMQX_HOST, EMQX_PORT)
|
|
publish_mqtt(data, MOSQUITTO_HOST, MOSQUITTO_PORT)
|
|
|
|
# Republish to NGSI-LD brokers
|
|
publish_ngsi_ld(data, ORION_URL)
|
|
publish_ngsi_ld(data, STELLIO_URL)
|
|
|
|
# Republish to FROST (OGC SensorThings)
|
|
publish_frost(data)
|
|
|
|
consumer.acknowledge(msg)
|
|
except Exception as e:
|
|
if "timeout" not in str(e).lower():
|
|
print(f"[DISTRIB] ⚠️ Error: {e}")
|
|
time.sleep(0.1)
|
|
|
|
time.sleep(1)
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except KeyboardInterrupt:
|
|
print("\n[DISTRIB] Stopping...")
|