Compare commits

..

2 Commits

Author SHA1 Message Date
Deeman
bf811444ba merge: Score v6 — World Bank global economic data for non-EU countries
All checks were successful
CI / test (push) Successful in 56s
CI / tag (push) Successful in 3s
2026-03-08 19:40:57 +01:00
Deeman
3c135051fd feat(scoring): Score v6 — World Bank global economic data for non-EU countries
Non-EU countries (AR, MX, AE, AU, etc.) previously got NULL for
median_income_pps and pli_construction, falling back to EU-calibrated
defaults (15K PPS, PLI=100) that produced wrong scores.

New World Bank WDI extractor fetches GNI per capita PPP and price level
ratio for 215 countries. dim_countries uses Germany as calibration anchor
to scale WB values into the Eurostat range (dynamic ratio, self-corrects
as both sources update). EU countries keep exact Eurostat values.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 18:17:33 +01:00
7 changed files with 246 additions and 8 deletions

View File

@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]
### Changed
- **Score v6: Global economic data** — `dim_countries.median_income_pps` and `pli_construction` now cover all target markets, not just EU. World Bank WDI indicators (GNI per capita PPP + price level ratio) fill gaps for non-EU countries (AR, MX, AE, AU, etc.) with values calibrated to the Eurostat scale using Germany as anchor. EU countries keep exact Eurostat values. New extractor (`worldbank.py`), staging model (`stg_worldbank_income`), and `dim_countries` fallback CTEs. No changes to scoring formulas — the fix is upstream in the data layer.
- **Market Score v3 → v4** — fixes Spain averaging 54 (should be 65-80). Four calibration changes: count gate threshold lowered from 5 → 3 venues (3 establishes a market pattern), density ceiling lowered from LN(21) → LN(11) (10/100k is reachable for mature markets), demand evidence fallback raised from 0.4 → 0.65 multiplier with 0.3 floor (existence of venues IS evidence of demand), economic context ceiling changed from income/200 → income/25000 (actual discrimination instead of free 10 pts for everyone).
- **Opportunity Score v4 → v5** — fixes structural flaws: supply gap (30pts) + catchment gap (15pts) merged into single supply deficit (35pts, GREATEST of density gap and distance gap) eliminating ~80% correlated double-count. New sports culture signal (10pts) using tennis court density as racquet-sport adoption proxy. New construction affordability signal (5pts) using income relative to PLI construction costs from `dim_countries`. Economic power reduced from 20 → 15pts. New dependency on `foundation.dim_countries` for `pli_construction`.

View File

@@ -22,6 +22,7 @@ extract-census-usa-income = "padelnomics_extract.census_usa_income:main"
extract-ons-uk = "padelnomics_extract.ons_uk:main"
extract-geonames = "padelnomics_extract.geonames:main"
extract-gisco = "padelnomics_extract.gisco:main"
extract-worldbank = "padelnomics_extract.worldbank:main"
[build-system]
requires = ["hatchling"]

View File

@@ -7,7 +7,7 @@ A graphlib.TopologicalSorter schedules them: tasks with no unmet dependencies
run immediately in parallel; each completion may unlock new tasks.
Current dependency graph:
- All 9 non-availability extractors have no dependencies (run in parallel)
- All 10 non-availability extractors have no dependencies (run in parallel)
- playtomic_availability depends on playtomic_tenants (starts as soon as
tenants finishes, even if other extractors are still running)
"""
@@ -38,6 +38,8 @@ from .playtomic_availability import EXTRACTOR_NAME as AVAILABILITY_NAME
from .playtomic_availability import extract as extract_availability
from .playtomic_tenants import EXTRACTOR_NAME as TENANTS_NAME
from .playtomic_tenants import extract as extract_tenants
from .worldbank import EXTRACTOR_NAME as WORLDBANK_NAME
from .worldbank import extract as extract_worldbank
logger = setup_logging("padelnomics.extract")
@@ -54,6 +56,7 @@ EXTRACTORS: dict[str, tuple] = {
GEONAMES_NAME: (extract_geonames, []),
GISCO_NAME: (extract_gisco, []),
TENANTS_NAME: (extract_tenants, []),
WORLDBANK_NAME: (extract_worldbank, []),
AVAILABILITY_NAME: (extract_availability, [TENANTS_NAME]),
}

View File

@@ -0,0 +1,153 @@
"""World Bank WDI extractor — GNI per capita PPP and price level ratio.
Fetches two indicators (one API call each, no key required):
- NY.GNP.PCAP.PP.CD — GNI per capita, PPP (international $)
- PA.NUS.PPPC.RF — Price level ratio (PPP conversion factor / exchange rate)
These provide global fallbacks behind Eurostat for dim_countries.median_income_pps
and dim_countries.pli_construction (see dim_countries.sql for calibration logic).
API: World Bank API v2 — https://api.worldbank.org/v2/
No API key required. No env vars.
Landing: {LANDING_DIR}/worldbank/{year}/{month}/wdi_indicators.json.gz
Output: {"rows": [{"country_code": "DE", "indicator": "NY.GNP.PCAP.PP.CD",
"ref_year": 2023, "value": 74200.0}, ...], "count": N}
"""
import json
import sqlite3
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import get_last_cursor, landing_path, write_gzip_atomic
logger = setup_logging("padelnomics.extract.worldbank")
EXTRACTOR_NAME = "worldbank"
INDICATORS = ["NY.GNP.PCAP.PP.CD", "PA.NUS.PPPC.RF"]
# 6 years of data — we take the latest non-null per country in staging
DATE_RANGE = "2019:2025"
MAX_PER_PAGE = 5000
MAX_PAGES = 3
WDI_BASE_URL = "https://api.worldbank.org/v2/country/all/indicator"
# WB aggregate codes that look like real 2-letter country codes.
# These are regional/income-group aggregates, not actual countries.
_WB_AGGREGATE_CODES = frozenset({
"EU", "OE",
"XC", "XD", "XE", "XF", "XG", "XH", "XI", "XJ", "XL", "XM",
"XN", "XO", "XP", "XQ", "XR", "XS", "XT", "XU", "XV", "XY",
"ZF", "ZG", "ZH", "ZI", "ZJ", "ZQ", "ZT",
"V1", "V2", "V3", "V4",
})
def _normalize_country_code(wb_code: str) -> str | None:
"""Normalize WB country code to ISO alpha-2. Returns None for aggregates."""
code = wb_code.strip().upper()
if len(code) != 2:
return None
# Reject codes starting with a digit (e.g. "1W" for World)
if code[0].isdigit():
return None
if code in _WB_AGGREGATE_CODES:
return None
return code
def _fetch_indicator(
session: niquests.Session,
indicator: str,
) -> list[dict]:
"""Fetch all records for one indicator. Returns list of row dicts."""
rows: list[dict] = []
page = 1
while page <= MAX_PAGES:
url = (
f"{WDI_BASE_URL}/{indicator}"
f"?format=json&date={DATE_RANGE}&per_page={MAX_PER_PAGE}&page={page}"
)
logger.info("GET %s page %d", indicator, page)
resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS * 2)
resp.raise_for_status()
data = resp.json()
assert isinstance(data, list) and len(data) == 2, (
f"unexpected WB response shape for {indicator}: {type(data)}, len={len(data)}"
)
meta, records = data
total_pages = meta.get("pages", 1)
if records is None:
logger.warning("WB returned null data for %s page %d", indicator, page)
break
for record in records:
value = record.get("value")
if value is None:
continue
country_code = _normalize_country_code(record["country"]["id"])
if country_code is None:
continue
rows.append({
"country_code": country_code,
"indicator": indicator,
"ref_year": int(record["date"]),
"value": float(value),
})
if page >= total_pages:
break
page += 1
return rows
def extract(
landing_dir: Path,
year_month: str,
conn: sqlite3.Connection,
session: niquests.Session,
) -> dict:
"""Fetch WDI indicators. Skips if already run this month."""
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
if last_cursor == year_month:
logger.info("already have data for %s — skipping", year_month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
rows: list[dict] = []
for indicator in INDICATORS:
indicator_rows = _fetch_indicator(session, indicator)
logger.info("%s: %d records", indicator, len(indicator_rows))
rows.extend(indicator_rows)
assert len(rows) >= 200, f"expected ≥200 WB records, got {len(rows)} — API may have changed"
logger.info("total: %d WDI records", len(rows))
year, month = year_month.split("/")
dest_dir = landing_path(landing_dir, "worldbank", year, month)
dest = dest_dir / "wdi_indicators.json.gz"
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
bytes_written = write_gzip_atomic(dest, payload)
logger.info("written %s bytes compressed", f"{bytes_written:,}")
return {
"files_written": 1,
"files_skipped": 0,
"bytes_written": bytes_written,
"cursor_value": year_month,
}
def main() -> None:
run_extractor(EXTRACTOR_NAME, extract)
if __name__ == "__main__":
main()

View File

@@ -72,3 +72,8 @@ description = "UK local authority population estimates from ONS"
module = "padelnomics_extract.gisco"
schedule = "0 0 1 1 *"
description = "EU geographic boundaries (NUTS2 polygons) from Eurostat GISCO"
[worldbank]
module = "padelnomics_extract.worldbank"
schedule = "monthly"
description = "GNI per capita PPP + price level ratio from World Bank WDI"

View File

@@ -2,10 +2,14 @@
--
-- Consolidates data previously duplicated across dim_cities and dim_locations:
-- - country_name_en / country_slug (was: ~50-line CASE blocks in both models)
-- - median_income_pps (was: country_income CTE in both models)
-- - energy prices, labour costs, PLI indices (new — from Eurostat datasets)
-- - median_income_pps (Eurostat PPS preferred, World Bank GNI PPP fallback)
-- - energy prices, labour costs, PLI indices (Eurostat, WB price level ratio fallback)
-- - cost override columns for the financial calculator
--
-- World Bank fallback: for non-EU countries (AR, MX, AE, AU, etc.), income and PLI
-- are derived from WB WDI indicators calibrated to the Eurostat scale using Germany
-- as anchor. See de_calibration CTE. EU countries keep exact Eurostat values.
--
-- Used by: dim_cities, dim_locations, pseo_city_costs_de, planner_defaults.
-- Grain: country_code (one row per ISO 3166-1 alpha-2 country code).
-- Kind: FULL — small table (~40 rows), full refresh daily.
@@ -82,6 +86,26 @@ de_elec AS (
de_gas AS (
SELECT gas_eur_gj FROM latest_gas WHERE country_code = 'DE'
),
-- Latest World Bank WDI per country (GNI PPP + price level ratio)
latest_wb AS (
SELECT country_code, gni_ppp, price_level_ratio, ref_year AS wb_year
FROM staging.stg_worldbank_income
WHERE gni_ppp IS NOT NULL OR price_level_ratio IS NOT NULL
QUALIFY ROW_NUMBER() OVER (PARTITION BY country_code ORDER BY ref_year DESC) = 1
),
-- Germany calibration anchor: Eurostat PPS + WB GNI PPP + WB price ratio + Eurostat PLI construction.
-- Used to scale World Bank values into Eurostat-comparable ranges.
-- Single row; if DE is missing from any source, that ratio produces NULL (safe fallthrough).
de_calibration AS (
SELECT
i.median_income_pps AS de_eurostat_pps,
wb.gni_ppp AS de_gni_ppp,
wb.price_level_ratio AS de_price_level_ratio,
p.construction AS de_pli_construction
FROM (SELECT median_income_pps FROM latest_income WHERE country_code = 'DE') i
CROSS JOIN (SELECT gni_ppp, price_level_ratio FROM latest_wb WHERE country_code = 'DE') wb
CROSS JOIN (SELECT construction FROM pli_pivoted WHERE country_code = 'DE') p
),
-- All distinct country codes from any source
all_countries AS (
SELECT country_code FROM latest_income
@@ -93,6 +117,8 @@ all_countries AS (
SELECT country_code FROM latest_labour
UNION
SELECT country_code FROM pli_pivoted
UNION
SELECT country_code FROM latest_wb
-- Ensure known padel markets appear even if Eurostat doesn't cover them yet
UNION ALL
SELECT unnest(['DE','ES','GB','FR','IT','PT','AT','CH','NL','BE','SE','NO','DK','FI',
@@ -149,15 +175,21 @@ SELECT
ELSE ac.country_code
END, '[^a-zA-Z0-9]+', '-'
)) AS country_slug,
-- Income data
-- Income: Eurostat PPS preferred, World Bank GNI PPP scaled to PPS as fallback
COALESCE(
i.median_income_pps,
i.income_year,
ROUND(wb.gni_ppp * (de_cal.de_eurostat_pps / NULLIF(de_cal.de_gni_ppp, 0)), 0)
) AS median_income_pps,
COALESCE(i.income_year, wb.wb_year) AS income_year,
-- Raw energy and labour data (for reference / future staffed-scenario use)
e.electricity_eur_kwh,
g.gas_eur_gj,
la.labour_cost_eur_hour,
-- PLI indices per category (EU27=100)
p.construction AS pli_construction,
-- PLI construction: Eurostat preferred, World Bank price level ratio scaled to PLI as fallback
COALESCE(
p.construction,
ROUND(wb.price_level_ratio / NULLIF(de_cal.de_price_level_ratio, 0) * de_cal.de_pli_construction, 1)
) AS pli_construction,
p.housing AS pli_housing,
p.services AS pli_services,
p.misc AS pli_misc,
@@ -278,8 +310,10 @@ LEFT JOIN latest_electricity e ON ac.country_code = e.country_code
LEFT JOIN latest_gas g ON ac.country_code = g.country_code
LEFT JOIN latest_labour la ON ac.country_code = la.country_code
LEFT JOIN pli_pivoted p ON ac.country_code = p.country_code
LEFT JOIN latest_wb wb ON ac.country_code = wb.country_code
CROSS JOIN de_pli de_p
CROSS JOIN de_elec de_e
CROSS JOIN de_gas de_g
CROSS JOIN de_calibration de_cal
-- Enforce grain
QUALIFY ROW_NUMBER() OVER (PARTITION BY ac.country_code ORDER BY ac.country_code) = 1

View File

@@ -0,0 +1,41 @@
-- World Bank WDI indicators: GNI per capita PPP and price level ratio.
-- Pivoted to one row per (country_code, ref_year) with both indicators as columns.
--
-- Source: data/landing/worldbank/{year}/{month}/wdi_indicators.json.gz
-- Extracted by: worldbank.py
-- Used by: dim_countries (fallback behind Eurostat for non-EU countries)
MODEL (
name staging.stg_worldbank_income,
kind FULL,
cron '@daily',
grain (country_code, ref_year)
);
WITH parsed AS (
SELECT
row ->> 'country_code' AS country_code,
TRY_CAST(row ->> 'ref_year' AS INTEGER) AS ref_year,
row ->> 'indicator' AS indicator,
TRY_CAST(row ->> 'value' AS DOUBLE) AS value,
CURRENT_DATE AS extracted_date
FROM (
SELECT UNNEST(rows) AS row
FROM read_json(
@LANDING_DIR || '/worldbank/*/*/wdi_indicators.json.gz',
auto_detect = true
)
)
WHERE (row ->> 'country_code') IS NOT NULL
)
SELECT
country_code,
ref_year,
MAX(value) FILTER (WHERE indicator = 'NY.GNP.PCAP.PP.CD') AS gni_ppp,
MAX(value) FILTER (WHERE indicator = 'PA.NUS.PPPC.RF') AS price_level_ratio,
MAX(extracted_date) AS extracted_date
FROM parsed
WHERE value IS NOT NULL
AND value > 0
AND LENGTH(country_code) = 2
GROUP BY country_code, ref_year