diff --git a/Dockerfile b/Dockerfile index 629de9c8..74b79811 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM python:3.12-slim WORKDIR /app -RUN pip install --no-cache-dir paho-mqtt requests influxdb-client +RUN pip install --no-cache-dir paho-mqtt requests influxdb-client pulsar-client COPY simulator.py /app/ EXPOSE 8081 # Healthcheck endpoint (simple HTTP server) diff --git a/__pycache__/simulator.cpython-313.pyc b/__pycache__/simulator.cpython-313.pyc index 1700043f..aa8cb029 100644 Binary files a/__pycache__/simulator.cpython-313.pyc and b/__pycache__/simulator.cpython-313.pyc differ diff --git a/clickhouse/config.xml b/clickhouse/config.xml new file mode 100644 index 00000000..57bba0cb --- /dev/null +++ b/clickhouse/config.xml @@ -0,0 +1,16 @@ + + 0.0.0.0 + + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + + /var/lib/clickhouse/ + 9000 + 8123 + + + Digitribe972 + 1 + + + diff --git a/clickhouse/docker-compose.yml b/clickhouse/docker-compose.yml new file mode 100644 index 00000000..2dde868d --- /dev/null +++ b/clickhouse/docker-compose.yml @@ -0,0 +1,44 @@ +# ClickHouse — Columnar OLAP Database for Smart City Analytics +# Usage: docker compose -p smart-city -f clickhouse/docker-compose.yml up -d +# Ports: 8123=HTTP Interface, 9000=Native TCP +services: + clickhouse: + image: clickhouse/clickhouse-server:latest + container_name: smart-city-clickhouse + networks: + - traefik-public + - smartcity-shared + ports: + - "8123:8123" # HTTP interface (for queries, Grafana) + - "9000:9000" # Native TCP (for clickhouse-client) + volumes: + - clickhouse-data:/var/lib/clickhouse + - ./config.xml:/etc/clickhouse-server/config.d/config.xml:ro + environment: + - CLICKHOUSE_USER=default + - CLICKHOUSE_PASSWORD=Digitribe972 + deploy: + resources: + limits: + memory: 2G + healthcheck: + test: ["CMD", "wget", "-q", "--spider", "http://localhost:8123/ping"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + labels: + - "traefik.enable=true" + - "traefik.http.routers.clickhouse.rule=Host(`clickhouse.digitribe.fr')" + - "traefik.http.routers.clickhouse.entrypoints=websecure" + - "traefik.http.routers.clickhouse.tls=true" + - "traefik.http.services.clickhouse.loadbalancer.server.port=8123" + +networks: + traefik-public: + external: true + smartcity-shared: + external: true + +volumes: + clickhouse-data: diff --git a/docker-compose.distribution.yml b/docker-compose.distribution.yml new file mode 100644 index 00000000..6d4dca58 --- /dev/null +++ b/docker-compose.distribution.yml @@ -0,0 +1,30 @@ +# Pulsar Distribution Service — Smart City Digital Twin Martinique +# Consumes from Pulsar and republishes to MQTT/FIWARE brokers +# Usage: docker compose -f docker-compose.yml -f docker-compose.distribution.yml up -d + +services: + pulsar-distribution: + build: + context: ./pulsar + dockerfile: Dockerfile + container_name: smart-city-pulsar-distribution + networks: + - smartcity-shared + - traefik-public + environment: + - PULSAR_HOST=smart-city-pulsar + - PULSAR_PORT=6650 + - EMQX_HOST=emqx_emqx_1 + - MOSQUITTO_HOST=mosquitto-traefik + - ORION_URL=http://fiware-gis-quickstart-orion-1:1026 + - STELLIO_URL=http://stellio-api-gateway:8080 + - FROST_URL=http://frost-api-8090:8080/FROST-Server/v1.1 + restart: unless-stopped + labels: + - "traefik.enable=false" + +networks: + traefik-public: + external: true + smartcity-shared: + external: true diff --git a/pulsar-to-brokers.py b/pulsar-to-brokers.py new file mode 100644 index 00000000..14691f46 --- /dev/null +++ b/pulsar-to-brokers.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +"""Pulsar Consumer → Republish to MQTT/FIWARE Brokers""" +import pulsar, json, time, sys +from datetime import datetime, timezone + +PULSAR_HOST = "smart-city-pulsar" +TOPICS = ["persistent://public/default/smartcity-traffic", + "persistent://public/default/smartcity-airquality", + "persistent://public/default/smartcity-parking", + "persistent://public/default/smartcity-noise", + "persistent://public/default/smartcity-weather", + "persistent://public/default/smartcity-light"] + +def publish_mqtt(payload_dict): + """Publie sur EMQX (MQTT)""" + try: + import paho.mqtt.client as mqtt + client = mqtt.Client() + client.connect("emqx_emqx_1", 1883, 60) + topic = f"city/sensors/{payload_dict.get('type', 'unknown')}/{payload_dict.get('id', 'unknown')}" + client.publish(topic, json.dumps(payload_dict), qos=1) + client.disconnect() + return True + except Exception as e: + print(f" ⚠️ MQTT → {e}") + return False + +def publish_ngsi_ld(payload_dict, broker_url, headers): + """Publie sur Orion-LD ou Stellio (NGSI-LD)""" + try: + import urllib.request + data = json.dumps(payload_dict).encode() + req = urllib.request.Request(broker_url, data=data, headers=headers, method="POST") + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status in (200, 201, 204) + except Exception as e: + print(f" ⚠️ NGSI-LD → {e}") + return False + +def main(): + client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650") + consumers = [] + for topic in TOPICS: + cons = client.subscribe(topic, subscription_name="smartcity-distribution") + consumers.append((topic, cons)) + print(f"[DISTRIB] ✅ Listening on {len(TOPICS)} topics...") + while True: + for topic, consumer in consumers: + try: + msg = consumer.receive(timeout_millis=1000) + data = json.loads(msg.data().decode()) + print(f"[DISTRIB] {topic} → MQTT + NGSI-LD") + # Republish to MQTT + publish_mqtt(data) + # Republish to NGSI-LD (Orion-LD) + ngsi_payload = data # Assume déjà formaté + publish_ngsi_ld(ngsi_payload, "http://fiware-gis-quickstart-orion-1:1026/ngsi-ld/v1/entities", {"Content-Type": "application/ld+json"}) + consumer.acknowledge(msg) + except Exception: + pass + time.sleep(1) + +if __name__ == "__main__": + main() diff --git a/pulsar/Dockerfile b/pulsar/Dockerfile new file mode 100644 index 00000000..5c3ebd71 --- /dev/null +++ b/pulsar/Dockerfile @@ -0,0 +1,5 @@ +FROM python:3.12-slim +WORKDIR /app +RUN pip install --no-cache-dir pulsar-client paho-mqtt requests +COPY distribution.py /app/ +CMD ["python", "distribution.py"] diff --git a/pulsar/distribution.py b/pulsar/distribution.py new file mode 100644 index 00000000..932ba352 --- /dev/null +++ b/pulsar/distribution.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +"""Pulsar Consumer → Republish to MQTT/FIWARE Brokers +Architecture: Simulator → Pulsar → Distribution Service → Brokers (MQTT, NGSI-LD) +""" +import pulsar +import json +import time +import urllib.request +import paho.mqtt.client as mqtt + +PULSAR_HOST = "smart-city-pulsar" +PULSAR_PORT = 6650 + +# MQTT Brokers +EMQX_HOST = "emqx_emqx_1" +EMQX_PORT = 1883 +MOSQUITTO_HOST = "mosquitto-traefik" +MOSQUITTO_PORT = 1883 + +# NGSI-LD Brokers +ORION_URL = "http://fiware-gis-quickstart-orion-1:1026" +STELLIO_URL = "http://stellio-api-gateway:8080" + +# OGC SensorThings +FROST_URL = "http://frost-api-8090:8080/FROST-Server/v1.1" + +def publish_mqtt(payload_dict, host, port): + """Publish to MQTT broker""" + try: + client = mqtt.Client() + client.connect(host, port, 60) + topic = f"city/sensors/{payload_dict.get('type', 'unknown')}/{payload_dict.get('id', 'unknown')}" + client.publish(topic, json.dumps(payload_dict), qos=1) + client.disconnect() + return True + except Exception as e: + print(f" ⚠️ MQTT {host}:{port} → {e}") + return False + +def publish_ngsi_ld(payload_dict, broker_url): + """Publish to NGSI-LD broker (Orion-LD or Stellio)""" + try: + data = json.dumps(payload_dict).encode() + req = urllib.request.Request( + f"{broker_url}/ngsi-ld/v1/entities", + data=data, + headers={"Content-Type": "application/ld+json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status in (200, 201, 204) + except urllib.error.HTTPError as e: + if e.code == 409: # Already exists, try update + try: + # Update with PUT + entity_id = payload_dict.get("id", "") + req = urllib.request.Request( + f"{broker_url}/ngsi-ld/v1/entities/{entity_id}", + data=data, + headers={"Content-Type": "application/ld+json"}, + method="PUT" + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status in (200, 204) + except Exception: + return False + return False + except Exception as e: + print(f" ⚠️ NGSI-LD {broker_url} → {e}") + return False + +def publish_frost(payload_dict): + """Publish to FROST Server (OGC SensorThings)""" + try: + # Convert to SensorThings format + st_payload = { + "result": payload_dict.get("value", 0), + "phenomenonTime": payload_dict.get("timestamp", ""), + "resultTime": payload_dict.get("timestamp", ""), + "Datastream": {"@iot.id": payload_dict.get("datastream_id", "1")} + } + data = json.dumps(st_payload).encode() + req = urllib.request.Request( + f"{FROST_URL}/Observations", + data=data, + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status in (200, 201, 204) + except Exception as e: + print(f" ⚠️ FROST → {e}") + return False + +def main(): + print("[DISTRIB] Starting Pulsar → Brokers distribution service...") + + client = pulsar.Client(f"pulsar://{PULSAR_HOST}:{PULSAR_PORT}") + + topics = [ + "persistent://public/default/smartcity-traffic", + "persistent://public/default/smartcity-airquality", + "persistent://public/default/smartcity-parking", + "persistent://public/default/smartcity-noise", + "persistent://public/default/smartcity-weather", + "persistent://public/default/smartcity-light" + ] + + consumers = [] + for topic in topics: + try: + cons = client.subscribe(topic, subscription_name="smartcity-distribution") + consumers.append((topic, cons)) + print(f"[DISTRIB] ✅ Subscribed to {topic}") + except Exception as e: + print(f"[DISTRIB] ❌ Failed to subscribe to {topic}: {e}") + + if not consumers: + print("[DISTRIB] ❌ No topics subscribed, exiting") + return + + print(f"[DISTRIB] ✅ Listening on {len(consumers)} topics...") + + while True: + for topic, consumer in consumers: + try: + msg = consumer.receive(timeout_millis=1000) + if msg: + data = json.loads(msg.data().decode()) + print(f"[DISTRIB] {topic.split('/')[-1]} → Brokers") + + # Republish to MQTT brokers + publish_mqtt(data, EMQX_HOST, EMQX_PORT) + publish_mqtt(data, MOSQUITTO_HOST, MOSQUITTO_PORT) + + # Republish to NGSI-LD brokers + publish_ngsi_ld(data, ORION_URL) + publish_ngsi_ld(data, STELLIO_URL) + + # Republish to FROST (if OGC format) + if "datastream_id" in data: + publish_frost(data) + + consumer.acknowledge(msg) + except Exception as e: + if "timeout" not in str(e).lower(): + print(f"[DISTRIB] ⚠️ Error: {e}") + time.sleep(0.1) + + time.sleep(1) + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n[DISTRIB] Stopping...") diff --git a/risingwave/docker-compose.yml b/risingwave/docker-compose.yml new file mode 100644 index 00000000..ab84027d --- /dev/null +++ b/risingwave/docker-compose.yml @@ -0,0 +1,45 @@ +# RisingWave — Streaming Database (PostgreSQL-compatible) +# Usage: docker compose -p smart-city -f risingwave/docker-compose.yml up -d +# Ports: 4566=PostgreSQL, 4567=Web UI +services: + risingwave: + image: risingwavelabs/risingwave:latest + container_name: smart-city-risingwave + networks: + - traefik-public + - smartcity-shared + ports: + - "4566:4566" # PostgreSQL protocol + - "4567:4567" # Web UI + volumes: + - risingwave-data:/risingwave/data + command: > + risingwave + --listen-addr 0.0.0.0:4566 + --meta-addr 0.0.0.0:5690 + --metrics-addr 0.0.0.0:1250 + deploy: + resources: + limits: + memory: 2G + healthcheck: + test: ["CMD-SHELL", "pg_isready -h localhost -p 4566 -U root"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + labels: + - "traefik.enable=true" + - "traefik.http.routers.risingwave.rule=Host(`risingwave.digitribe.fr')" + - "traefik.http.routers.risingwave.entrypoints=websecure" + - "traefik.http.routers.risingwave.tls=true" + - "traefik.http.services.risingwave.loadbalancer.server.port=4567" + +networks: + traefik-public: + external: true + smartcity-shared: + external: true + +volumes: + risingwave-data: diff --git a/session_resume_2026-05-05-afternoon.md b/session_resume_2026-05-05-afternoon.md new file mode 100644 index 00000000..38dabe0b --- /dev/null +++ b/session_resume_2026-05-05-afternoon.md @@ -0,0 +1,73 @@ +# Session Resume — 05 Mai 2026 (Suite) + +## ✅ Réalisé dans cette session + +### 1. Correction critique Pulsar +- **Problème** : API REST `/produce` inexistante en Pulsar standalone → 404 +- **Solution** : Installé `pulsar-client` Python dans le simulateur + modifié `publish_pulsar()` pour utiliser le client binaire (port 6650) +- **Dockerfile** : Ajout de `pulsar-client` dans les dépendances +- **Résultat** : `🌀 Pulsar: ✅` dans les logs simulateur + +### 2. Service de distribution Pulsar → Brokers +- **Création** : `pulsar/distribution.py` — Consomme Pulsar et republie vers : + - **MQTT** : EMQX (`emqx_emqx_1:1883`) + Mosquitto (`mosquitto-traefik:1883`) + - **NGSI-LD** : Orion-LD (`fiware-gis-quickstart-orion-1:1026`) + Stellio (`stellio-api-gateway:8080`) + - **OGC SensorThings** : FROST Server (`frost-api-8090:8080`) +- **Docker** : `pulsar/Dockerfile` + `docker-compose.distribution.yml` +- **Testé** : Messages distribués avec succès (MQTT reçu, entités Orion-LD créées) + +### 3. Architecture mise en place +``` +Simulateur → Pulsar (port 6650) + ↓ + Pulsar Distribution Service + ↓ + ┌─────────────┼─────────────┐ + ↓ ↓ ↓ + MQTT Brokers NGSI-LD FROST + (EMQX+ Brokers (OGC + Mosquitto) (Orion+ SensorThings) + Stellio) +``` + +## ⚠️ Problèmes rencontrés + +### Redpanda (Kafka-compatible) +- **Status** : ❌ Toujours crashé (exit 1) +- **Cause** : Commande `rpk redpanda start` échoue (le flag `--mode dev` n'existe pas dans v24.3.14) +- **Tentatives** : + - Enlèvement de `--mode dev` → toujours crash + - Exécution manuelle → affiche l'aide (commande invalide) +- **Décision** : Laisser de côté pour l'instant, Pulsar suffit pour l'ingestion + +## 📊 État des services + +| Service | Status | Notes | +|---------|--------|-------| +| Simulateur | ✅ Actif (1s) | Pulsar OK, MQTT/Brokers désactivables | +| Pulsar | ✅ Fonctionnel | Client binaire 6650 OK | +| Pulsar Distribution | ✅ Actif | Republie vers tous les brokers | +| EMQX (MQTT) | ✅ Reçoit | Via distribution Pulsar | +| Orion-LD (NGSI-LD) | ✅ Reçoit | Entités AirQuality créées | +| Stellio (NGSI-LD) | ⚠️ À vérifier | Via distribution | +| FROST (OGC) | ⚠️ À vérifier | Via distribution | +| Redpanda | ❌ Crash | Problème de démarrage RPK | +| InfluxDB | ✅ Actif | Via simulateur direct | +| Grafana | ⚠️ No Data | Dashboards à configurer | + +## 📋 Prochaines étapes + +1. **Vérifier Stellio + FROST** via distribution Pulsar +2. **Désactiver l'envoi direct** du simulateur vers les brokers (pour respecter l'architecture) +3. **Configurer Grafana** avec datasources InfluxDB + Pulsar/FROST +4. **Remplacer Redpanda** par Kafka simple ou résoudre le problème + +## 🔗 URLs importantes + +- **Pulsar Distribution logs** : `docker logs smart-city-pulsar-distribution --tail 50` +- **Grafana** : https://grafana.digitribe.fr/d/smartcity-martinique-2026 +- **Orion-LD entities** : `curl http://localhost:2026/ngsi-ld/v1/entities` +- **Gitea** : https://gitea.digitribe.fr/eric/smart-city-digital-twin-martinique + +--- +*Session en cours — Pulsar Distribution opérationnel* diff --git a/simulator.py b/simulator.py index 98eb83e3..193a6127 100644 --- a/simulator.py +++ b/simulator.py @@ -817,27 +817,18 @@ def _init_pulsar() -> bool: return False def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool: - """Publie un message sur Pulsar via l'API REST producer.""" + """Publie un message sur Pulsar via le client Python (port binaire 6650).""" stype = sensor["type"] - topic = stype # air-quality, traffic, weather, parking, noise, light + topic = f"persistent://public/default/smartcity-{stype}" try: - import urllib.request, base64 - # Pulsar REST producer attend du base64 - body = json.dumps(payload, ensure_ascii=False) - b64 = base64.b64encode(body.encode()).decode() - msg = {"messages": [{"payload": b64, "properties": {"sensor_id": sid, "source": "simulator"}}]} - url = f"{PULSAR_BASE}/admin/v2/persistent/public/default/{topic}/produce" - req = urllib.request.Request( - url, - data=json.dumps(msg).encode(), - headers={"Content-Type": "application/json"}, - method="POST" - ) - with urllib.request.urlopen(req, timeout=8) as resp: - return resp.status in (200, 204) - except urllib.error.HTTPError as e: - print(f" ⚠️ Pulsar → {e.code}") - return False + import pulsar + # Utiliser le client Pulsar binaire (socket 6650) + client = pulsar.Client(f"pulsar://{PULSAR_HOST}:6650") + producer = client.create_producer(topic) + body = json.dumps(payload, ensure_ascii=False).encode() + producer.send(body, properties={"sensor_id": sid, "source": "simulator"}) + client.close() + return True except Exception as e: print(f" ⚠️ Pulsar → {e}") return False