#!/usr/bin/env python3 """ Redpanda Consumer → InfluxDB Lit les topics Redpanda et écrit dans InfluxDB pour Grafana. Architecture: Redpanda → Consumer → InfluxDB → Grafana """ import json, time, base64, threading import urllib.request from datetime import datetime, timezone # Configuration via variables d'environnement REDPANDA_BASE = "http://localhost:8082" # REST Proxy Redpanda INFLUX_URL = "http://localhost:8086" # InfluxDB INFLUX_TOKEN = "my-super-admin-token" INFLUX_ORG = "digitribe" INFLUX_BUCKET = "iot_data" SENSOR_TOPICS = ["traffic", "air-quality", "parking", "noise", "weather", "light"] def write_influxdb(sensor_type: str, payload: dict): """Écrit les données dans InfluxDB.""" try: sid = payload.get("id", "") sname = payload.get("name", sid) lat = payload.get("lat", 14.6) lon = payload.get("lon", -61.0) # Extraire les champs numériques du payload fields = {k: v for k, v in payload.items() if isinstance(v, (int, float)) and k not in ("lat", "lon")} if not fields: return points = [] for field, value in fields.items(): line = ( f"{sensor_type},sensor_id={sid},location={sname.replace(' ','_')} " f"{field}={value},lat={lat},lon={lon}" ) points.append(line) data = "\n".join(points) req = urllib.request.Request( f"{INFLUX_URL}/api/v2/write?org={INFLUX_ORG}&bucket={INFLUX_BUCKET}", data=data.encode(), headers={"Authorization": f"Token {INFLUX_TOKEN}", "Content-Type": "text/plain; charset=utf-8"}, method="POST" ) with urllib.request.urlopen(req, timeout=8) as resp: if resp.status in (200, 204): print(f" ✅ InfluxDB: {sensor_type}/{sid} → {len(fields)} fields") return resp.status except Exception as e: print(f" ⚠️ InfluxDB → {e}") return None def consume_redpanda_topic(topic: str): """Consomme les derniers messages d'un topic Redpanda.""" try: # Récupérer les offsets actuels req = urllib.request.Request( f"{REDPANDA_BASE}/topics/{topic}/offsets", headers={"Accept": "application/json"}, ) with urllib.request.urlopen(req, timeout=5) as resp: data = json.loads(resp.read().decode()) offsets = data.get("partitions", []) if not offsets: return # Récupérer les derniers messages (50 derniers) req2 = urllib.request.Request( f"{REDPANDA_BASE}/topics/{topic}?offset=-50&limit=50", headers={"Accept": "application/json"}, ) with urllib.request.urlopen(req2, timeout=8) as resp2: result = json.loads(resp2.read().decode()) messages = result.get("messages", []) for msg in messages: if msg.get("value"): b64 = msg["value"] decoded = base64.b64decode(b64).decode() payload = json.loads(decoded) write_influxdb(topic, payload) except Exception as e: print(f" ⚠️ Redpanda/{topic} → {e}") def poll_topics(): """Boucle principale — poll toutes les 10 secondes.""" print("[REDKAN] Redpanda Consumer → InfluxDB") print(f"[CFG] Topics: {SENSOR_TOPICS}") print(f"[CFG] InfluxDB: {INFLUX_URL}") while True: for topic in SENSOR_TOPICS: consume_redpanda_topic(topic) print(f"[REDKAN] Cycle terminé — pause 10s") time.sleep(10) if __name__ == "__main__": poll_topics()