""" Diagnostic script: check row counts at every layer of the pricing pipeline. Run on prod via SSH: DUCKDB_PATH=/opt/padelnomics/data/lakehouse.duckdb uv run python scripts/check_pipeline.py Or locally: DUCKDB_PATH=data/lakehouse.duckdb uv run python scripts/check_pipeline.py Read-only — never writes to the database. Handles the DuckDB catalog naming quirk: when the file is named lakehouse.duckdb, the catalog is "lakehouse" not "local". SQLMesh views may reference the wrong catalog, so we fall back to querying physical tables (sqlmesh__.__). """ import os import sys import duckdb DUCKDB_PATH = os.environ.get("DUCKDB_PATH", "data/lakehouse.duckdb") PIPELINE_TABLES = [ ("staging", "stg_playtomic_availability"), ("foundation", "fct_availability_slot"), ("foundation", "dim_venue_capacity"), ("foundation", "fct_daily_availability"), ("serving", "venue_pricing_benchmarks"), ("serving", "pseo_city_pricing"), ] def _use_catalog(con): """Detect and USE the database catalog so schema-qualified queries work.""" catalogs = [ row[0] for row in con.execute( "SELECT catalog_name FROM information_schema.schemata" ).fetchall() ] # Pick the non-system catalog (not 'system', 'temp', 'memory') user_catalogs = [c for c in set(catalogs) if c not in ("system", "temp", "memory")] if user_catalogs: catalog = user_catalogs[0] con.execute(f"USE {catalog}") return catalog return None def _find_physical_table(con, schema, table): """Find the SQLMesh physical table name for a logical table. SQLMesh stores physical tables as: sqlmesh__.__
__ """ sqlmesh_schema = f"sqlmesh__{schema}" try: rows = con.execute( "SELECT table_schema, table_name " "FROM information_schema.tables " f"WHERE table_schema = '{sqlmesh_schema}' " f"AND table_name LIKE '{schema}__{table}%' " "ORDER BY table_name " "LIMIT 1" ).fetchall() if rows: return f"{rows[0][0]}.{rows[0][1]}" except Exception: pass return None def _query_table(con, schema, table): """Try logical view first, fall back to physical table. Returns (fqn, count) or (fqn, error_str).""" logical = f"{schema}.{table}" try: (count,) = con.execute(f"SELECT COUNT(*) FROM {logical}").fetchone() return logical, count except Exception: pass physical = _find_physical_table(con, schema, table) if physical: try: (count,) = con.execute(f"SELECT COUNT(*) FROM {physical}").fetchone() return f"{physical} (physical)", count except Exception as e: return f"{physical} (physical)", f"ERROR: {e}" return logical, "ERROR: view broken, no physical table found" def _query_sql(con, sql, schema_tables): """Execute SQL, falling back to rewritten SQL using physical table names if views fail. schema_tables: list of (schema, table) tuples used in the SQL, in order of appearance. The SQL must use {schema}.{table} format for these references. """ try: return con.execute(sql) except Exception: # Rewrite SQL to use physical table names rewritten = sql for schema, table in schema_tables: physical = _find_physical_table(con, schema, table) if physical: rewritten = rewritten.replace(f"{schema}.{table}", physical) else: raise return con.execute(rewritten) def main(): if not os.path.exists(DUCKDB_PATH): print(f"ERROR: {DUCKDB_PATH} not found") sys.exit(1) con = duckdb.connect(DUCKDB_PATH, read_only=True) print(f"Database: {DUCKDB_PATH}") print(f"DuckDB version: {con.execute('SELECT version()').fetchone()[0]}") catalog = _use_catalog(con) if catalog: print(f"Catalog: {catalog}") print() # ── Row counts at each layer ────────────────────────────────────────── print("=" * 60) print("PIPELINE ROW COUNTS") print("=" * 60) for schema, table in PIPELINE_TABLES: fqn, result = _query_table(con, schema, table) if isinstance(result, int): print(f" {fqn:55s} {result:>10,} rows") else: print(f" {fqn:55s} {result}") # ── Date range in fct_daily_availability ────────────────────────────── print() print("=" * 60) print("DATE RANGE: fct_daily_availability") print("=" * 60) try: row = _query_sql( con, """ SELECT MIN(snapshot_date) AS min_date, MAX(snapshot_date) AS max_date, COUNT(DISTINCT snapshot_date) AS distinct_days, CURRENT_DATE AS today, CURRENT_DATE - INTERVAL '30 days' AS window_start FROM foundation.fct_daily_availability """, [("foundation", "fct_daily_availability")], ).fetchone() if row: min_date, max_date, days, today, window_start = row print(f" Min snapshot_date: {min_date}") print(f" Max snapshot_date: {max_date}") print(f" Distinct days: {days}") print(f" Today: {today}") print(f" 30-day window start: {window_start}") if max_date and str(max_date) < str(window_start): print() print(" *** ALL DATA IS OUTSIDE THE 30-DAY WINDOW ***") print(" This is why venue_pricing_benchmarks is empty.") except Exception as e: print(f" ERROR: {e}") # ── HAVING filter impact in venue_pricing_benchmarks ────────────────── print() print("=" * 60) print("HAVING FILTER IMPACT (venue_pricing_benchmarks)") print("=" * 60) try: row = _query_sql( con, """ WITH venue_stats AS ( SELECT da.tenant_id, da.country_code, da.city, COUNT(DISTINCT da.snapshot_date) AS days_observed FROM foundation.fct_daily_availability da WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days' AND da.occupancy_rate IS NOT NULL AND da.occupancy_rate BETWEEN 0 AND 1.5 GROUP BY da.tenant_id, da.country_code, da.city ) SELECT COUNT(*) AS total_venues, COUNT(*) FILTER (WHERE days_observed >= 3) AS venues_passing_having, COUNT(*) FILTER (WHERE days_observed < 3) AS venues_failing_having, MAX(days_observed) AS max_days, MIN(days_observed) AS min_days FROM venue_stats """, [("foundation", "fct_daily_availability")], ).fetchone() if row: total, passing, failing, max_d, min_d = row print(f" Venues in 30-day window: {total}") print(f" Venues with >= 3 days (PASSING): {passing}") print(f" Venues with < 3 days (FILTERED): {failing}") print(f" Max days observed: {max_d}") print(f" Min days observed: {min_d}") if total == 0: print() print(" *** NO VENUES IN 30-DAY WINDOW — check fct_daily_availability dates ***") except Exception as e: print(f" ERROR: {e}") # ── Occupancy rate distribution ─────────────────────────────────────── print() print("=" * 60) print("OCCUPANCY RATE DISTRIBUTION (fct_daily_availability)") print("=" * 60) try: rows = _query_sql( con, """ SELECT CASE WHEN occupancy_rate IS NULL THEN 'NULL' WHEN occupancy_rate < 0 THEN '< 0 (invalid)' WHEN occupancy_rate > 1.5 THEN '> 1.5 (filtered)' WHEN occupancy_rate <= 0.25 THEN '0 – 0.25' WHEN occupancy_rate <= 0.50 THEN '0.25 – 0.50' WHEN occupancy_rate <= 0.75 THEN '0.50 – 0.75' ELSE '0.75 – 1.0+' END AS bucket, COUNT(*) AS cnt FROM foundation.fct_daily_availability GROUP BY 1 ORDER BY 1 """, [("foundation", "fct_daily_availability")], ).fetchall() for bucket, cnt in rows: print(f" {bucket:25s} {cnt:>10,}") except Exception as e: print(f" ERROR: {e}") # ── dim_venue_capacity join coverage ────────────────────────────────── print() print("=" * 60) print("JOIN COVERAGE: fct_availability_slot → dim_venue_capacity") print("=" * 60) try: row = _query_sql( con, """ SELECT COUNT(DISTINCT a.tenant_id) AS slot_tenants, COUNT(DISTINCT c.tenant_id) AS capacity_tenants, COUNT(DISTINCT a.tenant_id) - COUNT(DISTINCT c.tenant_id) AS missing_capacity FROM foundation.fct_availability_slot a LEFT JOIN foundation.dim_venue_capacity c ON a.tenant_id = c.tenant_id """, [ ("foundation", "fct_availability_slot"), ("foundation", "dim_venue_capacity"), ], ).fetchone() if row: slot_t, cap_t, missing = row print(f" Tenants in fct_availability_slot: {slot_t}") print(f" Tenants with capacity match: {cap_t}") print(f" Tenants missing capacity: {missing}") if missing and missing > 0: print(f" *** {missing} tenants dropped by INNER JOIN to dim_venue_capacity ***") except Exception as e: print(f" ERROR: {e}") con.close() print() print("Done.") if __name__ == "__main__": main()