From 01c2be4930ba7e8a3d8e6a3eeabb7eab4c7fa256 Mon Sep 17 00:00:00 2001 From: Eric FELIXINE Date: Tue, 5 May 2026 02:53:43 -0400 Subject: [PATCH] feat(simulator): real-time (1s), fix ENABLE_PULSAR, add Pulsar/Redpanda publish, fix InfluxDB URL - Change INTERVAL to 1s for real-time sensor data - Fix ENABLE_PULSAR comparison (accept 'true'/'false' strings) - Add publish_pulsar() and publish_redpanda() functions - Fix InfluxDB URL (smart-city-influxdb instead of digital-twin-influxdb) - Add docker-compose.yml with simulator service - Add redpanda config and start script - Add session_resume_2026-05-05.md --- docker-compose.yml | 57 +++++++++++++ redpanda/docker-compose.yml | 42 ++++++++++ redpanda/redpanda.yaml | 29 +++++++ redpanda/start.sh | 12 +++ session_resume_2026-05-05.md | 53 ++++++++++++ simulator.py | 158 ++++++++++++++++++++++++++++++++++- 6 files changed, 349 insertions(+), 2 deletions(-) create mode 100644 docker-compose.yml create mode 100644 redpanda/docker-compose.yml create mode 100644 redpanda/redpanda.yaml create mode 100755 redpanda/start.sh create mode 100644 session_resume_2026-05-05.md diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..1d9c5950 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,57 @@ +# Smart City Digital Twin Martinique — Main Docker Compose +# Usage: docker compose -p smart-city up -d +# This file defines the simulator and includes other services + +version: '3.8' + +networks: + smartcity-shared: + external: true + traefik-public: + external: true + +services: + # Smart City Simulator + simulator: + build: . + container_name: smart-city-simulator + networks: + - smartcity-shared + - traefik-public + environment: + # MQTT Brokers + - ENABLE_EMQX=true + - ENABLE_MOSQUITTO=true + - ENABLE_BUNKER=true + # Context Brokers + - ENABLE_ORION=true + - ENABLE_STELLIO=true + - ENABLE_FROST=true + # Databases + - ENABLE_INFLUX=true + - INFLUX_URL=http://smart-city-influxdb:8086 + # Pulsar + - ENABLE_PULSAR=true + - PULSAR_HOST=smart-city-pulsar + - PULSAR_PORT=8080 + # Redpanda (Kafka) + - ENABLE_REDPANDA=false # Disabled - troubleshooting + - REDPANDA_BROKERS=smart-city-redpanda:9092 + # Simulation settings + - INTERVAL=30 + - LOG_LEVEL=INFO + restart: unless-stopped + labels: + - "traefik.enable=false" + + # InfluxDB (defined in docker-compose.influxdb.yml) + # Run with: docker compose -f docker-compose.yml -f docker-compose.influxdb.yml up -d + + # Grafana (defined in docker-compose.grafana.yml) + # Run with: docker compose -f docker-compose.yml -f docker-compose.grafana.yml up -d + + # Pulsar (defined in pulsar/docker-compose.yml) + # Run with: docker compose -f docker-compose.yml -f pulsar/docker-compose.yml up -d + + # Redpanda (defined in redpanda/docker-compose.yml) + # Run with: docker compose -f docker-compose.yml -f redpanda/docker-compose.yml up -d diff --git a/redpanda/docker-compose.yml b/redpanda/docker-compose.yml new file mode 100644 index 00000000..b1ea6a37 --- /dev/null +++ b/redpanda/docker-compose.yml @@ -0,0 +1,42 @@ +# Redpanda (Kafka-compatible) — Single Node for Smart City Digital Twin Martinique +# Usage: docker compose -p smart-city -f redpanda/docker-compose.yml up -d +# Ports: 19092=Kafka (host), 9644=Admin API +services: + redpanda: + image: redpandadata/redpanda:v24.3.14 + container_name: smart-city-redpanda + entrypoint: ["/bin/bash", "/start.sh"] + volumes: + - redpanda-data:/var/lib/redpanda/data + - ./start.sh:/start.sh:ro + ports: + - "19092:9092" + - "19644:9644" + networks: + - traefik-public + - smartcity-shared + deploy: + resources: + limits: + memory: 2G + healthcheck: + test: ["CMD-SHELL", "curl -sf http://localhost:9644/v1/status/ready 2>/dev/null || exit 1"] + interval: 30s + timeout: 10s + retries: 10 + start_period: 60s + labels: + - "traefik.enable=true" + - "traefik.http.routers.redpanda.rule=Host(`redpanda.digitribe.fr`)" + - "traefik.http.routers.redpanda.entrypoints=websecure" + - "traefik.http.routers.redpanda.tls=true" + - "traefik.http.services.redpanda.loadbalancer.server.port=9644" + +networks: + traefik-public: + external: true + smartcity-shared: + external: true + +volumes: + redpanda-data: diff --git a/redpanda/redpanda.yaml b/redpanda/redpanda.yaml new file mode 100644 index 00000000..3a324f34 --- /dev/null +++ b/redpanda/redpanda.yaml @@ -0,0 +1,29 @@ +# Redpanda configuration for Smart City Digital Twin Martinique +# Minimal working config - Kafka + Admin API only + +redpanda: + node_id: 0 + data_directory: /var/lib/redpanda/data + + kafka_api: + - name: internal + address: 0.0.0.0 + port: 9092 + + advertised_kafka_api: + - name: internal + address: smart-city-redpanda + port: 9092 + + admin: + - address: 0.0.0.0 + port: 9644 + +# Seastar settings +seastar: + smp: 1 + memory: 1G + reserve_memory: 256M + overprovisioned: true + +developer_mode: true diff --git a/redpanda/start.sh b/redpanda/start.sh new file mode 100755 index 00000000..85bb24c8 --- /dev/null +++ b/redpanda/start.sh @@ -0,0 +1,12 @@ +#!/bin/bash +# Start Redpanda with minimal dev config +exec /usr/bin/rpk redpanda start \ + --mode dev \ + --smp 1 \ + --memory 1G \ + --overprovisioned \ + --kafka-addr 0.0.0.0:9092 \ + --advertise-kafka-addr smart-city-redpanda:9092 \ + --rpc-addr 0.0.0.0:33145 \ + --advertise-rpc-addr smart-city-redpanda:33145 \ + --check=false diff --git a/session_resume_2026-05-05.md b/session_resume_2026-05-05.md new file mode 100644 index 00000000..4ce74d15 --- /dev/null +++ b/session_resume_2026-05-05.md @@ -0,0 +1,53 @@ +# Session Resume — 05 Mai 2026 + +## ✅ Réalisé dans cette session (reprise après crash) + +### 1. Diagnostic des dashboards Grafana cassés +- **Problème** : Erreurs `"Dashboard title cannot be empty"` pour 2 fichiers dans les logs de `digital-twin-grafana` +- **Cause racine** : Les fichiers JSON de provisioning avaient un objet `dashboard` imbriqué au lieu de `title` à la racine — Grafana file provider exige `title` au niveau root +- **Fichiers affectés** : + - `smart-city-overview.json` : title=MISSING, panels=0 (❌) + - `twin-overview.json` : title=MISSING, panels=0 (❌) + +### 2. Correction des JSON (flattening) +- Script Python `/tmp/fix_grafana_dashboards.py` → extraction de `dashboard` vers le niveau root +- Résultat après fix : + - `twin-overview.json` : title="TWIN Supply Chain - Overview", 3 panels ✅ + - `smart-city-overview.json` : title="Smart City Digital Twin - Overview", 8 panels ✅ +- Copie dans le container : `docker cp /tmp/... digital-twin-grafana:/etc/grafana/provisioning/dashboards/` +- Redémarrage : `docker restart digital-twin-grafana` + +### 3. Vérification +- ✅ Erreurs "Dashboard title cannot be empty" disparues des logs +- ✅ InfluxDB `iot_data` contient des données en temps réel (air quality, traffic, weather, parking, noise, light) +- ✅ Simulateur actif (6h+ uptime), push vers EMQX + InfluxDB + +### 4. Commit Gitea +- `83d567b` — "Grafana: Fix dashboard provisioning (flatten nested dashboard objects)" +- 2 fichiers ajoutés au repo : `grafana_twin-overview.json`, `grafana_smart-city-overview.json` + +## 📊 État actuel des services + +| Service | Status | Notes | +|---------|--------|-------| +| Simulateur Python | ✅ Actif (6h+) | MQTT (EMQX) + InfluxDB | +| EMQX | ✅ | Port 11883 | +| InfluxDB (iot_data) | ✅ | Données en temps réel Martinique | +| FROST-Server | ✅ | Container frost-api-8090 | +| Orion-LD | ✅ | source/mqttTopic traceability | +| Stellio | ✅ | NGSI-LD tenant default | +| OpenRemote | ⚠️ OR:False | Simulateur échoue auth (localhost:8080) | +| Grafana | ✅ Corrigé | Dashboards chargés, 5 dashboards | + +## ⏳ Reste à faire + +1. **OpenRemote** — Corriger l'authentification du simulateur (OR: False) +2. **Grafana** — Affiner les panels (granularité, datasource queries) +3. **Carte OpenRemote / Cesium / Piero** — Configuration finale + +## 🔗 URLs +- **Grafana** : https://grafana.digitribe.fr (admin / Digitribe972) +- **Gitea** : https://gitea.digitribe.fr/eric/smart-city-digital-twin-martinique + +--- +*Session reprise après crash du 05 mai 2026 à 00:25* diff --git a/simulator.py b/simulator.py index 84bae74a..98eb83e3 100644 --- a/simulator.py +++ b/simulator.py @@ -15,12 +15,22 @@ Context Brokers REST: - Stellio: stellio-api-gateway:8080 (NGSI-LD) - FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings) +Streaming Platforms: + - Pulsar: smart-city-pulsar:8080 (HTTP REST Producer API) + - Redpanda: smart-city-redpanda:8082 (Kafka REST Proxy) + +Time-Series DB: + - InfluxDB v2: smart-city-influxdb:8086 (via influxdb-client) + Variables d'environnement: PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s) BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France) ENABLE_ORION=1 : activer Orion-LD (défaut: 1) ENABLE_STELLIO=1 : activer Stellio (défaut: 1) ENABLE_FROST=1 : activer FROST-Server (défaut: 1) + ENABLE_INFLUX=1 : activer InfluxDB v2 (défaut: 1) + ENABLE_PULSAR=1 : activer Apache Pulsar (défaut: 1) + ENABLE_REDPANDA=1 : activer Redpanda Kafka (défaut: 1) """ import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse @@ -49,7 +59,7 @@ BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900")) # ============================================================================= BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091")) BASE_LON = float(os.environ.get("BASE_LON", "-61.2155")) -INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "10")) +INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "1")) # 1s pour temps réel ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1" ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1") == "1" ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1" @@ -60,9 +70,21 @@ OR_REALM = os.environ.get("OR_REALM", "smartcity") OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token FROST_URL = os.environ.get("FROST_URL", "http://localhost:8090/FROST-Server/v1.1") # Exposer frost_http-web-1:8080 -> host:8086 +# Pulsar config (HTTP REST — pulsar-admin + producer REST API) +ENABLE_PULSAR = os.environ.get("ENABLE_PULSAR", "1").lower() in ("1", "true", "yes", "on") +PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar") +PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "8080")) +PULSAR_BASE = f"http://{PULSAR_HOST}:{PULSAR_PORT}" + +# Redpanda / Kafka config (REST Proxy HTTP) +ENABLE_REDPANDA = os.environ.get("ENABLE_REDPANDA", "1") == "1" +REDPANDA_HOST = os.environ.get("REDPANDA_HOST", "smart-city-redpanda") +REDPANDA_PORT = int(os.environ.get("REDPANDA_PORT", "8082")) +REDPANDA_BASE = f"http://{REDPANDA_HOST}:{REDPANDA_PORT}" + # InfluxDB config ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1") == "1" -INFLUX_URL = os.environ.get("INFLUX_URL", "http://localhost:8086") # InfluxDB exposé sur host:8086 +INFLUX_URL = os.environ.get("INFLUX_URL", "http://smart-city-influxdb:8086") # InfluxDB v2 sur smartcity-shared INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe") INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "iot_data") INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-secret-admin-token") @@ -766,6 +788,115 @@ def publish_openremote(sid: str, sensor: dict, values: dict) -> bool: } return _or_put(asset_id, payload) +# ============================================================================= +# Pulsar — HTTP REST Producer +# API: POST http://host:8080/admin/v2/persistent/public/default/{topic}/produce +# Payload: {"messages": [{"payload": "", "properties": {...}}]} +# Topics auto-créés par le premier message (Pulsar standalone) +# ============================================================================= +_pulsar_session = None + +def _get_pulsar_session(): + global _pulsar_session + if _pulsar_session is None: + import urllib.request + _pulsar_session = urllib.request + return _pulsar_session + +def _init_pulsar() -> bool: + """Teste la connectivité Pulsar au démarrage.""" + try: + import urllib.request + req = urllib.request.Request(f"{PULSAR_BASE}/admin/v2/clusters") + with urllib.request.urlopen(req, timeout=5) as resp: + if resp.status == 200: + print(f"[PULSAR] ✅ Connected to {PULSAR_BASE}") + return True + except Exception as e: + print(f"[PULSAR] ⚠️ Cannot reach {PULSAR_BASE}: {e}") + return False + +def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool: + """Publie un message sur Pulsar via l'API REST producer.""" + stype = sensor["type"] + topic = stype # air-quality, traffic, weather, parking, noise, light + try: + import urllib.request, base64 + # Pulsar REST producer attend du base64 + body = json.dumps(payload, ensure_ascii=False) + b64 = base64.b64encode(body.encode()).decode() + msg = {"messages": [{"payload": b64, "properties": {"sensor_id": sid, "source": "simulator"}}]} + url = f"{PULSAR_BASE}/admin/v2/persistent/public/default/{topic}/produce" + req = urllib.request.Request( + url, + data=json.dumps(msg).encode(), + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=8) as resp: + return resp.status in (200, 204) + except urllib.error.HTTPError as e: + print(f" ⚠️ Pulsar → {e.code}") + return False + except Exception as e: + print(f" ⚠️ Pulsar → {e}") + return False + +# ============================================================================= +# Redpanda / Kafka — HTTP REST Proxy +# API: POST http://host:8082/topics/{topic} +# Payload: {"records": [{"value": ""}]} +# Topics auto-créés par le premier message (Redpanda) +# ============================================================================= +_redpanda_session = None + +def _get_redpanda_session(): + global _redpanda_session + if _redpanda_session is None: + import urllib.request + _redpanda_session = urllib.request + return _redpanda_session + +def _init_redpanda() -> bool: + """Teste la connectivité Redpanda au démarrage.""" + try: + import urllib.request + req = urllib.request.Request(f"{REDPANDA_BASE}/v1/status/alive") + with urllib.request.urlopen(req, timeout=5) as resp: + if resp.status == 200: + print(f"[REDPANDA] ✅ Connected to {REDPANDA_BASE}") + return True + except Exception as e: + print(f"[REDPANDA] ⚠️ Cannot reach {REDPANDA_BASE}: {e}") + return False + +def publish_redpanda(sid: str, sensor: dict, payload: dict) -> bool: + """Publie un message sur Redpanda/Kafka via le REST Proxy.""" + stype = sensor["type"] + topic = stype # air-quality, traffic, weather, parking, noise, light + try: + import urllib.request, base64 + body = json.dumps(payload, ensure_ascii=False) + b64 = base64.b64encode(body.encode()).decode() + record = { + "records": [{"value": b64, "headers": {"sensor_id": sid, "source": "simulator"}}] + } + url = f"{REDPANDA_BASE}/topics/{topic}" + req = urllib.request.Request( + url, + data=json.dumps(record).encode(), + headers={"Content-Type": "application/vnd.api+json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=8) as resp: + return resp.status in (200, 201, 204) + except urllib.error.HTTPError as e: + print(f" ⚠️ Redpanda → {e.code}") + return False + except Exception as e: + print(f" ⚠️ Redpanda → {e}") + return False + def publish_influx(sid: str, sensor: dict, values: dict) -> bool: """Write sensor data to InfluxDB (async, non-blocking).""" if not _influx_write_api: @@ -805,6 +936,18 @@ def main(): print("╚══════════════════════════════════════════════════╝") print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s") print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}") + print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}") + + # Init connectivity checks + if ENABLE_PULSAR: + _init_pulsar() + # Test immédiat + print(f" 🌪️ DEBUG: Test Pulsar direct...", flush=True) + test_payload = {"type": "test", "value": 123} + test_result = publish_pulsar("test_001", {"type": "air-quality"}, test_payload) + print(f" 🌪️ DEBUG: Test Pulsar result: {test_result}", flush=True) + if ENABLE_REDPANDA: + _init_redpanda() mqtt_client = MultiMQTT() @@ -898,6 +1041,17 @@ def main(): ok_influx = publish_influx(sid, sensor, influx_vals) print(f" 📈 InfluxDB: {'✅' if ok_influx else '❌'}") + # --- Pulsar (HTTP REST) --- + if ENABLE_PULSAR: + print(f" 🌪️ DEBUG: calling publish_pulsar for {sid}, payload_mqtt exists: {bool(locals().get('payload_mqtt'))}", flush=True) + ok_pulsar = publish_pulsar(sid, sensor, payload_mqtt) + print(f" 🌪️ Pulsar: {'✅' if ok_pulsar else '❌'}") + + # --- Redpanda (Kafka REST Proxy) --- + if ENABLE_REDPANDA: + ok_redpanda = publish_redpanda(sid, sensor, payload_mqtt) + print(f" 🐟 Redpanda: {'✅' if ok_redpanda else '❌'}") + # --- BunkerM HTTP --- if os.getenv("BUNKERM_HTTP", "0") == "1": ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt)