merge: three-tier proxy system with Webshare auto-fetch
This commit is contained in:
@@ -6,6 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- **Three-tier proxy system** for extraction pipeline: free (Webshare auto-fetched) → datacenter (`PROXY_URLS_DATACENTER`) → residential (`PROXY_URLS_RESIDENTIAL`). Webshare free proxies are now auto-fetched from their download API on each run — no more manually copying stale proxy lists.
|
||||||
|
- `proxy.py`: added `fetch_webshare_proxies()` (stdlib urllib, bounded read + timeout), `load_proxy_tiers()` (assembles N tiers from env), generalised `make_tiered_cycler()` to accept `list[list[str]]` with N-level escalation. Exposes `is_exhausted()`, `active_tier_index()`, `tier_count()`.
|
||||||
|
- `playtomic_availability.py`: both `extract()` and `extract_recheck()` now use `load_proxy_tiers()` + N-tier cycler. `_fetch_venues_parallel` `fallback_urls` param removed. `is_fallback_active()` replaced by `is_exhausted()`.
|
||||||
|
- `playtomic_tenants.py`: uses `load_proxy_tiers()` flattened for simple round-robin.
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
- **Env vars renamed** (breaking): `PROXY_URLS` → removed, `PROXY_URLS_FALLBACK` → removed. New vars: `WEBSHARE_DOWNLOAD_URL`, `PROXY_URLS_DATACENTER`, `PROXY_URLS_RESIDENTIAL`.
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- **Phase 2a — NUTS-1 regional income differentiation** (`opportunity_score`): Munich and Berlin no longer share the same income figure as Chemnitz.
|
- **Phase 2a — NUTS-1 regional income differentiation** (`opportunity_score`): Munich and Berlin no longer share the same income figure as Chemnitz.
|
||||||
- `eurostat.py`: added `nama_10r_2hhinc` dataset config (NUTS-2 cube with NUTS-1 entries); filter params now appended to API URL so the server pre-filters the large cube before download (also makes `ilc_di03` requests smaller).
|
- `eurostat.py`: added `nama_10r_2hhinc` dataset config (NUTS-2 cube with NUTS-1 entries); filter params now appended to API URL so the server pre-filters the large cube before download (also makes `ilc_di03` requests smaller).
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ from pathlib import Path
|
|||||||
import niquests
|
import niquests
|
||||||
|
|
||||||
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
|
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
|
||||||
from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
|
from .proxy import load_proxy_tiers, make_tiered_cycler
|
||||||
from .utils import (
|
from .utils import (
|
||||||
compress_jsonl_atomic,
|
compress_jsonl_atomic,
|
||||||
flush_partial_batch,
|
flush_partial_batch,
|
||||||
@@ -190,14 +190,13 @@ def _fetch_venues_parallel(
|
|||||||
start_max_str: str,
|
start_max_str: str,
|
||||||
worker_count: int,
|
worker_count: int,
|
||||||
cycler: dict,
|
cycler: dict,
|
||||||
fallback_urls: list[str],
|
|
||||||
on_result=None,
|
on_result=None,
|
||||||
) -> tuple[list[dict], int]:
|
) -> tuple[list[dict], int]:
|
||||||
"""Fetch availability for multiple venues in parallel.
|
"""Fetch availability for multiple venues in parallel.
|
||||||
|
|
||||||
Submits futures in batches of PARALLEL_BATCH_SIZE. After each batch
|
Submits futures in batches of PARALLEL_BATCH_SIZE. After each batch
|
||||||
completes, checks the circuit breaker: if it opened and there is no
|
completes, checks the circuit breaker: if all proxy tiers are exhausted,
|
||||||
fallback configured, stops submitting further batches.
|
stops submitting further batches.
|
||||||
|
|
||||||
on_result: optional callable(result: dict) invoked inside the lock for
|
on_result: optional callable(result: dict) invoked inside the lock for
|
||||||
each successful result — used for incremental partial-file flushing.
|
each successful result — used for incremental partial-file flushing.
|
||||||
@@ -215,10 +214,10 @@ def _fetch_venues_parallel(
|
|||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=worker_count) as pool:
|
with ThreadPoolExecutor(max_workers=worker_count) as pool:
|
||||||
for batch_start in range(0, len(tenant_ids), PARALLEL_BATCH_SIZE):
|
for batch_start in range(0, len(tenant_ids), PARALLEL_BATCH_SIZE):
|
||||||
# Stop submitting new work if circuit is open with no fallback
|
# Stop submitting new work if all proxy tiers are exhausted
|
||||||
if cycler["is_fallback_active"]() and not fallback_urls:
|
if cycler["is_exhausted"]():
|
||||||
logger.error(
|
logger.error(
|
||||||
"Circuit open with no fallback — stopping after %d/%d venues",
|
"All proxy tiers exhausted — stopping after %d/%d venues",
|
||||||
completed_count, len(tenant_ids),
|
completed_count, len(tenant_ids),
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
@@ -294,10 +293,9 @@ def extract(
|
|||||||
venues_to_process = [tid for tid in all_venues_to_process if tid not in already_done]
|
venues_to_process = [tid for tid in all_venues_to_process if tid not in already_done]
|
||||||
|
|
||||||
# Set up tiered proxy cycler with circuit breaker
|
# Set up tiered proxy cycler with circuit breaker
|
||||||
proxy_urls = load_proxy_urls()
|
tiers = load_proxy_tiers()
|
||||||
fallback_urls = load_fallback_proxy_urls()
|
worker_count = len(tiers[0]) if tiers else 1
|
||||||
worker_count = len(proxy_urls) if proxy_urls else 1
|
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)
|
||||||
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
|
|
||||||
|
|
||||||
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
|
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
|
||||||
start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
|
start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
|
||||||
@@ -325,9 +323,9 @@ def extract(
|
|||||||
venues_errored = 0
|
venues_errored = 0
|
||||||
|
|
||||||
if worker_count > 1:
|
if worker_count > 1:
|
||||||
logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls))
|
logger.info("Parallel mode: %d workers (tier 0), %d tiers total", worker_count, len(tiers))
|
||||||
new_venues_data, venues_errored = _fetch_venues_parallel(
|
new_venues_data, venues_errored = _fetch_venues_parallel(
|
||||||
venues_to_process, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
|
venues_to_process, start_min_str, start_max_str, worker_count, cycler,
|
||||||
on_result=_on_result,
|
on_result=_on_result,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
@@ -342,9 +340,9 @@ def extract(
|
|||||||
_on_result(result)
|
_on_result(result)
|
||||||
else:
|
else:
|
||||||
venues_errored += 1
|
venues_errored += 1
|
||||||
circuit_opened = cycler["record_failure"]()
|
cycler["record_failure"]()
|
||||||
if circuit_opened and not fallback_urls:
|
if cycler["is_exhausted"]():
|
||||||
logger.error("Circuit open with no fallback — writing partial results")
|
logger.error("All proxy tiers exhausted — writing partial results")
|
||||||
break
|
break
|
||||||
|
|
||||||
if (i + 1) % 100 == 0:
|
if (i + 1) % 100 == 0:
|
||||||
@@ -485,14 +483,13 @@ def extract_recheck(
|
|||||||
start_max_str = window_end.strftime("%Y-%m-%dT%H:%M:%S")
|
start_max_str = window_end.strftime("%Y-%m-%dT%H:%M:%S")
|
||||||
|
|
||||||
# Set up tiered proxy cycler with circuit breaker
|
# Set up tiered proxy cycler with circuit breaker
|
||||||
proxy_urls = load_proxy_urls()
|
tiers = load_proxy_tiers()
|
||||||
fallback_urls = load_fallback_proxy_urls()
|
worker_count = len(tiers[0]) if tiers else 1
|
||||||
worker_count = len(proxy_urls) if proxy_urls else 1
|
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)
|
||||||
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
|
|
||||||
|
|
||||||
if worker_count > 1 and len(venues_to_recheck) > 10:
|
if worker_count > 1 and len(venues_to_recheck) > 10:
|
||||||
venues_data, venues_errored = _fetch_venues_parallel(
|
venues_data, venues_errored = _fetch_venues_parallel(
|
||||||
venues_to_recheck, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
|
venues_to_recheck, start_min_str, start_max_str, worker_count, cycler,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
venues_data = []
|
venues_data = []
|
||||||
@@ -504,9 +501,9 @@ def extract_recheck(
|
|||||||
cycler["record_success"]()
|
cycler["record_success"]()
|
||||||
else:
|
else:
|
||||||
venues_errored += 1
|
venues_errored += 1
|
||||||
circuit_opened = cycler["record_failure"]()
|
cycler["record_failure"]()
|
||||||
if circuit_opened and not fallback_urls:
|
if cycler["is_exhausted"]():
|
||||||
logger.error("Circuit open with no fallback — writing partial recheck results")
|
logger.error("All proxy tiers exhausted — writing partial recheck results")
|
||||||
break
|
break
|
||||||
|
|
||||||
# Write recheck file as JSONL — one venue per line with metadata injected
|
# Write recheck file as JSONL — one venue per line with metadata injected
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ from pathlib import Path
|
|||||||
import niquests
|
import niquests
|
||||||
|
|
||||||
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
|
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
|
||||||
from .proxy import load_proxy_urls, make_round_robin_cycler
|
from .proxy import load_proxy_tiers, make_round_robin_cycler
|
||||||
from .utils import compress_jsonl_atomic, landing_path
|
from .utils import compress_jsonl_atomic, landing_path
|
||||||
|
|
||||||
logger = setup_logging("padelnomics.extract.playtomic_tenants")
|
logger = setup_logging("padelnomics.extract.playtomic_tenants")
|
||||||
@@ -82,12 +82,13 @@ def extract(
|
|||||||
logger.info("Already have tenants for %s/%s — skipping", year, month)
|
logger.info("Already have tenants for %s/%s — skipping", year, month)
|
||||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||||
|
|
||||||
proxy_urls = load_proxy_urls()
|
tiers = load_proxy_tiers()
|
||||||
next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None
|
all_proxies = [url for tier in tiers for url in tier]
|
||||||
batch_size = len(proxy_urls) if proxy_urls else 1
|
next_proxy = make_round_robin_cycler(all_proxies) if all_proxies else None
|
||||||
|
batch_size = len(all_proxies) if all_proxies else 1
|
||||||
|
|
||||||
if next_proxy:
|
if next_proxy:
|
||||||
logger.info("Parallel mode: %d pages per batch (%d proxies)", batch_size, len(proxy_urls))
|
logger.info("Parallel mode: %d pages per batch (%d proxies across %d tier(s))", batch_size, len(all_proxies), len(tiers))
|
||||||
else:
|
else:
|
||||||
logger.info("Serial mode: 1 page at a time (no proxies)")
|
logger.info("Serial mode: 1 page at a time (no proxies)")
|
||||||
|
|
||||||
|
|||||||
@@ -1,41 +1,97 @@
|
|||||||
"""Optional proxy rotation for parallel HTTP fetching.
|
"""Optional proxy rotation for parallel HTTP fetching.
|
||||||
|
|
||||||
Proxies are configured via the PROXY_URLS environment variable (comma-separated).
|
Proxies are configured via environment variables. When unset, all functions
|
||||||
When unset, all functions return None/no-op — extractors fall back to direct requests.
|
return None/no-op — extractors fall back to direct requests.
|
||||||
|
|
||||||
Tiered proxy with circuit breaker:
|
Three-tier escalation: free → datacenter → residential.
|
||||||
Primary tier (PROXY_URLS) is used by default — typically cheap datacenter proxies.
|
Tier 1 (free): WEBSHARE_DOWNLOAD_URL — auto-fetched from Webshare API
|
||||||
Fallback tier (PROXY_URLS_FALLBACK) activates once consecutive failures >= threshold.
|
Tier 2 (datacenter): PROXY_URLS_DATACENTER — comma-separated paid DC proxies
|
||||||
Once the circuit opens it stays open for the duration of the run (no auto-recovery).
|
Tier 3 (residential): PROXY_URLS_RESIDENTIAL — comma-separated paid residential proxies
|
||||||
|
|
||||||
|
Tiered circuit breaker:
|
||||||
|
Active tier is used until consecutive failures >= threshold, then escalates
|
||||||
|
to the next tier. Once all tiers are exhausted, is_exhausted() returns True.
|
||||||
|
Escalation is permanent for the duration of the run — no auto-recovery.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import threading
|
import threading
|
||||||
|
import urllib.error
|
||||||
|
import urllib.request
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
MAX_WEBSHARE_PROXIES = 20
|
||||||
|
WEBSHARE_FETCH_TIMEOUT_SECONDS = 10
|
||||||
|
WEBSHARE_MAX_RESPONSE_BYTES = 1024 * 1024 # 1MB
|
||||||
|
|
||||||
def load_proxy_urls() -> list[str]:
|
|
||||||
"""Read PROXY_URLS env var (comma-separated). Returns [] if unset.
|
|
||||||
|
|
||||||
Format: http://user:pass@host:port or socks5://host:port
|
def fetch_webshare_proxies(download_url: str, max_proxies: int = MAX_WEBSHARE_PROXIES) -> list[str]:
|
||||||
|
"""Fetch proxy list from the Webshare download API. Returns [] on any error.
|
||||||
|
|
||||||
|
Expected line format: ip:port:username:password
|
||||||
|
Converts to: http://username:password@ip:port
|
||||||
|
|
||||||
|
Bounded: reads at most WEBSHARE_MAX_RESPONSE_BYTES, returns at most max_proxies.
|
||||||
"""
|
"""
|
||||||
raw = os.environ.get("PROXY_URLS", "")
|
assert max_proxies > 0, f"max_proxies must be positive, got {max_proxies}"
|
||||||
urls = [u.strip() for u in raw.split(",") if u.strip()]
|
assert download_url, "download_url must not be empty"
|
||||||
|
|
||||||
|
try:
|
||||||
|
req = urllib.request.Request(
|
||||||
|
download_url,
|
||||||
|
headers={"User-Agent": "padelnomics-extract/1.0"},
|
||||||
|
)
|
||||||
|
with urllib.request.urlopen(req, timeout=WEBSHARE_FETCH_TIMEOUT_SECONDS) as resp:
|
||||||
|
raw = resp.read(WEBSHARE_MAX_RESPONSE_BYTES).decode("utf-8")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to fetch Webshare proxies: %s", e)
|
||||||
|
return []
|
||||||
|
|
||||||
|
urls = []
|
||||||
|
for line in raw.splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
parts = line.split(":")
|
||||||
|
if len(parts) != 4:
|
||||||
|
logger.debug("Skipping malformed proxy line: %r", line)
|
||||||
|
continue
|
||||||
|
ip, port, username, password = parts
|
||||||
|
urls.append(f"http://{username}:{password}@{ip}:{port}")
|
||||||
|
if len(urls) >= max_proxies:
|
||||||
|
break
|
||||||
|
|
||||||
|
logger.info("Fetched %d proxies from Webshare", len(urls))
|
||||||
return urls
|
return urls
|
||||||
|
|
||||||
|
|
||||||
def load_fallback_proxy_urls() -> list[str]:
|
def load_proxy_tiers() -> list[list[str]]:
|
||||||
"""Read PROXY_URLS_FALLBACK env var (comma-separated). Returns [] if unset.
|
"""Assemble proxy tiers in escalation order: free → datacenter → residential.
|
||||||
|
|
||||||
Used as the residential/reliable fallback tier when the primary tier fails.
|
Tier 1 (free): fetched from WEBSHARE_DOWNLOAD_URL if set.
|
||||||
Format: http://user:pass@host:port or socks5://host:port
|
Tier 2 (datacenter): PROXY_URLS_DATACENTER (comma-separated).
|
||||||
|
Tier 3 (residential): PROXY_URLS_RESIDENTIAL (comma-separated).
|
||||||
|
|
||||||
|
Empty tiers are omitted. Returns [] if no proxies configured anywhere.
|
||||||
"""
|
"""
|
||||||
raw = os.environ.get("PROXY_URLS_FALLBACK", "")
|
tiers: list[list[str]] = []
|
||||||
urls = [u.strip() for u in raw.split(",") if u.strip()]
|
|
||||||
return urls
|
webshare_url = os.environ.get("WEBSHARE_DOWNLOAD_URL", "").strip()
|
||||||
|
if webshare_url:
|
||||||
|
free_proxies = fetch_webshare_proxies(webshare_url)
|
||||||
|
if free_proxies:
|
||||||
|
tiers.append(free_proxies)
|
||||||
|
|
||||||
|
for var in ("PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"):
|
||||||
|
raw = os.environ.get(var, "")
|
||||||
|
urls = [u.strip() for u in raw.split(",") if u.strip()]
|
||||||
|
if urls:
|
||||||
|
tiers.append(urls)
|
||||||
|
|
||||||
|
return tiers
|
||||||
|
|
||||||
|
|
||||||
def make_round_robin_cycler(proxy_urls: list[str]):
|
def make_round_robin_cycler(proxy_urls: list[str]):
|
||||||
@@ -78,83 +134,96 @@ def make_sticky_selector(proxy_urls: list[str]):
|
|||||||
return select_proxy
|
return select_proxy
|
||||||
|
|
||||||
|
|
||||||
def make_tiered_cycler(
|
def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict:
|
||||||
primary_urls: list[str],
|
"""Thread-safe N-tier proxy cycler with circuit breaker.
|
||||||
fallback_urls: list[str],
|
|
||||||
threshold: int,
|
|
||||||
) -> dict:
|
|
||||||
"""Thread-safe tiered proxy cycler with circuit breaker.
|
|
||||||
|
|
||||||
Uses primary_urls until consecutive failures >= threshold, then switches
|
Uses tiers[0] until consecutive failures >= threshold, then escalates
|
||||||
permanently to fallback_urls for the rest of the run. No auto-recovery —
|
to tiers[1], then tiers[2], etc. Once all tiers are exhausted,
|
||||||
once the circuit opens it stays open to avoid flapping.
|
is_exhausted() returns True and next_proxy() returns None.
|
||||||
|
|
||||||
|
Failure counter resets on each escalation — the new tier gets a fresh start.
|
||||||
|
Once exhausted, further record_failure() calls are no-ops.
|
||||||
|
|
||||||
Returns a dict of callables:
|
Returns a dict of callables:
|
||||||
next_proxy() -> str | None — returns URL from the active tier
|
next_proxy() -> str | None — URL from the active tier, or None
|
||||||
record_success() — resets consecutive failure counter
|
record_success() -> None — resets consecutive failure counter
|
||||||
record_failure() -> bool — increments counter; True if circuit just opened
|
record_failure() -> bool — True if just escalated to next tier
|
||||||
is_fallback_active() -> bool — whether fallback tier is currently active
|
is_exhausted() -> bool — True if all tiers exhausted
|
||||||
|
active_tier_index() -> int — 0-based index of current tier
|
||||||
|
tier_count() -> int — total number of tiers
|
||||||
|
|
||||||
If primary_urls is empty: always returns from fallback_urls (no circuit breaker needed).
|
Edge cases:
|
||||||
If both are empty: next_proxy() always returns None.
|
Empty tiers list: next_proxy() always returns None, is_exhausted() True.
|
||||||
|
Single tier: behaves like the primary-only case, is_exhausted() after threshold.
|
||||||
"""
|
"""
|
||||||
assert threshold > 0, f"threshold must be positive, got {threshold}"
|
assert threshold > 0, f"threshold must be positive, got {threshold}"
|
||||||
|
assert isinstance(tiers, list), f"tiers must be a list, got {type(tiers)}"
|
||||||
|
|
||||||
lock = threading.Lock()
|
lock = threading.Lock()
|
||||||
|
cycles = [itertools.cycle(t) for t in tiers]
|
||||||
state = {
|
state = {
|
||||||
|
"active_tier": 0,
|
||||||
"consecutive_failures": 0,
|
"consecutive_failures": 0,
|
||||||
"fallback_active": False,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
primary_cycle = itertools.cycle(primary_urls) if primary_urls else None
|
|
||||||
fallback_cycle = itertools.cycle(fallback_urls) if fallback_urls else None
|
|
||||||
|
|
||||||
# No primary proxies — skip circuit breaker, use fallback directly
|
|
||||||
if not primary_urls:
|
|
||||||
state["fallback_active"] = True
|
|
||||||
|
|
||||||
def next_proxy() -> str | None:
|
def next_proxy() -> str | None:
|
||||||
with lock:
|
with lock:
|
||||||
if state["fallback_active"]:
|
idx = state["active_tier"]
|
||||||
return next(fallback_cycle) if fallback_cycle else None
|
if idx >= len(cycles):
|
||||||
return next(primary_cycle) if primary_cycle else None
|
return None
|
||||||
|
return next(cycles[idx])
|
||||||
|
|
||||||
def record_success() -> None:
|
def record_success() -> None:
|
||||||
with lock:
|
with lock:
|
||||||
state["consecutive_failures"] = 0
|
state["consecutive_failures"] = 0
|
||||||
|
|
||||||
def record_failure() -> bool:
|
def record_failure() -> bool:
|
||||||
"""Increment failure counter. Returns True if circuit just opened."""
|
"""Increment failure counter. Returns True if just escalated to next tier."""
|
||||||
with lock:
|
with lock:
|
||||||
if state["fallback_active"]:
|
idx = state["active_tier"]
|
||||||
# Already on fallback — don't trip the circuit again
|
if idx >= len(tiers):
|
||||||
|
# Already exhausted — no-op
|
||||||
return False
|
return False
|
||||||
state["consecutive_failures"] += 1
|
state["consecutive_failures"] += 1
|
||||||
if state["consecutive_failures"] >= threshold:
|
if state["consecutive_failures"] < threshold:
|
||||||
state["fallback_active"] = True
|
return False
|
||||||
if fallback_urls:
|
# Threshold reached — escalate
|
||||||
logger.warning(
|
state["consecutive_failures"] = 0
|
||||||
"Circuit open after %d consecutive failures — "
|
state["active_tier"] += 1
|
||||||
"switching to fallback residential proxies",
|
new_idx = state["active_tier"]
|
||||||
state["consecutive_failures"],
|
if new_idx < len(tiers):
|
||||||
)
|
logger.warning(
|
||||||
else:
|
"Circuit open after %d consecutive failures — "
|
||||||
logger.error(
|
"escalating to proxy tier %d/%d",
|
||||||
"Circuit open after %d consecutive failures — "
|
threshold,
|
||||||
"no fallback configured, aborting run",
|
new_idx + 1,
|
||||||
state["consecutive_failures"],
|
len(tiers),
|
||||||
)
|
)
|
||||||
return True
|
else:
|
||||||
return False
|
logger.error(
|
||||||
|
"All %d proxy tier(s) exhausted after %d consecutive failures — "
|
||||||
|
"no more fallbacks",
|
||||||
|
len(tiers),
|
||||||
|
threshold,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
def is_fallback_active() -> bool:
|
def is_exhausted() -> bool:
|
||||||
with lock:
|
with lock:
|
||||||
return state["fallback_active"]
|
return state["active_tier"] >= len(tiers)
|
||||||
|
|
||||||
|
def active_tier_index() -> int:
|
||||||
|
with lock:
|
||||||
|
return state["active_tier"]
|
||||||
|
|
||||||
|
def tier_count() -> int:
|
||||||
|
return len(tiers)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"next_proxy": next_proxy,
|
"next_proxy": next_proxy,
|
||||||
"record_success": record_success,
|
"record_success": record_success,
|
||||||
"record_failure": record_failure,
|
"record_failure": record_failure,
|
||||||
"is_fallback_active": is_fallback_active,
|
"is_exhausted": is_exhausted,
|
||||||
|
"active_tier_index": active_tier_index,
|
||||||
|
"tier_count": tier_count,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -24,9 +24,11 @@ sup = _ilu.module_from_spec(_spec)
|
|||||||
_spec.loader.exec_module(sup)
|
_spec.loader.exec_module(sup)
|
||||||
|
|
||||||
from padelnomics_extract.proxy import ( # noqa: E402
|
from padelnomics_extract.proxy import ( # noqa: E402
|
||||||
load_proxy_urls,
|
fetch_webshare_proxies,
|
||||||
|
load_proxy_tiers,
|
||||||
make_round_robin_cycler,
|
make_round_robin_cycler,
|
||||||
make_sticky_selector,
|
make_sticky_selector,
|
||||||
|
make_tiered_cycler,
|
||||||
)
|
)
|
||||||
|
|
||||||
# ── load_workflows ────────────────────────────────────────────────
|
# ── load_workflows ────────────────────────────────────────────────
|
||||||
@@ -198,28 +200,112 @@ class TestTopologicalWaves:
|
|||||||
# ── proxy.py ─────────────────────────────────────────────────────
|
# ── proxy.py ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
class TestLoadProxyUrls:
|
class TestFetchWebshareProxies:
|
||||||
def test_returns_empty_when_unset(self, monkeypatch):
|
def test_parses_ip_port_user_pass_format(self):
|
||||||
monkeypatch.delenv("PROXY_URLS", raising=False)
|
raw = "1.2.3.4:1080:user1:pass1\n5.6.7.8:1080:user2:pass2\n"
|
||||||
assert load_proxy_urls() == []
|
with patch("urllib.request.urlopen") as mock_open:
|
||||||
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.read.return_value = raw.encode("utf-8")
|
||||||
|
mock_resp.__enter__ = lambda s: s
|
||||||
|
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||||
|
mock_open.return_value = mock_resp
|
||||||
|
urls = fetch_webshare_proxies("http://example.com/proxy-list")
|
||||||
|
assert urls == [
|
||||||
|
"http://user1:pass1@1.2.3.4:1080",
|
||||||
|
"http://user2:pass2@5.6.7.8:1080",
|
||||||
|
]
|
||||||
|
|
||||||
def test_parses_comma_separated_urls(self, monkeypatch):
|
def test_network_error_returns_empty(self):
|
||||||
monkeypatch.setenv(
|
import urllib.error
|
||||||
"PROXY_URLS",
|
with patch("urllib.request.urlopen", side_effect=urllib.error.URLError("timeout")):
|
||||||
"http://p1:8080,http://p2:8080,http://p3:8080",
|
result = fetch_webshare_proxies("http://example.com/proxy-list")
|
||||||
)
|
assert result == []
|
||||||
urls = load_proxy_urls()
|
|
||||||
assert urls == ["http://p1:8080", "http://p2:8080", "http://p3:8080"]
|
|
||||||
|
|
||||||
def test_strips_whitespace(self, monkeypatch):
|
def test_malformed_lines_are_skipped(self):
|
||||||
monkeypatch.setenv("PROXY_URLS", " http://p1:8080 , http://p2:8080 ")
|
raw = "bad_line\n1.2.3.4:1080:user:pass\nonly:three:parts\n"
|
||||||
urls = load_proxy_urls()
|
with patch("urllib.request.urlopen") as mock_open:
|
||||||
assert urls == ["http://p1:8080", "http://p2:8080"]
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.read.return_value = raw.encode("utf-8")
|
||||||
|
mock_resp.__enter__ = lambda s: s
|
||||||
|
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||||
|
mock_open.return_value = mock_resp
|
||||||
|
urls = fetch_webshare_proxies("http://example.com/proxy-list")
|
||||||
|
assert urls == ["http://user:pass@1.2.3.4:1080"]
|
||||||
|
|
||||||
def test_ignores_empty_segments(self, monkeypatch):
|
def test_max_proxies_respected(self):
|
||||||
monkeypatch.setenv("PROXY_URLS", "http://p1:8080,,http://p2:8080,")
|
lines = "\n".join(f"10.0.0.{i}:1080:u{i}:p{i}" for i in range(10))
|
||||||
urls = load_proxy_urls()
|
with patch("urllib.request.urlopen") as mock_open:
|
||||||
assert urls == ["http://p1:8080", "http://p2:8080"]
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.read.return_value = lines.encode("utf-8")
|
||||||
|
mock_resp.__enter__ = lambda s: s
|
||||||
|
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||||
|
mock_open.return_value = mock_resp
|
||||||
|
urls = fetch_webshare_proxies("http://example.com/proxy-list", max_proxies=3)
|
||||||
|
assert len(urls) == 3
|
||||||
|
|
||||||
|
def test_empty_lines_skipped(self):
|
||||||
|
raw = "\n\n1.2.3.4:1080:user:pass\n\n"
|
||||||
|
with patch("urllib.request.urlopen") as mock_open:
|
||||||
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.read.return_value = raw.encode("utf-8")
|
||||||
|
mock_resp.__enter__ = lambda s: s
|
||||||
|
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||||
|
mock_open.return_value = mock_resp
|
||||||
|
urls = fetch_webshare_proxies("http://example.com/proxy-list")
|
||||||
|
assert urls == ["http://user:pass@1.2.3.4:1080"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestLoadProxyTiers:
|
||||||
|
def _clear_proxy_env(self, monkeypatch):
|
||||||
|
for var in ("WEBSHARE_DOWNLOAD_URL", "PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"):
|
||||||
|
monkeypatch.delenv(var, raising=False)
|
||||||
|
|
||||||
|
def test_returns_empty_when_all_unset(self, monkeypatch):
|
||||||
|
self._clear_proxy_env(monkeypatch)
|
||||||
|
assert load_proxy_tiers() == []
|
||||||
|
|
||||||
|
def test_single_datacenter_tier(self, monkeypatch):
|
||||||
|
self._clear_proxy_env(monkeypatch)
|
||||||
|
monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080,http://dc2:8080")
|
||||||
|
tiers = load_proxy_tiers()
|
||||||
|
assert len(tiers) == 1
|
||||||
|
assert tiers[0] == ["http://dc1:8080", "http://dc2:8080"]
|
||||||
|
|
||||||
|
def test_residential_only(self, monkeypatch):
|
||||||
|
self._clear_proxy_env(monkeypatch)
|
||||||
|
monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080")
|
||||||
|
tiers = load_proxy_tiers()
|
||||||
|
assert len(tiers) == 1
|
||||||
|
assert tiers[0] == ["http://res1:8080"]
|
||||||
|
|
||||||
|
def test_empty_tiers_skipped(self, monkeypatch):
|
||||||
|
self._clear_proxy_env(monkeypatch)
|
||||||
|
monkeypatch.setenv("PROXY_URLS_DATACENTER", "")
|
||||||
|
monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080")
|
||||||
|
tiers = load_proxy_tiers()
|
||||||
|
assert len(tiers) == 1
|
||||||
|
assert tiers[0] == ["http://res1:8080"]
|
||||||
|
|
||||||
|
def test_three_tiers_correct_order(self, monkeypatch):
|
||||||
|
self._clear_proxy_env(monkeypatch)
|
||||||
|
with patch("padelnomics_extract.proxy.fetch_webshare_proxies", return_value=["http://user:pass@1.2.3.4:1080"]):
|
||||||
|
monkeypatch.setenv("WEBSHARE_DOWNLOAD_URL", "http://example.com/list")
|
||||||
|
monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080")
|
||||||
|
monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080")
|
||||||
|
tiers = load_proxy_tiers()
|
||||||
|
assert len(tiers) == 3
|
||||||
|
assert tiers[0] == ["http://user:pass@1.2.3.4:1080"] # free
|
||||||
|
assert tiers[1] == ["http://dc1:8080"] # datacenter
|
||||||
|
assert tiers[2] == ["http://res1:8080"] # residential
|
||||||
|
|
||||||
|
def test_webshare_fetch_failure_skips_tier(self, monkeypatch):
|
||||||
|
self._clear_proxy_env(monkeypatch)
|
||||||
|
with patch("padelnomics_extract.proxy.fetch_webshare_proxies", return_value=[]):
|
||||||
|
monkeypatch.setenv("WEBSHARE_DOWNLOAD_URL", "http://example.com/list")
|
||||||
|
monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080")
|
||||||
|
tiers = load_proxy_tiers()
|
||||||
|
assert len(tiers) == 1
|
||||||
|
assert tiers[0] == ["http://dc1:8080"]
|
||||||
|
|
||||||
|
|
||||||
class TestRoundRobinCycler:
|
class TestRoundRobinCycler:
|
||||||
@@ -279,3 +365,138 @@ class TestStickySelectorProxy:
|
|||||||
fn = make_sticky_selector(urls)
|
fn = make_sticky_selector(urls)
|
||||||
for i in range(20):
|
for i in range(20):
|
||||||
assert fn(f"key_{i}") in urls
|
assert fn(f"key_{i}") in urls
|
||||||
|
|
||||||
|
|
||||||
|
class TestTieredCyclerNTier:
|
||||||
|
def test_starts_on_first_tier(self):
|
||||||
|
tiers = [["http://t0a", "http://t0b"], ["http://t1a"]]
|
||||||
|
cycler = make_tiered_cycler(tiers, threshold=3)
|
||||||
|
assert cycler["active_tier_index"]() == 0
|
||||||
|
assert not cycler["is_exhausted"]()
|
||||||
|
assert cycler["next_proxy"]() in tiers[0]
|
||||||
|
|
||||||
|
def test_escalates_after_threshold(self):
|
||||||
|
tiers = [["http://t0"], ["http://t1"]]
|
||||||
|
cycler = make_tiered_cycler(tiers, threshold=3)
|
||||||
|
# Two failures — stays on tier 0
|
||||||
|
cycler["record_failure"]()
|
||||||
|
cycler["record_failure"]()
|
||||||
|
assert cycler["active_tier_index"]() == 0
|
||||||
|
# Third failure — escalates
|
||||||
|
escalated = cycler["record_failure"]()
|
||||||
|
assert escalated is True
|
||||||
|
assert cycler["active_tier_index"]() == 1
|
||||||
|
assert cycler["next_proxy"]() == "http://t1"
|
||||||
|
|
||||||
|
def test_escalates_through_all_tiers(self):
|
||||||
|
tiers = [["http://t0"], ["http://t1"], ["http://t2"]]
|
||||||
|
cycler = make_tiered_cycler(tiers, threshold=2)
|
||||||
|
# Exhaust tier 0
|
||||||
|
cycler["record_failure"]()
|
||||||
|
cycler["record_failure"]()
|
||||||
|
assert cycler["active_tier_index"]() == 1
|
||||||
|
# Exhaust tier 1
|
||||||
|
cycler["record_failure"]()
|
||||||
|
cycler["record_failure"]()
|
||||||
|
assert cycler["active_tier_index"]() == 2
|
||||||
|
# Exhaust tier 2
|
||||||
|
cycler["record_failure"]()
|
||||||
|
cycler["record_failure"]()
|
||||||
|
assert cycler["is_exhausted"]()
|
||||||
|
assert cycler["next_proxy"]() is None
|
||||||
|
|
||||||
|
def test_success_resets_counter(self):
|
||||||
|
tiers = [["http://t0"], ["http://t1"]]
|
||||||
|
cycler = make_tiered_cycler(tiers, threshold=3)
|
||||||
|
cycler["record_failure"]()
|
||||||
|
cycler["record_failure"]()
|
||||||
|
cycler["record_success"]()
|
||||||
|
# Counter reset — need threshold more failures to escalate
|
||||||
|
cycler["record_failure"]()
|
||||||
|
cycler["record_failure"]()
|
||||||
|
assert cycler["active_tier_index"]() == 0 # still on tier 0
|
||||||
|
cycler["record_failure"]()
|
||||||
|
assert cycler["active_tier_index"]() == 1 # now escalated
|
||||||
|
|
||||||
|
def test_counter_resets_on_escalation(self):
|
||||||
|
"""After escalating, failure counter resets so new tier gets a fresh start."""
|
||||||
|
tiers = [["http://t0"], ["http://t1"], ["http://t2"]]
|
||||||
|
cycler = make_tiered_cycler(tiers, threshold=2)
|
||||||
|
# Exhaust tier 0
|
||||||
|
cycler["record_failure"]()
|
||||||
|
cycler["record_failure"]()
|
||||||
|
assert cycler["active_tier_index"]() == 1
|
||||||
|
# One failure on tier 1 — should NOT escalate yet (counter reset)
|
||||||
|
cycler["record_failure"]()
|
||||||
|
assert cycler["active_tier_index"]() == 1
|
||||||
|
# Second failure on tier 1 — escalates to tier 2
|
||||||
|
cycler["record_failure"]()
|
||||||
|
assert cycler["active_tier_index"]() == 2
|
||||||
|
|
||||||
|
def test_is_exhausted_false_when_tiers_remain(self):
|
||||||
|
tiers = [["http://t0"], ["http://t1"]]
|
||||||
|
cycler = make_tiered_cycler(tiers, threshold=1)
|
||||||
|
assert not cycler["is_exhausted"]()
|
||||||
|
cycler["record_failure"]() # escalates to tier 1
|
||||||
|
assert not cycler["is_exhausted"]()
|
||||||
|
|
||||||
|
def test_is_exhausted_true_after_all_tiers_fail(self):
|
||||||
|
tiers = [["http://t0"]]
|
||||||
|
cycler = make_tiered_cycler(tiers, threshold=1)
|
||||||
|
assert not cycler["is_exhausted"]()
|
||||||
|
cycler["record_failure"]()
|
||||||
|
assert cycler["is_exhausted"]()
|
||||||
|
assert cycler["next_proxy"]() is None
|
||||||
|
|
||||||
|
def test_empty_tiers_immediately_exhausted(self):
|
||||||
|
cycler = make_tiered_cycler([], threshold=3)
|
||||||
|
assert cycler["is_exhausted"]()
|
||||||
|
assert cycler["next_proxy"]() is None
|
||||||
|
assert cycler["tier_count"]() == 0
|
||||||
|
|
||||||
|
def test_single_tier_cycles_within_tier(self):
|
||||||
|
tiers = [["http://p1", "http://p2", "http://p3"]]
|
||||||
|
cycler = make_tiered_cycler(tiers, threshold=10)
|
||||||
|
results = [cycler["next_proxy"]() for _ in range(6)]
|
||||||
|
assert results == ["http://p1", "http://p2", "http://p3"] * 2
|
||||||
|
|
||||||
|
def test_tier_count_reflects_input(self):
|
||||||
|
assert make_tiered_cycler([], threshold=1)["tier_count"]() == 0
|
||||||
|
assert make_tiered_cycler([["a"]], threshold=1)["tier_count"]() == 1
|
||||||
|
assert make_tiered_cycler([["a"], ["b"], ["c"]], threshold=1)["tier_count"]() == 3
|
||||||
|
|
||||||
|
def test_record_failure_noop_when_exhausted(self):
|
||||||
|
tiers = [["http://t0"]]
|
||||||
|
cycler = make_tiered_cycler(tiers, threshold=1)
|
||||||
|
cycler["record_failure"]() # exhausts
|
||||||
|
assert cycler["is_exhausted"]()
|
||||||
|
# Further failures are no-ops, not exceptions
|
||||||
|
result = cycler["record_failure"]()
|
||||||
|
assert result is False
|
||||||
|
assert cycler["is_exhausted"]()
|
||||||
|
|
||||||
|
def test_thread_safety(self):
|
||||||
|
"""Concurrent next_proxy and record calls do not raise or corrupt state."""
|
||||||
|
import threading
|
||||||
|
tiers = [["http://t0a", "http://t0b"], ["http://t1a", "http://t1b"]]
|
||||||
|
cycler = make_tiered_cycler(tiers, threshold=5)
|
||||||
|
errors = []
|
||||||
|
lock = threading.Lock()
|
||||||
|
|
||||||
|
def worker():
|
||||||
|
try:
|
||||||
|
for _ in range(20):
|
||||||
|
cycler["next_proxy"]()
|
||||||
|
cycler["record_failure"]()
|
||||||
|
cycler["record_success"]()
|
||||||
|
except Exception as e:
|
||||||
|
with lock:
|
||||||
|
errors.append(e)
|
||||||
|
|
||||||
|
threads = [threading.Thread(target=worker) for _ in range(8)]
|
||||||
|
for t in threads:
|
||||||
|
t.start()
|
||||||
|
for t in threads:
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
assert errors == [], f"Thread safety errors: {errors}"
|
||||||
|
|||||||
Reference in New Issue
Block a user