padelnomics/extract/padelnomics_extract/src/padelnomics_extract/proxy.py
Deeman 563bd1fb2e
merge: tiered-proxy-tenants — gisco extractor, proxy fixes, recheck datetime fix
- feat: GISCO NUTS-2 extractor module (replaces standalone script)
- feat: wire 5 unscheduled extractors into workflows.toml
- fix: add load_dotenv() to _shared.py so .env proxies are picked up
- fix: recheck datetime parsing (HH:MM:SS slot times need start_date prefix)
- fix: graceful 0-venue early return in recheck
- fix(proxy): remove Webshare free tier — DC tier 1, residential tier 2

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 22:12:17 +01:00


"""Optional proxy rotation for parallel HTTP fetching.
Proxies are configured via environment variables. When unset, all functions
return None/no-op — extractors fall back to direct requests.
Two-tier escalation: datacenter → residential.
Tier 1 (datacenter): PROXY_URLS_DATACENTER — comma-separated paid DC proxies
Tier 2 (residential): PROXY_URLS_RESIDENTIAL — comma-separated paid residential proxies
Tiered circuit breaker:
The active tier is used until consecutive failures >= threshold, then the
cycler escalates to the next tier. Once all tiers are exhausted,
is_exhausted() returns True.
Escalation is permanent for the duration of the run — no auto-recovery.
"""
import hashlib
import itertools
import logging
import os
import threading
import urllib.request
logger = logging.getLogger(__name__)
MAX_WEBSHARE_PROXIES = 20
WEBSHARE_FETCH_TIMEOUT_SECONDS = 10
WEBSHARE_MAX_RESPONSE_BYTES = 1024 * 1024 # 1MB
def fetch_webshare_proxies(download_url: str, max_proxies: int = MAX_WEBSHARE_PROXIES) -> list[str]:
"""Fetch proxy list from the Webshare download API. Returns [] on any error.
Expected line format: ip:port:username:password
Converts to: http://username:password@ip:port
Bounded: reads at most WEBSHARE_MAX_RESPONSE_BYTES, returns at most max_proxies.
"""
assert max_proxies > 0, f"max_proxies must be positive, got {max_proxies}"
assert download_url, "download_url must not be empty"
try:
req = urllib.request.Request(
download_url,
headers={"User-Agent": "padelnomics-extract/1.0"},
)
with urllib.request.urlopen(req, timeout=WEBSHARE_FETCH_TIMEOUT_SECONDS) as resp:
raw = resp.read(WEBSHARE_MAX_RESPONSE_BYTES).decode("utf-8")
except Exception as e:
logger.warning("Failed to fetch Webshare proxies: %s", e)
return []
urls = []
for line in raw.splitlines():
line = line.strip()
if not line:
continue
parts = line.split(":")
if len(parts) != 4:
logger.debug("Skipping malformed proxy line: %r", line)
continue
ip, port, username, password = parts
urls.append(f"http://{username}:{password}@{ip}:{port}")
if len(urls) >= max_proxies:
break
logger.info("Fetched %d proxies from Webshare", len(urls))
return urls
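# Usage sketch (WEBSHARE_DOWNLOAD_URL is an illustrative variable name; this
# module does not read it itself):
#
#   download_url = os.environ.get("WEBSHARE_DOWNLOAD_URL", "")
#   proxies = fetch_webshare_proxies(download_url) if download_url else []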
def load_proxy_tiers() -> list[list[str]]:
"""Assemble proxy tiers in escalation order: datacenter → residential.
Tier 1 (datacenter): PROXY_URLS_DATACENTER (comma-separated).
Tier 2 (residential): PROXY_URLS_RESIDENTIAL (comma-separated).
Empty tiers are omitted. Returns [] if no proxies configured anywhere.
"""
tiers: list[list[str]] = []
for var in ("PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"):
raw = os.environ.get(var, "")
urls = [u.strip() for u in raw.split(",") if u.strip()]
valid = []
for url in urls:
if not url.startswith(("http://", "https://")):
logger.warning("%s contains URL without scheme, skipping: %s", var, url[:60])
continue
valid.append(url)
if valid:
tiers.append(valid)
return tiers
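# Illustrative example: with PROXY_URLS_DATACENTER="http://a:1,http://b:2" and
# PROXY_URLS_RESIDENTIAL unset, load_proxy_tiers() returns
# [["http://a:1", "http://b:2"]], a single datacenter tier.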
def make_round_robin_cycler(proxy_urls: list[str]):
"""Thread-safe round-robin proxy cycler.
Returns a callable: next_proxy() -> str | None
If no proxies are configured, returns a callable that always yields None.
"""
if not proxy_urls:
return lambda: None
cycle = itertools.cycle(proxy_urls)
lock = threading.Lock()
def next_proxy() -> str:
with lock:
return next(cycle)
return next_proxy
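# Usage sketch:
#
#   next_proxy = make_round_robin_cycler(["http://p1:8080", "http://p2:8080"])
#   next_proxy()  # "http://p1:8080"
#   next_proxy()  # "http://p2:8080"
#   next_proxy()  # "http://p1:8080" (wraps around)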
def make_sticky_selector(proxy_urls: list[str]):
"""Hash-based sticky proxy selector.
Returns a callable: select_proxy(key: str) -> str | None
The same key always maps to the same proxy (stable hash-modulo selection;
unlike consistent hashing, resizing the proxy list remaps most keys).
If no proxies are configured, returns a callable that always yields None.
"""
if not proxy_urls:
return lambda key: None
n = len(proxy_urls)
def select_proxy(key: str) -> str:
# MD5 is used only for stable bucketing, not security (hashlib imported above)
idx = int(hashlib.md5(key.encode(), usedforsecurity=False).hexdigest(), 16) % n
return proxy_urls[idx]
return select_proxy
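# Usage sketch: the same key always selects the same proxy, so a per-key
# session stays pinned to one exit IP:
#
#   select = make_sticky_selector(["http://p1:8080", "http://p2:8080"])
#   assert select("venue-42") == select("venue-42")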
def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_limit: int = 3) -> dict:
"""Thread-safe N-tier proxy cycler with circuit breaker and per-proxy dead tracking.
Uses tiers[0] until consecutive failures >= threshold, then escalates
to tiers[1], then tiers[2], etc. Once all tiers are exhausted,
is_exhausted() returns True and next_proxy() returns None.
Failure counter resets on each escalation — the new tier gets a fresh start.
Once exhausted, further record_failure() calls are no-ops.
Per-proxy dead tracking (when proxy_failure_limit > 0):
Individual proxies are marked dead after proxy_failure_limit failures and
skipped by next_proxy(). If all proxies in the active tier are dead,
next_proxy() auto-escalates to the next tier. Both mechanisms coexist:
per-proxy dead tracking removes broken individuals; tier-level threshold
catches systemic failure even before any single proxy hits the limit.
Stale-failure protection:
With parallel workers, some threads may fetch a proxy just before the tier
escalates and report failure after. record_failure(proxy_url) checks which
tier the proxy belongs to and ignores the tier-level circuit breaker if the
proxy is from an already-escalated tier. This prevents in-flight failures
from a dead tier instantly exhausting the freshly-escalated one.
Returns a dict of callables:
next_proxy() -> str | None — URL from active tier (skips dead), or None
record_success(proxy_url=None) -> None — resets the tier failure counter and, when proxy_url is given, that proxy's per-proxy failure count
record_failure(proxy_url=None) -> bool — True if just escalated to next tier
is_exhausted() -> bool — True if all tiers exhausted
active_tier_index() -> int — 0-based index of current tier
tier_count() -> int — total number of tiers
dead_proxy_count() -> int — number of individual proxies marked dead
Edge cases:
Empty tiers list: next_proxy() always returns None, is_exhausted() True.
Single tier: behaves like a primary-only setup; is_exhausted() returns True after threshold consecutive failures.
"""
assert threshold > 0, f"threshold must be positive, got {threshold}"
assert isinstance(tiers, list), f"tiers must be a list, got {type(tiers)}"
assert proxy_failure_limit >= 0, f"proxy_failure_limit must be >= 0, got {proxy_failure_limit}"
# Reverse map: proxy URL -> tier index. Used in record_failure to ignore
# "in-flight" failures from workers that fetched a proxy before escalation —
# those failures belong to the old tier and must not count against the new one.
proxy_to_tier_idx: dict[str, int] = {
url: tier_idx
for tier_idx, tier in enumerate(tiers)
for url in tier
}
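# e.g. tiers=[["http://a"], ["http://b"]] yields {"http://a": 0, "http://b": 1}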
lock = threading.Lock()
cycles = [itertools.cycle(t) for t in tiers]
state = {
"active_tier": 0,
"consecutive_failures": 0,
"proxy_failure_counts": {}, # proxy_url -> int
"dead_proxies": set(), # proxy URLs marked dead
}
def next_proxy() -> str | None:
with lock:
# Try each remaining tier (bounded: at most len(tiers) escalations)
for _ in range(len(tiers) + 1):
idx = state["active_tier"]
if idx >= len(cycles):
return None
tier_proxies = tiers[idx]
tier_len = len(tier_proxies)
# Find a live proxy in this tier (bounded: try each proxy at most once)
for _ in range(tier_len):
candidate = next(cycles[idx])
if candidate not in state["dead_proxies"]:
return candidate
# All proxies in this tier are dead — auto-escalate
state["consecutive_failures"] = 0
state["active_tier"] += 1
new_idx = state["active_tier"]
if new_idx < len(tiers):
logger.warning(
"All proxies in tier %d are dead — auto-escalating to tier %d/%d",
idx + 1,
new_idx + 1,
len(tiers),
)
else:
logger.error(
"All proxies in all %d tier(s) are dead — no more fallbacks",
len(tiers),
)
return None # safety fallback
def record_success(proxy_url: str | None = None) -> None:
with lock:
state["consecutive_failures"] = 0
if proxy_url is not None:
state["proxy_failure_counts"][proxy_url] = 0
def record_failure(proxy_url: str | None = None) -> bool:
"""Increment failure counter. Returns True if just escalated to next tier."""
with lock:
# Per-proxy dead tracking (additional to tier-level circuit breaker)
if proxy_url is not None and proxy_failure_limit > 0:
count = state["proxy_failure_counts"].get(proxy_url, 0) + 1
state["proxy_failure_counts"][proxy_url] = count
if count >= proxy_failure_limit and proxy_url not in state["dead_proxies"]:
state["dead_proxies"].add(proxy_url)
logger.warning(
"Proxy %s marked dead after %d consecutive failures",
proxy_url,
count,
)
# Tier-level circuit breaker (existing behavior)
idx = state["active_tier"]
if idx >= len(tiers):
# Already exhausted — no-op
return False
# Ignore failures from proxies that belong to an already-escalated tier.
# With parallel workers, some threads fetch a proxy just before escalation
# and report back after — those stale failures must not penalise the new tier.
if proxy_url is not None:
proxy_tier = proxy_to_tier_idx.get(proxy_url)
if proxy_tier is not None and proxy_tier < idx:
return False
state["consecutive_failures"] += 1
if state["consecutive_failures"] < threshold:
return False
# Threshold reached — escalate
state["consecutive_failures"] = 0
state["active_tier"] += 1
new_idx = state["active_tier"]
if new_idx < len(tiers):
logger.warning(
"Circuit open after %d consecutive failures — "
"escalating to proxy tier %d/%d",
threshold,
new_idx + 1,
len(tiers),
)
else:
logger.error(
"All %d proxy tier(s) exhausted after %d consecutive failures — "
"no more fallbacks",
len(tiers),
threshold,
)
return True
def is_exhausted() -> bool:
with lock:
return state["active_tier"] >= len(tiers)
def active_tier_index() -> int:
with lock:
return state["active_tier"]
def tier_count() -> int:
return len(tiers)
def dead_proxy_count() -> int:
with lock:
return len(state["dead_proxies"])
return {
"next_proxy": next_proxy,
"record_success": record_success,
"record_failure": record_failure,
"is_exhausted": is_exhausted,
"active_tier_index": active_tier_index,
"tier_count": tier_count,
"dead_proxy_count": dead_proxy_count,
}
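# Usage sketch for the tiered cycler (fetch_one and url are hypothetical
# stand-ins for the caller's fetch logic, not defined in this module;
# proxy may be None once all tiers are dead, which means a direct request):
#
#   cycler = make_tiered_cycler(load_proxy_tiers(), threshold=5)
#   while not cycler["is_exhausted"]():
#       proxy = cycler["next_proxy"]()
#       try:
#           fetch_one(url, proxy=proxy)
#           cycler["record_success"](proxy)
#       except Exception:
#           cycler["record_failure"](proxy)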