From 42c49e383c0f93bfabf34985b3a0feb224711daf Mon Sep 17 00:00:00 2001 From: Deeman Date: Sun, 1 Mar 2026 14:43:05 +0100 Subject: [PATCH] fix(proxy): ignore stale-tier failures in record_failure() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With parallel workers, threads that fetch a proxy just before escalation can report failures after the tier has already changed — those failures were silently counting against the new tier, immediately exhausting it before it ever got tried (Rayobyte being skipped entirely in favour of DataImpulse because 10 in-flight Webshare failures hit the threshold). Fix: build a proxy_url → tier_idx reverse map at construction time and skip the tier-level circuit breaker when the failing proxy belongs to an already-escalated tier. Co-Authored-By: Claude Sonnet 4.6 --- .../src/padelnomics_extract/proxy.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py index fc264a1..07acbbd 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py @@ -157,6 +157,13 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_lim per-proxy dead tracking removes broken individuals; tier-level threshold catches systemic failure even before any single proxy hits the limit. + Stale-failure protection: + With parallel workers, some threads may fetch a proxy just before the tier + escalates and report failure after. record_failure(proxy_url) checks which + tier the proxy belongs to and ignores the tier-level circuit breaker if the + proxy is from an already-escalated tier. This prevents in-flight failures + from a dead tier instantly exhausting the freshly-escalated one. + Returns a dict of callables: next_proxy() -> str | None — URL from active tier (skips dead), or None record_success(proxy_url=None) -> None — resets consecutive failure counter @@ -174,6 +181,15 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_lim assert isinstance(tiers, list), f"tiers must be a list, got {type(tiers)}" assert proxy_failure_limit >= 0, f"proxy_failure_limit must be >= 0, got {proxy_failure_limit}" + # Reverse map: proxy URL -> tier index. Used in record_failure to ignore + # "in-flight" failures from workers that fetched a proxy before escalation — + # those failures belong to the old tier and must not count against the new one. + proxy_to_tier_idx: dict[str, int] = { + url: tier_idx + for tier_idx, tier in enumerate(tiers) + for url in tier + } + lock = threading.Lock() cycles = [itertools.cycle(t) for t in tiers] state = { @@ -245,6 +261,15 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_lim if idx >= len(tiers): # Already exhausted — no-op return False + + # Ignore failures from proxies that belong to an already-escalated tier. + # With parallel workers, some threads fetch a proxy just before escalation + # and report back after — those stale failures must not penalise the new tier. + if proxy_url is not None: + proxy_tier = proxy_to_tier_idx.get(proxy_url) + if proxy_tier is not None and proxy_tier < idx: + return False + state["consecutive_failures"] += 1 if state["consecutive_failures"] < threshold: return False