Files
cariflex/scripts/inject_weather.py
2026-06-09 18:59:37 -04:00

98 lines
2.8 KiB
Python

#!/usr/bin/env python3
"""Cariflex - inject today's real weather data into FlexMeasures.

Logs in to the FlexMeasures web UI with the credentials stored in
``CREDS_FILE``, downloads today's hourly shortwave radiation, air temperature
and wind speed for the configured coordinates from the Open-Meteo forecast
API, and posts every hourly value to the matching FlexMeasures sensor.
"""
import json
import random
import re
import warnings
from datetime import datetime, timezone, timedelta

import pytz
import requests

# NOTE(review): this silences *every* warning, presumably to hide the
# InsecureRequestWarning triggered by ``session.verify = False`` below.
warnings.filterwarnings("ignore")

FM_HOST = "https://cariflex.digitribe.fr"
# NOTE(review): credentials are read from /tmp -- confirm the file permissions.
CREDS_FILE = "/tmp/fm_creds.json"
MARTINIQUE_TZ = pytz.timezone("America/Martinique")
LAT, LON = 14.6091, -61.2155
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
CSRF_RE = re.compile(r'<input[^>]*csrf_token[^>]*value="([^"]+)"')
ACCEPTED_STATUS = (200, 201, 202)

# Sensor IDs (from previous creation)
sensors = {
    "irradiance": 81,
    "temperature": 82,
    "wind_speed": 83,
    "consumption_price": 84,
    "production_price": 85,
}

# sensor name -> (Open-Meteo hourly variable, posted unit, clamp at zero?)
# Only quantities that cannot physically be negative are clamped; temperature
# is posted as received.
WEATHER_SERIES = {
    "irradiance": ("shortwave_radiation", "W/m²", True),
    "temperature": ("temperature_2m", "°C", False),
    "wind_speed": ("wind_speed_10m", "m/s", True),
}


def load_credentials(path=CREDS_FILE):
    """Return the login credentials parsed from the JSON file at *path*."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def login(creds):
    """Open a FlexMeasures web session.

    Args:
        creds: mapping holding the ``"email"`` and ``"password"`` to submit.

    Returns:
        ``(session, logged_in)`` where ``logged_in`` is True when the final URL
        after the login POST contains ``"dashboard"``.

    Raises:
        SystemExit: when the login page exposes no CSRF token field.
    """
    session = requests.Session()
    # NOTE(review): TLS certificate verification is disabled for this host;
    # confirm this is intentional (e.g. a self-signed certificate).
    session.verify = False

    page = session.get(f"{FM_HOST}/login", timeout=30)
    match = CSRF_RE.search(page.text)
    if match is None:
        # Without the token the login POST cannot succeed; fail with a clear
        # message instead of an AttributeError on ``None.group``.
        raise SystemExit("Login: FAILED (no csrf_token field on the login page)")

    reply = session.post(
        f"{FM_HOST}/login",
        data={
            "email": creds["email"],
            "password": creds["password"],
            "csrf_token": match.group(1),
            "remember": "y",
        },
        allow_redirects=True,
        timeout=30,
    )
    return session, "dashboard" in reply.url


def fetch_hourly_weather(day):
    """Fetch the Open-Meteo hourly forecast for *day* (``YYYY-MM-DD``).

    Returns:
        The ``"hourly"`` mapping of the response (``"time"`` plus one list per
        requested variable), or ``None`` after printing a message when the
        request fails or the payload is not usable.
    """
    try:
        resp = requests.get(
            OPEN_METEO_URL,
            params={
                "latitude": LAT,
                "longitude": LON,
                "hourly": ",".join(key for key, _, _ in WEATHER_SERIES.values()),
                # Open-Meteo reports wind speed in km/h unless told otherwise;
                # request m/s so the values match the unit posted below.
                "wind_speed_unit": "ms",
                "timezone": "America/Martinique",
                "start_date": day,
                "end_date": day,
            },
            timeout=15,
        )
        if resp.status_code != 200:
            print(f" API error: {resp.status_code}")
            return None
        hourly = resp.json().get("hourly")
    except (requests.RequestException, ValueError) as e:
        print(f" API error: {e}")
        return None

    if not hourly or "time" not in hourly:
        print(" API error: unexpected response payload")
        return None
    return hourly


def post_series(session, sensor_name, times, values, unit, clamp):
    """Post one hourly value per request to the sensor named *sensor_name*.

    Args:
        session: logged-in ``requests.Session``.
        sensor_name: key into ``sensors``.
        times: local (Martinique) ISO timestamps returned by Open-Meteo.
        values: readings aligned with *times*; ``None`` entries are skipped.
        unit: unit string sent with every value.
        clamp: when True, negative readings are posted as 0.

    Returns:
        The number of values the server accepted.
    """
    url = f"{FM_HOST}/api/v3_0/sensors/{sensors[sensor_name]}/data"
    posted = 0
    for stamp, raw in zip(times, values):
        if raw is None:
            # Missing reading for this hour: nothing to post.
            continue
        # Open-Meteo returns local wall-clock times without an offset (the
        # request asks for the Martinique timezone); attach that zone so the
        # posted ``start`` is unambiguous.
        start = MARTINIQUE_TZ.localize(datetime.fromisoformat(stamp))
        val = round(max(0, raw) if clamp else raw, 1)
        try:
            r = session.post(
                url,
                json={
                    "values": [val],
                    "start": start.isoformat(),
                    "duration": "PT1H",
                    "unit": unit,
                },
                timeout=30,
            )
        except requests.RequestException as e:
            print(f" Exception: {e}")
            continue
        if r.status_code in ACCEPTED_STATUS:
            posted += 1
        elif posted == 0:
            # Show server errors only until the first value has gone through.
            print(f" Error response: {r.text[:100]}")
    print(f" '{sensor_name}': {posted}/{len(times)} posted")
    return posted


def main():
    """Log in, fetch today's weather and inject it sensor by sensor."""
    session, logged_in = login(load_credentials())
    print(f"Login: {'OK' if logged_in else 'FAILED'}")
    if not logged_in:
        # Every POST would be rejected without a session; stop with a
        # non-zero exit status instead of attempting the injection.
        raise SystemExit(1)

    # Inject weather data
    print("\n=== Données météo réelles ===")
    today = datetime.now(MARTINIQUE_TZ).strftime("%Y-%m-%d")
    hourly = fetch_hourly_weather(today)
    if hourly is not None:
        times = hourly["time"]
        for sensor_name, (key, unit, clamp) in WEATHER_SERIES.items():
            post_series(
                session, sensor_name, times, hourly.get(key) or [], unit, clamp
            )
    print("\n=== Terminé ===")


if __name__ == "__main__":
    main()