feat(planner): wire DuckDB analytics + market-data endpoint

- analytics.py: open/close_analytics_db registered in app lifecycle
- GET /planner/api/market-data?city_slug=<slug>: returns per-city
  planner defaults from DuckDB planner_defaults serving table

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-21 23:26:06 +01:00
parent 47db8c2418
commit a2de1a0206
4 changed files with 128 additions and 0 deletions

View File

@@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]
### Added
- `analytics.py`: DuckDB read-only reader (`open_analytics_db`, `close_analytics_db`,
`fetch_analytics`) registered in app lifecycle (startup/shutdown)
- `GET /planner/api/market-data?city_slug=<slug>`: returns per-city planner defaults from
DuckDB `planner_defaults` serving table; falls back to `{}` when analytics DB unavailable
### Added
- `transform/sqlmesh_padelnomics` workspace member: SQLMesh 4-layer model pipeline over DuckDB
- Raw: `raw_overpass_courts`, `raw_playtomic_tenants`, `raw_eurostat_population`

View File

@@ -0,0 +1,61 @@
"""
DuckDB read-only analytics reader.
Opens a single long-lived DuckDB connection at startup (read_only=True).
All queries run via asyncio.to_thread() to avoid blocking the event loop.
Usage:
from .analytics import fetch_analytics
rows = await fetch_analytics("SELECT * FROM padelnomics.planner_defaults WHERE city_slug = ?", ["berlin"])
"""
import asyncio
import os
from pathlib import Path
from typing import Any
import duckdb
_conn: duckdb.DuckDBPyConnection | None = None
_DUCKDB_PATH = os.environ.get("DUCKDB_PATH", "data/lakehouse.duckdb")
def open_analytics_db() -> None:
"""Open the DuckDB connection. Call once at app startup."""
global _conn
path = Path(_DUCKDB_PATH)
if not path.exists():
# Database doesn't exist yet — skip silently. Queries will return empty.
return
_conn = duckdb.connect(str(path), read_only=True)
def close_analytics_db() -> None:
"""Close the DuckDB connection. Call at app shutdown."""
global _conn
if _conn is not None:
_conn.close()
_conn = None
async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str, Any]]:
"""
Run a read-only DuckDB query and return rows as dicts.
Returns [] if analytics DB is unavailable (not yet built, or DUCKDB_PATH unset).
Never raises — callers should treat empty results as "no data yet".
"""
assert sql, "sql must not be empty"
if _conn is None:
return []
def _run() -> list[dict]:
rel = _conn.execute(sql, params or [])
cols = [d[0] for d in rel.description]
return [dict(zip(cols, row)) for row in rel.fetchall()]
try:
return await asyncio.to_thread(_run)
except Exception:
return []

View File

@@ -5,6 +5,7 @@ from pathlib import Path
from quart import Quart, Response, abort, g, redirect, request, session, url_for
from .analytics import close_analytics_db, open_analytics_db
from .core import close_db, config, get_csrf_token, init_db, setup_request_id
from .i18n import LANG_BLUEPRINTS, SUPPORTED_LANGS, get_translations
@@ -120,10 +121,12 @@ def create_app() -> Quart:
@app.before_serving
async def startup():
await init_db()
open_analytics_db()
@app.after_serving
async def shutdown():
await close_db()
close_analytics_db()
# -------------------------------------------------------------------------
# Per-request hooks

View File

@@ -565,3 +565,60 @@ async def export_download(export_id: int):
"Content-Disposition": f'attachment; filename="padel-business-plan-{export_id}.pdf"'
},
)
# =============================================================================
# DuckDB analytics integration — market data for planner pre-fill
# =============================================================================
@bp.route("/api/market-data")
async def market_data():
"""Return per-city planner defaults from DuckDB serving layer.
GET /planner/api/market-data?city_slug=berlin
Returns a partial DEFAULTS override dict (camelCase keys).
Returns {} when the analytics DB has no data yet — caller merges with DEFAULTS.
"""
city_slug = request.args.get("city_slug", "").strip()
if not city_slug:
return jsonify({}), 200
from ..analytics import fetch_analytics
rows = await fetch_analytics(
"SELECT * FROM padelnomics.planner_defaults WHERE city_slug = ? LIMIT 1",
[city_slug],
)
if not rows:
return jsonify({}), 200
row = rows[0]
# Map DuckDB snake_case columns → DEFAULTS camelCase keys.
# Only include fields that exist in the row and have non-null values.
col_map: dict[str, str] = {
"rate_peak": "ratePeak",
"rate_off_peak": "rateOffPeak",
"court_cost_dbl": "courtCostDbl",
"court_cost_sgl": "courtCostSgl",
"rent_sqm": "rentSqm",
"insurance": "insurance",
"electricity": "electricity",
"maintenance": "maintenance",
"marketing": "marketing",
}
overrides: dict = {}
for col, key in col_map.items():
val = row.get(col)
if val is not None:
overrides[key] = round(float(val))
# Include data quality metadata so frontend can show confidence indicator
if row.get("data_confidence") is not None:
overrides["_dataConfidence"] = round(float(row["data_confidence"]), 2)
if row.get("country_code"):
overrides["_countryCode"] = row["country_code"]
return jsonify(overrides), 200