feat(extract): three-tier proxy system with Webshare auto-fetch

Replace two-tier proxy setup (PROXY_URLS / PROXY_URLS_FALLBACK) with
N-tier escalation: free → datacenter → residential.

- proxy.py: fetch_webshare_proxies() auto-fetches the Webshare download
  API on each run (no more stale manually-copied lists). load_proxy_tiers()
  assembles tiers from WEBSHARE_DOWNLOAD_URL, PROXY_URLS_DATACENTER,
  PROXY_URLS_RESIDENTIAL. make_tiered_cycler() generalised to list[list[str]]
  with N-level escalation; is_fallback_active() replaced by is_exhausted().
  Old load_proxy_urls() / load_fallback_proxy_urls() deleted.

- playtomic_availability.py: both extract() and extract_recheck() use
  load_proxy_tiers() + generalised cycler. _fetch_venues_parallel fallback_urls
  param removed. All is_fallback_active() checks → is_exhausted().

- playtomic_tenants.py: flattens tiers for simple round-robin.

- test_supervisor.py: TestLoadProxyUrls removed (function deleted).
  Added TestFetchWebshareProxies, TestLoadProxyTiers, TestTieredCyclerNTier
  (11 tests covering parse format, error handling, escalation, thread safety).

47 tests pass, ruff clean.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-28 16:57:07 +01:00
parent 642041b32b
commit adf22924f6
5 changed files with 413 additions and 116 deletions

View File

@@ -33,7 +33,7 @@ from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
from .proxy import load_proxy_tiers, make_tiered_cycler
from .utils import (
compress_jsonl_atomic,
flush_partial_batch,
@@ -190,14 +190,13 @@ def _fetch_venues_parallel(
start_max_str: str,
worker_count: int,
cycler: dict,
fallback_urls: list[str],
on_result=None,
) -> tuple[list[dict], int]:
"""Fetch availability for multiple venues in parallel.
Submits futures in batches of PARALLEL_BATCH_SIZE. After each batch
completes, checks the circuit breaker: if it opened and there is no
fallback configured, stops submitting further batches.
completes, checks the circuit breaker: if all proxy tiers are exhausted,
stops submitting further batches.
on_result: optional callable(result: dict) invoked inside the lock for
each successful result — used for incremental partial-file flushing.
@@ -215,10 +214,10 @@ def _fetch_venues_parallel(
with ThreadPoolExecutor(max_workers=worker_count) as pool:
for batch_start in range(0, len(tenant_ids), PARALLEL_BATCH_SIZE):
# Stop submitting new work if circuit is open with no fallback
if cycler["is_fallback_active"]() and not fallback_urls:
# Stop submitting new work if all proxy tiers are exhausted
if cycler["is_exhausted"]():
logger.error(
"Circuit open with no fallback — stopping after %d/%d venues",
"All proxy tiers exhausted — stopping after %d/%d venues",
completed_count, len(tenant_ids),
)
break
@@ -294,10 +293,9 @@ def extract(
venues_to_process = [tid for tid in all_venues_to_process if tid not in already_done]
# Set up tiered proxy cycler with circuit breaker
proxy_urls = load_proxy_urls()
fallback_urls = load_fallback_proxy_urls()
worker_count = len(proxy_urls) if proxy_urls else 1
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
tiers = load_proxy_tiers()
worker_count = len(tiers[0]) if tiers else 1
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
@@ -325,9 +323,9 @@ def extract(
venues_errored = 0
if worker_count > 1:
logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls))
logger.info("Parallel mode: %d workers (tier 0), %d tiers total", worker_count, len(tiers))
new_venues_data, venues_errored = _fetch_venues_parallel(
venues_to_process, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
venues_to_process, start_min_str, start_max_str, worker_count, cycler,
on_result=_on_result,
)
else:
@@ -342,9 +340,9 @@ def extract(
_on_result(result)
else:
venues_errored += 1
circuit_opened = cycler["record_failure"]()
if circuit_opened and not fallback_urls:
logger.error("Circuit open with no fallback — writing partial results")
cycler["record_failure"]()
if cycler["is_exhausted"]():
logger.error("All proxy tiers exhausted — writing partial results")
break
if (i + 1) % 100 == 0:
@@ -485,14 +483,13 @@ def extract_recheck(
start_max_str = window_end.strftime("%Y-%m-%dT%H:%M:%S")
# Set up tiered proxy cycler with circuit breaker
proxy_urls = load_proxy_urls()
fallback_urls = load_fallback_proxy_urls()
worker_count = len(proxy_urls) if proxy_urls else 1
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
tiers = load_proxy_tiers()
worker_count = len(tiers[0]) if tiers else 1
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)
if worker_count > 1 and len(venues_to_recheck) > 10:
venues_data, venues_errored = _fetch_venues_parallel(
venues_to_recheck, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
venues_to_recheck, start_min_str, start_max_str, worker_count, cycler,
)
else:
venues_data = []
@@ -504,9 +501,9 @@ def extract_recheck(
cycler["record_success"]()
else:
venues_errored += 1
circuit_opened = cycler["record_failure"]()
if circuit_opened and not fallback_urls:
logger.error("Circuit open with no fallback — writing partial recheck results")
cycler["record_failure"]()
if cycler["is_exhausted"]():
logger.error("All proxy tiers exhausted — writing partial recheck results")
break
# Write recheck file as JSONL — one venue per line with metadata injected

View File

@@ -30,7 +30,7 @@ from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
from .proxy import load_proxy_urls, make_round_robin_cycler
from .proxy import load_proxy_tiers, make_round_robin_cycler
from .utils import compress_jsonl_atomic, landing_path
logger = setup_logging("padelnomics.extract.playtomic_tenants")
@@ -82,12 +82,13 @@ def extract(
logger.info("Already have tenants for %s/%s — skipping", year, month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
proxy_urls = load_proxy_urls()
next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None
batch_size = len(proxy_urls) if proxy_urls else 1
tiers = load_proxy_tiers()
all_proxies = [url for tier in tiers for url in tier]
next_proxy = make_round_robin_cycler(all_proxies) if all_proxies else None
batch_size = len(all_proxies) if all_proxies else 1
if next_proxy:
logger.info("Parallel mode: %d pages per batch (%d proxies)", batch_size, len(proxy_urls))
logger.info("Parallel mode: %d pages per batch (%d proxies across %d tier(s))", batch_size, len(all_proxies), len(tiers))
else:
logger.info("Serial mode: 1 page at a time (no proxies)")

View File

@@ -1,41 +1,97 @@
"""Optional proxy rotation for parallel HTTP fetching.
Proxies are configured via the PROXY_URLS environment variable (comma-separated).
When unset, all functions return None/no-op — extractors fall back to direct requests.
Proxies are configured via environment variables. When unset, all functions
return None/no-op — extractors fall back to direct requests.
Tiered proxy with circuit breaker:
Primary tier (PROXY_URLS) is used by default — typically cheap datacenter proxies.
Fallback tier (PROXY_URLS_FALLBACK) activates once consecutive failures >= threshold.
Once the circuit opens it stays open for the duration of the run (no auto-recovery).
Three-tier escalation: free → datacenter → residential.
Tier 1 (free): WEBSHARE_DOWNLOAD_URL — auto-fetched from Webshare API
Tier 2 (datacenter): PROXY_URLS_DATACENTER — comma-separated paid DC proxies
Tier 3 (residential): PROXY_URLS_RESIDENTIAL — comma-separated paid residential proxies
Tiered circuit breaker:
Active tier is used until consecutive failures >= threshold, then escalates
to the next tier. Once all tiers are exhausted, is_exhausted() returns True.
Escalation is permanent for the duration of the run — no auto-recovery.
"""
import itertools
import logging
import os
import threading
import urllib.error
import urllib.request
logger = logging.getLogger(__name__)
MAX_WEBSHARE_PROXIES = 20
WEBSHARE_FETCH_TIMEOUT_SECONDS = 10
WEBSHARE_MAX_RESPONSE_BYTES = 1024 * 1024 # 1MB
def load_proxy_urls() -> list[str]:
"""Read PROXY_URLS env var (comma-separated). Returns [] if unset.
Format: http://user:pass@host:port or socks5://host:port
def fetch_webshare_proxies(download_url: str, max_proxies: int = MAX_WEBSHARE_PROXIES) -> list[str]:
"""Fetch proxy list from the Webshare download API. Returns [] on any error.
Expected line format: ip:port:username:password
Converts to: http://username:password@ip:port
Bounded: reads at most WEBSHARE_MAX_RESPONSE_BYTES, returns at most max_proxies.
"""
raw = os.environ.get("PROXY_URLS", "")
urls = [u.strip() for u in raw.split(",") if u.strip()]
assert max_proxies > 0, f"max_proxies must be positive, got {max_proxies}"
assert download_url, "download_url must not be empty"
try:
req = urllib.request.Request(
download_url,
headers={"User-Agent": "padelnomics-extract/1.0"},
)
with urllib.request.urlopen(req, timeout=WEBSHARE_FETCH_TIMEOUT_SECONDS) as resp:
raw = resp.read(WEBSHARE_MAX_RESPONSE_BYTES).decode("utf-8")
except Exception as e:
logger.warning("Failed to fetch Webshare proxies: %s", e)
return []
urls = []
for line in raw.splitlines():
line = line.strip()
if not line:
continue
parts = line.split(":")
if len(parts) != 4:
logger.debug("Skipping malformed proxy line: %r", line)
continue
ip, port, username, password = parts
urls.append(f"http://{username}:{password}@{ip}:{port}")
if len(urls) >= max_proxies:
break
logger.info("Fetched %d proxies from Webshare", len(urls))
return urls
def load_fallback_proxy_urls() -> list[str]:
"""Read PROXY_URLS_FALLBACK env var (comma-separated). Returns [] if unset.
def load_proxy_tiers() -> list[list[str]]:
"""Assemble proxy tiers in escalation order: free → datacenter → residential.
Used as the residential/reliable fallback tier when the primary tier fails.
Format: http://user:pass@host:port or socks5://host:port
Tier 1 (free): fetched from WEBSHARE_DOWNLOAD_URL if set.
Tier 2 (datacenter): PROXY_URLS_DATACENTER (comma-separated).
Tier 3 (residential): PROXY_URLS_RESIDENTIAL (comma-separated).
Empty tiers are omitted. Returns [] if no proxies configured anywhere.
"""
raw = os.environ.get("PROXY_URLS_FALLBACK", "")
urls = [u.strip() for u in raw.split(",") if u.strip()]
return urls
tiers: list[list[str]] = []
webshare_url = os.environ.get("WEBSHARE_DOWNLOAD_URL", "").strip()
if webshare_url:
free_proxies = fetch_webshare_proxies(webshare_url)
if free_proxies:
tiers.append(free_proxies)
for var in ("PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"):
raw = os.environ.get(var, "")
urls = [u.strip() for u in raw.split(",") if u.strip()]
if urls:
tiers.append(urls)
return tiers
def make_round_robin_cycler(proxy_urls: list[str]):
@@ -78,83 +134,96 @@ def make_sticky_selector(proxy_urls: list[str]):
return select_proxy
def make_tiered_cycler(
primary_urls: list[str],
fallback_urls: list[str],
threshold: int,
) -> dict:
"""Thread-safe tiered proxy cycler with circuit breaker.
def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict:
"""Thread-safe N-tier proxy cycler with circuit breaker.
Uses primary_urls until consecutive failures >= threshold, then switches
permanently to fallback_urls for the rest of the run. No auto-recovery —
once the circuit opens it stays open to avoid flapping.
Uses tiers[0] until consecutive failures >= threshold, then escalates
to tiers[1], then tiers[2], etc. Once all tiers are exhausted,
is_exhausted() returns True and next_proxy() returns None.
Failure counter resets on each escalation — the new tier gets a fresh start.
Once exhausted, further record_failure() calls are no-ops.
Returns a dict of callables:
next_proxy() -> str | None — returns URL from the active tier
record_success() — resets consecutive failure counter
record_failure() -> bool — increments counter; True if circuit just opened
is_fallback_active() -> bool — whether fallback tier is currently active
next_proxy() -> str | None — URL from the active tier, or None
record_success() -> None — resets consecutive failure counter
record_failure() -> bool — True if just escalated to next tier
is_exhausted() -> bool — True if all tiers exhausted
active_tier_index() -> int — 0-based index of current tier
tier_count() -> int — total number of tiers
If primary_urls is empty: always returns from fallback_urls (no circuit breaker needed).
If both are empty: next_proxy() always returns None.
Edge cases:
Empty tiers list: next_proxy() always returns None, is_exhausted() True.
Single tier: behaves like the primary-only case, is_exhausted() after threshold.
"""
assert threshold > 0, f"threshold must be positive, got {threshold}"
assert isinstance(tiers, list), f"tiers must be a list, got {type(tiers)}"
lock = threading.Lock()
cycles = [itertools.cycle(t) for t in tiers]
state = {
"active_tier": 0,
"consecutive_failures": 0,
"fallback_active": False,
}
primary_cycle = itertools.cycle(primary_urls) if primary_urls else None
fallback_cycle = itertools.cycle(fallback_urls) if fallback_urls else None
# No primary proxies — skip circuit breaker, use fallback directly
if not primary_urls:
state["fallback_active"] = True
def next_proxy() -> str | None:
with lock:
if state["fallback_active"]:
return next(fallback_cycle) if fallback_cycle else None
return next(primary_cycle) if primary_cycle else None
idx = state["active_tier"]
if idx >= len(cycles):
return None
return next(cycles[idx])
def record_success() -> None:
with lock:
state["consecutive_failures"] = 0
def record_failure() -> bool:
"""Increment failure counter. Returns True if circuit just opened."""
"""Increment failure counter. Returns True if just escalated to next tier."""
with lock:
if state["fallback_active"]:
# Already on fallback — don't trip the circuit again
idx = state["active_tier"]
if idx >= len(tiers):
# Already exhausted — no-op
return False
state["consecutive_failures"] += 1
if state["consecutive_failures"] >= threshold:
state["fallback_active"] = True
if fallback_urls:
logger.warning(
"Circuit open after %d consecutive failures — "
"switching to fallback residential proxies",
state["consecutive_failures"],
)
else:
logger.error(
"Circuit open after %d consecutive failures — "
"no fallback configured, aborting run",
state["consecutive_failures"],
)
return True
return False
if state["consecutive_failures"] < threshold:
return False
# Threshold reached — escalate
state["consecutive_failures"] = 0
state["active_tier"] += 1
new_idx = state["active_tier"]
if new_idx < len(tiers):
logger.warning(
"Circuit open after %d consecutive failures — "
"escalating to proxy tier %d/%d",
threshold,
new_idx + 1,
len(tiers),
)
else:
logger.error(
"All %d proxy tier(s) exhausted after %d consecutive failures — "
"no more fallbacks",
len(tiers),
threshold,
)
return True
def is_fallback_active() -> bool:
def is_exhausted() -> bool:
with lock:
return state["fallback_active"]
return state["active_tier"] >= len(tiers)
def active_tier_index() -> int:
with lock:
return state["active_tier"]
def tier_count() -> int:
return len(tiers)
return {
"next_proxy": next_proxy,
"record_success": record_success,
"record_failure": record_failure,
"is_fallback_active": is_fallback_active,
"is_exhausted": is_exhausted,
"active_tier_index": active_tier_index,
"tier_count": tier_count,
}