From 1aedf78ec68ebd7371584106e2db0a2cf76be2f3 Mon Sep 17 00:00:00 2001 From: Deeman Date: Sun, 1 Mar 2026 12:13:50 +0100 Subject: [PATCH] fix(extract): use tiered cycler in playtomic_tenants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the tenants extractor flattened all proxy tiers into a single round-robin list, bypassing the circuit breaker entirely. When the free Webshare tier runs out of bandwidth (402), all 20 free proxies fail and the batch crashes — the paid datacenter/residential proxies are never tried. Changes: - Replace make_round_robin_cycler with make_tiered_cycler (same as availability) - Add _fetch_page_via_cycler: retries per page across tiers, records success/failure in cycler so circuit breaker can escalate - Fix batch_size to BATCH_SIZE=20 constant (was len(all_proxies) ≈ 22) - Check cycler.is_exhausted() before each batch; catch RuntimeError mid-batch and write partial results rather than crashing with nothing - CIRCUIT_BREAKER_THRESHOLD from env (default 10), matching availability Co-Authored-By: Claude Sonnet 4.6 --- .../padelnomics_extract/playtomic_tenants.py | 109 ++++++++++++++---- 1 file changed, 87 insertions(+), 22 deletions(-) diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py index e09102b..c16e976 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py @@ -10,11 +10,11 @@ API notes (discovered 2026-02): - `size=100` is the maximum effective page size - ~14K venues globally as of Feb 2026 -Parallel mode: when PROXY_URLS is set, fires batch_size = len(proxy_urls) -pages concurrently. Each page gets its own fresh session + proxy. Pages beyond -the last one return empty lists (safe — just triggers the done condition). -Without proxies, falls back to single-threaded with THROTTLE_SECONDS between -pages. +Parallel mode: when proxy tiers are configured, fires BATCH_SIZE pages +concurrently. Each page gets its own fresh session + proxy from the tiered +cycler. On failure the cycler escalates through free → datacenter → +residential tiers. Without proxies, falls back to single-threaded with +THROTTLE_SECONDS between pages. Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2). @@ -22,6 +22,7 @@ Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz """ import json +import os import sqlite3 import time from concurrent.futures import ThreadPoolExecutor, as_completed @@ -31,7 +32,7 @@ from pathlib import Path import niquests from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy -from .proxy import load_proxy_tiers, make_round_robin_cycler +from .proxy import load_proxy_tiers, make_tiered_cycler from .utils import compress_jsonl_atomic, landing_path logger = setup_logging("padelnomics.extract.playtomic_tenants") @@ -42,6 +43,9 @@ PLAYTOMIC_TENANTS_URL = "https://api.playtomic.io/v1/tenants" THROTTLE_SECONDS = 2 PAGE_SIZE = 100 MAX_PAGES = 500 # safety bound — ~50K venues max, well above current ~14K +BATCH_SIZE = 20 # concurrent pages per batch (fixed, independent of proxy count) +CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10") +MAX_PAGE_ATTEMPTS = 5 # max retries per individual page before giving up def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]: @@ -61,22 +65,57 @@ def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]: return (page, tenants) -def _fetch_pages_parallel(pages: list[int], next_proxy) -> list[tuple[int, list[dict]]]: - """Fetch multiple pages concurrently. Returns [(page_num, tenants_list), ...].""" +def _fetch_page_via_cycler(cycler: dict, page: int) -> tuple[int, list[dict]]: + """Fetch a single page, retrying across proxy tiers via the circuit breaker. + + On each attempt, pulls the next proxy from the active tier. Records + success/failure so the circuit breaker can escalate tiers. Raises + RuntimeError if all tiers are exhausted or MAX_PAGE_ATTEMPTS is exceeded. + """ + last_exc: Exception | None = None + for attempt in range(MAX_PAGE_ATTEMPTS): + proxy_url = cycler["next_proxy"]() + if proxy_url is None: # all tiers exhausted + raise RuntimeError(f"All proxy tiers exhausted fetching page {page}") + try: + result = _fetch_one_page(proxy_url, page) + cycler["record_success"]() + return result + except Exception as exc: + last_exc = exc + logger.warning( + "Page %d attempt %d/%d failed (proxy=%s): %s", + page, + attempt + 1, + MAX_PAGE_ATTEMPTS, + proxy_url, + exc, + ) + cycler["record_failure"]() + if cycler["is_exhausted"](): + raise RuntimeError(f"All proxy tiers exhausted fetching page {page}") from exc + raise RuntimeError(f"Page {page} failed after {MAX_PAGE_ATTEMPTS} attempts") from last_exc + + +def _fetch_pages_parallel(pages: list[int], cycler: dict) -> list[tuple[int, list[dict]]]: + """Fetch multiple pages concurrently using the tiered cycler. + + Returns [(page_num, tenants_list), ...]. Raises if any page exhausts all tiers. + """ with ThreadPoolExecutor(max_workers=len(pages)) as pool: - futures = [pool.submit(_fetch_one_page, next_proxy(), p) for p in pages] + futures = [pool.submit(_fetch_page_via_cycler, cycler, p) for p in pages] return [f.result() for f in as_completed(futures)] def extract( landing_dir: Path, - year_month: str, # noqa: ARG001 — unused; tenants uses ISO week partition instead + year_month: str, # noqa: ARG001 — unused; tenants uses daily partition instead conn: sqlite3.Connection, session: niquests.Session, ) -> dict: """Fetch all Playtomic venues via global pagination. Returns run metrics. - Partitioned by ISO week (e.g. 2026/W09) so each weekly run produces a + Partitioned by day (e.g. 2026/03/01) so each daily run produces a fresh file. _load_tenant_ids() in playtomic_availability globs across all partitions and picks the most recent one. """ @@ -89,12 +128,16 @@ def extract( return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} tiers = load_proxy_tiers() - all_proxies = [url for tier in tiers for url in tier] - next_proxy = make_round_robin_cycler(all_proxies) if all_proxies else None - batch_size = len(all_proxies) if all_proxies else 1 + cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD) if tiers else None + batch_size = BATCH_SIZE if cycler else 1 - if next_proxy: - logger.info("Parallel mode: %d pages per batch (%d proxies across %d tier(s))", batch_size, len(all_proxies), len(tiers)) + if cycler: + logger.info( + "Parallel mode: %d pages/batch, %d tier(s), threshold=%d", + batch_size, + cycler["tier_count"](), + CIRCUIT_BREAKER_THRESHOLD, + ) else: logger.info("Serial mode: 1 page at a time (no proxies)") @@ -104,15 +147,33 @@ def extract( done = False while not done and page < MAX_PAGES: + if cycler and cycler["is_exhausted"](): + logger.error( + "All proxy tiers exhausted — stopping at page %d (%d venues collected)", + page, + len(all_tenants), + ) + break + batch_end = min(page + batch_size, MAX_PAGES) pages_to_fetch = list(range(page, batch_end)) - if next_proxy and len(pages_to_fetch) > 1: + if cycler and len(pages_to_fetch) > 1: logger.info( "Fetching pages %d-%d in parallel (%d workers, total so far: %d)", - page, batch_end - 1, len(pages_to_fetch), len(all_tenants), + page, + batch_end - 1, + len(pages_to_fetch), + len(all_tenants), ) - results = _fetch_pages_parallel(pages_to_fetch, next_proxy) + try: + results = _fetch_pages_parallel(pages_to_fetch, cycler) + except RuntimeError: + logger.error( + "Proxy tiers exhausted mid-batch — writing partial results (%d venues)", + len(all_tenants), + ) + break else: # Serial: reuse the shared session, throttle between pages page_num = pages_to_fetch[0] @@ -126,7 +187,7 @@ def extract( ) results = [(page_num, tenants)] - # Process pages in order so the done-detection on < PAGE_SIZE is deterministic + # Process pages in order so done-detection on < PAGE_SIZE is deterministic for p, tenants in sorted(results): new_count = 0 for tenant in tenants: @@ -137,7 +198,11 @@ def extract( new_count += 1 logger.info( - "page=%d got=%d new=%d total=%d", p, len(tenants), new_count, len(all_tenants), + "page=%d got=%d new=%d total=%d", + p, + len(tenants), + new_count, + len(all_tenants), ) # Last page — fewer than PAGE_SIZE results means we've exhausted the list @@ -146,7 +211,7 @@ def extract( break page = batch_end - if not next_proxy: + if not cycler: time.sleep(THROTTLE_SECONDS) # Write each tenant as a JSONL line, then compress atomically