#!/usr/bin/env python3
"""Pulsar Consumer → Republish to MQTT/FIWARE Brokers"""

import json
import sys
import time
import urllib.request
from datetime import datetime, timezone

import pulsar

PULSAR_HOST = "smart-city-pulsar"
TOPICS = [
    "persistent://public/default/smartcity-traffic",
    "persistent://public/default/smartcity-airquality",
    "persistent://public/default/smartcity-parking",
    "persistent://public/default/smartcity-noise",
    "persistent://public/default/smartcity-weather",
    "persistent://public/default/smartcity-light",
]

SUBSCRIPTION_NAME = "smartcity-distribution"
RECEIVE_TIMEOUT_MS = 1000

MQTT_HOST = "emqx_emqx_1"
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60

ORION_ENTITIES_URL = "http://fiware-gis-quickstart-orion-1:1026/ngsi-ld/v1/entities"
NGSI_LD_HEADERS = {"Content-Type": "application/ld+json"}
HTTP_TIMEOUT_S = 5


def publish_mqtt(payload_dict: dict) -> bool:
    """Publish one payload to the EMQX broker over MQTT (QoS 1).

    The topic is ``city/sensors/<type>/<id>``, built from the payload's
    ``type`` and ``id`` keys (``unknown`` when a key is absent).

    Args:
        payload_dict: JSON-serialisable mapping to publish.

    Returns:
        True when the message was handed to the broker, False on any error
        (the error is printed, never raised).
    """
    try:
        # Imported lazily so that a missing paho-mqtt installation degrades
        # to a warning instead of preventing the script from starting.
        import paho.mqtt.publish as mqtt_publish

        topic = (
            f"city/sensors/{payload_dict.get('type', 'unknown')}"
            f"/{payload_dict.get('id', 'unknown')}"
        )
        # single() connects, runs the network loop until the message has been
        # published, then disconnects. A bare connect/publish/disconnect
        # without a running loop never processes the QoS 1 handshake.
        mqtt_publish.single(
            topic,
            payload=json.dumps(payload_dict),
            qos=1,
            hostname=MQTT_HOST,
            port=MQTT_PORT,
            keepalive=MQTT_KEEPALIVE,
        )
        return True
    except Exception as e:  # best-effort: report and carry on
        print(f" ⚠️ MQTT → {e}")
        return False


def publish_ngsi_ld(payload_dict: dict, broker_url: str, headers: dict) -> bool:
    """POST one payload to an NGSI-LD broker (Orion-LD or Stellio).

    Args:
        payload_dict: JSON-serialisable entity; sent as-is.
        broker_url: Full URL of the endpoint to POST to.
        headers: HTTP headers to send with the request.

    Returns:
        True when the broker answers 200, 201 or 204; False on any other
        status or on any error (the error is printed, never raised).
    """
    # NOTE(review): NGSI-LD brokers reject POST /entities with 409 when the
    # entity id already exists, so repeated readings for the same id will be
    # reported as failures here. Consider the batch upsert endpoint - confirm
    # against the payloads actually produced upstream.
    try:
        body = json.dumps(payload_dict).encode("utf-8")
        req = urllib.request.Request(
            broker_url, data=body, headers=headers, method="POST"
        )
        with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT_S) as resp:
            return resp.status in (200, 201, 204)
    except Exception as e:  # includes HTTPError raised for 4xx/5xx answers
        print(f" ⚠️ NGSI-LD → {e}")
        return False


def _forward_next(topic, consumer) -> bool:
    """Receive at most one message from *consumer* and republish it.

    Returns True when a message was received (whether or not republishing
    succeeded), False when nothing was received.
    """
    try:
        msg = consumer.receive(timeout_millis=RECEIVE_TIMEOUT_MS)
    except pulsar.Timeout:
        # Nothing arrived on this topic within the timeout: not an error.
        return False
    except Exception as exc:
        print(f"[DISTRIB] ⚠️ {topic} receive → {exc}")
        return False

    try:
        data = json.loads(msg.data().decode())
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # A payload that cannot be decoded will never become decodable;
        # acknowledge it so it does not sit in the backlog forever.
        print(f"[DISTRIB] ⚠️ {topic} invalid payload dropped → {exc}")
        data = None

    if data is not None:
        print(f"[DISTRIB] {topic} → MQTT + NGSI-LD")
        # Republish to MQTT. Failures are reported by the helper itself.
        publish_mqtt(data)
        # Republish to NGSI-LD (Orion-LD). The payload is assumed to be
        # already NGSI-LD formatted and is forwarded unchanged.
        publish_ngsi_ld(data, ORION_ENTITIES_URL, NGSI_LD_HEADERS)

    try:
        # Delivery to the downstream brokers is best-effort: the message is
        # acknowledged even when a republish failed.
        consumer.acknowledge(msg)
    except Exception as exc:
        print(f"[DISTRIB] ⚠️ {topic} acknowledge → {exc}")
    return True


def main():
    """Subscribe to every topic in TOPICS and republish messages forever.

    Runs until interrupted; the Pulsar client is always closed on exit.
    """
    client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650")
    try:
        consumers = [
            (topic, client.subscribe(topic, subscription_name=SUBSCRIPTION_NAME))
            for topic in TOPICS
        ]
        print(f"[DISTRIB] ✅ Listening on {len(TOPICS)} topics...")

        while True:
            for topic, consumer in consumers:
                _forward_next(topic, consumer)
            # Pause between sweeps; also prevents a busy loop if receive()
            # starts failing immediately (e.g. connection lost).
            time.sleep(1)
    except KeyboardInterrupt:
        print("[DISTRIB] Stopping...")
    finally:
        client.close()


if __name__ == "__main__":
    main()