Compare commits
8 Commits
v202603051
...
v202603051
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
add5f8ddfa | ||
|
|
15ca316682 | ||
|
|
103ef73cf5 | ||
|
|
aa27f14f3c | ||
|
|
8205744444 | ||
|
|
1cbefe349c | ||
|
|
003f19e071 | ||
|
|
c3f15535b8 |
@@ -3,6 +3,7 @@ APP_NAME=ENC[AES256_GCM,data:ldJf4P0iD9ziMVg=,iv:hiVl2whhd02yZCafzBfbxX5/EU/suvz
|
||||
SECRET_KEY=ENC[AES256_GCM,data:hmlXm7NKVVFmeea4DnlrH/oSnsoaMAkUz42oWwFXOXL1XwAh3iemIKHUQOV2G4SPlmjfmEVQD64xbxaJW0OcPQ/8KqhrRYDsy0F/u0h7nmNQdwJrcvzcmbvjgcwU5IITPIr23d/W5PeSJzxhB93uaJ0+zFN2CyHfeewrJKafPfw=,iv:e+ZSLUO+dlt+ET8r/0/pf74UtGIBMkaVoJMWlJn1W5U=,tag:LdDCCrHcJnKLkKL/cY/R/Q==,type:str]
|
||||
BASE_URL=ENC[AES256_GCM,data:50k/RqlZ1EHqGM4UkSmTaCsuJgyU4w==,iv:f8zKr2jkts4RsawA97hzICHwj9Quzgp+Dw8AhQ7GSWA=,tag:9KhNvwmoOtDyuIql7okeew==,type:str]
|
||||
DEBUG=ENC[AES256_GCM,data:O0/uRF4=,iv:cZ+vyUuXjQOYYRf4l8lWS3JIWqL/w3pnlCTDPAZpB1E=,tag:OmJE9oJpzYzth0xwaMqADQ==,type:str]
|
||||
LANDING_DIR=ENC[AES256_GCM,data:rn8u+tGob0vU7kSAtxmrpYQlneesvyO10A==,iv:PuGtdcQBdRbnybulzd6L7JVQClcK3/QjMeYFXZSxGW0=,tag:K2PJPMCWXdqTlQpwP9+DOQ==,type:str]
|
||||
#ENC[AES256_GCM,data:xmJc6WTb3yumHzvLeA==,iv:9jKuYaDgm4zR/DTswIMwsajV0s5UTe+AOX4Sue0GPCs=,tag:b/7H9js1HmFYjuQE4zJz8w==,type:comment]
|
||||
ADMIN_EMAILS=ENC[AES256_GCM,data:R/2YTk8KDEpNQ71RN8Fm6miLZvXNJQ==,iv:kzmiaBK7KvnSjR5gx6lp7zEMzs5xRul6LBhmLf48bCU=,tag:csVZ0W1TxBAoJacQurW9VQ==,type:str]
|
||||
#ENC[AES256_GCM,data:S7Pdg9tcom3N,iv:OjmYk3pqbZHKPS1Y06w1y8BE7CU0y6Vx2wnio9tEhus=,tag:YAOGbrHQ+UOcdSQFWdiCDA==,type:comment]
|
||||
@@ -63,7 +64,7 @@ sops_age__list_1__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb2
|
||||
sops_age__list_1__map_recipient=age1wjepykv3glvsrtegu25tevg7vyn3ngpl607u3yjc9ucay04s045s796msw
|
||||
sops_age__list_2__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBFeHhaOURNZnRVMEwxNThu\nUjF4Q0kwUXhTUE1QSzZJbmpubnh3RnpQTmdvCjRmWWxpNkxFUmVGb3NRbnlydW5O\nWEg3ZXJQTU4vcndzS2pUQXY3Q0ttYjAKLS0tIE9IRFJ1c2ZxbGVHa2xTL0swbGN1\nTzgwMThPUDRFTWhuZHJjZUYxOTZrU00KY62qrNBCUQYxwcLMXFEnLkwncxq3BPJB\nKm4NzeHBU87XmPWVrgrKuf+PH1mxJlBsl7Hev8xBTy7l6feiZjLIvQ==\n-----END AGE ENCRYPTED FILE-----\n
|
||||
sops_age__list_2__map_recipient=age1c783ym2q5x9tv7py5d28uc4k44aguudjn03g97l9nzs00dd9tsrqum8h4d
|
||||
sops_lastmodified=2026-03-01T20:26:09Z
|
||||
sops_mac=ENC[AES256_GCM,data:IxzU6VehA0iHgpIEqDSoMywKyKONI6jSr/6Amo+g3JI72awJtk6ft0ppfDWZjeHhL0ixfnvgqMNwai+1e0V/U8hSP8/FqYKEVpAO0UGJfBPKP3pbw+tx3WJQMF5dIh2/UVNrKvoACZq0IDJfXlVqalCnRMQEHGtKVTIT3fn8m6c=,iv:0w0ohOBsqTzuoQdtt6AI5ZdHEKw9+hI73tycBjDSS0o=,tag:Guw7LweA4m4Nw+3kSuZKWA==,type:str]
|
||||
sops_lastmodified=2026-03-05T15:55:19Z
|
||||
sops_mac=ENC[AES256_GCM,data:orLypjurBTYmk3um0bDQV3wFxj1pjCsjOf2D+AZyoIYY88MeY8BjK8mg8BWhmJYlGWqHH1FCpoJS+2SECv2Bvgejqvx/C/HSysA8et5CArM/p/MBbcupLAKOD8bTXorKMRDYPkWpK/snkPToxIZZd7dNj/zSU+OhRp5qLGCHkvM=,iv:eBn93z4DSk8UPHgP/Jf/Kz+3KwoKIQ9Et72pbLFcLP8=,tag:79kzPIKp0rtHGhH1CkXqwg==,type:str]
|
||||
sops_unencrypted_suffix=_unencrypted
|
||||
sops_version=3.12.1
|
||||
|
||||
@@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Fixed
|
||||
- **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — handle DuckDB catalog naming quirk where `lakehouse.duckdb` uses catalog `lakehouse` instead of `local`, causing SQLMesh logical views to break. Script now auto-detects the catalog via `USE`, and falls back to querying physical tables (`sqlmesh__<schema>.<table>__<hash>`) when views fail.
|
||||
- **Eurostat gas prices extractor** — `nrg_pc_203` filter missing `unit` dimension (API returns both KWH and GJ_GCV); now filters to `KWH`.
|
||||
- **Eurostat labour costs extractor** — `lc_lci_lev` used non-existent `currency` filter dimension; corrected to `unit: EUR`.
|
||||
- **Supervisor transform step** — changed `sqlmesh run` to `sqlmesh plan prod --auto-apply` so new/modified models are detected and applied automatically.
|
||||
|
||||
### Added
|
||||
- **Pipeline diagnostic script** (`scripts/check_pipeline.py`) — read-only script that reports row counts at every layer of the pricing pipeline (staging → foundation → serving), date range analysis, HAVING filter impact, and join coverage. Run on prod to diagnose empty serving tables.
|
||||
- **Extraction card descriptions** — each workflow card on the admin pipeline page now shows a one-line description explaining what the data source is (e.g. "EU geographic boundaries (NUTS2 polygons) from Eurostat GISCO"). Descriptions defined in `workflows.toml`.
|
||||
|
||||
@@ -63,15 +63,15 @@ DATASETS: dict[str, dict] = {
|
||||
"time_dim": "time",
|
||||
},
|
||||
"nrg_pc_203": {
|
||||
# Gas prices for non-household consumers, EUR/GJ, excl. taxes
|
||||
"filters": {"freq": "S", "nrg_cons": "GJ1000-9999", "currency": "EUR", "tax": "I_TAX"},
|
||||
# Gas prices for non-household consumers, EUR/kWh, excl. taxes
|
||||
"filters": {"freq": "S", "nrg_cons": "GJ1000-9999", "unit": "KWH", "currency": "EUR", "tax": "I_TAX"},
|
||||
"geo_dim": "geo",
|
||||
"time_dim": "time",
|
||||
},
|
||||
"lc_lci_lev": {
|
||||
# Labour cost levels EUR/hour — NACE N (administrative/support services)
|
||||
# Stored in dim_countries for future staffed-scenario calculations.
|
||||
"filters": {"lcstruct": "D1_D2_A_HW", "nace_r2": "N", "currency": "EUR"},
|
||||
# D1_D4_MD5 = compensation of employees + taxes - subsidies (total labour cost)
|
||||
"filters": {"lcstruct": "D1_D4_MD5", "nace_r2": "N", "unit": "EUR"},
|
||||
"geo_dim": "geo",
|
||||
"time_dim": "time",
|
||||
},
|
||||
|
||||
@@ -33,10 +33,10 @@ do
|
||||
DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
|
||||
uv run --package padelnomics_extract extract
|
||||
|
||||
# Transform
|
||||
# Transform — plan detects new/changed models; run only executes existing plans.
|
||||
LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \
|
||||
DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
|
||||
uv run --package sqlmesh_padelnomics sqlmesh run --select-model "serving.*"
|
||||
uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply
|
||||
|
||||
# Export serving tables to analytics.duckdb (atomic swap).
|
||||
# The web app detects the inode change on next query — no restart needed.
|
||||
|
||||
@@ -8,6 +8,10 @@ Or locally:
|
||||
DUCKDB_PATH=data/lakehouse.duckdb uv run python scripts/check_pipeline.py
|
||||
|
||||
Read-only — never writes to the database.
|
||||
|
||||
Handles the DuckDB catalog naming quirk: when the file is named lakehouse.duckdb,
|
||||
the catalog is "lakehouse" not "local". SQLMesh views may reference the wrong catalog,
|
||||
so we fall back to querying physical tables (sqlmesh__<schema>.<table>__<hash>).
|
||||
"""
|
||||
|
||||
import os
|
||||
@@ -27,6 +31,86 @@ PIPELINE_TABLES = [
|
||||
]
|
||||
|
||||
|
||||
def _use_catalog(con):
|
||||
"""Detect and USE the database catalog so schema-qualified queries work."""
|
||||
catalogs = [
|
||||
row[0]
|
||||
for row in con.execute(
|
||||
"SELECT catalog_name FROM information_schema.schemata"
|
||||
).fetchall()
|
||||
]
|
||||
# Pick the non-system catalog (not 'system', 'temp', 'memory')
|
||||
user_catalogs = [c for c in set(catalogs) if c not in ("system", "temp", "memory")]
|
||||
if user_catalogs:
|
||||
catalog = user_catalogs[0]
|
||||
con.execute(f"USE {catalog}")
|
||||
return catalog
|
||||
return None
|
||||
|
||||
|
||||
def _find_physical_table(con, schema, table):
|
||||
"""Find the SQLMesh physical table name for a logical table.
|
||||
|
||||
SQLMesh stores physical tables as:
|
||||
sqlmesh__<schema>.<schema>__<table>__<hash>
|
||||
"""
|
||||
sqlmesh_schema = f"sqlmesh__{schema}"
|
||||
try:
|
||||
rows = con.execute(
|
||||
"SELECT table_schema, table_name "
|
||||
"FROM information_schema.tables "
|
||||
f"WHERE table_schema = '{sqlmesh_schema}' "
|
||||
f"AND table_name LIKE '{schema}__{table}%' "
|
||||
"ORDER BY table_name "
|
||||
"LIMIT 1"
|
||||
).fetchall()
|
||||
if rows:
|
||||
return f"{rows[0][0]}.{rows[0][1]}"
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def _query_table(con, schema, table):
|
||||
"""Try logical view first, fall back to physical table. Returns (fqn, count) or (fqn, error_str)."""
|
||||
logical = f"{schema}.{table}"
|
||||
try:
|
||||
(count,) = con.execute(f"SELECT COUNT(*) FROM {logical}").fetchone()
|
||||
return logical, count
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
physical = _find_physical_table(con, schema, table)
|
||||
if physical:
|
||||
try:
|
||||
(count,) = con.execute(f"SELECT COUNT(*) FROM {physical}").fetchone()
|
||||
return f"{physical} (physical)", count
|
||||
except Exception as e:
|
||||
return f"{physical} (physical)", f"ERROR: {e}"
|
||||
|
||||
return logical, "ERROR: view broken, no physical table found"
|
||||
|
||||
|
||||
def _query_sql(con, sql, schema_tables):
|
||||
"""Execute SQL, falling back to rewritten SQL using physical table names if views fail.
|
||||
|
||||
schema_tables: list of (schema, table) tuples used in the SQL, in order of appearance.
|
||||
The SQL must use {schema}.{table} format for these references.
|
||||
"""
|
||||
try:
|
||||
return con.execute(sql)
|
||||
except Exception:
|
||||
# Rewrite SQL to use physical table names
|
||||
rewritten = sql
|
||||
for schema, table in schema_tables:
|
||||
physical = _find_physical_table(con, schema, table)
|
||||
if physical:
|
||||
rewritten = rewritten.replace(f"{schema}.{table}", physical)
|
||||
else:
|
||||
raise
|
||||
return con.execute(rewritten)
|
||||
|
||||
|
||||
def main():
|
||||
if not os.path.exists(DUCKDB_PATH):
|
||||
print(f"ERROR: {DUCKDB_PATH} not found")
|
||||
@@ -36,6 +120,10 @@ def main():
|
||||
|
||||
print(f"Database: {DUCKDB_PATH}")
|
||||
print(f"DuckDB version: {con.execute('SELECT version()').fetchone()[0]}")
|
||||
|
||||
catalog = _use_catalog(con)
|
||||
if catalog:
|
||||
print(f"Catalog: {catalog}")
|
||||
print()
|
||||
|
||||
# ── Row counts at each layer ──────────────────────────────────────────
|
||||
@@ -44,28 +132,11 @@ def main():
|
||||
print("=" * 60)
|
||||
|
||||
for schema, table in PIPELINE_TABLES:
|
||||
# SQLMesh may use __<env> suffixed physical tables
|
||||
# Try the logical name first, then scan for physical tables
|
||||
candidates = [f"{schema}.{table}"]
|
||||
try:
|
||||
phys = con.execute(
|
||||
f"SELECT table_schema || '.' || table_name "
|
||||
f"FROM information_schema.tables "
|
||||
f"WHERE table_name LIKE '{table}%' "
|
||||
f"ORDER BY table_name"
|
||||
).fetchall()
|
||||
for (name,) in phys:
|
||||
if name not in candidates:
|
||||
candidates.append(name)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for fqn in candidates:
|
||||
try:
|
||||
(count,) = con.execute(f"SELECT COUNT(*) FROM {fqn}").fetchone()
|
||||
print(f" {fqn:50s} {count:>10,} rows")
|
||||
except Exception as e:
|
||||
print(f" {fqn:50s} ERROR: {e}")
|
||||
fqn, result = _query_table(con, schema, table)
|
||||
if isinstance(result, int):
|
||||
print(f" {fqn:55s} {result:>10,} rows")
|
||||
else:
|
||||
print(f" {fqn:55s} {result}")
|
||||
|
||||
# ── Date range in fct_daily_availability ──────────────────────────────
|
||||
print()
|
||||
@@ -74,7 +145,9 @@ def main():
|
||||
print("=" * 60)
|
||||
|
||||
try:
|
||||
row = con.execute("""
|
||||
row = _query_sql(
|
||||
con,
|
||||
"""
|
||||
SELECT
|
||||
MIN(snapshot_date) AS min_date,
|
||||
MAX(snapshot_date) AS max_date,
|
||||
@@ -82,7 +155,9 @@ def main():
|
||||
CURRENT_DATE AS today,
|
||||
CURRENT_DATE - INTERVAL '30 days' AS window_start
|
||||
FROM foundation.fct_daily_availability
|
||||
""").fetchone()
|
||||
""",
|
||||
[("foundation", "fct_daily_availability")],
|
||||
).fetchone()
|
||||
if row:
|
||||
min_date, max_date, days, today, window_start = row
|
||||
print(f" Min snapshot_date: {min_date}")
|
||||
@@ -104,7 +179,9 @@ def main():
|
||||
print("=" * 60)
|
||||
|
||||
try:
|
||||
row = con.execute("""
|
||||
row = _query_sql(
|
||||
con,
|
||||
"""
|
||||
WITH venue_stats AS (
|
||||
SELECT
|
||||
da.tenant_id,
|
||||
@@ -124,7 +201,9 @@ def main():
|
||||
MAX(days_observed) AS max_days,
|
||||
MIN(days_observed) AS min_days
|
||||
FROM venue_stats
|
||||
""").fetchone()
|
||||
""",
|
||||
[("foundation", "fct_daily_availability")],
|
||||
).fetchone()
|
||||
if row:
|
||||
total, passing, failing, max_d, min_d = row
|
||||
print(f" Venues in 30-day window: {total}")
|
||||
@@ -145,7 +224,9 @@ def main():
|
||||
print("=" * 60)
|
||||
|
||||
try:
|
||||
rows = con.execute("""
|
||||
rows = _query_sql(
|
||||
con,
|
||||
"""
|
||||
SELECT
|
||||
CASE
|
||||
WHEN occupancy_rate IS NULL THEN 'NULL'
|
||||
@@ -160,7 +241,9 @@ def main():
|
||||
FROM foundation.fct_daily_availability
|
||||
GROUP BY 1
|
||||
ORDER BY 1
|
||||
""").fetchall()
|
||||
""",
|
||||
[("foundation", "fct_daily_availability")],
|
||||
).fetchall()
|
||||
for bucket, cnt in rows:
|
||||
print(f" {bucket:25s} {cnt:>10,}")
|
||||
except Exception as e:
|
||||
@@ -173,14 +256,21 @@ def main():
|
||||
print("=" * 60)
|
||||
|
||||
try:
|
||||
row = con.execute("""
|
||||
row = _query_sql(
|
||||
con,
|
||||
"""
|
||||
SELECT
|
||||
COUNT(DISTINCT a.tenant_id) AS slot_tenants,
|
||||
COUNT(DISTINCT c.tenant_id) AS capacity_tenants,
|
||||
COUNT(DISTINCT a.tenant_id) - COUNT(DISTINCT c.tenant_id) AS missing_capacity
|
||||
FROM foundation.fct_availability_slot a
|
||||
LEFT JOIN foundation.dim_venue_capacity c ON a.tenant_id = c.tenant_id
|
||||
""").fetchone()
|
||||
""",
|
||||
[
|
||||
("foundation", "fct_availability_slot"),
|
||||
("foundation", "dim_venue_capacity"),
|
||||
],
|
||||
).fetchone()
|
||||
if row:
|
||||
slot_t, cap_t, missing = row
|
||||
print(f" Tenants in fct_availability_slot: {slot_t}")
|
||||
|
||||
Reference in New Issue
Block a user