fix(extract): correct Eurostat + ONS response parsers against live APIs

eurostat_city_labels: API returns compact dimension JSON (category.label dict),
not SDMX 2.1 nested codelists structure. Fixed parser to read from
data["category"]["label"]. 1771 city codes fetched successfully.

ons_uk: observations endpoint (TS007A) is 404. Switched to CSV download via
/datasets/mid-year-pop-est/editions/mid-2022-england-wales — fetches ~68MB CSV,
filters to sex='all' + target year, aggregates population per LAD. 316 LADs ≥50K.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-24 00:26:13 +01:00
parent bd1d29468a
commit 06cbdf80dc
2 changed files with 100 additions and 95 deletions

View File

@@ -33,36 +33,30 @@ CODELIST_URL = (
def _parse_sdmx_codelist(data: dict) -> list[dict]: def _parse_sdmx_codelist(data: dict) -> list[dict]:
"""Extract city_code → city_name pairs from SDMX codelist JSON response. """Extract city_code → city_name pairs from Eurostat dimension JSON response.
The SDMX 2.1 JSON structure varies by endpoint. This endpoint returns a Eurostat's /codelist endpoint (format=JSON) returns a compact dimension
structure.codelists[0].codes list where each code has id and name[0].name. object where data["category"]["label"] is a flat dict mapping code → name:
{"BE001C": "Bruxelles/Brussel (greater city)", "DE001C": "Berlin", ...}
Country-level entries (e.g. "BE") are included in the same dict; we filter
them out by requiring a digit in the code (city codes look like DE001C).
""" """
try: try:
codelists = data["structure"]["codelists"] label_map = data["category"]["label"]
except (KeyError, TypeError) as e: except (KeyError, TypeError) as e:
raise ValueError(f"Unexpected SDMX structure — missing codelists: {e}") from e raise ValueError(f"Unexpected SDMX structure — missing category.label: {e}") from e
assert len(codelists) > 0, "SDMX response has empty codelists array" assert len(label_map) > 0, "SDMX category.label is empty — API response may have changed"
codes = codelists[0].get("codes", [])
assert len(codes) > 0, "SDMX codelist has no codes — API response may have changed"
rows: list[dict] = [] rows: list[dict] = []
for code in codes: for city_code, city_name in label_map.items():
city_code = code.get("id", "").strip() city_code = city_code.strip()
if not city_code: city_name = str(city_name).strip() if city_name else ""
# City codes contain digits (e.g. DE001C); country codes (e.g. DE) do not
if not city_code or not city_name or not any(c.isdigit() for c in city_code):
continue continue
# Name is a list of {lang, name} objects; pick the first (EN requested above) rows.append({"city_code": city_code, "city_name": city_name})
names = code.get("name", [])
if isinstance(names, list) and names:
city_name = names[0].get("name", "").strip()
elif isinstance(names, str):
city_name = names.strip()
else:
continue
if city_name:
rows.append({"city_code": city_code, "city_name": city_name})
return rows return rows

View File

@@ -1,14 +1,20 @@
"""ONS (Office for National Statistics) UK population extractor. """ONS (Office for National Statistics) UK population extractor.
Fetches 2021 Census population by Local Authority District (LAD) from the ONS Fetches mid-year population estimates by Local Authority from the ONS Dataset API.
beta API. No authentication required. Downloads a CSV (~68MB) containing population by LAD × year × sex × age, then
filters to sex='all' for the most recent year and sums across ages to get
total population per LAD.
No authentication required.
Landing: {LANDING_DIR}/ons_uk/{year}/{month}/lad_population.json.gz Landing: {LANDING_DIR}/ons_uk/{year}/{month}/lad_population.json.gz
Output: {"rows": [{"lad_code": "E08000003", "lad_name": "Manchester", Output: {"rows": [{"lad_code": "E08000003", "lad_name": "Manchester",
"population": 553230, "ref_year": 2021, "population": 553230, "ref_year": 2022,
"country_code": "GB"}], "count": N} "country_code": "GB"}], "count": N}
""" """
import csv
import io
import json import json
import sqlite3 import sqlite3
from pathlib import Path from pathlib import Path
@@ -22,79 +28,74 @@ logger = setup_logging("padelnomics.extract.ons_uk")
EXTRACTOR_NAME = "ons_uk" EXTRACTOR_NAME = "ons_uk"
# ONS beta API — 2021 Census population estimates by Local Authority District. ONS_BASE = "https://api.beta.ons.gov.uk/v1"
# TS007A = "Age by single year" dataset; aggregate gives total population per LAD. DATASET_ID = "mid-year-pop-est"
# We use the observations endpoint which returns flat rows. # Most recent edition with England & Wales LAD data
# limit=500 covers all ~380 LADs in England, Wales, Scotland, and Northern Ireland. EDITION = "mid-2022-england-wales"
ONS_BASE_URL = (
"https://api.beta.ons.gov.uk/v1/datasets/TS007A/editions/2021/versions/1"
)
REF_YEAR = 2021
MIN_POPULATION = 50_000 MIN_POPULATION = 50_000
# ONS rate limit is 120 requests per 10 seconds; a single paginated call is fine.
PAGE_SIZE = 500 # CSV columns: v4_0, calendar-years, Time, administrative-geography, Geography, sex, Sex, single-year-of-age, Age
MAX_PAGES = 10 # safety bound; all LADs fit in page 1 at limit=500 COL_VALUE = "v4_0"
COL_YEAR = "calendar-years"
COL_LAD_CODE = "administrative-geography"
COL_LAD_NAME = "Geography"
COL_SEX = "sex"
def _fetch_all_observations(session: niquests.Session) -> list[dict]: def _get_csv_url(session: niquests.Session) -> tuple[str, int]:
"""Fetch all LAD population rows, paginating if needed.""" """Fetch the latest CSV download URL from the ONS Dataset API.
rows: list[dict] = []
offset = 0
for page in range(MAX_PAGES): Returns (url, ref_year).
url = f"{ONS_BASE_URL}/observations?geography=*&age=0&limit={PAGE_SIZE}&offset={offset}"
resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS)
resp.raise_for_status()
data = resp.json()
observations = data.get("observations", [])
if not observations:
break
for obs in observations:
# Each observation: {dimensions: [{id: "geography", option: {id: "E08000003", label: "Manchester"}}...], observation: "553230"}
geo_dim = next(
(d for d in obs.get("dimensions", []) if d.get("dimension_id") == "geography"),
None,
)
if not geo_dim:
continue
lad_code = geo_dim.get("option", {}).get("id", "").strip()
lad_name = geo_dim.get("option", {}).get("label", "").strip()
if not lad_code or not lad_name:
continue
try:
population = int(obs.get("observation", "0").replace(",", ""))
except (ValueError, TypeError):
continue
rows.append({
"lad_code": lad_code,
"lad_name": lad_name,
"population": population,
})
total = data.get("total_observations", len(rows))
offset += len(observations)
if offset >= total:
break
logger.info("fetched page %d (%d rows so far)", page + 1, len(rows))
return rows
def _aggregate_by_lad(raw_rows: list[dict]) -> list[dict]:
"""Sum population across all age groups per LAD.
TS007A breaks population down by single year of age, so we need to aggregate.
""" """
versions_url = f"{ONS_BASE}/datasets/{DATASET_ID}/editions/{EDITION}/versions"
resp = session.get(versions_url, timeout=HTTP_TIMEOUT_SECONDS)
resp.raise_for_status()
versions = resp.json().get("items", [])
assert versions, f"No versions found for {DATASET_ID}/{EDITION}"
# Sort by version number (highest = latest) and pick the latest
latest = max(versions, key=lambda v: v.get("version", 0))
csv_url = latest["downloads"]["csv"]["href"]
# Extract year from edition name (mid-2022-england-wales → 2022)
year_part = EDITION.split("-")[1]
ref_year = int(year_part)
return csv_url, ref_year
def _aggregate_csv(content: bytes, ref_year: int) -> list[dict]:
"""Parse the ONS CSV and aggregate total population per LAD for the target year.
The CSV has one row per (year × LAD × sex × age); we filter to sex='all'
and the target year, then sum the value column per LAD to get total population.
"""
reader = csv.DictReader(io.StringIO(content.decode("utf-8")))
totals: dict[str, dict] = {} totals: dict[str, dict] = {}
for row in raw_rows: target_year = str(ref_year)
key = row["lad_code"]
if key not in totals: for row in reader:
totals[key] = {"lad_code": row["lad_code"], "lad_name": row["lad_name"], "population": 0} if row.get(COL_SEX, "").lower() != "all":
totals[key]["population"] += row["population"] continue
if row.get(COL_YEAR, "") != target_year:
continue
lad_code = row.get(COL_LAD_CODE, "").strip()
lad_name = row.get(COL_LAD_NAME, "").strip()
if not lad_code or not lad_name:
continue
# Skip aggregate geographies (country/region level codes start differently from LADs)
# LAD codes: E06*, E07*, E08*, E09*, W06*, S12*, N09*
if not any(lad_code.startswith(p) for p in ("E0", "W0", "S1", "N0")):
continue
try:
value = int(row.get(COL_VALUE, "0").replace(",", ""))
except ValueError:
continue
if lad_code not in totals:
totals[lad_code] = {"lad_code": lad_code, "lad_name": lad_name, "population": 0}
totals[lad_code]["population"] += value
return list(totals.values()) return list(totals.values())
@@ -112,16 +113,26 @@ def extract(
year, month = year_month.split("/") year, month = year_month.split("/")
logger.info("GET ONS TS007A LAD population (2021 Census)") logger.info("fetching ONS CSV download URL for %s/%s", DATASET_ID, EDITION)
raw_rows = _fetch_all_observations(session) csv_url, ref_year = _get_csv_url(session)
lad_rows = _aggregate_by_lad(raw_rows)
logger.info("GET %s (ref_year=%d)", csv_url, ref_year)
resp = session.get(csv_url, timeout=HTTP_TIMEOUT_SECONDS * 10)
resp.raise_for_status()
assert len(resp.content) > 1_000_000, (
f"ONS CSV too small ({len(resp.content)} bytes) — download may have failed"
)
logger.info("aggregating %d bytes CSV", len(resp.content))
lad_rows = _aggregate_csv(resp.content, ref_year)
assert len(lad_rows) > 100, f"Expected >100 LADs from CSV, got {len(lad_rows)}"
filtered = [ filtered = [
{ {
"lad_code": r["lad_code"], "lad_code": r["lad_code"],
"lad_name": r["lad_name"], "lad_name": r["lad_name"],
"population": r["population"], "population": r["population"],
"ref_year": REF_YEAR, "ref_year": ref_year,
"country_code": "GB", "country_code": "GB",
} }
for r in lad_rows for r in lad_rows