padelnomics/extract/padelnomics_extract/src/padelnomics_extract/geonames.py
Deeman 567798ebe1 feat(extract): add skip_if_current() and write_jsonl_atomic() helpers
Task 5/6: Compress repeated patterns in extractors:
- skip_if_current(): cursor check + early-return dict (3 extractors)
- write_jsonl_atomic(): working-file → JSONL → compress (2 extractors)
Applied in gisco, geonames, census_usa, playtomic_tenants.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 07:49:18 +01:00
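
A minimal sketch of the two helpers named above, inferred from their call sites in
this file. The real definitions presumably live in padelnomics_extract/utils.py
(not shown on this page); the extractor_cursors table name and the exact SQL are
assumptions for illustration, not the repo's actual code.

import gzip
import json
import sqlite3
from pathlib import Path


def skip_if_current(conn: sqlite3.Connection, name: str, year_month: str) -> dict | None:
    """Return an all-skipped summary dict if `name` already recorded `year_month`."""
    row = conn.execute(
        "SELECT cursor_value FROM extractor_cursors WHERE extractor = ?",  # table name assumed
        (name,),
    ).fetchone()
    if row and row[0] == year_month:
        return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
    return None


def write_jsonl_atomic(dest: Path, rows: list[dict]) -> int:
    """Write rows as gzip-compressed JSONL via a working file, then rename into place."""
    tmp = dest.with_suffix(dest.suffix + ".tmp")
    with gzip.open(tmp, "wt", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")
    tmp.rename(dest)  # same-directory rename, so readers never see a partial file
    return dest.stat().st_size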


"""GeoNames global city population extractor.
Downloads the cities1000.zip bulk file (~30MB compressed, ~140K entries) from
GeoNames. Includes all populated places with population ≥ 1,000 and feature codes
in {PPLA, PPLA2, PPLA3, PPLA4, PPLA5, PPLC, PPL}.
This broader coverage (vs. the old cities15000 with ≥50K filter) supports
Gemeinde-level market intelligence pages — small municipalities often have the
highest padel investment opportunity (white space markets).
Requires: GEONAMES_USERNAME env var (free registration at geonames.org)
Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.jsonl.gz
Output: one JSON object per line, e.g.:
{"geoname_id": 2950159, "city_name": "Berlin", "country_code": "DE",
"population": 3644826, "lat": 52.524, "lon": 13.411,
"admin1_code": "16", "admin2_code": "00", "ref_year": 2024}
"""
import gzip
import io
import os
import sqlite3
import zipfile
from pathlib import Path

import niquests

from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import landing_path, skip_if_current, write_jsonl_atomic

logger = setup_logging("padelnomics.extract.geonames")

EXTRACTOR_NAME = "geonames"
DOWNLOAD_URL = "https://download.geonames.org/export/dump/cities1000.zip"

# Only populated place feature codes — excludes airports, parks, admin areas, etc.
# PPLC = capital of a political entity
# PPLA = seat of a first-order administrative division
# PPLA2 = seat of a second-order admin division
# PPLA3 = seat of a third-order admin division (Gemeinden, cantons, etc.)
# PPLA4 = seat of a fourth-order admin division
# PPLA5 = seat of a fifth-order admin division
# PPL = populated place
VALID_FEATURE_CODES = {"PPLC", "PPLA", "PPLA2", "PPLA3", "PPLA4", "PPLA5", "PPL"}
# No population floor — cities1000.zip is pre-filtered to ≥ 1,000.
# Accept all to maximise Gemeinde-level coverage.
MIN_POPULATION = 0
# GeoNames tab-separated column layout for cities1000.txt
# https://download.geonames.org/export/dump/readme.txt
COL_GEONAME_ID = 0
COL_NAME = 1
COL_ASCIINAME = 2
COL_LAT = 4
COL_LON = 5
COL_FEATURE_CODE = 7
COL_COUNTRY_CODE = 8
COL_ADMIN1_CODE = 10
COL_ADMIN2_CODE = 11
COL_POPULATION = 14
COL_MODIFICATION_DATE = 18
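# Illustrative cities1000.txt row, built from the docstring's Berlin example
# (tabs rendered as " | ", elided columns shown as "…"):
#   2950159 | Berlin | Berlin | … | 52.524 | 13.411 | P | PPLC | DE | … | 16 | 00 | … | 3644826 | …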
# Approximate year of last data update (GeoNames doesn't provide a precise vintage)
REF_YEAR = 2024


def _parse_cities_txt(content: bytes) -> list[dict]:
    """Parse GeoNames cities TSV into filtered rows."""
    rows: list[dict] = []
    for line in content.decode("utf-8").splitlines():
        if not line.strip():
            continue
        parts = line.split("\t")
        if len(parts) < 15:
            continue
        feature_code = parts[COL_FEATURE_CODE].strip()
        if feature_code not in VALID_FEATURE_CODES:
            continue
        try:
            population = int(parts[COL_POPULATION])
        except (ValueError, IndexError):
            continue
        if population < MIN_POPULATION:
            continue
        geoname_id_str = parts[COL_GEONAME_ID].strip()
        try:
            geoname_id = int(geoname_id_str)
        except ValueError:
            continue
        # Prefer ASCII name for matching (avoids diacritic mismatch); fall back to name
        ascii_name = parts[COL_ASCIINAME].strip()
        name = parts[COL_NAME].strip()
        city_name = ascii_name if ascii_name else name
        country_code = parts[COL_COUNTRY_CODE].strip().upper()
        if not city_name or not country_code:
            continue
        try:
            lat = float(parts[COL_LAT])
            lon = float(parts[COL_LON])
        except (ValueError, IndexError):
            continue
        admin1_code = parts[COL_ADMIN1_CODE].strip() if len(parts) > COL_ADMIN1_CODE else ""
        admin2_code = parts[COL_ADMIN2_CODE].strip() if len(parts) > COL_ADMIN2_CODE else ""
        rows.append({
            "geoname_id": geoname_id,
            "city_name": city_name,
            "country_code": country_code,
            "lat": lat,
            "lon": lon,
            "admin1_code": admin1_code or None,
            "admin2_code": admin2_code or None,
            "population": population,
            "ref_year": REF_YEAR,
        })
    return rows
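# Expected shape, using the Berlin row above (illustrative, output abridged):
#   >>> _parse_cities_txt(b"2950159\tBerlin\tBerlin\t\t52.524\t13.411\tP\tPPLC\tDE\t\t16\t00\t\t\t3644826\t\t\t\t2024-01-01")
#   [{"geoname_id": 2950159, "city_name": "Berlin", "country_code": "DE", ..., "population": 3644826, "ref_year": 2024}]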


def extract(
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """Download GeoNames cities1000.zip. Skips if already run this month."""
    username = os.environ.get("GEONAMES_USERNAME", "").strip()
    if not username:
        logger.warning("GEONAMES_USERNAME not set — writing empty placeholder so SQLMesh models can run")
        year, month = year_month.split("/")
        dest_dir = landing_path(landing_dir, "geonames", year, month)
        dest = dest_dir / "cities_global.jsonl.gz"
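        # Same working-file → rename pattern as write_jsonl_atomic(), inlined
        # here because the placeholder is a single hand-written line.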
        if not dest.exists():
            tmp = dest.with_suffix(".gz.tmp")
            with gzip.open(tmp, "wt") as f:
                f.write('{"geoname_id":null}\n')  # filtered by WHERE geoname_id IS NOT NULL
            tmp.rename(dest)
        return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}

    skip = skip_if_current(conn, EXTRACTOR_NAME, year_month)
    if skip:
        logger.info("already have data for %s — skipping", year_month)
        return skip

    year, month = year_month.split("/")

    # GeoNames bulk downloads don't require the username in the URL for cities1000.zip,
    # but the username signals acceptance of their terms of use and helps their monitoring.
    url = f"{DOWNLOAD_URL}?username={username}"
    logger.info("GET cities1000.zip (~30MB compressed, ~140K locations)")
    resp = session.get(url, timeout=HTTP_TIMEOUT_SECONDS * 10)
    resp.raise_for_status()
    assert len(resp.content) > 1_000_000, (
        f"cities1000.zip too small ({len(resp.content)} bytes) — download may have failed"
    )

    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        txt_name = next((n for n in zf.namelist() if n.endswith(".txt")), None)
        assert txt_name, f"No .txt file in cities1000.zip: {zf.namelist()}"
        txt_content = zf.read(txt_name)

    rows = _parse_cities_txt(txt_content)
    assert len(rows) > 100_000, f"Expected >100K global locations (pop ≥1K), got {len(rows)}"
    logger.info("parsed %d global locations (pop ≥1K)", len(rows))

    dest_dir = landing_path(landing_dir, "geonames", year, month)
    dest = dest_dir / "cities_global.jsonl.gz"
    bytes_written = write_jsonl_atomic(dest, rows)
    logger.info("written %s bytes compressed", f"{bytes_written:,}")

    return {
        "files_written": 1,
        "files_skipped": 0,
        "bytes_written": bytes_written,
        "cursor_value": year_month,
    }


def main() -> None:
    run_extractor(EXTRACTOR_NAME, extract)


if __name__ == "__main__":
    main()