fix(extract): use tiered cycler in playtomic_tenants

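Previously the tenants extractor flattened all proxy tiers into a single
round-robin list, bypassing the circuit breaker entirely. When the free
Webshare tier ran out of bandwidth (402), all 20 free proxies failed and
the batch crashed — the paid datacenter/residential proxies were never tried.

The extractor now draws proxies from the tiered cycler and retries each page
up to MAX_PAGE_ATTEMPTS times, so failures on the free tier escalate to the
datacenter and residential tiers instead of aborting the run.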

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-03-01 12:13:58 +01:00

View File

@@ -10,11 +10,11 @@ API notes (discovered 2026-02):
- `size=100` is the maximum effective page size - `size=100` is the maximum effective page size
- ~14K venues globally as of Feb 2026 - ~14K venues globally as of Feb 2026
Parallel mode: when PROXY_URLS is set, fires batch_size = len(proxy_urls) Parallel mode: when proxy tiers are configured, fires BATCH_SIZE pages
pages concurrently. Each page gets its own fresh session + proxy. Pages beyond concurrently. Each page gets its own fresh session + proxy from the tiered
the last one return empty lists (safe — just triggers the done condition). cycler. On failure the cycler escalates through free → datacenter →
Without proxies, falls back to single-threaded with THROTTLE_SECONDS between residential tiers. Without proxies, falls back to single-threaded with
pages. THROTTLE_SECONDS between pages.
Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2). Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2).
@@ -22,6 +22,7 @@ Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz
""" """
import json import json
import os
import sqlite3 import sqlite3
import time import time
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -31,7 +32,7 @@ from pathlib import Path
import niquests import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
from .proxy import load_proxy_tiers, make_round_robin_cycler from .proxy import load_proxy_tiers, make_tiered_cycler
from .utils import compress_jsonl_atomic, landing_path from .utils import compress_jsonl_atomic, landing_path
logger = setup_logging("padelnomics.extract.playtomic_tenants") logger = setup_logging("padelnomics.extract.playtomic_tenants")
@@ -42,6 +43,9 @@ PLAYTOMIC_TENANTS_URL = "https://api.playtomic.io/v1/tenants"
THROTTLE_SECONDS = 2 THROTTLE_SECONDS = 2
PAGE_SIZE = 100 PAGE_SIZE = 100
MAX_PAGES = 500 # safety bound — ~50K venues max, well above current ~14K MAX_PAGES = 500 # safety bound — ~50K venues max, well above current ~14K
BATCH_SIZE = 20 # concurrent pages per batch (fixed, independent of proxy count)
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10")
MAX_PAGE_ATTEMPTS = 5 # max attempts per individual page (first try included) before giving up
def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]: def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]:
@@ -61,22 +65,57 @@ def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]:
return (page, tenants) return (page, tenants)
def _fetch_pages_parallel(pages: list[int], next_proxy) -> list[tuple[int, list[dict]]]: def _fetch_page_via_cycler(cycler: dict, page: int) -> tuple[int, list[dict]]:
"""Fetch multiple pages concurrently. Returns [(page_num, tenants_list), ...].""" """Fetch a single page, retrying across proxy tiers via the circuit breaker.
On each attempt, pulls the next proxy from the active tier. Records
success/failure so the circuit breaker can escalate tiers. Raises
RuntimeError if all tiers are exhausted or if the page still fails after
MAX_PAGE_ATTEMPTS attempts.
"""
last_exc: Exception | None = None
for attempt in range(MAX_PAGE_ATTEMPTS):
proxy_url = cycler["next_proxy"]()
if proxy_url is None: # all tiers exhausted
raise RuntimeError(f"All proxy tiers exhausted fetching page {page}")
try:
result = _fetch_one_page(proxy_url, page)
cycler["record_success"]()
return result
except Exception as exc:
last_exc = exc
logger.warning(
"Page %d attempt %d/%d failed (proxy=%s): %s",
page,
attempt + 1,
MAX_PAGE_ATTEMPTS,
proxy_url,
exc,
)
cycler["record_failure"]()
if cycler["is_exhausted"]():
raise RuntimeError(f"All proxy tiers exhausted fetching page {page}") from exc
raise RuntimeError(f"Page {page} failed after {MAX_PAGE_ATTEMPTS} attempts") from last_exc
def _fetch_pages_parallel(pages: list[int], cycler: dict) -> list[tuple[int, list[dict]]]:
"""Fetch multiple pages concurrently using the tiered cycler.
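Returns [(page_num, tenants_list), ...] in completion order (not page order).
Propagates the RuntimeError from _fetch_page_via_cycler if any page exhausts
all tiers or still fails after MAX_PAGE_ATTEMPTS attempts.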
"""
with ThreadPoolExecutor(max_workers=len(pages)) as pool: with ThreadPoolExecutor(max_workers=len(pages)) as pool:
futures = [pool.submit(_fetch_one_page, next_proxy(), p) for p in pages] futures = [pool.submit(_fetch_page_via_cycler, cycler, p) for p in pages]
return [f.result() for f in as_completed(futures)] return [f.result() for f in as_completed(futures)]
def extract( def extract(
landing_dir: Path, landing_dir: Path,
year_month: str, # noqa: ARG001 — unused; tenants uses ISO week partition instead year_month: str, # noqa: ARG001 — unused; tenants uses daily partition instead
conn: sqlite3.Connection, conn: sqlite3.Connection,
session: niquests.Session, session: niquests.Session,
) -> dict: ) -> dict:
"""Fetch all Playtomic venues via global pagination. Returns run metrics. """Fetch all Playtomic venues via global pagination. Returns run metrics.
Partitioned by ISO week (e.g. 2026/W09) so each weekly run produces a Partitioned by day (e.g. 2026/03/01) so each daily run produces a
fresh file. _load_tenant_ids() in playtomic_availability globs across all fresh file. _load_tenant_ids() in playtomic_availability globs across all
partitions and picks the most recent one. partitions and picks the most recent one.
""" """
@@ -89,12 +128,16 @@ def extract(
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
tiers = load_proxy_tiers() tiers = load_proxy_tiers()
all_proxies = [url for tier in tiers for url in tier] cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD) if tiers else None
next_proxy = make_round_robin_cycler(all_proxies) if all_proxies else None batch_size = BATCH_SIZE if cycler else 1
batch_size = len(all_proxies) if all_proxies else 1
if next_proxy: if cycler:
logger.info("Parallel mode: %d pages per batch (%d proxies across %d tier(s))", batch_size, len(all_proxies), len(tiers)) logger.info(
"Parallel mode: %d pages/batch, %d tier(s), threshold=%d",
batch_size,
cycler["tier_count"](),
CIRCUIT_BREAKER_THRESHOLD,
)
else: else:
logger.info("Serial mode: 1 page at a time (no proxies)") logger.info("Serial mode: 1 page at a time (no proxies)")
@@ -104,15 +147,33 @@ def extract(
done = False done = False
while not done and page < MAX_PAGES: while not done and page < MAX_PAGES:
if cycler and cycler["is_exhausted"]():
logger.error(
"All proxy tiers exhausted — stopping at page %d (%d venues collected)",
page,
len(all_tenants),
)
break
batch_end = min(page + batch_size, MAX_PAGES) batch_end = min(page + batch_size, MAX_PAGES)
pages_to_fetch = list(range(page, batch_end)) pages_to_fetch = list(range(page, batch_end))
if next_proxy and len(pages_to_fetch) > 1: if cycler and len(pages_to_fetch) > 1:
logger.info( logger.info(
"Fetching pages %d-%d in parallel (%d workers, total so far: %d)", "Fetching pages %d-%d in parallel (%d workers, total so far: %d)",
page, batch_end - 1, len(pages_to_fetch), len(all_tenants), page,
batch_end - 1,
len(pages_to_fetch),
len(all_tenants),
) )
results = _fetch_pages_parallel(pages_to_fetch, next_proxy) try:
results = _fetch_pages_parallel(pages_to_fetch, cycler)
except RuntimeError:
logger.error(
"Proxy tiers exhausted mid-batch — writing partial results (%d venues)",
len(all_tenants),
)
break
else: else:
# Serial: reuse the shared session, throttle between pages # Serial: reuse the shared session, throttle between pages
page_num = pages_to_fetch[0] page_num = pages_to_fetch[0]
@@ -126,7 +187,7 @@ def extract(
) )
results = [(page_num, tenants)] results = [(page_num, tenants)]
# Process pages in order so the done-detection on < PAGE_SIZE is deterministic # Process pages in order so done-detection on < PAGE_SIZE is deterministic
for p, tenants in sorted(results): for p, tenants in sorted(results):
new_count = 0 new_count = 0
for tenant in tenants: for tenant in tenants:
@@ -137,7 +198,11 @@ def extract(
new_count += 1 new_count += 1
logger.info( logger.info(
"page=%d got=%d new=%d total=%d", p, len(tenants), new_count, len(all_tenants), "page=%d got=%d new=%d total=%d",
p,
len(tenants),
new_count,
len(all_tenants),
) )
# Last page — fewer than PAGE_SIZE results means we've exhausted the list # Last page — fewer than PAGE_SIZE results means we've exhausted the list
@@ -146,7 +211,7 @@ def extract(
break break
page = batch_end page = batch_end
if not next_proxy: if not cycler:
time.sleep(THROTTLE_SECONDS) time.sleep(THROTTLE_SECONDS)
# Write each tenant as a JSONL line, then compress atomically # Write each tenant as a JSONL line, then compress atomically