Files
padelnomics/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py
Deeman 15ca316682 fix(extract): correct lc_lci_lev lcstruct filter value
D1_D2_A_HW doesn't exist in the API; use D1_D4_MD5 (total labour cost
= compensation + taxes - subsidies).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 17:32:49 +01:00

291 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Eurostat extractor — city-level demographic datasets.
Fetches Eurostat Statistics API JSON-stat datasets with etag-based deduplication.
Data only changes ~twice a year so most runs skip with 304 Not Modified.
The raw Eurostat JSON-stat format is a 4D sparse dictionary (freq × indicator ×
city × time) which DuckDB cannot efficiently parse. This extractor normalizes
the response into simple JSON arrays that the staging SQL can UNNEST directly.
Landing: {LANDING_DIR}/eurostat/{year}/{month}/{dataset_code}.json.gz
"""
import json
import sqlite3
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import landing_path, write_gzip_atomic
logger = setup_logging("padelnomics.extract.eurostat")
EXTRACTOR_NAME = "eurostat"
EUROSTAT_BASE_URL = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data"
# Dataset configs: filters fix dimension values, geo_dim/time_dim are iterated.
# All other dimensions must either be in filters or have size=1.
#
# Optional `dataset_code` field: when present, used for the API URL instead of the dict key.
# This allows multiple entries to share the same Eurostat dataset with different filters
# (e.g. five prc_ppp_ind entries with different ppp_cat values).
DATASETS: dict[str, dict] = {
"urb_cpop1": {
"filters": {"indic_ur": "DE1001V"}, # Population on 1 January, total
"geo_dim": "cities",
"time_dim": "time",
},
"ilc_di03": {
"filters": { # Median equivalised net income
"age": "TOTAL",
"sex": "T",
"indic_il": "MED_E",
"unit": "PPS",
},
"geo_dim": "geo",
"time_dim": "time",
},
"nama_10r_2hhinc": {
"filters": { # Net household income per inhabitant in PPS (NUTS-2 grain, contains NUTS-1)
"unit": "PPS_EU27_2020_HAB",
"na_item": "B6N",
"direct": "BAL",
},
"geo_dim": "geo",
"time_dim": "time",
},
# ── Direct-value datasets (actual EUR figures) ───────────────────────────
"nrg_pc_205": {
# Electricity prices for non-household consumers, EUR/kWh, excl. taxes
"filters": {"freq": "S", "nrg_cons": "MWH500-1999", "currency": "EUR", "tax": "I_TAX"},
"geo_dim": "geo",
"time_dim": "time",
},
"nrg_pc_203": {
# Gas prices for non-household consumers, EUR/kWh, excl. taxes
"filters": {"freq": "S", "nrg_cons": "GJ1000-9999", "unit": "KWH", "currency": "EUR", "tax": "I_TAX"},
"geo_dim": "geo",
"time_dim": "time",
},
"lc_lci_lev": {
# Labour cost levels EUR/hour — NACE N (administrative/support services)
# D1_D4_MD5 = compensation of employees + taxes - subsidies (total labour cost)
"filters": {"lcstruct": "D1_D4_MD5", "nace_r2": "N", "unit": "EUR"},
"geo_dim": "geo",
"time_dim": "time",
},
# ── Price level indices (relative scaling, EU27=100) ─────────────────────
# Five entries share the prc_ppp_ind dataset with different ppp_cat filters.
# dataset_code points to the real API endpoint; the dict key is the landing filename.
"prc_ppp_ind_construction": {
"dataset_code": "prc_ppp_ind",
"filters": {"ppp_cat": "A050202", "na_item": "PLI_EU27_2020"},
"geo_dim": "geo",
"time_dim": "time",
},
"prc_ppp_ind_housing": {
"dataset_code": "prc_ppp_ind",
"filters": {"ppp_cat": "A0104", "na_item": "PLI_EU27_2020"},
"geo_dim": "geo",
"time_dim": "time",
},
"prc_ppp_ind_services": {
"dataset_code": "prc_ppp_ind",
"filters": {"ppp_cat": "P0201", "na_item": "PLI_EU27_2020"},
"geo_dim": "geo",
"time_dim": "time",
},
"prc_ppp_ind_misc": {
"dataset_code": "prc_ppp_ind",
"filters": {"ppp_cat": "A0112", "na_item": "PLI_EU27_2020"},
"geo_dim": "geo",
"time_dim": "time",
},
"prc_ppp_ind_government": {
"dataset_code": "prc_ppp_ind",
"filters": {"ppp_cat": "P0202", "na_item": "PLI_EU27_2020"},
"geo_dim": "geo",
"time_dim": "time",
},
}
def _parse_jsonstat(
data: dict,
filters: dict[str, str],
geo_dim: str,
time_dim: str,
) -> list[dict]:
"""Parse a Eurostat JSON-stat response into flat records.
JSON-stat stores values in a sparse flat dict keyed by linear index
computed from dimension positions:
index = sum(pos_i * stride_i) where stride_i = product(sizes[i+1:])
We fix all dimensions except geo and time using ``filters``, then
iterate geo × time to extract every non-null value.
"""
dims = data["dimension"]
sizes = data["size"]
values = data["value"]
dim_names = data["id"]
assert len(dim_names) == len(sizes)
# Compute strides (row-major): stride[i] = product of sizes[i+1:]
strides = [1] * len(sizes)
for i in range(len(sizes) - 2, -1, -1):
strides[i] = strides[i + 1] * sizes[i + 1]
# Resolve fixed dimension positions → compute base offset
offset = 0
geo_idx = None
time_idx = None
for i, name in enumerate(dim_names):
if name == geo_dim:
geo_idx = i
elif name == time_dim:
time_idx = i
elif name in filters:
cat_index = dims[name]["category"]["index"]
code = filters[name]
assert code in cat_index, (
f"Filter value {code!r} not in dimension {name!r}. "
f"Available: {list(cat_index.keys())[:10]}..."
)
offset += cat_index[code] * strides[i]
else:
# Dimension not filtered and not geo/time — must have size 1
assert sizes[i] == 1, f"Dimension {name!r} has size {sizes[i]} but no filter provided"
assert geo_idx is not None, f"geo_dim {geo_dim!r} not found in {dim_names}"
assert time_idx is not None, f"time_dim {time_dim!r} not found in {dim_names}"
geo_index = dims[geo_dim]["category"]["index"]
time_index = dims[time_dim]["category"]["index"]
geo_by_pos = {pos: code for code, pos in geo_index.items()}
time_by_pos = {pos: code for code, pos in time_index.items()}
geo_stride = strides[geo_idx]
time_stride = strides[time_idx]
rows: list[dict] = []
for geo_pos in range(sizes[geo_idx]):
geo_code = geo_by_pos[geo_pos]
for time_pos in range(sizes[time_idx]):
idx = offset + geo_pos * geo_stride + time_pos * time_stride
val = values.get(str(idx))
if val is not None:
rows.append(
{
"geo_code": geo_code,
"ref_year": time_by_pos[time_pos],
"value": val,
}
)
return rows
def _etag_path(dest: Path) -> Path:
"""Return the sibling .etag file path for a given dest."""
return dest.parent / (dest.name + ".etag")
def _fetch_with_etag(
url: str,
dest: Path,
session: niquests.Session,
dataset_config: dict | None = None,
) -> int:
"""GET url with If-None-Match etag. Pre-processes JSON-stat if config given.
Returns bytes_written (0 if 304).
"""
etag_file = _etag_path(dest)
headers: dict[str, str] = {}
if etag_file.exists():
headers["If-None-Match"] = etag_file.read_text().strip()
resp = session.get(url, headers=headers, timeout=HTTP_TIMEOUT_SECONDS)
if resp.status_code == 304:
return 0
resp.raise_for_status()
if dataset_config:
raw = resp.json()
rows = _parse_jsonstat(
raw,
filters=dataset_config["filters"],
geo_dim=dataset_config["geo_dim"],
time_dim=dataset_config["time_dim"],
)
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
logger.info("parsed %d records", len(rows))
else:
payload = resp.content
bytes_written = write_gzip_atomic(dest, payload)
if etag := resp.headers.get("etag"):
etag_file.parent.mkdir(parents=True, exist_ok=True)
etag_file.write_text(etag)
return bytes_written
def extract(
landing_dir: Path,
year_month: str,
conn: sqlite3.Connection,
session: niquests.Session,
) -> dict:
"""Fetch all Eurostat datasets. Returns run metrics."""
assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
assert "/" in year_month and len(year_month) == 7, f"year_month must be YYYY/MM: {year_month!r}"
year, month = year_month.split("/")
files_written = 0
files_skipped = 0
bytes_written_total = 0
for dataset_key, config in DATASETS.items():
# Use dataset_code (if set) for the API URL; fall back to the dict key.
# This lets multiple entries share one Eurostat dataset with different filters.
api_code = config.get("dataset_code", dataset_key)
url = f"{EUROSTAT_BASE_URL}/{api_code}?format=JSON&lang=EN"
for key, val in config.get("filters", {}).items():
url += f"&{key}={val}"
dest_dir = landing_path(landing_dir, "eurostat", year, month)
dest = dest_dir / f"{dataset_key}.json.gz"
logger.info("GET %s", dataset_key)
bytes_written = _fetch_with_etag(url, dest, session, config)
if bytes_written > 0:
logger.info("%s updated — %s bytes compressed", dataset_key, f"{bytes_written:,}")
files_written += 1
bytes_written_total += bytes_written
else:
logger.info("%s not modified (304)", dataset_key)
files_skipped += 1
return {
"files_written": files_written,
"files_skipped": files_skipped,
"bytes_written": bytes_written_total,
"cursor_value": year_month,
}
def main() -> None:
run_extractor(EXTRACTOR_NAME, extract)
if __name__ == "__main__":
main()