diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
index 4c73b80..f01c3e7 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
@@ -5,8 +5,13 @@ unauthenticated /v1/availability endpoint for each venue's next-day slots.
 This is the highest-value source: daily snapshots enable occupancy rate
 estimation, pricing benchmarking, and demand signal detection.
 
-Parallel mode: set EXTRACT_WORKERS=N and PROXY_URLS=... to fetch N venues
-concurrently (one proxy per worker). Without proxies, runs single-threaded.
+Parallel mode: worker count is derived from PROXY_URLS (one worker per proxy).
+Without proxies, runs single-threaded with per-request throttling.
+
+Crash resumption: progress is flushed to a .partial.jsonl sidecar file every
+PARTIAL_FLUSH_SIZE records. On restart the already-fetched venues are skipped
+and prior results are merged into the final file. At most PARTIAL_FLUSH_SIZE
+records (a few seconds of work with 10 workers) are lost on crash.
 
 Recheck mode: re-queries venues with slots starting within the next 90
 minutes. Writes a separate recheck file for more accurate occupancy measurement.
@@ -29,7 +34,7 @@ import niquests
 
 from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
 from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
-from .utils import get_last_cursor, landing_path, write_gzip_atomic
+from .utils import flush_partial_batch, landing_path, load_partial_results, write_gzip_atomic
 
 logger = setup_logging("padelnomics.extract.playtomic_availability")
 
@@ -40,7 +45,6 @@ AVAILABILITY_URL = "https://api.playtomic.io/v1/availability"
 THROTTLE_SECONDS = 1
 MAX_VENUES_PER_RUN = 20_000
 MAX_RETRIES_PER_VENUE = 2
-MAX_WORKERS = int(os.environ.get("EXTRACT_WORKERS", "1"))
 RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "90"))
 CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD", "10"))
 
@@ -49,6 +53,9 @@ CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD", "10"
 # batch still complete.
 PARALLEL_BATCH_SIZE = 100
 
+# Flush partial results to disk every N records — lose at most this many on crash.
+PARTIAL_FLUSH_SIZE = 50
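+#
+# Illustrative arithmetic, assuming the default of 50: a crash after 137
+# fetched venues means two full batches (100 records) are already safe in the
+# .partial.jsonl sidecar; only the 37 still buffered in memory are lost and
+# get re-fetched on the next run.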
+
 # Thread-local storage for per-worker sessions
 _thread_local = threading.local()
 
@@ -84,22 +91,6 @@ def _load_tenant_ids(landing_dir: Path) -> list[str]:
     return ids
 
 
-def _parse_resume_cursor(cursor: str | None, target_date: str) -> int:
-    """Parse cursor_value to find resume index. Returns 0 if no valid cursor."""
-    if not cursor:
-        return 0
-    parts = cursor.split(":", 1)
-    if len(parts) != 2:
-        return 0
-    cursor_date, cursor_index = parts
-    if cursor_date != target_date:
-        return 0
-    try:
-        return int(cursor_index)
-    except ValueError:
-        return 0
-
-
 # ---------------------------------------------------------------------------
 # Per-venue fetch (used by both serial and parallel modes)
 # ---------------------------------------------------------------------------
@@ -149,7 +140,8 @@
 
             continue
         resp.raise_for_status()
-        time.sleep(THROTTLE_SECONDS)
+        if not proxy_url:
+            time.sleep(THROTTLE_SECONDS)
         return {"tenant_id": tenant_id, "slots": resp.json()}
 
     except niquests.exceptions.RequestException as e:
@@ -177,6 +169,7 @@
     worker_count: int,
     cycler: dict,
     fallback_urls: list[str],
+    on_result=None,
 ) -> tuple[list[dict], int]:
     """Fetch availability for multiple venues in parallel.
 
@@ -184,6 +177,9 @@
     completes, checks the circuit breaker: if it opened and there is no
     fallback configured, stops submitting further batches.
 
+    on_result: optional callable(result: dict) invoked inside the lock for
+        each successful result — used for incremental partial-file flushing.
+
     Returns (venues_data, venues_errored).
     """
     venues_data: list[dict] = []
@@ -215,6 +211,8 @@
                 if result is not None:
                     venues_data.append(result)
                     cycler["record_success"]()
+                    if on_result is not None:
+                        on_result(result)
                 else:
                     venues_errored += 1
                     cycler["record_failure"]()
@@ -262,41 +260,56 @@
         logger.info("Already have %s — skipping", dest)
         return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
 
-    # Resume from cursor if crashed mid-run
-    last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
-    resume_index = _parse_resume_cursor(last_cursor, target_date)
-    if resume_index > 0:
-        logger.info("Resuming from index %d (cursor: %s)", resume_index, last_cursor)
+    # Crash resumption: load already-fetched venues from partial file
+    partial_path = dest.with_suffix(".partial.jsonl")
+    prior_results, already_done = load_partial_results(partial_path, id_key="tenant_id")
+    if already_done:
+        logger.info("Resuming: %d venues already fetched from partial file", len(already_done))
 
-    venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN]
-    if resume_index > 0:
-        venues_to_process = venues_to_process[resume_index:]
+    all_venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN]
+    venues_to_process = [tid for tid in all_venues_to_process if tid not in already_done]
 
     # Set up tiered proxy cycler with circuit breaker
     proxy_urls = load_proxy_urls()
     fallback_urls = load_fallback_proxy_urls()
-    worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1
+    worker_count = len(proxy_urls) if proxy_urls else 1
     cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
 
     start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
     start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
 
+    # Partial file for incremental crash-safe progress
+    partial_file = open(partial_path, "a")  # noqa: SIM115
+    partial_lock = threading.Lock()
+    pending_batch: list[dict] = []
+
+    def _on_result(result: dict) -> None:
+        # Called inside _fetch_venues_parallel's lock — no additional locking needed.
+        # In serial mode, called single-threaded — also safe without extra locking.
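+        # Sketch of the cadence this produces: results buffer in pending_batch
+        # until the PARTIAL_FLUSH_SIZE-th arrives, then flush_partial_batch
+        # writes the whole batch under partial_lock in a single flush; the
+        # list itself needs no extra lock for the reasons noted above.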
+        pending_batch.append(result)
+        if len(pending_batch) >= PARTIAL_FLUSH_SIZE:
+            flush_partial_batch(partial_file, partial_lock, pending_batch)
+            pending_batch.clear()
+
+    new_venues_data: list[dict] = []
+    venues_errored = 0
+
     if worker_count > 1:
         logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls))
-        venues_data, venues_errored = _fetch_venues_parallel(
+        new_venues_data, venues_errored = _fetch_venues_parallel(
             venues_to_process, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
+            on_result=_on_result,
         )
     else:
         logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process))
-        venues_data = []
-        venues_errored = 0
         for i, tenant_id in enumerate(venues_to_process):
             result = _fetch_venue_availability(
                 tenant_id, start_min_str, start_max_str, cycler["next_proxy"](),
             )
             if result is not None:
-                venues_data.append(result)
+                new_venues_data.append(result)
                 cycler["record_success"]()
+                _on_result(result)
             else:
                 venues_errored += 1
                 circuit_opened = cycler["record_failure"]()
@@ -310,7 +323,14 @@
                 i + 1, len(venues_to_process), venues_errored,
             )
 
-    # Write consolidated file
+    # Final flush of any remaining partial batch
+    if pending_batch:
+        flush_partial_batch(partial_file, partial_lock, pending_batch)
+        pending_batch.clear()
+    partial_file.close()
+
+    # Consolidate prior (resumed) + new results into final file
+    venues_data = prior_results + new_venues_data
     captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
     payload = json.dumps({
         "date": target_date,
@@ -321,6 +341,9 @@
     }).encode()
     bytes_written = write_gzip_atomic(dest, payload)
 
+    if partial_path.exists():
+        partial_path.unlink()
+
     logger.info(
         "%d venues scraped (%d errors) -> %s (%s bytes)",
         len(venues_data), venues_errored, dest, f"{bytes_written:,}",
@@ -330,7 +353,7 @@
         "files_written": 1,
         "files_skipped": 0,
         "bytes_written": bytes_written,
-        "cursor_value": f"{target_date}:{len(tenant_ids[:MAX_VENUES_PER_RUN])}",
+        "cursor_value": f"{target_date}:{len(all_venues_to_process)}",
     }
 
 
@@ -421,7 +444,7 @@
     # Set up tiered proxy cycler with circuit breaker
     proxy_urls = load_proxy_urls()
     fallback_urls = load_fallback_proxy_urls()
-    worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1
+    worker_count = len(proxy_urls) if proxy_urls else 1
    cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
 
     if worker_count > 1 and len(venues_to_recheck) > 10:
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py
index 7d280ee..0e8c82e 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py
@@ -3,10 +3,6 @@ Proxies are configured via the PROXY_URLS environment variable
 (comma-separated). When unset, all functions return None/no-op — extractors
 fall back to direct requests.
 
-Two routing modes:
-    round-robin — distribute requests evenly across proxies (default)
-    sticky — same key always maps to same proxy (for session-tracked sites)
-
 Tiered proxy with circuit breaker:
 Primary tier (PROXY_URLS) is used by default — typically cheap datacenter
 proxies. Fallback tier (PROXY_URLS_FALLBACK) activates once consecutive failures >= threshold.
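+
+Sketch of the cycler contract the extractors rely on (illustrative; argument
+names here are placeholders, the real call sites live in
+playtomic_availability.py):
+
+    cycler = make_tiered_cycler(primary_urls, fallback_urls, threshold)
+    proxy = cycler["next_proxy"]()       # proxy URL for the next request, or None
+    cycler["record_success"]()           # reset the consecutive-failure count
+    opened = cycler["record_failure"]()  # True once the breaker has opened
+    cycler["is_fallback_active"]         # also exposed in the returned dict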
@@ -141,17 +137,3 @@ def make_tiered_cycler(
         "is_fallback_active": is_fallback_active,
     }
 
-
-def make_sticky_selector(proxy_urls: list[str]):
-    """Consistent-hash proxy selector — same key always maps to same proxy.
-
-    Use when the target site tracks sessions by IP (e.g. Cloudflare).
-    Returns a callable: select_proxy(key: str) -> str | None
-    """
-    if not proxy_urls:
-        return lambda key: None
-
-    def select_proxy(key: str) -> str:
-        return proxy_urls[hash(key) % len(proxy_urls)]
-
-    return select_proxy
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/utils.py b/extract/padelnomics_extract/src/padelnomics_extract/utils.py
index 3cb2562..15777f0 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/utils.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/utils.py
@@ -7,7 +7,9 @@ if you add multiple data sources, extract them to a shared workspace package.
 
 import gzip
 import hashlib
+import json
 import sqlite3
+import threading
 from pathlib import Path
 
 # ---------------------------------------------------------------------------
@@ -117,6 +119,50 @@ def content_hash(data: bytes, prefix_bytes: int = 8) -> str:
     return hashlib.sha256(data).hexdigest()[:prefix_bytes]
 
 
+def load_partial_results(partial_path: Path, id_key: str) -> tuple[list[dict], set[str]]:
+    """Load already-completed records from a partial JSONL file (crash recovery).
+
+    Returns (records, seen_ids). If the file doesn't exist, returns ([], set()).
+    Gracefully handles a truncated last line from a mid-write crash.
+    """
+    records: list[dict] = []
+    seen_ids: set[str] = set()
+    if not partial_path.exists():
+        return records, seen_ids
+    with open(partial_path) as f:
+        for line in f:
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                record = json.loads(line)
+                records.append(record)
+                rid = record.get(id_key)
+                if rid:
+                    seen_ids.add(rid)
+            except json.JSONDecodeError:
+                break  # stop: truncated final line from a mid-write crash
+    return records, seen_ids
+
+
+def flush_partial_batch(
+    partial_file,
+    lock: threading.Lock,
+    batch: list[dict],
+) -> None:
+    """Thread-safe batch write of JSON records to the partial JSONL file.
+
+    Writes all records in one lock acquisition with a single flush. Batches of
+    ~50 records give a good tradeoff between I/O throughput and crash safety:
+    on crash, at most one batch of records is lost.
+    """
+    assert batch, "batch must not be empty"
+    with lock:
+        for record in batch:
+            partial_file.write(json.dumps(record, separators=(",", ":")) + "\n")
+        partial_file.flush()
+
+
 def write_gzip_atomic(path: Path, data: bytes) -> int:
     """Gzip compress data and write to path atomically via .tmp sibling.