From a4f246d69a4d81c30055d0be4b279ff30d749bd1 Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 12:16:59 +0100 Subject: [PATCH] feat(extract): convert geonames to JSONL output - cities_global.jsonl.gz replaces .json.gz (one city object per line) - Empty placeholder writes a minimal .jsonl.gz (null row, filtered in staging) - Eliminates the {"rows": [...]} blob wrapper and maximum_object_size workaround stg_population_geonames: UNION ALL transition (jsonl_rows + blob_rows) - jsonl_rows: read_json JSONL, explicit columns, no UNNEST - blob_rows: existing UNNEST(rows) pattern with 40MB size limit retained Co-Authored-By: Claude Sonnet 4.6 --- .../src/padelnomics_extract/geonames.py | 30 ++++++++----- .../staging/stg_population_geonames.sql | 45 ++++++++++++++++--- 2 files changed, 58 insertions(+), 17 deletions(-) diff --git a/extract/padelnomics_extract/src/padelnomics_extract/geonames.py b/extract/padelnomics_extract/src/padelnomics_extract/geonames.py index b6d6a8d..0e83498 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/geonames.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/geonames.py @@ -10,14 +10,14 @@ highest padel investment opportunity (white space markets). Requires: GEONAMES_USERNAME env var (free registration at geonames.org) -Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.json.gz -Output: {"rows": [{"geoname_id": 2950159, "city_name": "Berlin", - "country_code": "DE", "population": 3644826, - "lat": 52.524, "lon": 13.411, - "admin1_code": "16", "admin2_code": "00", - "ref_year": 2024}], "count": N} +Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.jsonl.gz +Output: one JSON object per line, e.g.: + {"geoname_id": 2950159, "city_name": "Berlin", "country_code": "DE", + "population": 3644826, "lat": 52.524, "lon": 13.411, + "admin1_code": "16", "admin2_code": "00", "ref_year": 2024} """ +import gzip import io import json import os @@ -28,7 +28,7 @@ from pathlib import Path import niquests from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging -from .utils import get_last_cursor, landing_path, write_gzip_atomic +from .utils import compress_jsonl_atomic, get_last_cursor, landing_path logger = setup_logging("padelnomics.extract.geonames") @@ -131,9 +131,12 @@ def extract( logger.warning("GEONAMES_USERNAME not set — writing empty placeholder so SQLMesh models can run") year, month = year_month.split("/") dest_dir = landing_path(landing_dir, "geonames", year, month) - dest = dest_dir / "cities_global.json.gz" + dest = dest_dir / "cities_global.jsonl.gz" if not dest.exists(): - write_gzip_atomic(dest, b'{"rows": [], "count": 0}') + tmp = dest.with_suffix(".gz.tmp") + with gzip.open(tmp, "wt") as f: + f.write('{"geoname_id":null}\n') # filtered by WHERE geoname_id IS NOT NULL + tmp.rename(dest) return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) @@ -164,9 +167,12 @@ def extract( logger.info("parsed %d global locations (pop ≥1K)", len(rows)) dest_dir = landing_path(landing_dir, "geonames", year, month) - dest = dest_dir / "cities_global.json.gz" - payload = json.dumps({"rows": rows, "count": len(rows)}).encode() - bytes_written = write_gzip_atomic(dest, payload) + dest = dest_dir / "cities_global.jsonl.gz" + working_path = dest.with_suffix(".working.jsonl") + with open(working_path, "w") as f: + for row in rows: + f.write(json.dumps(row, separators=(",", ":")) + "\n") + bytes_written = compress_jsonl_atomic(working_path, dest) logger.info("written %s bytes compressed", f"{bytes_written:,}") return { diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_population_geonames.sql b/transform/sqlmesh_padelnomics/models/staging/stg_population_geonames.sql index 699c90e..82f4826 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_population_geonames.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_population_geonames.sql @@ -3,7 +3,11 @@ -- Broad coverage (140K+ locations) enables Gemeinde-level market intelligence. -- One row per geoname_id (GeoNames stable numeric identifier). -- --- Source: data/landing/geonames/{year}/{month}/cities_global.json.gz +-- Supports two landing formats (UNION ALL during migration): +-- New: cities_global.jsonl.gz — one city per line, columns directly accessible +-- Old: cities_global.json.gz — {"rows": [...]} blob (UNNEST required) +-- +-- Source: data/landing/geonames/{year}/{month}/cities_global.{jsonl,json}.gz MODEL ( name staging.stg_population_geonames, @@ -12,7 +16,33 @@ MODEL ( grain geoname_id ); -WITH parsed AS ( +WITH +-- New format: one city per JSONL line +jsonl_rows AS ( + SELECT + TRY_CAST(geoname_id AS INTEGER) AS geoname_id, + city_name, + country_code, + TRY_CAST(lat AS DOUBLE) AS lat, + TRY_CAST(lon AS DOUBLE) AS lon, + admin1_code, + admin2_code, + TRY_CAST(population AS BIGINT) AS population, + TRY_CAST(ref_year AS INTEGER) AS ref_year, + CURRENT_DATE AS extracted_date + FROM read_json( + @LANDING_DIR || '/geonames/*/*/cities_global.jsonl.gz', + format = 'newline_delimited', + columns = { + geoname_id: 'INTEGER', city_name: 'VARCHAR', country_code: 'VARCHAR', + lat: 'DOUBLE', lon: 'DOUBLE', admin1_code: 'VARCHAR', admin2_code: 'VARCHAR', + population: 'BIGINT', ref_year: 'INTEGER' + } + ) + WHERE geoname_id IS NOT NULL +), +-- Old format: {"rows": [...]} blob — kept for transition +blob_rows AS ( SELECT TRY_CAST(row ->> 'geoname_id' AS INTEGER) AS geoname_id, row ->> 'city_name' AS city_name, @@ -33,11 +63,16 @@ WITH parsed AS ( ) ) WHERE (row ->> 'geoname_id') IS NOT NULL +), +all_rows AS ( + SELECT * FROM jsonl_rows + UNION ALL + SELECT * FROM blob_rows ) SELECT geoname_id, - TRIM(city_name) AS city_name, - UPPER(country_code) AS country_code, + TRIM(city_name) AS city_name, + UPPER(country_code) AS country_code, lat, lon, NULLIF(TRIM(admin1_code), '') AS admin1_code, @@ -45,7 +80,7 @@ SELECT population, ref_year, extracted_date -FROM parsed +FROM all_rows WHERE population IS NOT NULL AND population > 0 AND geoname_id IS NOT NULL