merge: pipeline diagnostic script + extraction card UX improvements
All checks were successful
CI / test (push) Successful in 54s
CI / tag (push) Successful in 3s

This commit is contained in:
Deeman
2026-03-05 15:40:16 +01:00
5 changed files with 239 additions and 3 deletions

View File

@@ -7,6 +7,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]
### Added
- **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — read-only script that reports row counts at every layer of the pricing pipeline (staging → foundation → serving), date range analysis, HAVING filter impact, and join coverage. Run on prod to diagnose empty serving tables.
- **Extraction card descriptions** — each workflow card on the admin pipeline page now shows a one-line description explaining what the data source is (e.g. "EU geographic boundaries (NUTS2 polygons) from Eurostat GISCO"). Descriptions defined in `workflows.toml`.
- **Running state indicator** — extraction cards show a spinner + "Running" label with a blue-tinted border when an extraction is actively running, replacing the plain Run button. Cards also display the start time with "running..." text.
- **Interactive Leaflet maps** — geographic visualization across 4 key placements using self-hosted Leaflet 1.9.4 (GDPR-safe, no CDN):
- **Markets hub** (`/markets`): country bubble map with circles sized by total venues, colored by avg market score (green ≥ 60, amber 30-60, red < 30). Click navigates to country overview.
- **Country overview articles**: city bubble map loads after article render, auto-fits bounds, click navigates to city page. Bubbles colored by market score.

View File

@@ -8,54 +8,67 @@
# entry — optional: function name if not "main" (default: "main")
# depends_on — optional: list of workflow names that must run first
# proxy_mode — optional: "round-robin" (default) or "sticky"
# description — optional: human-readable one-liner shown in the admin UI
[overpass]
module = "padelnomics_extract.overpass"
schedule = "monthly"
description = "Padel court locations from OpenStreetMap via Overpass API"
[overpass_tennis]
module = "padelnomics_extract.overpass_tennis"
schedule = "monthly"
description = "Tennis court locations from OpenStreetMap via Overpass API"
[eurostat]
module = "padelnomics_extract.eurostat"
schedule = "monthly"
description = "City population data from Eurostat Urban Audit"
[geonames]
module = "padelnomics_extract.geonames"
schedule = "monthly"
description = "Global city/town gazetteer from GeoNames (pop >= 1K)"
[playtomic_tenants]
module = "padelnomics_extract.playtomic_tenants"
schedule = "daily"
description = "Padel venue directory from Playtomic (names, locations, courts)"
[playtomic_availability]
module = "padelnomics_extract.playtomic_availability"
schedule = "daily"
depends_on = ["playtomic_tenants"]
description = "Morning availability snapshots — slot-level pricing per venue"
[playtomic_recheck]
module = "padelnomics_extract.playtomic_availability"
entry = "main_recheck"
schedule = "0,30 6-23 * * *"
depends_on = ["playtomic_availability"]
description = "Intraday availability rechecks for occupancy tracking"
[census_usa]
module = "padelnomics_extract.census_usa"
schedule = "monthly"
description = "US city/place population from Census Bureau ACS"
[census_usa_income]
module = "padelnomics_extract.census_usa_income"
schedule = "monthly"
description = "US county median household income from Census Bureau ACS"
[eurostat_city_labels]
module = "padelnomics_extract.eurostat_city_labels"
schedule = "monthly"
description = "City code-to-name mapping for Eurostat Urban Audit cities"
[ons_uk]
module = "padelnomics_extract.ons_uk"
schedule = "monthly"
description = "UK local authority population estimates from ONS"
[gisco]
module = "padelnomics_extract.gisco"
schedule = "monthly"
description = "EU geographic boundaries (NUTS2 polygons) from Eurostat GISCO"

200
scripts/check_pipeline.py Normal file
View File

@@ -0,0 +1,200 @@
"""
Diagnostic script: check row counts at every layer of the pricing pipeline.
Run on prod via SSH:
DUCKDB_PATH=/opt/padelnomics/data/lakehouse.duckdb uv run python scripts/check_pipeline.py
Or locally:
DUCKDB_PATH=data/lakehouse.duckdb uv run python scripts/check_pipeline.py
Read-only — never writes to the database.
"""
import os
import sys
import duckdb
DUCKDB_PATH = os.environ.get("DUCKDB_PATH", "data/lakehouse.duckdb")
PIPELINE_TABLES = [
("staging", "stg_playtomic_availability"),
("foundation", "fct_availability_slot"),
("foundation", "dim_venue_capacity"),
("foundation", "fct_daily_availability"),
("serving", "venue_pricing_benchmarks"),
("serving", "pseo_city_pricing"),
]
def main():
if not os.path.exists(DUCKDB_PATH):
print(f"ERROR: {DUCKDB_PATH} not found")
sys.exit(1)
con = duckdb.connect(DUCKDB_PATH, read_only=True)
print(f"Database: {DUCKDB_PATH}")
print(f"DuckDB version: {con.execute('SELECT version()').fetchone()[0]}")
print()
# ── Row counts at each layer ──────────────────────────────────────────
print("=" * 60)
print("PIPELINE ROW COUNTS")
print("=" * 60)
for schema, table in PIPELINE_TABLES:
# SQLMesh may use __<env> suffixed physical tables
# Try the logical name first, then scan for physical tables
candidates = [f"{schema}.{table}"]
try:
phys = con.execute(
f"SELECT table_schema || '.' || table_name "
f"FROM information_schema.tables "
f"WHERE table_name LIKE '{table}%' "
f"ORDER BY table_name"
).fetchall()
for (name,) in phys:
if name not in candidates:
candidates.append(name)
except Exception:
pass
for fqn in candidates:
try:
(count,) = con.execute(f"SELECT COUNT(*) FROM {fqn}").fetchone()
print(f" {fqn:50s} {count:>10,} rows")
except Exception as e:
print(f" {fqn:50s} ERROR: {e}")
# ── Date range in fct_daily_availability ──────────────────────────────
print()
print("=" * 60)
print("DATE RANGE: fct_daily_availability")
print("=" * 60)
try:
row = con.execute("""
SELECT
MIN(snapshot_date) AS min_date,
MAX(snapshot_date) AS max_date,
COUNT(DISTINCT snapshot_date) AS distinct_days,
CURRENT_DATE AS today,
CURRENT_DATE - INTERVAL '30 days' AS window_start
FROM foundation.fct_daily_availability
""").fetchone()
if row:
min_date, max_date, days, today, window_start = row
print(f" Min snapshot_date: {min_date}")
print(f" Max snapshot_date: {max_date}")
print(f" Distinct days: {days}")
print(f" Today: {today}")
print(f" 30-day window start: {window_start}")
if max_date and str(max_date) < str(window_start):
print()
print(" *** ALL DATA IS OUTSIDE THE 30-DAY WINDOW ***")
print(" This is why venue_pricing_benchmarks is empty.")
except Exception as e:
print(f" ERROR: {e}")
# ── HAVING filter impact in venue_pricing_benchmarks ──────────────────
print()
print("=" * 60)
print("HAVING FILTER IMPACT (venue_pricing_benchmarks)")
print("=" * 60)
try:
row = con.execute("""
WITH venue_stats AS (
SELECT
da.tenant_id,
da.country_code,
da.city,
COUNT(DISTINCT da.snapshot_date) AS days_observed
FROM foundation.fct_daily_availability da
WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days'
AND da.occupancy_rate IS NOT NULL
AND da.occupancy_rate BETWEEN 0 AND 1.5
GROUP BY da.tenant_id, da.country_code, da.city
)
SELECT
COUNT(*) AS total_venues,
COUNT(*) FILTER (WHERE days_observed >= 3) AS venues_passing_having,
COUNT(*) FILTER (WHERE days_observed < 3) AS venues_failing_having,
MAX(days_observed) AS max_days,
MIN(days_observed) AS min_days
FROM venue_stats
""").fetchone()
if row:
total, passing, failing, max_d, min_d = row
print(f" Venues in 30-day window: {total}")
print(f" Venues with >= 3 days (PASSING): {passing}")
print(f" Venues with < 3 days (FILTERED): {failing}")
print(f" Max days observed: {max_d}")
print(f" Min days observed: {min_d}")
if total == 0:
print()
print(" *** NO VENUES IN 30-DAY WINDOW — check fct_daily_availability dates ***")
except Exception as e:
print(f" ERROR: {e}")
# ── Occupancy rate distribution ───────────────────────────────────────
print()
print("=" * 60)
print("OCCUPANCY RATE DISTRIBUTION (fct_daily_availability)")
print("=" * 60)
try:
rows = con.execute("""
SELECT
CASE
WHEN occupancy_rate IS NULL THEN 'NULL'
WHEN occupancy_rate < 0 THEN '< 0 (invalid)'
WHEN occupancy_rate > 1.5 THEN '> 1.5 (filtered)'
WHEN occupancy_rate <= 0.25 THEN '0 0.25'
WHEN occupancy_rate <= 0.50 THEN '0.25 0.50'
WHEN occupancy_rate <= 0.75 THEN '0.50 0.75'
ELSE '0.75 1.0+'
END AS bucket,
COUNT(*) AS cnt
FROM foundation.fct_daily_availability
GROUP BY 1
ORDER BY 1
""").fetchall()
for bucket, cnt in rows:
print(f" {bucket:25s} {cnt:>10,}")
except Exception as e:
print(f" ERROR: {e}")
# ── dim_venue_capacity join coverage ──────────────────────────────────
print()
print("=" * 60)
print("JOIN COVERAGE: fct_availability_slot → dim_venue_capacity")
print("=" * 60)
try:
row = con.execute("""
SELECT
COUNT(DISTINCT a.tenant_id) AS slot_tenants,
COUNT(DISTINCT c.tenant_id) AS capacity_tenants,
COUNT(DISTINCT a.tenant_id) - COUNT(DISTINCT c.tenant_id) AS missing_capacity
FROM foundation.fct_availability_slot a
LEFT JOIN foundation.dim_venue_capacity c ON a.tenant_id = c.tenant_id
""").fetchone()
if row:
slot_t, cap_t, missing = row
print(f" Tenants in fct_availability_slot: {slot_t}")
print(f" Tenants with capacity match: {cap_t}")
print(f" Tenants missing capacity: {missing}")
if missing and missing > 0:
print(f" *** {missing} tenants dropped by INNER JOIN to dim_venue_capacity ***")
except Exception as e:
print(f" ERROR: {e}")
con.close()
print()
print("Done.")
if __name__ == "__main__":
main()

View File

@@ -540,6 +540,7 @@ def _load_workflows() -> list[dict]:
"schedule": schedule,
"schedule_label": schedule_label,
"depends_on": config.get("depends_on", []),
"description": config.get("description", ""),
})
return workflows

View File

@@ -16,8 +16,9 @@
{% set wf = row.workflow %}
{% set run = row.run %}
{% set stale = row.stale %}
<div style="border:1px solid #E2E8F0;border-radius:10px;padding:0.875rem;background:#FAFAFA">
<div class="flex items-center gap-2 mb-2">
{% set is_running = run and run.status == 'running' and not stale %}
<div style="border:1px solid {% if is_running %}#93C5FD{% else %}#E2E8F0{% endif %};border-radius:10px;padding:0.875rem;background:{% if is_running %}#EFF6FF{% else %}#FAFAFA{% endif %}">
<div class="flex items-center gap-2 mb-1">
{% if not run %}
<span class="status-dot pending"></span>
{% elif stale %}
@@ -33,6 +34,15 @@
{% if stale %}
<span class="badge-warning" style="font-size:10px;padding:1px 6px;margin-left:auto">stale</span>
{% endif %}
{% if is_running %}
<span class="btn btn-sm ml-auto"
style="padding:2px 8px;font-size:11px;opacity:0.6;cursor:default;pointer-events:none">
<svg class="spinner-icon" width="12" height="12" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="3">
<path d="M12 2a10 10 0 0 1 10 10" stroke-linecap="round"/>
</svg>
Running
</span>
{% else %}
<button type="button"
class="btn btn-sm ml-auto"
style="padding:2px 8px;font-size:11px"
@@ -41,9 +51,17 @@
hx-swap="outerHTML"
hx-vals='{"extractor": "{{ wf.name }}", "csrf_token": "{{ csrf_token() }}"}'
hx-confirm="Run {{ wf.name }} extractor?">Run</button>
{% endif %}
</div>
{% if wf.description %}
<p class="text-xs text-slate" style="margin-top:2px;margin-bottom:4px">{{ wf.description }}</p>
{% endif %}
<p class="text-xs text-slate">{{ wf.schedule_label }}</p>
{% if run %}
{% if is_running %}
<p class="text-xs mt-1" style="color:#2563EB">
Started {{ run.started_at[:16].replace('T', ' ') if run.started_at else '—' }} — running...
</p>
{% elif run %}
<p class="text-xs mono text-slate-dark mt-1">{{ run.started_at[:16].replace('T', ' ') if run.started_at else '—' }}</p>
{% if run.status == 'failed' and run.error_message %}
<p class="text-xs text-danger mt-1" style="font-family:monospace;word-break:break-all">