merge: Phase 2a + 2b — EU NUTS-2 spatial join + US state income

Phase 2a: NUTS-1 regional income for Germany (16 Bundesländer via admin1→NUTS-1 mapping)
Phase 2b: EU-wide NUTS-2 via GISCO spatial join + US Census ACS state income
- All EU-27+EFTA+UK locations now auto-resolve to NUTS-2 via ST_Contains
- Germany gets sub-Bundesland (38 Regierungsbezirke) differentiation
- US gets state-level income with PPS normalisation
- Income cascade: NUTS-2 → NUTS-1 → US state → country-level

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-27 11:11:36 +01:00
12 changed files with 511 additions and 11 deletions

View File

@@ -18,6 +18,7 @@ extract-playtomic-availability = "padelnomics_extract.playtomic_availability:mai
extract-playtomic-recheck = "padelnomics_extract.playtomic_availability:main_recheck"
extract-eurostat-city-labels = "padelnomics_extract.eurostat_city_labels:main"
extract-census-usa = "padelnomics_extract.census_usa:main"
extract-census-usa-income = "padelnomics_extract.census_usa_income:main"
extract-ons-uk = "padelnomics_extract.ons_uk:main"
extract-geonames = "padelnomics_extract.geonames:main"

View File

@@ -18,6 +18,8 @@ from graphlib import TopologicalSorter
from ._shared import run_extractor, setup_logging
from .census_usa import EXTRACTOR_NAME as CENSUS_USA_NAME
from .census_usa import extract as extract_census_usa
from .census_usa_income import EXTRACTOR_NAME as CENSUS_USA_INCOME_NAME
from .census_usa_income import extract as extract_census_usa_income
from .eurostat import EXTRACTOR_NAME as EUROSTAT_NAME
from .eurostat import extract as extract_eurostat
from .eurostat_city_labels import EXTRACTOR_NAME as EUROSTAT_CITY_LABELS_NAME
@@ -45,6 +47,7 @@ EXTRACTORS: dict[str, tuple] = {
EUROSTAT_NAME: (extract_eurostat, []),
EUROSTAT_CITY_LABELS_NAME: (extract_eurostat_city_labels, []),
CENSUS_USA_NAME: (extract_census_usa, []),
CENSUS_USA_INCOME_NAME: (extract_census_usa_income, []),
ONS_UK_NAME: (extract_ons_uk, []),
GEONAMES_NAME: (extract_geonames, []),
TENANTS_NAME: (extract_tenants, []),

View File

@@ -0,0 +1,121 @@
"""US Census Bureau ACS 5-year state-level median household income extractor.
Fetches state-level median household income from the American Community Survey
5-year estimates. Requires a free API key from api.census.gov.
Env var: CENSUS_API_KEY (same key as census_usa.py)
Landing: {LANDING_DIR}/census_usa/{year}/{month}/acs5_state_income.json.gz
Output: {"rows": [{"state_fips": "06", "state_name": "California",
"median_income_usd": 91905, "ref_year": 2023,
"country_code": "US"}], "count": N}
"""
import json
import os
import sqlite3
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import get_last_cursor, landing_path, write_gzip_atomic
logger = setup_logging("padelnomics.extract.census_usa_income")
EXTRACTOR_NAME = "census_usa_income"
# ACS 5-year estimates, 2023 vintage — refreshed annually.
# B19013_001E = median household income in the past 12 months (inflation-adjusted).
ACS_STATE_URL = (
"https://api.census.gov/data/2023/acs/acs5"
"?get=B19013_001E,NAME&for=state:*"
)
REF_YEAR = 2023
MAX_RETRIES = 2
def extract(
landing_dir: Path,
year_month: str,
conn: sqlite3.Connection,
session: niquests.Session,
) -> dict:
"""Fetch ACS 5-year state-level income. Skips if already run this month."""
api_key = os.environ.get("CENSUS_API_KEY", "").strip()
if not api_key:
logger.warning("CENSUS_API_KEY not set — writing empty placeholder so SQLMesh models can run")
year, month = year_month.split("/")
dest_dir = landing_path(landing_dir, "census_usa", year, month)
dest = dest_dir / "acs5_state_income.json.gz"
if not dest.exists():
write_gzip_atomic(dest, b'{"rows": [], "count": 0}')
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
if last_cursor == year_month:
logger.info("already have data for %s — skipping", year_month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
year, month = year_month.split("/")
url = f"{ACS_STATE_URL}&key={api_key}"
logger.info("GET ACS 5-year state income (vintage %d)", REF_YEAR)
resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS * 2)
resp.raise_for_status()
raw = resp.json()
assert isinstance(raw, list) and len(raw) > 1, "ACS response must be a non-empty list"
# First row is headers: ["B19013_001E", "NAME", "state"]
headers = raw[0]
assert "B19013_001E" in headers, f"Income column missing from ACS response: {headers}"
income_idx = headers.index("B19013_001E")
name_idx = headers.index("NAME")
state_idx = headers.index("state")
rows: list[dict] = []
for row in raw[1:]:
try:
income = int(row[income_idx])
except (ValueError, TypeError):
# ACS returns -666666666 for suppressed/unavailable values
continue
if income <= 0:
continue
# Full state name from ACS; strip any trailing text after comma
state_name = row[name_idx].split(",")[0].strip()
if not state_name:
continue
rows.append({
"state_fips": row[state_idx],
"state_name": state_name,
"median_income_usd": income,
"ref_year": REF_YEAR,
"country_code": "US",
})
assert len(rows) >= 50, f"Expected ≥50 US states, got {len(rows)} — parse may have failed"
logger.info("parsed %d US state income records", len(rows))
dest_dir = landing_path(landing_dir, "census_usa", year, month)
dest = dest_dir / "acs5_state_income.json.gz"
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
bytes_written = write_gzip_atomic(dest, payload)
logger.info("written %s bytes compressed", f"{bytes_written:,}")
return {
"files_written": 1,
"files_skipped": 0,
"bytes_written": bytes_written,
"cursor_value": year_month,
}
def main() -> None:
run_extractor(EXTRACTOR_NAME, extract)
if __name__ == "__main__":
main()

View File

@@ -42,6 +42,15 @@ DATASETS: dict[str, dict] = {
"geo_dim": "geo",
"time_dim": "time",
},
"nama_10r_2hhinc": {
"filters": { # Net household income per inhabitant in PPS (NUTS-2 grain, contains NUTS-1)
"unit": "PPS_EU27_2020_HAB",
"na_item": "B6N",
"direct": "BAL",
},
"geo_dim": "geo",
"time_dim": "time",
},
}
@@ -189,6 +198,8 @@ def extract(
for dataset_code, config in DATASETS.items():
url = f"{EUROSTAT_BASE_URL}/{dataset_code}?format=JSON&lang=EN"
for key, val in config.get("filters", {}).items():
url += f"&{key}={val}"
dest_dir = landing_path(landing_dir, "eurostat", year, month)
dest = dest_dir / f"{dataset_code}.json.gz"