perf(extract): parallel page fetching in tenants, drop EXTRACT_WORKERS env var

- playtomic_tenants.py: batch_size = len(proxy_urls) pages fired in parallel per
  batch; each page gets its own session + proxy; sorted(results) ensures
  deterministic done-detection; falls back to serial + THROTTLE_SECONDS when no
  proxies. Expected speedup: ~2.5 min → ~15 s with 10 proxies.
- .env.dev.sops, .env.prod.sops: remove EXTRACT_WORKERS; the worker count is
  now derived from the PROXY_URLS length (see the proxy-helper sketch below)
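
The .proxy helpers referenced by the new code are not part of this diff; the
sketch below shows what they plausibly look like, assuming PROXY_URLS is a
comma-separated env var (the names come from the imports in the diff, but the
bodies are assumptions, not the committed implementation):

```python
# Hypothetical sketch: proxy.py is not shown in this commit, so the
# comma-separated PROXY_URLS format and these bodies are assumptions.
import os
from itertools import cycle
from typing import Callable

def load_proxy_urls() -> list[str]:
    """Parse the PROXY_URLS env var into a list; unset/empty means no proxies."""
    raw = os.environ.get("PROXY_URLS", "")
    return [u.strip() for u in raw.split(",") if u.strip()]

def make_round_robin_cycler(proxy_urls: list[str]) -> Callable[[], str]:
    """Return a zero-arg callable that hands out proxies round-robin."""
    it = cycle(proxy_urls)
    return lambda: next(it)
```

Because the batch size is len(proxy_urls), the worker count scales with the
proxy pool automatically, which is what makes EXTRACT_WORKERS redundant. Note
that next_proxy() is called at submit time on the main thread (see
_fetch_pages_parallel in the diff), so the cycler is never shared across
worker threads.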

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Deeman
2026-02-24 22:30:28 +01:00
parent 6116445b56
commit 9f010d8c0c
3 changed files with 192 additions and 140 deletions


@@ -10,7 +10,13 @@ API notes (discovered 2026-02):
 - `size=100` is the maximum effective page size
 - ~14K venues globally as of Feb 2026
-Rate: 1 req / 2 s (see docs/data-sources-inventory.md §1.2).
+Parallel mode: when PROXY_URLS is set, fires batch_size = len(proxy_urls)
+pages concurrently. Each page gets its own fresh session + proxy. Pages beyond
+the last one return empty lists (safe — just triggers the done condition).
+Without proxies, falls back to single-threaded with THROTTLE_SECONDS between
+pages.
+Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2).
 Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz
 """
@@ -18,11 +24,13 @@ Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz
 import json
 import sqlite3
 import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from pathlib import Path
 import niquests
-from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
+from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
+from .proxy import load_proxy_urls, make_round_robin_cycler
 from .utils import landing_path, write_gzip_atomic
 logger = setup_logging("padelnomics.extract.playtomic_tenants")
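
The new concurrent.futures import drives the parallel path. One subtlety:
as_completed() yields futures in completion order, not submission order, which
is why the processing loop below re-sorts results by page number. A
self-contained demonstration:

```python
# Demonstrates why the extractor sorts results by page number: with
# variable latency, as_completed() returns pages out of order.
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_fetch(page: int) -> tuple[int, list[dict]]:
    time.sleep(random.random() / 10)  # simulate jittery network latency
    return (page, [{"tenant_id": f"t{page}"}])

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(fake_fetch, p) for p in range(4)]
    results = [f.result() for f in as_completed(futures)]

print([p for p, _ in results])          # e.g. [2, 0, 3, 1], order varies
print([p for p, _ in sorted(results)])  # always [0, 1, 2, 3]
```

f.result() also re-raises any exception from a worker, so a failed page aborts
the whole batch rather than being silently dropped.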
@@ -35,6 +43,30 @@ PAGE_SIZE = 100
 MAX_PAGES = 500  # safety bound — ~50K venues max, well above current ~14K
+def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]:
+    """Fetch a single page using a fresh session with the given proxy.
+    Returns (page, tenants_list). Raises on HTTP error.
+    """
+    s = niquests.Session()
+    s.headers["User-Agent"] = USER_AGENT
+    if proxy_url:
+        s.proxies = {"http": proxy_url, "https": proxy_url}
+    params = {"sport_ids": "PADEL", "size": PAGE_SIZE, "page": page}
+    resp = s.get(PLAYTOMIC_TENANTS_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS)
+    resp.raise_for_status()
+    tenants = resp.json()
+    assert isinstance(tenants, list), f"Expected list from Playtomic API, got {type(tenants)}"
+    return (page, tenants)
+def _fetch_pages_parallel(pages: list[int], next_proxy) -> list[tuple[int, list[dict]]]:
+    """Fetch multiple pages concurrently. Returns [(page_num, tenants_list), ...]."""
+    with ThreadPoolExecutor(max_workers=len(pages)) as pool:
+        futures = [pool.submit(_fetch_one_page, next_proxy(), p) for p in pages]
+        return [f.result() for f in as_completed(futures)]
 def extract(
     landing_dir: Path,
     year_month: str,
@@ -46,43 +78,65 @@ def extract(
     dest_dir = landing_path(landing_dir, "playtomic", year, month)
     dest = dest_dir / "tenants.json.gz"
+    proxy_urls = load_proxy_urls()
+    next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None
+    batch_size = len(proxy_urls) if proxy_urls else 1
+    if next_proxy:
+        logger.info("Parallel mode: %d pages per batch (%d proxies)", batch_size, len(proxy_urls))
+    else:
+        logger.info("Serial mode: 1 page at a time (no proxies)")
     all_tenants: list[dict] = []
     seen_ids: set[str] = set()
+    page = 0
+    done = False
-    for page in range(MAX_PAGES):
-        params = {
-            "sport_ids": "PADEL",
-            "size": PAGE_SIZE,
-            "page": page,
-        }
+    while not done and page < MAX_PAGES:
+        batch_end = min(page + batch_size, MAX_PAGES)
+        pages_to_fetch = list(range(page, batch_end))
-        logger.info("GET page=%d (total so far: %d)", page, len(all_tenants))
+        if next_proxy and len(pages_to_fetch) > 1:
+            logger.info(
+                "Fetching pages %d-%d in parallel (%d workers, total so far: %d)",
+                page, batch_end - 1, len(pages_to_fetch), len(all_tenants),
+            )
+            results = _fetch_pages_parallel(pages_to_fetch, next_proxy)
+        else:
+            # Serial: reuse the shared session, throttle between pages
+            page_num = pages_to_fetch[0]
+            logger.info("GET page=%d (total so far: %d)", page_num, len(all_tenants))
+            params = {"sport_ids": "PADEL", "size": PAGE_SIZE, "page": page_num}
+            resp = session.get(PLAYTOMIC_TENANTS_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS)
+            resp.raise_for_status()
+            tenants = resp.json()
+            assert isinstance(tenants, list), (
+                f"Expected list from Playtomic API, got {type(tenants)}"
+            )
+            results = [(page_num, tenants)]
-        resp = session.get(PLAYTOMIC_TENANTS_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS)
-        resp.raise_for_status()
+        # Process pages in order so the done-detection on < PAGE_SIZE is deterministic
+        for p, tenants in sorted(results):
+            new_count = 0
+            for tenant in tenants:
+                tid = tenant.get("tenant_id") or tenant.get("id")
+                if tid and tid not in seen_ids:
+                    seen_ids.add(tid)
+                    all_tenants.append(tenant)
+                    new_count += 1
-        tenants = resp.json()
-        assert isinstance(tenants, list), (
-            f"Expected list from Playtomic API, got {type(tenants)}"
-        )
+            logger.info(
+                "page=%d got=%d new=%d total=%d", p, len(tenants), new_count, len(all_tenants),
+            )
-        new_count = 0
-        for tenant in tenants:
-            tid = tenant.get("tenant_id") or tenant.get("id")
-            if tid and tid not in seen_ids:
-                seen_ids.add(tid)
-                all_tenants.append(tenant)
-                new_count += 1
+            # Last page — fewer than PAGE_SIZE results means we've exhausted the list
+            if len(tenants) < PAGE_SIZE:
+                done = True
+                break
-        logger.info(
-            "page=%d got=%d new=%d total=%d", page, len(tenants), new_count, len(all_tenants)
-        )
-        # Last page — fewer than PAGE_SIZE results means we've exhausted the list
-        if len(tenants) < PAGE_SIZE:
-            break
-        time.sleep(THROTTLE_SECONDS)
+        page = batch_end
+        if not next_proxy:
+            time.sleep(THROTTLE_SECONDS)
     payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode()
     bytes_written = write_gzip_atomic(dest, payload)
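
write_gzip_atomic comes from .utils and is outside this diff; below is a
minimal sketch of the temp-file-plus-rename pattern its name implies, with the
signature inferred from the call site (the body is an assumption, not the
committed implementation):

```python
# Hypothetical sketch of write_gzip_atomic; utils.py is not in this commit.
import gzip
import os
from pathlib import Path

def write_gzip_atomic(dest: Path, payload: bytes) -> int:
    """Gzip payload to dest via temp file + os.replace so concurrent readers
    never observe a partially written file. Returns compressed bytes written."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_name(dest.name + ".tmp")
    with gzip.open(tmp, "wb") as f:
        f.write(payload)
    size = tmp.stat().st_size
    os.replace(tmp, dest)  # atomic rename on the same filesystem
    return size
```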