"""
Diagnostic script: check row counts at every layer of the pricing pipeline.

Run on prod via SSH:
    DUCKDB_PATH=/opt/padelnomics/data/lakehouse.duckdb uv run python scripts/check_pipeline.py

Or locally:
    DUCKDB_PATH=data/lakehouse.duckdb uv run python scripts/check_pipeline.py

Read-only — never writes to the database.
"""

import os
import sys

import duckdb

# Path of the DuckDB file to inspect; overridable through the environment.
DUCKDB_PATH = os.environ.get("DUCKDB_PATH", "data/lakehouse.duckdb")

# (schema, table) pairs whose row counts are reported, in this order.
PIPELINE_TABLES = [
    ("staging", "stg_playtomic_availability"),
    ("foundation", "fct_availability_slot"),
    ("foundation", "dim_venue_capacity"),
    ("foundation", "fct_daily_availability"),
    ("serving", "venue_pricing_benchmarks"),
    ("serving", "pseo_city_pricing"),
]


def _print_banner(title):
    """Print a blank line followed by a three-line section banner for *title*."""
    print()
    print("=" * 60)
    print(title)
    print("=" * 60)


def _is_before_window(max_date, window_start):
    """Return True when *max_date* is on an earlier calendar day than *window_start*.

    Only the leading ``YYYY-MM-DD`` part of each value's string form is
    compared. DuckDB documents ``DATE - INTERVAL`` as producing a TIMESTAMP,
    so ``window_start`` renders with a trailing `` 00:00:00`` while a DATE
    ``max_date`` does not; comparing the full strings would report a snapshot
    taken exactly on the window's first day as being outside the window.

    NOTE(review): assumes both values render with an ISO ``YYYY-MM-DD``
    prefix — confirm if snapshot_date is stored as free-form text.
    """
    return str(max_date)[:10] < str(window_start)[:10]


def _report_row_counts(con):
    """Print the row count of every pipeline table and of any look-alike tables."""
    # ── Row counts at each layer ──────────────────────────────────────────
    _print_banner("PIPELINE ROW COUNTS")

    for schema, table in PIPELINE_TABLES:
        # SQLMesh may use __ suffixed physical tables
        # Try the logical name first, then scan for physical tables
        candidates = [f"{schema}.{table}"]
        try:
            phys = con.execute(
                "SELECT table_schema || '.' || table_name "
                "FROM information_schema.tables "
                "WHERE table_name LIKE ? "
                "ORDER BY table_name",
                [f"{table}%"],
            ).fetchall()
        except Exception as e:
            # Best effort: the logical name is still counted below, but the
            # failed scan is reported instead of being silently dropped.
            print(f" (could not scan information_schema for {table}: {e})")
        else:
            for (name,) in phys:
                if name not in candidates:
                    candidates.append(name)

        for fqn in candidates:
            try:
                (count,) = con.execute(f"SELECT COUNT(*) FROM {fqn}").fetchone()
                print(f" {fqn:50s} {count:>10,} rows")
            except Exception as e:
                print(f" {fqn:50s} ERROR: {e}")


def _report_date_range(con):
    """Print the snapshot_date span of fct_daily_availability against the 30-day window."""
    # ── Date range in fct_daily_availability ──────────────────────────────
    _print_banner("DATE RANGE: fct_daily_availability")

    try:
        row = con.execute("""
            SELECT
                MIN(snapshot_date) AS min_date,
                MAX(snapshot_date) AS max_date,
                COUNT(DISTINCT snapshot_date) AS distinct_days,
                CURRENT_DATE AS today,
                CURRENT_DATE - INTERVAL '30 days' AS window_start
            FROM foundation.fct_daily_availability
        """).fetchone()
        if row:
            min_date, max_date, days, today, window_start = row
            print(f" Min snapshot_date: {min_date}")
            print(f" Max snapshot_date: {max_date}")
            print(f" Distinct days: {days}")
            print(f" Today: {today}")
            print(f" 30-day window start: {window_start}")
            # max_date is NULL for an empty table; nothing to warn about then.
            if max_date and _is_before_window(max_date, window_start):
                print()
                print(" *** ALL DATA IS OUTSIDE THE 30-DAY WINDOW ***")
                print(" This is why venue_pricing_benchmarks is empty.")
    except Exception as e:
        print(f" ERROR: {e}")


def _report_having_filter(con):
    """Print how many venues in the 30-day window have at least 3 observed days."""
    # ── HAVING filter impact in venue_pricing_benchmarks ──────────────────
    _print_banner("HAVING FILTER IMPACT (venue_pricing_benchmarks)")

    try:
        row = con.execute("""
            WITH venue_stats AS (
                SELECT
                    da.tenant_id,
                    da.country_code,
                    da.city,
                    COUNT(DISTINCT da.snapshot_date) AS days_observed
                FROM foundation.fct_daily_availability da
                WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days'
                  AND da.occupancy_rate IS NOT NULL
                  AND da.occupancy_rate BETWEEN 0 AND 1.5
                GROUP BY da.tenant_id, da.country_code, da.city
            )
            SELECT
                COUNT(*) AS total_venues,
                COUNT(*) FILTER (WHERE days_observed >= 3) AS venues_passing_having,
                COUNT(*) FILTER (WHERE days_observed < 3) AS venues_failing_having,
                MAX(days_observed) AS max_days,
                MIN(days_observed) AS min_days
            FROM venue_stats
        """).fetchone()
        if row:
            total, passing, failing, max_d, min_d = row
            print(f" Venues in 30-day window: {total}")
            print(f" Venues with >= 3 days (PASSING): {passing}")
            print(f" Venues with < 3 days (FILTERED): {failing}")
            print(f" Max days observed: {max_d}")
            print(f" Min days observed: {min_d}")
            if total == 0:
                print()
                print(" *** NO VENUES IN 30-DAY WINDOW — check fct_daily_availability dates ***")
    except Exception as e:
        print(f" ERROR: {e}")


def _report_occupancy_distribution(con):
    """Print a bucketed histogram of occupancy_rate in fct_daily_availability."""
    # ── Occupancy rate distribution ───────────────────────────────────────
    _print_banner("OCCUPANCY RATE DISTRIBUTION (fct_daily_availability)")

    try:
        rows = con.execute("""
            SELECT
                CASE
                    WHEN occupancy_rate IS NULL THEN 'NULL'
                    WHEN occupancy_rate < 0 THEN '< 0 (invalid)'
                    WHEN occupancy_rate > 1.5 THEN '> 1.5 (filtered)'
                    WHEN occupancy_rate <= 0.25 THEN '0 – 0.25'
                    WHEN occupancy_rate <= 0.50 THEN '0.25 – 0.50'
                    WHEN occupancy_rate <= 0.75 THEN '0.50 – 0.75'
                    ELSE '0.75 – 1.0+'
                END AS bucket,
                COUNT(*) AS cnt
            FROM foundation.fct_daily_availability
            GROUP BY 1
            ORDER BY 1
        """).fetchall()
        for bucket, cnt in rows:
            print(f" {bucket:25s} {cnt:>10,}")
    except Exception as e:
        print(f" ERROR: {e}")


def _report_join_coverage(con):
    """Print how many fct_availability_slot tenants have a dim_venue_capacity row."""
    # ── dim_venue_capacity join coverage ──────────────────────────────────
    _print_banner("JOIN COVERAGE: fct_availability_slot → dim_venue_capacity")

    try:
        row = con.execute("""
            SELECT
                COUNT(DISTINCT a.tenant_id) AS slot_tenants,
                COUNT(DISTINCT c.tenant_id) AS capacity_tenants,
                COUNT(DISTINCT a.tenant_id) - COUNT(DISTINCT c.tenant_id) AS missing_capacity
            FROM foundation.fct_availability_slot a
            LEFT JOIN foundation.dim_venue_capacity c ON a.tenant_id = c.tenant_id
        """).fetchone()
        if row:
            slot_t, cap_t, missing = row
            print(f" Tenants in fct_availability_slot: {slot_t}")
            print(f" Tenants with capacity match: {cap_t}")
            print(f" Tenants missing capacity: {missing}")
            if missing and missing > 0:
                print(f" *** {missing} tenants dropped by INNER JOIN to dim_venue_capacity ***")
    except Exception as e:
        print(f" ERROR: {e}")


def main():
    """Open the database read-only and print every diagnostic section.

    Exits with status 1 when the file at DUCKDB_PATH does not exist. Each
    section catches and prints its own query errors, so one missing table
    does not stop the remaining sections from running.
    """
    if not os.path.exists(DUCKDB_PATH):
        print(f"ERROR: {DUCKDB_PATH} not found")
        sys.exit(1)

    con = duckdb.connect(DUCKDB_PATH, read_only=True)
    try:
        print(f"Database: {DUCKDB_PATH}")
        print(f"DuckDB version: {con.execute('SELECT version()').fetchone()[0]}")

        _report_row_counts(con)
        _report_date_range(con)
        _report_having_filter(con)
        _report_occupancy_distribution(con)
        _report_join_coverage(con)
    finally:
        # Release the connection even if something outside the per-section
        # error handling (e.g. the version query) raises.
        con.close()

    print()
    print("Done.")


if __name__ == "__main__":
    main()