fix: Redpanda start.sh + FROST direct simulator + Prometheus config
- Redpanda : correction start.sh (v24.3.14) - FROST : ENABLE_FROST=true dans simulator (test direct) - Pulsar : distribution.py mis à jour (mais ConnectError) - Prometheus : config ajoutée (prometheus.yml) - Grafana : datasources prêtes
This commit is contained in:
@@ -7,22 +7,165 @@ import json
|
||||
import time
|
||||
import urllib.request
|
||||
import paho.mqtt.client as mqtt
|
||||
import os
|
||||
|
||||
PULSAR_HOST = "smart-city-pulsar"
|
||||
PULSAR_PORT = 6650
|
||||
PULSAR_HOST = os.environ.get("PULSAR_HOST", "smart-city-pulsar")
|
||||
PULSAR_PORT = int(os.environ.get("PULSAR_PORT", "6650"))
|
||||
|
||||
# MQTT Brokers
|
||||
EMQX_HOST = "emqx_emqx_1"
|
||||
EMQX_PORT = 1883
|
||||
MOSQUITTO_HOST = "mosquitto-traefik"
|
||||
MOSQUITTO_PORT = 1883
|
||||
EMQX_HOST = os.environ.get("EMQX_HOST", "emqx_emqx_1")
|
||||
EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883"))
|
||||
MOSQUITTO_HOST = os.environ.get("MOSQUITTO_HOST", "mosquitto-traefik")
|
||||
MOSQUITTO_PORT = int(os.environ.get("MOSQUITTO_PORT", "1883"))
|
||||
|
||||
# NGSI-LD Brokers
|
||||
ORION_URL = "http://fiware-gis-quickstart-orion-1:1026"
|
||||
STELLIO_URL = "http://stellio-api-gateway:8080"
|
||||
ORION_URL = os.environ.get("ORION_URL", "http://fiware-gis-quickstart-orion-1:1026")
|
||||
STELLIO_URL = os.environ.get("STELLIO_URL", "http://stellio-api-gateway:8080")
|
||||
|
||||
# OGC SensorThings
|
||||
FROST_URL = "http://frost-api-8090:8080/FROST-Server/v1.1"
|
||||
FROST_URL = os.environ.get("FROST_URL", "http://frost-api-8090:8080/FROST-Server/v1.1")
|
||||
|
||||
# Cache des Datastreams FROST créés
|
||||
_frost_datastreams = {}
|
||||
|
||||
def ensure_frost_datastream(sensor_type, sensor_name):
|
||||
"""Crée un Datastream FROST s'il n'existe pas, retourne l'@iot.id"""
|
||||
cache_key = f"{sensor_type}_{sensor_name}"
|
||||
if cache_key in _frost_datastreams:
|
||||
return _frost_datastreams[cache_key]
|
||||
|
||||
try:
|
||||
# Vérifier si le Datastream existe déjà
|
||||
req = urllib.request.Request(
|
||||
f"{FROST_URL}/Datastreams?$filter=name eq '{sensor_name}'",
|
||||
headers={"Accept": "application/json"}
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
data = json.loads(resp.read().decode())
|
||||
if data.get("value"):
|
||||
ds_id = data["value"][0]["@iot.id"]
|
||||
_frost_datastreams[cache_key] = ds_id
|
||||
return ds_id
|
||||
except Exception:
|
||||
pass # Pas trouvé, on va créer
|
||||
|
||||
# Créer le Datastream
|
||||
try:
|
||||
# 1. Créer ou récupérer Thing
|
||||
thing_id = ensure_frost_thing("SmartCity Martinique")
|
||||
|
||||
# 2. Créer ou récupérer Sensor
|
||||
sensor_id = ensure_frost_sensor(sensor_type)
|
||||
|
||||
# 3. Créer ou récupérer ObservedProperty
|
||||
obsprop_id = ensure_frost_observed_property(sensor_type)
|
||||
|
||||
# 4. Créer Datastream
|
||||
datastream = {
|
||||
"name": sensor_name,
|
||||
"description": f"Observations for {sensor_name}",
|
||||
"observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
|
||||
"unitOfMeasurement": {"name": "Units", "symbol": "u", "definition": "http://www.opengis.net/def/uom/UCUM/"},
|
||||
"Thing": {"@iot.id": thing_id},
|
||||
"Sensor": {"@iot.id": sensor_id},
|
||||
"ObservedProperty": {"@iot.id": obsprop_id}
|
||||
}
|
||||
req = urllib.request.Request(
|
||||
f"{FROST_URL}/Datastreams",
|
||||
data=json.dumps(datastream).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST"
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
if resp.status in (201, 200):
|
||||
# Récupérer l'ID depuis le header Location
|
||||
location = resp.headers.get("Location", "")
|
||||
if location:
|
||||
ds_id = location.split("(")[-1].rstrip(")")
|
||||
else:
|
||||
# Fallback : requête GET
|
||||
ds_id = ensure_frost_datastream(sensor_type, sensor_name) # Retry to get ID
|
||||
_frost_datastreams[cache_key] = ds_id
|
||||
return ds_id
|
||||
except Exception as e:
|
||||
print(f" ⚠️ FROST Create Datastream → {e}")
|
||||
return None
|
||||
|
||||
def ensure_frost_thing(name):
|
||||
"""Crée ou récupére un Thing"""
|
||||
try:
|
||||
req = urllib.request.Request(f"{FROST_URL}/Things?$filter=name eq '{name}'")
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
data = json.loads(resp.read().decode())
|
||||
if data.get("value"):
|
||||
return data["value"][0]["@iot.id"]
|
||||
# Créer
|
||||
thing = {"name": name, "description": "Smart City Digital Twin Martinique"}
|
||||
req = urllib.request.Request(
|
||||
f"{FROST_URL}/Things",
|
||||
data=json.dumps(thing).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST"
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
if resp.status in (201, 200):
|
||||
return resp.headers.get("Location", "").split("(")[-1].rstrip(")")
|
||||
except Exception as e:
|
||||
print(f" ⚠️ FROST Thing → {e}")
|
||||
return "1"
|
||||
|
||||
def ensure_frost_sensor(sensor_type):
|
||||
"""Crée ou récupére un Sensor"""
|
||||
try:
|
||||
req = urllib.request.Request(f"{FROST_URL}/Sensors?$filter=name eq '{sensor_type}'")
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
data = json.loads(resp.read().decode())
|
||||
if data.get("value"):
|
||||
return data["value"][0]["@iot.id"]
|
||||
sensor = {"name": sensor_type, "description": f"Sensor for {sensor_type}"}
|
||||
req = urllib.request.Request(
|
||||
f"{FROST_URL}/Sensors",
|
||||
data=json.dumps(sensor).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST"
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
if resp.status in (201, 200):
|
||||
return resp.headers.get("Location", "").split("(")[-1].rstrip(")")
|
||||
except Exception as e:
|
||||
print(f" ⚠️ FROST Sensor → {e}")
|
||||
return "1"
|
||||
|
||||
def ensure_frost_observed_property(sensor_type):
|
||||
"""Crée ou récupére un ObservedProperty"""
|
||||
prop_map = {
|
||||
"traffic": ("Traffic Flow", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
|
||||
"airquality": ("Air Quality", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
|
||||
"parking": ("Parking Occupancy", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
|
||||
"noise": ("Noise Level", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
|
||||
"weather": ("Weather", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"),
|
||||
"light": ("Light Intensity", "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement")
|
||||
}
|
||||
name, definition = prop_map.get(sensor_type, (sensor_type, "http://example.org"))
|
||||
try:
|
||||
req = urllib.request.Request(f"{FROST_URL}/ObservedProperties?$filter=name eq '{name}'")
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
data = json.loads(resp.read().decode())
|
||||
if data.get("value"):
|
||||
return data["value"][0]["@iot.id"]
|
||||
prop = {"name": name, "definition": definition, "description": f"Observed property for {sensor_type}"}
|
||||
req = urllib.request.Request(
|
||||
f"{FROST_URL}/ObservedProperties",
|
||||
data=json.dumps(prop).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST"
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
if resp.status in (201, 200):
|
||||
return resp.headers.get("Location", "").split("(")[-1].rstrip(")")
|
||||
except Exception as e:
|
||||
print(f" ⚠️ FROST ObservedProperty → {e}")
|
||||
return "1"
|
||||
|
||||
def publish_mqtt(payload_dict, host, port):
|
||||
"""Publish to MQTT broker"""
|
||||
@@ -52,7 +195,6 @@ def publish_ngsi_ld(payload_dict, broker_url):
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.code == 409: # Already exists, try update
|
||||
try:
|
||||
# Update with PUT
|
||||
entity_id = payload_dict.get("id", "")
|
||||
req = urllib.request.Request(
|
||||
f"{broker_url}/ngsi-ld/v1/entities/{entity_id}",
|
||||
@@ -72,12 +214,21 @@ def publish_ngsi_ld(payload_dict, broker_url):
|
||||
def publish_frost(payload_dict):
|
||||
"""Publish to FROST Server (OGC SensorThings)"""
|
||||
try:
|
||||
sensor_type = payload_dict.get("type", "unknown")
|
||||
sensor_name = payload_dict.get("name", sensor_type)
|
||||
|
||||
# S'assurer que le Datastream existe
|
||||
ds_id = ensure_frost_datastream(sensor_type, sensor_name)
|
||||
if not ds_id:
|
||||
print(f" ⚠️ FROST → No Datastream for {sensor_name}")
|
||||
return False
|
||||
|
||||
# Convert to SensorThings format
|
||||
st_payload = {
|
||||
"result": payload_dict.get("value", 0),
|
||||
"result": payload_dict.get("value", payload_dict.get("temperature_celsius", 0)),
|
||||
"phenomenonTime": payload_dict.get("timestamp", ""),
|
||||
"resultTime": payload_dict.get("timestamp", ""),
|
||||
"Datastream": {"@iot.id": payload_dict.get("datastream_id", "1")}
|
||||
"Datastream": {"@iot.id": ds_id}
|
||||
}
|
||||
data = json.dumps(st_payload).encode()
|
||||
req = urllib.request.Request(
|
||||
@@ -137,9 +288,8 @@ def main():
|
||||
publish_ngsi_ld(data, ORION_URL)
|
||||
publish_ngsi_ld(data, STELLIO_URL)
|
||||
|
||||
# Republish to FROST (if OGC format)
|
||||
if "datastream_id" in data:
|
||||
publish_frost(data)
|
||||
# Republish to FROST (OGC SensorThings)
|
||||
publish_frost(data)
|
||||
|
||||
consumer.acknowledge(msg)
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user