fix: OpenRemote PUT 403/409, MQTTv5 callback, geojson-proxy API REST

- simulator.py: Fix MQTTv5 callback crash (5th arg *args)
- simulator.py: Fix _or_put() - GET version+realm before PUT, inject version in payload
- simulator.py: Fix token TTL (min 30s cache)
- simulator.py: Round-robin OR updates (~5 assets/iteration instead of 60)
- geojson-proxy: Rewrite using REST API instead of psycopg2 (PG auth issue)
- geojson-proxy: Add sensorType + attributes in properties for map styling
- docker-compose.yml: Add openremote_default network + DB vars for proxy
- docker-compose.yml: Add OR_REALM=master for geojson-proxy

Resolves: OpenRemote 403 (wrong realm in payload), 409 (missing version),
MQTTv5 callback crash, geojson-proxy DB connection failure
This commit is contained in:
Eric FELIXINE
2026-05-18 10:04:12 -04:00
parent 7937e2bb43
commit 47746b584c
3 changed files with 125 additions and 76 deletions

View File

@@ -64,10 +64,17 @@ services:
networks:
- smartcity-shared
- traefik-public
- openremote_default
environment:
- OR_URL=http://openremote_manager_1:8080
- OR_ADMIN_USER=admin
- OR_ADMIN_PASS=Digitribe972
- OR_REALM=master
- DB_HOST=openremote-postgresql-1
- DB_PORT=5432
- DB_NAME=openremote
- DB_USER=postgres
- DB_PASS=
labels:
- "traefik.enable=true"
- "traefik.http.routers.geojson-proxy.rule=Host(`geojson-proxy.digitribe.fr`)"

View File

@@ -1,94 +1,116 @@
#!/usr/bin/env python3
"""GeoJSON proxy service for OpenRemote assets map display."""
"""GeoJSON proxy service for OpenRemote assets map display.
Fetches IoT sensor assets from OpenRemote REST API and serves them as GeoJSON.
"""
import json
import os
import urllib.request
import urllib.error
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
OR_URL = os.environ.get("OR_URL", "http://openremote_manager_1:8080")
OR_ADMIN_USER = os.environ.get("OR_ADMIN_USER", "admin")
OR_ADMIN_PASS = os.environ.get("OR_ADMIN_PASS", "")
OR_TOKEN = os.environ.get("OR_TOKEN", "")
OR_REALM = os.environ.get("OR_REALM", "master")
OR_CLIENT_SECRET = os.environ.get("OR_CLIENT_SECRET", "0oQjzTfiEELYmj5jFwT4iIuWUDtQDvVa")
# All known IOTSensor asset IDs (from simulator ASSET_MAP + DB)
ASSET_IDS = [
"429858caca3341f56fbf65", "301218322f5aaca9d6d168", "bd35fe2a90133118b9b004",
"da59ec9301c4efd3fd55c4", "834f4b7b9df848f5c5c2d8",
"0f922351a9894bc0144c94", "4f83219bbee703b3e0a255", "381cc31ab83dd66ed4be37",
"808b73c22ecd19589a33be", "03c18679226329183b44b6",
"0ee6689f5c0499643d48eb", "8fb6b2d0601d98b47a4172", "0c00bda9e5075d12d59694",
"ae981dc9d155d1313b9acf", "96020cc5aef95c5fda7bb4",
"0be31930e45d2eb5c12ccd", "1802e76e3432d5eda1deb7", "08edb6518750d50644afe3",
"93d09bfac36d2ed95fc858", "7942726d84d2bd29de1e5d",
"9942f881ab6df375d8d9fa", "5400fdf5c51a4fe4f5a89c", "1a3bf32aa5208892e68965",
"d3725f922f96085f2df3f7", "13be192a8c23dd8fdceada",
"1f4302946b1a4a1ded23f6", "35e6ef027ed9a157ad8780", "526538589aa981bdc77ce9",
"d4a6ac7f34d64e581937c0", "40bbe989be2ae5b2a98b30",
# Additional assets from DB
"8b8f50aa8d13d65b2bafb7", "d642131a593c1cddcca3df",
"f4afeba492308772a9a1a4", "988232e4b779fd2cde2157",
"b75157bd68fde1577eda4d", "f388f67ec11e7860c352a3",
"ba30baf1fb3c69bdcc1b44",
]
_token_cache = {"token": "", "expires": 0}
def get_token():
"""Fetch an OpenRemote access token using admin credentials."""
if OR_TOKEN:
return OR_TOKEN
data = json.dumps({
import time
if _token_cache["token"] and _token_cache["expires"] > time.time() + 30:
return _token_cache["token"]
data = urllib.parse.urlencode({
"username": OR_ADMIN_USER,
"password": OR_ADMIN_PASS,
"grant_type": "password",
"client_id": "openremote"
"client_id": "openremote",
"client_secret": OR_CLIENT_SECRET
}).encode()
req = urllib.request.Request(
f"{OR_URL}/auth/realms/master/protocol/openid-connect/token",
f"http://openremote-keycloak-1:8080/auth/realms/{OR_REALM}/protocol/openid-connect/token",
data=data,
headers={"Content-Type": "application/json"},
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST"
)
resp = urllib.request.urlopen(req, timeout=10)
body = json.loads(resp.read())
return body["access_token"]
_token_cache["token"] = body["access_token"]
_token_cache["expires"] = time.time() + max(body.get("expires_in", 300) - 60, 30)
return _token_cache["token"]
def fetch_assets(token):
"""Fetch IoT sensor assets from OpenRemote."""
url = f"{OR_URL}/api/master/assets?type=IOTSensor"
req = urllib.request.Request(
url,
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/json"
},
method="GET"
)
resp = urllib.request.urlopen(req, timeout=15)
return json.loads(resp.read())
def to_geojson(assets):
"""Convert OpenRemote assets to a GeoJSON FeatureCollection."""
def fetch_assets():
"""Fetch IoT sensor assets from OpenRemote REST API."""
token = get_token()
features = []
for asset in assets:
attrs = asset.get("attributes", {})
location = attrs.get("location", {})
if not location:
continue
value = location.get("value")
if not value:
continue
coords = value.get("coordinates")
if not coords or len(coords) < 2:
for asset_id in ASSET_IDS:
try:
req = urllib.request.Request(
f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}",
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"}
)
with urllib.request.urlopen(req, timeout=5) as r:
asset = json.loads(r.read().decode())
attrs = asset.get("attributes", {})
location = attrs.get("location", {})
value = location.get("value") if isinstance(location, dict) else None
coords = value.get("coordinates") if isinstance(value, dict) else None
if not coords or len(coords) < 2:
continue
props = {
"id": asset.get("id"),
"name": asset.get("name", ""),
"type": asset.get("type", ""),
"realm": asset.get("realm", ""),
}
# Add sensorType for color mapping
sensor_type = attrs.get("sensorType", {})
if isinstance(sensor_type, dict):
props["sensorType"] = sensor_type.get("value", "")
# Add scalar attribute values
for attr_name, attr_val in attrs.items():
if isinstance(attr_val, dict):
v = attr_val.get("value")
if v is not None and not isinstance(v, (dict, list)):
props[attr_name] = v
features.append({
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [coords[0], coords[1]]},
"properties": props
})
except Exception:
continue
# Build properties from asset attributes
properties = {}
for attr_name, attr_val in attrs.items():
v = attr_val.get("value")
if v is not None:
properties[attr_name] = v
# Ensure key fields are at top level for Mapbox filters
properties.setdefault("id", asset.get("id"))
properties.setdefault("name", asset.get("name", ""))
properties.setdefault("type", asset.get("type", ""))
properties.setdefault("sensorType", attrs.get("sensorType", {}).get("value", ""))
features.append({
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [coords[0], coords[1]]
},
"properties": properties
})
return {
"type": "FeatureCollection",
"features": features
}
return {"type": "FeatureCollection", "features": features}
class GeoJSONHandler(BaseHTTPRequestHandler):
@@ -96,15 +118,11 @@ class GeoJSONHandler(BaseHTTPRequestHandler):
path = self.path.split("?")[0]
if path == "/geojson":
try:
token = get_token()
assets = fetch_assets(token)
geojson = to_geojson(assets)
body = json.dumps(geojson).encode()
result = fetch_assets()
body = json.dumps(result).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "*")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)

View File

@@ -506,8 +506,8 @@ class MultiMQTT:
c.tls_insecure_set(True)
if ws:
c.ws_set(b"/mqtt")
c.on_connect = lambda _c, _, __, rc: self._on_connect(name, rc)
c.on_disconnect = lambda _c, _, __: self._on_disconnect(name)
c.on_connect = lambda _c, _, __, rc, *args: self._on_connect(name, rc)
c.on_disconnect = lambda _c, _, __, *args: self._on_disconnect(name)
try:
c.connect(host, port, keepalive=120)
c.loop_start()
@@ -791,21 +791,41 @@ def _get_or_token() -> str:
with urllib.request.urlopen(req, timeout=5) as r:
token_data = json.loads(r.read().decode())
_or_token_cache["token"] = token_data["access_token"]
_or_token_cache["expires"] = time.time() + token_data.get("expires_in", 300) - 60
_or_token_cache["expires"] = time.time() + max(token_data.get("expires_in", 300) - 60, 30)
return _or_token_cache["token"]
except Exception as e:
print(f" ⚠️ OpenRemote token → {e}")
return ""
def _or_put(asset_id: str, payload: dict) -> bool:
"""PUT update sur un asset OpenRemote (sans If-Match pour éviter 403)."""
"""PUT update sur un asset OpenRemote (avec version pour éviter 409 Conflict).
Always uses OR_REALM (master) for the API URL, which has cross-realm access.
The payload realm is corrected to match the asset's actual realm."""
token = _get_or_token()
if not token:
return False
try:
# GET current asset to obtain version and actual realm
get_url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
get_req = urllib.request.Request(get_url, headers={"Authorization": f"Bearer {token}"})
version = None
try:
with urllib.request.urlopen(get_req, timeout=5) as get_resp:
current = json.loads(get_resp.read().decode())
version = current.get("version")
# Correct the payload realm to match the asset's actual realm
payload["realm"] = current.get("realm", OR_REALM)
except Exception:
pass
# Inject version into payload to avoid 409 Conflict
if version is not None:
payload["version"] = version
body = json.dumps(payload).encode()
url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
req = urllib.request.Request(url, data=body,
# Always use OR_REALM for PUT URL (master has cross-realm access)
put_url = f"{OR_URL}/api/{OR_REALM}/asset/{asset_id}"
req = urllib.request.Request(put_url, data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
@@ -1150,10 +1170,14 @@ def main():
if isinstance(lo, (int, float)):
or_values[field] = round(random.uniform(lo, hi), 1)
# --- OpenRemote REST ---
# --- OpenRemote REST (round-robin: seulement quelques assets/itération) ---
if ENABLE_OPENREMOTE:
ok_or = publish_openremote(sid, sensor, or_values)
print(f" 🏠 OpenRemote: {'' if ok_or else '⚠️ skipped'}")
# Mettre à jour seulement ~5 assets par itération pour éviter timeups
# sensor_num % 12 == iteration % 12 → chaque asset est mis à jour toutes les ~60s
if sensor_num % 12 == iteration % 12:
ok_or = publish_openremote(sid, sensor, or_values)
print(f" 🏠 OpenRemote: {'' if ok_or else '⚠️ skipped'}")
# else: skip OR update this iteration (asset updated in previous cycle)
# # --- Orion-LD --- (DÉSACTIVÉ: tout passe par les IoT-Agents MQTT)
# # if ENABLE_ORION: