From aa27f14f3cac0bacb77d267110139375cd301dd2 Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 5 Mar 2026 17:19:12 +0100 Subject: [PATCH] fix(pipeline): eurostat filter bugs + supervisor uses sqlmesh plan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - nrg_pc_203: add missing unit=KWH filter (API returns 2 units) - lc_lci_lev: fix currency→unit filter dimension name - supervisor: use `sqlmesh plan prod --auto-apply` instead of `sqlmesh run` so new/changed models are detected automatically Co-Authored-By: Claude Sonnet 4.6 --- CHANGELOG.md | 3 +++ .../padelnomics_extract/src/padelnomics_extract/eurostat.py | 6 +++--- infra/supervisor/supervisor.sh | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c62254f..996d238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ### Fixed - **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — handle DuckDB catalog naming quirk where `lakehouse.duckdb` uses catalog `lakehouse` instead of `local`, causing SQLMesh logical views to break. Script now auto-detects the catalog via `USE`, and falls back to querying physical tables (`sqlmesh__.__`) when views fail. +- **Eurostat gas prices extractor** — `nrg_pc_203` filter missing `unit` dimension (API returns both KWH and GJ_GCV); now filters to `KWH`. +- **Eurostat labour costs extractor** — `lc_lci_lev` used non-existent `currency` filter dimension; corrected to `unit: EUR`. +- **Supervisor transform step** — changed `sqlmesh run` to `sqlmesh plan prod --auto-apply` so new/modified models are detected and applied automatically. ### Added - **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — read-only script that reports row counts at every layer of the pricing pipeline (staging → foundation → serving), date range analysis, HAVING filter impact, and join coverage. Run on prod to diagnose empty serving tables. diff --git a/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py b/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py index 0a62ff9..fc9cc2e 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py @@ -63,15 +63,15 @@ DATASETS: dict[str, dict] = { "time_dim": "time", }, "nrg_pc_203": { - # Gas prices for non-household consumers, EUR/GJ, excl. taxes - "filters": {"freq": "S", "nrg_cons": "GJ1000-9999", "currency": "EUR", "tax": "I_TAX"}, + # Gas prices for non-household consumers, EUR/kWh, excl. taxes + "filters": {"freq": "S", "nrg_cons": "GJ1000-9999", "unit": "KWH", "currency": "EUR", "tax": "I_TAX"}, "geo_dim": "geo", "time_dim": "time", }, "lc_lci_lev": { # Labour cost levels EUR/hour — NACE N (administrative/support services) # Stored in dim_countries for future staffed-scenario calculations. - "filters": {"lcstruct": "D1_D2_A_HW", "nace_r2": "N", "currency": "EUR"}, + "filters": {"lcstruct": "D1_D2_A_HW", "nace_r2": "N", "unit": "EUR"}, "geo_dim": "geo", "time_dim": "time", }, diff --git a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh index 5bd849b..a855b12 100644 --- a/infra/supervisor/supervisor.sh +++ b/infra/supervisor/supervisor.sh @@ -33,10 +33,10 @@ do DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \ uv run --package padelnomics_extract extract - # Transform + # Transform — plan detects new/changed models; run only executes existing plans. LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \ DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \ - uv run --package sqlmesh_padelnomics sqlmesh run --select-model "serving.*" + uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply # Export serving tables to analytics.duckdb (atomic swap). # The web app detects the inode change on next query — no restart needed.