diff --git a/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py b/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py index 6dfba92..6e45cd8 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py @@ -1,11 +1,16 @@ """Eurostat extractor — city-level demographic datasets. -Fetches Eurostat Statistics API JSON datasets using etag-based deduplication. +Fetches Eurostat Statistics API JSON-stat datasets with etag-based deduplication. Data only changes ~twice a year so most runs skip with 304 Not Modified. +The raw Eurostat JSON-stat format is a 4D sparse dictionary (freq × indicator × +city × time) which DuckDB cannot efficiently parse. This extractor normalizes +the response into simple JSON arrays that the staging SQL can UNNEST directly. + Landing: {LANDING_DIR}/eurostat/{year}/{month}/{dataset_code}.json.gz """ +import json import sqlite3 from pathlib import Path @@ -19,11 +24,103 @@ logger = setup_logging("padelnomics.extract.eurostat") EXTRACTOR_NAME = "eurostat" EUROSTAT_BASE_URL = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data" -# Datasets to fetch -DATASETS = [ - "urb_cpop1", # Urban Audit — city population - "ilc_di03", # Median equivalised net income by NUTS2 -] +# Dataset configs: filters fix dimension values, geo_dim/time_dim are iterated. +# All other dimensions must either be in filters or have size=1. +DATASETS: dict[str, dict] = { + "urb_cpop1": { + "filters": {"indic_ur": "DE1001V"}, # Population on 1 January, total + "geo_dim": "cities", + "time_dim": "time", + }, + "ilc_di03": { + "filters": { # Median equivalised net income + "age": "TOTAL", + "sex": "T", + "indic_il": "MED_E", + "unit": "PPS", + }, + "geo_dim": "geo", + "time_dim": "time", + }, +} + + +def _parse_jsonstat( + data: dict, + filters: dict[str, str], + geo_dim: str, + time_dim: str, +) -> list[dict]: + """Parse a Eurostat JSON-stat response into flat records. + + JSON-stat stores values in a sparse flat dict keyed by linear index + computed from dimension positions: + index = sum(pos_i * stride_i) where stride_i = product(sizes[i+1:]) + + We fix all dimensions except geo and time using ``filters``, then + iterate geo × time to extract every non-null value. + """ + dims = data["dimension"] + sizes = data["size"] + values = data["value"] + dim_names = data["id"] + + assert len(dim_names) == len(sizes) + + # Compute strides (row-major): stride[i] = product of sizes[i+1:] + strides = [1] * len(sizes) + for i in range(len(sizes) - 2, -1, -1): + strides[i] = strides[i + 1] * sizes[i + 1] + + # Resolve fixed dimension positions → compute base offset + offset = 0 + geo_idx = None + time_idx = None + + for i, name in enumerate(dim_names): + if name == geo_dim: + geo_idx = i + elif name == time_dim: + time_idx = i + elif name in filters: + cat_index = dims[name]["category"]["index"] + code = filters[name] + assert code in cat_index, ( + f"Filter value {code!r} not in dimension {name!r}. " + f"Available: {list(cat_index.keys())[:10]}..." + ) + offset += cat_index[code] * strides[i] + else: + # Dimension not filtered and not geo/time — must have size 1 + assert sizes[i] == 1, f"Dimension {name!r} has size {sizes[i]} but no filter provided" + + assert geo_idx is not None, f"geo_dim {geo_dim!r} not found in {dim_names}" + assert time_idx is not None, f"time_dim {time_dim!r} not found in {dim_names}" + + geo_index = dims[geo_dim]["category"]["index"] + time_index = dims[time_dim]["category"]["index"] + geo_by_pos = {pos: code for code, pos in geo_index.items()} + time_by_pos = {pos: code for code, pos in time_index.items()} + + geo_stride = strides[geo_idx] + time_stride = strides[time_idx] + + rows: list[dict] = [] + for geo_pos in range(sizes[geo_idx]): + geo_code = geo_by_pos[geo_pos] + for time_pos in range(sizes[time_idx]): + idx = offset + geo_pos * geo_stride + time_pos * time_stride + val = values.get(str(idx)) + if val is not None: + rows.append( + { + "geo_code": geo_code, + "ref_year": time_by_pos[time_pos], + "value": val, + } + ) + + return rows def _etag_path(dest: Path) -> Path: @@ -35,8 +132,12 @@ def _fetch_with_etag( url: str, dest: Path, session: niquests.Session, + dataset_config: dict | None = None, ) -> int: - """GET url with If-None-Match etag. Returns bytes_written (0 if 304).""" + """GET url with If-None-Match etag. Pre-processes JSON-stat if config given. + + Returns bytes_written (0 if 304). + """ etag_file = _etag_path(dest) headers: dict[str, str] = {} @@ -49,7 +150,21 @@ def _fetch_with_etag( return 0 resp.raise_for_status() - bytes_written = write_gzip_atomic(dest, resp.content) + + if dataset_config: + raw = resp.json() + rows = _parse_jsonstat( + raw, + filters=dataset_config["filters"], + geo_dim=dataset_config["geo_dim"], + time_dim=dataset_config["time_dim"], + ) + payload = json.dumps({"rows": rows, "count": len(rows)}).encode() + logger.info("parsed %d records", len(rows)) + else: + payload = resp.content + + bytes_written = write_gzip_atomic(dest, payload) if etag := resp.headers.get("etag"): etag_file.parent.mkdir(parents=True, exist_ok=True) @@ -70,13 +185,13 @@ def extract( files_skipped = 0 bytes_written_total = 0 - for dataset_code in DATASETS: + for dataset_code, config in DATASETS.items(): url = f"{EUROSTAT_BASE_URL}/{dataset_code}?format=JSON&lang=EN" dest_dir = landing_path(landing_dir, "eurostat", year, month) dest = dest_dir / f"{dataset_code}.json.gz" logger.info("GET %s", dataset_code) - bytes_written = _fetch_with_etag(url, dest, session) + bytes_written = _fetch_with_etag(url, dest, session, config) if bytes_written > 0: logger.info("%s updated — %s bytes compressed", dataset_code, f"{bytes_written:,}") diff --git a/transform/sqlmesh_padelnomics/models/foundation/dim_cities.sql b/transform/sqlmesh_padelnomics/models/foundation/dim_cities.sql index 4eae720..25a2c5b 100644 --- a/transform/sqlmesh_padelnomics/models/foundation/dim_cities.sql +++ b/transform/sqlmesh_padelnomics/models/foundation/dim_cities.sql @@ -15,7 +15,8 @@ eurostat_cities AS ( city_code, country_code, population, - ref_year + ref_year, + LOWER(REPLACE(city_code, country_code, '')) AS city_slug_raw FROM staging.stg_population QUALIFY ROW_NUMBER() OVER (PARTITION BY city_code ORDER BY ref_year DESC) = 1 ), @@ -30,16 +31,6 @@ venue_counts AS ( FROM foundation.dim_venues WHERE city IS NOT NULL AND city != '' GROUP BY country_code, city -), --- Eurostat city label mapping to canonical city names --- (Eurostat uses codes like DE001C → Berlin; we keep both) -eurostat_labels AS ( - SELECT DISTINCT - city_code, - country_code, - -- Derive a slug-friendly city name from the code as fallback - LOWER(REPLACE(city_code, country_code, '')) AS city_slug_raw - FROM eurostat_cities ) SELECT ec.city_code, diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql index c8c4083..2594658 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql @@ -19,8 +19,8 @@ WITH parsed AS ( tenant -> 'address' ->> 'city' AS city, tenant -> 'address' ->> 'postal_code' AS postal_code, tenant -> 'address' ->> 'country_code' AS country_code, - TRY_CAST(tenant -> 'address' ->> 'coordinate_lat' AS DOUBLE) AS lat, - TRY_CAST(tenant -> 'address' ->> 'coordinate_lon' AS DOUBLE) AS lon, + TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat, + TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon, tenant ->> 'sport_ids' AS sport_ids_raw, tenant ->> 'tenant_type' AS tenant_type, filename AS source_file, diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_population.sql b/transform/sqlmesh_padelnomics/models/staging/stg_population.sql index 418408e..f5b838b 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_population.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_population.sql @@ -1,5 +1,5 @@ -- Eurostat Urban Audit city population (dataset: urb_cpop1). --- Reads landing zone JSON directly and parses the Eurostat multidimensional format. +-- Reads pre-processed landing zone JSON (extractor normalizes JSON-stat to flat rows). -- One row per (city_code, year) with validated population values. -- -- Source: data/landing/eurostat/{year}/{month}/urb_cpop1.json.gz @@ -11,51 +11,26 @@ MODEL ( grain (city_code, ref_year) ); -WITH raw AS ( - SELECT raw_json, filename - FROM read_json( - @LANDING_DIR || '/eurostat/*/*/urb_cpop1.json.gz', - format = 'auto', - filename = true, - columns = { 'raw_json': 'JSON' } - ) -), -cities AS ( +WITH parsed AS ( SELECT - city_code, - (city_pos)::INTEGER AS city_pos, - filename, raw_json, - (json_extract(raw_json, '$.size[1]'))::INTEGER AS n_times - FROM raw, - LATERAL ( - SELECT key AS city_code, value::INTEGER AS city_pos - FROM json_each(json_extract(raw_json, '$.dimension.cities.category.index')) + row ->> 'geo_code' AS geo_code, + row ->> 'ref_year' AS ref_year, + TRY_CAST(row ->> 'value' AS DOUBLE) AS population, + filename AS source_file, + CURRENT_DATE AS extracted_date + FROM ( + SELECT UNNEST(rows) AS row, filename + FROM read_json( + @LANDING_DIR || '/eurostat/*/*/urb_cpop1.json.gz', + format = 'auto', + filename = true + ) ) -), -times AS ( - SELECT key AS ref_year, value::INTEGER AS time_pos - FROM (SELECT raw_json FROM raw LIMIT 1), - LATERAL ( - SELECT key, value - FROM json_each(json_extract(raw_json, '$.dimension.time.category.index')) - ) -), -parsed AS ( - SELECT - c.city_code, - t.ref_year, - TRY_CAST( - json_extract(c.raw_json, '$.' || (c.city_pos * c.n_times + t.time_pos)::TEXT) - AS DOUBLE - ) AS population, - c.filename AS source_file, - CURRENT_DATE AS extracted_date - FROM cities c - CROSS JOIN times t + WHERE (row ->> 'geo_code') IS NOT NULL ) SELECT - UPPER(city_code) AS city_code, - UPPER(LEFT(city_code, 2)) AS country_code, + UPPER(geo_code) AS city_code, + UPPER(LEFT(geo_code, 2)) AS country_code, ref_year::INTEGER AS ref_year, population::BIGINT AS population, extracted_date