fix(pipeline): eurostat filter bugs + supervisor uses sqlmesh plan
This commit is contained in:
@@ -8,6 +8,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
|
||||
|
||||
### Fixed
|
||||
- **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — handle DuckDB catalog naming quirk where `lakehouse.duckdb` uses catalog `lakehouse` instead of `local`, causing SQLMesh logical views to break. Script now auto-detects the catalog via `USE`, and falls back to querying physical tables (`sqlmesh__<schema>.<table>__<hash>`) when views fail.
|
||||
- **Eurostat gas prices extractor** — `nrg_pc_203` filter missing `unit` dimension (API returns both KWH and GJ_GCV); now filters to `KWH`.
|
||||
- **Eurostat labour costs extractor** — `lc_lci_lev` used non-existent `currency` filter dimension; corrected to `unit: EUR`.
|
||||
- **Supervisor transform step** — changed `sqlmesh run` to `sqlmesh plan prod --auto-apply` so new/modified models are detected and applied automatically.
|
||||
|
||||
### Added
|
||||
- **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — read-only script that reports row counts at every layer of the pricing pipeline (staging → foundation → serving), date range analysis, HAVING filter impact, and join coverage. Run on prod to diagnose empty serving tables.
|
||||
|
||||
@@ -63,15 +63,15 @@ DATASETS: dict[str, dict] = {
|
||||
"time_dim": "time",
|
||||
},
|
||||
"nrg_pc_203": {
|
||||
# Gas prices for non-household consumers, EUR/GJ, excl. taxes
|
||||
"filters": {"freq": "S", "nrg_cons": "GJ1000-9999", "currency": "EUR", "tax": "I_TAX"},
|
||||
# Gas prices for non-household consumers, EUR/kWh, excl. taxes
|
||||
"filters": {"freq": "S", "nrg_cons": "GJ1000-9999", "unit": "KWH", "currency": "EUR", "tax": "I_TAX"},
|
||||
"geo_dim": "geo",
|
||||
"time_dim": "time",
|
||||
},
|
||||
"lc_lci_lev": {
|
||||
# Labour cost levels EUR/hour — NACE N (administrative/support services)
|
||||
# Stored in dim_countries for future staffed-scenario calculations.
|
||||
"filters": {"lcstruct": "D1_D2_A_HW", "nace_r2": "N", "currency": "EUR"},
|
||||
"filters": {"lcstruct": "D1_D2_A_HW", "nace_r2": "N", "unit": "EUR"},
|
||||
"geo_dim": "geo",
|
||||
"time_dim": "time",
|
||||
},
|
||||
|
||||
@@ -33,10 +33,10 @@ do
|
||||
DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
|
||||
uv run --package padelnomics_extract extract
|
||||
|
||||
# Transform
|
||||
# Transform — plan detects new/changed models; run only executes existing plans.
|
||||
LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \
|
||||
DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
|
||||
uv run --package sqlmesh_padelnomics sqlmesh run --select-model "serving.*"
|
||||
uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply
|
||||
|
||||
# Export serving tables to analytics.duckdb (atomic swap).
|
||||
# The web app detects the inode change on next query — no restart needed.
|
||||
|
||||
Reference in New Issue
Block a user