From a898a0657594e139f152082055c746548ed9ec5d Mon Sep 17 00:00:00 2001 From: Deeman Date: Sun, 1 Mar 2026 12:28:54 +0100 Subject: [PATCH] feat(proxy): per-proxy dead tracking in tiered cycler Add proxy_failure_limit param to make_tiered_cycler (default 3). Individual proxies hitting the limit are marked dead and permanently skipped. next_proxy() auto-escalates when all proxies in the active tier are dead. Both mechanisms coexist: per-proxy dead tracking removes broken individuals; tier-level threshold catches systemic failure. - proxy.py: dead_proxies set + proxy_failure_counts dict in state; next_proxy skips dead proxies with bounded loop; record_failure/ record_success accept optional proxy_url; dead_proxy_count() added - playtomic_tenants.py: pass proxy_url to record_success/record_failure - playtomic_availability.py: _worker returns (proxy_url, result); serial loops in extract + extract_recheck capture proxy_url - test_supervisor.py: 11 new tests in TestTieredCyclerDeadProxyTracking Co-Authored-By: Claude Sonnet 4.6 --- CHANGELOG.md | 7 + .../playtomic_availability.py | 25 ++-- .../padelnomics_extract/playtomic_tenants.py | 4 +- .../src/padelnomics_extract/proxy.py | 82 +++++++++-- web/tests/test_supervisor.py | 128 ++++++++++++++++++ 5 files changed, 222 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 286fbdb..96e36c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] +### Changed +- **Per-proxy dead tracking in tiered cycler** — `make_tiered_cycler` now accepts a `proxy_failure_limit` parameter (default 3). Individual proxies that hit the limit are marked dead and permanently skipped by `next_proxy()`. If all proxies in the active tier are dead, `next_proxy()` auto-escalates to the next tier without needing the tier-level threshold. `record_failure(proxy_url)` and `record_success(proxy_url)` accept an optional `proxy_url` argument for per-proxy tracking; callers without `proxy_url` are fully backward-compatible. New `dead_proxy_count()` callable exposed for monitoring. + - `extract/padelnomics_extract/src/padelnomics_extract/proxy.py`: added per-proxy state (`proxy_failure_counts`, `dead_proxies`), updated `next_proxy`/`record_failure`/`record_success`, added `dead_proxy_count` + - `extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py`: `_fetch_page_via_cycler` passes `proxy_url` to `record_success`/`record_failure` + - `extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py`: `_worker` returns `(proxy_url, result)` tuple; serial loops in `extract` and `extract_recheck` capture `proxy_url` before passing to `record_success`/`record_failure` + - `web/tests/test_supervisor.py`: 11 new tests in `TestTieredCyclerDeadProxyTracking` covering dead proxy skipping, auto-escalation, `dead_proxy_count`, backward compat, and thread safety + ### Added - **Affiliate programs management** — centralised retailer config (`affiliate_programs` table) with URL template + tracking tag + commission %. Products now use a program dropdown + product identifier (e.g. ASIN) instead of manually baking full URLs. URL is assembled at redirect time via `build_affiliate_url()`, so changing a tag propagates instantly to all products. Legacy products (baked `affiliate_url`) continue to work via fallback. Amazon OneLink configured in the Associates dashboard handles geo-redirect to local marketplaces — no per-country programs needed. - `web/src/padelnomics/migrations/versions/0027_affiliate_programs.py`: `affiliate_programs` table, nullable `program_id` + `product_identifier` columns on `affiliate_products`, seeds "Amazon" program, backfills ASINs from existing URLs diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py index f1e68a6..62534b0 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py @@ -213,9 +213,10 @@ def _fetch_venues_parallel( completed_count = 0 lock = threading.Lock() - def _worker(tenant_id: str) -> dict | None: + def _worker(tenant_id: str) -> tuple[str | None, dict | None]: proxy_url = cycler["next_proxy"]() - return _fetch_venue_availability(tenant_id, start_min_str, start_max_str, proxy_url) + result = _fetch_venue_availability(tenant_id, start_min_str, start_max_str, proxy_url) + return proxy_url, result with ThreadPoolExecutor(max_workers=worker_count) as pool: for batch_start in range(0, len(tenant_ids), PARALLEL_BATCH_SIZE): @@ -231,17 +232,17 @@ def _fetch_venues_parallel( batch_futures = {pool.submit(_worker, tid): tid for tid in batch} for future in as_completed(batch_futures): - result = future.result() + proxy_url, result = future.result() with lock: completed_count += 1 if result is not None: venues_data.append(result) - cycler["record_success"]() + cycler["record_success"](proxy_url) if on_result is not None: on_result(result) else: venues_errored += 1 - cycler["record_failure"]() + cycler["record_failure"](proxy_url) if completed_count % 500 == 0: logger.info( @@ -336,16 +337,17 @@ def extract( else: logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process)) for i, tenant_id in enumerate(venues_to_process): + proxy_url = cycler["next_proxy"]() result = _fetch_venue_availability( - tenant_id, start_min_str, start_max_str, cycler["next_proxy"](), + tenant_id, start_min_str, start_max_str, proxy_url, ) if result is not None: new_venues_data.append(result) - cycler["record_success"]() + cycler["record_success"](proxy_url) _on_result(result) else: venues_errored += 1 - cycler["record_failure"]() + cycler["record_failure"](proxy_url) if cycler["is_exhausted"](): logger.error("All proxy tiers exhausted — writing partial results") break @@ -500,13 +502,14 @@ def extract_recheck( venues_data = [] venues_errored = 0 for tid in venues_to_recheck: - result = _fetch_venue_availability(tid, start_min_str, start_max_str, cycler["next_proxy"]()) + proxy_url = cycler["next_proxy"]() + result = _fetch_venue_availability(tid, start_min_str, start_max_str, proxy_url) if result is not None: venues_data.append(result) - cycler["record_success"]() + cycler["record_success"](proxy_url) else: venues_errored += 1 - cycler["record_failure"]() + cycler["record_failure"](proxy_url) if cycler["is_exhausted"](): logger.error("All proxy tiers exhausted — writing partial recheck results") break diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py index c16e976..277bdec 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py @@ -79,7 +79,7 @@ def _fetch_page_via_cycler(cycler: dict, page: int) -> tuple[int, list[dict]]: raise RuntimeError(f"All proxy tiers exhausted fetching page {page}") try: result = _fetch_one_page(proxy_url, page) - cycler["record_success"]() + cycler["record_success"](proxy_url) return result except Exception as exc: last_exc = exc @@ -91,7 +91,7 @@ def _fetch_page_via_cycler(cycler: dict, page: int) -> tuple[int, list[dict]]: proxy_url, exc, ) - cycler["record_failure"]() + cycler["record_failure"](proxy_url) if cycler["is_exhausted"](): raise RuntimeError(f"All proxy tiers exhausted fetching page {page}") from exc raise RuntimeError(f"Page {page} failed after {MAX_PAGE_ATTEMPTS} attempts") from last_exc diff --git a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py index bab4c85..9904c80 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py @@ -134,8 +134,8 @@ def make_sticky_selector(proxy_urls: list[str]): return select_proxy -def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict: - """Thread-safe N-tier proxy cycler with circuit breaker. +def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_limit: int = 3) -> dict: + """Thread-safe N-tier proxy cycler with circuit breaker and per-proxy dead tracking. Uses tiers[0] until consecutive failures >= threshold, then escalates to tiers[1], then tiers[2], etc. Once all tiers are exhausted, @@ -144,13 +144,21 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict: Failure counter resets on each escalation — the new tier gets a fresh start. Once exhausted, further record_failure() calls are no-ops. + Per-proxy dead tracking (when proxy_failure_limit > 0): + Individual proxies are marked dead after proxy_failure_limit failures and + skipped by next_proxy(). If all proxies in the active tier are dead, + next_proxy() auto-escalates to the next tier. Both mechanisms coexist: + per-proxy dead tracking removes broken individuals; tier-level threshold + catches systemic failure even before any single proxy hits the limit. + Returns a dict of callables: - next_proxy() -> str | None — URL from the active tier, or None - record_success() -> None — resets consecutive failure counter - record_failure() -> bool — True if just escalated to next tier + next_proxy() -> str | None — URL from active tier (skips dead), or None + record_success(proxy_url=None) -> None — resets consecutive failure counter + record_failure(proxy_url=None) -> bool — True if just escalated to next tier is_exhausted() -> bool — True if all tiers exhausted active_tier_index() -> int — 0-based index of current tier tier_count() -> int — total number of tiers + dead_proxy_count() -> int — number of individual proxies marked dead Edge cases: Empty tiers list: next_proxy() always returns None, is_exhausted() True. @@ -158,28 +166,75 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict: """ assert threshold > 0, f"threshold must be positive, got {threshold}" assert isinstance(tiers, list), f"tiers must be a list, got {type(tiers)}" + assert proxy_failure_limit >= 0, f"proxy_failure_limit must be >= 0, got {proxy_failure_limit}" lock = threading.Lock() cycles = [itertools.cycle(t) for t in tiers] state = { "active_tier": 0, "consecutive_failures": 0, + "proxy_failure_counts": {}, # proxy_url -> int + "dead_proxies": set(), # proxy URLs marked dead } def next_proxy() -> str | None: with lock: - idx = state["active_tier"] - if idx >= len(cycles): - return None - return next(cycles[idx]) + # Try each remaining tier (bounded: at most len(tiers) escalations) + for _ in range(len(tiers) + 1): + idx = state["active_tier"] + if idx >= len(cycles): + return None - def record_success() -> None: + tier_proxies = tiers[idx] + tier_len = len(tier_proxies) + + # Find a live proxy in this tier (bounded: try each proxy at most once) + for _ in range(tier_len): + candidate = next(cycles[idx]) + if candidate not in state["dead_proxies"]: + return candidate + + # All proxies in this tier are dead — auto-escalate + state["consecutive_failures"] = 0 + state["active_tier"] += 1 + new_idx = state["active_tier"] + if new_idx < len(tiers): + logger.warning( + "All proxies in tier %d are dead — auto-escalating to tier %d/%d", + idx + 1, + new_idx + 1, + len(tiers), + ) + else: + logger.error( + "All proxies in all %d tier(s) are dead — no more fallbacks", + len(tiers), + ) + + return None # safety fallback + + def record_success(proxy_url: str | None = None) -> None: with lock: state["consecutive_failures"] = 0 + if proxy_url is not None: + state["proxy_failure_counts"][proxy_url] = 0 - def record_failure() -> bool: + def record_failure(proxy_url: str | None = None) -> bool: """Increment failure counter. Returns True if just escalated to next tier.""" with lock: + # Per-proxy dead tracking (additional to tier-level circuit breaker) + if proxy_url is not None and proxy_failure_limit > 0: + count = state["proxy_failure_counts"].get(proxy_url, 0) + 1 + state["proxy_failure_counts"][proxy_url] = count + if count >= proxy_failure_limit and proxy_url not in state["dead_proxies"]: + state["dead_proxies"].add(proxy_url) + logger.warning( + "Proxy %s marked dead after %d consecutive failures", + proxy_url, + count, + ) + + # Tier-level circuit breaker (existing behavior) idx = state["active_tier"] if idx >= len(tiers): # Already exhausted — no-op @@ -219,6 +274,10 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict: def tier_count() -> int: return len(tiers) + def dead_proxy_count() -> int: + with lock: + return len(state["dead_proxies"]) + return { "next_proxy": next_proxy, "record_success": record_success, @@ -226,4 +285,5 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict: "is_exhausted": is_exhausted, "active_tier_index": active_tier_index, "tier_count": tier_count, + "dead_proxy_count": dead_proxy_count, } diff --git a/web/tests/test_supervisor.py b/web/tests/test_supervisor.py index 55ad30e..be721e7 100644 --- a/web/tests/test_supervisor.py +++ b/web/tests/test_supervisor.py @@ -500,3 +500,131 @@ class TestTieredCyclerNTier: t.join() assert errors == [], f"Thread safety errors: {errors}" + + +class TestTieredCyclerDeadProxyTracking: + """Per-proxy dead tracking: individual proxies marked dead are skipped.""" + + def test_dead_proxy_skipped_in_next_proxy(self): + """After a proxy hits the failure limit it is never returned again.""" + tiers = [["http://dead", "http://live"]] + cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=1) + # Mark http://dead as dead + cycler["record_failure"]("http://dead") + # next_proxy must always return the live one + for _ in range(6): + assert cycler["next_proxy"]() == "http://live" + + def test_dead_proxy_count_increments(self): + tiers = [["http://a", "http://b", "http://c"]] + cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=2) + assert cycler["dead_proxy_count"]() == 0 + cycler["record_failure"]("http://a") + assert cycler["dead_proxy_count"]() == 0 # only 1 failure, limit is 2 + cycler["record_failure"]("http://a") + assert cycler["dead_proxy_count"]() == 1 + cycler["record_failure"]("http://b") + cycler["record_failure"]("http://b") + assert cycler["dead_proxy_count"]() == 2 + + def test_auto_escalates_when_all_proxies_in_tier_dead(self): + """If all proxies in the active tier are dead, next_proxy auto-escalates.""" + tiers = [["http://t0a", "http://t0b"], ["http://t1"]] + cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=1) + # Kill all proxies in tier 0 + cycler["record_failure"]("http://t0a") + cycler["record_failure"]("http://t0b") + # next_proxy should transparently escalate and return tier 1 proxy + assert cycler["next_proxy"]() == "http://t1" + + def test_auto_escalates_updates_active_tier_index(self): + """Auto-escalation via dead proxies bumps active_tier_index.""" + tiers = [["http://t0a", "http://t0b"], ["http://t1"]] + cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=1) + cycler["record_failure"]("http://t0a") + cycler["record_failure"]("http://t0b") + cycler["next_proxy"]() # triggers auto-escalation + assert cycler["active_tier_index"]() == 1 + + def test_returns_none_when_all_tiers_exhausted_by_dead_proxies(self): + tiers = [["http://t0"], ["http://t1"]] + cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=1) + cycler["record_failure"]("http://t0") + cycler["record_failure"]("http://t1") + assert cycler["next_proxy"]() is None + + def test_record_success_resets_per_proxy_counter(self): + """Success resets the failure count so proxy is not marked dead.""" + tiers = [["http://a", "http://b"]] + cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=3) + # Two failures — not dead yet + cycler["record_failure"]("http://a") + cycler["record_failure"]("http://a") + assert cycler["dead_proxy_count"]() == 0 + # Success resets the counter + cycler["record_success"]("http://a") + # Two more failures — still not dead (counter was reset) + cycler["record_failure"]("http://a") + cycler["record_failure"]("http://a") + assert cycler["dead_proxy_count"]() == 0 + # Third failure after reset — now dead + cycler["record_failure"]("http://a") + assert cycler["dead_proxy_count"]() == 1 + + def test_dead_proxy_stays_dead_after_success(self): + """Once marked dead, a proxy is not revived by record_success.""" + tiers = [["http://a", "http://b"]] + cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=1) + cycler["record_failure"]("http://a") + assert cycler["dead_proxy_count"]() == 1 + cycler["record_success"]("http://a") + assert cycler["dead_proxy_count"]() == 1 + # http://a is still skipped + for _ in range(6): + assert cycler["next_proxy"]() == "http://b" + + def test_backward_compat_no_proxy_url(self): + """Calling record_failure/record_success without proxy_url still works.""" + tiers = [["http://t0"], ["http://t1"]] + cycler = make_tiered_cycler(tiers, threshold=2) + cycler["record_failure"]() + cycler["record_failure"]() # escalates + assert cycler["active_tier_index"]() == 1 + cycler["record_success"]() + assert cycler["dead_proxy_count"]() == 0 # no per-proxy tracking happened + + def test_proxy_failure_limit_zero_disables_per_proxy_tracking(self): + """proxy_failure_limit=0 disables per-proxy dead tracking entirely.""" + tiers = [["http://a", "http://b"]] + cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=0) + for _ in range(100): + cycler["record_failure"]("http://a") + assert cycler["dead_proxy_count"]() == 0 + + def test_thread_safety_with_per_proxy_tracking(self): + """Concurrent record_failure(proxy_url) calls don't corrupt state.""" + import threading as _threading + + tiers = [["http://t0a", "http://t0b", "http://t0c"], ["http://t1a"]] + cycler = make_tiered_cycler(tiers, threshold=50, proxy_failure_limit=5) + errors = [] + lock = _threading.Lock() + + def worker(): + try: + for _ in range(30): + p = cycler["next_proxy"]() + if p is not None: + cycler["record_failure"](p) + cycler["record_success"](p) + except Exception as e: + with lock: + errors.append(e) + + threads = [_threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert errors == [], f"Thread safety errors: {errors}"