Eric FELIXINE c06acf4fe8 feat: distribution service + redpanda consumer + updated flow diagram
- Add Pulsar distribution service (consumes smartcity-* → MQTT + context brokers)
- Add Redpanda → InfluxDB consumer (redpanda/consumer.py)
- Update FIXED_LOCATIONS with exact OpenRemote asset coordinates
- Fix Pulsar topics (hyphen, not underscore: smartcity-traffic not smartcity_traffic)
- Fix prometheus.yml endpoints (Redpanda:9644, comment inactive stacks)
- Add docker-compose.redpanda-consumer.yml
2026-05-05 22:12:38 -04:00


Smart City Digital Twin Martinique: Data Flow Diagram

Last updated: 05 May 2026
Project: Smart City Digital Twin Martinique


Overall Architecture

```mermaid
graph TB
    subgraph Simulateur["🖥️  Simulator (Python on host)"]
        SIM[Smart City Simulator<br/>10 sensors<br/>Interval: configurable]
    end

    subgraph MQTT_Brokers["📡  MQTT Brokers"]
        EMQ[EMQX<br/>port 11883]
        MOS[Mosquitto<br/>port 1883]
        BUN[BunkerM<br/>port 1900<br/>MQTTS/TLS]
    end

    subgraph Stream["⚡ Event Streaming"]
        PUL[Pulsar<br/>port 6650<br/>Topics: smartcity-*]
        RED[Redpanda<br/>port 8082 REST<br/>Topics: traffic, air-quality, ...]
    end

    subgraph CB["🔗  Context Brokers"]
        ORI[Orion-LD<br/>NGSI-LD<br/>port 1026]
        STE[Stellio<br/>NGSI-LD<br/>port 8080]
        FRO[FROST-Server<br/>SensorThings<br/>port 8080]
    end

    subgraph Storage["💾  Storage & Metrics"]
        INF[InfluxDB<br/>Bucket: iot_data<br/>port 8086]
        PRO[Prometheus<br/>Scrape: /metrics<br/>port 9090]
        GEO[GeoServer<br/>WMS/WFS/WMTS<br/>port 8080]
    end

    subgraph IoT_Platform["🏢  IoT Platform"]
        ORM[OpenRemote Manager<br/>MQTT Agent<br/>port 8080]
        KC[Keycloak<br/>port 8080]
    end

    subgraph VIZ["📊  Visualization"]
        GRA[Grafana<br/>Dashboards<br/>port 3000]
        MAP[MapStore<br/>WMS/WFS<br/>port 8080]
    end

    subgraph Distribution["🔄  Distribution Service"]
        DIST[Pulsar Distribution<br/>Pulsar → Brokers]
    end

    subgraph Consumer["📥  Redpanda Consumer"]
        RCONS[Redpanda → InfluxDB<br/>REST → InfluxDB]
    end

    %% ── Simulator flows ──────────────────────────────────────────────────
    SIM -->|"1⃣  MQTT publish<br/>city/sensors/{type}/{id}"| EMQ
    SIM -->|"1⃣  MQTT publish"| MOS
    SIM -->|"1⃣  MQTT publish"| BUN
    SIM -->|"2⃣  HTTP POST<br/>NGSI-LD"| ORI
    SIM -->|"2⃣  HTTP POST<br/>NGSI-LD"| STE
    SIM -->|"2⃣  HTTP POST<br/>SensorThings"| FRO
    SIM -->|"3⃣  Pulsar client<br/>pulsar://localhost:6650"| PUL
    SIM -->|"4⃣  HTTP REST Proxy<br/>localhost:8082/topics/"| RED
    SIM -->|"5⃣  InfluxDB v2 API<br/>async, non-blocking"| INF

    %% ── Distribution flows (Pulsar → Brokers) ─────────────────────────────
    PUL -->|"Consumes<br/>smartcity-*"| DIST
    DIST -->|"Republish<br/>MQTT"| EMQ
    DIST -->|"Republish<br/>MQTT"| MOS
    DIST -->|"Republish<br/>NGSI-LD"| ORI
    DIST -->|"Republish<br/>NGSI-LD"| STE
    DIST -->|"Republish<br/>SensorThings"| FRO

    %% ── Redpanda → InfluxDB flow ──────────────────────────────────────────
    RED -->|"REST poll<br/>topics/{name}/offsets"| RCONS
    RCONS -->|"Line Protocol<br/>Write API"| INF

    %% ── OpenRemote MQTT Agent ──────────────────────────────────────────────
    EMQ -->|"6⃣  Subscribe<br/>city/sensors/#"| ORM
    MOS -->|"6⃣  Subscribe"| ORM
    BUN -->|"6⃣  Subscribe"| ORM

    %% ── Prometheus metrics ──────────────────────────────────────────────────
    SIM -->|"7⃣  /metrics<br/>port 8001"| PRO
    EMQ -->|"/api/v5/metrics"| PRO
    STE -->|"/actuator/prometheus"| PRO
    FRO -->|"/metrics"| PRO
    INF -->|"/metrics"| PRO
    RED -->|"/public_metrics"| PRO
    ORM -->|"/actuator/prometheus"| PRO
    GRA -->|"/metrics"| PRO

    %% ── Visualization ─────────────────────────────────────────────────────
    INF -->|"Datasources<br/>Flux IoT"| GRA
    ORI -->|"NGSI-LD<br/>Datasource"| GRA
    STE -->|"NGSI-LD<br/>Datasource"| GRA
    FRO -->|"SensorThings<br/>Datasource"| GRA
    GEO -->|"WMS/WMTS"| MAP
    ORM -->|"MapSettings<br/>Martinique"| MAP
    ORM -->|"Live assets<br/>REST"| GRA
```

Detailed Flows

1 MQTT Flow: Brokers

| Broker | Port | Protocol | Topics |
| --- | --- | --- | --- |
| EMQX | 11883 | MQTT | city/sensors/{type}/{id} |
| Mosquitto | 1883 | MQTT | city/sensors/{type}/{id} |
| BunkerM | 1900 | MQTTS (TLS) | city/sensors/{type}/{id} |

The simulator publishes every reading to all three brokers.
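
As an illustration of this fan-out, here is a minimal Python sketch. It assumes the brokers are reachable on `localhost` at the ports listed above and that `paho-mqtt` is installed; the names `BROKERS`, `build_topic`, and `publish_to_all` are hypothetical and not taken from the simulator source. The brokers are contacted one after another, which approximates the simultaneous publish.

```python
import json

# Ports come from the broker table; the hostname "localhost" is an assumption.
BROKERS = [
    {"name": "EMQX", "host": "localhost", "port": 11883, "tls": False},
    {"name": "Mosquitto", "host": "localhost", "port": 1883, "tls": False},
    {"name": "BunkerM", "host": "localhost", "port": 1900, "tls": True},
]


def build_topic(sensor_type: str, sensor_id: str) -> str:
    """Return the topic city/sensors/{type}/{id} shared by all three brokers."""
    return f"city/sensors/{sensor_type}/{sensor_id}"


def publish_to_all(sensor_type: str, sensor_id: str, reading: dict) -> list:
    """Publish one reading to every broker and return the names that failed."""
    # Imported lazily so the helpers above work without paho-mqtt installed.
    import paho.mqtt.publish as publish

    topic = build_topic(sensor_type, sensor_id)
    payload = json.dumps(reading)
    failed = []
    for broker in BROKERS:
        try:
            publish.single(
                topic,
                payload=payload,
                qos=1,
                hostname=broker["host"],
                port=broker["port"],
                # An empty dict makes paho fall back to the system CA bundle
                # (assumption: the BunkerM certificate is trusted by the host).
                tls={} if broker["tls"] else None,
            )
        except Exception:
            # One unreachable broker should not stop the other two.
            failed.append(broker["name"])
    return failed
```

Collecting failures instead of raising keeps one unavailable broker from blocking delivery to the others.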

2 HTTP REST Flow: Context Brokers

| Broker | Format | Port | Data model |
| --- | --- | --- | --- |
| Orion-LD | NGSI-LD | 1026 | Entities by type |
| Stellio | NGSI-LD | 8080 | Entities by type |
| FROST-Server | SensorThings | 8080 | Things → Datastreams → Observations |
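
To make the NGSI-LD rows concrete, the sketch below builds one entity and posts it to the standard `/ngsi-ld/v1/entities` endpoint. The entity type, attribute names, and coordinates are illustrative assumptions, as are the function names `build_entity` and `post_entity`; the simulator's real data model is not shown in this document.

```python
import json
import urllib.request
from datetime import datetime, timezone

NGSI_LD_CORE_CONTEXT = "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"


def build_entity(sensor_type, sensor_id, values, lon, lat, observed_at=None):
    """Build an NGSI-LD entity with one Property per measured value."""
    if observed_at is None:
        observed_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    entity = {
        "id": f"urn:ngsi-ld:{sensor_type}:{sensor_id}",
        "type": sensor_type,
        # GeoJSON order is [longitude, latitude].
        "location": {
            "type": "GeoProperty",
            "value": {"type": "Point", "coordinates": [lon, lat]},
        },
        "@context": [NGSI_LD_CORE_CONTEXT],
    }
    for name, value in values.items():
        entity[name] = {"type": "Property", "value": value, "observedAt": observed_at}
    return entity


def post_entity(base_url, entity):
    """POST the entity to a broker such as http://localhost:1026 (Orion-LD)."""
    request = urllib.request.Request(
        f"{base_url}/ngsi-ld/v1/entities",
        data=json.dumps(entity).encode("utf-8"),
        headers={"Content-Type": "application/ld+json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status
```

The same `build_entity` output can be sent to both Orion-LD and Stellio because both expose the NGSI-LD API; FROST-Server needs a separate SensorThings payload.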

3 Pulsar Flow: Event Streaming

  • Topics: persistent://public/default/smartcity-traffic, smartcity-airquality, smartcity-parking, smartcity-noise, smartcity-weather, smartcity-light (all under persistent://public/default/)
  • Binary protocol port: 6650 (reachable from the host)
  • Distribution: the pulsar-distribution service consumes these topics and republishes the messages to the MQTT brokers and the context brokers
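
The topic naming above can be sketched in Python as follows. The helper names `pulsar_topic` and `send_reading` are hypothetical, and the producer part assumes the `pulsar-client` package is installed and that the broker is reachable at `pulsar://localhost:6650`.

```python
import json

# Sensor types taken from the topic list above.
SENSOR_TYPES = ["traffic", "airquality", "parking", "noise", "weather", "light"]


def pulsar_topic(sensor_type: str) -> str:
    """Return the fully qualified topic name for one sensor type."""
    if sensor_type not in SENSOR_TYPES:
        raise ValueError(f"unknown sensor type: {sensor_type}")
    return f"persistent://public/default/smartcity-{sensor_type}"


def send_reading(sensor_type: str, reading: dict,
                 service_url: str = "pulsar://localhost:6650") -> None:
    """Send one JSON-encoded reading to the matching smartcity-* topic."""
    # Imported lazily so pulsar_topic() works without pulsar-client installed.
    import pulsar

    client = pulsar.Client(service_url)
    try:
        producer = client.create_producer(pulsar_topic(sensor_type))
        producer.send(json.dumps(reading).encode("utf-8"))
    finally:
        client.close()
```

Opening a client per message keeps the sketch short; a long-running simulator would more plausibly keep one client and one producer per topic open.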

4 Redpanda Flow: Kafka-compatible REST

  • REST Proxy: http://localhost:8082
  • Topics: traffic, air-quality, parking, noise, weather
  • Payload: Base64-encoded JSON wrapped as {"records": [{"value": "<base64>"}]}
  • Consumer: redpanda/consumer.py polls every 10 s and writes to InfluxDB
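
The payload envelope described above can be built and decoded with the standard library alone. This is a sketch, not the code of the simulator or of `redpanda/consumer.py`; the function names are hypothetical, and the `application/vnd.kafka.binary.v2+json` content type is the one the Kafka-style REST proxy expects for Base64 values.

```python
import base64
import json
import urllib.request


def encode_records(readings):
    """Wrap each reading as Base64(JSON) inside the {"records": [...]} envelope."""
    return {
        "records": [
            {"value": base64.b64encode(json.dumps(r).encode("utf-8")).decode("ascii")}
            for r in readings
        ]
    }


def decode_records(envelope):
    """Inverse of encode_records: recover the original JSON readings."""
    return [json.loads(base64.b64decode(rec["value"])) for rec in envelope["records"]]


def produce(topic, readings, proxy_url="http://localhost:8082"):
    """POST the envelope to {proxy_url}/topics/{topic} and return the HTTP status."""
    request = urllib.request.Request(
        f"{proxy_url}/topics/{topic}",
        data=json.dumps(encode_records(readings)).encode("utf-8"),
        headers={"Content-Type": "application/vnd.kafka.binary.v2+json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status
```

A consumer that reads these records back applies the same Base64 and JSON decoding to each `value` before writing to InfluxDB.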

5 InfluxDB Flow: Real Time

  • API: http://localhost:8086/api/v2/write
  • Bucket: iot_data
  • Org: digitribe
  • Mode: asynchronous (daemon thread), so that writes do not block the MQTT publish
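
A daemon-thread writer of this kind could look like the sketch below. It is an assumption about the pattern, not the simulator's actual code: `to_line_protocol`, `AsyncWriter`, and `http_send` are hypothetical names, field values are assumed to be numeric, and tag or field escaping is left out for brevity.

```python
import queue
import threading
import urllib.request


def to_line_protocol(measurement, tags, fields, ts_ns):
    """Format one point as InfluxDB line protocol (numeric fields only)."""
    tag_part = "".join(f",{k}={v}" for k, v in sorted(tags.items()))
    field_part = ",".join(f"{k}={float(v)}" for k, v in sorted(fields.items()))
    return f"{measurement}{tag_part} {field_part} {ts_ns}"


class AsyncWriter:
    """Queue lines and send them from a daemon thread so callers never block."""

    def __init__(self, send):
        self._queue = queue.Queue()
        self._send = send
        threading.Thread(target=self._run, daemon=True).start()

    def write(self, line):
        self._queue.put(line)  # returns immediately

    def flush(self):
        self._queue.join()  # wait until every queued line was handled

    def _run(self):
        while True:
            line = self._queue.get()
            try:
                self._send(line)
            except Exception:
                pass  # a failed write must not stop the worker thread
            finally:
                self._queue.task_done()


def http_send(line, token, base_url="http://localhost:8086"):
    """POST one line to the v2 write API; the token is a placeholder argument."""
    request = urllib.request.Request(
        f"{base_url}/api/v2/write?org=digitribe&bucket=iot_data&precision=ns",
        data=line.encode("utf-8"),
        headers={"Authorization": f"Token {token}"},
        method="POST",
    )
    urllib.request.urlopen(request, timeout=5).close()
```

Because the thread is a daemon, lines still queued when the process exits are dropped, so calling `flush()` before shutdown is a reasonable precaution.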

6 OpenRemote: MQTT Agent

The OpenRemote MQTT agent subscribes to city/sensors/# on the MQTT brokers (EMQX, Mosquitto, BunkerM). Payloads are parsed automatically and the corresponding asset attributes are updated.

Configuration via the Manager UI (https://openremote.digitribe.fr/manager/):

  1. Log in as admin/Digitribe972
  2. Select the smartcity realm
  3. Assets → Agents → + Add Agent
  4. Type: MQTT Agent
  5. Configure:
    • MQTT Broker URI: tcp://emqx_emqx_1:1883 (smartcity-shared network)
    • Topic Filter: city/sensors/#
    • QoS: 1
    • Enabled: yes
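
To show which topics the filter `city/sensors/#` selects, here is a small Python sketch of MQTT wildcard matching. It is a simplified illustration (the function name `topic_matches` is hypothetical, and special handling of `$`-prefixed topics is omitted), not the matching code used by OpenRemote or the brokers.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter using + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent and everything below it.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing #, both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

With this filter, every simulator topic of the form `city/sensors/{type}/{id}` reaches the agent, while topics outside `city/sensors` are ignored.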

7 Prometheus Flow: Metrics

| Service | Scrape endpoint |
| --- | --- |
| Simulator | localhost:8001 |
| EMQX | emqx_emqx_1:8081/api/v5/metrics |
| Stellio | stellio-api-gateway:8080/actuator/prometheus |
| FROST | frost_http-web-1:8080/metrics |
| InfluxDB | smart-city-influxdb:8086/metrics |
| Redpanda | smart-city-redpanda-console:8080/public_metrics |
| OpenRemote | openremote-manager-1:8080/actuator/prometheus |
| Grafana | smart-city-grafana:3000/metrics |
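
One way to turn a list of endpoints like this into Prometheus targets is file-based service discovery, where each target group carries its own `__metrics_path__` label. The sketch below is an assumption about how that could be generated, not the content of the project's prometheus.yml; `ENDPOINTS` shows only two rows, and `to_file_sd` and the `service` label are hypothetical names.

```python
import json

# Two rows from the endpoint list, as "host:port[/path]" strings.
ENDPOINTS = {
    "simulator": "localhost:8001",
    "emqx": "emqx_emqx_1:8081/api/v5/metrics",
}


def to_file_sd(endpoints):
    """Convert "host:port/path" strings into file_sd target groups."""
    groups = []
    for service, endpoint in endpoints.items():
        target, _, path = endpoint.partition("/")
        groups.append({
            "targets": [target],
            "labels": {
                "service": service,
                # Endpoints without an explicit path use the /metrics default.
                "__metrics_path__": ("/" + path) if path else "/metrics",
            },
        })
    return groups


if __name__ == "__main__":
    # The output can be saved as a JSON file referenced by file_sd_configs.
    print(json.dumps(to_file_sd(ENDPOINTS), indent=2))
```

Keeping the path in a per-target label lets a single scrape job cover services whose metrics live under different URLs.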

Summary Table

| Component | Technology | Port | Status |
| --- | --- | --- | --- |
| Simulator | Python + paho-mqtt | Host:8001 (metrics) | Active |
| EMQX | MQTT Broker | 11883 | Connected |
| Mosquitto | MQTT Broker | 1883 | Connected |
| BunkerM | MQTTS Broker | 1900 | Connected |
| Orion-LD | NGSI-LD Broker | 1026 | Data present |
| Stellio | NGSI-LD Broker | 8080 | Data present |
| FROST-Server | SensorThings API | 8080 | Data present |
| OpenRemote | IoT Platform | 8080 | UI OK |
| InfluxDB | Time Series DB | 8086 | Bucket iot_data |
| Redpanda | Kafka-compatible | 8082 (REST) | Topics active |
| Pulsar | Event Streaming | 6650 | Connected |
| Prometheus | Metrics | 9090 (configured) | Container stopped |
| Grafana | Visualization | 3000 | Dashboards |
| GeoServer | Geo Data | 8080 | REST OK |
| MapStore | Mapping | 8080 | WMS/WMTS |

Useful Commands

```bash
# Rebuild and restart the Pulsar distribution service
cd ~/smart-city-digital-twin-martinique
docker build -t smart-city-pulsar-distribution:latest -f pulsar/Dockerfile pulsar/
docker compose -f docker-compose.yml -f docker-compose.distribution.yml up -d pulsar-distribution

# Restart Prometheus (prometheus-brokers)
cd ~/smart-city-digital-twin-martinique
docker compose up -d prometheus-brokers

# Run the Redpanda consumer (on the host)
cd ~/smart-city-digital-twin-martinique
python3 redpanda/consumer.py

# List the Redpanda topics
curl -s http://localhost:8082/topics

# Check the simulator metrics
curl -s http://localhost:8001/metrics | grep "^simulator_"

# Follow the distribution service logs
docker logs -f smart-city-pulsar-distribution
```