From 1af65bb46f16a799bc989e0e0a02ad5398abb7c6 Mon Sep 17 00:00:00 2001 From: Deeman Date: Sat, 28 Feb 2026 17:06:53 +0100 Subject: [PATCH] feat(extract): add PROXY_CONCURRENCY override for rotating single-URL proxies When DC/residential tiers have a single rotating endpoint, worker_count defaulted to 1 (one URL = one worker). PROXY_CONCURRENCY lets you set an explicit thread count (e.g. 100) for providers that handle concurrent connections on a single URL. Capped at MAX_PROXY_CONCURRENCY=200 to avoid overloading the endpoint. Falls back to len(tiers[0]) when unset (existing behaviour). Co-Authored-By: Claude Sonnet 4.6 --- .../padelnomics_extract/playtomic_availability.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py index d708a61..575a98f 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py @@ -52,6 +52,10 @@ MAX_VENUES_PER_RUN = 20_000 MAX_RETRIES_PER_VENUE = 2 RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "30")) CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10") +# Override worker count — useful when tier 0 is a single rotating endpoint (DC/residential) +# that supports many concurrent connections. Defaults to len(tiers[0]) when unset. +_PROXY_CONCURRENCY = os.environ.get("PROXY_CONCURRENCY", "").strip() +MAX_PROXY_CONCURRENCY = 200 # Parallel mode submits futures in batches so the circuit breaker can stop # new submissions after it opens. Already-inflight futures in the current @@ -294,7 +298,8 @@ def extract( # Set up tiered proxy cycler with circuit breaker tiers = load_proxy_tiers() - worker_count = len(tiers[0]) if tiers else 1 + default_workers = len(tiers[0]) if tiers else 1 + worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else default_workers cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD) start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S") @@ -323,7 +328,7 @@ def extract( venues_errored = 0 if worker_count > 1: - logger.info("Parallel mode: %d workers (tier 0), %d tiers total", worker_count, len(tiers)) + logger.info("Parallel mode: %d workers, %d tier(s)", worker_count, len(tiers)) new_venues_data, venues_errored = _fetch_venues_parallel( venues_to_process, start_min_str, start_max_str, worker_count, cycler, on_result=_on_result, @@ -484,7 +489,8 @@ def extract_recheck( # Set up tiered proxy cycler with circuit breaker tiers = load_proxy_tiers() - worker_count = len(tiers[0]) if tiers else 1 + default_workers = len(tiers[0]) if tiers else 1 + worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else default_workers cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD) if worker_count > 1 and len(venues_to_recheck) > 10: