#!/usr/bin/env python3
"""Cariflex - Fix the injection of weather data.

Logs in to the server at FM_HOST with the credentials stored in CREDS_FILE,
downloads today's hourly weather (shortwave radiation, 2 m temperature,
10 m wind speed) for the configured coordinates from Open-Meteo, and posts
every hourly value to the matching sensor through the
``/api/v3_0/sensors/<id>/data`` endpoint.
"""
import json
import random
import re
import warnings
from datetime import datetime, timezone, timedelta

import pytz
import requests

# NOTE(review): this silences *every* warning, including the
# InsecureRequestWarning triggered by ``session.verify = False`` below.
warnings.filterwarnings("ignore")

FM_HOST = "https://cariflex.digitribe.fr"
CREDS_FILE = "/tmp/fm_creds.json"
MARTINIQUE_TZ = pytz.timezone("America/Martinique")
# Upper bound (seconds) for each request to FM_HOST so a stalled server
# cannot hang the script forever.
REQUEST_TIMEOUT = 30

with open(CREDS_FILE) as f:
    creds = json.load(f)

session = requests.Session()
# SECURITY(review): TLS certificate verification is disabled while the
# credentials are sent over this session. Kept as-is because the host may use
# a certificate that does not validate; re-enable it as soon as possible.
session.verify = False

# Login
r = session.get(f"{FM_HOST}/login", timeout=REQUEST_TIMEOUT)
# NOTE(review): the pattern starts with a stray "]*" (zero or more literal
# "]"), which looks like the remains of a truncated "<input[^>]*" prefix.
# It still matches the first csrf_token attribute, so it is left unchanged.
match = re.search(r']*csrf_token[^>]*value="([^"]+)"', r.text)
if match is None:
    # Without the token the login POST cannot succeed; stop with a clear
    # message instead of an AttributeError on ``None.group``.
    raise SystemExit("Login: FAILED (csrf_token not found in the login page)")
csrf = match.group(1)

r = session.post(
    f"{FM_HOST}/login",
    data={
        "email": creds["email"],
        "password": creds["password"],
        "csrf_token": csrf,
        "remember": "y",
    },
    allow_redirects=True,
    timeout=REQUEST_TIMEOUT,
)
# The login is considered successful when the final URL (after redirects)
# contains "dashboard". The script continues either way, as before.
print(f"Login: {'OK' if 'dashboard' in r.url else 'FAILED'}")

# Sensor IDs (from previous creation)
sensors = {
    "irradiance": 81,
    "temperature": 82,
    "wind_speed": 83,
    "consumption_price": 84,
    "production_price": 85,
}

# Inject weather data
print("\n=== Données météo réelles ===")
LAT, LON = 14.6091, -61.2155
today = datetime.now(MARTINIQUE_TZ).strftime("%Y-%m-%d")

resp = requests.get(
    f"https://api.open-meteo.com/v1/forecast?"
    f"latitude={LAT}&longitude={LON}"
    f"&hourly=shortwave_radiation,temperature_2m,wind_speed_10m"
    # Open-Meteo reports wind speed in km/h unless told otherwise; request
    # m/s explicitly so the values agree with the "m/s" unit posted below.
    f"&wind_speed_unit=ms"
    f"&timezone=America/Martinique"
    f"&start_date={today}&end_date={today}",
    timeout=15,
)

if resp.status_code == 200:
    wdata = resp.json()
    hourly = wdata["hourly"]
    times = hourly["time"]

    # sensor name -> (Open-Meteo hourly variable, unit sent with the values)
    weather_variables = {
        "irradiance": ("shortwave_radiation", "W/m²"),
        "temperature": ("temperature_2m", "°C"),
        "wind_speed": ("wind_speed_10m", "m/s"),
    }

    for sensor_name, (key, unit) in weather_variables.items():
        sensor_id = sensors[sensor_name]
        values = hourly[key]
        posted = 0

        for t, raw in zip(times, values):
            if raw is None:
                # A missing hour (JSON null) would make max() raise a
                # TypeError and abort the whole run; skip it instead.
                continue

            dt = datetime.fromisoformat(t)
            if dt.tzinfo is None:
                # The timestamps are local wall-clock times for the
                # requested timezone and carry no UTC offset; attach the
                # Martinique zone so the posted "start" is unambiguous.
                dt = MARTINIQUE_TZ.localize(dt)

            # NOTE(review): the clamp to 0 is also applied to temperature;
            # harmless for Martinique, but it would hide sub-zero readings.
            val = round(max(0, raw), 1)

            try:
                r = session.post(
                    f"{FM_HOST}/api/v3_0/sensors/{sensor_id}/data",
                    json={
                        "values": [val],
                        "start": dt.isoformat(),
                        "duration": "PT1H",
                        "unit": unit,
                    },
                    timeout=REQUEST_TIMEOUT,
                )
            except requests.RequestException as e:
                # Best effort: report the network failure and move on to
                # the next hour.
                print(f" Exception: {e}")
                continue

            if r.status_code in (200, 201, 202):
                posted += 1
            elif posted == 0:
                # Show the server's complaint only until a first success,
                # to avoid flooding the output with repeated errors.
                print(f" Error response: {r.text[:100]}")

        print(f" '{sensor_name}': {posted}/{len(times)} posted")
else:
    print(f" API error: {resp.status_code}")

print("\n=== Terminé ===")