"""Reporting routes — daily, weekly, custom reports and PDF export. All endpoints require JWT authentication (Bearer token). """ from __future__ import annotations import io from datetime import datetime, timedelta, timezone from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query, status from fastapi.responses import StreamingResponse from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.auth.jwt import get_current_user from app.database import get_session from app.models.models import Alert, Sensor, SensorReading, Zone # --------------------------------------------------------------------------- # Router # --------------------------------------------------------------------------- router = APIRouter(prefix="/reports", tags=["Reporting"]) # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- async def _scalar_count( session: AsyncSession, expression, *filters, ) -> int: """Shorthand: execute a COUNT query and return the integer scalar.""" stmt = select(expression) if filters: for f in filters: stmt = stmt.where(f) result = await session.execute(stmt) return result.scalar_one() async def _build_report_data( session: AsyncSession, from_dt: datetime, to_dt: datetime, period_label: str, ) -> dict: """Aggregate IoT data for the given time window and return a report dict.""" # ── Sensor overview ────────────────────────────────────────────── total_sensors = await _scalar_count(session, func.count(Sensor.id)) active_sensors = await _scalar_count( session, func.count(Sensor.id), Sensor.status == "active" ) inactive_sensors = await _scalar_count( session, func.count(Sensor.id), Sensor.status == "inactive" ) maintenance_sensors = await _scalar_count( session, func.count(Sensor.id), Sensor.status == "maintenance" ) # ── Readings in period ─────────────────────────────────────────── readings_count = await _scalar_count( session, func.count(SensorReading.id), SensorReading.recorded_at >= from_dt, SensorReading.recorded_at <= to_dt, ) avg_value_result = await session.execute( select(func.avg(SensorReading.value)).where( SensorReading.recorded_at >= from_dt, SensorReading.recorded_at <= to_dt, ) ) avg_value: Optional[float] = avg_value_result.scalar_one() min_value_result = await session.execute( select(func.min(SensorReading.value)).where( SensorReading.recorded_at >= from_dt, SensorReading.recorded_at <= to_dt, ) ) min_value: Optional[float] = min_value_result.scalar_one() max_value_result = await session.execute( select(func.max(SensorReading.value)).where( SensorReading.recorded_at >= from_dt, SensorReading.recorded_at <= to_dt, ) ) max_value: Optional[float] = max_value_result.scalar_one() # ── Readings by sensor type ────────────────────────────────────── type_avg_rows = await session.execute( select(Sensor.type, func.avg(SensorReading.value), func.count(SensorReading.id)) .join(SensorReading, SensorReading.sensor_id == Sensor.id) .where( SensorReading.recorded_at >= from_dt, SensorReading.recorded_at <= to_dt, ) .group_by(Sensor.type) ) readings_by_type = { row[0]: {"avg_value": round(row[1], 4) if row[1] is not None else None, "count": row[2]} for row in type_avg_rows.all() } # ── Alerts in period ───────────────────────────────────────────── total_alerts = await _scalar_count( session, func.count(Alert.id), Alert.created_at >= from_dt, Alert.created_at <= to_dt, ) active_alerts = await _scalar_count( session, func.count(Alert.id), Alert.created_at >= from_dt, Alert.created_at <= to_dt, Alert.status == "active", ) resolved_alerts = await _scalar_count( session, func.count(Alert.id), Alert.created_at >= from_dt, Alert.created_at <= to_dt, Alert.status == "resolved", ) critical_alerts = await _scalar_count( session, func.count(Alert.id), Alert.created_at >= from_dt, Alert.created_at <= to_dt, Alert.severity == "critical", ) # ── Alerts by severity ─────────────────────────────────────────── severity_rows = await session.execute( select(Alert.severity, func.count(Alert.id)) .where( Alert.created_at >= from_dt, Alert.created_at <= to_dt, ) .group_by(Alert.severity) ) alerts_by_severity = {row[0]: row[1] for row in severity_rows.all()} # ── Alerts by type ─────────────────────────────────────────────── type_rows = await session.execute( select(Alert.type, func.count(Alert.id)) .where( Alert.created_at >= from_dt, Alert.created_at <= to_dt, ) .group_by(Alert.type) ) alerts_by_type = {row[0]: row[1] for row in type_rows.all()} # ── Zones ──────────────────────────────────────────────────────── total_zones = await _scalar_count(session, func.count(Zone.id)) # ── Avg battery level ──────────────────────────────────────────── battery_result = await session.execute( select(func.avg(Sensor.battery_level)).where( Sensor.battery_level.is_not(None) ) ) avg_battery: Optional[float] = battery_result.scalar_one() # ── Low-battery sensors (< 20 %) ──────────────────────────────── low_battery_count = await _scalar_count( session, func.count(Sensor.id), Sensor.battery_level.is_not(None), Sensor.battery_level < 20, ) now = datetime.now(timezone.utc) return { "period": period_label, "from": from_dt.isoformat(), "to": to_dt.isoformat(), "generated_at": now.isoformat(), "sensors": { "total": total_sensors, "active": active_sensors, "inactive": inactive_sensors, "maintenance": maintenance_sensors, "avg_battery_level": round(avg_battery, 1) if avg_battery is not None else None, "low_battery_count": low_battery_count, }, "readings": { "count": readings_count, "avg_value": round(avg_value, 4) if avg_value is not None else None, "min_value": round(min_value, 4) if min_value is not None else None, "max_value": round(max_value, 4) if max_value is not None else None, "by_type": readings_by_type, }, "zones": { "total": total_zones, }, "alerts": { "total": total_alerts, "active": active_alerts, "resolved": resolved_alerts, "critical": critical_alerts, "by_severity": alerts_by_severity, "by_type": alerts_by_type, }, } def _generate_pdf_bytes(report: dict) -> bytes: """Generate a minimal PDF (text-based) from the report data. Uses only stdlib — no external PDF dependency required. Produces a valid PDF 1.4 document with the report content as text. """ lines: list[str] = [] lines.append("=" * 60) lines.append(" SMART CITY DIGITAL TWIN — MARTINIQUE") lines.append(f" RAPPORT {report['period'].upper()}") lines.append("=" * 60) lines.append("") lines.append(f" Periode : {report['from']} -> {report['to']}") lines.append(f" Genere le : {report['generated_at']}") lines.append("") lines.append("-" * 60) lines.append(" CAPTEURS") lines.append("-" * 60) s = report["sensors"] lines.append(f" Total : {s['total']}") lines.append(f" Actifs : {s['active']}") lines.append(f" Inactifs : {s['inactive']}") lines.append(f" Maintenance : {s['maintenance']}") lines.append(f" Batterie moy. : {s['avg_battery_level']}%") lines.append(f" Batterie basse : {s['low_battery_count']} capteurs") lines.append("") lines.append("-" * 60) lines.append(" LECTURES") lines.append("-" * 60) r = report["readings"] lines.append(f" Nombre : {r['count']}") lines.append(f" Valeur moy. : {r['avg_value']}") lines.append(f" Valeur min. : {r['min_value']}") lines.append(f" Valeur max. : {r['max_value']}") if r["by_type"]: lines.append("") lines.append(" Par type de capteur:") for sensor_type, data in r["by_type"].items(): lines.append(f" - {sensor_type}: avg={data['avg_value']}, n={data['count']}") lines.append("") lines.append("-" * 60) lines.append(" ZONES") lines.append("-" * 60) lines.append(f" Total : {report['zones']['total']}") lines.append("") lines.append("-" * 60) lines.append(" ALERTES") lines.append("-" * 60) a = report["alerts"] lines.append(f" Total : {a['total']}") lines.append(f" Actives : {a['active']}") lines.append(f" Resolues : {a['resolved']}") lines.append(f" Critiques : {a['critical']}") if a["by_severity"]: lines.append("") lines.append(" Par severite:") for sev, cnt in a["by_severity"].items(): lines.append(f" - {sev}: {cnt}") if a["by_type"]: lines.append("") lines.append(" Par type:") for atype, cnt in a["by_type"].items(): lines.append(f" - {atype}: {cnt}") lines.append("") lines.append("=" * 60) lines.append(" FIN DU RAPPORT") lines.append("=" * 60) text = "\n".join(lines) + "\n" # Build a minimal valid PDF 1.4 document buf = io.BytesIO() offsets: list[int] = [] def write(s: str) -> None: buf.write(s.encode("latin-1")) # Object 1: Catalog offsets.append(buf.tell()) write("1 0 obj\n<< /Type /Catalog /Pages 2 0 R >>\nendobj\n\n") # Object 2: Pages offsets.append(buf.tell()) write("2 0 obj\n<< /Type /Pages /Kids [3 0 R] /Count 1 >>\nendobj\n\n") # Object 3: Page offsets.append(buf.tell()) write( "3 0 obj\n" "<< /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] " "/Contents 4 0 R /Resources << /Font << /F1 5 0 R >> >> >>\n" "endobj\n\n" ) # Object 4: Content stream escaped = text.replace("\\", "\\\\").replace("(", "\\(").replace(")", "\\)") stream_lines = [ "BT", "/F1 10 Tf", "50 750 Td", f"({escaped}) Tj", "ET", ] stream = "\n".join(stream_lines) + "\n" offsets.append(buf.tell()) write( f"4 0 obj\n<< /Length {len(stream)} >>\nstream\n" f"{stream}" "endstream\nendobj\n\n" ) # Object 5: Font offsets.append(buf.tell()) write( "5 0 obj\n" "<< /Type /Font /Subtype /Type1 /BaseFont /Courier >>\n" "endobj\n\n" ) # Cross-reference table xref_offset = buf.tell() write("xref\n") write(f"0 {len(offsets) + 1}\n") write("0000000000 65535 f \n") for off in offsets: write(f"{off:010d} 00000 n \n") # Trailer write( "trailer\n" f"<< /Size {len(offsets) + 1} /Root 1 0 R >>\n" "startxref\n" f"{xref_offset}\n" "%%EOF\n" ) return buf.getvalue() # =========================================================================== # GET /reports/daily # =========================================================================== @router.get( "/daily", summary="Daily report", description="Return an aggregated IoT report for a specific date (defaults to today).", ) async def daily_report( date: Optional[str] = Query( None, description="Date in YYYY-MM-DD format. Defaults to today (UTC).", pattern=r"^\d{4}-\d{2}-\d{2}$", ), session: AsyncSession = Depends(get_session), _current_user: dict = Depends(get_current_user), ) -> dict: if date is not None: try: day = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=timezone.utc) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid date format: '{date}'. Expected YYYY-MM-DD.", ) else: day = datetime.now(timezone.utc).replace( hour=0, minute=0, second=0, microsecond=0 ) from_dt = day to_dt = day + timedelta(days=1) - timedelta(microseconds=1) return await _build_report_data(session, from_dt, to_dt, period_label="daily") # =========================================================================== # GET /reports/weekly # =========================================================================== @router.get( "/weekly", summary="Weekly report", description="Return an aggregated IoT report for a specific week. " "Defaults to the current ISO week (Monday–Sunday).", ) async def weekly_report( week: Optional[int] = Query( None, ge=1, le=53, description="ISO week number (1-53). Defaults to current week.", ), year: Optional[int] = Query( None, ge=2020, le=2100, description="Year. Defaults to current year.", ), session: AsyncSession = Depends(get_session), _current_user: dict = Depends(get_current_user), ) -> dict: now = datetime.now(timezone.utc) if week is not None and year is not None: # Monday of the given ISO week from_dt = datetime.strptime(f"{year}-W{week:02d}-1", "%G-W%V-%u").replace( tzinfo=timezone.utc ) elif week is not None and year is None: from_dt = datetime.strptime( f"{now.isocalendar()[0]}-W{week:02d}-1", "%G-W%V-%u" ).replace(tzinfo=timezone.utc) else: # Current ISO week iso_year, iso_week, _ = now.isocalendar() from_dt = datetime.strptime( f"{iso_year}-W{iso_week:02d}-1", "%G-W%V-%u" ).replace(tzinfo=timezone.utc) to_dt = from_dt + timedelta(weeks=1) - timedelta(microseconds=1) return await _build_report_data(session, from_dt, to_dt, period_label="weekly") # =========================================================================== # GET /reports/custom # =========================================================================== @router.get( "/custom", summary="Custom date-range report", description="Return an aggregated IoT report for a custom time window.", ) async def custom_report( from_: datetime = Query( ..., alias="from", description="Start of time range (ISO 8601). Required.", ), to: datetime = Query( ..., description="End of time range (ISO 8601). Required.", ), session: AsyncSession = Depends(get_session), _current_user: dict = Depends(get_current_user), ) -> dict: # Ensure timezone-aware if from_.tzinfo is None: from_ = from_.replace(tzinfo=timezone.utc) if to.tzinfo is None: to = to.replace(tzinfo=timezone.utc) if to <= from_: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="'to' must be after 'from'.", ) max_range = timedelta(days=366) if (to - from_) > max_range: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Date range cannot exceed 366 days.", ) return await _build_report_data( session, from_, to, period_label="custom" ) # =========================================================================== # GET /reports/export/pdf # =========================================================================== @router.get( "/export/pdf", summary="Export report as PDF", description="Generate and download a PDF report for a given type and date.", ) async def export_pdf( type: str = Query( ..., description="Report type: daily or weekly.", pattern=r"^(daily|weekly)$", ), date: Optional[str] = Query( None, description="Date in YYYY-MM-DD. For daily: that day. For weekly: any day in the week. Defaults to today.", pattern=r"^\d{4}-\d{2}-\d{2}$", ), session: AsyncSession = Depends(get_session), _current_user: dict = Depends(get_current_user), ) -> StreamingResponse: now = datetime.now(timezone.utc) if type == "daily": if date is not None: try: day = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=timezone.utc) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid date format: '{date}'. Expected YYYY-MM-DD.", ) else: day = now.replace(hour=0, minute=0, second=0, microsecond=0) from_dt = day to_dt = day + timedelta(days=1) - timedelta(microseconds=1) period_label = "daily" filename_date = day.strftime("%Y-%m-%d") else: # weekly if date is not None: try: ref_day = datetime.strptime(date, "%Y-%m-%d").replace( tzinfo=timezone.utc ) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid date format: '{date}'. Expected YYYY-MM-DD.", ) else: ref_day = now iso_year, iso_week, _ = ref_day.isocalendar() from_dt = datetime.strptime( f"{iso_year}-W{iso_week:02d}-1", "%G-W%V-%u" ).replace(tzinfo=timezone.utc) to_dt = from_dt + timedelta(weeks=1) - timedelta(microseconds=1) period_label = "weekly" filename_date = f"W{iso_week:02d}-{iso_year}" report = await _build_report_data(session, from_dt, to_dt, period_label) pdf_bytes = _generate_pdf_bytes(report) filename = f"smart-city-report-{type}-{filename_date}.pdf" return StreamingResponse( io.BytesIO(pdf_bytes), media_type="application/pdf", headers={ "Content-Disposition": f'attachment; filename="{filename}"', }, )