diff --git a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py index fc264a1..07acbbd 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py @@ -157,6 +157,13 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_lim per-proxy dead tracking removes broken individuals; tier-level threshold catches systemic failure even before any single proxy hits the limit. + Stale-failure protection: + With parallel workers, some threads may fetch a proxy just before the tier + escalates and report failure after. record_failure(proxy_url) checks which + tier the proxy belongs to and ignores the tier-level circuit breaker if the + proxy is from an already-escalated tier. This prevents in-flight failures + from a dead tier instantly exhausting the freshly-escalated one. + Returns a dict of callables: next_proxy() -> str | None — URL from active tier (skips dead), or None record_success(proxy_url=None) -> None — resets consecutive failure counter @@ -174,6 +181,15 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_lim assert isinstance(tiers, list), f"tiers must be a list, got {type(tiers)}" assert proxy_failure_limit >= 0, f"proxy_failure_limit must be >= 0, got {proxy_failure_limit}" + # Reverse map: proxy URL -> tier index. Used in record_failure to ignore + # "in-flight" failures from workers that fetched a proxy before escalation — + # those failures belong to the old tier and must not count against the new one. + proxy_to_tier_idx: dict[str, int] = { + url: tier_idx + for tier_idx, tier in enumerate(tiers) + for url in tier + } + lock = threading.Lock() cycles = [itertools.cycle(t) for t in tiers] state = { @@ -245,6 +261,15 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_lim if idx >= len(tiers): # Already exhausted — no-op return False + + # Ignore failures from proxies that belong to an already-escalated tier. + # With parallel workers, some threads fetch a proxy just before escalation + # and report back after — those stale failures must not penalise the new tier. + if proxy_url is not None: + proxy_tier = proxy_to_tier_idx.get(proxy_url) + if proxy_tier is not None and proxy_tier < idx: + return False + state["consecutive_failures"] += 1 if state["consecutive_failures"] < threshold: return False