diff --git a/CHANGELOG.md b/CHANGELOG.md index 8cf7577..bd8e1f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] ### Added +- **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — read-only script that reports row counts at every layer of the pricing pipeline (staging → foundation → serving), date range analysis, HAVING filter impact, and join coverage. Run on prod to diagnose empty serving tables. +- **Extraction card descriptions** — each workflow card on the admin pipeline page now shows a one-line description explaining what the data source is (e.g. "EU geographic boundaries (NUTS2 polygons) from Eurostat GISCO"). Descriptions defined in `workflows.toml`. +- **Running state indicator** — extraction cards show a spinner + "Running" label with a blue-tinted border when an extraction is actively running, replacing the plain Run button. Cards also display the start time with "running..." text. + - **Interactive Leaflet maps** — geographic visualization across 4 key placements using self-hosted Leaflet 1.9.4 (GDPR-safe, no CDN): - **Markets hub** (`/markets`): country bubble map with circles sized by total venues, colored by avg market score (green ≥ 60, amber 30-60, red < 30). Click navigates to country overview. - **Country overview articles**: city bubble map loads after article render, auto-fits bounds, click navigates to city page. Bubbles colored by market score. diff --git a/infra/supervisor/workflows.toml b/infra/supervisor/workflows.toml index e4a18b9..5e68732 100644 --- a/infra/supervisor/workflows.toml +++ b/infra/supervisor/workflows.toml @@ -8,54 +8,67 @@ # entry — optional: function name if not "main" (default: "main") # depends_on — optional: list of workflow names that must run first # proxy_mode — optional: "round-robin" (default) or "sticky" +# description — optional: human-readable one-liner shown in the admin UI [overpass] module = "padelnomics_extract.overpass" schedule = "monthly" +description = "Padel court locations from OpenStreetMap via Overpass API" [overpass_tennis] module = "padelnomics_extract.overpass_tennis" schedule = "monthly" +description = "Tennis court locations from OpenStreetMap via Overpass API" [eurostat] module = "padelnomics_extract.eurostat" schedule = "monthly" +description = "City population data from Eurostat Urban Audit" [geonames] module = "padelnomics_extract.geonames" schedule = "monthly" +description = "Global city/town gazetteer from GeoNames (pop >= 1K)" [playtomic_tenants] module = "padelnomics_extract.playtomic_tenants" schedule = "daily" +description = "Padel venue directory from Playtomic (names, locations, courts)" [playtomic_availability] module = "padelnomics_extract.playtomic_availability" schedule = "daily" depends_on = ["playtomic_tenants"] +description = "Morning availability snapshots — slot-level pricing per venue" [playtomic_recheck] module = "padelnomics_extract.playtomic_availability" entry = "main_recheck" schedule = "0,30 6-23 * * *" depends_on = ["playtomic_availability"] +description = "Intraday availability rechecks for occupancy tracking" [census_usa] module = "padelnomics_extract.census_usa" schedule = "monthly" +description = "US city/place population from Census Bureau ACS" [census_usa_income] module = "padelnomics_extract.census_usa_income" schedule = "monthly" +description = "US county median household income from Census Bureau ACS" [eurostat_city_labels] module = "padelnomics_extract.eurostat_city_labels" schedule = "monthly" +description = "City code-to-name mapping for Eurostat Urban Audit cities" [ons_uk] module = "padelnomics_extract.ons_uk" schedule = "monthly" +description = "UK local authority population estimates from ONS" [gisco] module = "padelnomics_extract.gisco" schedule = "monthly" +description = "EU geographic boundaries (NUTS2 polygons) from Eurostat GISCO" diff --git a/scripts/check_pipeline.py b/scripts/check_pipeline.py new file mode 100644 index 0000000..eb39b4e --- /dev/null +++ b/scripts/check_pipeline.py @@ -0,0 +1,200 @@ +""" +Diagnostic script: check row counts at every layer of the pricing pipeline. + +Run on prod via SSH: + DUCKDB_PATH=/opt/padelnomics/data/lakehouse.duckdb uv run python scripts/check_pipeline.py + +Or locally: + DUCKDB_PATH=data/lakehouse.duckdb uv run python scripts/check_pipeline.py + +Read-only — never writes to the database. +""" + +import os +import sys + +import duckdb + +DUCKDB_PATH = os.environ.get("DUCKDB_PATH", "data/lakehouse.duckdb") + +PIPELINE_TABLES = [ + ("staging", "stg_playtomic_availability"), + ("foundation", "fct_availability_slot"), + ("foundation", "dim_venue_capacity"), + ("foundation", "fct_daily_availability"), + ("serving", "venue_pricing_benchmarks"), + ("serving", "pseo_city_pricing"), +] + + +def main(): + if not os.path.exists(DUCKDB_PATH): + print(f"ERROR: {DUCKDB_PATH} not found") + sys.exit(1) + + con = duckdb.connect(DUCKDB_PATH, read_only=True) + + print(f"Database: {DUCKDB_PATH}") + print(f"DuckDB version: {con.execute('SELECT version()').fetchone()[0]}") + print() + + # ── Row counts at each layer ────────────────────────────────────────── + print("=" * 60) + print("PIPELINE ROW COUNTS") + print("=" * 60) + + for schema, table in PIPELINE_TABLES: + # SQLMesh may use __ suffixed physical tables + # Try the logical name first, then scan for physical tables + candidates = [f"{schema}.{table}"] + try: + phys = con.execute( + f"SELECT table_schema || '.' || table_name " + f"FROM information_schema.tables " + f"WHERE table_name LIKE '{table}%' " + f"ORDER BY table_name" + ).fetchall() + for (name,) in phys: + if name not in candidates: + candidates.append(name) + except Exception: + pass + + for fqn in candidates: + try: + (count,) = con.execute(f"SELECT COUNT(*) FROM {fqn}").fetchone() + print(f" {fqn:50s} {count:>10,} rows") + except Exception as e: + print(f" {fqn:50s} ERROR: {e}") + + # ── Date range in fct_daily_availability ────────────────────────────── + print() + print("=" * 60) + print("DATE RANGE: fct_daily_availability") + print("=" * 60) + + try: + row = con.execute(""" + SELECT + MIN(snapshot_date) AS min_date, + MAX(snapshot_date) AS max_date, + COUNT(DISTINCT snapshot_date) AS distinct_days, + CURRENT_DATE AS today, + CURRENT_DATE - INTERVAL '30 days' AS window_start + FROM foundation.fct_daily_availability + """).fetchone() + if row: + min_date, max_date, days, today, window_start = row + print(f" Min snapshot_date: {min_date}") + print(f" Max snapshot_date: {max_date}") + print(f" Distinct days: {days}") + print(f" Today: {today}") + print(f" 30-day window start: {window_start}") + if max_date and str(max_date) < str(window_start): + print() + print(" *** ALL DATA IS OUTSIDE THE 30-DAY WINDOW ***") + print(" This is why venue_pricing_benchmarks is empty.") + except Exception as e: + print(f" ERROR: {e}") + + # ── HAVING filter impact in venue_pricing_benchmarks ────────────────── + print() + print("=" * 60) + print("HAVING FILTER IMPACT (venue_pricing_benchmarks)") + print("=" * 60) + + try: + row = con.execute(""" + WITH venue_stats AS ( + SELECT + da.tenant_id, + da.country_code, + da.city, + COUNT(DISTINCT da.snapshot_date) AS days_observed + FROM foundation.fct_daily_availability da + WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days' + AND da.occupancy_rate IS NOT NULL + AND da.occupancy_rate BETWEEN 0 AND 1.5 + GROUP BY da.tenant_id, da.country_code, da.city + ) + SELECT + COUNT(*) AS total_venues, + COUNT(*) FILTER (WHERE days_observed >= 3) AS venues_passing_having, + COUNT(*) FILTER (WHERE days_observed < 3) AS venues_failing_having, + MAX(days_observed) AS max_days, + MIN(days_observed) AS min_days + FROM venue_stats + """).fetchone() + if row: + total, passing, failing, max_d, min_d = row + print(f" Venues in 30-day window: {total}") + print(f" Venues with >= 3 days (PASSING): {passing}") + print(f" Venues with < 3 days (FILTERED): {failing}") + print(f" Max days observed: {max_d}") + print(f" Min days observed: {min_d}") + if total == 0: + print() + print(" *** NO VENUES IN 30-DAY WINDOW — check fct_daily_availability dates ***") + except Exception as e: + print(f" ERROR: {e}") + + # ── Occupancy rate distribution ─────────────────────────────────────── + print() + print("=" * 60) + print("OCCUPANCY RATE DISTRIBUTION (fct_daily_availability)") + print("=" * 60) + + try: + rows = con.execute(""" + SELECT + CASE + WHEN occupancy_rate IS NULL THEN 'NULL' + WHEN occupancy_rate < 0 THEN '< 0 (invalid)' + WHEN occupancy_rate > 1.5 THEN '> 1.5 (filtered)' + WHEN occupancy_rate <= 0.25 THEN '0 – 0.25' + WHEN occupancy_rate <= 0.50 THEN '0.25 – 0.50' + WHEN occupancy_rate <= 0.75 THEN '0.50 – 0.75' + ELSE '0.75 – 1.0+' + END AS bucket, + COUNT(*) AS cnt + FROM foundation.fct_daily_availability + GROUP BY 1 + ORDER BY 1 + """).fetchall() + for bucket, cnt in rows: + print(f" {bucket:25s} {cnt:>10,}") + except Exception as e: + print(f" ERROR: {e}") + + # ── dim_venue_capacity join coverage ────────────────────────────────── + print() + print("=" * 60) + print("JOIN COVERAGE: fct_availability_slot → dim_venue_capacity") + print("=" * 60) + + try: + row = con.execute(""" + SELECT + COUNT(DISTINCT a.tenant_id) AS slot_tenants, + COUNT(DISTINCT c.tenant_id) AS capacity_tenants, + COUNT(DISTINCT a.tenant_id) - COUNT(DISTINCT c.tenant_id) AS missing_capacity + FROM foundation.fct_availability_slot a + LEFT JOIN foundation.dim_venue_capacity c ON a.tenant_id = c.tenant_id + """).fetchone() + if row: + slot_t, cap_t, missing = row + print(f" Tenants in fct_availability_slot: {slot_t}") + print(f" Tenants with capacity match: {cap_t}") + print(f" Tenants missing capacity: {missing}") + if missing and missing > 0: + print(f" *** {missing} tenants dropped by INNER JOIN to dim_venue_capacity ***") + except Exception as e: + print(f" ERROR: {e}") + + con.close() + print() + print("Done.") + + +if __name__ == "__main__": + main() diff --git a/web/src/padelnomics/admin/pipeline_routes.py b/web/src/padelnomics/admin/pipeline_routes.py index cd687ea..f477e85 100644 --- a/web/src/padelnomics/admin/pipeline_routes.py +++ b/web/src/padelnomics/admin/pipeline_routes.py @@ -540,6 +540,7 @@ def _load_workflows() -> list[dict]: "schedule": schedule, "schedule_label": schedule_label, "depends_on": config.get("depends_on", []), + "description": config.get("description", ""), }) return workflows diff --git a/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html b/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html index ce22a1d..a9961a0 100644 --- a/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html +++ b/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html @@ -16,8 +16,9 @@ {% set wf = row.workflow %} {% set run = row.run %} {% set stale = row.stale %} -
-
+ {% set is_running = run and run.status == 'running' and not stale %} +
+
{% if not run %} {% elif stale %} @@ -33,6 +34,15 @@ {% if stale %} stale {% endif %} + {% if is_running %} + + + + + Running + + {% else %} + {% endif %}
+ {% if wf.description %} +

{{ wf.description }}

+ {% endif %}

{{ wf.schedule_label }}

- {% if run %} + {% if is_running %} +

+ Started {{ run.started_at[:16].replace('T', ' ') if run.started_at else '—' }} — running... +

+ {% elif run %}

{{ run.started_at[:16].replace('T', ' ') if run.started_at else '—' }}

{% if run.status == 'failed' and run.error_message %}