Files
Eric FELIXINE c06acf4fe8 feat: distribution service + redpanda consumer + updated flow diagram
- Add Pulsar distribution service (consumes smartcity-* → MQTT + context brokers)
- Add Redpanda → InfluxDB consumer (redpanda/consumer.py)
- Update FIXED_LOCATIONS with exact OpenRemote asset coordinates
- Fix Pulsar topics (underscore: smartcity-traffic not smartcity-traffic)
- Fix prometheus.yml endpoints (Redpanda:9644, comment inactive stacks)
- Add docker-compose.redpanda-consumer.yml
2026-05-05 22:12:38 -04:00

102 lines
3.8 KiB
Python

#!/usr/bin/env python3
"""
Redpanda Consumer → InfluxDB
Lit les topics Redpanda et écrit dans InfluxDB pour Grafana.
Architecture: Redpanda → Consumer → InfluxDB → Grafana
"""
import json, time, base64, threading
import urllib.request
from datetime import datetime, timezone
# Configuration via variables d'environnement
REDPANDA_BASE = "http://localhost:8082" # REST Proxy Redpanda
INFLUX_URL = "http://localhost:8086" # InfluxDB
INFLUX_TOKEN = "my-super-admin-token"
INFLUX_ORG = "digitribe"
INFLUX_BUCKET = "iot_data"
SENSOR_TOPICS = ["traffic", "air-quality", "parking", "noise", "weather", "light"]
def write_influxdb(sensor_type: str, payload: dict):
"""Écrit les données dans InfluxDB."""
try:
sid = payload.get("id", "")
sname = payload.get("name", sid)
lat = payload.get("lat", 14.6)
lon = payload.get("lon", -61.0)
# Extraire les champs numériques du payload
fields = {k: v for k, v in payload.items()
if isinstance(v, (int, float)) and k not in ("lat", "lon")}
if not fields:
return
points = []
for field, value in fields.items():
line = (
f"{sensor_type},sensor_id={sid},location={sname.replace(' ','_')} "
f"{field}={value},lat={lat},lon={lon}"
)
points.append(line)
data = "\n".join(points)
req = urllib.request.Request(
f"{INFLUX_URL}/api/v2/write?org={INFLUX_ORG}&bucket={INFLUX_BUCKET}",
data=data.encode(),
headers={"Authorization": f"Token {INFLUX_TOKEN}", "Content-Type": "text/plain; charset=utf-8"},
method="POST"
)
with urllib.request.urlopen(req, timeout=8) as resp:
if resp.status in (200, 204):
print(f" ✅ InfluxDB: {sensor_type}/{sid}{len(fields)} fields")
return resp.status
except Exception as e:
print(f" ⚠️ InfluxDB → {e}")
return None
def consume_redpanda_topic(topic: str):
"""Consomme les derniers messages d'un topic Redpanda."""
try:
# Récupérer les offsets actuels
req = urllib.request.Request(
f"{REDPANDA_BASE}/topics/{topic}/offsets",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read().decode())
offsets = data.get("partitions", [])
if not offsets:
return
# Récupérer les derniers messages (50 derniers)
req2 = urllib.request.Request(
f"{REDPANDA_BASE}/topics/{topic}?offset=-50&limit=50",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req2, timeout=8) as resp2:
result = json.loads(resp2.read().decode())
messages = result.get("messages", [])
for msg in messages:
if msg.get("value"):
b64 = msg["value"]
decoded = base64.b64decode(b64).decode()
payload = json.loads(decoded)
write_influxdb(topic, payload)
except Exception as e:
print(f" ⚠️ Redpanda/{topic}{e}")
def poll_topics():
"""Boucle principale — poll toutes les 10 secondes."""
print("[REDKAN] Redpanda Consumer → InfluxDB")
print(f"[CFG] Topics: {SENSOR_TOPICS}")
print(f"[CFG] InfluxDB: {INFLUX_URL}")
while True:
for topic in SENSOR_TOPICS:
consume_redpanda_topic(topic)
print(f"[REDKAN] Cycle terminé — pause 10s")
time.sleep(10)
if __name__ == "__main__":
poll_topics()