- Add Pulsar distribution service (consumes smartcity-* → MQTT + context brokers) - Add Redpanda → InfluxDB consumer (redpanda/consumer.py) - Update FIXED_LOCATIONS with exact OpenRemote asset coordinates - Fix Pulsar topics (underscore: smartcity-traffic not smartcity-traffic) - Fix prometheus.yml endpoints (Redpanda:9644, comment inactive stacks) - Add docker-compose.redpanda-consumer.yml
102 lines
3.8 KiB
Python
102 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Redpanda Consumer → InfluxDB
|
|
Reads the Redpanda topics and writes their data to InfluxDB for Grafana.
|
|
Architecture: Redpanda → Consumer → InfluxDB → Grafana
|
|
"""
|
|
import base64
import json
import os
import threading
import time
import urllib.parse
import urllib.request
from datetime import datetime, timezone
|
|
|
|
# Configuration. Each setting can be overridden through the environment
# variable of the same name; the literals are the fallbacks used when the
# variable is not set.
REDPANDA_BASE = os.environ.get("REDPANDA_BASE", "http://localhost:8082")  # Redpanda REST proxy
INFLUX_URL = os.environ.get("INFLUX_URL", "http://localhost:8086")  # InfluxDB HTTP API
# NOTE(review): the fallback token looks like a development placeholder —
# confirm that real deployments always provide INFLUX_TOKEN.
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-admin-token")
INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "iot_data")

# Topics polled on every cycle; the topic name is also used as the InfluxDB
# measurement name.
SENSOR_TOPICS = ["traffic", "air-quality", "parking", "noise", "weather", "light"]
|
|
|
|
def _lp_escape(text, specials=",= "):
    """Backslash-escape the line-protocol special characters of *text*."""
    escaped = str(text)
    for char in specials:
        escaped = escaped.replace(char, "\\" + char)
    return escaped


def _is_finite_number(value):
    """Tell whether *value* is an int/bool, or a float other than NaN and ±inf."""
    if isinstance(value, int):
        return True
    return isinstance(value, float) and value == value and abs(value) != float("inf")


def _coordinate(raw, default):
    """Return *raw* as a finite float, or *default* when it cannot be one."""
    try:
        number = float(raw)
    except (TypeError, ValueError, OverflowError):
        return default
    return number if _is_finite_number(number) else default


def _format_point(sensor_type, tags, fields, lat, lon):
    """Assemble one line-protocol point: measurement, non-empty tags, all fields."""
    # Measurement names only need commas and spaces escaped.
    series = _lp_escape(sensor_type, ", ")
    for tag_key, tag_value in tags:
        if tag_value:  # an empty tag value would make the whole line invalid
            series += f",{tag_key}={_lp_escape(tag_value)}"
    field_set = [f"{_lp_escape(key)}={value}" for key, value in fields.items()]
    field_set += [f"lat={lat}", f"lon={lon}"]
    return series + " " + ",".join(field_set)


def write_influxdb(sensor_type: str, payload: dict):
    """Write the numeric fields of one sensor payload to InfluxDB.

    All numeric fields are sent as a single line-protocol point whose
    measurement is ``sensor_type``. The payload's ``id`` and ``name`` become
    the ``sensor_id`` and ``location`` tags (omitted when empty), and
    ``lat``/``lon`` are added as fields, falling back to 14.6 / -61.0 when
    missing or not numeric. No timestamp is sent with the point.

    Args:
        sensor_type: Measurement name for the point.
        payload: Decoded sensor message; only int/float/bool values other
            than NaN and ±inf are written as fields.

    Returns:
        The HTTP status of the write request, or None when the payload has
        no writable field or the write failed (a warning is printed).
    """
    try:
        if not isinstance(payload, dict):
            raise TypeError(
                f"payload must be a JSON object, got {type(payload).__name__}"
            )

        sid = payload.get("id", "")
        sname = payload.get("name", sid)

        # Keep only values line protocol can represent; lat/lon are handled
        # separately below so they are never duplicated.
        fields = {
            key: value
            for key, value in payload.items()
            if key not in ("", "lat", "lon") and _is_finite_number(value)
        }
        if not fields:
            return None

        tags = [
            ("sensor_id", "" if sid is None else str(sid)),
            # Spaces in the location become underscores, as before, so that
            # existing tag values keep matching.
            ("location", "" if sname is None else str(sname).replace(" ", "_")),
        ]
        line = _format_point(
            sensor_type,
            tags,
            fields,
            _coordinate(payload.get("lat"), 14.6),
            _coordinate(payload.get("lon"), -61.0),
        )

        query = urllib.parse.urlencode({"org": INFLUX_ORG, "bucket": INFLUX_BUCKET})
        req = urllib.request.Request(
            f"{INFLUX_URL}/api/v2/write?{query}",
            data=line.encode("utf-8"),
            headers={
                "Authorization": f"Token {INFLUX_TOKEN}",
                "Content-Type": "text/plain; charset=utf-8",
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=8) as resp:
            if resp.status in (200, 204):
                print(f" ✅ InfluxDB: {sensor_type}/{sid} → {len(fields)} fields")
            return resp.status
    except Exception as e:
        # Best effort: a failed write must not stop the polling loop.
        print(f" ⚠️ InfluxDB → {e}")
        return None
|
|
|
|
def consume_redpanda_topic(topic: str):
    """Fetch recent messages of a Redpanda topic and forward them to InfluxDB.

    Two requests are sent to the REST proxy at ``REDPANDA_BASE``: one for the
    topic's offsets (nothing more is done when it reports no partitions) and
    one for up to 50 messages. Each message ``value`` is base64-decoded,
    parsed as JSON and passed to ``write_influxdb`` with the topic as the
    sensor type.

    Request failures print a warning and end the call. A message that cannot
    be decoded prints a warning and is skipped, so the remaining messages are
    still processed.

    Args:
        topic: Name of the topic to read.
    """
    # NOTE(review): the endpoint paths and the "partitions"/"messages"
    # response keys are kept as found — confirm them against the Redpanda
    # HTTP Proxy API reference.
    # NOTE(review): no offset is remembered between calls, so successive
    # polls presumably re-read overlapping messages — verify.
    topic_path = urllib.parse.quote(topic, safe="")
    try:
        # Fetch the current offsets.
        offsets_req = urllib.request.Request(
            f"{REDPANDA_BASE}/topics/{topic_path}/offsets",
            headers={"Accept": "application/json"},
        )
        with urllib.request.urlopen(offsets_req, timeout=5) as resp:
            data = json.loads(resp.read().decode())
        if not data.get("partitions", []):
            return

        # Fetch the latest messages (last 50).
        messages_req = urllib.request.Request(
            f"{REDPANDA_BASE}/topics/{topic_path}?offset=-50&limit=50",
            headers={"Accept": "application/json"},
        )
        with urllib.request.urlopen(messages_req, timeout=8) as resp2:
            result = json.loads(resp2.read().decode())
        messages = result.get("messages", [])
    except Exception as e:
        # Best effort: an unreachable proxy must not stop the polling loop.
        print(f" ⚠️ Redpanda/{topic} → {e}")
        return

    for msg in messages:
        try:
            b64 = msg.get("value")
            if not b64:
                continue
            payload = json.loads(base64.b64decode(b64).decode())
        except (AttributeError, TypeError, ValueError) as e:
            # ValueError covers invalid base64, UTF-8 and JSON alike.
            print(f" ⚠️ Redpanda/{topic} → {e}")
            continue
        write_influxdb(topic, payload)
|
|
|
|
def poll_topics(interval: float = 10):
    """Main loop: consume every configured topic, pause, and repeat forever.

    Prints the active configuration once, then on each cycle calls
    ``consume_redpanda_topic`` for every entry of ``SENSOR_TOPICS`` and
    sleeps for ``interval`` seconds. The function never returns.

    Args:
        interval: Pause between two polling cycles, in seconds (default 10).
    """
    print("[REDKAN] Redpanda Consumer → InfluxDB")
    print(f"[CFG] Topics: {SENSOR_TOPICS}")
    print(f"[CFG] InfluxDB: {INFLUX_URL}")

    while True:
        for topic in SENSOR_TOPICS:
            consume_redpanda_topic(topic)
        print(f"[REDKAN] Cycle terminé — pause {interval}s")
        time.sleep(interval)
|
|
|
|
# Script entry point: start the polling loop only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    poll_topics()
|