Files
padelnomics/scripts/check_pipeline.py
Deeman 6b7fa45bce feat(admin): add pipeline diagnostic script + extraction card UX improvements
- Add scripts/check_pipeline.py: read-only diagnostic for pricing pipeline
  row counts, date range analysis, HAVING filter impact, join coverage
- Add description field to all 12 workflows in workflows.toml
- Parse and display descriptions on extraction status cards
- Show spinner + "Running" state with blue-tinted card border
- Display start time with "running..." text for active extractions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 15:40:12 +01:00

201 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Diagnostic script: check row counts at every layer of the pricing pipeline.
Run on prod via SSH:
DUCKDB_PATH=/opt/padelnomics/data/lakehouse.duckdb uv run python scripts/check_pipeline.py
Or locally:
DUCKDB_PATH=data/lakehouse.duckdb uv run python scripts/check_pipeline.py
Read-only — never writes to the database.
"""
import os
import sys
import duckdb
# Lakehouse database to inspect; the DUCKDB_PATH environment variable overrides the default.
DUCKDB_PATH = os.getenv("DUCKDB_PATH", "data/lakehouse.duckdb")

# (schema, table) pairs checked by the row-count report, flattened in the order
# listed below: staging first, then foundation, then serving.
PIPELINE_TABLES = [
    (layer, model)
    for layer, models in (
        ("staging", ("stg_playtomic_availability",)),
        (
            "foundation",
            (
                "fct_availability_slot",
                "dim_venue_capacity",
                "fct_daily_availability",
            ),
        ),
        ("serving", ("venue_pricing_benchmarks", "pseo_city_pricing")),
    )
    for model in models
]
def _print_section(title):
    """Print a blank line, then *title* framed between two 60-character rules."""
    print()
    print("=" * 60)
    print(title)
    print("=" * 60)


def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier, escaping embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def _row_count_candidates(con, schema, table):
    """Return the (schema, table) pairs whose row counts are reported for one model.

    The logical ``schema.table`` always comes first. SQLMesh may also expose the
    model under other schemas or suffixed names (NOTE(review): e.g. per-environment
    copies — confirm against the project's SQLMesh layout), so every table in
    information_schema whose name starts with *table* is appended, de-duplicated.
    """
    candidates = [(schema, table)]
    try:
        # starts_with() rather than LIKE: the '_' in the model names would be a
        # single-character LIKE wildcard. The prefix is bound, not interpolated.
        rows = con.execute(
            "SELECT table_schema, table_name "
            "FROM information_schema.tables "
            "WHERE starts_with(table_name, ?) "
            "ORDER BY table_name, table_schema",
            [table],
        ).fetchall()
    except Exception as e:
        # Best effort: keep the logical name, but report why the scan failed
        # instead of silently skipping it.
        print(f" (could not scan information_schema for {table}: {e})")
        return candidates
    for found in rows:
        found = tuple(found)
        if found not in candidates:
            candidates.append(found)
    return candidates


def _check_row_counts(con):
    """Print the row count of every table/view backing each PIPELINE_TABLES entry."""
    _print_section("PIPELINE ROW COUNTS")
    for schema, table in PIPELINE_TABLES:
        for cand_schema, cand_table in _row_count_candidates(con, schema, table):
            fqn = f"{cand_schema}.{cand_table}"
            # Names may come from information_schema, so quote them as identifiers.
            query = (
                f"SELECT COUNT(*) FROM "
                f"{_quote_ident(cand_schema)}.{_quote_ident(cand_table)}"
            )
            try:
                (count,) = con.execute(query).fetchone()
            except Exception as e:
                print(f" {fqn:50s} ERROR: {e}")
            else:
                print(f" {fqn:50s} {count:>10,} rows")


def _check_date_range(con):
    """Print the snapshot_date span of fct_daily_availability vs. the 30-day window."""
    _print_section("DATE RANGE: fct_daily_availability")
    try:
        row = con.execute("""
            SELECT
                MIN(snapshot_date) AS min_date,
                MAX(snapshot_date) AS max_date,
                COUNT(DISTINCT snapshot_date) AS distinct_days,
                CURRENT_DATE AS today,
                CAST(CURRENT_DATE - INTERVAL '30 days' AS DATE) AS window_start,
                MAX(TRY_CAST(snapshot_date AS DATE))
                    >= CURRENT_DATE - INTERVAL '30 days' AS any_in_window
            FROM foundation.fct_daily_availability
        """).fetchone()
    except Exception as e:
        print(f" ERROR: {e}")
        return
    if not row:
        return
    min_date, max_date, days, today, window_start, any_in_window = row
    print(f" Min snapshot_date: {min_date}")
    print(f" Max snapshot_date: {max_date}")
    print(f" Distinct days: {days}")
    print(f" Today: {today}")
    print(f" 30-day window start: {window_start}")
    # The window test is evaluated in SQL with the same predicate as the HAVING
    # check. Comparing str(date) with str(timestamp) in Python misclassified the
    # boundary day ('2026-02-03' < '2026-02-03 00:00:00'). any_in_window is NULL
    # when no snapshot_date casts to DATE, which also means nothing is in window.
    if max_date is not None and not any_in_window:
        print()
        print(" *** ALL DATA IS OUTSIDE THE 30-DAY WINDOW ***")
        print(" This is why venue_pricing_benchmarks is empty.")


def _check_having_filter(con):
    """Print how many venues pass the >= 3 observed-days threshold in the 30-day window."""
    _print_section("HAVING FILTER IMPACT (venue_pricing_benchmarks)")
    try:
        row = con.execute("""
            WITH venue_stats AS (
                SELECT
                    da.tenant_id,
                    da.country_code,
                    da.city,
                    COUNT(DISTINCT da.snapshot_date) AS days_observed
                FROM foundation.fct_daily_availability da
                WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days'
                  AND da.occupancy_rate IS NOT NULL
                  AND da.occupancy_rate BETWEEN 0 AND 1.5
                GROUP BY da.tenant_id, da.country_code, da.city
            )
            SELECT
                COUNT(*) AS total_venues,
                COUNT(*) FILTER (WHERE days_observed >= 3) AS venues_passing_having,
                COUNT(*) FILTER (WHERE days_observed < 3) AS venues_failing_having,
                MAX(days_observed) AS max_days,
                MIN(days_observed) AS min_days
            FROM venue_stats
        """).fetchone()
    except Exception as e:
        print(f" ERROR: {e}")
        return
    if not row:
        return
    total, passing, failing, max_d, min_d = row
    print(f" Venues in 30-day window: {total}")
    print(f" Venues with >= 3 days (PASSING): {passing}")
    print(f" Venues with < 3 days (FILTERED): {failing}")
    print(f" Max days observed: {max_d}")
    print(f" Min days observed: {min_d}")
    if total == 0:
        print()
        print(" *** NO VENUES IN 30-DAY WINDOW — check fct_daily_availability dates ***")


def _check_occupancy_distribution(con):
    """Print a histogram of fct_daily_availability.occupancy_rate, buckets in numeric order."""
    _print_section("OCCUPANCY RATE DISTRIBUTION (fct_daily_availability)")
    try:
        # Buckets cover disjoint ranges, so ordering by each bucket's minimum rate
        # yields numeric order (NULL bucket first) regardless of label characters;
        # the previous ORDER BY 1 sorted the labels lexically.
        rows = con.execute("""
            SELECT
                CASE
                    WHEN occupancy_rate IS NULL THEN 'NULL'
                    WHEN occupancy_rate < 0 THEN '< 0 (invalid)'
                    WHEN occupancy_rate > 1.5 THEN '> 1.5 (filtered)'
                    WHEN occupancy_rate <= 0.25 THEN '0 0.25'
                    WHEN occupancy_rate <= 0.50 THEN '0.25 0.50'
                    WHEN occupancy_rate <= 0.75 THEN '0.50 0.75'
                    ELSE '0.75 1.0+'
                END AS bucket,
                COUNT(*) AS cnt
            FROM foundation.fct_daily_availability
            GROUP BY 1
            ORDER BY MIN(occupancy_rate) NULLS FIRST
        """).fetchall()
    except Exception as e:
        print(f" ERROR: {e}")
        return
    for bucket, cnt in rows:
        print(f" {bucket:25s} {cnt:>10,}")


def _check_join_coverage(con):
    """Print how many fct_availability_slot tenants have a dim_venue_capacity row."""
    _print_section("JOIN COVERAGE: fct_availability_slot → dim_venue_capacity")
    try:
        # c.tenant_id is NULL for unmatched slot tenants, and COUNT(DISTINCT)
        # skips NULLs, so the difference is the number of unmatched tenants.
        row = con.execute("""
            SELECT
                COUNT(DISTINCT a.tenant_id) AS slot_tenants,
                COUNT(DISTINCT c.tenant_id) AS capacity_tenants,
                COUNT(DISTINCT a.tenant_id) - COUNT(DISTINCT c.tenant_id) AS missing_capacity
            FROM foundation.fct_availability_slot a
            LEFT JOIN foundation.dim_venue_capacity c ON a.tenant_id = c.tenant_id
        """).fetchone()
    except Exception as e:
        print(f" ERROR: {e}")
        return
    if not row:
        return
    slot_t, cap_t, missing = row
    print(f" Tenants in fct_availability_slot: {slot_t}")
    print(f" Tenants with capacity match: {cap_t}")
    print(f" Tenants missing capacity: {missing}")
    if missing:
        print(f" *** {missing} tenants dropped by INNER JOIN to dim_venue_capacity ***")


def main():
    """Print read-only diagnostics for the pricing pipeline in the DuckDB at DUCKDB_PATH.

    Reports row counts per layer, the fct_daily_availability date range against the
    30-day window, the HAVING-threshold impact, the occupancy-rate distribution, and
    capacity join coverage. Each section reports its own query errors and the run
    continues. Exits with status 1 (message on stderr) if DUCKDB_PATH does not exist.
    """
    if not os.path.exists(DUCKDB_PATH):
        print(f"ERROR: {DUCKDB_PATH} not found", file=sys.stderr)
        sys.exit(1)
    con = duckdb.connect(DUCKDB_PATH, read_only=True)
    try:
        print(f"Database: {DUCKDB_PATH}")
        print(f"DuckDB version: {con.execute('SELECT version()').fetchone()[0]}")
        _check_row_counts(con)
        _check_date_range(con)
        _check_having_filter(con)
        _check_occupancy_distribution(con)
        _check_join_coverage(con)
    finally:
        # Close even if an unguarded statement (e.g. the version query) raises.
        con.close()
    print()
    print("Done.")
# Run the diagnostics only when executed as a script, not when imported.
if __name__ == "__main__":
    main()