fix(proxy): ignore stale-tier failures in record_failure()

With parallel workers, a thread that fetches a proxy just before escalation
can report its failure after the tier has already changed. Those stale
failures silently counted against the new tier, exhausting it before it was
ever tried (e.g. Rayobyte was skipped entirely in favour of DataImpulse
because 10 in-flight Webshare failures hit the threshold).

Fix: build a proxy_url → tier_idx reverse map at construction time and
skip the tier-level circuit breaker when the failing proxy belongs to an
already-escalated tier.
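The guard can be illustrated with a minimal, runnable sketch of the cycler. This is a hypothetical simplification for illustration only: per-proxy dead tracking, `record_success`, and `proxy_failure_limit` from the real implementation are omitted, and only the reverse map plus the stale-failure check in `record_failure` are shown.

```python
import itertools
import threading

def make_tiered_cycler(tiers, threshold):
    """Sketch: escalate through proxy tiers, ignoring stale failures."""
    # Reverse map built once at construction: proxy URL -> tier index.
    proxy_to_tier_idx = {
        url: tier_idx
        for tier_idx, tier in enumerate(tiers)
        for url in tier
    }
    lock = threading.Lock()
    cycles = [itertools.cycle(t) for t in tiers]
    state = {"tier_idx": 0, "consecutive_failures": 0}

    def next_proxy():
        with lock:
            idx = state["tier_idx"]
            if idx >= len(tiers):
                return None  # all tiers exhausted
            return next(cycles[idx])

    def record_failure(proxy_url=None):
        """Returns True if this failure triggered an escalation."""
        with lock:
            idx = state["tier_idx"]
            if idx >= len(tiers):
                return False
            # Stale-failure guard: a failure reported for a proxy from an
            # already-escalated tier must not count against the new tier.
            if proxy_url is not None:
                proxy_tier = proxy_to_tier_idx.get(proxy_url)
                if proxy_tier is not None and proxy_tier < idx:
                    return False
            state["consecutive_failures"] += 1
            if state["consecutive_failures"] < threshold:
                return False
            # Threshold hit: escalate to the next tier, reset the counter.
            state["tier_idx"] += 1
            state["consecutive_failures"] = 0
            return True

    return {"next_proxy": next_proxy, "record_failure": record_failure}
```

With two tiers and `threshold=2`, two failures on tier 0 escalate to tier 1; a late failure reported for a tier-0 proxy afterwards is then a no-op rather than counting against tier 1.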

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Author: Deeman
Date:   2026-03-01 14:43:05 +01:00
Parent: 1c0edff3e5
Commit: 42c49e383c


@@ -157,6 +157,13 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_lim
     per-proxy dead tracking removes broken individuals; tier-level threshold
     catches systemic failure even before any single proxy hits the limit.
+    Stale-failure protection:
+    With parallel workers, some threads may fetch a proxy just before the tier
+    escalates and report failure after. record_failure(proxy_url) checks which
+    tier the proxy belongs to and ignores the tier-level circuit breaker if the
+    proxy is from an already-escalated tier. This prevents in-flight failures
+    from a dead tier instantly exhausting the freshly-escalated one.
     Returns a dict of callables:
     next_proxy() -> str | None — URL from active tier (skips dead), or None
     record_success(proxy_url=None) -> None — resets consecutive failure counter
@@ -174,6 +181,15 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_lim
     assert isinstance(tiers, list), f"tiers must be a list, got {type(tiers)}"
     assert proxy_failure_limit >= 0, f"proxy_failure_limit must be >= 0, got {proxy_failure_limit}"
+    # Reverse map: proxy URL -> tier index. Used in record_failure to ignore
+    # "in-flight" failures from workers that fetched a proxy before escalation —
+    # those failures belong to the old tier and must not count against the new one.
+    proxy_to_tier_idx: dict[str, int] = {
+        url: tier_idx
+        for tier_idx, tier in enumerate(tiers)
+        for url in tier
+    }
     lock = threading.Lock()
     cycles = [itertools.cycle(t) for t in tiers]
     state = {
@@ -245,6 +261,15 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_lim
         if idx >= len(tiers):
             # Already exhausted — no-op
             return False
+        # Ignore failures from proxies that belong to an already-escalated tier.
+        # With parallel workers, some threads fetch a proxy just before escalation
+        # and report back after — those stale failures must not penalise the new tier.
+        if proxy_url is not None:
+            proxy_tier = proxy_to_tier_idx.get(proxy_url)
+            if proxy_tier is not None and proxy_tier < idx:
+                return False
         state["consecutive_failures"] += 1
         if state["consecutive_failures"] < threshold:
             return False