From 567798ebe1914ff883cdb9031e4a0c3fecb1602a Mon Sep 17 00:00:00 2001 From: Deeman Date: Mon, 2 Mar 2026 07:49:18 +0100 Subject: [PATCH] feat(extract): add skip_if_current() and write_jsonl_atomic() helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Task 5/6: Compress repeated patterns in extractors: - skip_if_current(): cursor check + early-return dict (3 extractors) - write_jsonl_atomic(): working-file → JSONL → compress (2 extractors) Applied in gisco, geonames, census_usa, playtomic_tenants. Co-Authored-By: Claude Opus 4.6 --- .../src/padelnomics_extract/census_usa.py | 8 +++--- .../src/padelnomics_extract/geonames.py | 15 ++++------- .../src/padelnomics_extract/gisco.py | 8 +++--- .../padelnomics_extract/playtomic_tenants.py | 9 ++----- .../src/padelnomics_extract/utils.py | 27 +++++++++++++++++++ 5 files changed, 42 insertions(+), 25 deletions(-) diff --git a/extract/padelnomics_extract/src/padelnomics_extract/census_usa.py b/extract/padelnomics_extract/src/padelnomics_extract/census_usa.py index 91e35a8..3a5f49d 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/census_usa.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/census_usa.py @@ -19,7 +19,7 @@ from pathlib import Path import niquests from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging -from .utils import get_last_cursor, landing_path, write_gzip_atomic +from .utils import landing_path, skip_if_current, write_gzip_atomic logger = setup_logging("padelnomics.extract.census_usa") @@ -73,10 +73,10 @@ def extract( return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} # Skip if we already have data for this month (annual data, monthly cursor) - last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) - if last_cursor == year_month: + skip = skip_if_current(conn, EXTRACTOR_NAME, year_month) + if skip: logger.info("already have data for %s — skipping", year_month) - return {"files_written": 0, 
"files_skipped": 1, "bytes_written": 0} + return skip year, month = year_month.split("/") url = f"{ACS_URL}&key={api_key}" diff --git a/extract/padelnomics_extract/src/padelnomics_extract/geonames.py b/extract/padelnomics_extract/src/padelnomics_extract/geonames.py index 0e83498..600af86 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/geonames.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/geonames.py @@ -19,7 +19,6 @@ Output: one JSON object per line, e.g.: import gzip import io -import json import os import sqlite3 import zipfile @@ -28,7 +27,7 @@ from pathlib import Path import niquests from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging -from .utils import compress_jsonl_atomic, get_last_cursor, landing_path +from .utils import landing_path, skip_if_current, write_jsonl_atomic logger = setup_logging("padelnomics.extract.geonames") @@ -139,10 +138,10 @@ def extract( tmp.rename(dest) return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} - last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) - if last_cursor == year_month: + skip = skip_if_current(conn, EXTRACTOR_NAME, year_month) + if skip: logger.info("already have data for %s — skipping", year_month) - return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} + return skip year, month = year_month.split("/") @@ -168,11 +167,7 @@ def extract( dest_dir = landing_path(landing_dir, "geonames", year, month) dest = dest_dir / "cities_global.jsonl.gz" - working_path = dest.with_suffix(".working.jsonl") - with open(working_path, "w") as f: - for row in rows: - f.write(json.dumps(row, separators=(",", ":")) + "\n") - bytes_written = compress_jsonl_atomic(working_path, dest) + bytes_written = write_jsonl_atomic(dest, rows) logger.info("written %s bytes compressed", f"{bytes_written:,}") return { diff --git a/extract/padelnomics_extract/src/padelnomics_extract/gisco.py b/extract/padelnomics_extract/src/padelnomics_extract/gisco.py index 
8fd43fb..140d3a8 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/gisco.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/gisco.py @@ -17,7 +17,7 @@ from pathlib import Path import niquests from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging -from .utils import get_last_cursor +from .utils import skip_if_current logger = setup_logging("padelnomics.extract.gisco") @@ -45,10 +45,10 @@ def extract( session: niquests.Session, ) -> dict: """Download NUTS-2 GeoJSON. Skips if already run this month or file exists.""" - last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) - if last_cursor == year_month: + skip = skip_if_current(conn, EXTRACTOR_NAME, year_month) + if skip: logger.info("already ran for %s — skipping", year_month) - return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} + return skip dest = landing_dir / DEST_REL if dest.exists(): diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py index 277bdec..23af26a 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py @@ -21,7 +21,6 @@ Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2). 
Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz """ -import json import os import sqlite3 import time @@ -33,7 +32,7 @@ import niquests from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy from .proxy import load_proxy_tiers, make_tiered_cycler -from .utils import compress_jsonl_atomic, landing_path +from .utils import landing_path, write_jsonl_atomic logger = setup_logging("padelnomics.extract.playtomic_tenants") @@ -215,11 +214,7 @@ def extract( time.sleep(THROTTLE_SECONDS) # Write each tenant as a JSONL line, then compress atomically - working_path = dest.with_suffix(".working.jsonl") - with open(working_path, "w") as f: - for tenant in all_tenants: - f.write(json.dumps(tenant, separators=(",", ":")) + "\n") - bytes_written = compress_jsonl_atomic(working_path, dest) + bytes_written = write_jsonl_atomic(dest, all_tenants) logger.info("%d unique venues -> %s", len(all_tenants), dest) return { diff --git a/extract/padelnomics_extract/src/padelnomics_extract/utils.py b/extract/padelnomics_extract/src/padelnomics_extract/utils.py index 451c365..25ce4b4 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/utils.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/utils.py @@ -101,6 +101,19 @@ def get_last_cursor(conn: sqlite3.Connection, extractor: str) -> str | None: return row["cursor_value"] if row else None +_SKIP_RESULT = {"files_written": 0, "files_skipped": 1, "bytes_written": 0} + + +def skip_if_current(conn: sqlite3.Connection, extractor: str, year_month: str) -> dict | None: + """Return an early-exit result dict if this extractor already ran for year_month. + + Returns None when the extractor should proceed with extraction. 
+ """ + if get_last_cursor(conn, extractor) == year_month: + return dict(_SKIP_RESULT) + return None + + + # --------------------------------------------------------------------------- # File I/O helpers # --------------------------------------------------------------------------- @@ -176,6 +189,20 @@ def write_gzip_atomic(path: Path, data: bytes) -> int: return len(compressed) + + +def write_jsonl_atomic(dest: Path, items: list[dict]) -> int: + """Write items as JSONL, then compress atomically to dest (.jsonl.gz). + + Compresses the working-file → JSONL → gzip pattern into one call. + Returns compressed bytes written. + """ + # An empty items list yields a valid (empty) .jsonl.gz, preserving the behavior of the inlined pattern this replaces. + working_path = dest.with_suffix(".working.jsonl") + with open(working_path, "w") as f: + for item in items: + f.write(json.dumps(item, separators=(",", ":")) + "\n") + return compress_jsonl_atomic(working_path, dest) + + def compress_jsonl_atomic(jsonl_path: Path, dest_path: Path) -> int: """Compress a JSONL working file to .jsonl.gz atomically, then delete the source.