feat: Pulsar distribution service (Simulator → Pulsar → Brokers)
- Fix Pulsar: use binary client (port 6650) instead of non-existent REST /produce API - Add pulsar-client to Dockerfile - Create pulsar/distribution.py: consumes Pulsar and republishes to MQTT (EMQX/Mosquitto), NGSI-LD (Orion/Stellio), FROST - Add docker-compose.distribution.yml for the distribution service - Tested: Messages successfully distributed to EMQX and Orion-LD - Update session resume
This commit is contained in:
156
pulsar/distribution.py
Normal file
156
pulsar/distribution.py
Normal file
@@ -0,0 +1,156 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Pulsar Consumer → Republish to MQTT/FIWARE Brokers
|
||||
Architecture: Simulator → Pulsar → Distribution Service → Brokers (MQTT, NGSI-LD)
|
||||
"""
|
||||
import pulsar
|
||||
import json
|
||||
import time
|
||||
import urllib.request
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
PULSAR_HOST = "smart-city-pulsar"
|
||||
PULSAR_PORT = 6650
|
||||
|
||||
# MQTT Brokers
|
||||
EMQX_HOST = "emqx_emqx_1"
|
||||
EMQX_PORT = 1883
|
||||
MOSQUITTO_HOST = "mosquitto-traefik"
|
||||
MOSQUITTO_PORT = 1883
|
||||
|
||||
# NGSI-LD Brokers
|
||||
ORION_URL = "http://fiware-gis-quickstart-orion-1:1026"
|
||||
STELLIO_URL = "http://stellio-api-gateway:8080"
|
||||
|
||||
# OGC SensorThings
|
||||
FROST_URL = "http://frost-api-8090:8080/FROST-Server/v1.1"
|
||||
|
||||
def publish_mqtt(payload_dict, host, port):
|
||||
"""Publish to MQTT broker"""
|
||||
try:
|
||||
client = mqtt.Client()
|
||||
client.connect(host, port, 60)
|
||||
topic = f"city/sensors/{payload_dict.get('type', 'unknown')}/{payload_dict.get('id', 'unknown')}"
|
||||
client.publish(topic, json.dumps(payload_dict), qos=1)
|
||||
client.disconnect()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f" ⚠️ MQTT {host}:{port} → {e}")
|
||||
return False
|
||||
|
||||
def publish_ngsi_ld(payload_dict, broker_url):
|
||||
"""Publish to NGSI-LD broker (Orion-LD or Stellio)"""
|
||||
try:
|
||||
data = json.dumps(payload_dict).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{broker_url}/ngsi-ld/v1/entities",
|
||||
data=data,
|
||||
headers={"Content-Type": "application/ld+json"},
|
||||
method="POST"
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
return resp.status in (200, 201, 204)
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.code == 409: # Already exists, try update
|
||||
try:
|
||||
# Update with PUT
|
||||
entity_id = payload_dict.get("id", "")
|
||||
req = urllib.request.Request(
|
||||
f"{broker_url}/ngsi-ld/v1/entities/{entity_id}",
|
||||
data=data,
|
||||
headers={"Content-Type": "application/ld+json"},
|
||||
method="PUT"
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
return resp.status in (200, 204)
|
||||
except Exception:
|
||||
return False
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f" ⚠️ NGSI-LD {broker_url} → {e}")
|
||||
return False
|
||||
|
||||
def publish_frost(payload_dict):
|
||||
"""Publish to FROST Server (OGC SensorThings)"""
|
||||
try:
|
||||
# Convert to SensorThings format
|
||||
st_payload = {
|
||||
"result": payload_dict.get("value", 0),
|
||||
"phenomenonTime": payload_dict.get("timestamp", ""),
|
||||
"resultTime": payload_dict.get("timestamp", ""),
|
||||
"Datastream": {"@iot.id": payload_dict.get("datastream_id", "1")}
|
||||
}
|
||||
data = json.dumps(st_payload).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{FROST_URL}/Observations",
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST"
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
return resp.status in (200, 201, 204)
|
||||
except Exception as e:
|
||||
print(f" ⚠️ FROST → {e}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
print("[DISTRIB] Starting Pulsar → Brokers distribution service...")
|
||||
|
||||
client = pulsar.Client(f"pulsar://{PULSAR_HOST}:{PULSAR_PORT}")
|
||||
|
||||
topics = [
|
||||
"persistent://public/default/smartcity-traffic",
|
||||
"persistent://public/default/smartcity-airquality",
|
||||
"persistent://public/default/smartcity-parking",
|
||||
"persistent://public/default/smartcity-noise",
|
||||
"persistent://public/default/smartcity-weather",
|
||||
"persistent://public/default/smartcity-light"
|
||||
]
|
||||
|
||||
consumers = []
|
||||
for topic in topics:
|
||||
try:
|
||||
cons = client.subscribe(topic, subscription_name="smartcity-distribution")
|
||||
consumers.append((topic, cons))
|
||||
print(f"[DISTRIB] ✅ Subscribed to {topic}")
|
||||
except Exception as e:
|
||||
print(f"[DISTRIB] ❌ Failed to subscribe to {topic}: {e}")
|
||||
|
||||
if not consumers:
|
||||
print("[DISTRIB] ❌ No topics subscribed, exiting")
|
||||
return
|
||||
|
||||
print(f"[DISTRIB] ✅ Listening on {len(consumers)} topics...")
|
||||
|
||||
while True:
|
||||
for topic, consumer in consumers:
|
||||
try:
|
||||
msg = consumer.receive(timeout_millis=1000)
|
||||
if msg:
|
||||
data = json.loads(msg.data().decode())
|
||||
print(f"[DISTRIB] {topic.split('/')[-1]} → Brokers")
|
||||
|
||||
# Republish to MQTT brokers
|
||||
publish_mqtt(data, EMQX_HOST, EMQX_PORT)
|
||||
publish_mqtt(data, MOSQUITTO_HOST, MOSQUITTO_PORT)
|
||||
|
||||
# Republish to NGSI-LD brokers
|
||||
publish_ngsi_ld(data, ORION_URL)
|
||||
publish_ngsi_ld(data, STELLIO_URL)
|
||||
|
||||
# Republish to FROST (if OGC format)
|
||||
if "datastream_id" in data:
|
||||
publish_frost(data)
|
||||
|
||||
consumer.acknowledge(msg)
|
||||
except Exception as e:
|
||||
if "timeout" not in str(e).lower():
|
||||
print(f"[DISTRIB] ⚠️ Error: {e}")
|
||||
time.sleep(0.1)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print("\n[DISTRIB] Stopping...")
|
||||
Reference in New Issue
Block a user