fix(extract): use tiered cycler in playtomic_tenants
Previously the tenants extractor flattened all proxy tiers into a single round-robin list, bypassing the circuit breaker entirely. When the free Webshare tier runs out of bandwidth (402), all 20 free proxies fail and the batch crashes — the paid datacenter/residential proxies are never tried. Changes: - Replace make_round_robin_cycler with make_tiered_cycler (same as availability) - Add _fetch_page_via_cycler: retries per page across tiers, records success/failure in cycler so circuit breaker can escalate - Pin batch_size to the BATCH_SIZE=20 constant (was len(all_proxies) ≈ 22) - Check cycler.is_exhausted() before each batch; catch RuntimeError mid-batch and write partial results rather than crashing with nothing - Read CIRCUIT_BREAKER_THRESHOLD from env (default 10), matching availability Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -10,11 +10,11 @@ API notes (discovered 2026-02):
|
|||||||
- `size=100` is the maximum effective page size
|
- `size=100` is the maximum effective page size
|
||||||
- ~14K venues globally as of Feb 2026
|
- ~14K venues globally as of Feb 2026
|
||||||
|
|
||||||
Parallel mode: when PROXY_URLS is set, fires batch_size = len(proxy_urls)
|
Parallel mode: when proxy tiers are configured, fires BATCH_SIZE pages
|
||||||
pages concurrently. Each page gets its own fresh session + proxy. Pages beyond
|
concurrently. Each page gets its own fresh session + proxy from the tiered
|
||||||
the last one return empty lists (safe — just triggers the done condition).
|
cycler. On failure the cycler escalates through free → datacenter →
|
||||||
Without proxies, falls back to single-threaded with THROTTLE_SECONDS between
|
residential tiers. Without proxies, falls back to single-threaded with
|
||||||
pages.
|
THROTTLE_SECONDS between pages.
|
||||||
|
|
||||||
Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2).
|
Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2).
|
||||||
|
|
||||||
@@ -22,6 +22,7 @@ Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import time
|
import time
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
@@ -31,7 +32,7 @@ from pathlib import Path
|
|||||||
import niquests
|
import niquests
|
||||||
|
|
||||||
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
|
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
|
||||||
from .proxy import load_proxy_tiers, make_round_robin_cycler
|
from .proxy import load_proxy_tiers, make_tiered_cycler
|
||||||
from .utils import compress_jsonl_atomic, landing_path
|
from .utils import compress_jsonl_atomic, landing_path
|
||||||
|
|
||||||
logger = setup_logging("padelnomics.extract.playtomic_tenants")
|
logger = setup_logging("padelnomics.extract.playtomic_tenants")
|
||||||
@@ -42,6 +43,9 @@ PLAYTOMIC_TENANTS_URL = "https://api.playtomic.io/v1/tenants"
|
|||||||
THROTTLE_SECONDS = 2
|
THROTTLE_SECONDS = 2
|
||||||
PAGE_SIZE = 100
|
PAGE_SIZE = 100
|
||||||
MAX_PAGES = 500 # safety bound — ~50K venues max, well above current ~14K
|
MAX_PAGES = 500 # safety bound — ~50K venues max, well above current ~14K
|
||||||
|
BATCH_SIZE = 20 # concurrent pages per batch (fixed, independent of proxy count)
|
||||||
|
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10")
|
||||||
|
MAX_PAGE_ATTEMPTS = 5 # max retries per individual page before giving up
|
||||||
|
|
||||||
|
|
||||||
def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]:
|
def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]:
|
||||||
@@ -61,22 +65,57 @@ def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]:
|
|||||||
return (page, tenants)
|
return (page, tenants)
|
||||||
|
|
||||||
|
|
||||||
def _fetch_pages_parallel(pages: list[int], next_proxy) -> list[tuple[int, list[dict]]]:
|
def _fetch_page_via_cycler(cycler: dict, page: int) -> tuple[int, list[dict]]:
|
||||||
"""Fetch multiple pages concurrently. Returns [(page_num, tenants_list), ...]."""
|
"""Fetch a single page, retrying across proxy tiers via the circuit breaker.
|
||||||
|
|
||||||
|
On each attempt, pulls the next proxy from the active tier. Records
|
||||||
|
success/failure so the circuit breaker can escalate tiers. Raises
|
||||||
|
RuntimeError if all tiers are exhausted or MAX_PAGE_ATTEMPTS is exceeded.
|
||||||
|
"""
|
||||||
|
last_exc: Exception | None = None
|
||||||
|
for attempt in range(MAX_PAGE_ATTEMPTS):
|
||||||
|
proxy_url = cycler["next_proxy"]()
|
||||||
|
if proxy_url is None: # all tiers exhausted
|
||||||
|
raise RuntimeError(f"All proxy tiers exhausted fetching page {page}")
|
||||||
|
try:
|
||||||
|
result = _fetch_one_page(proxy_url, page)
|
||||||
|
cycler["record_success"]()
|
||||||
|
return result
|
||||||
|
except Exception as exc:
|
||||||
|
last_exc = exc
|
||||||
|
logger.warning(
|
||||||
|
"Page %d attempt %d/%d failed (proxy=%s): %s",
|
||||||
|
page,
|
||||||
|
attempt + 1,
|
||||||
|
MAX_PAGE_ATTEMPTS,
|
||||||
|
proxy_url,
|
||||||
|
exc,
|
||||||
|
)
|
||||||
|
cycler["record_failure"]()
|
||||||
|
if cycler["is_exhausted"]():
|
||||||
|
raise RuntimeError(f"All proxy tiers exhausted fetching page {page}") from exc
|
||||||
|
raise RuntimeError(f"Page {page} failed after {MAX_PAGE_ATTEMPTS} attempts") from last_exc
|
||||||
|
|
||||||
|
|
||||||
|
def _fetch_pages_parallel(pages: list[int], cycler: dict) -> list[tuple[int, list[dict]]]:
|
||||||
|
"""Fetch multiple pages concurrently using the tiered cycler.
|
||||||
|
|
||||||
|
Returns [(page_num, tenants_list), ...]. Raises if any page exhausts all tiers.
|
||||||
|
"""
|
||||||
with ThreadPoolExecutor(max_workers=len(pages)) as pool:
|
with ThreadPoolExecutor(max_workers=len(pages)) as pool:
|
||||||
futures = [pool.submit(_fetch_one_page, next_proxy(), p) for p in pages]
|
futures = [pool.submit(_fetch_page_via_cycler, cycler, p) for p in pages]
|
||||||
return [f.result() for f in as_completed(futures)]
|
return [f.result() for f in as_completed(futures)]
|
||||||
|
|
||||||
|
|
||||||
def extract(
|
def extract(
|
||||||
landing_dir: Path,
|
landing_dir: Path,
|
||||||
year_month: str, # noqa: ARG001 — unused; tenants uses ISO week partition instead
|
year_month: str, # noqa: ARG001 — unused; tenants uses daily partition instead
|
||||||
conn: sqlite3.Connection,
|
conn: sqlite3.Connection,
|
||||||
session: niquests.Session,
|
session: niquests.Session,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Fetch all Playtomic venues via global pagination. Returns run metrics.
|
"""Fetch all Playtomic venues via global pagination. Returns run metrics.
|
||||||
|
|
||||||
Partitioned by ISO week (e.g. 2026/W09) so each weekly run produces a
|
Partitioned by day (e.g. 2026/03/01) so each daily run produces a
|
||||||
fresh file. _load_tenant_ids() in playtomic_availability globs across all
|
fresh file. _load_tenant_ids() in playtomic_availability globs across all
|
||||||
partitions and picks the most recent one.
|
partitions and picks the most recent one.
|
||||||
"""
|
"""
|
||||||
@@ -89,12 +128,16 @@ def extract(
|
|||||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||||
|
|
||||||
tiers = load_proxy_tiers()
|
tiers = load_proxy_tiers()
|
||||||
all_proxies = [url for tier in tiers for url in tier]
|
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD) if tiers else None
|
||||||
next_proxy = make_round_robin_cycler(all_proxies) if all_proxies else None
|
batch_size = BATCH_SIZE if cycler else 1
|
||||||
batch_size = len(all_proxies) if all_proxies else 1
|
|
||||||
|
|
||||||
if next_proxy:
|
if cycler:
|
||||||
logger.info("Parallel mode: %d pages per batch (%d proxies across %d tier(s))", batch_size, len(all_proxies), len(tiers))
|
logger.info(
|
||||||
|
"Parallel mode: %d pages/batch, %d tier(s), threshold=%d",
|
||||||
|
batch_size,
|
||||||
|
cycler["tier_count"](),
|
||||||
|
CIRCUIT_BREAKER_THRESHOLD,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.info("Serial mode: 1 page at a time (no proxies)")
|
logger.info("Serial mode: 1 page at a time (no proxies)")
|
||||||
|
|
||||||
@@ -104,15 +147,33 @@ def extract(
|
|||||||
done = False
|
done = False
|
||||||
|
|
||||||
while not done and page < MAX_PAGES:
|
while not done and page < MAX_PAGES:
|
||||||
|
if cycler and cycler["is_exhausted"]():
|
||||||
|
logger.error(
|
||||||
|
"All proxy tiers exhausted — stopping at page %d (%d venues collected)",
|
||||||
|
page,
|
||||||
|
len(all_tenants),
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
batch_end = min(page + batch_size, MAX_PAGES)
|
batch_end = min(page + batch_size, MAX_PAGES)
|
||||||
pages_to_fetch = list(range(page, batch_end))
|
pages_to_fetch = list(range(page, batch_end))
|
||||||
|
|
||||||
if next_proxy and len(pages_to_fetch) > 1:
|
if cycler and len(pages_to_fetch) > 1:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Fetching pages %d-%d in parallel (%d workers, total so far: %d)",
|
"Fetching pages %d-%d in parallel (%d workers, total so far: %d)",
|
||||||
page, batch_end - 1, len(pages_to_fetch), len(all_tenants),
|
page,
|
||||||
|
batch_end - 1,
|
||||||
|
len(pages_to_fetch),
|
||||||
|
len(all_tenants),
|
||||||
)
|
)
|
||||||
results = _fetch_pages_parallel(pages_to_fetch, next_proxy)
|
try:
|
||||||
|
results = _fetch_pages_parallel(pages_to_fetch, cycler)
|
||||||
|
except RuntimeError:
|
||||||
|
logger.error(
|
||||||
|
"Proxy tiers exhausted mid-batch — writing partial results (%d venues)",
|
||||||
|
len(all_tenants),
|
||||||
|
)
|
||||||
|
break
|
||||||
else:
|
else:
|
||||||
# Serial: reuse the shared session, throttle between pages
|
# Serial: reuse the shared session, throttle between pages
|
||||||
page_num = pages_to_fetch[0]
|
page_num = pages_to_fetch[0]
|
||||||
@@ -126,7 +187,7 @@ def extract(
|
|||||||
)
|
)
|
||||||
results = [(page_num, tenants)]
|
results = [(page_num, tenants)]
|
||||||
|
|
||||||
# Process pages in order so the done-detection on < PAGE_SIZE is deterministic
|
# Process pages in order so done-detection on < PAGE_SIZE is deterministic
|
||||||
for p, tenants in sorted(results):
|
for p, tenants in sorted(results):
|
||||||
new_count = 0
|
new_count = 0
|
||||||
for tenant in tenants:
|
for tenant in tenants:
|
||||||
@@ -137,7 +198,11 @@ def extract(
|
|||||||
new_count += 1
|
new_count += 1
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"page=%d got=%d new=%d total=%d", p, len(tenants), new_count, len(all_tenants),
|
"page=%d got=%d new=%d total=%d",
|
||||||
|
p,
|
||||||
|
len(tenants),
|
||||||
|
new_count,
|
||||||
|
len(all_tenants),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Last page — fewer than PAGE_SIZE results means we've exhausted the list
|
# Last page — fewer than PAGE_SIZE results means we've exhausted the list
|
||||||
@@ -146,7 +211,7 @@ def extract(
|
|||||||
break
|
break
|
||||||
|
|
||||||
page = batch_end
|
page = batch_end
|
||||||
if not next_proxy:
|
if not cycler:
|
||||||
time.sleep(THROTTLE_SECONDS)
|
time.sleep(THROTTLE_SECONDS)
|
||||||
|
|
||||||
# Write each tenant as a JSONL line, then compress atomically
|
# Write each tenant as a JSONL line, then compress atomically
|
||||||
|
|||||||
Reference in New Issue
Block a user