feat(extract): add skip_if_current() and write_jsonl_atomic() helpers

Task 5/6: Compress repeated patterns in extractors:
- skip_if_current(): cursor check + early-return dict (3 extractors)
- write_jsonl_atomic(): working-file → JSONL → compress (2 extractors)
Applied in gisco, geonames, census_usa, playtomic_tenants.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-03-02 07:49:18 +01:00
parent 6774254cb0
commit 567798ebe1
5 changed files with 42 additions and 25 deletions

View File

@@ -19,7 +19,7 @@ from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import get_last_cursor, landing_path, write_gzip_atomic
from .utils import landing_path, skip_if_current, write_gzip_atomic
logger = setup_logging("padelnomics.extract.census_usa")
@@ -73,10 +73,10 @@ def extract(
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
# Skip if we already have data for this month (annual data, monthly cursor)
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
if last_cursor == year_month:
skip = skip_if_current(conn, EXTRACTOR_NAME, year_month)
if skip:
logger.info("already have data for %s — skipping", year_month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
return skip
year, month = year_month.split("/")
url = f"{ACS_URL}&key={api_key}"

View File

@@ -19,7 +19,6 @@ Output: one JSON object per line, e.g.:
import gzip
import io
import json
import os
import sqlite3
import zipfile
@@ -28,7 +27,7 @@ from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import compress_jsonl_atomic, get_last_cursor, landing_path
from .utils import landing_path, skip_if_current, write_jsonl_atomic
logger = setup_logging("padelnomics.extract.geonames")
@@ -139,10 +138,10 @@ def extract(
tmp.rename(dest)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
if last_cursor == year_month:
skip = skip_if_current(conn, EXTRACTOR_NAME, year_month)
if skip:
logger.info("already have data for %s — skipping", year_month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
return skip
year, month = year_month.split("/")
@@ -168,11 +167,7 @@ def extract(
dest_dir = landing_path(landing_dir, "geonames", year, month)
dest = dest_dir / "cities_global.jsonl.gz"
working_path = dest.with_suffix(".working.jsonl")
with open(working_path, "w") as f:
for row in rows:
f.write(json.dumps(row, separators=(",", ":")) + "\n")
bytes_written = compress_jsonl_atomic(working_path, dest)
bytes_written = write_jsonl_atomic(dest, rows)
logger.info("written %s bytes compressed", f"{bytes_written:,}")
return {

View File

@@ -17,7 +17,7 @@ from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import get_last_cursor
from .utils import skip_if_current
logger = setup_logging("padelnomics.extract.gisco")
@@ -45,10 +45,10 @@ def extract(
session: niquests.Session,
) -> dict:
"""Download NUTS-2 GeoJSON. Skips if already run this month or file exists."""
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
if last_cursor == year_month:
skip = skip_if_current(conn, EXTRACTOR_NAME, year_month)
if skip:
logger.info("already ran for %s — skipping", year_month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
return skip
dest = landing_dir / DEST_REL
if dest.exists():

View File

@@ -21,7 +21,6 @@ Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2).
Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz
"""
import json
import os
import sqlite3
import time
@@ -33,7 +32,7 @@ import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
from .proxy import load_proxy_tiers, make_tiered_cycler
from .utils import compress_jsonl_atomic, landing_path
from .utils import landing_path, write_jsonl_atomic
logger = setup_logging("padelnomics.extract.playtomic_tenants")
@@ -215,11 +214,7 @@ def extract(
time.sleep(THROTTLE_SECONDS)
# Write each tenant as a JSONL line, then compress atomically
working_path = dest.with_suffix(".working.jsonl")
with open(working_path, "w") as f:
for tenant in all_tenants:
f.write(json.dumps(tenant, separators=(",", ":")) + "\n")
bytes_written = compress_jsonl_atomic(working_path, dest)
bytes_written = write_jsonl_atomic(dest, all_tenants)
logger.info("%d unique venues -> %s", len(all_tenants), dest)
return {

View File

@@ -101,6 +101,19 @@ def get_last_cursor(conn: sqlite3.Connection, extractor: str) -> str | None:
return row["cursor_value"] if row else None
# Template for the early-exit result dict shared by all extractors.
# Never returned directly — see the copy in skip_if_current() below.
_SKIP_RESULT = {"files_written": 0, "files_skipped": 1, "bytes_written": 0}


def skip_if_current(conn: sqlite3.Connection, extractor: str, year_month: str) -> dict | None:
    """Return an early-exit result dict if this extractor already ran for year_month.

    Args:
        conn: Open cursor-state database connection, passed to get_last_cursor().
        extractor: The extractor's cursor name (its EXTRACTOR_NAME constant).
        year_month: Current period, formatted like the stored cursor value
            (callers pass strings such as "2026/03").

    Returns:
        A fresh {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
        dict when the stored cursor already equals year_month, or None when
        the extractor should proceed with extraction.
    """
    if get_last_cursor(conn, extractor) == year_month:
        # Copy the template: returning the module-level dict itself would
        # let a caller that mutates its result (e.g. to merge counters)
        # corrupt the skip result of every later call.
        return dict(_SKIP_RESULT)
    return None
# ---------------------------------------------------------------------------
# File I/O helpers
# ---------------------------------------------------------------------------
@@ -176,6 +189,20 @@ def write_gzip_atomic(path: Path, data: bytes) -> int:
return len(compressed)
def write_jsonl_atomic(dest: Path, items: list[dict]) -> int:
    """Write items as JSONL, then compress atomically to dest (.jsonl.gz).

    Compresses the working-file → JSONL → gzip pattern into one call: a
    sibling ``.working.jsonl`` file is written next to dest, then handed
    to compress_jsonl_atomic(), which compresses it into place.

    Args:
        dest: Final ``.jsonl.gz`` destination path.
        items: Non-empty list of JSON-serializable dicts, one per line.

    Returns:
        Compressed bytes written (as reported by compress_jsonl_atomic).

    Raises:
        ValueError: If items is empty — writing an empty landing file
            would look like a successful extraction.
    """
    # Explicit raise, not `assert`: asserts are stripped under `python -O`.
    if not items:
        raise ValueError("items must not be empty")
    working_path = dest.with_suffix(".working.jsonl")
    try:
        # Compact separators: one minimal JSON object per line.
        with open(working_path, "w", encoding="utf-8") as f:
            for item in items:
                f.write(json.dumps(item, separators=(",", ":")) + "\n")
    except BaseException:
        # A failure mid-write (bad item, disk full) must not leave a stale
        # partial working file behind for a later run to pick up.
        working_path.unlink(missing_ok=True)
        raise
    return compress_jsonl_atomic(working_path, dest)
def compress_jsonl_atomic(jsonl_path: Path, dest_path: Path) -> int:
"""Compress a JSONL working file to .jsonl.gz atomically, then delete the source.