feat(planner): wire DuckDB analytics + market-data endpoint
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
- `analytics.py`: DuckDB read-only reader (`open_analytics_db`, `close_analytics_db`,
|
||||
`fetch_analytics`) registered in app lifecycle (startup/shutdown)
|
||||
- `GET /planner/api/market-data?city_slug=<slug>`: returns per-city planner defaults from
|
||||
DuckDB `planner_defaults` serving table; falls back to `{}` when analytics DB unavailable
|
||||
|
||||
|
||||
### Added
|
||||
- `transform/sqlmesh_padelnomics` workspace member: SQLMesh 4-layer model pipeline over DuckDB
|
||||
- Raw: `raw_overpass_courts`, `raw_playtomic_tenants`, `raw_eurostat_population`
|
||||
|
||||
61
padelnomics/web/src/padelnomics/analytics.py
Normal file
61
padelnomics/web/src/padelnomics/analytics.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""
|
||||
DuckDB read-only analytics reader.
|
||||
|
||||
Opens a single long-lived DuckDB connection at startup (read_only=True).
|
||||
All queries run via asyncio.to_thread() to avoid blocking the event loop.
|
||||
|
||||
Usage:
|
||||
from .analytics import fetch_analytics
|
||||
|
||||
rows = await fetch_analytics("SELECT * FROM padelnomics.planner_defaults WHERE city_slug = ?", ["berlin"])
|
||||
"""
|
||||
import asyncio
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import duckdb
|
||||
|
||||
_conn: duckdb.DuckDBPyConnection | None = None
|
||||
_DUCKDB_PATH = os.environ.get("DUCKDB_PATH", "data/lakehouse.duckdb")
|
||||
|
||||
|
||||
def open_analytics_db() -> None:
|
||||
"""Open the DuckDB connection. Call once at app startup."""
|
||||
global _conn
|
||||
path = Path(_DUCKDB_PATH)
|
||||
if not path.exists():
|
||||
# Database doesn't exist yet — skip silently. Queries will return empty.
|
||||
return
|
||||
_conn = duckdb.connect(str(path), read_only=True)
|
||||
|
||||
|
||||
def close_analytics_db() -> None:
|
||||
"""Close the DuckDB connection. Call at app shutdown."""
|
||||
global _conn
|
||||
if _conn is not None:
|
||||
_conn.close()
|
||||
_conn = None
|
||||
|
||||
|
||||
async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Run a read-only DuckDB query and return rows as dicts.
|
||||
|
||||
Returns [] if analytics DB is unavailable (not yet built, or DUCKDB_PATH unset).
|
||||
Never raises — callers should treat empty results as "no data yet".
|
||||
"""
|
||||
assert sql, "sql must not be empty"
|
||||
|
||||
if _conn is None:
|
||||
return []
|
||||
|
||||
def _run() -> list[dict]:
|
||||
rel = _conn.execute(sql, params or [])
|
||||
cols = [d[0] for d in rel.description]
|
||||
return [dict(zip(cols, row)) for row in rel.fetchall()]
|
||||
|
||||
try:
|
||||
return await asyncio.to_thread(_run)
|
||||
except Exception:
|
||||
return []
|
||||
@@ -5,6 +5,7 @@ from pathlib import Path
|
||||
|
||||
from quart import Quart, Response, abort, g, redirect, request, session, url_for
|
||||
|
||||
from .analytics import close_analytics_db, open_analytics_db
|
||||
from .core import close_db, config, get_csrf_token, init_db, setup_request_id
|
||||
from .i18n import LANG_BLUEPRINTS, SUPPORTED_LANGS, get_translations
|
||||
|
||||
@@ -120,10 +121,12 @@ def create_app() -> Quart:
|
||||
@app.before_serving
|
||||
async def startup():
|
||||
await init_db()
|
||||
open_analytics_db()
|
||||
|
||||
@app.after_serving
|
||||
async def shutdown():
|
||||
await close_db()
|
||||
close_analytics_db()
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Per-request hooks
|
||||
|
||||
@@ -565,3 +565,60 @@ async def export_download(export_id: int):
|
||||
"Content-Disposition": f'attachment; filename="padel-business-plan-{export_id}.pdf"'
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# DuckDB analytics integration — market data for planner pre-fill
|
||||
# =============================================================================
|
||||
|
||||
@bp.route("/api/market-data")
|
||||
async def market_data():
|
||||
"""Return per-city planner defaults from DuckDB serving layer.
|
||||
|
||||
GET /planner/api/market-data?city_slug=berlin
|
||||
|
||||
Returns a partial DEFAULTS override dict (camelCase keys).
|
||||
Returns {} when the analytics DB has no data yet — caller merges with DEFAULTS.
|
||||
"""
|
||||
city_slug = request.args.get("city_slug", "").strip()
|
||||
if not city_slug:
|
||||
return jsonify({}), 200
|
||||
|
||||
from ..analytics import fetch_analytics
|
||||
|
||||
rows = await fetch_analytics(
|
||||
"SELECT * FROM padelnomics.planner_defaults WHERE city_slug = ? LIMIT 1",
|
||||
[city_slug],
|
||||
)
|
||||
if not rows:
|
||||
return jsonify({}), 200
|
||||
|
||||
row = rows[0]
|
||||
|
||||
# Map DuckDB snake_case columns → DEFAULTS camelCase keys.
|
||||
# Only include fields that exist in the row and have non-null values.
|
||||
col_map: dict[str, str] = {
|
||||
"rate_peak": "ratePeak",
|
||||
"rate_off_peak": "rateOffPeak",
|
||||
"court_cost_dbl": "courtCostDbl",
|
||||
"court_cost_sgl": "courtCostSgl",
|
||||
"rent_sqm": "rentSqm",
|
||||
"insurance": "insurance",
|
||||
"electricity": "electricity",
|
||||
"maintenance": "maintenance",
|
||||
"marketing": "marketing",
|
||||
}
|
||||
|
||||
overrides: dict = {}
|
||||
for col, key in col_map.items():
|
||||
val = row.get(col)
|
||||
if val is not None:
|
||||
overrides[key] = round(float(val))
|
||||
|
||||
# Include data quality metadata so frontend can show confidence indicator
|
||||
if row.get("data_confidence") is not None:
|
||||
overrides["_dataConfidence"] = round(float(row["data_confidence"]), 2)
|
||||
if row.get("country_code"):
|
||||
overrides["_countryCode"] = row["country_code"]
|
||||
|
||||
return jsonify(overrides), 200
|
||||
|
||||
Reference in New Issue
Block a user