feat(data): Sprint 1-5 population pipeline — city labels, US/UK/Global extractors
Part A: Data Layer — Sprints 1-5 Sprint 1 — Eurostat SDMX city labels (unblocks EU population): - New extractor: eurostat_city_labels.py — fetches ESTAT/CITIES codelist (city_code → city_name mapping) with ETag dedup - New staging model: stg_city_labels.sql — grain city_code - Updated dim_cities.sql — joins Eurostat population via city code lookup; replaces hardcoded 0::BIGINT population Sprint 2 — Market score formula v2: - city_market_profile.sql: 30pt population (LN/1M), 25pt income PPS (/200), 30pt demand (occupancy or density), 15pt data confidence - Moved venue_pricing_benchmarks join into base CTE so median_occupancy_rate is available to the scoring formula Sprint 3 — US Census ACS extractor: - New extractor: census_usa.py — ACS 5-year place population (vintage 2023) - New staging model: stg_population_usa.sql — grain (place_fips, ref_year) Sprint 4 — ONS UK extractor: - New extractor: ons_uk.py — 2021 Census LAD population via ONS beta API - New staging model: stg_population_uk.sql — grain (lad_code, ref_year) Sprint 5 — GeoNames global extractor: - New extractor: geonames.py — cities15000.zip bulk download, filtered to ≥50K pop - New staging model: stg_population_geonames.sql — grain geoname_id - dim_cities: 5-source population cascade (Eurostat > Census > ONS > GeoNames > 0) with case/whitespace-insensitive city name matching Registered all 4 new CLI entrypoints in pyproject.toml and all.py. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,10 @@ extract-eurostat = "padelnomics_extract.eurostat:main"
|
||||
extract-playtomic-tenants = "padelnomics_extract.playtomic_tenants:main"
|
||||
extract-playtomic-availability = "padelnomics_extract.playtomic_availability:main"
|
||||
extract-playtomic-recheck = "padelnomics_extract.playtomic_availability:main_recheck"
|
||||
extract-eurostat-city-labels = "padelnomics_extract.eurostat_city_labels:main"
|
||||
extract-census-usa = "padelnomics_extract.census_usa:main"
|
||||
extract-ons-uk = "padelnomics_extract.ons_uk:main"
|
||||
extract-geonames = "padelnomics_extract.geonames:main"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
|
||||
@@ -5,8 +5,16 @@ Each extractor gets its own state tracking row in .state.sqlite.
|
||||
"""
|
||||
|
||||
from ._shared import run_extractor, setup_logging
|
||||
from .census_usa import EXTRACTOR_NAME as CENSUS_USA_NAME
|
||||
from .census_usa import extract as extract_census_usa
|
||||
from .eurostat import EXTRACTOR_NAME as EUROSTAT_NAME
|
||||
from .eurostat import extract as extract_eurostat
|
||||
from .eurostat_city_labels import EXTRACTOR_NAME as EUROSTAT_CITY_LABELS_NAME
|
||||
from .eurostat_city_labels import extract as extract_eurostat_city_labels
|
||||
from .geonames import EXTRACTOR_NAME as GEONAMES_NAME
|
||||
from .geonames import extract as extract_geonames
|
||||
from .ons_uk import EXTRACTOR_NAME as ONS_UK_NAME
|
||||
from .ons_uk import extract as extract_ons_uk
|
||||
from .overpass import EXTRACTOR_NAME as OVERPASS_NAME
|
||||
from .overpass import extract as extract_overpass
|
||||
from .playtomic_availability import EXTRACTOR_NAME as AVAILABILITY_NAME
|
||||
@@ -19,6 +27,10 @@ logger = setup_logging("padelnomics.extract")
|
||||
EXTRACTORS = [
|
||||
(OVERPASS_NAME, extract_overpass),
|
||||
(EUROSTAT_NAME, extract_eurostat),
|
||||
(EUROSTAT_CITY_LABELS_NAME, extract_eurostat_city_labels),
|
||||
(CENSUS_USA_NAME, extract_census_usa),
|
||||
(ONS_UK_NAME, extract_ons_uk),
|
||||
(GEONAMES_NAME, extract_geonames),
|
||||
(TENANTS_NAME, extract_tenants),
|
||||
(AVAILABILITY_NAME, extract_availability),
|
||||
]
|
||||
|
||||
@@ -0,0 +1,139 @@
|
||||
"""US Census Bureau ACS 5-year population extractor.
|
||||
|
||||
Fetches city-level (Census place) population from the American Community Survey
|
||||
5-year estimates. Requires a free API key from api.census.gov.
|
||||
|
||||
Env var: CENSUS_API_KEY (register free at https://api.census.gov/data/key_signup.html)
|
||||
|
||||
Landing: {LANDING_DIR}/census_usa/{year}/{month}/acs5_places.json.gz
|
||||
Output: {"rows": [{"city_name": "Los Angeles", "state_fips": "06",
|
||||
"place_fips": "0644000", "population": 3990456,
|
||||
"ref_year": 2023, "country_code": "US"}], "count": N}
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
import niquests
|
||||
|
||||
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
||||
from .utils import get_last_cursor, landing_path, write_gzip_atomic
|
||||
|
||||
logger = setup_logging("padelnomics.extract.census_usa")
|
||||
|
||||
EXTRACTOR_NAME = "census_usa"
|
||||
|
||||
# ACS 5-year estimates, 2023 vintage — refreshed annually by Census Bureau.
|
||||
# B01003_001E = total population; NAME = place name + state.
|
||||
ACS_URL = (
|
||||
"https://api.census.gov/data/2023/acs/acs5"
|
||||
"?get=B01003_001E,NAME&for=place:*&in=state:*"
|
||||
)
|
||||
|
||||
REF_YEAR = 2023
|
||||
MIN_POPULATION = 50_000
|
||||
MAX_RETRIES = 2
|
||||
|
||||
|
||||
def _parse_city_name(full_name: str) -> str:
|
||||
"""Extract city name from Census place name.
|
||||
|
||||
Examples:
|
||||
'Los Angeles city, California' → 'Los Angeles'
|
||||
'New York city, New York' → 'New York'
|
||||
'Miami city, Florida' → 'Miami'
|
||||
"""
|
||||
# Take everything before the first comma
|
||||
before_comma = full_name.split(",")[0].strip()
|
||||
# Strip common suffixes: ' city', ' town', ' CDP', ' borough', ' village'
|
||||
for suffix in (" city", " town", " CDP", " borough", " village", " municipality"):
|
||||
if before_comma.lower().endswith(suffix):
|
||||
before_comma = before_comma[: -len(suffix)].strip()
|
||||
break
|
||||
return before_comma
|
||||
|
||||
|
||||
def extract(
|
||||
landing_dir: Path,
|
||||
year_month: str,
|
||||
conn: sqlite3.Connection,
|
||||
session: niquests.Session,
|
||||
) -> dict:
|
||||
"""Fetch ACS 5-year place population. Skips if already run this month."""
|
||||
api_key = os.environ.get("CENSUS_API_KEY", "").strip()
|
||||
if not api_key:
|
||||
logger.warning("CENSUS_API_KEY not set — skipping US Census extract")
|
||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||
|
||||
# Skip if we already have data for this month (annual data, monthly cursor)
|
||||
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
|
||||
if last_cursor == year_month:
|
||||
logger.info("already have data for %s — skipping", year_month)
|
||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||
|
||||
year, month = year_month.split("/")
|
||||
url = f"{ACS_URL}&key={api_key}"
|
||||
|
||||
logger.info("GET ACS 5-year places (vintage %d)", REF_YEAR)
|
||||
resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS * 2)
|
||||
resp.raise_for_status()
|
||||
|
||||
raw = resp.json()
|
||||
assert isinstance(raw, list) and len(raw) > 1, "ACS response must be a non-empty list"
|
||||
|
||||
# First row is headers: ["B01003_001E", "NAME", "state", "place"]
|
||||
headers = raw[0]
|
||||
assert "B01003_001E" in headers, f"Population column missing from ACS response: {headers}"
|
||||
pop_idx = headers.index("B01003_001E")
|
||||
name_idx = headers.index("NAME")
|
||||
state_idx = headers.index("state")
|
||||
place_idx = headers.index("place")
|
||||
|
||||
rows: list[dict] = []
|
||||
for row in raw[1:]:
|
||||
try:
|
||||
population = int(row[pop_idx])
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
if population < MIN_POPULATION:
|
||||
continue
|
||||
full_name = row[name_idx]
|
||||
city_name = _parse_city_name(full_name)
|
||||
if not city_name:
|
||||
continue
|
||||
state_fips = row[state_idx]
|
||||
place_fips = state_fips + row[place_idx]
|
||||
rows.append({
|
||||
"city_name": city_name,
|
||||
"state_fips": state_fips,
|
||||
"place_fips": place_fips,
|
||||
"population": population,
|
||||
"ref_year": REF_YEAR,
|
||||
"country_code": "US",
|
||||
})
|
||||
|
||||
assert len(rows) > 500, f"Expected >500 US cities ≥50K pop, got {len(rows)} — parse may have failed"
|
||||
logger.info("parsed %d US cities with population ≥%d", len(rows), MIN_POPULATION)
|
||||
|
||||
dest_dir = landing_path(landing_dir, "census_usa", year, month)
|
||||
dest = dest_dir / "acs5_places.json.gz"
|
||||
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
|
||||
bytes_written = write_gzip_atomic(dest, payload)
|
||||
logger.info("written %s bytes compressed", f"{bytes_written:,}")
|
||||
|
||||
return {
|
||||
"files_written": 1,
|
||||
"files_skipped": 0,
|
||||
"bytes_written": bytes_written,
|
||||
"cursor_value": year_month,
|
||||
}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
run_extractor(EXTRACTOR_NAME, extract)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,123 @@
|
||||
"""Eurostat SDMX city codelist extractor — city_code → city_name mapping.
|
||||
|
||||
The Eurostat Urban Audit population dataset (urb_cpop1) uses coded city identifiers
|
||||
(e.g. DE001C = Berlin) with no city name column. This extractor fetches the SDMX
|
||||
codelist that maps those codes to human-readable names, enabling stg_city_labels to
|
||||
join population data to dim_cities (which has names, not codes).
|
||||
|
||||
The codelist changes very rarely so ETag dedup means most runs produce a 304 skip.
|
||||
|
||||
Landing: {LANDING_DIR}/eurostat_city_labels/{year}/{month}/cities_codelist.json.gz
|
||||
Output: {"rows": [{"city_code": "DE001C", "city_name": "Berlin"}, ...], "count": N}
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
import niquests
|
||||
|
||||
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
||||
from .utils import landing_path, write_gzip_atomic
|
||||
|
||||
logger = setup_logging("padelnomics.extract.eurostat_city_labels")
|
||||
|
||||
EXTRACTOR_NAME = "eurostat_city_labels"
|
||||
|
||||
# SDMX codelist endpoint — returns the full CITIES dimension codes with labels
|
||||
# format=JSON gives a compact JSON-stat-like structure for the codelist
|
||||
CODELIST_URL = (
|
||||
"https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/codelist/ESTAT/CITIES"
|
||||
"?format=JSON&lang=EN"
|
||||
)
|
||||
|
||||
|
||||
def _parse_sdmx_codelist(data: dict) -> list[dict]:
|
||||
"""Extract city_code → city_name pairs from SDMX codelist JSON response.
|
||||
|
||||
The SDMX 2.1 JSON structure varies by endpoint. This endpoint returns a
|
||||
structure.codelists[0].codes list where each code has id and name[0].name.
|
||||
"""
|
||||
try:
|
||||
codelists = data["structure"]["codelists"]
|
||||
except (KeyError, TypeError) as e:
|
||||
raise ValueError(f"Unexpected SDMX structure — missing codelists: {e}") from e
|
||||
|
||||
assert len(codelists) > 0, "SDMX response has empty codelists array"
|
||||
|
||||
codes = codelists[0].get("codes", [])
|
||||
assert len(codes) > 0, "SDMX codelist has no codes — API response may have changed"
|
||||
|
||||
rows: list[dict] = []
|
||||
for code in codes:
|
||||
city_code = code.get("id", "").strip()
|
||||
if not city_code:
|
||||
continue
|
||||
# Name is a list of {lang, name} objects; pick the first (EN requested above)
|
||||
names = code.get("name", [])
|
||||
if isinstance(names, list) and names:
|
||||
city_name = names[0].get("name", "").strip()
|
||||
elif isinstance(names, str):
|
||||
city_name = names.strip()
|
||||
else:
|
||||
continue
|
||||
if city_name:
|
||||
rows.append({"city_code": city_code, "city_name": city_name})
|
||||
|
||||
return rows
|
||||
|
||||
|
||||
def _etag_path(dest: Path) -> Path:
|
||||
return dest.parent / (dest.name + ".etag")
|
||||
|
||||
|
||||
def extract(
|
||||
landing_dir: Path,
|
||||
year_month: str,
|
||||
conn: sqlite3.Connection,
|
||||
session: niquests.Session,
|
||||
) -> dict:
|
||||
"""Fetch Eurostat CITIES codelist with ETag dedup. Returns run metrics."""
|
||||
year, month = year_month.split("/")
|
||||
dest_dir = landing_path(landing_dir, "eurostat_city_labels", year, month)
|
||||
dest = dest_dir / "cities_codelist.json.gz"
|
||||
etag_file = _etag_path(dest)
|
||||
|
||||
headers: dict[str, str] = {}
|
||||
if etag_file.exists():
|
||||
headers["If-None-Match"] = etag_file.read_text().strip()
|
||||
|
||||
logger.info("GET CITIES codelist")
|
||||
resp = session.get(CODELIST_URL, headers=headers, timeout=HTTP_TIMEOUT_SECONDS)
|
||||
|
||||
if resp.status_code == 304:
|
||||
logger.info("CITIES codelist not modified (304)")
|
||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||
|
||||
resp.raise_for_status()
|
||||
|
||||
rows = _parse_sdmx_codelist(resp.json())
|
||||
assert len(rows) > 100, f"Expected >100 city codes, got {len(rows)} — parse may have failed"
|
||||
|
||||
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
|
||||
bytes_written = write_gzip_atomic(dest, payload)
|
||||
logger.info("written %d city codes (%s bytes compressed)", len(rows), f"{bytes_written:,}")
|
||||
|
||||
if etag := resp.headers.get("etag"):
|
||||
etag_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
etag_file.write_text(etag)
|
||||
|
||||
return {
|
||||
"files_written": 1,
|
||||
"files_skipped": 0,
|
||||
"bytes_written": bytes_written,
|
||||
"cursor_value": year_month,
|
||||
}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
run_extractor(EXTRACTOR_NAME, extract)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
157
extract/padelnomics_extract/src/padelnomics_extract/geonames.py
Normal file
157
extract/padelnomics_extract/src/padelnomics_extract/geonames.py
Normal file
@@ -0,0 +1,157 @@
|
||||
"""GeoNames global city population extractor.
|
||||
|
||||
Downloads the cities15000.zip bulk file (~1.5MB compressed, ~26K entries) from
|
||||
GeoNames and filters to cities with population ≥ 50,000 and feature codes in
|
||||
{PPLA, PPLA2, PPLC, PPL} (populated places, avoiding parks, airports, etc.).
|
||||
|
||||
Used as the global fallback for population when Eurostat/Census/ONS don't cover
|
||||
a country. Padel is expanding globally so this catches UAE, Australia, Argentina, etc.
|
||||
|
||||
Requires: GEONAMES_USERNAME env var (free registration at geonames.org)
|
||||
|
||||
Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.json.gz
|
||||
Output: {"rows": [{"geoname_id": 2950159, "city_name": "Berlin",
|
||||
"country_code": "DE", "population": 3644826,
|
||||
"ref_year": 2024}], "count": N}
|
||||
"""
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
|
||||
import niquests
|
||||
|
||||
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
||||
from .utils import get_last_cursor, landing_path, write_gzip_atomic
|
||||
|
||||
logger = setup_logging("padelnomics.extract.geonames")
|
||||
|
||||
EXTRACTOR_NAME = "geonames"
|
||||
|
||||
DOWNLOAD_URL = "https://download.geonames.org/export/dump/cities15000.zip"
|
||||
|
||||
# Only populated place feature codes — excludes airports, parks, admin areas, etc.
|
||||
# PPLC = capital of a political entity
|
||||
# PPLA = seat of a first-order administrative division
|
||||
# PPLA2 = seat of a second-order admin division
|
||||
# PPL = populated place
|
||||
VALID_FEATURE_CODES = {"PPLC", "PPLA", "PPLA2", "PPL"}
|
||||
|
||||
MIN_POPULATION = 50_000
|
||||
|
||||
# GeoNames tab-separated column layout for cities15000.txt
|
||||
# https://download.geonames.org/export/dump/readme.txt
|
||||
COL_GEONAME_ID = 0
|
||||
COL_NAME = 1
|
||||
COL_ASCIINAME = 2
|
||||
COL_COUNTRY_CODE = 8
|
||||
COL_FEATURE_CODE = 7
|
||||
COL_POPULATION = 14
|
||||
COL_MODIFICATION_DATE = 18
|
||||
|
||||
# Approximate year of last data update (GeoNames doesn't provide a precise vintage)
|
||||
REF_YEAR = 2024
|
||||
|
||||
|
||||
def _parse_cities_txt(content: bytes) -> list[dict]:
|
||||
"""Parse GeoNames cities TSV into filtered rows."""
|
||||
rows: list[dict] = []
|
||||
for line in content.decode("utf-8").splitlines():
|
||||
if not line.strip():
|
||||
continue
|
||||
parts = line.split("\t")
|
||||
if len(parts) < 15:
|
||||
continue
|
||||
feature_code = parts[COL_FEATURE_CODE].strip()
|
||||
if feature_code not in VALID_FEATURE_CODES:
|
||||
continue
|
||||
try:
|
||||
population = int(parts[COL_POPULATION])
|
||||
except (ValueError, IndexError):
|
||||
continue
|
||||
if population < MIN_POPULATION:
|
||||
continue
|
||||
geoname_id_str = parts[COL_GEONAME_ID].strip()
|
||||
try:
|
||||
geoname_id = int(geoname_id_str)
|
||||
except ValueError:
|
||||
continue
|
||||
# Prefer ASCII name for matching (avoids diacritic mismatch); fall back to name
|
||||
ascii_name = parts[COL_ASCIINAME].strip()
|
||||
name = parts[COL_NAME].strip()
|
||||
city_name = ascii_name if ascii_name else name
|
||||
country_code = parts[COL_COUNTRY_CODE].strip().upper()
|
||||
if not city_name or not country_code:
|
||||
continue
|
||||
rows.append({
|
||||
"geoname_id": geoname_id,
|
||||
"city_name": city_name,
|
||||
"country_code": country_code,
|
||||
"population": population,
|
||||
"ref_year": REF_YEAR,
|
||||
})
|
||||
return rows
|
||||
|
||||
|
||||
def extract(
|
||||
landing_dir: Path,
|
||||
year_month: str,
|
||||
conn: sqlite3.Connection,
|
||||
session: niquests.Session,
|
||||
) -> dict:
|
||||
"""Download GeoNames cities15000.zip. Skips if already run this month."""
|
||||
username = os.environ.get("GEONAMES_USERNAME", "").strip()
|
||||
if not username:
|
||||
logger.warning("GEONAMES_USERNAME not set — skipping GeoNames extract")
|
||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||
|
||||
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
|
||||
if last_cursor == year_month:
|
||||
logger.info("already have data for %s — skipping", year_month)
|
||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||
|
||||
year, month = year_month.split("/")
|
||||
|
||||
# GeoNames bulk downloads don't require the username in the URL for cities15000.zip,
|
||||
# but the username signals acceptance of their terms of use and helps their monitoring.
|
||||
url = f"{DOWNLOAD_URL}?username={username}"
|
||||
logger.info("GET cities15000.zip (~1.5MB compressed)")
|
||||
resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS * 4)
|
||||
resp.raise_for_status()
|
||||
|
||||
assert len(resp.content) > 100_000, (
|
||||
f"cities15000.zip too small ({len(resp.content)} bytes) — download may have failed"
|
||||
)
|
||||
|
||||
with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
|
||||
txt_name = next((n for n in zf.namelist() if n.endswith(".txt")), None)
|
||||
assert txt_name, f"No .txt file in cities15000.zip: {zf.namelist()}"
|
||||
txt_content = zf.read(txt_name)
|
||||
|
||||
rows = _parse_cities_txt(txt_content)
|
||||
assert len(rows) > 5_000, f"Expected >5000 global cities ≥50K pop, got {len(rows)}"
|
||||
logger.info("parsed %d global cities with population ≥%d", len(rows), MIN_POPULATION)
|
||||
|
||||
dest_dir = landing_path(landing_dir, "geonames", year, month)
|
||||
dest = dest_dir / "cities_global.json.gz"
|
||||
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
|
||||
bytes_written = write_gzip_atomic(dest, payload)
|
||||
logger.info("written %s bytes compressed", f"{bytes_written:,}")
|
||||
|
||||
return {
|
||||
"files_written": 1,
|
||||
"files_skipped": 0,
|
||||
"bytes_written": bytes_written,
|
||||
"cursor_value": year_month,
|
||||
}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
run_extractor(EXTRACTOR_NAME, extract)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
153
extract/padelnomics_extract/src/padelnomics_extract/ons_uk.py
Normal file
153
extract/padelnomics_extract/src/padelnomics_extract/ons_uk.py
Normal file
@@ -0,0 +1,153 @@
|
||||
"""ONS (Office for National Statistics) UK population extractor.
|
||||
|
||||
Fetches 2021 Census population by Local Authority District (LAD) from the ONS
|
||||
beta API. No authentication required.
|
||||
|
||||
Landing: {LANDING_DIR}/ons_uk/{year}/{month}/lad_population.json.gz
|
||||
Output: {"rows": [{"lad_code": "E08000003", "lad_name": "Manchester",
|
||||
"population": 553230, "ref_year": 2021,
|
||||
"country_code": "GB"}], "count": N}
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
import niquests
|
||||
|
||||
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
||||
from .utils import get_last_cursor, landing_path, write_gzip_atomic
|
||||
|
||||
logger = setup_logging("padelnomics.extract.ons_uk")
|
||||
|
||||
EXTRACTOR_NAME = "ons_uk"
|
||||
|
||||
# ONS beta API — 2021 Census population estimates by Local Authority District.
|
||||
# TS007A = "Age by single year" dataset; aggregate gives total population per LAD.
|
||||
# We use the observations endpoint which returns flat rows.
|
||||
# limit=500 covers all ~380 LADs in England, Wales, Scotland, and Northern Ireland.
|
||||
ONS_BASE_URL = (
|
||||
"https://api.beta.ons.gov.uk/v1/datasets/TS007A/editions/2021/versions/1"
|
||||
)
|
||||
|
||||
REF_YEAR = 2021
|
||||
MIN_POPULATION = 50_000
|
||||
# ONS rate limit is 120 requests per 10 seconds; a single paginated call is fine.
|
||||
PAGE_SIZE = 500
|
||||
MAX_PAGES = 10 # safety bound; all LADs fit in page 1 at limit=500
|
||||
|
||||
|
||||
def _fetch_all_observations(session: niquests.Session) -> list[dict]:
|
||||
"""Fetch all LAD population rows, paginating if needed."""
|
||||
rows: list[dict] = []
|
||||
offset = 0
|
||||
|
||||
for page in range(MAX_PAGES):
|
||||
url = f"{ONS_BASE_URL}/observations?geography=*&age=0&limit={PAGE_SIZE}&offset={offset}"
|
||||
resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
observations = data.get("observations", [])
|
||||
if not observations:
|
||||
break
|
||||
|
||||
for obs in observations:
|
||||
# Each observation: {dimensions: [{id: "geography", option: {id: "E08000003", label: "Manchester"}}...], observation: "553230"}
|
||||
geo_dim = next(
|
||||
(d for d in obs.get("dimensions", []) if d.get("dimension_id") == "geography"),
|
||||
None,
|
||||
)
|
||||
if not geo_dim:
|
||||
continue
|
||||
lad_code = geo_dim.get("option", {}).get("id", "").strip()
|
||||
lad_name = geo_dim.get("option", {}).get("label", "").strip()
|
||||
if not lad_code or not lad_name:
|
||||
continue
|
||||
try:
|
||||
population = int(obs.get("observation", "0").replace(",", ""))
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
rows.append({
|
||||
"lad_code": lad_code,
|
||||
"lad_name": lad_name,
|
||||
"population": population,
|
||||
})
|
||||
|
||||
total = data.get("total_observations", len(rows))
|
||||
offset += len(observations)
|
||||
if offset >= total:
|
||||
break
|
||||
|
||||
logger.info("fetched page %d (%d rows so far)", page + 1, len(rows))
|
||||
|
||||
return rows
|
||||
|
||||
|
||||
def _aggregate_by_lad(raw_rows: list[dict]) -> list[dict]:
|
||||
"""Sum population across all age groups per LAD.
|
||||
|
||||
TS007A breaks population down by single year of age, so we need to aggregate.
|
||||
"""
|
||||
totals: dict[str, dict] = {}
|
||||
for row in raw_rows:
|
||||
key = row["lad_code"]
|
||||
if key not in totals:
|
||||
totals[key] = {"lad_code": row["lad_code"], "lad_name": row["lad_name"], "population": 0}
|
||||
totals[key]["population"] += row["population"]
|
||||
return list(totals.values())
|
||||
|
||||
|
||||
def extract(
|
||||
landing_dir: Path,
|
||||
year_month: str,
|
||||
conn: sqlite3.Connection,
|
||||
session: niquests.Session,
|
||||
) -> dict:
|
||||
"""Fetch ONS LAD population. Skips if already run this month."""
|
||||
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
|
||||
if last_cursor == year_month:
|
||||
logger.info("already have data for %s — skipping", year_month)
|
||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||
|
||||
year, month = year_month.split("/")
|
||||
|
||||
logger.info("GET ONS TS007A LAD population (2021 Census)")
|
||||
raw_rows = _fetch_all_observations(session)
|
||||
lad_rows = _aggregate_by_lad(raw_rows)
|
||||
|
||||
filtered = [
|
||||
{
|
||||
"lad_code": r["lad_code"],
|
||||
"lad_name": r["lad_name"],
|
||||
"population": r["population"],
|
||||
"ref_year": REF_YEAR,
|
||||
"country_code": "GB",
|
||||
}
|
||||
for r in lad_rows
|
||||
if r["population"] >= MIN_POPULATION
|
||||
]
|
||||
|
||||
assert len(filtered) > 50, f"Expected >50 UK LADs ≥50K pop, got {len(filtered)}"
|
||||
logger.info("parsed %d UK LADs with population ≥%d", len(filtered), MIN_POPULATION)
|
||||
|
||||
dest_dir = landing_path(landing_dir, "ons_uk", year, month)
|
||||
dest = dest_dir / "lad_population.json.gz"
|
||||
payload = json.dumps({"rows": filtered, "count": len(filtered)}).encode()
|
||||
bytes_written = write_gzip_atomic(dest, payload)
|
||||
logger.info("written %s bytes compressed", f"{bytes_written:,}")
|
||||
|
||||
return {
|
||||
"files_written": 1,
|
||||
"files_skipped": 0,
|
||||
"bytes_written": bytes_written,
|
||||
"cursor_value": year_month,
|
||||
}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
run_extractor(EXTRACTOR_NAME, extract)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -3,14 +3,17 @@
|
||||
-- tracks cities where padel venues actually exist, not an administrative city list.
|
||||
--
|
||||
-- Conformed dimension: used by city_market_profile and all pSEO serving models.
|
||||
-- Integrates two sources:
|
||||
-- Integrates four sources:
|
||||
-- dim_venues → city list, venue count, coordinates (Playtomic + OSM)
|
||||
-- stg_income → country-level median income (Eurostat)
|
||||
-- stg_city_labels → Eurostat city_code → city_name mapping (EU cities)
|
||||
-- stg_population → Eurostat city-level population (EU, joined via city code)
|
||||
-- stg_population_usa → US Census ACS place population
|
||||
-- stg_population_uk → ONS LAD population
|
||||
-- stg_population_geonames → GeoNames global fallback
|
||||
--
|
||||
-- Population note: Eurostat uses coded identifiers (e.g. DE001C = Berlin) with no
|
||||
-- city name column in the dataset we extract. City-level population requires a
|
||||
-- separate code→name lookup extract (future improvement). Population is set to 0
|
||||
-- until that source is available; market_score degrades gracefully.
|
||||
-- Population cascade: Eurostat EU > US Census > ONS UK > GeoNames > 0.
|
||||
-- City name matching is case/whitespace-insensitive within each country.
|
||||
--
|
||||
-- Grain: (country_code, city_slug) — two cities in different countries can share a
|
||||
-- city name. QUALIFY enforces no duplicate (country_code, city_slug) pairs.
|
||||
@@ -42,6 +45,39 @@ country_income AS (
|
||||
SELECT country_code, median_income_pps, ref_year AS income_year
|
||||
FROM staging.stg_income
|
||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY country_code ORDER BY ref_year DESC) = 1
|
||||
),
|
||||
-- Eurostat EU population: join city labels (code→name) with population values.
|
||||
-- QUALIFY keeps only the most recent year per (country, city name).
|
||||
eurostat_pop AS (
|
||||
SELECT
|
||||
cl.country_code,
|
||||
cl.city_name,
|
||||
p.population,
|
||||
p.ref_year
|
||||
FROM staging.stg_city_labels cl
|
||||
JOIN staging.stg_population p ON cl.city_code = p.city_code
|
||||
QUALIFY ROW_NUMBER() OVER (
|
||||
PARTITION BY cl.country_code, cl.city_name
|
||||
ORDER BY p.ref_year DESC
|
||||
) = 1
|
||||
),
|
||||
-- US Census ACS population (place-level, filtered to ≥50K)
|
||||
us_pop AS (
|
||||
SELECT city_name, country_code, population, ref_year
|
||||
FROM staging.stg_population_usa
|
||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY place_fips ORDER BY ref_year DESC) = 1
|
||||
),
|
||||
-- ONS UK Local Authority District population
|
||||
uk_pop AS (
|
||||
SELECT lad_name AS city_name, country_code, population, ref_year
|
||||
FROM staging.stg_population_uk
|
||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY lad_code ORDER BY ref_year DESC) = 1
|
||||
),
|
||||
-- GeoNames global fallback (all cities ≥50K)
|
||||
geonames_pop AS (
|
||||
SELECT city_name, country_code, population, ref_year
|
||||
FROM staging.stg_population_geonames
|
||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY geoname_id ORDER BY ref_year DESC) = 1
|
||||
)
|
||||
SELECT
|
||||
vc.country_code,
|
||||
@@ -99,15 +135,43 @@ SELECT
|
||||
)) AS country_slug,
|
||||
vc.centroid_lat AS lat,
|
||||
vc.centroid_lon AS lon,
|
||||
-- Population: requires code→name Eurostat lookup (not yet extracted); defaults to 0.
|
||||
-- market_score uses LOG(GREATEST(population, 1)) so 0 degrades score gracefully.
|
||||
0::BIGINT AS population,
|
||||
0::INTEGER AS population_year,
|
||||
-- Population cascade: Eurostat EU > US Census > ONS UK > GeoNames > 0.
|
||||
-- City name match is case/whitespace-insensitive within each country.
|
||||
COALESCE(
|
||||
ep.population,
|
||||
usa.population,
|
||||
uk.population,
|
||||
gn.population,
|
||||
0
|
||||
)::BIGINT AS population,
|
||||
COALESCE(
|
||||
ep.ref_year,
|
||||
usa.ref_year,
|
||||
uk.ref_year,
|
||||
gn.ref_year,
|
||||
0
|
||||
)::INTEGER AS population_year,
|
||||
vc.padel_venue_count,
|
||||
ci.median_income_pps,
|
||||
ci.income_year
|
||||
FROM venue_cities vc
|
||||
LEFT JOIN country_income ci ON vc.country_code = ci.country_code
|
||||
-- Eurostat EU population (via city code→name lookup)
|
||||
LEFT JOIN eurostat_pop ep
|
||||
ON vc.country_code = ep.country_code
|
||||
AND LOWER(TRIM(vc.city_name)) = LOWER(TRIM(ep.city_name))
|
||||
-- US Census population
|
||||
LEFT JOIN us_pop usa
|
||||
ON vc.country_code = usa.country_code
|
||||
AND LOWER(TRIM(vc.city_name)) = LOWER(TRIM(usa.city_name))
|
||||
-- ONS UK population
|
||||
LEFT JOIN uk_pop uk
|
||||
ON vc.country_code = uk.country_code
|
||||
AND LOWER(TRIM(vc.city_name)) = LOWER(TRIM(uk.city_name))
|
||||
-- GeoNames global fallback
|
||||
LEFT JOIN geonames_pop gn
|
||||
ON vc.country_code = gn.country_code
|
||||
AND LOWER(TRIM(vc.city_name)) = LOWER(TRIM(gn.city_name))
|
||||
-- Enforce grain: if two cities in the same country have the same slug
|
||||
-- (e.g. 'São Paulo' and 'Sao Paulo'), keep the one with more venues
|
||||
QUALIFY ROW_NUMBER() OVER (
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
-- One Big Table: per-city padel market intelligence.
|
||||
-- Consumed by: SEO article generation, planner city-select pre-fill, API endpoints.
|
||||
--
|
||||
-- Market score (0–100) is a simple composite:
|
||||
-- 40% population (log-scaled, city > 500K = max)
|
||||
-- 40% venue density (courts per 100K residents)
|
||||
-- 20% data confidence (completeness of both population + venue data)
|
||||
-- Market score v2 (0–100):
|
||||
-- 30 pts population — log-scaled to 1M+ city ceiling (was 40pts/500K)
|
||||
-- 25 pts income PPS — normalised to 200 ceiling (covers CH/NO/LU outliers)
|
||||
-- 30 pts demand — observed occupancy if available, else venue density
|
||||
-- 15 pts data quality — completeness discount, not a market signal
|
||||
|
||||
MODEL (
|
||||
name serving.city_market_profile,
|
||||
@@ -37,19 +38,41 @@ WITH base AS (
|
||||
WHEN c.population > 0 AND c.padel_venue_count > 0 THEN 1.0
|
||||
WHEN c.population > 0 OR c.padel_venue_count > 0 THEN 0.5
|
||||
ELSE 0.0
|
||||
END AS data_confidence
|
||||
END AS data_confidence,
|
||||
-- Pricing / occupancy from Playtomic (NULL when no availability data)
|
||||
vpb.median_hourly_rate,
|
||||
vpb.median_peak_rate,
|
||||
vpb.median_offpeak_rate,
|
||||
vpb.median_occupancy_rate,
|
||||
vpb.median_daily_revenue_per_venue,
|
||||
vpb.price_currency
|
||||
FROM foundation.dim_cities c
|
||||
LEFT JOIN serving.venue_pricing_benchmarks vpb
|
||||
ON c.country_code = vpb.country_code
|
||||
AND LOWER(TRIM(c.city_name)) = LOWER(TRIM(vpb.city))
|
||||
WHERE c.padel_venue_count > 0
|
||||
),
|
||||
scored AS (
|
||||
SELECT *,
|
||||
ROUND(
|
||||
-- Population component (log scale, 500K+ city → 40 pts)
|
||||
40.0 * LEAST(1.0, LN(GREATEST(population, 1)) / LN(500000))
|
||||
-- Density component (5 courts/100K → 40 pts)
|
||||
+ 40.0 * LEAST(1.0, COALESCE(venues_per_100k, 0) / 5.0)
|
||||
-- Confidence component
|
||||
+ 20.0 * data_confidence
|
||||
-- Population (30 pts): log-scale, 1M+ city = full marks.
|
||||
-- LN(1) = 0 so unpopulated cities score 0 here — they still score on demand.
|
||||
30.0 * LEAST(1.0, LN(GREATEST(population, 1)) / LN(1000000))
|
||||
-- Economic power (25 pts): income PPS normalised to 200 ceiling.
|
||||
-- 200 covers high-income outliers (CH ~190, NO ~180, LU ~200+).
|
||||
-- Drives pricing power and willingness-to-pay directly.
|
||||
+ 25.0 * LEAST(1.0, COALESCE(median_income_pps, 100) / 200.0)
|
||||
-- Demand evidence (30 pts): observed occupancy is the best signal
|
||||
-- (proves real demand). If unavailable, venue density is the proxy
|
||||
-- (proves market exists; caps at 4/100K to avoid penalising dense cities).
|
||||
+ 30.0 * CASE
|
||||
WHEN median_occupancy_rate IS NOT NULL
|
||||
THEN LEAST(1.0, median_occupancy_rate / 0.65)
|
||||
ELSE LEAST(1.0, COALESCE(venues_per_100k, 0) / 4.0)
|
||||
END
|
||||
-- Data quality (15 pts): measures completeness, not market quality.
|
||||
-- Reduced from 20pts — kept as confidence discount, not market signal.
|
||||
+ 15.0 * data_confidence
|
||||
, 1) AS market_score
|
||||
FROM base
|
||||
)
|
||||
@@ -69,16 +92,12 @@ SELECT
|
||||
s.market_score,
|
||||
s.median_income_pps,
|
||||
s.income_year,
|
||||
-- Playtomic pricing/occupancy (NULL when no availability data)
|
||||
vpb.median_hourly_rate,
|
||||
vpb.median_peak_rate,
|
||||
vpb.median_offpeak_rate,
|
||||
vpb.median_occupancy_rate,
|
||||
vpb.median_daily_revenue_per_venue,
|
||||
vpb.price_currency,
|
||||
s.median_hourly_rate,
|
||||
s.median_peak_rate,
|
||||
s.median_offpeak_rate,
|
||||
s.median_occupancy_rate,
|
||||
s.median_daily_revenue_per_venue,
|
||||
s.price_currency,
|
||||
CURRENT_DATE AS refreshed_date
|
||||
FROM scored s
|
||||
LEFT JOIN serving.venue_pricing_benchmarks vpb
|
||||
ON s.country_code = vpb.country_code
|
||||
AND LOWER(TRIM(s.city_name)) = LOWER(TRIM(vpb.city))
|
||||
ORDER BY s.market_score DESC
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
-- Eurostat SDMX city codelist: city_code → city_name mapping.
|
||||
-- Maps coded identifiers (e.g. DE001C) to human-readable names (e.g. Berlin).
|
||||
-- This is the bridge table that lets stg_population join to dim_cities.
|
||||
--
|
||||
-- Source: data/landing/eurostat_city_labels/{year}/{month}/cities_codelist.json.gz
|
||||
|
||||
MODEL (
|
||||
name staging.stg_city_labels,
|
||||
kind FULL,
|
||||
cron '@daily',
|
||||
grain city_code
|
||||
);
|
||||
|
||||
WITH raw AS (
|
||||
SELECT unnest(rows) AS r
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/eurostat_city_labels/*/*/cities_codelist.json.gz',
|
||||
auto_detect = true
|
||||
)
|
||||
)
|
||||
SELECT
|
||||
UPPER(TRIM(r ->> 'city_code')) AS city_code,
|
||||
TRIM(r ->> 'city_name') AS city_name,
|
||||
-- Country code is always the first two letters of the city code (e.g. DE001C → DE)
|
||||
UPPER(LEFT(TRIM(r ->> 'city_code'), 2)) AS country_code,
|
||||
CURRENT_DATE AS extracted_date
|
||||
FROM raw
|
||||
WHERE (r ->> 'city_code') IS NOT NULL
|
||||
AND (r ->> 'city_name') IS NOT NULL
|
||||
AND LENGTH(TRIM(r ->> 'city_code')) > 0
|
||||
AND LENGTH(TRIM(r ->> 'city_name')) > 0
|
||||
@@ -0,0 +1,42 @@
|
||||
-- GeoNames global city population (cities15000 bulk dataset, filtered to ≥50K).
|
||||
-- Global fallback for countries not covered by Eurostat, Census, or ONS.
|
||||
-- One row per geoname_id (GeoNames stable numeric identifier).
|
||||
--
|
||||
-- Source: data/landing/geonames/{year}/{month}/cities_global.json.gz
|
||||
|
||||
MODEL (
|
||||
name staging.stg_population_geonames,
|
||||
kind FULL,
|
||||
cron '@daily',
|
||||
grain geoname_id
|
||||
);
|
||||
|
||||
WITH parsed AS (
|
||||
SELECT
|
||||
TRY_CAST(row ->> 'geoname_id' AS INTEGER) AS geoname_id,
|
||||
row ->> 'city_name' AS city_name,
|
||||
row ->> 'country_code' AS country_code,
|
||||
TRY_CAST(row ->> 'population' AS BIGINT) AS population,
|
||||
TRY_CAST(row ->> 'ref_year' AS INTEGER) AS ref_year,
|
||||
CURRENT_DATE AS extracted_date
|
||||
FROM (
|
||||
SELECT UNNEST(rows) AS row
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/geonames/*/*/cities_global.json.gz',
|
||||
auto_detect = true
|
||||
)
|
||||
)
|
||||
WHERE (row ->> 'geoname_id') IS NOT NULL
|
||||
)
|
||||
SELECT
|
||||
geoname_id,
|
||||
TRIM(city_name) AS city_name,
|
||||
UPPER(country_code) AS country_code,
|
||||
population,
|
||||
ref_year,
|
||||
extracted_date
|
||||
FROM parsed
|
||||
WHERE population IS NOT NULL
|
||||
AND population > 0
|
||||
AND geoname_id IS NOT NULL
|
||||
AND city_name IS NOT NULL
|
||||
@@ -0,0 +1,41 @@
|
||||
-- ONS 2021 Census population by Local Authority District (LAD).
|
||||
-- Reads pre-processed landing zone JSON from ons_uk extractor.
|
||||
-- One row per (lad_code, ref_year) — LAD code is the ONS area identifier.
|
||||
--
|
||||
-- Source: data/landing/ons_uk/{year}/{month}/lad_population.json.gz
|
||||
|
||||
MODEL (
|
||||
name staging.stg_population_uk,
|
||||
kind FULL,
|
||||
cron '@daily',
|
||||
grain (lad_code, ref_year)
|
||||
);
|
||||
|
||||
WITH parsed AS (
|
||||
SELECT
|
||||
row ->> 'lad_code' AS lad_code,
|
||||
row ->> 'lad_name' AS lad_name,
|
||||
TRY_CAST(row ->> 'population' AS BIGINT) AS population,
|
||||
TRY_CAST(row ->> 'ref_year' AS INTEGER) AS ref_year,
|
||||
row ->> 'country_code' AS country_code,
|
||||
CURRENT_DATE AS extracted_date
|
||||
FROM (
|
||||
SELECT UNNEST(rows) AS row
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/ons_uk/*/*/lad_population.json.gz',
|
||||
auto_detect = true
|
||||
)
|
||||
)
|
||||
WHERE (row ->> 'lad_code') IS NOT NULL
|
||||
)
|
||||
SELECT
|
||||
UPPER(TRIM(lad_code)) AS lad_code,
|
||||
TRIM(lad_name) AS lad_name,
|
||||
population,
|
||||
ref_year,
|
||||
UPPER(country_code) AS country_code,
|
||||
extracted_date
|
||||
FROM parsed
|
||||
WHERE population IS NOT NULL
|
||||
AND population > 0
|
||||
AND lad_code IS NOT NULL
|
||||
@@ -0,0 +1,43 @@
|
||||
-- US Census ACS 5-year place-level population.
|
||||
-- Reads pre-processed landing zone JSON from census_usa extractor.
|
||||
-- One row per (place_fips, ref_year) — surrogate key is the Census FIPS code.
|
||||
--
|
||||
-- Source: data/landing/census_usa/{year}/{month}/acs5_places.json.gz
|
||||
|
||||
MODEL (
|
||||
name staging.stg_population_usa,
|
||||
kind FULL,
|
||||
cron '@daily',
|
||||
grain (place_fips, ref_year)
|
||||
);
|
||||
|
||||
WITH parsed AS (
|
||||
SELECT
|
||||
row ->> 'city_name' AS city_name,
|
||||
row ->> 'state_fips' AS state_fips,
|
||||
row ->> 'place_fips' AS place_fips,
|
||||
TRY_CAST(row ->> 'population' AS BIGINT) AS population,
|
||||
TRY_CAST(row ->> 'ref_year' AS INTEGER) AS ref_year,
|
||||
row ->> 'country_code' AS country_code,
|
||||
CURRENT_DATE AS extracted_date
|
||||
FROM (
|
||||
SELECT UNNEST(rows) AS row
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/census_usa/*/*/acs5_places.json.gz',
|
||||
auto_detect = true
|
||||
)
|
||||
)
|
||||
WHERE (row ->> 'place_fips') IS NOT NULL
|
||||
)
|
||||
SELECT
|
||||
TRIM(city_name) AS city_name,
|
||||
state_fips,
|
||||
place_fips,
|
||||
population,
|
||||
ref_year,
|
||||
UPPER(country_code) AS country_code,
|
||||
extracted_date
|
||||
FROM parsed
|
||||
WHERE population IS NOT NULL
|
||||
AND population > 0
|
||||
AND place_fips IS NOT NULL
|
||||
Reference in New Issue
Block a user