fix: eurostat JSON-stat parsing + staging model corrections
Eurostat JSON-stat format (4-7 dimension sparse dict with 583K values) causes DuckDB OOM — pre-process in extractor to flat records. Also fix dim_cities unused CTE bug and playtomic venue lat/lon path. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,11 +1,16 @@
|
|||||||
"""Eurostat extractor — city-level demographic datasets.
|
"""Eurostat extractor — city-level demographic datasets.
|
||||||
|
|
||||||
Fetches Eurostat Statistics API JSON datasets using etag-based deduplication.
|
Fetches Eurostat Statistics API JSON-stat datasets with etag-based deduplication.
|
||||||
Data only changes ~twice a year so most runs skip with 304 Not Modified.
|
Data only changes ~twice a year so most runs skip with 304 Not Modified.
|
||||||
|
|
||||||
|
The raw Eurostat JSON-stat format is a 4D sparse dictionary (freq × indicator ×
|
||||||
|
city × time) which DuckDB cannot efficiently parse. This extractor normalizes
|
||||||
|
the response into simple JSON arrays that the staging SQL can UNNEST directly.
|
||||||
|
|
||||||
Landing: {LANDING_DIR}/eurostat/{year}/{month}/{dataset_code}.json.gz
|
Landing: {LANDING_DIR}/eurostat/{year}/{month}/{dataset_code}.json.gz
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@@ -19,11 +24,103 @@ logger = setup_logging("padelnomics.extract.eurostat")
|
|||||||
EXTRACTOR_NAME = "eurostat"
|
EXTRACTOR_NAME = "eurostat"
|
||||||
EUROSTAT_BASE_URL = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data"
|
EUROSTAT_BASE_URL = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data"
|
||||||
|
|
||||||
# Dataset configs: `filters` pins the category code of every fixed dimension;
# `geo_dim` and `time_dim` name the two axes the parser iterates over.
# Any dimension that is neither filtered nor iterated must have size 1,
# otherwise parsing aborts rather than silently picking position 0.
DATASETS: dict[str, dict] = {
    # Urban Audit city population — "Population on 1 January, total".
    "urb_cpop1": {
        "filters": {"indic_ur": "DE1001V"},
        "geo_dim": "cities",
        "time_dim": "time",
    },
    # Median equivalised net income by NUTS2 region, in purchasing power
    # standards so values are comparable across countries.
    "ilc_di03": {
        "filters": {
            "age": "TOTAL",
            "sex": "T",
            "indic_il": "MED_E",
            "unit": "PPS",
        },
        "geo_dim": "geo",
        "time_dim": "time",
    },
}
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_jsonstat(
|
||||||
|
data: dict,
|
||||||
|
filters: dict[str, str],
|
||||||
|
geo_dim: str,
|
||||||
|
time_dim: str,
|
||||||
|
) -> list[dict]:
|
||||||
|
"""Parse a Eurostat JSON-stat response into flat records.
|
||||||
|
|
||||||
|
JSON-stat stores values in a sparse flat dict keyed by linear index
|
||||||
|
computed from dimension positions:
|
||||||
|
index = sum(pos_i * stride_i) where stride_i = product(sizes[i+1:])
|
||||||
|
|
||||||
|
We fix all dimensions except geo and time using ``filters``, then
|
||||||
|
iterate geo × time to extract every non-null value.
|
||||||
|
"""
|
||||||
|
dims = data["dimension"]
|
||||||
|
sizes = data["size"]
|
||||||
|
values = data["value"]
|
||||||
|
dim_names = data["id"]
|
||||||
|
|
||||||
|
assert len(dim_names) == len(sizes)
|
||||||
|
|
||||||
|
# Compute strides (row-major): stride[i] = product of sizes[i+1:]
|
||||||
|
strides = [1] * len(sizes)
|
||||||
|
for i in range(len(sizes) - 2, -1, -1):
|
||||||
|
strides[i] = strides[i + 1] * sizes[i + 1]
|
||||||
|
|
||||||
|
# Resolve fixed dimension positions → compute base offset
|
||||||
|
offset = 0
|
||||||
|
geo_idx = None
|
||||||
|
time_idx = None
|
||||||
|
|
||||||
|
for i, name in enumerate(dim_names):
|
||||||
|
if name == geo_dim:
|
||||||
|
geo_idx = i
|
||||||
|
elif name == time_dim:
|
||||||
|
time_idx = i
|
||||||
|
elif name in filters:
|
||||||
|
cat_index = dims[name]["category"]["index"]
|
||||||
|
code = filters[name]
|
||||||
|
assert code in cat_index, (
|
||||||
|
f"Filter value {code!r} not in dimension {name!r}. "
|
||||||
|
f"Available: {list(cat_index.keys())[:10]}..."
|
||||||
|
)
|
||||||
|
offset += cat_index[code] * strides[i]
|
||||||
|
else:
|
||||||
|
# Dimension not filtered and not geo/time — must have size 1
|
||||||
|
assert sizes[i] == 1, f"Dimension {name!r} has size {sizes[i]} but no filter provided"
|
||||||
|
|
||||||
|
assert geo_idx is not None, f"geo_dim {geo_dim!r} not found in {dim_names}"
|
||||||
|
assert time_idx is not None, f"time_dim {time_dim!r} not found in {dim_names}"
|
||||||
|
|
||||||
|
geo_index = dims[geo_dim]["category"]["index"]
|
||||||
|
time_index = dims[time_dim]["category"]["index"]
|
||||||
|
geo_by_pos = {pos: code for code, pos in geo_index.items()}
|
||||||
|
time_by_pos = {pos: code for code, pos in time_index.items()}
|
||||||
|
|
||||||
|
geo_stride = strides[geo_idx]
|
||||||
|
time_stride = strides[time_idx]
|
||||||
|
|
||||||
|
rows: list[dict] = []
|
||||||
|
for geo_pos in range(sizes[geo_idx]):
|
||||||
|
geo_code = geo_by_pos[geo_pos]
|
||||||
|
for time_pos in range(sizes[time_idx]):
|
||||||
|
idx = offset + geo_pos * geo_stride + time_pos * time_stride
|
||||||
|
val = values.get(str(idx))
|
||||||
|
if val is not None:
|
||||||
|
rows.append(
|
||||||
|
{
|
||||||
|
"geo_code": geo_code,
|
||||||
|
"ref_year": time_by_pos[time_pos],
|
||||||
|
"value": val,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return rows
|
||||||
|
|
||||||
|
|
||||||
def _etag_path(dest: Path) -> Path:
|
def _etag_path(dest: Path) -> Path:
|
||||||
@@ -35,8 +132,12 @@ def _fetch_with_etag(
|
|||||||
url: str,
|
url: str,
|
||||||
dest: Path,
|
dest: Path,
|
||||||
session: niquests.Session,
|
session: niquests.Session,
|
||||||
|
dataset_config: dict | None = None,
|
||||||
) -> int:
|
) -> int:
|
||||||
"""GET url with If-None-Match etag. Returns bytes_written (0 if 304)."""
|
"""GET url with If-None-Match etag. Pre-processes JSON-stat if config given.
|
||||||
|
|
||||||
|
Returns bytes_written (0 if 304).
|
||||||
|
"""
|
||||||
etag_file = _etag_path(dest)
|
etag_file = _etag_path(dest)
|
||||||
headers: dict[str, str] = {}
|
headers: dict[str, str] = {}
|
||||||
|
|
||||||
@@ -49,7 +150,21 @@ def _fetch_with_etag(
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
bytes_written = write_gzip_atomic(dest, resp.content)
|
|
||||||
|
if dataset_config:
|
||||||
|
raw = resp.json()
|
||||||
|
rows = _parse_jsonstat(
|
||||||
|
raw,
|
||||||
|
filters=dataset_config["filters"],
|
||||||
|
geo_dim=dataset_config["geo_dim"],
|
||||||
|
time_dim=dataset_config["time_dim"],
|
||||||
|
)
|
||||||
|
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
|
||||||
|
logger.info("parsed %d records", len(rows))
|
||||||
|
else:
|
||||||
|
payload = resp.content
|
||||||
|
|
||||||
|
bytes_written = write_gzip_atomic(dest, payload)
|
||||||
|
|
||||||
if etag := resp.headers.get("etag"):
|
if etag := resp.headers.get("etag"):
|
||||||
etag_file.parent.mkdir(parents=True, exist_ok=True)
|
etag_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
@@ -70,13 +185,13 @@ def extract(
|
|||||||
files_skipped = 0
|
files_skipped = 0
|
||||||
bytes_written_total = 0
|
bytes_written_total = 0
|
||||||
|
|
||||||
for dataset_code in DATASETS:
|
for dataset_code, config in DATASETS.items():
|
||||||
url = f"{EUROSTAT_BASE_URL}/{dataset_code}?format=JSON&lang=EN"
|
url = f"{EUROSTAT_BASE_URL}/{dataset_code}?format=JSON&lang=EN"
|
||||||
dest_dir = landing_path(landing_dir, "eurostat", year, month)
|
dest_dir = landing_path(landing_dir, "eurostat", year, month)
|
||||||
dest = dest_dir / f"{dataset_code}.json.gz"
|
dest = dest_dir / f"{dataset_code}.json.gz"
|
||||||
|
|
||||||
logger.info("GET %s", dataset_code)
|
logger.info("GET %s", dataset_code)
|
||||||
bytes_written = _fetch_with_etag(url, dest, session)
|
bytes_written = _fetch_with_etag(url, dest, session, config)
|
||||||
|
|
||||||
if bytes_written > 0:
|
if bytes_written > 0:
|
||||||
logger.info("%s updated — %s bytes compressed", dataset_code, f"{bytes_written:,}")
|
logger.info("%s updated — %s bytes compressed", dataset_code, f"{bytes_written:,}")
|
||||||
|
|||||||
@@ -15,7 +15,8 @@ eurostat_cities AS (
|
|||||||
city_code,
|
city_code,
|
||||||
country_code,
|
country_code,
|
||||||
population,
|
population,
|
||||||
ref_year
|
ref_year,
|
||||||
|
LOWER(REPLACE(city_code, country_code, '')) AS city_slug_raw
|
||||||
FROM staging.stg_population
|
FROM staging.stg_population
|
||||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY city_code ORDER BY ref_year DESC) = 1
|
QUALIFY ROW_NUMBER() OVER (PARTITION BY city_code ORDER BY ref_year DESC) = 1
|
||||||
),
|
),
|
||||||
@@ -30,16 +31,6 @@ venue_counts AS (
|
|||||||
FROM foundation.dim_venues
|
FROM foundation.dim_venues
|
||||||
WHERE city IS NOT NULL AND city != ''
|
WHERE city IS NOT NULL AND city != ''
|
||||||
GROUP BY country_code, city
|
GROUP BY country_code, city
|
||||||
),
|
|
||||||
-- Eurostat city label mapping to canonical city names
|
|
||||||
-- (Eurostat uses codes like DE001C → Berlin; we keep both)
|
|
||||||
eurostat_labels AS (
|
|
||||||
SELECT DISTINCT
|
|
||||||
city_code,
|
|
||||||
country_code,
|
|
||||||
-- Derive a slug-friendly city name from the code as fallback
|
|
||||||
LOWER(REPLACE(city_code, country_code, '')) AS city_slug_raw
|
|
||||||
FROM eurostat_cities
|
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
ec.city_code,
|
ec.city_code,
|
||||||
|
|||||||
@@ -19,8 +19,8 @@ WITH parsed AS (
|
|||||||
tenant -> 'address' ->> 'city' AS city,
|
tenant -> 'address' ->> 'city' AS city,
|
||||||
tenant -> 'address' ->> 'postal_code' AS postal_code,
|
tenant -> 'address' ->> 'postal_code' AS postal_code,
|
||||||
tenant -> 'address' ->> 'country_code' AS country_code,
|
tenant -> 'address' ->> 'country_code' AS country_code,
|
||||||
TRY_CAST(tenant -> 'address' ->> 'coordinate_lat' AS DOUBLE) AS lat,
|
TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat,
|
||||||
TRY_CAST(tenant -> 'address' ->> 'coordinate_lon' AS DOUBLE) AS lon,
|
TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon,
|
||||||
tenant ->> 'sport_ids' AS sport_ids_raw,
|
tenant ->> 'sport_ids' AS sport_ids_raw,
|
||||||
tenant ->> 'tenant_type' AS tenant_type,
|
tenant ->> 'tenant_type' AS tenant_type,
|
||||||
filename AS source_file,
|
filename AS source_file,
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
-- Eurostat Urban Audit city population (dataset: urb_cpop1).
|
-- Eurostat Urban Audit city population (dataset: urb_cpop1).
|
||||||
-- Reads landing zone JSON directly and parses the Eurostat multidimensional format.
|
-- Reads pre-processed landing zone JSON (extractor normalizes JSON-stat to flat rows).
|
||||||
-- One row per (city_code, year) with validated population values.
|
-- One row per (city_code, year) with validated population values.
|
||||||
--
|
--
|
||||||
-- Source: data/landing/eurostat/{year}/{month}/urb_cpop1.json.gz
|
-- Source: data/landing/eurostat/{year}/{month}/urb_cpop1.json.gz
|
||||||
@@ -11,51 +11,26 @@ MODEL (
|
|||||||
grain (city_code, ref_year)
|
grain (city_code, ref_year)
|
||||||
);
|
);
|
||||||
|
|
||||||
WITH raw AS (
|
WITH parsed AS (
|
||||||
SELECT raw_json, filename
|
|
||||||
FROM read_json(
|
|
||||||
@LANDING_DIR || '/eurostat/*/*/urb_cpop1.json.gz',
|
|
||||||
format = 'auto',
|
|
||||||
filename = true,
|
|
||||||
columns = { 'raw_json': 'JSON' }
|
|
||||||
)
|
|
||||||
),
|
|
||||||
cities AS (
|
|
||||||
SELECT
|
SELECT
|
||||||
city_code,
|
row ->> 'geo_code' AS geo_code,
|
||||||
(city_pos)::INTEGER AS city_pos,
|
row ->> 'ref_year' AS ref_year,
|
||||||
filename, raw_json,
|
TRY_CAST(row ->> 'value' AS DOUBLE) AS population,
|
||||||
(json_extract(raw_json, '$.size[1]'))::INTEGER AS n_times
|
filename AS source_file,
|
||||||
FROM raw,
|
CURRENT_DATE AS extracted_date
|
||||||
LATERAL (
|
FROM (
|
||||||
SELECT key AS city_code, value::INTEGER AS city_pos
|
SELECT UNNEST(rows) AS row, filename
|
||||||
FROM json_each(json_extract(raw_json, '$.dimension.cities.category.index'))
|
FROM read_json(
|
||||||
|
@LANDING_DIR || '/eurostat/*/*/urb_cpop1.json.gz',
|
||||||
|
format = 'auto',
|
||||||
|
filename = true
|
||||||
|
)
|
||||||
)
|
)
|
||||||
),
|
WHERE (row ->> 'geo_code') IS NOT NULL
|
||||||
times AS (
|
|
||||||
SELECT key AS ref_year, value::INTEGER AS time_pos
|
|
||||||
FROM (SELECT raw_json FROM raw LIMIT 1),
|
|
||||||
LATERAL (
|
|
||||||
SELECT key, value
|
|
||||||
FROM json_each(json_extract(raw_json, '$.dimension.time.category.index'))
|
|
||||||
)
|
|
||||||
),
|
|
||||||
parsed AS (
|
|
||||||
SELECT
|
|
||||||
c.city_code,
|
|
||||||
t.ref_year,
|
|
||||||
TRY_CAST(
|
|
||||||
json_extract(c.raw_json, '$.' || (c.city_pos * c.n_times + t.time_pos)::TEXT)
|
|
||||||
AS DOUBLE
|
|
||||||
) AS population,
|
|
||||||
c.filename AS source_file,
|
|
||||||
CURRENT_DATE AS extracted_date
|
|
||||||
FROM cities c
|
|
||||||
CROSS JOIN times t
|
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
UPPER(city_code) AS city_code,
|
UPPER(geo_code) AS city_code,
|
||||||
UPPER(LEFT(city_code, 2)) AS country_code,
|
UPPER(LEFT(geo_code, 2)) AS country_code,
|
||||||
ref_year::INTEGER AS ref_year,
|
ref_year::INTEGER AS ref_year,
|
||||||
population::BIGINT AS population,
|
population::BIGINT AS population,
|
||||||
extracted_date
|
extracted_date
|
||||||
|
|||||||
Reference in New Issue
Block a user