feat(extract): convert geonames to JSONL output
- cities_global.jsonl.gz replaces .json.gz (one city object per line)
- Empty placeholder writes a minimal .jsonl.gz (null row, filtered in staging)
- Eliminates the {"rows": [...]} blob wrapper and maximum_object_size workaround
stg_population_geonames: UNION ALL transition (jsonl_rows + blob_rows)
- jsonl_rows: read_json JSONL, explicit columns, no UNNEST
- blob_rows: existing UNNEST(rows) pattern with 40MB size limit retained
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -10,14 +10,14 @@ highest padel investment opportunity (white space markets).
|
|||||||
|
|
||||||
Requires: GEONAMES_USERNAME env var (free registration at geonames.org)
|
Requires: GEONAMES_USERNAME env var (free registration at geonames.org)
|
||||||
|
|
||||||
Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.json.gz
|
Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.jsonl.gz
|
||||||
Output: {"rows": [{"geoname_id": 2950159, "city_name": "Berlin",
|
Output: one JSON object per line, e.g.:
|
||||||
"country_code": "DE", "population": 3644826,
|
{"geoname_id": 2950159, "city_name": "Berlin", "country_code": "DE",
|
||||||
"lat": 52.524, "lon": 13.411,
|
"population": 3644826, "lat": 52.524, "lon": 13.411,
|
||||||
"admin1_code": "16", "admin2_code": "00",
|
"admin1_code": "16", "admin2_code": "00", "ref_year": 2024}
|
||||||
"ref_year": 2024}], "count": N}
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import gzip
|
||||||
import io
|
import io
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
@@ -28,7 +28,7 @@ from pathlib import Path
|
|||||||
import niquests
|
import niquests
|
||||||
|
|
||||||
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
||||||
from .utils import get_last_cursor, landing_path, write_gzip_atomic
|
from .utils import compress_jsonl_atomic, get_last_cursor, landing_path
|
||||||
|
|
||||||
logger = setup_logging("padelnomics.extract.geonames")
|
logger = setup_logging("padelnomics.extract.geonames")
|
||||||
|
|
||||||
@@ -131,9 +131,12 @@ def extract(
|
|||||||
logger.warning("GEONAMES_USERNAME not set — writing empty placeholder so SQLMesh models can run")
|
logger.warning("GEONAMES_USERNAME not set — writing empty placeholder so SQLMesh models can run")
|
||||||
year, month = year_month.split("/")
|
year, month = year_month.split("/")
|
||||||
dest_dir = landing_path(landing_dir, "geonames", year, month)
|
dest_dir = landing_path(landing_dir, "geonames", year, month)
|
||||||
dest = dest_dir / "cities_global.json.gz"
|
dest = dest_dir / "cities_global.jsonl.gz"
|
||||||
if not dest.exists():
|
if not dest.exists():
|
||||||
write_gzip_atomic(dest, b'{"rows": [], "count": 0}')
|
tmp = dest.with_suffix(".gz.tmp")
|
||||||
|
with gzip.open(tmp, "wt") as f:
|
||||||
|
f.write('{"geoname_id":null}\n') # filtered by WHERE geoname_id IS NOT NULL
|
||||||
|
tmp.rename(dest)
|
||||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||||
|
|
||||||
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
|
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
|
||||||
@@ -164,9 +167,12 @@ def extract(
|
|||||||
logger.info("parsed %d global locations (pop ≥1K)", len(rows))
|
logger.info("parsed %d global locations (pop ≥1K)", len(rows))
|
||||||
|
|
||||||
dest_dir = landing_path(landing_dir, "geonames", year, month)
|
dest_dir = landing_path(landing_dir, "geonames", year, month)
|
||||||
dest = dest_dir / "cities_global.json.gz"
|
dest = dest_dir / "cities_global.jsonl.gz"
|
||||||
payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
|
working_path = dest.with_suffix(".working.jsonl")
|
||||||
bytes_written = write_gzip_atomic(dest, payload)
|
with open(working_path, "w") as f:
|
||||||
|
for row in rows:
|
||||||
|
f.write(json.dumps(row, separators=(",", ":")) + "\n")
|
||||||
|
bytes_written = compress_jsonl_atomic(working_path, dest)
|
||||||
logger.info("written %s bytes compressed", f"{bytes_written:,}")
|
logger.info("written %s bytes compressed", f"{bytes_written:,}")
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -3,7 +3,11 @@
|
|||||||
-- Broad coverage (140K+ locations) enables Gemeinde-level market intelligence.
|
-- Broad coverage (140K+ locations) enables Gemeinde-level market intelligence.
|
||||||
-- One row per geoname_id (GeoNames stable numeric identifier).
|
-- One row per geoname_id (GeoNames stable numeric identifier).
|
||||||
--
|
--
|
||||||
-- Source: data/landing/geonames/{year}/{month}/cities_global.json.gz
|
-- Supports two landing formats (UNION ALL during migration):
|
||||||
|
-- New: cities_global.jsonl.gz — one city per line, columns directly accessible
|
||||||
|
-- Old: cities_global.json.gz — {"rows": [...]} blob (UNNEST required)
|
||||||
|
--
|
||||||
|
-- Source: data/landing/geonames/{year}/{month}/cities_global.{jsonl,json}.gz
|
||||||
|
|
||||||
MODEL (
|
MODEL (
|
||||||
name staging.stg_population_geonames,
|
name staging.stg_population_geonames,
|
||||||
@@ -12,7 +16,33 @@ MODEL (
|
|||||||
grain geoname_id
|
grain geoname_id
|
||||||
);
|
);
|
||||||
|
|
||||||
WITH parsed AS (
|
WITH
|
||||||
|
-- New format: one city per JSONL line
|
||||||
|
jsonl_rows AS (
|
||||||
|
SELECT
|
||||||
|
TRY_CAST(geoname_id AS INTEGER) AS geoname_id,
|
||||||
|
city_name,
|
||||||
|
country_code,
|
||||||
|
TRY_CAST(lat AS DOUBLE) AS lat,
|
||||||
|
TRY_CAST(lon AS DOUBLE) AS lon,
|
||||||
|
admin1_code,
|
||||||
|
admin2_code,
|
||||||
|
TRY_CAST(population AS BIGINT) AS population,
|
||||||
|
TRY_CAST(ref_year AS INTEGER) AS ref_year,
|
||||||
|
CURRENT_DATE AS extracted_date
|
||||||
|
FROM read_json(
|
||||||
|
@LANDING_DIR || '/geonames/*/*/cities_global.jsonl.gz',
|
||||||
|
format = 'newline_delimited',
|
||||||
|
columns = {
|
||||||
|
geoname_id: 'INTEGER', city_name: 'VARCHAR', country_code: 'VARCHAR',
|
||||||
|
lat: 'DOUBLE', lon: 'DOUBLE', admin1_code: 'VARCHAR', admin2_code: 'VARCHAR',
|
||||||
|
population: 'BIGINT', ref_year: 'INTEGER'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
WHERE geoname_id IS NOT NULL
|
||||||
|
),
|
||||||
|
-- Old format: {"rows": [...]} blob — kept for transition
|
||||||
|
blob_rows AS (
|
||||||
SELECT
|
SELECT
|
||||||
TRY_CAST(row ->> 'geoname_id' AS INTEGER) AS geoname_id,
|
TRY_CAST(row ->> 'geoname_id' AS INTEGER) AS geoname_id,
|
||||||
row ->> 'city_name' AS city_name,
|
row ->> 'city_name' AS city_name,
|
||||||
@@ -33,11 +63,16 @@ WITH parsed AS (
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
WHERE (row ->> 'geoname_id') IS NOT NULL
|
WHERE (row ->> 'geoname_id') IS NOT NULL
|
||||||
|
),
|
||||||
|
all_rows AS (
|
||||||
|
SELECT * FROM jsonl_rows
|
||||||
|
UNION ALL
|
||||||
|
SELECT * FROM blob_rows
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
geoname_id,
|
geoname_id,
|
||||||
TRIM(city_name) AS city_name,
|
TRIM(city_name) AS city_name,
|
||||||
UPPER(country_code) AS country_code,
|
UPPER(country_code) AS country_code,
|
||||||
lat,
|
lat,
|
||||||
lon,
|
lon,
|
||||||
NULLIF(TRIM(admin1_code), '') AS admin1_code,
|
NULLIF(TRIM(admin1_code), '') AS admin1_code,
|
||||||
@@ -45,7 +80,7 @@ SELECT
|
|||||||
population,
|
population,
|
||||||
ref_year,
|
ref_year,
|
||||||
extracted_date
|
extracted_date
|
||||||
FROM parsed
|
FROM all_rows
|
||||||
WHERE population IS NOT NULL
|
WHERE population IS NOT NULL
|
||||||
AND population > 0
|
AND population > 0
|
||||||
AND geoname_id IS NOT NULL
|
AND geoname_id IS NOT NULL
|
||||||
|
|||||||
Reference in New Issue
Block a user