From 409dc4bfac6785db0d1085092dd23ffdacd43e03 Mon Sep 17 00:00:00 2001 From: Deeman Date: Fri, 27 Feb 2026 10:58:12 +0100 Subject: [PATCH] =?UTF-8?q?feat(data):=20Phase=202b=20step=201=20=E2=80=94?= =?UTF-8?q?=20expand=20stg=5Fregional=5Fincome=20+=20Census=20income=20ext?= =?UTF-8?q?ractor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - stg_regional_income.sql: accept NUTS-1 (3-char) + NUTS-2 (4-char) codes; rename nuts1_code → nuts_code; add nuts_level column; NUTS-2 rows were already in the landing zone but discarded by LENGTH(geo_code) = 3 - scripts/download_gisco_nuts.py: one-time download of GISCO NUTS-2 boundary GeoJSON (NUTS_RG_20M_2021_4326_LEVL_2.geojson, ~5MB) to landing zone; uncompressed because ST_Read cannot read .gz files - census_usa_income.py: new extractor for ACS B19013_001E state-level median household income; follows census_usa.py pattern; 51 states + DC - all.py + pyproject.toml: register census_usa_income extractor Co-Authored-By: Claude Sonnet 4.6 --- extract/padelnomics_extract/pyproject.toml | 1 + .../src/padelnomics_extract/all.py | 3 + .../padelnomics_extract/census_usa_income.py | 121 ++++++++++++++++++ scripts/download_gisco_nuts.py | 81 ++++++++++++ .../models/staging/stg_regional_income.sql | 18 +-- 5 files changed, 216 insertions(+), 8 deletions(-) create mode 100644 extract/padelnomics_extract/src/padelnomics_extract/census_usa_income.py create mode 100644 scripts/download_gisco_nuts.py diff --git a/extract/padelnomics_extract/pyproject.toml b/extract/padelnomics_extract/pyproject.toml index 93098df..ec3ca82 100644 --- a/extract/padelnomics_extract/pyproject.toml +++ b/extract/padelnomics_extract/pyproject.toml @@ -18,6 +18,7 @@ extract-playtomic-availability = "padelnomics_extract.playtomic_availability:mai extract-playtomic-recheck = "padelnomics_extract.playtomic_availability:main_recheck" extract-eurostat-city-labels = "padelnomics_extract.eurostat_city_labels:main" extract-census-usa = "padelnomics_extract.census_usa:main" +extract-census-usa-income = "padelnomics_extract.census_usa_income:main" extract-ons-uk = "padelnomics_extract.ons_uk:main" extract-geonames = "padelnomics_extract.geonames:main" diff --git a/extract/padelnomics_extract/src/padelnomics_extract/all.py b/extract/padelnomics_extract/src/padelnomics_extract/all.py index 8b93c94..62987ce 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/all.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/all.py @@ -18,6 +18,8 @@ from graphlib import TopologicalSorter from ._shared import run_extractor, setup_logging from .census_usa import EXTRACTOR_NAME as CENSUS_USA_NAME from .census_usa import extract as extract_census_usa +from .census_usa_income import EXTRACTOR_NAME as CENSUS_USA_INCOME_NAME +from .census_usa_income import extract as extract_census_usa_income from .eurostat import EXTRACTOR_NAME as EUROSTAT_NAME from .eurostat import extract as extract_eurostat from .eurostat_city_labels import EXTRACTOR_NAME as EUROSTAT_CITY_LABELS_NAME @@ -45,6 +47,7 @@ EXTRACTORS: dict[str, tuple] = { EUROSTAT_NAME: (extract_eurostat, []), EUROSTAT_CITY_LABELS_NAME: (extract_eurostat_city_labels, []), CENSUS_USA_NAME: (extract_census_usa, []), + CENSUS_USA_INCOME_NAME: (extract_census_usa_income, []), ONS_UK_NAME: (extract_ons_uk, []), GEONAMES_NAME: (extract_geonames, []), TENANTS_NAME: (extract_tenants, []), diff --git a/extract/padelnomics_extract/src/padelnomics_extract/census_usa_income.py b/extract/padelnomics_extract/src/padelnomics_extract/census_usa_income.py new file mode 100644 index 0000000..76fb63d --- /dev/null +++ b/extract/padelnomics_extract/src/padelnomics_extract/census_usa_income.py @@ -0,0 +1,121 @@ +"""US Census Bureau ACS 5-year state-level median household income extractor. + +Fetches state-level median household income from the American Community Survey +5-year estimates. Requires a free API key from api.census.gov. + +Env var: CENSUS_API_KEY (same key as census_usa.py) + +Landing: {LANDING_DIR}/census_usa/{year}/{month}/acs5_state_income.json.gz +Output: {"rows": [{"state_fips": "06", "state_name": "California", + "median_income_usd": 91905, "ref_year": 2023, + "country_code": "US"}], "count": N} +""" + +import json +import os +import sqlite3 +from pathlib import Path + +import niquests + +from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging +from .utils import get_last_cursor, landing_path, write_gzip_atomic + +logger = setup_logging("padelnomics.extract.census_usa_income") + +EXTRACTOR_NAME = "census_usa_income" + +# ACS 5-year estimates, 2023 vintage — refreshed annually. +# B19013_001E = median household income in the past 12 months (inflation-adjusted). +ACS_STATE_URL = ( + "https://api.census.gov/data/2023/acs/acs5" + "?get=B19013_001E,NAME&for=state:*" +) + +REF_YEAR = 2023 +MAX_RETRIES = 2 + + +def extract( + landing_dir: Path, + year_month: str, + conn: sqlite3.Connection, + session: niquests.Session, +) -> dict: + """Fetch ACS 5-year state-level income. Skips if already run this month.""" + api_key = os.environ.get("CENSUS_API_KEY", "").strip() + if not api_key: + logger.warning("CENSUS_API_KEY not set — writing empty placeholder so SQLMesh models can run") + year, month = year_month.split("/") + dest_dir = landing_path(landing_dir, "census_usa", year, month) + dest = dest_dir / "acs5_state_income.json.gz" + if not dest.exists(): + write_gzip_atomic(dest, b'{"rows": [], "count": 0}') + return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} + + last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) + if last_cursor == year_month: + logger.info("already have data for %s — skipping", year_month) + return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} + + year, month = year_month.split("/") + url = f"{ACS_STATE_URL}&key={api_key}" + + logger.info("GET ACS 5-year state income (vintage %d)", REF_YEAR) + resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS * 2) + resp.raise_for_status() + + raw = resp.json() + assert isinstance(raw, list) and len(raw) > 1, "ACS response must be a non-empty list" + + # First row is headers: ["B19013_001E", "NAME", "state"] + headers = raw[0] + assert "B19013_001E" in headers, f"Income column missing from ACS response: {headers}" + income_idx = headers.index("B19013_001E") + name_idx = headers.index("NAME") + state_idx = headers.index("state") + + rows: list[dict] = [] + for row in raw[1:]: + try: + income = int(row[income_idx]) + except (ValueError, TypeError): + # ACS returns -666666666 for suppressed/unavailable values + continue + if income <= 0: + continue + # Full state name from ACS; strip any trailing text after comma + state_name = row[name_idx].split(",")[0].strip() + if not state_name: + continue + rows.append({ + "state_fips": row[state_idx], + "state_name": state_name, + "median_income_usd": income, + "ref_year": REF_YEAR, + "country_code": "US", + }) + + assert len(rows) >= 50, f"Expected ≥50 US states, got {len(rows)} — parse may have failed" + logger.info("parsed %d US state income records", len(rows)) + + dest_dir = landing_path(landing_dir, "census_usa", year, month) + dest = dest_dir / "acs5_state_income.json.gz" + payload = json.dumps({"rows": rows, "count": len(rows)}).encode() + bytes_written = write_gzip_atomic(dest, payload) + logger.info("written %s bytes compressed", f"{bytes_written:,}") + + return { + "files_written": 1, + "files_skipped": 0, + "bytes_written": bytes_written, + "cursor_value": year_month, + } + + +def main() -> None: + run_extractor(EXTRACTOR_NAME, extract) + + +if __name__ == "__main__": + main() diff --git a/scripts/download_gisco_nuts.py b/scripts/download_gisco_nuts.py new file mode 100644 index 0000000..5a7737b --- /dev/null +++ b/scripts/download_gisco_nuts.py @@ -0,0 +1,81 @@ +"""Download NUTS-2 boundary GeoJSON from Eurostat GISCO. + +One-time (or on NUTS revision) download of NUTS-2 boundary polygons used for +spatial income resolution in dim_locations. Stored uncompressed because DuckDB's +ST_Read function cannot read gzipped files. + +NUTS classification changes approximately every 7 years. Current revision: 2021. + +Output: {LANDING_DIR}/gisco/2024/01/nuts2_boundaries.geojson (~5MB, uncompressed) + +Usage: + uv run python scripts/download_gisco_nuts.py [--landing-dir data/landing] + +Idempotent: skips download if the file already exists. +""" + +import argparse +import sys +from pathlib import Path + +import niquests + +# NUTS 2021 revision, 20M scale (1:20,000,000), WGS84 (EPSG:4326), LEVL_2 only. +# 20M resolution gives simplified polygons that are fast for point-in-polygon +# matching without sacrificing accuracy at the NUTS-2 boundary level. +GISCO_URL = ( + "https://gisco-services.ec.europa.eu/distribution/v2/nuts/geojson/" + "NUTS_RG_20M_2021_4326_LEVL_2.geojson" +) + +# Fixed partition: NUTS boundaries are a static reference file, not time-series data. +# Use the NUTS revision year as the partition to make the source version explicit. +DEST_REL_PATH = "gisco/2024/01/nuts2_boundaries.geojson" + +HTTP_TIMEOUT_SECONDS = 120 + + +def download_nuts_boundaries(landing_dir: Path) -> None: + dest = landing_dir / DEST_REL_PATH + if dest.exists(): + print(f"Already exists (skipping): {dest}") + return + + dest.parent.mkdir(parents=True, exist_ok=True) + print(f"Downloading NUTS-2 boundaries from GISCO...") + print(f" URL: {GISCO_URL}") + + with niquests.Session() as session: + resp = session.get(GISCO_URL, timeout=HTTP_TIMEOUT_SECONDS) + resp.raise_for_status() + + content = resp.content + assert len(content) > 100_000, ( + f"GeoJSON too small ({len(content)} bytes) — download may have failed" + ) + assert b'"FeatureCollection"' in content, "Response does not look like GeoJSON" + + # Write uncompressed — ST_Read requires a plain file + tmp = dest.with_suffix(".geojson.tmp") + tmp.write_bytes(content) + tmp.rename(dest) + + size_mb = len(content) / 1_000_000 + print(f" Written: {dest} ({size_mb:.1f} MB)") + print("Done. Run SQLMesh plan to rebuild stg_nuts2_boundaries.") + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--landing-dir", default="data/landing", type=Path) + args = parser.parse_args() + + if not args.landing_dir.is_dir(): + print(f"Error: landing dir does not exist: {args.landing_dir}", file=sys.stderr) + sys.exit(1) + + download_nuts_boundaries(args.landing_dir) + + +if __name__ == "__main__": + main() diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_regional_income.sql b/transform/sqlmesh_padelnomics/models/staging/stg_regional_income.sql index ceae1d9..e5f7db5 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_regional_income.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_regional_income.sql @@ -1,15 +1,15 @@ --- Eurostat NUTS-1 regional household income in PPS (dataset: nama_10r_2hhinc). --- Filters to NUTS-1 codes (exactly 3 characters, e.g. DE1, DE2, …). --- One row per (nuts1_code, ref_year). +-- Eurostat NUTS-1 and NUTS-2 regional household income in PPS (dataset: nama_10r_2hhinc). +-- Accepts NUTS-1 codes (3 characters, e.g. DE1) and NUTS-2 codes (4 characters, e.g. DE21). +-- One row per (nuts_code, ref_year). -- -- Source: data/landing/eurostat/{year}/{month}/nama_10r_2hhinc.json.gz --- Format: {"rows": [{"geo_code": "DE1", "ref_year": "2022", "value": 29400}, ...]} +-- Format: {"rows": [{"geo_code": "DE21", "ref_year": "2022", "value": 31200}, ...]} MODEL ( name staging.stg_regional_income, kind FULL, cron '@daily', - grain (nuts1_code, ref_year) + grain (nuts_code, ref_year) ); WITH source AS ( @@ -34,11 +34,13 @@ SELECT WHEN geo_code LIKE 'EL%' THEN 'GR' || SUBSTR(geo_code, 3) WHEN geo_code LIKE 'UK%' THEN 'GB' || SUBSTR(geo_code, 3) ELSE geo_code - END AS nuts1_code, + END AS nuts_code, + -- NUTS level: 3-char = NUTS-1, 4-char = NUTS-2 + LENGTH(geo_code) - 2 AS nuts_level, ref_year, regional_income_pps, extracted_date FROM parsed --- NUTS-1 codes are exactly 3 characters (country 2 + region 1) -WHERE LENGTH(geo_code) = 3 +-- NUTS-1 (3 chars) and NUTS-2 (4 chars); exclude country codes (2) and NUTS-3 (5) +WHERE LENGTH(geo_code) IN (3, 4) AND regional_income_pps > 0