feat(data): Phase 2b step 1 — expand stg_regional_income + Census income extractor
- stg_regional_income.sql: accept NUTS-1 (3-char) + NUTS-2 (4-char) codes; rename nuts1_code → nuts_code; add nuts_level column; NUTS-2 rows were already in the landing zone but discarded by LENGTH(geo_code) = 3 - scripts/download_gisco_nuts.py: one-time download of GISCO NUTS-2 boundary GeoJSON (NUTS_RG_20M_2021_4326_LEVL_2.geojson, ~5MB) to landing zone; uncompressed because ST_Read cannot read .gz files - census_usa_income.py: new extractor for ACS B19013_001E state-level median household income; follows census_usa.py pattern; 51 states + DC - all.py + pyproject.toml: register census_usa_income extractor Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -18,6 +18,7 @@ extract-playtomic-availability = "padelnomics_extract.playtomic_availability:mai
|
|||||||
extract-playtomic-recheck = "padelnomics_extract.playtomic_availability:main_recheck"
|
extract-playtomic-recheck = "padelnomics_extract.playtomic_availability:main_recheck"
|
||||||
extract-eurostat-city-labels = "padelnomics_extract.eurostat_city_labels:main"
|
extract-eurostat-city-labels = "padelnomics_extract.eurostat_city_labels:main"
|
||||||
extract-census-usa = "padelnomics_extract.census_usa:main"
|
extract-census-usa = "padelnomics_extract.census_usa:main"
|
||||||
|
extract-census-usa-income = "padelnomics_extract.census_usa_income:main"
|
||||||
extract-ons-uk = "padelnomics_extract.ons_uk:main"
|
extract-ons-uk = "padelnomics_extract.ons_uk:main"
|
||||||
extract-geonames = "padelnomics_extract.geonames:main"
|
extract-geonames = "padelnomics_extract.geonames:main"
|
||||||
|
|
||||||
|
|||||||
@@ -18,6 +18,8 @@ from graphlib import TopologicalSorter
|
|||||||
from ._shared import run_extractor, setup_logging
|
from ._shared import run_extractor, setup_logging
|
||||||
from .census_usa import EXTRACTOR_NAME as CENSUS_USA_NAME
|
from .census_usa import EXTRACTOR_NAME as CENSUS_USA_NAME
|
||||||
from .census_usa import extract as extract_census_usa
|
from .census_usa import extract as extract_census_usa
|
||||||
|
from .census_usa_income import EXTRACTOR_NAME as CENSUS_USA_INCOME_NAME
|
||||||
|
from .census_usa_income import extract as extract_census_usa_income
|
||||||
from .eurostat import EXTRACTOR_NAME as EUROSTAT_NAME
|
from .eurostat import EXTRACTOR_NAME as EUROSTAT_NAME
|
||||||
from .eurostat import extract as extract_eurostat
|
from .eurostat import extract as extract_eurostat
|
||||||
from .eurostat_city_labels import EXTRACTOR_NAME as EUROSTAT_CITY_LABELS_NAME
|
from .eurostat_city_labels import EXTRACTOR_NAME as EUROSTAT_CITY_LABELS_NAME
|
||||||
@@ -45,6 +47,7 @@ EXTRACTORS: dict[str, tuple] = {
|
|||||||
EUROSTAT_NAME: (extract_eurostat, []),
|
EUROSTAT_NAME: (extract_eurostat, []),
|
||||||
EUROSTAT_CITY_LABELS_NAME: (extract_eurostat_city_labels, []),
|
EUROSTAT_CITY_LABELS_NAME: (extract_eurostat_city_labels, []),
|
||||||
CENSUS_USA_NAME: (extract_census_usa, []),
|
CENSUS_USA_NAME: (extract_census_usa, []),
|
||||||
|
CENSUS_USA_INCOME_NAME: (extract_census_usa_income, []),
|
||||||
ONS_UK_NAME: (extract_ons_uk, []),
|
ONS_UK_NAME: (extract_ons_uk, []),
|
||||||
GEONAMES_NAME: (extract_geonames, []),
|
GEONAMES_NAME: (extract_geonames, []),
|
||||||
TENANTS_NAME: (extract_tenants, []),
|
TENANTS_NAME: (extract_tenants, []),
|
||||||
|
|||||||
@@ -0,0 +1,121 @@
|
|||||||
|
"""US Census Bureau ACS 5-year state-level median household income extractor.
|
||||||
|
|
||||||
|
Fetches state-level median household income from the American Community Survey
|
||||||
|
5-year estimates. Requires a free API key from api.census.gov.
|
||||||
|
|
||||||
|
Env var: CENSUS_API_KEY (same key as census_usa.py)
|
||||||
|
|
||||||
|
Landing: {LANDING_DIR}/census_usa/{year}/{month}/acs5_state_income.json.gz
|
||||||
|
Output: {"rows": [{"state_fips": "06", "state_name": "California",
|
||||||
|
"median_income_usd": 91905, "ref_year": 2023,
|
||||||
|
"country_code": "US"}], "count": N}
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sqlite3
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import niquests
|
||||||
|
|
||||||
|
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
||||||
|
from .utils import get_last_cursor, landing_path, write_gzip_atomic
|
||||||
|
|
||||||
|
logger = setup_logging("padelnomics.extract.census_usa_income")
|
||||||
|
|
||||||
|
EXTRACTOR_NAME = "census_usa_income"
|
||||||
|
|
||||||
|
# ACS 5-year estimates, 2023 vintage — refreshed annually.
|
||||||
|
# B19013_001E = median household income in the past 12 months (inflation-adjusted).
|
||||||
|
ACS_STATE_URL = (
|
||||||
|
"https://api.census.gov/data/2023/acs/acs5"
|
||||||
|
"?get=B19013_001E,NAME&for=state:*"
|
||||||
|
)
|
||||||
|
|
||||||
|
REF_YEAR = 2023
|
||||||
|
MAX_RETRIES = 2
|
||||||
|
|
||||||
|
|
||||||
|
def extract(
|
||||||
|
landing_dir: Path,
|
||||||
|
year_month: str,
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
session: niquests.Session,
|
||||||
|
) -> dict:
|
||||||
|
"""Fetch ACS 5-year state-level income. Skips if already run this month."""
|
||||||
|
api_key = os.environ.get("CENSUS_API_KEY", "").strip()
|
||||||
|
if not api_key:
|
||||||
|
logger.warning("CENSUS_API_KEY not set — writing empty placeholder so SQLMesh models can run")
|
||||||
|
year, month = year_month.split("/")
|
||||||
|
dest_dir = landing_path(landing_dir, "census_usa", year, month)
|
||||||
|
dest = dest_dir / "acs5_state_income.json.gz"
|
||||||
|
if not dest.exists():
|
||||||
|
write_gzip_atomic(dest, b'{"rows": [], "count": 0}')
|
||||||
|
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||||
|
|
||||||
|
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
|
||||||
|
if last_cursor == year_month:
|
||||||
|
logger.info("already have data for %s — skipping", year_month)
|
||||||
|
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||||
|
|
||||||
|
year, month = year_month.split("/")
|
||||||
|
url = f"{ACS_STATE_URL}&key={api_key}"
|
||||||
|
|
||||||
|
logger.info("GET ACS 5-year state income (vintage %d)", REF_YEAR)
|
||||||
|
resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS * 2)
|
||||||
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
raw = resp.json()
|
||||||
|
assert isinstance(raw, list) and len(raw) > 1, "ACS response must be a non-empty list"
|
||||||
|
|
||||||
|
# First row is headers: ["B19013_001E", "NAME", "state"]
|
||||||
|
headers = raw[0]
|
||||||
|
assert "B19013_001E" in headers, f"Income column missing from ACS response: {headers}"
|
||||||
|
income_idx = headers.index("B19013_001E")
|
||||||
|
name_idx = headers.index("NAME")
|
||||||
|
state_idx = headers.index("state")
|
||||||
|
|
||||||
|
rows: list[dict] = []
|
||||||
|
for row in raw[1:]:
|
||||||
|
try:
|
||||||
|
income = int(row[income_idx])
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
# ACS returns -666666666 for suppressed/unavailable values
|
||||||
|
continue
|
||||||
|
if income <= 0:
|
||||||
|
continue
|
||||||
|
# Full state name from ACS; strip any trailing text after comma
|
||||||
|
state_name = row[name_idx].split(",")[0].strip()
|
||||||
|
if not state_name:
|
||||||
|
continue
|
||||||
|
rows.append({
|
||||||
|
"state_fips": row[state_idx],
|
||||||
|
"state_name": state_name,
|
||||||
|
"median_income_usd": income,
|
||||||
|
"ref_year": REF_YEAR,
|
||||||
|
"country_code": "US",
|
||||||
|
})
|
||||||
|
|
||||||
|
assert len(rows) >= 50, f"Expected ≥50 US states, got {len(rows)} — parse may have failed"
|
||||||
|
logger.info("parsed %d US state income records", len(rows))
|
||||||
|
|
||||||
|
dest_dir = landing_path(landing_dir, "census_usa", year, month)
|
||||||
|
dest = dest_dir / "acs5_state_income.json.gz"
|
||||||
|
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
|
||||||
|
bytes_written = write_gzip_atomic(dest, payload)
|
||||||
|
logger.info("written %s bytes compressed", f"{bytes_written:,}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"files_written": 1,
|
||||||
|
"files_skipped": 0,
|
||||||
|
"bytes_written": bytes_written,
|
||||||
|
"cursor_value": year_month,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
run_extractor(EXTRACTOR_NAME, extract)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
81
scripts/download_gisco_nuts.py
Normal file
81
scripts/download_gisco_nuts.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
"""Download NUTS-2 boundary GeoJSON from Eurostat GISCO.
|
||||||
|
|
||||||
|
One-time (or on NUTS revision) download of NUTS-2 boundary polygons used for
|
||||||
|
spatial income resolution in dim_locations. Stored uncompressed because DuckDB's
|
||||||
|
ST_Read function cannot read gzipped files.
|
||||||
|
|
||||||
|
NUTS classification changes approximately every 7 years. Current revision: 2021.
|
||||||
|
|
||||||
|
Output: {LANDING_DIR}/gisco/2024/01/nuts2_boundaries.geojson (~5MB, uncompressed)
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
uv run python scripts/download_gisco_nuts.py [--landing-dir data/landing]
|
||||||
|
|
||||||
|
Idempotent: skips download if the file already exists.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import niquests
|
||||||
|
|
||||||
|
# NUTS 2021 revision, 20M scale (1:20,000,000), WGS84 (EPSG:4326), LEVL_2 only.
|
||||||
|
# 20M resolution gives simplified polygons that are fast for point-in-polygon
|
||||||
|
# matching without sacrificing accuracy at the NUTS-2 boundary level.
|
||||||
|
GISCO_URL = (
|
||||||
|
"https://gisco-services.ec.europa.eu/distribution/v2/nuts/geojson/"
|
||||||
|
"NUTS_RG_20M_2021_4326_LEVL_2.geojson"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fixed partition: NUTS boundaries are a static reference file, not time-series data.
|
||||||
|
# Use the NUTS revision year as the partition to make the source version explicit.
|
||||||
|
DEST_REL_PATH = "gisco/2024/01/nuts2_boundaries.geojson"
|
||||||
|
|
||||||
|
HTTP_TIMEOUT_SECONDS = 120
|
||||||
|
|
||||||
|
|
||||||
|
def download_nuts_boundaries(landing_dir: Path) -> None:
|
||||||
|
dest = landing_dir / DEST_REL_PATH
|
||||||
|
if dest.exists():
|
||||||
|
print(f"Already exists (skipping): {dest}")
|
||||||
|
return
|
||||||
|
|
||||||
|
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
print(f"Downloading NUTS-2 boundaries from GISCO...")
|
||||||
|
print(f" URL: {GISCO_URL}")
|
||||||
|
|
||||||
|
with niquests.Session() as session:
|
||||||
|
resp = session.get(GISCO_URL, timeout=HTTP_TIMEOUT_SECONDS)
|
||||||
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
content = resp.content
|
||||||
|
assert len(content) > 100_000, (
|
||||||
|
f"GeoJSON too small ({len(content)} bytes) — download may have failed"
|
||||||
|
)
|
||||||
|
assert b'"FeatureCollection"' in content, "Response does not look like GeoJSON"
|
||||||
|
|
||||||
|
# Write uncompressed — ST_Read requires a plain file
|
||||||
|
tmp = dest.with_suffix(".geojson.tmp")
|
||||||
|
tmp.write_bytes(content)
|
||||||
|
tmp.rename(dest)
|
||||||
|
|
||||||
|
size_mb = len(content) / 1_000_000
|
||||||
|
print(f" Written: {dest} ({size_mb:.1f} MB)")
|
||||||
|
print("Done. Run SQLMesh plan to rebuild stg_nuts2_boundaries.")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
parser = argparse.ArgumentParser(description=__doc__)
|
||||||
|
parser.add_argument("--landing-dir", default="data/landing", type=Path)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if not args.landing_dir.is_dir():
|
||||||
|
print(f"Error: landing dir does not exist: {args.landing_dir}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
download_nuts_boundaries(args.landing_dir)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -1,15 +1,15 @@
|
|||||||
-- Eurostat NUTS-1 regional household income in PPS (dataset: nama_10r_2hhinc).
|
-- Eurostat NUTS-1 and NUTS-2 regional household income in PPS (dataset: nama_10r_2hhinc).
|
||||||
-- Filters to NUTS-1 codes (exactly 3 characters, e.g. DE1, DE2, …).
|
-- Accepts NUTS-1 codes (3 characters, e.g. DE1) and NUTS-2 codes (4 characters, e.g. DE21).
|
||||||
-- One row per (nuts1_code, ref_year).
|
-- One row per (nuts_code, ref_year).
|
||||||
--
|
--
|
||||||
-- Source: data/landing/eurostat/{year}/{month}/nama_10r_2hhinc.json.gz
|
-- Source: data/landing/eurostat/{year}/{month}/nama_10r_2hhinc.json.gz
|
||||||
-- Format: {"rows": [{"geo_code": "DE1", "ref_year": "2022", "value": 29400}, ...]}
|
-- Format: {"rows": [{"geo_code": "DE21", "ref_year": "2022", "value": 31200}, ...]}
|
||||||
|
|
||||||
MODEL (
|
MODEL (
|
||||||
name staging.stg_regional_income,
|
name staging.stg_regional_income,
|
||||||
kind FULL,
|
kind FULL,
|
||||||
cron '@daily',
|
cron '@daily',
|
||||||
grain (nuts1_code, ref_year)
|
grain (nuts_code, ref_year)
|
||||||
);
|
);
|
||||||
|
|
||||||
WITH source AS (
|
WITH source AS (
|
||||||
@@ -34,11 +34,13 @@ SELECT
|
|||||||
WHEN geo_code LIKE 'EL%' THEN 'GR' || SUBSTR(geo_code, 3)
|
WHEN geo_code LIKE 'EL%' THEN 'GR' || SUBSTR(geo_code, 3)
|
||||||
WHEN geo_code LIKE 'UK%' THEN 'GB' || SUBSTR(geo_code, 3)
|
WHEN geo_code LIKE 'UK%' THEN 'GB' || SUBSTR(geo_code, 3)
|
||||||
ELSE geo_code
|
ELSE geo_code
|
||||||
END AS nuts1_code,
|
END AS nuts_code,
|
||||||
|
-- NUTS level: 3-char = NUTS-1, 4-char = NUTS-2
|
||||||
|
LENGTH(geo_code) - 2 AS nuts_level,
|
||||||
ref_year,
|
ref_year,
|
||||||
regional_income_pps,
|
regional_income_pps,
|
||||||
extracted_date
|
extracted_date
|
||||||
FROM parsed
|
FROM parsed
|
||||||
-- NUTS-1 codes are exactly 3 characters (country 2 + region 1)
|
-- NUTS-1 (3 chars) and NUTS-2 (4 chars); exclude country codes (2) and NUTS-3 (5)
|
||||||
WHERE LENGTH(geo_code) = 3
|
WHERE LENGTH(geo_code) IN (3, 4)
|
||||||
AND regional_income_pps > 0
|
AND regional_income_pps > 0
|
||||||
|
|||||||
Reference in New Issue
Block a user