diff --git a/CHANGELOG.md b/CHANGELOG.md
index 48f9964..0a4d1d6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 
 ## [Unreleased]
 
+### Changed
+- **Env vars replaced** (breaking): `PROXY_URLS` and `PROXY_URLS_FALLBACK` are removed. New vars: `WEBSHARE_DOWNLOAD_URL`, `PROXY_URLS_DATACENTER`, `PROXY_URLS_RESIDENTIAL`.
+
 ### Added
+- **Three-tier proxy system** for extraction pipeline: free (Webshare auto-fetched) → datacenter (`PROXY_URLS_DATACENTER`) → residential (`PROXY_URLS_RESIDENTIAL`). Webshare free proxies are now auto-fetched from their download API on each run — no more manually copying stale proxy lists.
+  - `proxy.py`: added `fetch_webshare_proxies()` (stdlib urllib, bounded read + timeout) and `load_proxy_tiers()` (assembles N tiers from env); generalised `make_tiered_cycler()` to accept `list[list[str]]` with N-level escalation. Exposes `is_exhausted()`, `active_tier_index()`, `tier_count()`.
+  - `playtomic_availability.py`: both `extract()` and `extract_recheck()` now use `load_proxy_tiers()` plus the N-tier cycler. The `fallback_urls` param of `_fetch_venues_parallel` is removed; `is_fallback_active()` is replaced by `is_exhausted()`.
+  - `playtomic_tenants.py`: flattens the output of `load_proxy_tiers()` for simple round-robin.
 - **Phase 2a — NUTS-1 regional income differentiation** (`opportunity_score`): Munich and Berlin no longer share the same income figure as Chemnitz.
   - `eurostat.py`: added `nama_10r_2hhinc` dataset config (NUTS-2 cube with NUTS-1 entries); filter params now appended to API URL so the server pre-filters the large cube before download (also makes `ilc_di03` requests smaller).
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
index ff2d10f..d708a61 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
@@ -33,7 +33,7 @@ from pathlib import Path
 import niquests
 
 from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
-from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
+from .proxy import load_proxy_tiers, make_tiered_cycler
 from .utils import (
     compress_jsonl_atomic,
     flush_partial_batch,
@@ -190,14 +190,13 @@ def _fetch_venues_parallel(
     start_max_str: str,
     worker_count: int,
     cycler: dict,
-    fallback_urls: list[str],
     on_result=None,
 ) -> tuple[list[dict], int]:
     """Fetch availability for multiple venues in parallel.
 
     Submits futures in batches of PARALLEL_BATCH_SIZE. After each batch
-    completes, checks the circuit breaker: if it opened and there is no
-    fallback configured, stops submitting further batches.
+    completes, checks the circuit breaker: if all proxy tiers are exhausted,
+    stops submitting further batches.
 
     on_result: optional callable(result: dict) invoked inside the lock for
     each successful result — used for incremental partial-file flushing.
@@ -215,10 +214,10 @@ def _fetch_venues_parallel( with ThreadPoolExecutor(max_workers=worker_count) as pool: for batch_start in range(0, len(tenant_ids), PARALLEL_BATCH_SIZE): - # Stop submitting new work if circuit is open with no fallback - if cycler["is_fallback_active"]() and not fallback_urls: + # Stop submitting new work if all proxy tiers are exhausted + if cycler["is_exhausted"](): logger.error( - "Circuit open with no fallback — stopping after %d/%d venues", + "All proxy tiers exhausted — stopping after %d/%d venues", completed_count, len(tenant_ids), ) break @@ -294,10 +293,9 @@ def extract( venues_to_process = [tid for tid in all_venues_to_process if tid not in already_done] # Set up tiered proxy cycler with circuit breaker - proxy_urls = load_proxy_urls() - fallback_urls = load_fallback_proxy_urls() - worker_count = len(proxy_urls) if proxy_urls else 1 - cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD) + tiers = load_proxy_tiers() + worker_count = len(tiers[0]) if tiers else 1 + cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD) start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S") start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S") @@ -325,9 +323,9 @@ def extract( venues_errored = 0 if worker_count > 1: - logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls)) + logger.info("Parallel mode: %d workers (tier 0), %d tiers total", worker_count, len(tiers)) new_venues_data, venues_errored = _fetch_venues_parallel( - venues_to_process, start_min_str, start_max_str, worker_count, cycler, fallback_urls, + venues_to_process, start_min_str, start_max_str, worker_count, cycler, on_result=_on_result, ) else: @@ -342,9 +340,9 @@ def extract( _on_result(result) else: venues_errored += 1 - circuit_opened = cycler["record_failure"]() - if circuit_opened and not fallback_urls: - logger.error("Circuit open with no fallback — writing partial results") + cycler["record_failure"]() + if cycler["is_exhausted"](): + logger.error("All proxy tiers exhausted — writing partial results") break if (i + 1) % 100 == 0: @@ -485,14 +483,13 @@ def extract_recheck( start_max_str = window_end.strftime("%Y-%m-%dT%H:%M:%S") # Set up tiered proxy cycler with circuit breaker - proxy_urls = load_proxy_urls() - fallback_urls = load_fallback_proxy_urls() - worker_count = len(proxy_urls) if proxy_urls else 1 - cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD) + tiers = load_proxy_tiers() + worker_count = len(tiers[0]) if tiers else 1 + cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD) if worker_count > 1 and len(venues_to_recheck) > 10: venues_data, venues_errored = _fetch_venues_parallel( - venues_to_recheck, start_min_str, start_max_str, worker_count, cycler, fallback_urls, + venues_to_recheck, start_min_str, start_max_str, worker_count, cycler, ) else: venues_data = [] @@ -504,9 +501,9 @@ def extract_recheck( cycler["record_success"]() else: venues_errored += 1 - circuit_opened = cycler["record_failure"]() - if circuit_opened and not fallback_urls: - logger.error("Circuit open with no fallback — writing partial recheck results") + cycler["record_failure"]() + if cycler["is_exhausted"](): + logger.error("All proxy tiers exhausted — writing partial recheck results") break # Write recheck file as JSONL — one venue per line with metadata injected diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py 
b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py
index 2ad4166..8b63a39 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py
@@ -30,7 +30,7 @@ from pathlib import Path
 import niquests
 
 from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
-from .proxy import load_proxy_urls, make_round_robin_cycler
+from .proxy import load_proxy_tiers, make_round_robin_cycler
 from .utils import compress_jsonl_atomic, landing_path
 
 logger = setup_logging("padelnomics.extract.playtomic_tenants")
@@ -82,12 +82,13 @@ def extract(
         logger.info("Already have tenants for %s/%s — skipping", year, month)
         return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
 
-    proxy_urls = load_proxy_urls()
-    next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None
-    batch_size = len(proxy_urls) if proxy_urls else 1
+    tiers = load_proxy_tiers()
+    all_proxies = [url for tier in tiers for url in tier]
+    next_proxy = make_round_robin_cycler(all_proxies) if all_proxies else None
+    batch_size = len(all_proxies) if all_proxies else 1
 
     if next_proxy:
-        logger.info("Parallel mode: %d pages per batch (%d proxies)", batch_size, len(proxy_urls))
+        logger.info("Parallel mode: %d pages per batch (%d proxies across %d tier(s))", batch_size, len(all_proxies), len(tiers))
     else:
         logger.info("Serial mode: 1 page at a time (no proxies)")
 
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py
index 7a8b5f5..bab4c85 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py
@@ -1,41 +1,97 @@
 """Optional proxy rotation for parallel HTTP fetching.
 
-Proxies are configured via the PROXY_URLS environment variable (comma-separated).
-When unset, all functions return None/no-op — extractors fall back to direct requests.
+Proxies are configured via environment variables. When unset, all functions
+return None/no-op — extractors fall back to direct requests.
 
-Tiered proxy with circuit breaker:
-    Primary tier (PROXY_URLS) is used by default — typically cheap datacenter proxies.
-    Fallback tier (PROXY_URLS_FALLBACK) activates once consecutive failures >= threshold.
-    Once the circuit opens it stays open for the duration of the run (no auto-recovery).
+Three-tier escalation: free → datacenter → residential.
+    Tier 1 (free): WEBSHARE_DOWNLOAD_URL — auto-fetched from the Webshare API
+    Tier 2 (datacenter): PROXY_URLS_DATACENTER — comma-separated paid DC proxies
+    Tier 3 (residential): PROXY_URLS_RESIDENTIAL — comma-separated paid residential proxies
+
+Tiered circuit breaker:
+    The active tier is used until consecutive failures >= threshold, then the
+    cycler escalates to the next tier. Once all tiers are exhausted,
+    is_exhausted() returns True.
+    Escalation is permanent for the duration of the run — no auto-recovery.
 """
 
 import itertools
 import logging
 import os
 import threading
+import urllib.request
 
 logger = logging.getLogger(__name__)
 
+MAX_WEBSHARE_PROXIES = 20
+WEBSHARE_FETCH_TIMEOUT_SECONDS = 10
+WEBSHARE_MAX_RESPONSE_BYTES = 1024 * 1024  # 1 MB
 
-def load_proxy_urls() -> list[str]:
-    """Read PROXY_URLS env var (comma-separated). Returns [] if unset.
- Format: http://user:pass@host:port or socks5://host:port +def fetch_webshare_proxies(download_url: str, max_proxies: int = MAX_WEBSHARE_PROXIES) -> list[str]: + """Fetch proxy list from the Webshare download API. Returns [] on any error. + + Expected line format: ip:port:username:password + Converts to: http://username:password@ip:port + + Bounded: reads at most WEBSHARE_MAX_RESPONSE_BYTES, returns at most max_proxies. """ - raw = os.environ.get("PROXY_URLS", "") - urls = [u.strip() for u in raw.split(",") if u.strip()] + assert max_proxies > 0, f"max_proxies must be positive, got {max_proxies}" + assert download_url, "download_url must not be empty" + + try: + req = urllib.request.Request( + download_url, + headers={"User-Agent": "padelnomics-extract/1.0"}, + ) + with urllib.request.urlopen(req, timeout=WEBSHARE_FETCH_TIMEOUT_SECONDS) as resp: + raw = resp.read(WEBSHARE_MAX_RESPONSE_BYTES).decode("utf-8") + except Exception as e: + logger.warning("Failed to fetch Webshare proxies: %s", e) + return [] + + urls = [] + for line in raw.splitlines(): + line = line.strip() + if not line: + continue + parts = line.split(":") + if len(parts) != 4: + logger.debug("Skipping malformed proxy line: %r", line) + continue + ip, port, username, password = parts + urls.append(f"http://{username}:{password}@{ip}:{port}") + if len(urls) >= max_proxies: + break + + logger.info("Fetched %d proxies from Webshare", len(urls)) return urls -def load_fallback_proxy_urls() -> list[str]: - """Read PROXY_URLS_FALLBACK env var (comma-separated). Returns [] if unset. +def load_proxy_tiers() -> list[list[str]]: + """Assemble proxy tiers in escalation order: free → datacenter → residential. - Used as the residential/reliable fallback tier when the primary tier fails. - Format: http://user:pass@host:port or socks5://host:port + Tier 1 (free): fetched from WEBSHARE_DOWNLOAD_URL if set. + Tier 2 (datacenter): PROXY_URLS_DATACENTER (comma-separated). + Tier 3 (residential): PROXY_URLS_RESIDENTIAL (comma-separated). + + Empty tiers are omitted. Returns [] if no proxies configured anywhere. """ - raw = os.environ.get("PROXY_URLS_FALLBACK", "") - urls = [u.strip() for u in raw.split(",") if u.strip()] - return urls + tiers: list[list[str]] = [] + + webshare_url = os.environ.get("WEBSHARE_DOWNLOAD_URL", "").strip() + if webshare_url: + free_proxies = fetch_webshare_proxies(webshare_url) + if free_proxies: + tiers.append(free_proxies) + + for var in ("PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"): + raw = os.environ.get(var, "") + urls = [u.strip() for u in raw.split(",") if u.strip()] + if urls: + tiers.append(urls) + + return tiers def make_round_robin_cycler(proxy_urls: list[str]): @@ -78,83 +134,96 @@ def make_sticky_selector(proxy_urls: list[str]): return select_proxy -def make_tiered_cycler( - primary_urls: list[str], - fallback_urls: list[str], - threshold: int, -) -> dict: - """Thread-safe tiered proxy cycler with circuit breaker. +def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict: + """Thread-safe N-tier proxy cycler with circuit breaker. - Uses primary_urls until consecutive failures >= threshold, then switches - permanently to fallback_urls for the rest of the run. No auto-recovery — - once the circuit opens it stays open to avoid flapping. + Uses tiers[0] until consecutive failures >= threshold, then escalates + to tiers[1], then tiers[2], etc. Once all tiers are exhausted, + is_exhausted() returns True and next_proxy() returns None. 
+ + Failure counter resets on each escalation — the new tier gets a fresh start. + Once exhausted, further record_failure() calls are no-ops. Returns a dict of callables: - next_proxy() -> str | None — returns URL from the active tier - record_success() — resets consecutive failure counter - record_failure() -> bool — increments counter; True if circuit just opened - is_fallback_active() -> bool — whether fallback tier is currently active + next_proxy() -> str | None — URL from the active tier, or None + record_success() -> None — resets consecutive failure counter + record_failure() -> bool — True if just escalated to next tier + is_exhausted() -> bool — True if all tiers exhausted + active_tier_index() -> int — 0-based index of current tier + tier_count() -> int — total number of tiers - If primary_urls is empty: always returns from fallback_urls (no circuit breaker needed). - If both are empty: next_proxy() always returns None. + Edge cases: + Empty tiers list: next_proxy() always returns None, is_exhausted() True. + Single tier: behaves like the primary-only case, is_exhausted() after threshold. """ assert threshold > 0, f"threshold must be positive, got {threshold}" + assert isinstance(tiers, list), f"tiers must be a list, got {type(tiers)}" lock = threading.Lock() + cycles = [itertools.cycle(t) for t in tiers] state = { + "active_tier": 0, "consecutive_failures": 0, - "fallback_active": False, } - primary_cycle = itertools.cycle(primary_urls) if primary_urls else None - fallback_cycle = itertools.cycle(fallback_urls) if fallback_urls else None - - # No primary proxies — skip circuit breaker, use fallback directly - if not primary_urls: - state["fallback_active"] = True - def next_proxy() -> str | None: with lock: - if state["fallback_active"]: - return next(fallback_cycle) if fallback_cycle else None - return next(primary_cycle) if primary_cycle else None + idx = state["active_tier"] + if idx >= len(cycles): + return None + return next(cycles[idx]) def record_success() -> None: with lock: state["consecutive_failures"] = 0 def record_failure() -> bool: - """Increment failure counter. Returns True if circuit just opened.""" + """Increment failure counter. 
Returns True if just escalated to next tier.""" with lock: - if state["fallback_active"]: - # Already on fallback — don't trip the circuit again + idx = state["active_tier"] + if idx >= len(tiers): + # Already exhausted — no-op return False state["consecutive_failures"] += 1 - if state["consecutive_failures"] >= threshold: - state["fallback_active"] = True - if fallback_urls: - logger.warning( - "Circuit open after %d consecutive failures — " - "switching to fallback residential proxies", - state["consecutive_failures"], - ) - else: - logger.error( - "Circuit open after %d consecutive failures — " - "no fallback configured, aborting run", - state["consecutive_failures"], - ) - return True - return False + if state["consecutive_failures"] < threshold: + return False + # Threshold reached — escalate + state["consecutive_failures"] = 0 + state["active_tier"] += 1 + new_idx = state["active_tier"] + if new_idx < len(tiers): + logger.warning( + "Circuit open after %d consecutive failures — " + "escalating to proxy tier %d/%d", + threshold, + new_idx + 1, + len(tiers), + ) + else: + logger.error( + "All %d proxy tier(s) exhausted after %d consecutive failures — " + "no more fallbacks", + len(tiers), + threshold, + ) + return True - def is_fallback_active() -> bool: + def is_exhausted() -> bool: with lock: - return state["fallback_active"] + return state["active_tier"] >= len(tiers) + + def active_tier_index() -> int: + with lock: + return state["active_tier"] + + def tier_count() -> int: + return len(tiers) return { "next_proxy": next_proxy, "record_success": record_success, "record_failure": record_failure, - "is_fallback_active": is_fallback_active, + "is_exhausted": is_exhausted, + "active_tier_index": active_tier_index, + "tier_count": tier_count, } - diff --git a/web/tests/test_supervisor.py b/web/tests/test_supervisor.py index 6ab5fea..55ad30e 100644 --- a/web/tests/test_supervisor.py +++ b/web/tests/test_supervisor.py @@ -24,9 +24,11 @@ sup = _ilu.module_from_spec(_spec) _spec.loader.exec_module(sup) from padelnomics_extract.proxy import ( # noqa: E402 - load_proxy_urls, + fetch_webshare_proxies, + load_proxy_tiers, make_round_robin_cycler, make_sticky_selector, + make_tiered_cycler, ) # ── load_workflows ──────────────────────────────────────────────── @@ -198,28 +200,112 @@ class TestTopologicalWaves: # ── proxy.py ───────────────────────────────────────────────────── -class TestLoadProxyUrls: - def test_returns_empty_when_unset(self, monkeypatch): - monkeypatch.delenv("PROXY_URLS", raising=False) - assert load_proxy_urls() == [] +class TestFetchWebshareProxies: + def test_parses_ip_port_user_pass_format(self): + raw = "1.2.3.4:1080:user1:pass1\n5.6.7.8:1080:user2:pass2\n" + with patch("urllib.request.urlopen") as mock_open: + mock_resp = MagicMock() + mock_resp.read.return_value = raw.encode("utf-8") + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + mock_open.return_value = mock_resp + urls = fetch_webshare_proxies("http://example.com/proxy-list") + assert urls == [ + "http://user1:pass1@1.2.3.4:1080", + "http://user2:pass2@5.6.7.8:1080", + ] - def test_parses_comma_separated_urls(self, monkeypatch): - monkeypatch.setenv( - "PROXY_URLS", - "http://p1:8080,http://p2:8080,http://p3:8080", - ) - urls = load_proxy_urls() - assert urls == ["http://p1:8080", "http://p2:8080", "http://p3:8080"] + def test_network_error_returns_empty(self): + import urllib.error + with patch("urllib.request.urlopen", side_effect=urllib.error.URLError("timeout")): + 
result = fetch_webshare_proxies("http://example.com/proxy-list") + assert result == [] - def test_strips_whitespace(self, monkeypatch): - monkeypatch.setenv("PROXY_URLS", " http://p1:8080 , http://p2:8080 ") - urls = load_proxy_urls() - assert urls == ["http://p1:8080", "http://p2:8080"] + def test_malformed_lines_are_skipped(self): + raw = "bad_line\n1.2.3.4:1080:user:pass\nonly:three:parts\n" + with patch("urllib.request.urlopen") as mock_open: + mock_resp = MagicMock() + mock_resp.read.return_value = raw.encode("utf-8") + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + mock_open.return_value = mock_resp + urls = fetch_webshare_proxies("http://example.com/proxy-list") + assert urls == ["http://user:pass@1.2.3.4:1080"] - def test_ignores_empty_segments(self, monkeypatch): - monkeypatch.setenv("PROXY_URLS", "http://p1:8080,,http://p2:8080,") - urls = load_proxy_urls() - assert urls == ["http://p1:8080", "http://p2:8080"] + def test_max_proxies_respected(self): + lines = "\n".join(f"10.0.0.{i}:1080:u{i}:p{i}" for i in range(10)) + with patch("urllib.request.urlopen") as mock_open: + mock_resp = MagicMock() + mock_resp.read.return_value = lines.encode("utf-8") + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + mock_open.return_value = mock_resp + urls = fetch_webshare_proxies("http://example.com/proxy-list", max_proxies=3) + assert len(urls) == 3 + + def test_empty_lines_skipped(self): + raw = "\n\n1.2.3.4:1080:user:pass\n\n" + with patch("urllib.request.urlopen") as mock_open: + mock_resp = MagicMock() + mock_resp.read.return_value = raw.encode("utf-8") + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + mock_open.return_value = mock_resp + urls = fetch_webshare_proxies("http://example.com/proxy-list") + assert urls == ["http://user:pass@1.2.3.4:1080"] + + +class TestLoadProxyTiers: + def _clear_proxy_env(self, monkeypatch): + for var in ("WEBSHARE_DOWNLOAD_URL", "PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"): + monkeypatch.delenv(var, raising=False) + + def test_returns_empty_when_all_unset(self, monkeypatch): + self._clear_proxy_env(monkeypatch) + assert load_proxy_tiers() == [] + + def test_single_datacenter_tier(self, monkeypatch): + self._clear_proxy_env(monkeypatch) + monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080,http://dc2:8080") + tiers = load_proxy_tiers() + assert len(tiers) == 1 + assert tiers[0] == ["http://dc1:8080", "http://dc2:8080"] + + def test_residential_only(self, monkeypatch): + self._clear_proxy_env(monkeypatch) + monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080") + tiers = load_proxy_tiers() + assert len(tiers) == 1 + assert tiers[0] == ["http://res1:8080"] + + def test_empty_tiers_skipped(self, monkeypatch): + self._clear_proxy_env(monkeypatch) + monkeypatch.setenv("PROXY_URLS_DATACENTER", "") + monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080") + tiers = load_proxy_tiers() + assert len(tiers) == 1 + assert tiers[0] == ["http://res1:8080"] + + def test_three_tiers_correct_order(self, monkeypatch): + self._clear_proxy_env(monkeypatch) + with patch("padelnomics_extract.proxy.fetch_webshare_proxies", return_value=["http://user:pass@1.2.3.4:1080"]): + monkeypatch.setenv("WEBSHARE_DOWNLOAD_URL", "http://example.com/list") + monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080") + monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080") + tiers = load_proxy_tiers() + assert len(tiers) == 3 + 
assert tiers[0] == ["http://user:pass@1.2.3.4:1080"] # free + assert tiers[1] == ["http://dc1:8080"] # datacenter + assert tiers[2] == ["http://res1:8080"] # residential + + def test_webshare_fetch_failure_skips_tier(self, monkeypatch): + self._clear_proxy_env(monkeypatch) + with patch("padelnomics_extract.proxy.fetch_webshare_proxies", return_value=[]): + monkeypatch.setenv("WEBSHARE_DOWNLOAD_URL", "http://example.com/list") + monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080") + tiers = load_proxy_tiers() + assert len(tiers) == 1 + assert tiers[0] == ["http://dc1:8080"] class TestRoundRobinCycler: @@ -279,3 +365,138 @@ class TestStickySelectorProxy: fn = make_sticky_selector(urls) for i in range(20): assert fn(f"key_{i}") in urls + + +class TestTieredCyclerNTier: + def test_starts_on_first_tier(self): + tiers = [["http://t0a", "http://t0b"], ["http://t1a"]] + cycler = make_tiered_cycler(tiers, threshold=3) + assert cycler["active_tier_index"]() == 0 + assert not cycler["is_exhausted"]() + assert cycler["next_proxy"]() in tiers[0] + + def test_escalates_after_threshold(self): + tiers = [["http://t0"], ["http://t1"]] + cycler = make_tiered_cycler(tiers, threshold=3) + # Two failures — stays on tier 0 + cycler["record_failure"]() + cycler["record_failure"]() + assert cycler["active_tier_index"]() == 0 + # Third failure — escalates + escalated = cycler["record_failure"]() + assert escalated is True + assert cycler["active_tier_index"]() == 1 + assert cycler["next_proxy"]() == "http://t1" + + def test_escalates_through_all_tiers(self): + tiers = [["http://t0"], ["http://t1"], ["http://t2"]] + cycler = make_tiered_cycler(tiers, threshold=2) + # Exhaust tier 0 + cycler["record_failure"]() + cycler["record_failure"]() + assert cycler["active_tier_index"]() == 1 + # Exhaust tier 1 + cycler["record_failure"]() + cycler["record_failure"]() + assert cycler["active_tier_index"]() == 2 + # Exhaust tier 2 + cycler["record_failure"]() + cycler["record_failure"]() + assert cycler["is_exhausted"]() + assert cycler["next_proxy"]() is None + + def test_success_resets_counter(self): + tiers = [["http://t0"], ["http://t1"]] + cycler = make_tiered_cycler(tiers, threshold=3) + cycler["record_failure"]() + cycler["record_failure"]() + cycler["record_success"]() + # Counter reset — need threshold more failures to escalate + cycler["record_failure"]() + cycler["record_failure"]() + assert cycler["active_tier_index"]() == 0 # still on tier 0 + cycler["record_failure"]() + assert cycler["active_tier_index"]() == 1 # now escalated + + def test_counter_resets_on_escalation(self): + """After escalating, failure counter resets so new tier gets a fresh start.""" + tiers = [["http://t0"], ["http://t1"], ["http://t2"]] + cycler = make_tiered_cycler(tiers, threshold=2) + # Exhaust tier 0 + cycler["record_failure"]() + cycler["record_failure"]() + assert cycler["active_tier_index"]() == 1 + # One failure on tier 1 — should NOT escalate yet (counter reset) + cycler["record_failure"]() + assert cycler["active_tier_index"]() == 1 + # Second failure on tier 1 — escalates to tier 2 + cycler["record_failure"]() + assert cycler["active_tier_index"]() == 2 + + def test_is_exhausted_false_when_tiers_remain(self): + tiers = [["http://t0"], ["http://t1"]] + cycler = make_tiered_cycler(tiers, threshold=1) + assert not cycler["is_exhausted"]() + cycler["record_failure"]() # escalates to tier 1 + assert not cycler["is_exhausted"]() + + def test_is_exhausted_true_after_all_tiers_fail(self): + tiers = [["http://t0"]] + cycler = 
make_tiered_cycler(tiers, threshold=1) + assert not cycler["is_exhausted"]() + cycler["record_failure"]() + assert cycler["is_exhausted"]() + assert cycler["next_proxy"]() is None + + def test_empty_tiers_immediately_exhausted(self): + cycler = make_tiered_cycler([], threshold=3) + assert cycler["is_exhausted"]() + assert cycler["next_proxy"]() is None + assert cycler["tier_count"]() == 0 + + def test_single_tier_cycles_within_tier(self): + tiers = [["http://p1", "http://p2", "http://p3"]] + cycler = make_tiered_cycler(tiers, threshold=10) + results = [cycler["next_proxy"]() for _ in range(6)] + assert results == ["http://p1", "http://p2", "http://p3"] * 2 + + def test_tier_count_reflects_input(self): + assert make_tiered_cycler([], threshold=1)["tier_count"]() == 0 + assert make_tiered_cycler([["a"]], threshold=1)["tier_count"]() == 1 + assert make_tiered_cycler([["a"], ["b"], ["c"]], threshold=1)["tier_count"]() == 3 + + def test_record_failure_noop_when_exhausted(self): + tiers = [["http://t0"]] + cycler = make_tiered_cycler(tiers, threshold=1) + cycler["record_failure"]() # exhausts + assert cycler["is_exhausted"]() + # Further failures are no-ops, not exceptions + result = cycler["record_failure"]() + assert result is False + assert cycler["is_exhausted"]() + + def test_thread_safety(self): + """Concurrent next_proxy and record calls do not raise or corrupt state.""" + import threading + tiers = [["http://t0a", "http://t0b"], ["http://t1a", "http://t1b"]] + cycler = make_tiered_cycler(tiers, threshold=5) + errors = [] + lock = threading.Lock() + + def worker(): + try: + for _ in range(20): + cycler["next_proxy"]() + cycler["record_failure"]() + cycler["record_success"]() + except Exception as e: + with lock: + errors.append(e) + + threads = [threading.Thread(target=worker) for _ in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert errors == [], f"Thread safety errors: {errors}"
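
Taken together, the new pieces compose as follows. This is a minimal, self-contained usage sketch, not part of the diff: the env values, the `fetch()` stub, and `threshold=5` are illustrative placeholders (the real extractors pass `CIRCUIT_BREAKER_THRESHOLD` and fetch via niquests), while `load_proxy_tiers`, `make_tiered_cycler`, and the dict-of-callables interface are as added in `proxy.py`.

```python
import os

from padelnomics_extract.proxy import load_proxy_tiers, make_tiered_cycler

# Placeholder configuration: two datacenter proxies, one residential proxy.
# WEBSHARE_DOWNLOAD_URL is left unset, so load_proxy_tiers() omits the free tier.
os.environ.pop("WEBSHARE_DOWNLOAD_URL", None)
os.environ["PROXY_URLS_DATACENTER"] = "http://dc1:8080,http://dc2:8080"
os.environ["PROXY_URLS_RESIDENTIAL"] = "http://res1:8080"

tiers = load_proxy_tiers()  # [["http://dc1:8080", "http://dc2:8080"], ["http://res1:8080"]]
cycler = make_tiered_cycler(tiers, threshold=5)


def fetch(url: str, proxy: str | None) -> bool:
    """Stand-in for the extractors' niquests calls; always fails here so the
    escalation path is visible."""
    return False


for i in range(15):
    if cycler["is_exhausted"]():
        break  # all tiers burned: stop submitting work and write partial results
    proxy = cycler["next_proxy"]()  # round-robins within the active tier
    if fetch(f"https://example.invalid/page/{i}", proxy):
        cycler["record_success"]()  # resets the consecutive-failure counter
    else:
        # Five consecutive failures escalate datacenter -> residential, then
        # exhaust; the counter resets on each escalation, and record_failure()
        # calls after exhaustion are no-ops.
        cycler["record_failure"]()

print(cycler["active_tier_index"](), cycler["tier_count"]())  # 2 2
```

The loop mirrors the serial path in `extract()`: check `is_exhausted()` before each unit of work, record the outcome after it.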