From 2db66efe774fc22381fbd2a3a9edcb0b057f2406 Mon Sep 17 00:00:00 2001 From: Deeman Date: Sun, 22 Feb 2026 19:04:40 +0100 Subject: [PATCH] feat: migrate transform to 3-layer architecture with per-layer schemas MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove raw/ layer — staging models now read landing JSON directly. Rename all model schemas from padelnomics.* to staging.*/foundation.*/serving.*. Web app queries updated to serving.planner_defaults via SERVING_DUCKDB_PATH. Supervisor gets daily sleep interval between pipeline runs. Co-Authored-By: Claude Opus 4.6 --- .claude/CLAUDE.md | 14 +- CHANGELOG.md | 26 ++++ extract/padelnomics_extract/README.md | 131 +++++++++++------- infra/supervisor/supervisor.sh | 3 + transform/sqlmesh_padelnomics/README.md | 96 +++++++------ .../models/foundation/dim_cities.sql | 6 +- .../models/foundation/dim_venues.sql | 6 +- .../sqlmesh_padelnomics/models/raw/README.md | 6 - .../models/raw/raw_eurostat_population.sql | 64 --------- .../models/raw/raw_overpass_courts.sql | 42 ------ .../models/raw/raw_playtomic_tenants.sql | 35 ----- .../models/serving/city_market_profile.sql | 4 +- .../models/serving/planner_defaults.sql | 4 +- .../models/staging/stg_padel_courts.sql | 64 +++++---- .../models/staging/stg_playtomic_venues.sql | 42 ++++-- .../models/staging/stg_population.sql | 54 +++++++- web/src/padelnomics/analytics.py | 4 +- web/src/padelnomics/planner/routes.py | 2 +- .../padelnomics/scripts/refresh_from_daas.py | 4 +- 19 files changed, 306 insertions(+), 301 deletions(-) delete mode 100644 transform/sqlmesh_padelnomics/models/raw/README.md delete mode 100644 transform/sqlmesh_padelnomics/models/raw/raw_eurostat_population.sql delete mode 100644 transform/sqlmesh_padelnomics/models/raw/raw_overpass_courts.sql delete mode 100644 transform/sqlmesh_padelnomics/models/raw/raw_playtomic_tenants.sql diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md index d936ee1..d22d026 100644 --- 
a/.claude/CLAUDE.md +++ b/.claude/CLAUDE.md @@ -17,7 +17,7 @@ External APIs → extract → landing zone → SQLMesh transform → DuckDB → - `web/` — Quart + HTMX web application (auth, billing, dashboard) - `extract/padelnomics_extract/` — data extraction to local landing zone -- `transform/sqlmesh_padelnomics/` — 4-layer SQL transformation (raw → staging → foundation → serving) +- `transform/sqlmesh_padelnomics/` — 3-layer SQL transformation (staging → foundation → serving) - `src/padelnomics/` — CLI utilities, export_serving helper @@ -27,10 +27,10 @@ External APIs → extract → landing zone → SQLMesh transform → DuckDB → Use the **`data-engineer`** skill for: - Designing or reviewing SQLMesh model logic -- Adding a new data source (extract + raw + staging models) +- Adding a new data source (extract + staging model) - Performance tuning DuckDB queries - Data modeling decisions (dimensions, facts, aggregates) -- Understanding the 4-layer architecture +- Understanding the 3-layer architecture ``` /data-engineer (or ask Claude to invoke it) @@ -79,16 +79,18 @@ DUCKDB_PATH=local.duckdb SERVING_DUCKDB_PATH=analytics.duckdb \ | Topic | File | |-------|------| | Extraction patterns, state tracking, adding new sources | `extract/padelnomics_extract/README.md` | -| 4-layer SQLMesh architecture, materialization strategy | `transform/sqlmesh_padelnomics/README.md` | +| 3-layer SQLMesh architecture, materialization strategy | `transform/sqlmesh_padelnomics/README.md` | | Two-file DuckDB architecture (SQLMesh lock isolation) | `src/padelnomics/export_serving.py` docstring | ## Pipeline data flow ``` data/landing/ - └── padelnomics/{year}/{etag}.csv.gz ← extraction output + ├── overpass/{year}/{month}/courts.json.gz + ├── eurostat/{year}/{month}/urb_cpop1.json.gz + └── playtomic/{year}/{month}/tenants.json.gz -local.duckdb ← SQLMesh exclusive (raw → staging → foundation → serving) +data/lakehouse.duckdb ← SQLMesh exclusive (staging → foundation → serving) analytics.duckdb ← serving 
tables only, web app read-only └── serving.* ← atomically replaced by export_serving.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d7d6bea..2f49bee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,32 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] +### Changed +- **Extraction: one file per source** — replaced monolithic `execute.py` with per-source + modules (`overpass.py`, `eurostat.py`, `playtomic_tenants.py`, `playtomic_availability.py`); + each module has its own CLI entry point (`extract-overpass`, `extract-eurostat`, etc.); + shared boilerplate extracted to `_shared.py` with `run_extractor()` wrapper that handles + SQLite state tracking, logging, and session management +- **Transform: 4-layer → 3-layer** — removed `raw/` layer; staging models now read landing + zone JSON files directly via `read_json()` with `@LANDING_DIR` variable; model schemas + renamed from `padelnomics.*` to per-layer namespaces (`staging.*`, `foundation.*`, `serving.*`) +- **Two-DuckDB architecture** — web app now reads from `SERVING_DUCKDB_PATH` (analytics.duckdb) + instead of `DUCKDB_PATH` (lakehouse.duckdb); `export_serving.py` atomically swaps serving + tables after each transform run +- Supervisor: added daily sleep interval between pipeline runs + +### Added +- **Playtomic availability extractor** (`playtomic_availability.py`) — daily next-day booking + slot snapshots for occupancy rate estimation and pricing benchmarking; reads tenant IDs from + latest `tenants.json.gz`, queries `/v1/availability` per venue with 2s throttle, resumable + via cursor, bounded at 10K venues per run +- Template sync: copier update v0.9.0 → v0.10.0 — `export_serving.py` module, + `@padelnomics_glob()` macro, `setup_server.sh`, supervisor export_serving step + +### Removed +- `extract/.../execute.py` — replaced by per-source modules +- `models/raw/` directory — raw layer eliminated; staging reads landing files directly + ### Added - Template 
sync: copier update from `29ac25b` → `v0.9.0` (29 template commits) - `.claude/CLAUDE.md`: project-specific Claude Code instructions (skills, commands, architecture) diff --git a/extract/padelnomics_extract/README.md b/extract/padelnomics_extract/README.md index b870176..4a6d75e 100644 --- a/extract/padelnomics_extract/README.md +++ b/extract/padelnomics_extract/README.md @@ -5,86 +5,121 @@ Fetches raw data from external sources to the local landing zone. The pipeline t ## Running ```bash -# One-shot (most recent data only) +# Run all extractors sequentially LANDING_DIR=data/landing uv run extract -# First-time full backfill (add your own backfill entry point) -LANDING_DIR=data/landing uv run python -m padelnomics_extract.execute +# Run a single extractor +LANDING_DIR=data/landing uv run extract-overpass +LANDING_DIR=data/landing uv run extract-eurostat +LANDING_DIR=data/landing uv run extract-playtomic-tenants +LANDING_DIR=data/landing uv run extract-playtomic-availability ``` +## Architecture: one file per source + +Each data source lives in its own module with a dedicated CLI entry point: + +``` +src/padelnomics_extract/ +├── __init__.py +├── _shared.py # LANDING_DIR, logger, run_extractor() wrapper +├── utils.py # SQLite state tracking, atomic I/O helpers +├── overpass.py # OSM padel courts via Overpass API +├── eurostat.py # Eurostat city demographics (urb_cpop1, ilc_di03) +├── playtomic_tenants.py # Playtomic venue listings (tenant search) +├── playtomic_availability.py # Playtomic booking slots (next-day availability) +└── all.py # Runs all extractors sequentially +``` + +### Adding a new extractor + +1. 
Create `my_source.py` following the pattern: + +```python +from ._shared import run_extractor, setup_logging +from .utils import landing_path, write_gzip_atomic + +logger = setup_logging("padelnomics.extract.my_source") +EXTRACTOR_NAME = "my_source" + +def extract(landing_dir, year_month, conn, session): + """Returns {"files_written": N, "bytes_written": N, ...}.""" + year, month = year_month.split("/") + dest_dir = landing_path(landing_dir, "my_source", year, month) + # ... fetch data, write to dest_dir; set n = compressed bytes written ... + return {"files_written": 1, "files_skipped": 0, "bytes_written": n} + +def main(): + run_extractor(EXTRACTOR_NAME, extract) +``` + +2. Add entry point to `pyproject.toml`: +```toml +extract-my-source = "padelnomics_extract.my_source:main" +``` + +3. Import in `all.py` and add to `EXTRACTORS` list. + +4. Add a staging model in `transform/sqlmesh_padelnomics/models/staging/`. + ## Design: filesystem as state -The landing zone is an append-only store of raw files. Each file is named by its content fingerprint (etag or SHA256 hash), so: +The landing zone is an append-only store of raw files: - **Idempotency**: running twice writes nothing if the source hasn't changed -- **Debugging**: every historical raw file is preserved — reprocess any window by re-running transforms +- **Debugging**: every historical raw file is preserved - **Safety**: extraction never mutates existing files, only appends new ones -### Etag-based dedup (preferred) +### Etag-based dedup (Eurostat) -When the source provides an `ETag` header, use it as the filename: +When the source provides an `ETag` header, store it in a sibling `.etag` file. +On the next request, send `If-None-Match` — 304 means skip. -``` -data/landing/padelnomics/{year}/{month:02d}/{etag}.csv.gz -``` +### Content-addressed (Overpass, Playtomic) -The file existing on disk means the content matches the server's current version. No content download needed. 
- -### Hash-based dedup (fallback) - -When the source has no etag (static files that update in-place), download the content and use its SHA256 prefix as the filename: - -``` -data/landing/padelnomics/{year}/{date}_{sha256[:8]}.csv.gz -``` - -Two runs that produce identical content produce the same hash → same filename → skip. +Files named by date or content. `write_gzip_atomic()` writes to a `.tmp` sibling +then renames — never leaves partial files on crash. ## State tracking -Every run writes one row to `data/landing/.state.sqlite`. Query it to answer operational questions: +Every run writes one row to `data/landing/.state.sqlite`: ```bash -# When did extraction last succeed? sqlite3 data/landing/.state.sqlite \ - "SELECT extractor, started_at, status, files_written, files_skipped, cursor_value + "SELECT extractor, started_at, status, files_written, cursor_value FROM extraction_runs ORDER BY run_id DESC LIMIT 10" - -# Did anything fail in the last 7 days? -sqlite3 data/landing/.state.sqlite \ - "SELECT * FROM extraction_runs WHERE status = 'failed' - AND started_at > datetime('now', '-7 days')" ``` -State table schema: - | Column | Type | Description | |--------|------|-------------| | `run_id` | INTEGER | Auto-increment primary key | -| `extractor` | TEXT | Extractor name (e.g. `padelnomics`) | +| `extractor` | TEXT | Extractor name (e.g. `overpass`, `eurostat`) | | `started_at` | TEXT | ISO 8601 UTC timestamp | -| `finished_at` | TEXT | ISO 8601 UTC timestamp, NULL if still running | +| `finished_at` | TEXT | ISO 8601 UTC timestamp | | `status` | TEXT | `running` → `success` or `failed` | | `files_written` | INTEGER | New files written this run | -| `files_skipped` | INTEGER | Files already present (content unchanged) | +| `files_skipped` | INTEGER | Files already present | | `bytes_written` | INTEGER | Compressed bytes written | -| `cursor_value` | TEXT | Last successful cursor (date, etag, page, etc.) 
| -| `error_message` | TEXT | Exception message if status = `failed` | - -## Adding a new extractor - -1. Add a function in `execute.py` following the same pattern as `extract_file_by_etag()` or `extract_file_by_hash()` -2. Call it from `extract_dataset()` with its own `extractor` name in `start_run()` -3. Store files under a new subdirectory: `landing_path(LANDING_DIR, "my_new_source", year)` -4. Add a new SQLMesh `raw/` model that reads from the new subdirectory glob +| `cursor_value` | TEXT | Resume cursor (date, index, etc.) | +| `error_message` | TEXT | Exception message if failed | ## Landing zone structure ``` data/landing/ -├── .state.sqlite # extraction run history -└── padelnomics/ # one subdirectory per source - └── {year}/ - └── {month:02d}/ - └── {etag}.csv.gz # immutable, content-addressed files +├── .state.sqlite +├── overpass/{year}/{month}/courts.json.gz +├── eurostat/{year}/{month}/urb_cpop1.json.gz +├── eurostat/{year}/{month}/ilc_di03.json.gz +├── playtomic/{year}/{month}/tenants.json.gz +└── playtomic/{year}/{month}/availability_{date}.json.gz ``` + +## Data sources + +| Source | Module | Schedule | Notes | +|--------|--------|----------|-------| +| Overpass API | `overpass.py` | Daily | OSM padel courts, ~5K nodes | +| Eurostat | `eurostat.py` | Daily (304 most runs) | urb_cpop1, ilc_di03 — etag dedup | +| Playtomic tenants | `playtomic_tenants.py` | Daily | ~8K venues, bounded pagination | +| Playtomic availability | `playtomic_availability.py` | Daily | Next-day slots, ~4.5h runtime | diff --git a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh index 276a82e..5bd849b 100644 --- a/infra/supervisor/supervisor.sh +++ b/infra/supervisor/supervisor.sh @@ -50,5 +50,8 @@ do "$ALERT_WEBHOOK_URL" 2>/dev/null || true fi sleep 600 # back off 10 min on failure + continue } + + sleep 86400 # run once per day done diff --git a/transform/sqlmesh_padelnomics/README.md b/transform/sqlmesh_padelnomics/README.md index 9f83f58..fee287d 100644 
--- a/transform/sqlmesh_padelnomics/README.md +++ b/transform/sqlmesh_padelnomics/README.md @@ -1,6 +1,6 @@ # Padelnomics Transform (SQLMesh) -4-layer SQL transformation pipeline using SQLMesh + DuckDB. Reads from the landing zone, produces analytics-ready tables consumed by the web app. +3-layer SQL transformation pipeline using SQLMesh + DuckDB. Reads from the landing zone, produces analytics-ready tables consumed by the web app via an atomically-swapped serving DB. ## Running @@ -16,42 +16,41 @@ uv run sqlmesh -p transform/sqlmesh_padelnomics test # Format SQL uv run sqlmesh -p transform/sqlmesh_padelnomics format + +# Export serving tables to analytics.duckdb (run after SQLMesh) +DUCKDB_PATH=data/lakehouse.duckdb SERVING_DUCKDB_PATH=data/analytics.duckdb \ + uv run python -m padelnomics.export_serving ``` -## 4-layer architecture +## 3-layer architecture ``` landing/ ← raw files (extraction output) - └── padelnomics/ - └── {year}/{etag}.csv.gz + ├── overpass/*/*/courts.json.gz + ├── eurostat/*/*/urb_cpop1.json.gz + └── playtomic/*/*/tenants.json.gz -raw/ ← reads files verbatim - └── raw.padelnomics - -staging/ ← type casting, deduplication - └── staging.stg_padelnomics +staging/ ← reads landing files directly, type casting, dedup + ├── staging.stg_padel_courts + ├── staging.stg_playtomic_venues + └── staging.stg_population foundation/ ← business logic, dimensions, facts - └── foundation.dim_category + ├── foundation.dim_venues + └── foundation.dim_cities serving/ ← pre-aggregated for web app - └── serving.padelnomics_metrics + ├── serving.city_market_profile + └── serving.planner_defaults ``` -### raw/ — verbatim source reads +### staging/ — read landing files + type casting -- Reads landing zone files directly with `read_csv(..., all_varchar=true)` -- No transformations, no business logic -- Column names match the source exactly -- Uses a macro (`@padelnomics_glob()`) so new landing files are picked up automatically -- Naming: `raw.` - -### staging/ — type 
casting and cleansing - -- One model per raw model (1:1) -- Cast all columns to correct types: `TRY_CAST(report_date AS DATE)` -- Deduplicate if source produces duplicates -- Minimal renaming — only where raw names are genuinely unclear +- Reads landing zone JSON files directly with `read_json(..., format='auto', filename=true)` +- Uses `@LANDING_DIR` variable for file path discovery +- Casts all columns to correct types: `TRY_CAST(... AS DOUBLE)` +- Deduplicates where source produces duplicates (ROW_NUMBER partitioned on ID) +- Validates coordinates, nulls, and data quality inline - Naming: `staging.stg_` ### foundation/ — business logic @@ -59,49 +58,54 @@ serving/ ← pre-aggregated for web app - Dimensions (`dim_*`): slowly changing attributes, one row per entity - Facts (`fact_*`): events and measurements, one row per event - May join across multiple staging models from different sources -- Surrogate keys: `MD5(business_key)` for stable joins - Naming: `foundation.dim_`, `foundation.fact_` ### serving/ — analytics-ready aggregates - Pre-aggregated for specific web app query patterns -- These are the only tables the web app reads +- These are the only tables the web app reads (via `analytics.duckdb`) - Queried from `analytics.py` via `fetch_analytics()` -- Named to match what the frontend expects - Naming: `serving.` +## Two-DuckDB architecture + +``` +data/lakehouse.duckdb ← SQLMesh exclusive write (DUCKDB_PATH) + ├── staging.* + ├── foundation.* + └── serving.* + +data/analytics.duckdb ← web app read-only (SERVING_DUCKDB_PATH) + └── serving.* ← atomically replaced by export_serving.py +``` + +SQLMesh holds an exclusive write lock on `lakehouse.duckdb` during plan/run. +The web app needs read-only access at all times. `export_serving.py` copies +`serving.*` tables to a temp file, then atomically renames it to `analytics.duckdb`. +The web app detects the inode change on next query — no restart needed. 
+ +**Never point DUCKDB_PATH and SERVING_DUCKDB_PATH to the same file.** + ## Adding a new data source -1. Add a landing zone directory in the extraction package -2. Add a glob macro in `macros/__init__.py`: - ```python - @macro() - def my_source_glob(evaluator) -> str: - landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") - return f"'{landing_dir}/my_source/**/*.csv.gz'" - ``` -3. Add a raw model: `models/raw/raw_my_source.sql` -4. Add a staging model: `models/staging/stg_my_source.sql` -5. Join into foundation or serving models as needed +1. Add an extractor in `extract/padelnomics_extract/` (see extraction README) +2. Add a staging model: `models/staging/stg_my_source.sql` that reads landing files directly +3. Join into foundation or serving models as needed ## Model materialization | Layer | Default kind | Rationale | |-------|-------------|-----------| -| raw | FULL | Always re-reads all files; cheap with DuckDB parallel scan | -| staging | FULL | 1:1 with raw; same cost | +| staging | FULL | Re-reads all landing files; cheap with DuckDB parallel scan | | foundation | FULL | Business logic rarely changes; recompute is fast | | serving | FULL | Small aggregates; web app needs latest at all times | -For large historical tables, switch to `kind INCREMENTAL_BY_TIME_RANGE` with a time partition column. SQLMesh handles the incremental logic automatically. +For large historical tables, switch to `kind INCREMENTAL_BY_TIME_RANGE` with a time partition column. ## Environment variables | Variable | Default | Description | |----------|---------|-------------| | `LANDING_DIR` | `data/landing` | Root of the landing zone | -| `DUCKDB_PATH` | `local.duckdb` | DuckDB file (SQLMesh exclusive write access) | - -The web app reads from a **separate** `analytics.duckdb` file via `export_serving.py`. -Never point `DUCKDB_PATH` and `SERVING_DUCKDB_PATH` to the same file — -SQLMesh holds an exclusive write lock during plan/run. 
+| `DUCKDB_PATH` | `data/lakehouse.duckdb` | DuckDB file (SQLMesh exclusive write access) | +| `SERVING_DUCKDB_PATH` | `data/analytics.duckdb` | Serving DB (web app reads from here) | diff --git a/transform/sqlmesh_padelnomics/models/foundation/dim_cities.sql b/transform/sqlmesh_padelnomics/models/foundation/dim_cities.sql index 95150e8..4eae720 100644 --- a/transform/sqlmesh_padelnomics/models/foundation/dim_cities.sql +++ b/transform/sqlmesh_padelnomics/models/foundation/dim_cities.sql @@ -3,7 +3,7 @@ -- Cities without Eurostat coverage (US, non-EU) are derived from venue clusters. MODEL ( - name padelnomics.dim_cities, + name foundation.dim_cities, kind FULL, cron '@daily', grain city_code @@ -16,7 +16,7 @@ eurostat_cities AS ( country_code, population, ref_year - FROM padelnomics.stg_population + FROM staging.stg_population QUALIFY ROW_NUMBER() OVER (PARTITION BY city_code ORDER BY ref_year DESC) = 1 ), -- Venue counts per (country_code, city) from dim_venues @@ -27,7 +27,7 @@ venue_counts AS ( COUNT(*) AS venue_count, AVG(lat) AS centroid_lat, AVG(lon) AS centroid_lon - FROM padelnomics.dim_venues + FROM foundation.dim_venues WHERE city IS NOT NULL AND city != '' GROUP BY country_code, city ), diff --git a/transform/sqlmesh_padelnomics/models/foundation/dim_venues.sql b/transform/sqlmesh_padelnomics/models/foundation/dim_venues.sql index bb3c448..825232d 100644 --- a/transform/sqlmesh_padelnomics/models/foundation/dim_venues.sql +++ b/transform/sqlmesh_padelnomics/models/foundation/dim_venues.sql @@ -4,7 +4,7 @@ -- Proximity dedup uses haversine approximation: 1 degree lat ≈ 111 km. 
MODEL ( - name padelnomics.dim_venues, + name foundation.dim_venues, kind FULL, cron '@daily', grain venue_id @@ -22,7 +22,7 @@ WITH all_venues AS ( postcode, NULL AS tenant_type, extracted_date - FROM padelnomics.stg_padel_courts + FROM staging.stg_padel_courts WHERE country_code IS NOT NULL UNION ALL @@ -38,7 +38,7 @@ WITH all_venues AS ( postcode, tenant_type, extracted_date - FROM padelnomics.stg_playtomic_venues + FROM staging.stg_playtomic_venues WHERE country_code IS NOT NULL ), -- Rank venues so Playtomic records win ties in proximity dedup diff --git a/transform/sqlmesh_padelnomics/models/raw/README.md b/transform/sqlmesh_padelnomics/models/raw/README.md deleted file mode 100644 index 4deaf69..0000000 --- a/transform/sqlmesh_padelnomics/models/raw/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# raw - -Read raw landing zone files directly with `read_csv_auto()`. -No transformations — schema as-is from source. - -Naming convention: `raw._` diff --git a/transform/sqlmesh_padelnomics/models/raw/raw_eurostat_population.sql b/transform/sqlmesh_padelnomics/models/raw/raw_eurostat_population.sql deleted file mode 100644 index aac5aaf..0000000 --- a/transform/sqlmesh_padelnomics/models/raw/raw_eurostat_population.sql +++ /dev/null @@ -1,64 +0,0 @@ --- Raw Eurostat Urban Audit city population (dataset: urb_cpop1). --- Source: data/landing/eurostat/{year}/{month}/urb_cpop1.json.gz --- Format: Eurostat JSON Statistics API (dimensions + flat value array). --- --- The Eurostat JSON format encodes dimensions separately from values: --- dimension.cities.category.index → maps city code to flat array position --- dimension.time.category.index → maps year to flat array position --- values → flat object {position_str: value} --- --- This model stores one row per (city_code, year) by computing positions. 
--- Reference: https://wikis.ec.europa.eu/display/EUROSTATHELP/API+Statistics - -MODEL ( - name padelnomics.raw_eurostat_population, - kind FULL, - cron '@daily', - grain (city_code, ref_year) -); - -WITH raw AS ( - SELECT - raw_json, - filename - FROM read_json( - @LANDING_DIR || '/eurostat/*/*/urb_cpop1.json.gz', - format = 'auto', - filename = true, - columns = { 'raw_json': 'JSON' } - ) -), --- Unnest city codes with their ordinal positions -cities AS ( - SELECT - city_code, - (city_pos)::INTEGER AS city_pos, - filename, - raw_json, - (json_extract(raw_json, '$.size[1]'))::INTEGER AS n_times - FROM raw, - LATERAL ( - SELECT key AS city_code, value::INTEGER AS city_pos - FROM json_each(json_extract(raw_json, '$.dimension.cities.category.index')) - ) -), --- Unnest time (year) values with positions -times AS ( - SELECT key AS ref_year, value::INTEGER AS time_pos - FROM (SELECT raw_json FROM raw LIMIT 1), - LATERAL ( - SELECT key, value - FROM json_each(json_extract(raw_json, '$.dimension.time.category.index')) - ) -) -SELECT - c.city_code, - t.ref_year, - TRY_CAST( - json_extract(c.raw_json, '$.' || (c.city_pos * c.n_times + t.time_pos)::TEXT) - AS DOUBLE - ) AS population, - c.filename AS source_file, - CURRENT_DATE AS extracted_date -FROM cities c -CROSS JOIN times t diff --git a/transform/sqlmesh_padelnomics/models/raw/raw_overpass_courts.sql b/transform/sqlmesh_padelnomics/models/raw/raw_overpass_courts.sql deleted file mode 100644 index 1993807..0000000 --- a/transform/sqlmesh_padelnomics/models/raw/raw_overpass_courts.sql +++ /dev/null @@ -1,42 +0,0 @@ --- Raw OpenStreetMap padel courts from Overpass API landing files. --- Source: data/landing/overpass/{year}/{month}/courts.json.gz --- Format: {"version": ..., "elements": [{type, id, lat, lon, tags}, ...]} --- --- Only node elements carry direct lat/lon. 
Way and relation elements need --- centroid calculation from member nodes (not done here — filter to node only --- for the initial raw layer; ways/relations retained as-is for future enrichment). - -MODEL ( - name padelnomics.raw_overpass_courts, - kind FULL, - cron '@daily', - grain (osm_type, osm_id) -); - -SELECT - elem ->> 'type' AS osm_type, - (elem ->> 'id')::BIGINT AS osm_id, - TRY_CAST(elem ->> 'lat' AS DOUBLE) AS lat, - TRY_CAST(elem ->> 'lon' AS DOUBLE) AS lon, - elem -> 'tags' ->> 'name' AS name, - elem -> 'tags' ->> 'sport' AS sport, - elem -> 'tags' ->> 'leisure' AS leisure, - elem -> 'tags' ->> 'addr:country' AS country_code, - elem -> 'tags' ->> 'addr:city' AS city_tag, - elem -> 'tags' ->> 'addr:postcode' AS postcode, - elem -> 'tags' ->> 'operator' AS operator_name, - elem -> 'tags' ->> 'opening_hours' AS opening_hours, - elem -> 'tags' ->> 'fee' AS fee, - filename AS source_file, - CURRENT_DATE AS extracted_date -FROM ( - SELECT - UNNEST(elements) AS elem, - filename - FROM read_json( - @LANDING_DIR || '/overpass/*/*/courts.json.gz', - format = 'auto', - filename = true - ) -) -WHERE (elem ->> 'type') IS NOT NULL diff --git a/transform/sqlmesh_padelnomics/models/raw/raw_playtomic_tenants.sql b/transform/sqlmesh_padelnomics/models/raw/raw_playtomic_tenants.sql deleted file mode 100644 index ab1555e..0000000 --- a/transform/sqlmesh_padelnomics/models/raw/raw_playtomic_tenants.sql +++ /dev/null @@ -1,35 +0,0 @@ --- Raw Playtomic venue (tenant) listings from unauthenticated tenant search API. 
--- Source: data/landing/playtomic/{year}/{month}/tenants.json.gz --- Format: {"tenants": [{tenant_id, name, address, sport_ids, ...}], "count": N} - -MODEL ( - name padelnomics.raw_playtomic_tenants, - kind FULL, - cron '@daily', - grain tenant_id -); - -SELECT - tenant ->> 'tenant_id' AS tenant_id, - tenant ->> 'tenant_name' AS tenant_name, - tenant -> 'address' ->> 'street' AS street, - tenant -> 'address' ->> 'city' AS city, - tenant -> 'address' ->> 'postal_code' AS postal_code, - tenant -> 'address' ->> 'country_code' AS country_code, - TRY_CAST(tenant -> 'address' ->> 'coordinate_lat' AS DOUBLE) AS lat, - TRY_CAST(tenant -> 'address' ->> 'coordinate_lon' AS DOUBLE) AS lon, - tenant ->> 'sport_ids' AS sport_ids_raw, - tenant ->> 'tenant_type' AS tenant_type, - filename AS source_file, - CURRENT_DATE AS extracted_date -FROM ( - SELECT - UNNEST(tenants) AS tenant, - filename - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/tenants.json.gz', - format = 'auto', - filename = true - ) -) -WHERE (tenant ->> 'tenant_id') IS NOT NULL diff --git a/transform/sqlmesh_padelnomics/models/serving/city_market_profile.sql b/transform/sqlmesh_padelnomics/models/serving/city_market_profile.sql index 1d3033e..7d09746 100644 --- a/transform/sqlmesh_padelnomics/models/serving/city_market_profile.sql +++ b/transform/sqlmesh_padelnomics/models/serving/city_market_profile.sql @@ -7,7 +7,7 @@ -- 20% data confidence (completeness of both population + venue data) MODEL ( - name padelnomics.city_market_profile, + name serving.city_market_profile, kind FULL, cron '@daily', grain city_slug @@ -35,7 +35,7 @@ WITH base AS ( WHEN c.population > 0 OR c.padel_venue_count > 0 THEN 0.5 ELSE 0.0 END AS data_confidence - FROM padelnomics.dim_cities c + FROM foundation.dim_cities c WHERE c.padel_venue_count > 0 ), scored AS ( diff --git a/transform/sqlmesh_padelnomics/models/serving/planner_defaults.sql b/transform/sqlmesh_padelnomics/models/serving/planner_defaults.sql index 5748e06..b13ac74 
100644 --- a/transform/sqlmesh_padelnomics/models/serving/planner_defaults.sql +++ b/transform/sqlmesh_padelnomics/models/serving/planner_defaults.sql @@ -8,7 +8,7 @@ -- Units are explicit in column names (EUR, %, h). All monetary values in EUR. MODEL ( - name padelnomics.planner_defaults, + name serving.planner_defaults, kind FULL, cron '@daily', grain city_slug @@ -43,7 +43,7 @@ city_venue_density AS ( population, venues_per_100k, market_score - FROM padelnomics.city_market_profile + FROM serving.city_market_profile ) SELECT cvd.city_slug, diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_padel_courts.sql b/transform/sqlmesh_padelnomics/models/staging/stg_padel_courts.sql index a5b94e5..5a21831 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_padel_courts.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_padel_courts.sql @@ -1,30 +1,53 @@ --- Cleaned OSM padel courts — node elements only (direct lat/lon available). --- Deduplicates on osm_id, keeps most recently extracted record. --- Country code resolved from addr:country tag or approximated by lat/lon bbox. +-- Padel court locations from OpenStreetMap via Overpass API. +-- Reads landing zone JSON directly, unnests elements, filters to nodes with +-- valid coordinates, deduplicates on osm_id, and approximates country from bbox. 
+-- +-- Source: data/landing/overpass/{year}/{month}/courts.json.gz MODEL ( - name padelnomics.stg_padel_courts, + name staging.stg_padel_courts, kind FULL, cron '@daily', grain osm_id ); -WITH deduped AS ( +WITH parsed AS ( + SELECT + elem ->> 'type' AS osm_type, + (elem ->> 'id')::BIGINT AS osm_id, + TRY_CAST(elem ->> 'lat' AS DOUBLE) AS lat, + TRY_CAST(elem ->> 'lon' AS DOUBLE) AS lon, + elem -> 'tags' ->> 'name' AS name, + elem -> 'tags' ->> 'addr:country' AS country_code, + elem -> 'tags' ->> 'addr:city' AS city_tag, + elem -> 'tags' ->> 'addr:postcode' AS postcode, + elem -> 'tags' ->> 'operator' AS operator_name, + elem -> 'tags' ->> 'opening_hours' AS opening_hours, + elem -> 'tags' ->> 'fee' AS fee, + filename AS source_file, + CURRENT_DATE AS extracted_date + FROM ( + SELECT UNNEST(elements) AS elem, filename + FROM read_json( + @LANDING_DIR || '/overpass/*/*/courts.json.gz', + format = 'auto', + filename = true + ) + ) + WHERE (elem ->> 'type') IS NOT NULL +), +deduped AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY osm_id ORDER BY extracted_date DESC) AS rn - FROM padelnomics.raw_overpass_courts + FROM parsed WHERE osm_type = 'node' - AND lat IS NOT NULL - AND lon IS NOT NULL + AND lat IS NOT NULL AND lon IS NOT NULL AND lat BETWEEN -90 AND 90 AND lon BETWEEN -180 AND 180 ), --- Approximate country from lat/lon when addr:country tag is absent with_country AS ( SELECT - osm_id, - lat, - lon, + osm_id, lat, lon, COALESCE(NULLIF(TRIM(UPPER(country_code)), ''), CASE WHEN lat BETWEEN 47.27 AND 55.06 AND lon BETWEEN 5.87 AND 15.04 THEN 'DE' WHEN lat BETWEEN 35.95 AND 43.79 AND lon BETWEEN -9.39 AND 4.33 THEN 'ES' @@ -37,26 +60,15 @@ with_country AS ( ELSE NULL END) AS country_code, NULLIF(TRIM(name), '') AS name, - NULLIF(TRIM(city_tag), '') AS city_tag, - postcode, - operator_name, - opening_hours, - fee, - extracted_date + NULLIF(TRIM(city_tag), '') AS city, + postcode, operator_name, opening_hours, fee, extracted_date FROM deduped WHERE rn = 1 ) SELECT 
osm_id, 'osm' AS source, - lat, - lon, - country_code, - name, - city_tag AS city, - postcode, - operator_name, - opening_hours, + lat, lon, country_code, name, city, postcode, operator_name, opening_hours, CASE LOWER(fee) WHEN 'yes' THEN TRUE WHEN 'no' THEN FALSE ELSE NULL END AS is_paid, extracted_date FROM with_country diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql index 25e21eb..c8c4083 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql @@ -1,27 +1,53 @@ --- Cleaned Playtomic padel venue records. One row per venue, deduped on tenant_id. +-- Playtomic padel venue records from unauthenticated tenant search API. +-- Reads landing zone JSON directly, unnests tenant array, deduplicates on +-- tenant_id (keeps most recent), and normalizes address fields. +-- +-- Source: data/landing/playtomic/{year}/{month}/tenants.json.gz MODEL ( - name padelnomics.stg_playtomic_venues, + name staging.stg_playtomic_venues, kind FULL, cron '@daily', grain tenant_id ); -WITH deduped AS ( +WITH parsed AS ( + SELECT + tenant ->> 'tenant_id' AS tenant_id, + tenant ->> 'tenant_name' AS tenant_name, + tenant -> 'address' ->> 'street' AS street, + tenant -> 'address' ->> 'city' AS city, + tenant -> 'address' ->> 'postal_code' AS postal_code, + tenant -> 'address' ->> 'country_code' AS country_code, + TRY_CAST(tenant -> 'address' ->> 'coordinate_lat' AS DOUBLE) AS lat, + TRY_CAST(tenant -> 'address' ->> 'coordinate_lon' AS DOUBLE) AS lon, + tenant ->> 'sport_ids' AS sport_ids_raw, + tenant ->> 'tenant_type' AS tenant_type, + filename AS source_file, + CURRENT_DATE AS extracted_date + FROM ( + SELECT UNNEST(tenants) AS tenant, filename + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/tenants.json.gz', + format = 'auto', + filename = true + ) + ) + WHERE (tenant ->> 'tenant_id') 
IS NOT NULL +), +deduped AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY tenant_id ORDER BY extracted_date DESC) AS rn - FROM padelnomics.raw_playtomic_tenants + FROM parsed WHERE tenant_id IS NOT NULL - AND lat IS NOT NULL - AND lon IS NOT NULL + AND lat IS NOT NULL AND lon IS NOT NULL AND lat BETWEEN -90 AND 90 AND lon BETWEEN -180 AND 180 ) SELECT tenant_id, 'playtomic' AS source, - lat, - lon, + lat, lon, UPPER(country_code) AS country_code, NULLIF(TRIM(tenant_name), '') AS name, NULLIF(TRIM(city), '') AS city, diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_population.sql b/transform/sqlmesh_padelnomics/models/staging/stg_population.sql index 47020de..418408e 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_population.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_population.sql @@ -1,21 +1,65 @@ --- Eurostat Urban Audit city population, cleaned and typed. --- Eurostat city codes follow the NUTS Urban Audit convention (e.g. DE001C). --- Country code is the first two characters of the city code. +-- Eurostat Urban Audit city population (dataset: urb_cpop1). +-- Reads landing zone JSON directly and parses the Eurostat multidimensional format. +-- One row per (city_code, year) with validated population values. 
+-- +-- Source: data/landing/eurostat/{year}/{month}/urb_cpop1.json.gz MODEL ( - name padelnomics.stg_population, + name staging.stg_population, kind FULL, cron '@daily', grain (city_code, ref_year) ); +WITH raw AS ( + SELECT raw_json, filename + FROM read_json( + @LANDING_DIR || '/eurostat/*/*/urb_cpop1.json.gz', + format = 'auto', + filename = true, + columns = { 'raw_json': 'JSON' } + ) +), +cities AS ( + SELECT + city_code, + (city_pos)::INTEGER AS city_pos, + filename, raw_json, + (json_extract(raw_json, '$.size[1]'))::INTEGER AS n_times + FROM raw, + LATERAL ( + SELECT key AS city_code, value::INTEGER AS city_pos + FROM json_each(json_extract(raw_json, '$.dimension.cities.category.index')) + ) +), +times AS ( + SELECT key AS ref_year, value::INTEGER AS time_pos + FROM (SELECT raw_json FROM raw LIMIT 1), + LATERAL ( + SELECT key, value + FROM json_each(json_extract(raw_json, '$.dimension.time.category.index')) + ) +), +parsed AS ( + SELECT + c.city_code, + t.ref_year, + TRY_CAST( + json_extract(c.raw_json, '$.' || (c.city_pos * c.n_times + t.time_pos)::TEXT) + AS DOUBLE + ) AS population, + c.filename AS source_file, + CURRENT_DATE AS extracted_date + FROM cities c + CROSS JOIN times t +) SELECT UPPER(city_code) AS city_code, UPPER(LEFT(city_code, 2)) AS country_code, ref_year::INTEGER AS ref_year, population::BIGINT AS population, extracted_date -FROM padelnomics.raw_eurostat_population +FROM parsed WHERE population IS NOT NULL AND population > 0 AND ref_year ~ '^\d{4}$' diff --git a/web/src/padelnomics/analytics.py b/web/src/padelnomics/analytics.py index 6e12937..adf4273 100644 --- a/web/src/padelnomics/analytics.py +++ b/web/src/padelnomics/analytics.py @@ -7,7 +7,7 @@ All queries run via asyncio.to_thread() to avoid blocking the event loop. 
Usage: from .analytics import fetch_analytics - rows = await fetch_analytics("SELECT * FROM padelnomics.planner_defaults WHERE city_slug = ?", ["berlin"]) + rows = await fetch_analytics("SELECT * FROM serving.planner_defaults WHERE city_slug = ?", ["berlin"]) """ import asyncio import os @@ -17,7 +17,7 @@ from typing import Any import duckdb _conn: duckdb.DuckDBPyConnection | None = None -_DUCKDB_PATH = os.environ.get("DUCKDB_PATH", "data/lakehouse.duckdb") +_DUCKDB_PATH = os.environ.get("SERVING_DUCKDB_PATH", "data/analytics.duckdb") def open_analytics_db() -> None: diff --git a/web/src/padelnomics/planner/routes.py b/web/src/padelnomics/planner/routes.py index a3c9cff..1f5e999 100644 --- a/web/src/padelnomics/planner/routes.py +++ b/web/src/padelnomics/planner/routes.py @@ -603,7 +603,7 @@ async def market_data(): from ..analytics import fetch_analytics rows = await fetch_analytics( - "SELECT * FROM padelnomics.planner_defaults WHERE city_slug = ? LIMIT 1", + "SELECT * FROM serving.planner_defaults WHERE city_slug = ? LIMIT 1", [city_slug], ) if not rows: diff --git a/web/src/padelnomics/scripts/refresh_from_daas.py b/web/src/padelnomics/scripts/refresh_from_daas.py index 93c04cf..04e9b93 100644 --- a/web/src/padelnomics/scripts/refresh_from_daas.py +++ b/web/src/padelnomics/scripts/refresh_from_daas.py @@ -1,7 +1,7 @@ """ Refresh template_data rows from DuckDB analytics serving layer. -Reads per-city market data from the `padelnomics.planner_defaults` serving table +Reads per-city market data from the `serving.planner_defaults` serving table and overwrites matching static values in `template_data.data_json`. This keeps article financial model inputs in sync with the real-world data pipeline output. 
@@ -81,7 +81,7 @@ def _load_analytics(city_slugs: list[str]) -> dict[str, dict]: conn = duckdb.connect(str(path), read_only=True) placeholders = ", ".join(["?"] * len(city_slugs)) rows = conn.execute( - f"SELECT * FROM padelnomics.planner_defaults WHERE city_slug IN ({placeholders})", + f"SELECT * FROM serving.planner_defaults WHERE city_slug IN ({placeholders})", city_slugs, ).fetchall() cols = [d[0] for d in conn.description]