From 98954e86fb5755e4916a405532b9fb68d06854a1 Mon Sep 17 00:00:00 2001 From: Eric FELIXINE Date: Tue, 5 May 2026 11:29:07 -0400 Subject: [PATCH] fix: Redpanda start.sh + FROST direct simulator + Prometheus config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Redpanda : correction start.sh (v24.3.14) - FROST : ENABLE_FROST=true dans simulator (test direct) - Pulsar : distribution.py mis à jour (mais ConnectError) - Prometheus : config ajoutée (prometheus.yml) - Grafana : datasources prêtes --- QUICK_REFERENCE.md | 56 ++++++++++ docker-compose.distribution.yml | 5 +- docker-compose.yml | 2 +- prometheus.yml | 31 ++++++ pulsar/distribution.py | 180 +++++++++++++++++++++++++++++--- redpanda/docker-compose.yml | 33 ++++-- redpanda/start.sh | 29 +++-- 7 files changed, 297 insertions(+), 39 deletions(-) create mode 100644 QUICK_REFERENCE.md create mode 100644 prometheus.yml diff --git a/QUICK_REFERENCE.md b/QUICK_REFERENCE.md new file mode 100644 index 00000000..cf25f5af --- /dev/null +++ b/QUICK_REFERENCE.md @@ -0,0 +1,56 @@ +# Smart City Digital Twin Martinique — Quick Reference + +## Architecture Actuelle + +``` +Simulateur (Python) + ↓ (pulsar-client binaire, port 6650) +Pulsar Standalone + ↓ (consumer + republish) +Pulsar Distribution Service (Python) + ├→ MQTT Brokers (EMQX :1883, Mosquitto :1883) + ├→ NGSI-LD Brokers (Orion-LD :2026, Stellio :8087) + └→ OGC SensorThings (FROST :8090) +``` + +## Commandes Utiles + +### Vérifier les services +```bash +docker ps | grep -E "(simulator|pulsar|emqx|orion|stellio|frost)" +``` + +### Voir les logs +```bash +# Simulateur +docker logs smart-city-simulator --tail 50 + +# Distribution Pulsar +docker logs smart-city-pulsar-distribution --tail 50 + +# Orion-LD +curl -s "http://localhost:2026/ngsi-ld/v1/entities?limit=3" | python3 -m json.tool +``` + +### Redémarrer un service +```bash +cd ~/smart-city-digital-twin-martinique +docker-compose -f docker-compose.yml -f docker-compose.distribution.yml restart simulator +``` + +## Prochaines Étapes + +1. **Grafana** : Configurer datasources (InfluxDB, Pulsar, FROST) + dashboards +2. **Redpanda** : Remplacer par Kafka simple ou résoudre le problème de démarrage +3. **FROST** : Corriger le format du payload (datastream_id requis) +4. **Monitoring** : Prometheus pour les métriques des stacks (pas d'ingestion payloads) + +## Git + +```bash +cd ~/smart-city-digital-twin-martinique +git add -A && git commit -m "message" && git push origin master +``` + +--- +*Dernière mise à jour : 05 Mai 2026* diff --git a/docker-compose.distribution.yml b/docker-compose.distribution.yml index 6d4dca58..d1e815b0 100644 --- a/docker-compose.distribution.yml +++ b/docker-compose.distribution.yml @@ -10,7 +10,6 @@ services: container_name: smart-city-pulsar-distribution networks: - smartcity-shared - - traefik-public environment: - PULSAR_HOST=smart-city-pulsar - PULSAR_PORT=6650 @@ -20,11 +19,11 @@ services: - STELLIO_URL=http://stellio-api-gateway:8080 - FROST_URL=http://frost-api-8090:8080/FROST-Server/v1.1 restart: unless-stopped + depends_on: + - smart-city-pulsar labels: - "traefik.enable=false" networks: - traefik-public: - external: true smartcity-shared: external: true diff --git a/docker-compose.yml b/docker-compose.yml index 5983d6fa..de8bb3c9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -26,7 +26,7 @@ services: # Context Brokers (Disabled - using Pulsar distribution) - ENABLE_ORION=false - ENABLE_STELLIO=false - - ENABLE_FROST=false + - ENABLE_FROST=true # Temporaire: test direct pour Grafana # Databases - ENABLE_INFLUX=true - INFLUX_URL=http://smart-city-influxdb:8086 diff --git a/prometheus.yml b/prometheus.yml new file mode 100644 index 00000000..6bca659a --- /dev/null +++ b/prometheus.yml @@ -0,0 +1,31 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + # Mosquitto MQTT Broker + - job_name: 'mosquitto' + static_configs: + - targets: ['mosquitto-exporter:9234'] + scrape_interval: 10s + + # Orion-LD (FIWARE) + - job_name: 'orion-ld' + static_configs: + - targets: ['fiware-gis-quickstart-orion-1:1026'] + metrics_path: '/metrics' + scrape_interval: 10s + + # FROST-Server (SensorThings) + - job_name: 'frost-server' + static_configs: + - targets: ['frost_http-web-1:8080'] + metrics_path: '/FROST-Server/metrics' + scrape_interval: 10s + + # Stellio NGSI-LD + - job_name: 'stellio' + static_configs: + - targets: ['stellio:8080'] + metrics_path: '/metrics' + scrape_interval: 10s diff --git a/pulsar/distribution.py b/pulsar/distribution.py index 932ba352..a4b43905 100644 --- a/pulsar/distribution.py +++ b/pulsar/distribution.py @@ -7,22 +7,165 @@ import json import time import urllib.request import paho.mqtt.client as mqtt +import os -PULSAR_HOST = "smart-city-pulsar" -PULSAR_PORT = 6650 +PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar") +PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "6650")) # MQTT Brokers -EMQX_HOST = "emqx_emqx_1" -EMQX_PORT = 1883 -MOSQUITTO_HOST = "mosquitto-traefik" -MOSQUITTO_PORT = 1883 +EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1") +EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883")) +MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "mosquitto-traefik") +MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883")) # NGSI-LD Brokers -ORION_URL = "http://fiware-gis-quickstart-orion-1:1026" -STELLIO_URL = "http://stellio-api-gateway:8080" +ORION_URL = os.environ.get("ORION_URL", "http://fiware-gis-quickstart-orion-1:1026") +STELLIO_URL = os.environ.get("STELLIO_URL", "http://stellio-api-gateway:8080") # OGC SensorThings -FROST_URL = "http://frost-api-8090:8080/FROST-Server/v1.1" +FROST_URL = os.environ.get("FROST_URL", "http://frost-api-8090:8080/FROST-Server/v1.1") + +# Cache des Datastreams FROST créés +_frost_datastreams = {} + +def ensure_frost_datastream(sensor_type, sensor_name): + """Crée un Datastream FROST s'il n'existe pas, retourne l'@iot.id""" + cache_key = f"{sensor_type}_{sensor_name}" + if cache_key in _frost_datastreams: + return _frost_datastreams[cache_key] + + try: + # Vérifier si le Datastream existe déjà + req = urllib.request.Request( + f"{FROST_URL}/Datastreams?$filter=name eq '{sensor_name}'", + headers={"Accept": "application/json"} + ) + with urllib.request.urlopen(req, timeout=5) as resp: + data = json.loads(resp.read().decode()) + if data.get("value"): + ds_id = data["value"][0]["@iot.id"] + _frost_datastreams[cache_key] = ds_id + return ds_id + except Exception: + pass # Pas trouvé, on va créer + + # Créer le Datastream + try: + # 1. Créer ou récupérer Thing + thing_id = ensure_frost_thing("SmartCity Martinique") + + # 2. Créer ou récupérer Sensor + sensor_id = ensure_frost_sensor(sensor_type) + + # 3. Créer ou récupérer ObservedProperty + obsprop_id = ensure_frost_observed_property(sensor_type) + + # 4. Créer Datastream + datastream = { + "name": sensor_name, + "description": f"Observations for {sensor_name}", + "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement", + "unitOfMeasurement": {"name": "Units", "symbol": "u", "definition": "http://www.opengis.net/def/uom/UCUM/"}, + "Thing": {"@iot.id": thing_id}, + "Sensor": {"@iot.id": sensor_id}, + "ObservedProperty": {"@iot.id": obsprop_id} + } + req = urllib.request.Request( + f"{FROST_URL}/Datastreams", + data=json.dumps(datastream).encode(), + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=5) as resp: + if resp.status in (201, 200): + # Récupérer l'ID depuis le header Location + location = resp.headers.get("Location", "") + if location: + ds_id = location.split("(")[-1].rstrip(")") + else: + # Fallback : requête GET + ds_id = ensure_frost_datastream(sensor_type, sensor_name) # Retry to get ID + _frost_datastreams[cache_key] = ds_id + return ds_id + except Exception as e: + print(f" ⚠️ FROST Create Datastream → {e}") + return None + +def ensure_frost_thing(name): + """Crée ou récupére un Thing""" + try: + req = urllib.request.Request(f"{FROST_URL}/Things?$filter=name eq '{name}'") + with urllib.request.urlopen(req, timeout=5) as resp: + data = json.loads(resp.read().decode()) + if data.get("value"): + return data["value"][0]["@iot.id"] + # Créer + thing = {"name": name, "description": "Smart City Digital Twin Martinique"} + req = urllib.request.Request( + f"{FROST_URL}/Things", + data=json.dumps(thing).encode(), + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=5) as resp: + if resp.status in (201, 200): + return resp.headers.get("Location", "").split("(")[-1].rstrip(")") + except Exception as e: + print(f" ⚠️ FROST Thing → {e}") + return "1" + +def ensure_frost_sensor(sensor_type): + """Crée ou récupére un Sensor""" + try: + req = urllib.request.Request(f"{FROST_URL}/Sensors?$filter=name eq '{sensor_type}'") + with urllib.request.urlopen(req, timeout=5) as resp: + data = json.loads(resp.read().decode()) + if data.get("value"): + return data["value"][0]["@iot.id"] + sensor = {"name": sensor_type, "description": f"Sensor for {sensor_type}"} + req = urllib.request.Request( + f"{FROST_URL}/Sensors", + data=json.dumps(sensor).encode(), + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=5) as resp: + if resp.status in (201, 200): + return resp.headers.get("Location", "").split("(")[-1].rstrip(")") + except Exception as e: + print(f" ⚠️ FROST Sensor → {e}") + return "1" + +def ensure_frost_observed_property(sensor_type): + """Crée ou récupére un ObservedProperty""" + prop_map = { + "traffic": ("Traffic Flow", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"), + "airquality": ("Air Quality", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"), + "parking": ("Parking Occupancy", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"), + "noise": ("Noise Level", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"), + "weather": ("Weather", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"), + "light": ("Light Intensity", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement") + } + name, definition = prop_map.get(sensor_type, (sensor_type, "http://example.org")) + try: + req = urllib.request.Request(f"{FROST_URL}/ObservedProperties?$filter=name eq '{name}'") + with urllib.request.urlopen(req, timeout=5) as resp: + data = json.loads(resp.read().decode()) + if data.get("value"): + return data["value"][0]["@iot.id"] + prop = {"name": name, "definition": definition, "description": f"Observed property for {sensor_type}"} + req = urllib.request.Request( + f"{FROST_URL}/ObservedProperties", + data=json.dumps(prop).encode(), + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=5) as resp: + if resp.status in (201, 200): + return resp.headers.get("Location", "").split("(")[-1].rstrip(")") + except Exception as e: + print(f" ⚠️ FROST ObservedProperty → {e}") + return "1" def publish_mqtt(payload_dict, host, port): """Publish to MQTT broker""" @@ -52,7 +195,6 @@ def publish_ngsi_ld(payload_dict, broker_url): except urllib.error.HTTPError as e: if e.code == 409: # Already exists, try update try: - # Update with PUT entity_id = payload_dict.get("id", "") req = urllib.request.Request( f"{broker_url}/ngsi-ld/v1/entities/{entity_id}", @@ -72,12 +214,21 @@ def publish_ngsi_ld(payload_dict, broker_url): def publish_frost(payload_dict): """Publish to FROST Server (OGC SensorThings)""" try: + sensor_type = payload_dict.get("type", "unknown") + sensor_name = payload_dict.get("name", sensor_type) + + # S'assurer que le Datastream existe + ds_id = ensure_frost_datastream(sensor_type, sensor_name) + if not ds_id: + print(f" ⚠️ FROST → No Datastream for {sensor_name}") + return False + # Convert to SensorThings format st_payload = { - "result": payload_dict.get("value", 0), + "result": payload_dict.get("value", payload_dict.get("temperature_celsius", 0)), "phenomenonTime": payload_dict.get("timestamp", ""), "resultTime": payload_dict.get("timestamp", ""), - "Datastream": {"@iot.id": payload_dict.get("datastream_id", "1")} + "Datastream": {"@iot.id": ds_id} } data = json.dumps(st_payload).encode() req = urllib.request.Request( @@ -137,9 +288,8 @@ def main(): publish_ngsi_ld(data, ORION_URL) publish_ngsi_ld(data, STELLIO_URL) - # Republish to FROST (if OGC format) - if "datastream_id" in data: - publish_frost(data) + # Republish to FROST (OGC SensorThings) + publish_frost(data) consumer.acknowledge(msg) except Exception as e: diff --git a/redpanda/docker-compose.yml b/redpanda/docker-compose.yml index b1ea6a37..2c30c332 100644 --- a/redpanda/docker-compose.yml +++ b/redpanda/docker-compose.yml @@ -1,24 +1,39 @@ # Redpanda (Kafka-compatible) — Single Node for Smart City Digital Twin Martinique # Usage: docker compose -p smart-city -f redpanda/docker-compose.yml up -d -# Ports: 19092=Kafka (host), 9644=Admin API +# Ports: 19092=Kafka (host), 9644=Admin API, 18083=Schema Registry services: redpanda: image: redpandadata/redpanda:v24.3.14 container_name: smart-city-redpanda - entrypoint: ["/bin/bash", "/start.sh"] - volumes: - - redpanda-data:/var/lib/redpanda/data - - ./start.sh:/start.sh:ro + command: + - redpanda + - start + - --overprovisioned + - --smp + - "1" + - --memory + - 1G + - --reserve-memory + - 0M + - --node-id + - "0" + - --check=false + - --kafka-addr + - internal://0.0.0.0:9092 + - --advertise-kafka-addr + - internal://smart-city-redpanda:9092 + - --rpc-addr + - internal://0.0.0.0:33145 + - --advertise-rpc-addr + - smart-city-redpanda:33145 ports: - "19092:9092" - "19644:9644" + volumes: + - redpanda-data:/var/lib/redpanda/data networks: - traefik-public - smartcity-shared - deploy: - resources: - limits: - memory: 2G healthcheck: test: ["CMD-SHELL", "curl -sf http://localhost:9644/v1/status/ready 2>/dev/null || exit 1"] interval: 30s diff --git a/redpanda/start.sh b/redpanda/start.sh index 85bb24c8..f7759967 100755 --- a/redpanda/start.sh +++ b/redpanda/start.sh @@ -1,12 +1,19 @@ #!/bin/bash -# Start Redpanda with minimal dev config -exec /usr/bin/rpk redpanda start \ - --mode dev \ - --smp 1 \ - --memory 1G \ - --overprovisioned \ - --kafka-addr 0.0.0.0:9092 \ - --advertise-kafka-addr smart-city-redpanda:9092 \ - --rpc-addr 0.0.0.0:33145 \ - --advertise-rpc-addr smart-city-redpanda:33145 \ - --check=false +# Start Redpanda - Corrected for v24.3.14 +# Generate config first, then start +/usr/bin/rpk redpanda config init --overprovisioned + +# Set configuration via rpk config set +/usr/bin/rpk config set redpanda.node_id 0 +/usr/bin/rpk config set redpanda.data_directory /var/lib/redpanda/data +/usr/bin/rpk config set redpanda.kafka_api "[{'address': '0.0.0.0', 'port': 9092}]" +/usr/bin/rpk config set redpanda.advertised_kafka_api "[{'address': 'smart-city-redpanda', 'port': 9092}]" +/usr/bin/rpk config set redpanda.admin "[{'address': '0.0.0.0', 'port': 9644}]" +/usr/bin/rpk config set redpanda.rpc_server "[{'address': '0.0.0.0', 'port': 33145}]" +/usr/bin/rpk config set redpanda.seed_servers "[]" +/usr/bin/rpk config set seastar.smp 1 +/usr/bin/rpk config set seastar.memory 1G +/usr/bin/rpk config set seastar.overprovisioned true + +# Start Redpanda +exec /usr/bin/rpk redpanda start --check=false