diff --git a/extract/padelnomics_extract/src/padelnomics_extract/eurostat_city_labels.py b/extract/padelnomics_extract/src/padelnomics_extract/eurostat_city_labels.py index 02e4c73..e1cd423 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/eurostat_city_labels.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/eurostat_city_labels.py @@ -33,36 +33,30 @@ CODELIST_URL = ( def _parse_sdmx_codelist(data: dict) -> list[dict]: - """Extract city_code → city_name pairs from SDMX codelist JSON response. + """Extract city_code → city_name pairs from Eurostat dimension JSON response. - The SDMX 2.1 JSON structure varies by endpoint. This endpoint returns a - structure.codelists[0].codes list where each code has id and name[0].name. + Eurostat's /codelist endpoint (format=JSON) returns a compact dimension + object where data["category"]["label"] is a flat dict mapping code → name: + {"BE001C": "Bruxelles/Brussel (greater city)", "DE001C": "Berlin", ...} + + Country-level entries (e.g. "BE") are included in the same dict; we filter + them out by requiring a digit in the code (city codes look like DE001C). """ try: - codelists = data["structure"]["codelists"] + label_map = data["category"]["label"] except (KeyError, TypeError) as e: - raise ValueError(f"Unexpected SDMX structure — missing codelists: {e}") from e + raise ValueError(f"Unexpected SDMX structure — missing category.label: {e}") from e - assert len(codelists) > 0, "SDMX response has empty codelists array" - - codes = codelists[0].get("codes", []) - assert len(codes) > 0, "SDMX codelist has no codes — API response may have changed" + assert len(label_map) > 0, "SDMX category.label is empty — API response may have changed" rows: list[dict] = [] - for code in codes: - city_code = code.get("id", "").strip() - if not city_code: + for city_code, city_name in label_map.items(): + city_code = city_code.strip() + city_name = str(city_name).strip() if city_name else "" + # City codes contain digits (e.g. DE001C); country codes (e.g. DE) do not + if not city_code or not city_name or not any(c.isdigit() for c in city_code): continue - # Name is a list of {lang, name} objects; pick the first (EN requested above) - names = code.get("name", []) - if isinstance(names, list) and names: - city_name = names[0].get("name", "").strip() - elif isinstance(names, str): - city_name = names.strip() - else: - continue - if city_name: - rows.append({"city_code": city_code, "city_name": city_name}) + rows.append({"city_code": city_code, "city_name": city_name}) return rows diff --git a/extract/padelnomics_extract/src/padelnomics_extract/ons_uk.py b/extract/padelnomics_extract/src/padelnomics_extract/ons_uk.py index 5e49783..4e9134b 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/ons_uk.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/ons_uk.py @@ -1,14 +1,20 @@ """ONS (Office for National Statistics) UK population extractor. -Fetches 2021 Census population by Local Authority District (LAD) from the ONS -beta API. No authentication required. +Fetches mid-year population estimates by Local Authority from the ONS Dataset API. +Downloads a CSV (~68MB) containing population by LAD × year × sex × age, then +filters to sex='all' for the most recent year and sums across ages to get +total population per LAD. + +No authentication required. Landing: {LANDING_DIR}/ons_uk/{year}/{month}/lad_population.json.gz Output: {"rows": [{"lad_code": "E08000003", "lad_name": "Manchester", - "population": 553230, "ref_year": 2021, + "population": 553230, "ref_year": 2022, "country_code": "GB"}], "count": N} """ +import csv +import io import json import sqlite3 from pathlib import Path @@ -22,79 +28,74 @@ logger = setup_logging("padelnomics.extract.ons_uk") EXTRACTOR_NAME = "ons_uk" -# ONS beta API — 2021 Census population estimates by Local Authority District. -# TS007A = "Age by single year" dataset; aggregate gives total population per LAD. -# We use the observations endpoint which returns flat rows. -# limit=500 covers all ~380 LADs in England, Wales, Scotland, and Northern Ireland. -ONS_BASE_URL = ( - "https://api.beta.ons.gov.uk/v1/datasets/TS007A/editions/2021/versions/1" -) +ONS_BASE = "https://api.beta.ons.gov.uk/v1" +DATASET_ID = "mid-year-pop-est" +# Most recent edition with England & Wales LAD data +EDITION = "mid-2022-england-wales" -REF_YEAR = 2021 MIN_POPULATION = 50_000 -# ONS rate limit is 120 requests per 10 seconds; a single paginated call is fine. -PAGE_SIZE = 500 -MAX_PAGES = 10 # safety bound; all LADs fit in page 1 at limit=500 + +# CSV columns: v4_0, calendar-years, Time, administrative-geography, Geography, sex, Sex, single-year-of-age, Age +COL_VALUE = "v4_0" +COL_YEAR = "calendar-years" +COL_LAD_CODE = "administrative-geography" +COL_LAD_NAME = "Geography" +COL_SEX = "sex" -def _fetch_all_observations(session: niquests.Session) -> list[dict]: - """Fetch all LAD population rows, paginating if needed.""" - rows: list[dict] = [] - offset = 0 +def _get_csv_url(session: niquests.Session) -> tuple[str, int]: + """Fetch the latest CSV download URL from the ONS Dataset API. - for page in range(MAX_PAGES): - url = f"{ONS_BASE_URL}/observations?geography=*&age=0&limit={PAGE_SIZE}&offset={offset}" - resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS) - resp.raise_for_status() - data = resp.json() - - observations = data.get("observations", []) - if not observations: - break - - for obs in observations: - # Each observation: {dimensions: [{id: "geography", option: {id: "E08000003", label: "Manchester"}}...], observation: "553230"} - geo_dim = next( - (d for d in obs.get("dimensions", []) if d.get("dimension_id") == "geography"), - None, - ) - if not geo_dim: - continue - lad_code = geo_dim.get("option", {}).get("id", "").strip() - lad_name = geo_dim.get("option", {}).get("label", "").strip() - if not lad_code or not lad_name: - continue - try: - population = int(obs.get("observation", "0").replace(",", "")) - except (ValueError, TypeError): - continue - rows.append({ - "lad_code": lad_code, - "lad_name": lad_name, - "population": population, - }) - - total = data.get("total_observations", len(rows)) - offset += len(observations) - if offset >= total: - break - - logger.info("fetched page %d (%d rows so far)", page + 1, len(rows)) - - return rows - - -def _aggregate_by_lad(raw_rows: list[dict]) -> list[dict]: - """Sum population across all age groups per LAD. - - TS007A breaks population down by single year of age, so we need to aggregate. + Returns (url, ref_year). """ + versions_url = f"{ONS_BASE}/datasets/{DATASET_ID}/editions/{EDITION}/versions" + resp = session.get(versions_url, timeout=HTTP_TIMEOUT_SECONDS) + resp.raise_for_status() + versions = resp.json().get("items", []) + assert versions, f"No versions found for {DATASET_ID}/{EDITION}" + + # Sort by version number (highest = latest) and pick the latest + latest = max(versions, key=lambda v: v.get("version", 0)) + csv_url = latest["downloads"]["csv"]["href"] + + # Extract year from edition name (mid-2022-england-wales → 2022) + year_part = EDITION.split("-")[1] + ref_year = int(year_part) + + return csv_url, ref_year + + +def _aggregate_csv(content: bytes, ref_year: int) -> list[dict]: + """Parse the ONS CSV and aggregate total population per LAD for the target year. + + The CSV has one row per (year × LAD × sex × age); we filter to sex='all' + and the target year, then sum the value column per LAD to get total population. + """ + reader = csv.DictReader(io.StringIO(content.decode("utf-8"))) totals: dict[str, dict] = {} - for row in raw_rows: - key = row["lad_code"] - if key not in totals: - totals[key] = {"lad_code": row["lad_code"], "lad_name": row["lad_name"], "population": 0} - totals[key]["population"] += row["population"] + target_year = str(ref_year) + + for row in reader: + if row.get(COL_SEX, "").lower() != "all": + continue + if row.get(COL_YEAR, "") != target_year: + continue + lad_code = row.get(COL_LAD_CODE, "").strip() + lad_name = row.get(COL_LAD_NAME, "").strip() + if not lad_code or not lad_name: + continue + # Skip aggregate geographies (country/region level codes start differently from LADs) + # LAD codes: E06*, E07*, E08*, E09*, W06*, S12*, N09* + if not any(lad_code.startswith(p) for p in ("E0", "W0", "S1", "N0")): + continue + try: + value = int(row.get(COL_VALUE, "0").replace(",", "")) + except ValueError: + continue + if lad_code not in totals: + totals[lad_code] = {"lad_code": lad_code, "lad_name": lad_name, "population": 0} + totals[lad_code]["population"] += value + return list(totals.values()) @@ -112,16 +113,26 @@ def extract( year, month = year_month.split("/") - logger.info("GET ONS TS007A LAD population (2021 Census)") - raw_rows = _fetch_all_observations(session) - lad_rows = _aggregate_by_lad(raw_rows) + logger.info("fetching ONS CSV download URL for %s/%s", DATASET_ID, EDITION) + csv_url, ref_year = _get_csv_url(session) + + logger.info("GET %s (ref_year=%d)", csv_url, ref_year) + resp = session.get(csv_url, timeout=HTTP_TIMEOUT_SECONDS * 10) + resp.raise_for_status() + assert len(resp.content) > 1_000_000, ( + f"ONS CSV too small ({len(resp.content)} bytes) — download may have failed" + ) + + logger.info("aggregating %d bytes CSV", len(resp.content)) + lad_rows = _aggregate_csv(resp.content, ref_year) + assert len(lad_rows) > 100, f"Expected >100 LADs from CSV, got {len(lad_rows)}" filtered = [ { "lad_code": r["lad_code"], "lad_name": r["lad_name"], "population": r["population"], - "ref_year": REF_YEAR, + "ref_year": ref_year, "country_code": "GB", } for r in lad_rows