GeoNames: - cities15000 → cities1000 (~140K global locations, pop ≥ 1K) - Add lat/lon, admin1_code, admin2_code to output (needed for dim_locations) - Expand feature codes to include PPLA3/4/5 (Gemeinden, cantons, etc.) - Remove MIN_POPULATION=50K floor — cities1000 already pre-filters to ≥1K - Update assertions for new scale (~100K+ expected) Tennis courts: - New overpass_tennis.py extractor (sport=tennis, 180s Overpass timeout) - Registered as extract-overpass-tennis, added to EXTRACTORS list - New stg_tennis_courts.sql staging model (grain: osm_id) stg_population_geonames: add lat, lon, admin1_code, admin2_code columns Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
186 lines
6.7 KiB
Python
186 lines
6.7 KiB
Python
"""GeoNames global city population extractor.
|
|
|
|
Downloads the cities1000.zip bulk file (~30MB compressed, ~140K entries) from
|
|
GeoNames. Includes all populated places with population ≥ 1,000 and feature codes
|
|
in {PPLA, PPLA2, PPLA3, PPLA4, PPLA5, PPLC, PPL}.
|
|
|
|
This broader coverage (vs. the old cities15000 with ≥50K filter) supports
|
|
Gemeinde-level market intelligence pages — small municipalities often have the
|
|
highest padel investment opportunity (white space markets).
|
|
|
|
Requires: GEONAMES_USERNAME env var (free registration at geonames.org)
|
|
|
|
Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.json.gz
|
|
Output: {"rows": [{"geoname_id": 2950159, "city_name": "Berlin",
|
|
"country_code": "DE", "population": 3644826,
|
|
"lat": 52.524, "lon": 13.411,
|
|
"admin1_code": "16", "admin2_code": "00",
|
|
"ref_year": 2024}], "count": N}
|
|
"""
|
|
|
|
import io
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
import niquests
|
|
|
|
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
|
from .utils import get_last_cursor, landing_path, write_gzip_atomic
|
|
|
|
logger = setup_logging("padelnomics.extract.geonames")
|
|
|
|
EXTRACTOR_NAME = "geonames"
|
|
|
|
DOWNLOAD_URL = "https://download.geonames.org/export/dump/cities1000.zip"
|
|
|
|
# Only populated place feature codes — excludes airports, parks, admin areas, etc.
|
|
# PPLC = capital of a political entity
|
|
# PPLA = seat of a first-order administrative division
|
|
# PPLA2 = seat of a second-order admin division
|
|
# PPLA3 = seat of a third-order admin division (Gemeinden, cantons, etc.)
|
|
# PPLA4 = seat of a fourth-order admin division
|
|
# PPLA5 = seat of a fifth-order admin division
|
|
# PPL = populated place
|
|
VALID_FEATURE_CODES = {"PPLC", "PPLA", "PPLA2", "PPLA3", "PPLA4", "PPLA5", "PPL"}
|
|
|
|
# No population floor — cities1000.zip is pre-filtered to ≥ 1,000.
|
|
# Accept all to maximise Gemeinde-level coverage.
|
|
MIN_POPULATION = 0
|
|
|
|
# GeoNames tab-separated column layout for cities1000.txt
|
|
# https://download.geonames.org/export/dump/readme.txt
|
|
COL_GEONAME_ID = 0
|
|
COL_NAME = 1
|
|
COL_ASCIINAME = 2
|
|
COL_LAT = 4
|
|
COL_LON = 5
|
|
COL_FEATURE_CODE = 7
|
|
COL_COUNTRY_CODE = 8
|
|
COL_ADMIN1_CODE = 10
|
|
COL_ADMIN2_CODE = 11
|
|
COL_POPULATION = 14
|
|
COL_MODIFICATION_DATE = 18
|
|
|
|
# Approximate year of last data update (GeoNames doesn't provide a precise vintage)
|
|
REF_YEAR = 2024
|
|
|
|
|
|
def _parse_cities_txt(content: bytes) -> list[dict]:
|
|
"""Parse GeoNames cities TSV into filtered rows."""
|
|
rows: list[dict] = []
|
|
for line in content.decode("utf-8").splitlines():
|
|
if not line.strip():
|
|
continue
|
|
parts = line.split("\t")
|
|
if len(parts) < 15:
|
|
continue
|
|
feature_code = parts[COL_FEATURE_CODE].strip()
|
|
if feature_code not in VALID_FEATURE_CODES:
|
|
continue
|
|
try:
|
|
population = int(parts[COL_POPULATION])
|
|
except (ValueError, IndexError):
|
|
continue
|
|
if population < MIN_POPULATION:
|
|
continue
|
|
geoname_id_str = parts[COL_GEONAME_ID].strip()
|
|
try:
|
|
geoname_id = int(geoname_id_str)
|
|
except ValueError:
|
|
continue
|
|
# Prefer ASCII name for matching (avoids diacritic mismatch); fall back to name
|
|
ascii_name = parts[COL_ASCIINAME].strip()
|
|
name = parts[COL_NAME].strip()
|
|
city_name = ascii_name if ascii_name else name
|
|
country_code = parts[COL_COUNTRY_CODE].strip().upper()
|
|
if not city_name or not country_code:
|
|
continue
|
|
try:
|
|
lat = float(parts[COL_LAT])
|
|
lon = float(parts[COL_LON])
|
|
except (ValueError, IndexError):
|
|
continue
|
|
admin1_code = parts[COL_ADMIN1_CODE].strip() if len(parts) > COL_ADMIN1_CODE else ""
|
|
admin2_code = parts[COL_ADMIN2_CODE].strip() if len(parts) > COL_ADMIN2_CODE else ""
|
|
rows.append({
|
|
"geoname_id": geoname_id,
|
|
"city_name": city_name,
|
|
"country_code": country_code,
|
|
"lat": lat,
|
|
"lon": lon,
|
|
"admin1_code": admin1_code or None,
|
|
"admin2_code": admin2_code or None,
|
|
"population": population,
|
|
"ref_year": REF_YEAR,
|
|
})
|
|
return rows
|
|
|
|
|
|
def extract(
|
|
landing_dir: Path,
|
|
year_month: str,
|
|
conn: sqlite3.Connection,
|
|
session: niquests.Session,
|
|
) -> dict:
|
|
"""Download GeoNames cities1000.zip. Skips if already run this month."""
|
|
username = os.environ.get("GEONAMES_USERNAME", "").strip()
|
|
if not username:
|
|
logger.warning("GEONAMES_USERNAME not set — writing empty placeholder so SQLMesh models can run")
|
|
year, month = year_month.split("/")
|
|
dest_dir = landing_path(landing_dir, "geonames", year, month)
|
|
dest = dest_dir / "cities_global.json.gz"
|
|
if not dest.exists():
|
|
write_gzip_atomic(dest, b'{"rows": [], "count": 0}')
|
|
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
|
|
|
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
|
|
if last_cursor == year_month:
|
|
logger.info("already have data for %s — skipping", year_month)
|
|
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
|
|
|
year, month = year_month.split("/")
|
|
|
|
# GeoNames bulk downloads don't require the username in the URL for cities1000.zip,
|
|
# but the username signals acceptance of their terms of use and helps their monitoring.
|
|
url = f"{DOWNLOAD_URL}?username={username}"
|
|
logger.info("GET cities1000.zip (~30MB compressed, ~140K locations)")
|
|
resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS * 10)
|
|
resp.raise_for_status()
|
|
|
|
assert len(resp.content) > 1_000_000, (
|
|
f"cities1000.zip too small ({len(resp.content)} bytes) — download may have failed"
|
|
)
|
|
|
|
with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
|
|
txt_name = next((n for n in zf.namelist() if n.endswith(".txt")), None)
|
|
assert txt_name, f"No .txt file in cities1000.zip: {zf.namelist()}"
|
|
txt_content = zf.read(txt_name)
|
|
|
|
rows = _parse_cities_txt(txt_content)
|
|
assert len(rows) > 100_000, f"Expected >100K global locations (pop ≥1K), got {len(rows)}"
|
|
logger.info("parsed %d global locations (pop ≥1K)", len(rows))
|
|
|
|
dest_dir = landing_path(landing_dir, "geonames", year, month)
|
|
dest = dest_dir / "cities_global.json.gz"
|
|
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
|
|
bytes_written = write_gzip_atomic(dest, payload)
|
|
logger.info("written %s bytes compressed", f"{bytes_written:,}")
|
|
|
|
return {
|
|
"files_written": 1,
|
|
"files_skipped": 0,
|
|
"bytes_written": bytes_written,
|
|
"cursor_value": year_month,
|
|
}
|
|
|
|
|
|
def main() -> None:
|
|
run_extractor(EXTRACTOR_NAME, extract)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|