- Fix Pulsar: use binary client (port 6650) instead of non-existent REST /produce API - Add pulsar-client to Dockerfile - Create pulsar/distribution.py: consumes Pulsar and republishes to MQTT (EMQX/Mosquitto), NGSI-LD (Orion/Stellio), FROST - Add docker-compose.distribution.yml for the distribution service - Tested: Messages successfully distributed to EMQX and Orion-LD - Update session resume
65 lines
2.5 KiB
Python
65 lines
2.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Pulsar Consumer → Republish to MQTT/FIWARE Brokers"""
|
|
import pulsar, json, time, sys
|
|
from datetime import datetime, timezone
|
|
|
|
# Hostname of the Pulsar broker the distribution service connects to.
PULSAR_HOST = "smart-city-pulsar"

# Fully qualified Pulsar topics to consume, one per sensor domain.
TOPICS = [
    f"persistent://public/default/smartcity-{domain}"
    for domain in (
        "traffic",
        "airquality",
        "parking",
        "noise",
        "weather",
        "light",
    )
]
|
|
|
|
def publish_mqtt(payload_dict):
    """Publish one payload to the EMQX broker over MQTT.

    The message is sent with QoS 1 to ``city/sensors/<type>/<id>``, where
    ``type`` and ``id`` are read from ``payload_dict`` and default to
    ``unknown`` when the key is absent. The body is the JSON encoding of
    ``payload_dict``.

    Args:
        payload_dict: Mapping to serialise and publish.

    Returns:
        True when the client reports the publish as accepted, False on any
        failure (missing library, connection error, bad payload, non-zero
        publish result code). Failures are printed, never raised.
    """
    try:
        import paho.mqtt.client as mqtt

        # Build topic and body first so a bad payload fails before a
        # connection to the broker is opened.
        topic = (
            f"city/sensors/{payload_dict.get('type', 'unknown')}"
            f"/{payload_dict.get('id', 'unknown')}"
        )
        body = json.dumps(payload_dict)

        # paho-mqtt 2.x expects an explicit callback API version (2.0 rejects
        # a bare Client()); 1.x does not define the enum at all.
        api_version = getattr(mqtt, "CallbackAPIVersion", None)
        if api_version is not None:
            client = mqtt.Client(api_version.VERSION2)
        else:
            client = mqtt.Client()

        client.connect("emqx_emqx_1", 1883, 60)
        try:
            info = client.publish(topic, body, qos=1)
        finally:
            # Always release the socket, even when publish() raises.
            client.disconnect()

        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            print(f" ⚠️ MQTT → publish failed (rc={info.rc})")
            return False
        return True
    except Exception as e:
        print(f" ⚠️ MQTT → {e}")
        return False
|
|
|
|
def publish_ngsi_ld(payload_dict, broker_url, headers):
    """POST a JSON payload to an NGSI-LD broker endpoint.

    Args:
        payload_dict: Object serialised with ``json.dumps`` as the request body.
        broker_url: Full URL the request is sent to.
        headers: HTTP headers attached to the request.

    Returns:
        True when the response status is 200, 201 or 204; False otherwise,
        including when any exception occurs (the error is printed).
    """
    accepted_statuses = (200, 201, 204)
    try:
        import urllib.request

        body = json.dumps(payload_dict).encode()
        request = urllib.request.Request(
            broker_url,
            data=body,
            headers=headers,
            method="POST",
        )
        response = urllib.request.urlopen(request, timeout=5)
        with response:
            succeeded = response.status in accepted_statuses
        return succeeded
    except Exception as err:
        print(f" ⚠️ NGSI-LD → {err}")
        return False
|
|
|
|
def main():
    """Consume every topic in TOPICS and republish each message.

    One consumer per topic is created on the ``smartcity-distribution``
    subscription. The loop polls each consumer with a one-second receive
    timeout, forwards every decoded message to MQTT and to the Orion-LD
    entities endpoint, then acknowledges it. Publishing is best effort: the
    publish helpers report their own failures and the message is
    acknowledged regardless of their result.

    The loop never returns normally; the Pulsar client is closed when it is
    left through an exception or an interrupt.
    """
    orion_url = "http://fiware-gis-quickstart-orion-1:1026/ngsi-ld/v1/entities"
    orion_headers = {"Content-Type": "application/ld+json"}

    # Use the dedicated timeout exception when the installed pulsar module
    # exposes one; otherwise fall back to Exception, which treats every
    # receive failure as "no message", like the previous catch-all.
    receive_timeout = getattr(pulsar, "Timeout", Exception)

    client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650")
    try:
        consumers = [
            (topic, client.subscribe(topic, subscription_name="smartcity-distribution"))
            for topic in TOPICS
        ]
        print(f"[DISTRIB] ✅ Listening on {len(TOPICS)} topics...")

        while True:
            for topic, consumer in consumers:
                try:
                    msg = consumer.receive(timeout_millis=1000)
                except receive_timeout:
                    # Nothing arrived on this topic within the timeout.
                    continue
                except Exception as exc:
                    print(f"[DISTRIB] ⚠️ {topic} → receive failed: {exc}")
                    continue

                try:
                    data = json.loads(msg.data().decode())
                except ValueError as exc:
                    # Covers both UnicodeDecodeError and JSONDecodeError.
                    # As before, an unreadable message is not acknowledged;
                    # the difference is that the failure is now visible.
                    print(f"[DISTRIB] ⚠️ {topic} → invalid payload, not acknowledged: {exc}")
                    continue

                print(f"[DISTRIB] {topic} → MQTT + NGSI-LD")

                # Republish to MQTT.
                publish_mqtt(data)

                # Republish to NGSI-LD (Orion-LD); the payload is assumed to
                # be NGSI-LD formatted already and is forwarded unchanged.
                publish_ngsi_ld(data, orion_url, orion_headers)

                try:
                    consumer.acknowledge(msg)
                except Exception as exc:
                    print(f"[DISTRIB] ⚠️ {topic} → acknowledge failed: {exc}")

            time.sleep(1)
    finally:
        # Release broker connections and consumers on any exit path.
        client.close()
|
|
|
|
# Start the distribution loop only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|