feat(extract): add PROXY_CONCURRENCY override for rotating single-URL proxies

When DC/residential tiers have a single rotating endpoint, worker_count
defaulted to 1 (one URL = one worker). PROXY_CONCURRENCY lets you set
an explicit thread count (e.g. 100) for providers that handle concurrent
connections on a single URL.

Capped at MAX_PROXY_CONCURRENCY=200 to avoid overloading the endpoint.
Falls back to len(tiers[0]) when unset (existing behaviour).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-28 17:06:53 +01:00
parent 9b0bfc478d
commit 1af65bb46f

View File

@@ -52,6 +52,10 @@ MAX_VENUES_PER_RUN = 20_000
MAX_RETRIES_PER_VENUE = 2 MAX_RETRIES_PER_VENUE = 2
RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "30")) RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "30"))
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10") CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10")
# Override worker count — useful when tier 0 is a single rotating endpoint (DC/residential)
# that supports many concurrent connections. Defaults to len(tiers[0]) when unset.
_PROXY_CONCURRENCY = os.environ.get("PROXY_CONCURRENCY", "").strip()
MAX_PROXY_CONCURRENCY = 200
# Parallel mode submits futures in batches so the circuit breaker can stop # Parallel mode submits futures in batches so the circuit breaker can stop
# new submissions after it opens. Already-inflight futures in the current # new submissions after it opens. Already-inflight futures in the current
@@ -294,7 +298,8 @@ def extract(
# Set up tiered proxy cycler with circuit breaker # Set up tiered proxy cycler with circuit breaker
tiers = load_proxy_tiers() tiers = load_proxy_tiers()
worker_count = len(tiers[0]) if tiers else 1 default_workers = len(tiers[0]) if tiers else 1
worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else default_workers
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD) cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S") start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
@@ -323,7 +328,7 @@ def extract(
venues_errored = 0 venues_errored = 0
if worker_count > 1: if worker_count > 1:
logger.info("Parallel mode: %d workers (tier 0), %d tiers total", worker_count, len(tiers)) logger.info("Parallel mode: %d workers, %d tier(s)", worker_count, len(tiers))
new_venues_data, venues_errored = _fetch_venues_parallel( new_venues_data, venues_errored = _fetch_venues_parallel(
venues_to_process, start_min_str, start_max_str, worker_count, cycler, venues_to_process, start_min_str, start_max_str, worker_count, cycler,
on_result=_on_result, on_result=_on_result,
@@ -484,7 +489,8 @@ def extract_recheck(
# Set up tiered proxy cycler with circuit breaker # Set up tiered proxy cycler with circuit breaker
tiers = load_proxy_tiers() tiers = load_proxy_tiers()
worker_count = len(tiers[0]) if tiers else 1 default_workers = len(tiers[0]) if tiers else 1
worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else default_workers
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD) cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)
if worker_count > 1 and len(venues_to_recheck) > 10: if worker_count > 1 and len(venues_to_recheck) > 10: