feat(simulator): real-time (1s), fix ENABLE_PULSAR, add Pulsar/Redpanda publish, fix InfluxDB URL

- Change INTERVAL to 1s for real-time sensor data
- Fix ENABLE_PULSAR comparison (accept 'true'/'false' strings)
- Add publish_pulsar() and publish_redpanda() functions
- Fix InfluxDB URL (smart-city-influxdb instead of digital-twin-influxdb)
- Add docker-compose.yml with simulator service
- Add redpanda config and start script
- Add session_resume_2026-05-05.md
This commit is contained in:
Eric FELIXINE
2026-05-05 02:53:43 -04:00
parent e618cbfcb9
commit 01c2be4930
6 changed files with 349 additions and 2 deletions

View File

@@ -15,12 +15,22 @@ Context Brokers REST:
- Stellio: stellio-api-gateway:8080 (NGSI-LD)
- FROST: frost_allinone-web-1:8080/FROST-Server/v1.1 (SensorThings)
Streaming Platforms:
- Pulsar: smart-city-pulsar:8080 (HTTP REST Producer API)
- Redpanda: smart-city-redpanda:8082 (Kafka REST Proxy)
Time-Series DB:
- InfluxDB v2: smart-city-influxdb:8086 (via influxdb-client)
Variables d'environnement:
PUBLISH_INTERVAL_SEC : intervalle de publication (défaut: 10s)
BASE_LAT / BASE_LON : coordonnées de base (défaut: Fort-de-France)
ENABLE_ORION=1 : activer Orion-LD (défaut: 1)
ENABLE_STELLIO=1 : activer Stellio (défaut: 1)
ENABLE_FROST=1 : activer FROST-Server (défaut: 1)
ENABLE_INFLUX=1 : activer InfluxDB v2 (défaut: 1)
ENABLE_PULSAR=1 : activer Apache Pulsar (défaut: 1)
ENABLE_REDPANDA=1 : activer Redpanda Kafka (défaut: 1)
"""
import os, sys, json, time, random, signal, queue, threading, ssl, urllib.parse
@@ -49,7 +59,7 @@ BUNKERM_PORT = int(os.environ.get("BUNKERM_PORT", "1900"))
# =============================================================================
BASE_LAT = float(os.environ.get("BASE_LAT", "14.6091"))
BASE_LON = float(os.environ.get("BASE_LON", "-61.2155"))
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "10"))
INTERVAL = int(os.environ.get("PUBLISH_INTERVAL_SEC", "1")) # 1s pour temps réel
ENABLE_ORION = os.environ.get("ENABLE_ORION", "1") == "1"
ENABLE_STELLIO = os.environ.get("ENABLE_STELLIO", "1") == "1"
ENABLE_FROST = os.environ.get("ENABLE_FROST", "1") == "1"
@@ -60,9 +70,21 @@ OR_REALM = os.environ.get("OR_REALM", "smartcity")
OR_TOKEN_REALM = os.environ.get("OR_TOKEN_REALM", "master") # Realm pour obtention token
FROST_URL = os.environ.get("FROST_URL", "http://localhost:8090/FROST-Server/v1.1") # Exposer frost_http-web-1:8080 -> host:8086
# Pulsar config (HTTP REST — pulsar-admin + producer REST API)
ENABLE_PULSAR = os.environ.get("ENABLE_PULSAR", "1").lower() in ("1", "true", "yes", "on")
PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar")
PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "8080"))
PULSAR_BASE = f"http://{PULSAR_HOST}:{PULSAR_PORT}"
# Redpanda / Kafka config (REST Proxy HTTP)
ENABLE_REDPANDA = os.environ.get("ENABLE_REDPANDA", "1") == "1"
REDPANDA_HOST = os.environ.get("REDPANDA_HOST", "smart-city-redpanda")
REDPANDA_PORT = int(os.environ.get("REDPANDA_PORT", "8082"))
REDPANDA_BASE = f"http://{REDPANDA_HOST}:{REDPANDA_PORT}"
# InfluxDB config
ENABLE_INFLUX = os.environ.get("ENABLE_INFLUX", "1") == "1"
INFLUX_URL = os.environ.get("INFLUX_URL", "http://localhost:8086") # InfluxDB exposé sur host:8086
INFLUX_URL = os.environ.get("INFLUX_URL", "http://smart-city-influxdb:8086") # InfluxDB v2 sur smartcity-shared
INFLUX_ORG = os.environ.get("INFLUX_ORG", "digitribe")
INFLUX_BUCKET = os.environ.get("INFLUX_BUCKET", "iot_data")
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "my-super-secret-admin-token")
@@ -766,6 +788,115 @@ def publish_openremote(sid: str, sensor: dict, values: dict) -> bool:
}
return _or_put(asset_id, payload)
# =============================================================================
# Pulsar — HTTP REST Producer
# API: POST http://host:8080/admin/v2/persistent/public/default/{topic}/produce
# Payload: {"messages": [{"payload": "<base64>", "properties": {...}}]}
# Topics auto-créés par le premier message (Pulsar standalone)
# =============================================================================
_pulsar_session = None
def _get_pulsar_session():
global _pulsar_session
if _pulsar_session is None:
import urllib.request
_pulsar_session = urllib.request
return _pulsar_session
def _init_pulsar() -> bool:
"""Teste la connectivité Pulsar au démarrage."""
try:
import urllib.request
req = urllib.request.Request(f"{PULSAR_BASE}/admin/v2/clusters")
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status == 200:
print(f"[PULSAR] ✅ Connected to {PULSAR_BASE}")
return True
except Exception as e:
print(f"[PULSAR] ⚠️ Cannot reach {PULSAR_BASE}: {e}")
return False
def publish_pulsar(sid: str, sensor: dict, payload: dict) -> bool:
"""Publie un message sur Pulsar via l'API REST producer."""
stype = sensor["type"]
topic = stype # air-quality, traffic, weather, parking, noise, light
try:
import urllib.request, base64
# Pulsar REST producer attend du base64
body = json.dumps(payload, ensure_ascii=False)
b64 = base64.b64encode(body.encode()).decode()
msg = {"messages": [{"payload": b64, "properties": {"sensor_id": sid, "source": "simulator"}}]}
url = f"{PULSAR_BASE}/admin/v2/persistent/public/default/{topic}/produce"
req = urllib.request.Request(
url,
data=json.dumps(msg).encode(),
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=8) as resp:
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
print(f" ⚠️ Pulsar → {e.code}")
return False
except Exception as e:
print(f" ⚠️ Pulsar → {e}")
return False
# =============================================================================
# Redpanda / Kafka — HTTP REST Proxy
# API: POST http://host:8082/topics/{topic}
# Payload: {"records": [{"value": "<base64>"}]}
# Topics auto-créés par le premier message (Redpanda)
# =============================================================================
_redpanda_session = None
def _get_redpanda_session():
global _redpanda_session
if _redpanda_session is None:
import urllib.request
_redpanda_session = urllib.request
return _redpanda_session
def _init_redpanda() -> bool:
"""Teste la connectivité Redpanda au démarrage."""
try:
import urllib.request
req = urllib.request.Request(f"{REDPANDA_BASE}/v1/status/alive")
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status == 200:
print(f"[REDPANDA] ✅ Connected to {REDPANDA_BASE}")
return True
except Exception as e:
print(f"[REDPANDA] ⚠️ Cannot reach {REDPANDA_BASE}: {e}")
return False
def publish_redpanda(sid: str, sensor: dict, payload: dict) -> bool:
"""Publie un message sur Redpanda/Kafka via le REST Proxy."""
stype = sensor["type"]
topic = stype # air-quality, traffic, weather, parking, noise, light
try:
import urllib.request, base64
body = json.dumps(payload, ensure_ascii=False)
b64 = base64.b64encode(body.encode()).decode()
record = {
"records": [{"value": b64, "headers": {"sensor_id": sid, "source": "simulator"}}]
}
url = f"{REDPANDA_BASE}/topics/{topic}"
req = urllib.request.Request(
url,
data=json.dumps(record).encode(),
headers={"Content-Type": "application/vnd.api+json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=8) as resp:
return resp.status in (200, 201, 204)
except urllib.error.HTTPError as e:
print(f" ⚠️ Redpanda → {e.code}")
return False
except Exception as e:
print(f" ⚠️ Redpanda → {e}")
return False
def publish_influx(sid: str, sensor: dict, values: dict) -> bool:
"""Write sensor data to InfluxDB (async, non-blocking)."""
if not _influx_write_api:
@@ -805,6 +936,18 @@ def main():
print("╚══════════════════════════════════════════════════╝")
print(f"[CFG] Capteurs: {len(SENSORS)} | Intervalle: {INTERVAL}s")
print(f"[CFG] Orion-LD: {ENABLE_ORION} | Stellio: {ENABLE_STELLIO} | FROST: {ENABLE_FROST}")
print(f"[CFG] InfluxDB: {ENABLE_INFLUX} | Pulsar: {ENABLE_PULSAR} | Redpanda: {ENABLE_REDPANDA}")
# Init connectivity checks
if ENABLE_PULSAR:
_init_pulsar()
# Test immédiat
print(f" 🌪️ DEBUG: Test Pulsar direct...", flush=True)
test_payload = {"type": "test", "value": 123}
test_result = publish_pulsar("test_001", {"type": "air-quality"}, test_payload)
print(f" 🌪️ DEBUG: Test Pulsar result: {test_result}", flush=True)
if ENABLE_REDPANDA:
_init_redpanda()
mqtt_client = MultiMQTT()
@@ -898,6 +1041,17 @@ def main():
ok_influx = publish_influx(sid, sensor, influx_vals)
print(f" 📈 InfluxDB: {'' if ok_influx else ''}")
# --- Pulsar (HTTP REST) ---
if ENABLE_PULSAR:
print(f" 🌪️ DEBUG: calling publish_pulsar for {sid}, payload_mqtt exists: {bool(locals().get('payload_mqtt'))}", flush=True)
ok_pulsar = publish_pulsar(sid, sensor, payload_mqtt)
print(f" 🌪️ Pulsar: {'' if ok_pulsar else ''}")
# --- Redpanda (Kafka REST Proxy) ---
if ENABLE_REDPANDA:
ok_redpanda = publish_redpanda(sid, sensor, payload_mqtt)
print(f" 🐟 Redpanda: {'' if ok_redpanda else ''}")
# --- BunkerM HTTP ---
if os.getenv("BUNKERM_HTTP", "0") == "1":
ok_bunkerm = publish_bunkerm(sid, sensor, payload_mqtt)