fix(pipeline): handle DuckDB catalog naming in diagnostic script
This commit is contained in:
@@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — handle DuckDB catalog naming quirk where `lakehouse.duckdb` uses catalog `lakehouse` instead of `local`, causing SQLMesh logical views to break. Script now auto-detects the catalog via `USE`, and falls back to querying physical tables (`sqlmesh__<schema>.<table>__<hash>`) when views fail.
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — read-only script that reports row counts at every layer of the pricing pipeline (staging → foundation → serving), date range analysis, HAVING filter impact, and join coverage. Run on prod to diagnose empty serving tables.
|
- **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — read-only script that reports row counts at every layer of the pricing pipeline (staging → foundation → serving), date range analysis, HAVING filter impact, and join coverage. Run on prod to diagnose empty serving tables.
|
||||||
- **Extraction card descriptions** — each workflow card on the admin pipeline page now shows a one-line description explaining what the data source is (e.g. "EU geographic boundaries (NUTS2 polygons) from Eurostat GISCO"). Descriptions defined in `workflows.toml`.
|
- **Extraction card descriptions** — each workflow card on the admin pipeline page now shows a one-line description explaining what the data source is (e.g. "EU geographic boundaries (NUTS2 polygons) from Eurostat GISCO"). Descriptions defined in `workflows.toml`.
|
||||||
|
|||||||
@@ -8,6 +8,10 @@ Or locally:
|
|||||||
DUCKDB_PATH=data/lakehouse.duckdb uv run python scripts/check_pipeline.py
|
DUCKDB_PATH=data/lakehouse.duckdb uv run python scripts/check_pipeline.py
|
||||||
|
|
||||||
Read-only — never writes to the database.
|
Read-only — never writes to the database.
|
||||||
|
|
||||||
|
Handles the DuckDB catalog naming quirk: when the file is named lakehouse.duckdb,
|
||||||
|
the catalog is "lakehouse" not "local". SQLMesh views may reference the wrong catalog,
|
||||||
|
so we fall back to querying physical tables (sqlmesh__<schema>.<table>__<hash>).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import os
|
import os
|
||||||
@@ -27,6 +31,86 @@ PIPELINE_TABLES = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _use_catalog(con):
|
||||||
|
"""Detect and USE the database catalog so schema-qualified queries work."""
|
||||||
|
catalogs = [
|
||||||
|
row[0]
|
||||||
|
for row in con.execute(
|
||||||
|
"SELECT catalog_name FROM information_schema.schemata"
|
||||||
|
).fetchall()
|
||||||
|
]
|
||||||
|
# Pick the non-system catalog (not 'system', 'temp', 'memory')
|
||||||
|
user_catalogs = [c for c in set(catalogs) if c not in ("system", "temp", "memory")]
|
||||||
|
if user_catalogs:
|
||||||
|
catalog = user_catalogs[0]
|
||||||
|
con.execute(f"USE {catalog}")
|
||||||
|
return catalog
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _find_physical_table(con, schema, table):
|
||||||
|
"""Find the SQLMesh physical table name for a logical table.
|
||||||
|
|
||||||
|
SQLMesh stores physical tables as:
|
||||||
|
sqlmesh__<schema>.<schema>__<table>__<hash>
|
||||||
|
"""
|
||||||
|
sqlmesh_schema = f"sqlmesh__{schema}"
|
||||||
|
try:
|
||||||
|
rows = con.execute(
|
||||||
|
"SELECT table_schema, table_name "
|
||||||
|
"FROM information_schema.tables "
|
||||||
|
f"WHERE table_schema = '{sqlmesh_schema}' "
|
||||||
|
f"AND table_name LIKE '{schema}__{table}%' "
|
||||||
|
"ORDER BY table_name "
|
||||||
|
"LIMIT 1"
|
||||||
|
).fetchall()
|
||||||
|
if rows:
|
||||||
|
return f"{rows[0][0]}.{rows[0][1]}"
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _query_table(con, schema, table):
|
||||||
|
"""Try logical view first, fall back to physical table. Returns (fqn, count) or (fqn, error_str)."""
|
||||||
|
logical = f"{schema}.{table}"
|
||||||
|
try:
|
||||||
|
(count,) = con.execute(f"SELECT COUNT(*) FROM {logical}").fetchone()
|
||||||
|
return logical, count
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
physical = _find_physical_table(con, schema, table)
|
||||||
|
if physical:
|
||||||
|
try:
|
||||||
|
(count,) = con.execute(f"SELECT COUNT(*) FROM {physical}").fetchone()
|
||||||
|
return f"{physical} (physical)", count
|
||||||
|
except Exception as e:
|
||||||
|
return f"{physical} (physical)", f"ERROR: {e}"
|
||||||
|
|
||||||
|
return logical, "ERROR: view broken, no physical table found"
|
||||||
|
|
||||||
|
|
||||||
|
def _query_sql(con, sql, schema_tables):
|
||||||
|
"""Execute SQL, falling back to rewritten SQL using physical table names if views fail.
|
||||||
|
|
||||||
|
schema_tables: list of (schema, table) tuples used in the SQL, in order of appearance.
|
||||||
|
The SQL must use {schema}.{table} format for these references.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return con.execute(sql)
|
||||||
|
except Exception:
|
||||||
|
# Rewrite SQL to use physical table names
|
||||||
|
rewritten = sql
|
||||||
|
for schema, table in schema_tables:
|
||||||
|
physical = _find_physical_table(con, schema, table)
|
||||||
|
if physical:
|
||||||
|
rewritten = rewritten.replace(f"{schema}.{table}", physical)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
return con.execute(rewritten)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
if not os.path.exists(DUCKDB_PATH):
|
if not os.path.exists(DUCKDB_PATH):
|
||||||
print(f"ERROR: {DUCKDB_PATH} not found")
|
print(f"ERROR: {DUCKDB_PATH} not found")
|
||||||
@@ -36,6 +120,10 @@ def main():
|
|||||||
|
|
||||||
print(f"Database: {DUCKDB_PATH}")
|
print(f"Database: {DUCKDB_PATH}")
|
||||||
print(f"DuckDB version: {con.execute('SELECT version()').fetchone()[0]}")
|
print(f"DuckDB version: {con.execute('SELECT version()').fetchone()[0]}")
|
||||||
|
|
||||||
|
catalog = _use_catalog(con)
|
||||||
|
if catalog:
|
||||||
|
print(f"Catalog: {catalog}")
|
||||||
print()
|
print()
|
||||||
|
|
||||||
# ── Row counts at each layer ──────────────────────────────────────────
|
# ── Row counts at each layer ──────────────────────────────────────────
|
||||||
@@ -44,28 +132,11 @@ def main():
|
|||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
|
|
||||||
for schema, table in PIPELINE_TABLES:
|
for schema, table in PIPELINE_TABLES:
|
||||||
# SQLMesh may use __<env> suffixed physical tables
|
fqn, result = _query_table(con, schema, table)
|
||||||
# Try the logical name first, then scan for physical tables
|
if isinstance(result, int):
|
||||||
candidates = [f"{schema}.{table}"]
|
print(f" {fqn:55s} {result:>10,} rows")
|
||||||
try:
|
else:
|
||||||
phys = con.execute(
|
print(f" {fqn:55s} {result}")
|
||||||
f"SELECT table_schema || '.' || table_name "
|
|
||||||
f"FROM information_schema.tables "
|
|
||||||
f"WHERE table_name LIKE '{table}%' "
|
|
||||||
f"ORDER BY table_name"
|
|
||||||
).fetchall()
|
|
||||||
for (name,) in phys:
|
|
||||||
if name not in candidates:
|
|
||||||
candidates.append(name)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
for fqn in candidates:
|
|
||||||
try:
|
|
||||||
(count,) = con.execute(f"SELECT COUNT(*) FROM {fqn}").fetchone()
|
|
||||||
print(f" {fqn:50s} {count:>10,} rows")
|
|
||||||
except Exception as e:
|
|
||||||
print(f" {fqn:50s} ERROR: {e}")
|
|
||||||
|
|
||||||
# ── Date range in fct_daily_availability ──────────────────────────────
|
# ── Date range in fct_daily_availability ──────────────────────────────
|
||||||
print()
|
print()
|
||||||
@@ -74,7 +145,9 @@ def main():
|
|||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
row = con.execute("""
|
row = _query_sql(
|
||||||
|
con,
|
||||||
|
"""
|
||||||
SELECT
|
SELECT
|
||||||
MIN(snapshot_date) AS min_date,
|
MIN(snapshot_date) AS min_date,
|
||||||
MAX(snapshot_date) AS max_date,
|
MAX(snapshot_date) AS max_date,
|
||||||
@@ -82,7 +155,9 @@ def main():
|
|||||||
CURRENT_DATE AS today,
|
CURRENT_DATE AS today,
|
||||||
CURRENT_DATE - INTERVAL '30 days' AS window_start
|
CURRENT_DATE - INTERVAL '30 days' AS window_start
|
||||||
FROM foundation.fct_daily_availability
|
FROM foundation.fct_daily_availability
|
||||||
""").fetchone()
|
""",
|
||||||
|
[("foundation", "fct_daily_availability")],
|
||||||
|
).fetchone()
|
||||||
if row:
|
if row:
|
||||||
min_date, max_date, days, today, window_start = row
|
min_date, max_date, days, today, window_start = row
|
||||||
print(f" Min snapshot_date: {min_date}")
|
print(f" Min snapshot_date: {min_date}")
|
||||||
@@ -104,7 +179,9 @@ def main():
|
|||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
row = con.execute("""
|
row = _query_sql(
|
||||||
|
con,
|
||||||
|
"""
|
||||||
WITH venue_stats AS (
|
WITH venue_stats AS (
|
||||||
SELECT
|
SELECT
|
||||||
da.tenant_id,
|
da.tenant_id,
|
||||||
@@ -124,7 +201,9 @@ def main():
|
|||||||
MAX(days_observed) AS max_days,
|
MAX(days_observed) AS max_days,
|
||||||
MIN(days_observed) AS min_days
|
MIN(days_observed) AS min_days
|
||||||
FROM venue_stats
|
FROM venue_stats
|
||||||
""").fetchone()
|
""",
|
||||||
|
[("foundation", "fct_daily_availability")],
|
||||||
|
).fetchone()
|
||||||
if row:
|
if row:
|
||||||
total, passing, failing, max_d, min_d = row
|
total, passing, failing, max_d, min_d = row
|
||||||
print(f" Venues in 30-day window: {total}")
|
print(f" Venues in 30-day window: {total}")
|
||||||
@@ -145,7 +224,9 @@ def main():
|
|||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
rows = con.execute("""
|
rows = _query_sql(
|
||||||
|
con,
|
||||||
|
"""
|
||||||
SELECT
|
SELECT
|
||||||
CASE
|
CASE
|
||||||
WHEN occupancy_rate IS NULL THEN 'NULL'
|
WHEN occupancy_rate IS NULL THEN 'NULL'
|
||||||
@@ -160,7 +241,9 @@ def main():
|
|||||||
FROM foundation.fct_daily_availability
|
FROM foundation.fct_daily_availability
|
||||||
GROUP BY 1
|
GROUP BY 1
|
||||||
ORDER BY 1
|
ORDER BY 1
|
||||||
""").fetchall()
|
""",
|
||||||
|
[("foundation", "fct_daily_availability")],
|
||||||
|
).fetchall()
|
||||||
for bucket, cnt in rows:
|
for bucket, cnt in rows:
|
||||||
print(f" {bucket:25s} {cnt:>10,}")
|
print(f" {bucket:25s} {cnt:>10,}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -173,14 +256,21 @@ def main():
|
|||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
row = con.execute("""
|
row = _query_sql(
|
||||||
|
con,
|
||||||
|
"""
|
||||||
SELECT
|
SELECT
|
||||||
COUNT(DISTINCT a.tenant_id) AS slot_tenants,
|
COUNT(DISTINCT a.tenant_id) AS slot_tenants,
|
||||||
COUNT(DISTINCT c.tenant_id) AS capacity_tenants,
|
COUNT(DISTINCT c.tenant_id) AS capacity_tenants,
|
||||||
COUNT(DISTINCT a.tenant_id) - COUNT(DISTINCT c.tenant_id) AS missing_capacity
|
COUNT(DISTINCT a.tenant_id) - COUNT(DISTINCT c.tenant_id) AS missing_capacity
|
||||||
FROM foundation.fct_availability_slot a
|
FROM foundation.fct_availability_slot a
|
||||||
LEFT JOIN foundation.dim_venue_capacity c ON a.tenant_id = c.tenant_id
|
LEFT JOIN foundation.dim_venue_capacity c ON a.tenant_id = c.tenant_id
|
||||||
""").fetchone()
|
""",
|
||||||
|
[
|
||||||
|
("foundation", "fct_availability_slot"),
|
||||||
|
("foundation", "dim_venue_capacity"),
|
||||||
|
],
|
||||||
|
).fetchone()
|
||||||
if row:
|
if row:
|
||||||
slot_t, cap_t, missing = row
|
slot_t, cap_t, missing = row
|
||||||
print(f" Tenants in fct_availability_slot: {slot_t}")
|
print(f" Tenants in fct_availability_slot: {slot_t}")
|
||||||
|
|||||||
Reference in New Issue
Block a user