diff --git a/CHANGELOG.md b/CHANGELOG.md
index bd8e1f1..c62254f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 
 ## [Unreleased]
 
+### Fixed
+- **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — handle DuckDB catalog naming quirk where `lakehouse.duckdb` uses catalog `lakehouse` instead of `local`, causing SQLMesh logical views to break. Script now auto-detects the catalog via `USE`, and falls back to querying physical tables (`sqlmesh__<schema>.<schema>__<table>__<version>`) when views fail.
+
 ### Added
 - **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — read-only script that reports row counts at every layer of the pricing pipeline (staging → foundation → serving), date range analysis, HAVING filter impact, and join coverage. Run on prod to diagnose empty serving tables.
 - **Extraction card descriptions** — each workflow card on the admin pipeline page now shows a one-line description explaining what the data source is (e.g. "EU geographic boundaries (NUTS2 polygons) from Eurostat GISCO"). Descriptions defined in `workflows.toml`.
diff --git a/scripts/check_pipeline.py b/scripts/check_pipeline.py
index eb39b4e..e291491 100644
--- a/scripts/check_pipeline.py
+++ b/scripts/check_pipeline.py
@@ -8,6 +8,10 @@ Or locally:
     DUCKDB_PATH=data/lakehouse.duckdb uv run python scripts/check_pipeline.py
 
 Read-only — never writes to the database.
+
+Handles the DuckDB catalog naming quirk: when the file is named lakehouse.duckdb,
+the catalog is "lakehouse" not "local". SQLMesh views may reference the wrong catalog,
+so we fall back to querying physical tables (sqlmesh__<schema>.<schema>__<table>__<version>).
 """
 
 import os
@@ -27,6 +31,86 @@ PIPELINE_TABLES = [
 ]
 
 
+def _use_catalog(con):
+    """Detect and USE the database catalog so schema-qualified queries work."""
+    catalogs = [
+        row[0]
+        for row in con.execute(
+            "SELECT catalog_name FROM information_schema.schemata"
+        ).fetchall()
+    ]
+    # Pick the non-system catalog (not 'system', 'temp', 'memory')
+    user_catalogs = [c for c in set(catalogs) if c not in ("system", "temp", "memory")]
+    if user_catalogs:
+        catalog = user_catalogs[0]
+        con.execute(f"USE {catalog}")
+        return catalog
+    return None
+
+
+def _find_physical_table(con, schema, table):
+    """Find the SQLMesh physical table name for a logical table.
+
+    SQLMesh stores physical tables as:
+        sqlmesh__<schema>.<schema>__<table>__<version>
+    """
+    sqlmesh_schema = f"sqlmesh__{schema}"
+    try:
+        rows = con.execute(
+            "SELECT table_schema, table_name "
+            "FROM information_schema.tables "
+            f"WHERE table_schema = '{sqlmesh_schema}' "
+            f"AND table_name LIKE '{schema}__{table}%' "
+            "ORDER BY table_name "
+            "LIMIT 1"
+        ).fetchall()
+        if rows:
+            return f"{rows[0][0]}.{rows[0][1]}"
+    except Exception:
+        pass
+    return None
+
+
+def _query_table(con, schema, table):
+    """Try logical view first, fall back to physical table. Returns (fqn, count) or (fqn, error_str)."""
+    logical = f"{schema}.{table}"
+    try:
+        (count,) = con.execute(f"SELECT COUNT(*) FROM {logical}").fetchone()
+        return logical, count
+    except Exception:
+        pass
+
+    physical = _find_physical_table(con, schema, table)
+    if physical:
+        try:
+            (count,) = con.execute(f"SELECT COUNT(*) FROM {physical}").fetchone()
+            return f"{physical} (physical)", count
+        except Exception as e:
+            return f"{physical} (physical)", f"ERROR: {e}"
+
+    return logical, "ERROR: view broken, no physical table found"
+
+
+def _query_sql(con, sql, schema_tables):
+    """Execute SQL, falling back to rewritten SQL using physical table names if views fail.
+
+    schema_tables: list of (schema, table) tuples used in the SQL, in order of appearance.
+    The SQL must use {schema}.{table} format for these references.
+    """
+    try:
+        return con.execute(sql)
+    except Exception:
+        # Rewrite SQL to use physical table names
+        rewritten = sql
+        for schema, table in schema_tables:
+            physical = _find_physical_table(con, schema, table)
+            if physical:
+                rewritten = rewritten.replace(f"{schema}.{table}", physical)
+            else:
+                raise
+        return con.execute(rewritten)
+
+
 def main():
     if not os.path.exists(DUCKDB_PATH):
         print(f"ERROR: {DUCKDB_PATH} not found")
@@ -36,6 +120,10 @@ def main():
 
     print(f"Database: {DUCKDB_PATH}")
     print(f"DuckDB version: {con.execute('SELECT version()').fetchone()[0]}")
+
+    catalog = _use_catalog(con)
+    if catalog:
+        print(f"Catalog: {catalog}")
     print()
 
     # ── Row counts at each layer ──────────────────────────────────────────
@@ -44,28 +132,11 @@ def main():
     print("=" * 60)
 
     for schema, table in PIPELINE_TABLES:
-        # SQLMesh may use __ suffixed physical tables
-        # Try the logical name first, then scan for physical tables
-        candidates = [f"{schema}.{table}"]
-        try:
-            phys = con.execute(
-                f"SELECT table_schema || '.' || table_name "
-                f"FROM information_schema.tables "
-                f"WHERE table_name LIKE '{table}%' "
-                f"ORDER BY table_name"
-            ).fetchall()
-            for (name,) in phys:
-                if name not in candidates:
-                    candidates.append(name)
-        except Exception:
-            pass
-
-        for fqn in candidates:
-            try:
-                (count,) = con.execute(f"SELECT COUNT(*) FROM {fqn}").fetchone()
-                print(f"  {fqn:50s} {count:>10,} rows")
-            except Exception as e:
-                print(f"  {fqn:50s} ERROR: {e}")
+        fqn, result = _query_table(con, schema, table)
+        if isinstance(result, int):
+            print(f"  {fqn:55s} {result:>10,} rows")
+        else:
+            print(f"  {fqn:55s} {result}")
 
     # ── Date range in fct_daily_availability ──────────────────────────────
     print()
@@ -74,7 +145,9 @@ def main():
     print("=" * 60)
 
     try:
-        row = con.execute("""
+        row = _query_sql(
+            con,
+            """
             SELECT
                 MIN(snapshot_date) AS min_date,
                 MAX(snapshot_date) AS max_date,
@@ -82,7 +155,9 @@ def main():
                 CURRENT_DATE AS today,
                 CURRENT_DATE - INTERVAL '30 days' AS window_start
             FROM foundation.fct_daily_availability
-        """).fetchone()
+            """,
+            [("foundation", "fct_daily_availability")],
+        ).fetchone()
         if row:
             min_date, max_date, days, today, window_start = row
             print(f"  Min snapshot_date: {min_date}")
@@ -104,7 +179,9 @@ def main():
     print("=" * 60)
 
     try:
-        row = con.execute("""
+        row = _query_sql(
+            con,
+            """
             WITH venue_stats AS (
                 SELECT
                     da.tenant_id,
@@ -124,7 +201,9 @@ def main():
                 MAX(days_observed) AS max_days,
                 MIN(days_observed) AS min_days
             FROM venue_stats
-        """).fetchone()
+            """,
+            [("foundation", "fct_daily_availability")],
+        ).fetchone()
         if row:
             total, passing, failing, max_d, min_d = row
             print(f"  Venues in 30-day window: {total}")
@@ -145,7 +224,9 @@ def main():
     print("=" * 60)
 
     try:
-        rows = con.execute("""
+        rows = _query_sql(
+            con,
+            """
             SELECT
                 CASE
                     WHEN occupancy_rate IS NULL THEN 'NULL'
@@ -160,7 +241,9 @@ def main():
             FROM foundation.fct_daily_availability
             GROUP BY 1
             ORDER BY 1
-        """).fetchall()
+            """,
+            [("foundation", "fct_daily_availability")],
+        ).fetchall()
         for bucket, cnt in rows:
             print(f"  {bucket:25s} {cnt:>10,}")
     except Exception as e:
@@ -173,14 +256,21 @@ def main():
     print("=" * 60)
 
     try:
-        row = con.execute("""
+        row = _query_sql(
+            con,
+            """
             SELECT
                 COUNT(DISTINCT a.tenant_id) AS slot_tenants,
                 COUNT(DISTINCT c.tenant_id) AS capacity_tenants,
                 COUNT(DISTINCT a.tenant_id) - COUNT(DISTINCT c.tenant_id) AS missing_capacity
             FROM foundation.fct_availability_slot a
             LEFT JOIN foundation.dim_venue_capacity c ON a.tenant_id = c.tenant_id
-        """).fetchone()
+            """,
+            [
+                ("foundation", "fct_availability_slot"),
+                ("foundation", "dim_venue_capacity"),
+            ],
+        ).fetchone()
         if row:
             slot_t, cap_t, missing = row
             print(f"  Tenants in fct_availability_slot: {slot_t}")