Files
padelnomics/scripts/check_pipeline.py
Deeman c3f15535b8 fix(pipeline): handle DuckDB catalog naming in diagnostic script
The lakehouse.duckdb file uses catalog "lakehouse" not "local", causing
SQLMesh logical views to break. Script now auto-detects the catalog via
USE and falls back to physical tables when views fail.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 17:06:44 +01:00

291 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Diagnostic script: check row counts at every layer of the pricing pipeline.
Run on prod via SSH:
DUCKDB_PATH=/opt/padelnomics/data/lakehouse.duckdb uv run python scripts/check_pipeline.py
Or locally:
DUCKDB_PATH=data/lakehouse.duckdb uv run python scripts/check_pipeline.py
Read-only — never writes to the database.
Handles the DuckDB catalog naming quirk: when the file is named lakehouse.duckdb,
the catalog is "lakehouse" not "local". SQLMesh views may reference the wrong catalog,
so we fall back to querying physical tables (sqlmesh__<schema>.<table>__<hash>).
"""
import os
import sys
import duckdb
# Path to the DuckDB file; override via the DUCKDB_PATH environment variable.
DUCKDB_PATH = os.environ.get("DUCKDB_PATH", "data/lakehouse.duckdb")

# (schema, table) pairs to check, listed in pipeline order:
# staging -> foundation -> serving.
PIPELINE_TABLES = [
    ("staging", "stg_playtomic_availability"),
    ("foundation", "fct_availability_slot"),
    ("foundation", "dim_venue_capacity"),
    ("foundation", "fct_daily_availability"),
    ("serving", "venue_pricing_benchmarks"),
    ("serving", "pseo_city_pricing"),
]
def _use_catalog(con):
"""Detect and USE the database catalog so schema-qualified queries work."""
catalogs = [
row[0]
for row in con.execute(
"SELECT catalog_name FROM information_schema.schemata"
).fetchall()
]
# Pick the non-system catalog (not 'system', 'temp', 'memory')
user_catalogs = [c for c in set(catalogs) if c not in ("system", "temp", "memory")]
if user_catalogs:
catalog = user_catalogs[0]
con.execute(f"USE {catalog}")
return catalog
return None
def _find_physical_table(con, schema, table):
"""Find the SQLMesh physical table name for a logical table.
SQLMesh stores physical tables as:
sqlmesh__<schema>.<schema>__<table>__<hash>
"""
sqlmesh_schema = f"sqlmesh__{schema}"
try:
rows = con.execute(
"SELECT table_schema, table_name "
"FROM information_schema.tables "
f"WHERE table_schema = '{sqlmesh_schema}' "
f"AND table_name LIKE '{schema}__{table}%' "
"ORDER BY table_name "
"LIMIT 1"
).fetchall()
if rows:
return f"{rows[0][0]}.{rows[0][1]}"
except Exception:
pass
return None
def _query_table(con, schema, table):
"""Try logical view first, fall back to physical table. Returns (fqn, count) or (fqn, error_str)."""
logical = f"{schema}.{table}"
try:
(count,) = con.execute(f"SELECT COUNT(*) FROM {logical}").fetchone()
return logical, count
except Exception:
pass
physical = _find_physical_table(con, schema, table)
if physical:
try:
(count,) = con.execute(f"SELECT COUNT(*) FROM {physical}").fetchone()
return f"{physical} (physical)", count
except Exception as e:
return f"{physical} (physical)", f"ERROR: {e}"
return logical, "ERROR: view broken, no physical table found"
def _query_sql(con, sql, schema_tables):
"""Execute SQL, falling back to rewritten SQL using physical table names if views fail.
schema_tables: list of (schema, table) tuples used in the SQL, in order of appearance.
The SQL must use {schema}.{table} format for these references.
"""
try:
return con.execute(sql)
except Exception:
# Rewrite SQL to use physical table names
rewritten = sql
for schema, table in schema_tables:
physical = _find_physical_table(con, schema, table)
if physical:
rewritten = rewritten.replace(f"{schema}.{table}", physical)
else:
raise
return con.execute(rewritten)
def main():
    """Print row counts and diagnostics for every layer of the pricing pipeline.

    Read-only: opens the DuckDB file with read_only=True and only runs SELECTs.
    Exits with status 1 when the database file is missing.
    """
    if not os.path.exists(DUCKDB_PATH):
        print(f"ERROR: {DUCKDB_PATH} not found")
        sys.exit(1)
    con = duckdb.connect(DUCKDB_PATH, read_only=True)
    print(f"Database: {DUCKDB_PATH}")
    print(f"DuckDB version: {con.execute('SELECT version()').fetchone()[0]}")
    # Switch to the file's catalog so schema-qualified names resolve,
    # regardless of what the database file is called.
    catalog = _use_catalog(con)
    if catalog:
        print(f"Catalog: {catalog}")
    print()
    # ── Row counts at each layer ──────────────────────────────────────────
    print("=" * 60)
    print("PIPELINE ROW COUNTS")
    print("=" * 60)
    for schema, table in PIPELINE_TABLES:
        fqn, result = _query_table(con, schema, table)
        # result is an int on success, or an "ERROR: ..." string on failure.
        if isinstance(result, int):
            print(f" {fqn:55s} {result:>10,} rows")
        else:
            print(f" {fqn:55s} {result}")
    # ── Date range in fct_daily_availability ──────────────────────────────
    print()
    print("=" * 60)
    print("DATE RANGE: fct_daily_availability")
    print("=" * 60)
    try:
        row = _query_sql(
            con,
            """
            SELECT
            MIN(snapshot_date) AS min_date,
            MAX(snapshot_date) AS max_date,
            COUNT(DISTINCT snapshot_date) AS distinct_days,
            CURRENT_DATE AS today,
            CURRENT_DATE - INTERVAL '30 days' AS window_start
            FROM foundation.fct_daily_availability
            """,
            [("foundation", "fct_daily_availability")],
        ).fetchone()
        if row:
            min_date, max_date, days, today, window_start = row
            print(f" Min snapshot_date: {min_date}")
            print(f" Max snapshot_date: {max_date}")
            print(f" Distinct days: {days}")
            print(f" Today: {today}")
            print(f" 30-day window start: {window_start}")
            # NOTE(review): string comparison assumes both values render in
            # ISO-like order (date vs. timestamp text) — confirm this holds
            # for the column's actual type.
            if max_date and str(max_date) < str(window_start):
                print()
                print(" *** ALL DATA IS OUTSIDE THE 30-DAY WINDOW ***")
                print(" This is why venue_pricing_benchmarks is empty.")
    except Exception as e:
        print(f" ERROR: {e}")
    # ── HAVING filter impact in venue_pricing_benchmarks ──────────────────
    print()
    print("=" * 60)
    print("HAVING FILTER IMPACT (venue_pricing_benchmarks)")
    print("=" * 60)
    try:
        # Reproduces the benchmark model's per-venue aggregation to show how
        # many venues the "days_observed >= 3" HAVING clause drops.
        row = _query_sql(
            con,
            """
            WITH venue_stats AS (
            SELECT
            da.tenant_id,
            da.country_code,
            da.city,
            COUNT(DISTINCT da.snapshot_date) AS days_observed
            FROM foundation.fct_daily_availability da
            WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days'
            AND da.occupancy_rate IS NOT NULL
            AND da.occupancy_rate BETWEEN 0 AND 1.5
            GROUP BY da.tenant_id, da.country_code, da.city
            )
            SELECT
            COUNT(*) AS total_venues,
            COUNT(*) FILTER (WHERE days_observed >= 3) AS venues_passing_having,
            COUNT(*) FILTER (WHERE days_observed < 3) AS venues_failing_having,
            MAX(days_observed) AS max_days,
            MIN(days_observed) AS min_days
            FROM venue_stats
            """,
            [("foundation", "fct_daily_availability")],
        ).fetchone()
        if row:
            total, passing, failing, max_d, min_d = row
            print(f" Venues in 30-day window: {total}")
            print(f" Venues with >= 3 days (PASSING): {passing}")
            print(f" Venues with < 3 days (FILTERED): {failing}")
            print(f" Max days observed: {max_d}")
            print(f" Min days observed: {min_d}")
            if total == 0:
                print()
                print(" *** NO VENUES IN 30-DAY WINDOW — check fct_daily_availability dates ***")
    except Exception as e:
        print(f" ERROR: {e}")
    # ── Occupancy rate distribution ───────────────────────────────────────
    print()
    print("=" * 60)
    print("OCCUPANCY RATE DISTRIBUTION (fct_daily_availability)")
    print("=" * 60)
    try:
        # Bucket occupancy_rate to spot NULLs and out-of-range values that
        # the benchmark model's BETWEEN 0 AND 1.5 filter would exclude.
        rows = _query_sql(
            con,
            """
            SELECT
            CASE
            WHEN occupancy_rate IS NULL THEN 'NULL'
            WHEN occupancy_rate < 0 THEN '< 0 (invalid)'
            WHEN occupancy_rate > 1.5 THEN '> 1.5 (filtered)'
            WHEN occupancy_rate <= 0.25 THEN '0 0.25'
            WHEN occupancy_rate <= 0.50 THEN '0.25 0.50'
            WHEN occupancy_rate <= 0.75 THEN '0.50 0.75'
            ELSE '0.75 1.0+'
            END AS bucket,
            COUNT(*) AS cnt
            FROM foundation.fct_daily_availability
            GROUP BY 1
            ORDER BY 1
            """,
            [("foundation", "fct_daily_availability")],
        ).fetchall()
        for bucket, cnt in rows:
            print(f" {bucket:25s} {cnt:>10,}")
    except Exception as e:
        print(f" ERROR: {e}")
    # ── dim_venue_capacity join coverage ──────────────────────────────────
    print()
    print("=" * 60)
    print("JOIN COVERAGE: fct_availability_slot → dim_venue_capacity")
    print("=" * 60)
    try:
        # LEFT JOIN so tenants without a capacity row still count; the delta
        # shows how many tenants an INNER JOIN downstream would drop.
        row = _query_sql(
            con,
            """
            SELECT
            COUNT(DISTINCT a.tenant_id) AS slot_tenants,
            COUNT(DISTINCT c.tenant_id) AS capacity_tenants,
            COUNT(DISTINCT a.tenant_id) - COUNT(DISTINCT c.tenant_id) AS missing_capacity
            FROM foundation.fct_availability_slot a
            LEFT JOIN foundation.dim_venue_capacity c ON a.tenant_id = c.tenant_id
            """,
            [
                ("foundation", "fct_availability_slot"),
                ("foundation", "dim_venue_capacity"),
            ],
        ).fetchone()
        if row:
            slot_t, cap_t, missing = row
            print(f" Tenants in fct_availability_slot: {slot_t}")
            print(f" Tenants with capacity match: {cap_t}")
            print(f" Tenants missing capacity: {missing}")
            if missing and missing > 0:
                print(f" *** {missing} tenants dropped by INNER JOIN to dim_venue_capacity ***")
    except Exception as e:
        print(f" ERROR: {e}")
    con.close()
    print()
    print("Done.")


if __name__ == "__main__":
    main()