feat(scoring): Score v6 — World Bank global economic data for non-EU countries

Non-EU countries (AR, MX, AE, AU, etc.) previously got NULL for
median_income_pps and pli_construction, falling back to EU-calibrated
defaults (15K PPS, PLI=100) that produced wrong scores.

New World Bank WDI extractor fetches GNI per capita PPP and price level
ratio for 215 countries. dim_countries uses Germany as calibration anchor
to scale WB values into the Eurostat range (dynamic ratio, self-corrects
as both sources update). EU countries keep exact Eurostat values.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-03-08 18:17:33 +01:00
parent fcef47cb22
commit 3c135051fd
7 changed files with 246 additions and 8 deletions

View File

@@ -22,6 +22,7 @@ extract-census-usa-income = "padelnomics_extract.census_usa_income:main"
extract-ons-uk = "padelnomics_extract.ons_uk:main"
extract-geonames = "padelnomics_extract.geonames:main"
extract-gisco = "padelnomics_extract.gisco:main"
extract-worldbank = "padelnomics_extract.worldbank:main"
[build-system]
requires = ["hatchling"]

View File

@@ -7,7 +7,7 @@ A graphlib.TopologicalSorter schedules them: tasks with no unmet dependencies
run immediately in parallel; each completion may unlock new tasks.
Current dependency graph:
- All 9 non-availability extractors have no dependencies (run in parallel)
- All 10 non-availability extractors have no dependencies (run in parallel)
- playtomic_availability depends on playtomic_tenants (starts as soon as
tenants finishes, even if other extractors are still running)
"""
@@ -38,6 +38,8 @@ from .playtomic_availability import EXTRACTOR_NAME as AVAILABILITY_NAME
from .playtomic_availability import extract as extract_availability
from .playtomic_tenants import EXTRACTOR_NAME as TENANTS_NAME
from .playtomic_tenants import extract as extract_tenants
from .worldbank import EXTRACTOR_NAME as WORLDBANK_NAME
from .worldbank import extract as extract_worldbank
logger = setup_logging("padelnomics.extract")
@@ -54,6 +56,7 @@ EXTRACTORS: dict[str, tuple] = {
GEONAMES_NAME: (extract_geonames, []),
GISCO_NAME: (extract_gisco, []),
TENANTS_NAME: (extract_tenants, []),
WORLDBANK_NAME: (extract_worldbank, []),
AVAILABILITY_NAME: (extract_availability, [TENANTS_NAME]),
}

View File

@@ -0,0 +1,153 @@
"""World Bank WDI extractor — GNI per capita PPP and price level ratio.
Fetches two indicators (one API call each, no key required):
- NY.GNP.PCAP.PP.CD — GNI per capita, PPP (international $)
- PA.NUS.PPPC.RF — Price level ratio (PPP conversion factor / exchange rate)
These provide global fallbacks behind Eurostat for dim_countries.median_income_pps
and dim_countries.pli_construction (see dim_countries.sql for calibration logic).
API: World Bank API v2 — https://api.worldbank.org/v2/
No API key required. No env vars.
Landing: {LANDING_DIR}/worldbank/{year}/{month}/wdi_indicators.json.gz
Output: {"rows": [{"country_code": "DE", "indicator": "NY.GNP.PCAP.PP.CD",
"ref_year": 2023, "value": 74200.0}, ...], "count": N}
"""
import json
import sqlite3
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import get_last_cursor, landing_path, write_gzip_atomic
logger = setup_logging("padelnomics.extract.worldbank")
EXTRACTOR_NAME = "worldbank"
INDICATORS = ["NY.GNP.PCAP.PP.CD", "PA.NUS.PPPC.RF"]
# 6 years of data — we take the latest non-null per country in staging
DATE_RANGE = "2019:2025"
MAX_PER_PAGE = 5000
MAX_PAGES = 3
WDI_BASE_URL = "https://api.worldbank.org/v2/country/all/indicator"
# WB aggregate codes that look like real 2-letter country codes.
# These are regional/income-group aggregates, not actual countries.
_WB_AGGREGATE_CODES = frozenset({
"EU", "OE",
"XC", "XD", "XE", "XF", "XG", "XH", "XI", "XJ", "XL", "XM",
"XN", "XO", "XP", "XQ", "XR", "XS", "XT", "XU", "XV", "XY",
"ZF", "ZG", "ZH", "ZI", "ZJ", "ZQ", "ZT",
"V1", "V2", "V3", "V4",
})
def _normalize_country_code(wb_code: str) -> str | None:
"""Normalize WB country code to ISO alpha-2. Returns None for aggregates."""
code = wb_code.strip().upper()
if len(code) != 2:
return None
# Reject codes starting with a digit (e.g. "1W" for World)
if code[0].isdigit():
return None
if code in _WB_AGGREGATE_CODES:
return None
return code
def _fetch_indicator(
session: niquests.Session,
indicator: str,
) -> list[dict]:
"""Fetch all records for one indicator. Returns list of row dicts."""
rows: list[dict] = []
page = 1
while page <= MAX_PAGES:
url = (
f"{WDI_BASE_URL}/{indicator}"
f"?format=json&date={DATE_RANGE}&per_page={MAX_PER_PAGE}&page={page}"
)
logger.info("GET %s page %d", indicator, page)
resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS * 2)
resp.raise_for_status()
data = resp.json()
assert isinstance(data, list) and len(data) == 2, (
f"unexpected WB response shape for {indicator}: {type(data)}, len={len(data)}"
)
meta, records = data
total_pages = meta.get("pages", 1)
if records is None:
logger.warning("WB returned null data for %s page %d", indicator, page)
break
for record in records:
value = record.get("value")
if value is None:
continue
country_code = _normalize_country_code(record["country"]["id"])
if country_code is None:
continue
rows.append({
"country_code": country_code,
"indicator": indicator,
"ref_year": int(record["date"]),
"value": float(value),
})
if page >= total_pages:
break
page += 1
return rows
def extract(
landing_dir: Path,
year_month: str,
conn: sqlite3.Connection,
session: niquests.Session,
) -> dict:
"""Fetch WDI indicators. Skips if already run this month."""
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
if last_cursor == year_month:
logger.info("already have data for %s — skipping", year_month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
rows: list[dict] = []
for indicator in INDICATORS:
indicator_rows = _fetch_indicator(session, indicator)
logger.info("%s: %d records", indicator, len(indicator_rows))
rows.extend(indicator_rows)
assert len(rows) >= 200, f"expected ≥200 WB records, got {len(rows)} — API may have changed"
logger.info("total: %d WDI records", len(rows))
year, month = year_month.split("/")
dest_dir = landing_path(landing_dir, "worldbank", year, month)
dest = dest_dir / "wdi_indicators.json.gz"
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
bytes_written = write_gzip_atomic(dest, payload)
logger.info("written %s bytes compressed", f"{bytes_written:,}")
return {
"files_written": 1,
"files_skipped": 0,
"bytes_written": bytes_written,
"cursor_value": year_month,
}
def main() -> None:
run_extractor(EXTRACTOR_NAME, extract)
if __name__ == "__main__":
main()