Merge branch 'master' into worktree-dual-market-score
# Conflicts: # .env.dev.sops
This commit is contained in:
@@ -28,7 +28,7 @@ from pathlib import Path
|
||||
import niquests
|
||||
|
||||
from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
|
||||
from .proxy import load_proxy_urls, make_round_robin_cycler
|
||||
from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
|
||||
from .utils import get_last_cursor, landing_path, write_gzip_atomic
|
||||
|
||||
logger = setup_logging("padelnomics.extract.playtomic_availability")
|
||||
@@ -42,6 +42,12 @@ MAX_VENUES_PER_RUN = 20_000
|
||||
MAX_RETRIES_PER_VENUE = 2
|
||||
MAX_WORKERS = int(os.environ.get("EXTRACT_WORKERS", "1"))
|
||||
RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "90"))
|
||||
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD", "10"))
|
||||
|
||||
# Parallel mode submits futures in batches so the circuit breaker can stop
|
||||
# new submissions after it opens. Already-inflight futures in the current
|
||||
# batch still complete.
|
||||
PARALLEL_BATCH_SIZE = 100
|
||||
|
||||
# Thread-local storage for per-worker sessions
|
||||
_thread_local = threading.local()
|
||||
@@ -169,10 +175,15 @@ def _fetch_venues_parallel(
|
||||
start_min_str: str,
|
||||
start_max_str: str,
|
||||
worker_count: int,
|
||||
proxy_cycler,
|
||||
cycler: dict,
|
||||
fallback_urls: list[str],
|
||||
) -> tuple[list[dict], int]:
|
||||
"""Fetch availability for multiple venues in parallel.
|
||||
|
||||
Submits futures in batches of PARALLEL_BATCH_SIZE. After each batch
|
||||
completes, checks the circuit breaker: if it opened and there is no
|
||||
fallback configured, stops submitting further batches.
|
||||
|
||||
Returns (venues_data, venues_errored).
|
||||
"""
|
||||
venues_data: list[dict] = []
|
||||
@@ -181,26 +192,38 @@ def _fetch_venues_parallel(
|
||||
lock = threading.Lock()
|
||||
|
||||
def _worker(tenant_id: str) -> dict | None:
|
||||
proxy_url = proxy_cycler()
|
||||
proxy_url = cycler["next_proxy"]()
|
||||
return _fetch_venue_availability(tenant_id, start_min_str, start_max_str, proxy_url)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=worker_count) as pool:
|
||||
futures = {pool.submit(_worker, tid): tid for tid in tenant_ids}
|
||||
for batch_start in range(0, len(tenant_ids), PARALLEL_BATCH_SIZE):
|
||||
# Stop submitting new work if circuit is open with no fallback
|
||||
if cycler["is_fallback_active"]() and not fallback_urls:
|
||||
logger.error(
|
||||
"Circuit open with no fallback — stopping after %d/%d venues",
|
||||
completed_count, len(tenant_ids),
|
||||
)
|
||||
break
|
||||
|
||||
for future in as_completed(futures):
|
||||
result = future.result()
|
||||
with lock:
|
||||
completed_count += 1
|
||||
if result is not None:
|
||||
venues_data.append(result)
|
||||
else:
|
||||
venues_errored += 1
|
||||
batch = tenant_ids[batch_start:batch_start + PARALLEL_BATCH_SIZE]
|
||||
batch_futures = {pool.submit(_worker, tid): tid for tid in batch}
|
||||
|
||||
if completed_count % 500 == 0:
|
||||
logger.info(
|
||||
"Progress: %d/%d venues (%d errors, %d workers)",
|
||||
completed_count, len(tenant_ids), venues_errored, worker_count,
|
||||
)
|
||||
for future in as_completed(batch_futures):
|
||||
result = future.result()
|
||||
with lock:
|
||||
completed_count += 1
|
||||
if result is not None:
|
||||
venues_data.append(result)
|
||||
cycler["record_success"]()
|
||||
else:
|
||||
venues_errored += 1
|
||||
cycler["record_failure"]()
|
||||
|
||||
if completed_count % 500 == 0:
|
||||
logger.info(
|
||||
"Progress: %d/%d venues (%d errors, %d workers)",
|
||||
completed_count, len(tenant_ids), venues_errored, worker_count,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Parallel fetch complete: %d/%d venues (%d errors, %d workers)",
|
||||
@@ -249,10 +272,11 @@ def extract(
|
||||
if resume_index > 0:
|
||||
venues_to_process = venues_to_process[resume_index:]
|
||||
|
||||
# Determine parallelism
|
||||
# Set up tiered proxy cycler with circuit breaker
|
||||
proxy_urls = load_proxy_urls()
|
||||
fallback_urls = load_fallback_proxy_urls()
|
||||
worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1
|
||||
proxy_cycler = make_round_robin_cycler(proxy_urls)
|
||||
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
|
||||
|
||||
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
@@ -260,21 +284,25 @@ def extract(
|
||||
if worker_count > 1:
|
||||
logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls))
|
||||
venues_data, venues_errored = _fetch_venues_parallel(
|
||||
venues_to_process, start_min_str, start_max_str, worker_count, proxy_cycler,
|
||||
venues_to_process, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
|
||||
)
|
||||
else:
|
||||
# Serial mode — same as before but uses shared fetch function
|
||||
logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process))
|
||||
venues_data = []
|
||||
venues_errored = 0
|
||||
for i, tenant_id in enumerate(venues_to_process):
|
||||
result = _fetch_venue_availability(
|
||||
tenant_id, start_min_str, start_max_str, proxy_cycler(),
|
||||
tenant_id, start_min_str, start_max_str, cycler["next_proxy"](),
|
||||
)
|
||||
if result is not None:
|
||||
venues_data.append(result)
|
||||
cycler["record_success"]()
|
||||
else:
|
||||
venues_errored += 1
|
||||
circuit_opened = cycler["record_failure"]()
|
||||
if circuit_opened and not fallback_urls:
|
||||
logger.error("Circuit open with no fallback — writing partial results")
|
||||
break
|
||||
|
||||
if (i + 1) % 100 == 0:
|
||||
logger.info(
|
||||
@@ -390,24 +418,30 @@ def extract_recheck(
|
||||
start_min_str = window_start.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
start_max_str = window_end.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
|
||||
# Determine parallelism
|
||||
# Set up tiered proxy cycler with circuit breaker
|
||||
proxy_urls = load_proxy_urls()
|
||||
fallback_urls = load_fallback_proxy_urls()
|
||||
worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1
|
||||
proxy_cycler = make_round_robin_cycler(proxy_urls)
|
||||
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
|
||||
|
||||
if worker_count > 1 and len(venues_to_recheck) > 10:
|
||||
venues_data, venues_errored = _fetch_venues_parallel(
|
||||
venues_to_recheck, start_min_str, start_max_str, worker_count, proxy_cycler,
|
||||
venues_to_recheck, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
|
||||
)
|
||||
else:
|
||||
venues_data = []
|
||||
venues_errored = 0
|
||||
for tid in venues_to_recheck:
|
||||
result = _fetch_venue_availability(tid, start_min_str, start_max_str, proxy_cycler())
|
||||
result = _fetch_venue_availability(tid, start_min_str, start_max_str, cycler["next_proxy"]())
|
||||
if result is not None:
|
||||
venues_data.append(result)
|
||||
cycler["record_success"]()
|
||||
else:
|
||||
venues_errored += 1
|
||||
circuit_opened = cycler["record_failure"]()
|
||||
if circuit_opened and not fallback_urls:
|
||||
logger.error("Circuit open with no fallback — writing partial recheck results")
|
||||
break
|
||||
|
||||
# Write recheck file
|
||||
recheck_hour = now.hour
|
||||
|
||||
@@ -6,12 +6,20 @@ When unset, all functions return None/no-op — extractors fall back to direct r
|
||||
Two routing modes:
|
||||
round-robin — distribute requests evenly across proxies (default)
|
||||
sticky — same key always maps to same proxy (for session-tracked sites)
|
||||
|
||||
Tiered proxy with circuit breaker:
|
||||
Primary tier (PROXY_URLS) is used by default — typically cheap datacenter proxies.
|
||||
Fallback tier (PROXY_URLS_FALLBACK) activates once consecutive failures >= threshold.
|
||||
Once the circuit opens it stays open for the duration of the run (no auto-recovery).
|
||||
"""
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def load_proxy_urls() -> list[str]:
|
||||
"""Read PROXY_URLS env var (comma-separated). Returns [] if unset.
|
||||
@@ -23,6 +31,17 @@ def load_proxy_urls() -> list[str]:
|
||||
return urls
|
||||
|
||||
|
||||
def load_fallback_proxy_urls() -> list[str]:
    """Read PROXY_URLS_FALLBACK env var (comma-separated). Returns [] if unset.

    This is the residential/reliable tier that the circuit breaker switches to
    once the primary (datacenter) tier trips.
    Format: http://user:pass@host:port or socks5://host:port
    """
    configured = os.environ.get("PROXY_URLS_FALLBACK", "")
    # Tolerate stray whitespace and empty entries (e.g. trailing commas).
    stripped = (entry.strip() for entry in configured.split(","))
    return [url for url in stripped if url]
|
||||
|
||||
|
||||
def make_round_robin_cycler(proxy_urls: list[str]):
|
||||
"""Thread-safe round-robin proxy cycler.
|
||||
|
||||
@@ -42,6 +61,87 @@ def make_round_robin_cycler(proxy_urls: list[str]):
|
||||
return next_proxy
|
||||
|
||||
|
||||
def make_tiered_cycler(
|
||||
primary_urls: list[str],
|
||||
fallback_urls: list[str],
|
||||
threshold: int,
|
||||
) -> dict:
|
||||
"""Thread-safe tiered proxy cycler with circuit breaker.
|
||||
|
||||
Uses primary_urls until consecutive failures >= threshold, then switches
|
||||
permanently to fallback_urls for the rest of the run. No auto-recovery —
|
||||
once the circuit opens it stays open to avoid flapping.
|
||||
|
||||
Returns a dict of callables:
|
||||
next_proxy() -> str | None — returns URL from the active tier
|
||||
record_success() — resets consecutive failure counter
|
||||
record_failure() -> bool — increments counter; True if circuit just opened
|
||||
is_fallback_active() -> bool — whether fallback tier is currently active
|
||||
|
||||
If primary_urls is empty: always returns from fallback_urls (no circuit breaker needed).
|
||||
If both are empty: next_proxy() always returns None.
|
||||
"""
|
||||
assert threshold > 0, f"threshold must be positive, got {threshold}"
|
||||
|
||||
lock = threading.Lock()
|
||||
state = {
|
||||
"consecutive_failures": 0,
|
||||
"fallback_active": False,
|
||||
}
|
||||
|
||||
primary_cycle = itertools.cycle(primary_urls) if primary_urls else None
|
||||
fallback_cycle = itertools.cycle(fallback_urls) if fallback_urls else None
|
||||
|
||||
# No primary proxies — skip circuit breaker, use fallback directly
|
||||
if not primary_urls:
|
||||
state["fallback_active"] = True
|
||||
|
||||
def next_proxy() -> str | None:
|
||||
with lock:
|
||||
if state["fallback_active"]:
|
||||
return next(fallback_cycle) if fallback_cycle else None
|
||||
return next(primary_cycle) if primary_cycle else None
|
||||
|
||||
def record_success() -> None:
|
||||
with lock:
|
||||
state["consecutive_failures"] = 0
|
||||
|
||||
def record_failure() -> bool:
|
||||
"""Increment failure counter. Returns True if circuit just opened."""
|
||||
with lock:
|
||||
if state["fallback_active"]:
|
||||
# Already on fallback — don't trip the circuit again
|
||||
return False
|
||||
state["consecutive_failures"] += 1
|
||||
if state["consecutive_failures"] >= threshold:
|
||||
state["fallback_active"] = True
|
||||
if fallback_urls:
|
||||
logger.warning(
|
||||
"Circuit open after %d consecutive failures — "
|
||||
"switching to fallback residential proxies",
|
||||
state["consecutive_failures"],
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
"Circuit open after %d consecutive failures — "
|
||||
"no fallback configured, aborting run",
|
||||
state["consecutive_failures"],
|
||||
)
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_fallback_active() -> bool:
|
||||
with lock:
|
||||
return state["fallback_active"]
|
||||
|
||||
return {
|
||||
"next_proxy": next_proxy,
|
||||
"record_success": record_success,
|
||||
"record_failure": record_failure,
|
||||
"is_fallback_active": is_fallback_active,
|
||||
}
|
||||
|
||||
|
||||
def make_sticky_selector(proxy_urls: list[str]):
|
||||
"""Consistent-hash proxy selector — same key always maps to same proxy.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user