#!/usr/bin/env python3
"""Pulsar Consumer → Republish to MQTT/FIWARE Brokers

Architecture: Simulator → Pulsar → Distribution Service → Brokers (MQTT, NGSI-LD)

Every message received from the Pulsar topics is decoded as JSON and
re-published, best effort, to two MQTT brokers, two NGSI-LD brokers and one
FROST (OGC SensorThings) server. Publisher functions never raise: they log a
warning and return ``False`` so that one failing broker does not block the
others.
"""

import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request

import paho.mqtt.client as mqtt
import pulsar

PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar")
PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "6650"))

# MQTT Brokers
EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1")
EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883"))
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "mosquitto-traefik")
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))

# NGSI-LD Brokers
ORION_URL = os.environ.get("ORION_URL", "http://fiware-gis-quickstart-orion-1:1026")
STELLIO_URL = os.environ.get("STELLIO_URL", "http://stellio-api-gateway:8080")

# OGC SensorThings
FROST_URL = os.environ.get("FROST_URL", "http://frost-api-8090:8080/FROST-Server/v1.1")

# Timeout (seconds) applied to every HTTP request issued by this service.
HTTP_TIMEOUT_S = 5

# Maximum time (seconds) to wait for the broker to acknowledge a QoS 1 publish.
MQTT_PUBLISH_TIMEOUT_S = 5

# ID used when a Thing/Sensor/ObservedProperty can be neither found nor created.
# NOTE(review): this keeps the historical best-effort behaviour, but entity "1"
# is not guaranteed to exist or to be the right one — consider failing instead.
_FROST_FALLBACK_ID = "1"

# Pulsar topics consumed by the distribution service.
TOPICS = [
    "persistent://public/default/smartcity-traffic",
    "persistent://public/default/smartcity-airquality",
    "persistent://public/default/smartcity-parking",
    "persistent://public/default/smartcity-noise",
    "persistent://public/default/smartcity-weather",
    "persistent://public/default/smartcity-light",
]

# sensor type -> (ObservedProperty name, ObservedProperty definition)
_OBSERVED_PROPERTIES = {
    "traffic": ("Traffic Flow", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
    "airquality": ("Air Quality", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
    "parking": ("Parking Occupancy", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
    "noise": ("Noise Level", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
    "weather": ("Weather", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
    "light": ("Light Intensity", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
}

# Cache of FROST Datastream IDs already resolved, keyed by "<type>_<name>".
_frost_datastreams = {}


def _frost_filter_url(collection, name):
    """Build ``<FROST_URL>/<collection>?$filter=name eq '<name>'``, URL-safe.

    The filter expression contains spaces and quotes; sent raw, urllib refuses
    the request (``InvalidURL``), so it is percent-encoded here. Single quotes
    inside *name* are doubled, which is how OData string literals escape them.
    """
    literal = str(name).replace("'", "''")
    expression = urllib.parse.quote(f"name eq '{literal}'", safe="")
    return f"{FROST_URL}/{collection}?$filter={expression}"


def _id_from_location(location):
    """Extract the entity ID from a ``Location`` header such as ``.../Things(42)``.

    Returns an ``int`` for numeric IDs (the same type the lookup path gets from
    ``@iot.id``), the unquoted string for quoted IDs, or ``None`` when the
    header is empty or has no ``(id)`` suffix.
    """
    if not location or "(" not in location:
        return None
    raw = location.rsplit("(", 1)[-1].rstrip(")").strip()
    if not raw:
        return None
    if raw[0] in "'\"":
        return raw.strip("'\"") or None
    try:
        return int(raw)
    except ValueError:
        return raw


def _frost_lookup_id(collection, name):
    """Return the ``@iot.id`` of the first entity named *name*, or ``None``.

    Raises whatever ``urllib``/``json`` raise on transport or decoding errors.
    """
    req = urllib.request.Request(
        _frost_filter_url(collection, name),
        headers={"Accept": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT_S) as resp:
        data = json.loads(resp.read().decode())
    matches = data.get("value")
    if matches:
        return matches[0]["@iot.id"]
    return None


def _frost_create(collection, body):
    """POST *body* to *collection*; return the new ID or ``None`` if unknown.

    Raises whatever ``urllib`` raises on transport or HTTP (>= 400) errors.
    """
    req = urllib.request.Request(
        f"{FROST_URL}/{collection}",
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT_S) as resp:
        if resp.status not in (200, 201):
            return None
        return _id_from_location(resp.headers.get("Location", ""))


def _frost_get_or_create(collection, name, body, label):
    """Return the ID of the entity named *name*, creating it from *body* if needed.

    Never raises: on any failure a warning tagged with *label* is printed and
    ``_FROST_FALLBACK_ID`` is returned.
    """
    try:
        found = _frost_lookup_id(collection, name)
        if found is not None:
            return found
        created = _frost_create(collection, body)
        if created is not None:
            return created
    except Exception as e:
        print(f" ⚠️ FROST {label} → {e}")
    return _FROST_FALLBACK_ID


def ensure_frost_datastream(sensor_type, sensor_name):
    """Return the ``@iot.id`` of the Datastream named *sensor_name*, creating it if absent.

    Results are cached in ``_frost_datastreams``. Returns ``None`` (after
    printing a warning) when the Datastream can be neither found nor created.
    """
    cache_key = f"{sensor_type}_{sensor_name}"
    if cache_key in _frost_datastreams:
        return _frost_datastreams[cache_key]

    # Check whether the Datastream already exists. If the lookup itself fails
    # we cannot tell, so we do not create one: that would risk a duplicate.
    try:
        ds_id = _frost_lookup_id("Datastreams", sensor_name)
    except Exception as e:
        print(f" ⚠️ FROST Lookup Datastream → {e}")
        return None
    if ds_id is not None:
        _frost_datastreams[cache_key] = ds_id
        return ds_id

    # Not found: create the Datastream and the entities it links to.
    try:
        thing_id = ensure_frost_thing("SmartCity Martinique")
        sensor_id = ensure_frost_sensor(sensor_type)
        obsprop_id = ensure_frost_observed_property(sensor_type)
        datastream = {
            "name": sensor_name,
            "description": f"Observations for {sensor_name}",
            "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
            "unitOfMeasurement": {
                "name": "Units",
                "symbol": "u",
                "definition": "http://www.opengis.net/def/uom/UCUM/",
            },
            "Thing": {"@iot.id": thing_id},
            "Sensor": {"@iot.id": sensor_id},
            "ObservedProperty": {"@iot.id": obsprop_id},
        }
        ds_id = _frost_create("Datastreams", datastream)
        if ds_id is None:
            # No usable Location header: look the ID up once. Calling this
            # function recursively could POST again and recurse without bound.
            ds_id = _frost_lookup_id("Datastreams", sensor_name)
        if ds_id is not None:
            _frost_datastreams[cache_key] = ds_id
            return ds_id
    except Exception as e:
        print(f" ⚠️ FROST Create Datastream → {e}")
    return None


def ensure_frost_thing(name):
    """Return the ID of the Thing named *name*, creating it if absent."""
    thing = {"name": name, "description": "Smart City Digital Twin Martinique"}
    return _frost_get_or_create("Things", name, thing, "Thing")


def ensure_frost_sensor(sensor_type):
    """Return the ID of the Sensor named *sensor_type*, creating it if absent."""
    sensor = {
        "name": sensor_type,
        "description": f"Sensor for {sensor_type}",
        # SensorThings makes encodingType and metadata mandatory for a Sensor;
        # without them the create request is rejected.
        # NOTE(review): placeholder values — replace with real sensor metadata.
        "encodingType": "text/plain",
        "metadata": f"Simulated {sensor_type} sensor",
    }
    return _frost_get_or_create("Sensors", sensor_type, sensor, "Sensor")


def ensure_frost_observed_property(sensor_type):
    """Return the ID of the ObservedProperty for *sensor_type*, creating it if absent.

    Unknown sensor types use the type itself as the property name and
    ``http://example.org`` as its definition.
    """
    name, definition = _OBSERVED_PROPERTIES.get(sensor_type, (sensor_type, "http://example.org"))
    prop = {
        "name": name,
        "definition": definition,
        "description": f"Observed property for {sensor_type}",
    }
    return _frost_get_or_create("ObservedProperties", name, prop, "ObservedProperty")


def _new_mqtt_client():
    """Create a paho MQTT client on both paho-mqtt 1.x and 2.x.

    paho-mqtt 2.x introduces ``CallbackAPIVersion`` and expects it as the first
    constructor argument; 1.x has no such name.
    """
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is not None:
        return mqtt.Client(api_versions.VERSION2)
    return mqtt.Client()


def publish_mqtt(payload_dict, host, port) -> bool:
    """Publish *payload_dict* as JSON to ``city/sensors/<type>/<id>`` at QoS 1.

    Returns ``True`` only when the broker acknowledged the message within
    ``MQTT_PUBLISH_TIMEOUT_S`` seconds; otherwise prints a warning and returns
    ``False``. Never raises.
    """
    try:
        client = _new_mqtt_client()
        client.connect(host, port, 60)
        # QoS 1 needs the network loop running to process the PUBACK;
        # publishing and disconnecting straight away gives no delivery signal.
        client.loop_start()
        try:
            topic = f"city/sensors/{payload_dict.get('type', 'unknown')}/{payload_dict.get('id', 'unknown')}"
            info = client.publish(topic, json.dumps(payload_dict), qos=1)
            info.wait_for_publish(timeout=MQTT_PUBLISH_TIMEOUT_S)
            delivered = info.is_published()
        finally:
            client.disconnect()
            client.loop_stop()
        if not delivered:
            print(f" ⚠️ MQTT {host}:{port} → publish not acknowledged within {MQTT_PUBLISH_TIMEOUT_S}s")
        return delivered
    except Exception as e:
        print(f" ⚠️ MQTT {host}:{port} → {e}")
        return False


def _ngsi_ld_headers(payload_dict):
    """Pick the request Content-Type for an NGSI-LD payload.

    ``application/ld+json`` bodies must embed ``@context``; payloads without
    one are sent as plain ``application/json``.
    """
    if "@context" in payload_dict:
        return {"Content-Type": "application/ld+json"}
    return {"Content-Type": "application/json"}


def publish_ngsi_ld(payload_dict, broker_url) -> bool:
    """Create the entity on an NGSI-LD broker (Orion-LD or Stellio).

    If the broker answers 409 (entity already exists), the entity is replaced
    with a PUT on ``/ngsi-ld/v1/entities/<id>``. Returns ``True`` on success;
    otherwise prints a warning and returns ``False``. Never raises.
    """
    try:
        data = json.dumps(payload_dict).encode()
        headers = _ngsi_ld_headers(payload_dict)
        req = urllib.request.Request(
            f"{broker_url}/ngsi-ld/v1/entities",
            data=data,
            headers=headers,
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT_S) as resp:
                return resp.status in (200, 201, 204)
        except urllib.error.HTTPError as e:
            if e.code != 409:
                print(f" ⚠️ NGSI-LD {broker_url} → HTTP {e.code}")
                return False

        # 409: the entity already exists, so replace it.
        entity_id = payload_dict.get("id", "")
        if not entity_id:
            print(f" ⚠️ NGSI-LD {broker_url} → conflict on an entity without id")
            return False
        # Keep ':' readable (IDs are usually URNs) but encode anything unsafe.
        entity_path = urllib.parse.quote(str(entity_id), safe=":")
        req = urllib.request.Request(
            f"{broker_url}/ngsi-ld/v1/entities/{entity_path}",
            data=data,
            headers=headers,
            method="PUT",
        )
        with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT_S) as resp:
            return resp.status in (200, 204)
    except Exception as e:
        print(f" ⚠️ NGSI-LD {broker_url} → {e}")
        return False


def publish_frost(payload_dict) -> bool:
    """Post the reading as an Observation to the FROST Server (OGC SensorThings).

    The Datastream is resolved (and created if needed) from the payload's
    ``type`` and ``name``. The result is the payload's ``value``, falling back
    to ``temperature_celsius`` and then ``0``. Returns ``True`` on success;
    otherwise prints a warning and returns ``False``. Never raises.
    """
    try:
        sensor_type = payload_dict.get("type", "unknown")
        sensor_name = payload_dict.get("name", sensor_type)

        # Make sure the Datastream exists.
        ds_id = ensure_frost_datastream(sensor_type, sensor_name)
        if not ds_id:
            print(f" ⚠️ FROST → No Datastream for {sensor_name}")
            return False

        # Convert to SensorThings format.
        st_payload = {
            "result": payload_dict.get("value", payload_dict.get("temperature_celsius", 0)),
            "Datastream": {"@iot.id": ds_id},
        }
        timestamp = payload_dict.get("timestamp")
        if timestamp:
            # An empty string is not a valid time; when the payload carries no
            # timestamp the fields are left out so the server assigns the time.
            st_payload["phenomenonTime"] = timestamp
            st_payload["resultTime"] = timestamp

        req = urllib.request.Request(
            f"{FROST_URL}/Observations",
            data=json.dumps(st_payload).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT_S) as resp:
            return resp.status in (200, 201, 204)
    except Exception as e:
        print(f" ⚠️ FROST → {e}")
        return False


def _subscribe_all(client, topics):
    """Subscribe to each topic; return ``(topic, consumer)`` pairs that succeeded."""
    consumers = []
    for topic in topics:
        try:
            cons = client.subscribe(topic, subscription_name="smartcity-distribution")
            consumers.append((topic, cons))
            print(f"[DISTRIB] ✅ Subscribed to {topic}")
        except Exception as e:
            print(f"[DISTRIB] ❌ Failed to subscribe to {topic}: {e}")
    return consumers


def _distribute(data):
    """Republish one decoded message to every downstream broker (best effort)."""
    # Republish to MQTT brokers
    publish_mqtt(data, EMQX_HOST, EMQX_PORT)
    publish_mqtt(data, MOSQUITTO_HOST, MOSQUITTO_PORT)
    # Republish to NGSI-LD brokers
    publish_ngsi_ld(data, ORION_URL)
    publish_ngsi_ld(data, STELLIO_URL)
    # Republish to FROST (OGC SensorThings)
    publish_frost(data)


def _poll_once(topic, consumer):
    """Receive at most one message from *consumer* and distribute it.

    Receive timeouts are silent; any other error is logged. Malformed payloads
    are logged and acknowledged so they do not stay in the backlog forever.
    """
    try:
        msg = consumer.receive(timeout_millis=1000)
    except Exception as e:
        # A receive timeout surfaces as an exception whose text mentions
        # "timeout"; matching on the text keeps the original detection.
        if "timeout" not in str(e).lower():
            print(f"[DISTRIB] ⚠️ Error: {e}")
        time.sleep(0.1)
        return

    try:
        try:
            data = json.loads(msg.data().decode())
            if not isinstance(data, dict):
                raise ValueError("payload is not a JSON object")
        except ValueError as e:
            # JSONDecodeError and UnicodeDecodeError are both ValueError.
            print(f"[DISTRIB] ⚠️ Dropping malformed message on {topic}: {e}")
            consumer.acknowledge(msg)
            return

        print(f"[DISTRIB] {topic.split('/')[-1]} → Brokers")
        _distribute(data)
        consumer.acknowledge(msg)
    except Exception as e:
        print(f"[DISTRIB] ⚠️ Error: {e}")
        time.sleep(0.1)


def main() -> None:
    """Subscribe to the smart-city topics and forward messages until interrupted."""
    print("[DISTRIB] Starting Pulsar → Brokers distribution service...")
    client = pulsar.Client(f"pulsar://{PULSAR_HOST}:{PULSAR_PORT}")
    try:
        consumers = _subscribe_all(client, TOPICS)
        if not consumers:
            print("[DISTRIB] ❌ No topics subscribed, exiting")
            return
        print(f"[DISTRIB] ✅ Listening on {len(consumers)} topics...")

        while True:
            for topic, consumer in consumers:
                _poll_once(topic, consumer)
            time.sleep(1)
    finally:
        # Release the consumers and the connection on every exit path,
        # including Ctrl-C; a failure here must not mask the original error.
        try:
            client.close()
        except Exception as e:
            print(f"[DISTRIB] ⚠️ Error closing Pulsar client: {e}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n[DISTRIB] Stopping...")