diff --git a/CHANGELOG.md b/CHANGELOG.md index 31be105..c8bec32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] ### Changed +- **Score v6: Global economic data** — `dim_countries.median_income_pps` and `pli_construction` now cover all target markets, not just EU. World Bank WDI indicators (GNI per capita PPP + price level ratio) fill gaps for non-EU countries (AR, MX, AE, AU, etc.) with values calibrated to the Eurostat scale using Germany as anchor. EU countries keep exact Eurostat values. New extractor (`worldbank.py`), staging model (`stg_worldbank_income`), and `dim_countries` fallback CTEs. No changes to scoring formulas — the fix is upstream in the data layer. - **Market Score v3 → v4** — fixes Spain averaging 54 (should be 65-80). Four calibration changes: count gate threshold lowered from 5 → 3 venues (3 establishes a market pattern), density ceiling lowered from LN(21) → LN(11) (10/100k is reachable for mature markets), demand evidence fallback raised from 0.4 → 0.65 multiplier with 0.3 floor (existence of venues IS evidence of demand), economic context ceiling changed from income/200 → income/25000 (actual discrimination instead of free 10 pts for everyone). - **Opportunity Score v4 → v5** — fixes structural flaws: supply gap (30pts) + catchment gap (15pts) merged into single supply deficit (35pts, GREATEST of density gap and distance gap) eliminating ~80% correlated double-count. New sports culture signal (10pts) using tennis court density as racquet-sport adoption proxy. New construction affordability signal (5pts) using income relative to PLI construction costs from `dim_countries`. Economic power reduced from 20 → 15pts. New dependency on `foundation.dim_countries` for `pli_construction`. 
diff --git a/extract/padelnomics_extract/pyproject.toml b/extract/padelnomics_extract/pyproject.toml index b5e7910..45406f5 100644 --- a/extract/padelnomics_extract/pyproject.toml +++ b/extract/padelnomics_extract/pyproject.toml @@ -22,6 +22,7 @@ extract-census-usa-income = "padelnomics_extract.census_usa_income:main" extract-ons-uk = "padelnomics_extract.ons_uk:main" extract-geonames = "padelnomics_extract.geonames:main" extract-gisco = "padelnomics_extract.gisco:main" +extract-worldbank = "padelnomics_extract.worldbank:main" [build-system] requires = ["hatchling"] diff --git a/extract/padelnomics_extract/src/padelnomics_extract/all.py b/extract/padelnomics_extract/src/padelnomics_extract/all.py index 6796fed..6ad9d5c 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/all.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/all.py @@ -7,7 +7,7 @@ A graphlib.TopologicalSorter schedules them: tasks with no unmet dependencies run immediately in parallel; each completion may unlock new tasks. 
Current dependency graph: - - All 9 non-availability extractors have no dependencies (run in parallel) + - All 10 non-availability extractors have no dependencies (run in parallel) - playtomic_availability depends on playtomic_tenants (starts as soon as tenants finishes, even if other extractors are still running) """ @@ -38,6 +38,8 @@ from .playtomic_availability import EXTRACTOR_NAME as AVAILABILITY_NAME from .playtomic_availability import extract as extract_availability from .playtomic_tenants import EXTRACTOR_NAME as TENANTS_NAME from .playtomic_tenants import extract as extract_tenants +from .worldbank import EXTRACTOR_NAME as WORLDBANK_NAME +from .worldbank import extract as extract_worldbank logger = setup_logging("padelnomics.extract") @@ -54,6 +56,7 @@ EXTRACTORS: dict[str, tuple] = { GEONAMES_NAME: (extract_geonames, []), GISCO_NAME: (extract_gisco, []), TENANTS_NAME: (extract_tenants, []), + WORLDBANK_NAME: (extract_worldbank, []), AVAILABILITY_NAME: (extract_availability, [TENANTS_NAME]), } diff --git a/extract/padelnomics_extract/src/padelnomics_extract/worldbank.py b/extract/padelnomics_extract/src/padelnomics_extract/worldbank.py new file mode 100644 index 0000000..916313e --- /dev/null +++ b/extract/padelnomics_extract/src/padelnomics_extract/worldbank.py @@ -0,0 +1,153 @@ +"""World Bank WDI extractor — GNI per capita PPP and price level ratio. + +Fetches two indicators (one API call each, no key required): + - NY.GNP.PCAP.PP.CD — GNI per capita, PPP (international $) + - PA.NUS.PPPC.RF — Price level ratio (PPP conversion factor / exchange rate) + +These provide global fallbacks behind Eurostat for dim_countries.median_income_pps +and dim_countries.pli_construction (see dim_countries.sql for calibration logic). + +API: World Bank API v2 — https://api.worldbank.org/v2/ +No API key required. No env vars. 
+ +Landing: {LANDING_DIR}/worldbank/{year}/{month}/wdi_indicators.json.gz +Output: {"rows": [{"country_code": "DE", "indicator": "NY.GNP.PCAP.PP.CD", + "ref_year": 2023, "value": 74200.0}, ...], "count": N} +""" + +import json +import sqlite3 +from pathlib import Path + +import niquests + +from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging +from .utils import get_last_cursor, landing_path, write_gzip_atomic + +logger = setup_logging("padelnomics.extract.worldbank") + +EXTRACTOR_NAME = "worldbank" + +INDICATORS = ["NY.GNP.PCAP.PP.CD", "PA.NUS.PPPC.RF"] +# 2019–2025 inclusive (7 years) — staging keeps one row per (country, year); dim_countries (latest_wb) picks the latest year +DATE_RANGE = "2019:2025" +MAX_PER_PAGE = 5000 +MAX_PAGES = 3 + +WDI_BASE_URL = "https://api.worldbank.org/v2/country/all/indicator" + +# WB aggregate codes that look like real 2-letter country codes. +# These are regional/income-group aggregates, not actual countries. +_WB_AGGREGATE_CODES = frozenset({ + "EU", "OE", + "XC", "XD", "XE", "XF", "XG", "XH", "XI", "XJ", "XL", "XM", + "XN", "XO", "XP", "XQ", "XR", "XS", "XT", "XU", "XV", "XY", + "ZF", "ZG", "ZH", "ZI", "ZJ", "ZQ", "ZT", + "V1", "V2", "V3", "V4", +}) + + +def _normalize_country_code(wb_code: str) -> str | None: + """Normalize WB country code to ISO alpha-2. Returns None for aggregates.""" + code = wb_code.strip().upper() + if len(code) != 2: + return None + # Reject codes starting with a digit (e.g. "1W" for World) + if code[0].isdigit(): + return None + if code in _WB_AGGREGATE_CODES: + return None + return code + + +def _fetch_indicator( + session: niquests.Session, + indicator: str, +) -> list[dict]: + """Fetch all records for one indicator. 
Returns list of row dicts.""" + rows: list[dict] = [] + page = 1 + + while page <= MAX_PAGES: + url = ( + f"{WDI_BASE_URL}/{indicator}" + f"?format=json&date={DATE_RANGE}&per_page={MAX_PER_PAGE}&page={page}" + ) + logger.info("GET %s page %d", indicator, page) + resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS * 2) + resp.raise_for_status() + + data = resp.json() + assert isinstance(data, list) and len(data) == 2, ( + f"unexpected WB response shape for {indicator}: {type(data)}, len={len(data)}" + ) + meta, records = data + total_pages = meta.get("pages", 1) + + if records is None: + logger.warning("WB returned null data for %s page %d", indicator, page) + break + + for record in records: + value = record.get("value") + if value is None: + continue + country_code = _normalize_country_code(record["country"]["id"]) + if country_code is None: + continue + rows.append({ + "country_code": country_code, + "indicator": indicator, + "ref_year": int(record["date"]), + "value": float(value), + }) + + if page >= total_pages: + break + page += 1 + + return rows + + +def extract( + landing_dir: Path, + year_month: str, + conn: sqlite3.Connection, + session: niquests.Session, +) -> dict: + """Fetch WDI indicators. 
Skips if already run this month.""" + last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) + if last_cursor == year_month: + logger.info("already have data for %s — skipping", year_month) + return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} + + rows: list[dict] = [] + for indicator in INDICATORS: + indicator_rows = _fetch_indicator(session, indicator) + logger.info("%s: %d records", indicator, len(indicator_rows)) + rows.extend(indicator_rows) + + assert len(rows) >= 200, f"expected ≥200 WB records, got {len(rows)} — API may have changed" + logger.info("total: %d WDI records", len(rows)) + + year, month = year_month.split("/") + dest_dir = landing_path(landing_dir, "worldbank", year, month) + dest = dest_dir / "wdi_indicators.json.gz" + payload = json.dumps({"rows": rows, "count": len(rows)}).encode() + bytes_written = write_gzip_atomic(dest, payload) + logger.info("written %s bytes compressed", f"{bytes_written:,}") + + return { + "files_written": 1, + "files_skipped": 0, + "bytes_written": bytes_written, + "cursor_value": year_month, + } + + +def main() -> None: + run_extractor(EXTRACTOR_NAME, extract) + + +if __name__ == "__main__": + main() diff --git a/infra/supervisor/workflows.toml b/infra/supervisor/workflows.toml index 93913e5..524bca8 100644 --- a/infra/supervisor/workflows.toml +++ b/infra/supervisor/workflows.toml @@ -72,3 +72,8 @@ description = "UK local authority population estimates from ONS" module = "padelnomics_extract.gisco" schedule = "0 0 1 1 *" description = "EU geographic boundaries (NUTS2 polygons) from Eurostat GISCO" + +[worldbank] +module = "padelnomics_extract.worldbank" +schedule = "monthly" +description = "GNI per capita PPP + price level ratio from World Bank WDI" diff --git a/transform/sqlmesh_padelnomics/models/foundation/dim_countries.sql b/transform/sqlmesh_padelnomics/models/foundation/dim_countries.sql index 908fb0f..3f33648 100644 --- a/transform/sqlmesh_padelnomics/models/foundation/dim_countries.sql +++ 
b/transform/sqlmesh_padelnomics/models/foundation/dim_countries.sql @@ -2,10 +2,14 @@ -- -- Consolidates data previously duplicated across dim_cities and dim_locations: -- - country_name_en / country_slug (was: ~50-line CASE blocks in both models) --- - median_income_pps (was: country_income CTE in both models) --- - energy prices, labour costs, PLI indices (new — from Eurostat datasets) +-- - median_income_pps (Eurostat PPS preferred, World Bank GNI PPP fallback) +-- - energy prices, labour costs, PLI indices (Eurostat, WB price level ratio fallback) -- - cost override columns for the financial calculator -- +-- World Bank fallback: for non-EU countries (AR, MX, AE, AU, etc.), income and PLI +-- are derived from WB WDI indicators calibrated to the Eurostat scale using Germany +-- as anchor. See de_calibration CTE. EU countries keep exact Eurostat values. +-- -- Used by: dim_cities, dim_locations, pseo_city_costs_de, planner_defaults. -- Grain: country_code (one row per ISO 3166-1 alpha-2 country code). -- Kind: FULL — small table (~40 rows), full refresh daily. @@ -82,6 +86,26 @@ de_elec AS ( de_gas AS ( SELECT gas_eur_gj FROM latest_gas WHERE country_code = 'DE' ), +-- Latest World Bank WDI per country (GNI PPP + price level ratio) +latest_wb AS ( + SELECT country_code, gni_ppp, price_level_ratio, ref_year AS wb_year + FROM staging.stg_worldbank_income + WHERE gni_ppp IS NOT NULL OR price_level_ratio IS NOT NULL + QUALIFY ROW_NUMBER() OVER (PARTITION BY country_code ORDER BY ref_year DESC) = 1 +), +-- Germany calibration anchor: Eurostat PPS + WB GNI PPP + WB price ratio + Eurostat PLI construction. +-- Used to scale World Bank values into Eurostat-comparable ranges. +-- Single row; if DE is missing from any source, that ratio produces NULL (safe fallthrough). 
+de_calibration AS ( + SELECT + i.median_income_pps AS de_eurostat_pps, + wb.gni_ppp AS de_gni_ppp, + wb.price_level_ratio AS de_price_level_ratio, + p.construction AS de_pli_construction + FROM (SELECT 1) AS anchor LEFT JOIN (SELECT median_income_pps FROM latest_income WHERE country_code = 'DE') i ON TRUE + LEFT JOIN (SELECT gni_ppp, price_level_ratio FROM latest_wb WHERE country_code = 'DE') wb ON TRUE + LEFT JOIN (SELECT construction FROM pli_pivoted WHERE country_code = 'DE') p ON TRUE +), -- All distinct country codes from any source all_countries AS ( SELECT country_code FROM latest_income @@ -93,6 +117,8 @@ all_countries AS ( SELECT country_code FROM latest_labour UNION SELECT country_code FROM pli_pivoted + UNION + SELECT country_code FROM latest_wb -- Ensure known padel markets appear even if Eurostat doesn't cover them yet UNION ALL SELECT unnest(['DE','ES','GB','FR','IT','PT','AT','CH','NL','BE','SE','NO','DK','FI', @@ -149,15 +175,21 @@ SELECT ELSE ac.country_code END, '[^a-zA-Z0-9]+', '-' )) AS country_slug, - -- Income data - i.median_income_pps, - i.income_year, + -- Income: Eurostat PPS preferred, World Bank GNI PPP scaled to PPS as fallback + COALESCE( + i.median_income_pps, + ROUND(wb.gni_ppp * (de_cal.de_eurostat_pps / NULLIF(de_cal.de_gni_ppp, 0)), 0) + ) AS median_income_pps, + COALESCE(i.income_year, wb.wb_year) AS income_year, -- Raw energy and labour data (for reference / future staffed-scenario use) e.electricity_eur_kwh, g.gas_eur_gj, la.labour_cost_eur_hour, - -- PLI indices per category (EU27=100) - p.construction AS pli_construction, + -- PLI construction: Eurostat preferred, World Bank price level ratio scaled to PLI as fallback + COALESCE( + p.construction, + ROUND(wb.price_level_ratio / NULLIF(de_cal.de_price_level_ratio, 0) * de_cal.de_pli_construction, 1) + ) AS pli_construction, p.housing AS pli_housing, p.services AS pli_services, p.misc AS pli_misc, @@ -278,8 +310,10 @@ LEFT JOIN latest_electricity e ON ac.country_code = e.country_code LEFT JOIN latest_gas g ON ac.country_code = 
g.country_code LEFT JOIN latest_labour la ON ac.country_code = la.country_code LEFT JOIN pli_pivoted p ON ac.country_code = p.country_code +LEFT JOIN latest_wb wb ON ac.country_code = wb.country_code CROSS JOIN de_pli de_p CROSS JOIN de_elec de_e CROSS JOIN de_gas de_g +CROSS JOIN de_calibration de_cal -- Enforce grain QUALIFY ROW_NUMBER() OVER (PARTITION BY ac.country_code ORDER BY ac.country_code) = 1 diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_worldbank_income.sql b/transform/sqlmesh_padelnomics/models/staging/stg_worldbank_income.sql new file mode 100644 index 0000000..55cd753 --- /dev/null +++ b/transform/sqlmesh_padelnomics/models/staging/stg_worldbank_income.sql @@ -0,0 +1,41 @@ +-- World Bank WDI indicators: GNI per capita PPP and price level ratio. +-- Pivoted to one row per (country_code, ref_year) with both indicators as columns. +-- +-- Source: data/landing/worldbank/{year}/{month}/wdi_indicators.json.gz +-- Extracted by: worldbank.py +-- Used by: dim_countries (fallback behind Eurostat for non-EU countries) + +MODEL ( + name staging.stg_worldbank_income, + kind FULL, + cron '@daily', + grain (country_code, ref_year) +); + +WITH parsed AS ( + SELECT + row ->> 'country_code' AS country_code, + TRY_CAST(row ->> 'ref_year' AS INTEGER) AS ref_year, + row ->> 'indicator' AS indicator, + TRY_CAST(row ->> 'value' AS DOUBLE) AS value, + CURRENT_DATE AS extracted_date + FROM ( + SELECT UNNEST(rows) AS row + FROM read_json( + @LANDING_DIR || '/worldbank/*/*/wdi_indicators.json.gz', + auto_detect = true + ) + ) + WHERE (row ->> 'country_code') IS NOT NULL +) +SELECT + country_code, + ref_year, + MAX(value) FILTER (WHERE indicator = 'NY.GNP.PCAP.PP.CD') AS gni_ppp, + MAX(value) FILTER (WHERE indicator = 'PA.NUS.PPPC.RF') AS price_level_ratio, + MAX(extracted_date) AS extracted_date +FROM parsed +WHERE value IS NOT NULL + AND value > 0 + AND LENGTH(country_code) = 2 +GROUP BY country_code, ref_year