Remove raw/ layer — staging models now read landing JSON directly. Rename all model schemas from padelnomics.* to staging.*/foundation.*/serving.*. Web app queries updated to serving.planner_defaults via SERVING_DUCKDB_PATH. Supervisor gets daily sleep interval between pipeline runs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
62 lines
1.7 KiB
Python
62 lines
1.7 KiB
Python
"""
|
|
DuckDB read-only analytics reader.
|
|
|
|
Opens a single long-lived DuckDB connection at startup (read_only=True).
|
|
All queries run via asyncio.to_thread() to avoid blocking the event loop.
|
|
|
|
Usage:
|
|
from .analytics import fetch_analytics
|
|
|
|
rows = await fetch_analytics("SELECT * FROM serving.planner_defaults WHERE city_slug = ?", ["berlin"])
|
|
"""
|
|
import asyncio
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import duckdb
|
|
|
|
# Process-wide read-only connection handle. It stays None until
# open_analytics_db() succeeds, and close_analytics_db() resets it to None.
_conn: duckdb.DuckDBPyConnection | None = None

# Database file location, read once at import time from the
# SERVING_DUCKDB_PATH environment variable, with a relative-path fallback.
_DUCKDB_PATH = os.getenv("SERVING_DUCKDB_PATH", "data/analytics.duckdb")
|
|
|
|
|
|
def open_analytics_db() -> None:
    """Connect to the analytics DuckDB file in read-only mode.

    Intended to be called once at app startup. When the file at
    ``_DUCKDB_PATH`` does not exist, no connection is made and ``_conn``
    is left untouched, so later queries simply return empty results.
    """
    global _conn
    db_file = Path(_DUCKDB_PATH)
    if db_file.exists():
        _conn = duckdb.connect(str(db_file), read_only=True)
    # A missing file is not an error: the database has not been built yet.
|
|
|
|
|
|
def close_analytics_db() -> None:
    """Release the DuckDB connection, if one is open. Call at app shutdown.

    Safe to call when no connection was ever opened; in that case it does
    nothing.
    """
    global _conn
    if _conn is None:
        return
    _conn.close()
    # Reset the handle so later queries see "no database" and return [].
    _conn = None
|
|
|
|
|
|
async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str, Any]]:
    """
    Run a read-only DuckDB query and return rows as dicts.

    The query executes in a worker thread via asyncio.to_thread() so the
    event loop is not blocked. Each call uses its own cursor, because a
    DuckDB connection handle must not be shared by threads running at the
    same time.

    Args:
        sql: SQL text to execute; must be non-empty.
        params: Positional values for the ``?`` placeholders in ``sql``.

    Returns:
        One dict per result row, keyed by column name. Returns [] when the
        analytics DB is not open (e.g. the file did not exist at startup)
        or when the query fails.

    Query failures are logged and swallowed — callers should treat empty
    results as "no data yet". An empty ``sql`` is a programming error and
    trips the assertion below.
    """
    assert sql, "sql must not be empty"

    # Snapshot the handle so the worker thread does not re-read a global that
    # close_analytics_db() may have reset to None in the meantime.
    conn = _conn
    if conn is None:
        return []

    def _run() -> list[dict[str, Any]]:
        # A per-call cursor gives this thread its own handle on the database.
        cursor = conn.cursor()
        try:
            cursor.execute(sql, params or [])
            columns = [col[0] for col in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            cursor.close()

    try:
        return await asyncio.to_thread(_run)
    except Exception:
        # Best-effort contract: never propagate query errors to the caller,
        # but leave a trace so failures are not mistaken for "no data".
        import logging

        logging.getLogger(__name__).exception("analytics query failed")
        return []
|