Replace two-tier proxy setup (PROXY_URLS / PROXY_URLS_FALLBACK) with N-tier escalation: free → datacenter → residential. - proxy.py: fetch_webshare_proxies() auto-fetches the Webshare download API on each run (no more stale manually-copied lists). load_proxy_tiers() assembles tiers from WEBSHARE_DOWNLOAD_URL, PROXY_URLS_DATACENTER, PROXY_URLS_RESIDENTIAL. make_tiered_cycler() generalised to list[list[str]] with N-level escalation; is_fallback_active() replaced by is_exhausted(). Old load_proxy_urls() / load_fallback_proxy_urls() deleted. - playtomic_availability.py: both extract() and extract_recheck() use load_proxy_tiers() + generalised cycler. _fetch_venues_parallel fallback_urls param removed. All is_fallback_active() checks → is_exhausted(). - playtomic_tenants.py: flattens tiers for simple round-robin. - test_supervisor.py: TestLoadProxyUrls removed (function deleted). Added TestFetchWebshareProxies, TestLoadProxyTiers, TestTieredCyclerNTier (11 tests covering parse format, error handling, escalation, thread safety). 47 tests pass, ruff clean. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
503 lines
20 KiB
Python
503 lines
20 KiB
Python
"""
|
|
Unit tests for supervisor.py and proxy.py.
|
|
|
|
Tests cover pure-Python logic only — no DB, no subprocesses, no network.
|
|
DB-dependent functions (is_due, _get_last_success_time) are tested via mocks.
|
|
|
|
supervisor.py lives in src/padelnomics/ (not a uv workspace package), so we
|
|
add src/ to sys.path before importing.
|
|
"""
|
|
|
|
# Load supervisor.py directly by path — avoids clashing with the web app's
|
|
# 'padelnomics' namespace (which is the installed web package).
|
|
import importlib.util as _ilu
|
|
import textwrap
|
|
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
# supervisor.py is executed by file path rather than imported by name, so this
# test module never collides with the installed 'padelnomics' web package.
_SUP_PATH = Path(__file__).parent.parent.parent / "src" / "padelnomics" / "supervisor.py"
_spec = _ilu.spec_from_file_location("padelnomics_supervisor", _SUP_PATH)
sup = _ilu.module_from_spec(_spec)  # module object — tests reference sup.<name>
_spec.loader.exec_module(sup)  # runs supervisor.py's top level once, at import time
|
|
|
|
from padelnomics_extract.proxy import ( # noqa: E402
|
|
fetch_webshare_proxies,
|
|
load_proxy_tiers,
|
|
make_round_robin_cycler,
|
|
make_sticky_selector,
|
|
make_tiered_cycler,
|
|
)
|
|
|
|
# ── load_workflows ────────────────────────────────────────────────
|
|
|
|
|
|
class TestLoadWorkflows:
    """load_workflows(): TOML parsing, per-field defaults, and validation errors."""

    def test_loads_all_fields(self, tmp_path):
        """A two-workflow file round-trips all fields; omitted keys get defaults."""
        toml = tmp_path / "workflows.toml"
        toml.write_text(textwrap.dedent("""\
            [extract_a]
            module = "mypkg.extract_a"
            schedule = "daily"

            [extract_b]
            module = "mypkg.extract_b"
            schedule = "weekly"
            entry = "run"
            depends_on = ["extract_a"]
            proxy_mode = "sticky"
        """))
        wfs = sup.load_workflows(toml)
        assert len(wfs) == 2

        # extract_a sets only the two required keys → everything else defaults.
        a = next(w for w in wfs if w["name"] == "extract_a")
        assert a["module"] == "mypkg.extract_a"
        assert a["schedule"] == "daily"
        assert a["entry"] == "main"  # default
        assert a["depends_on"] == []  # default
        assert a["proxy_mode"] == "round-robin"  # default

        # extract_b sets every optional key explicitly.
        b = next(w for w in wfs if w["name"] == "extract_b")
        assert b["entry"] == "run"
        assert b["depends_on"] == ["extract_a"]
        assert b["proxy_mode"] == "sticky"

    def test_raises_on_missing_module(self, tmp_path):
        # 'module' is one of the two mandatory keys.
        toml = tmp_path / "bad.toml"
        toml.write_text("[wf]\nschedule = 'daily'\n")
        with pytest.raises(AssertionError, match="missing 'module'"):
            sup.load_workflows(toml)

    def test_raises_on_missing_schedule(self, tmp_path):
        # 'schedule' is the other mandatory key.
        toml = tmp_path / "bad.toml"
        toml.write_text("[wf]\nmodule = 'mypkg.wf'\n")
        with pytest.raises(AssertionError, match="missing 'schedule'"):
            sup.load_workflows(toml)

    def test_raises_if_file_missing(self, tmp_path):
        # A nonexistent path fails loudly rather than returning an empty list.
        with pytest.raises(AssertionError, match="not found"):
            sup.load_workflows(tmp_path / "nonexistent.toml")
|
|
|
|
|
|
# ── resolve_schedule ──────────────────────────────────────────────
|
|
|
|
|
|
class TestResolveSchedule:
    """resolve_schedule(): named presets map to cron strings; anything else passes through."""

    def test_maps_named_presets(self):
        # Table-driven check of every supported preset name.
        preset_to_cron = {
            "hourly": "0 * * * *",
            "daily": "0 5 * * *",
            "weekly": "0 3 * * 1",
            "monthly": "0 4 1 * *",
        }
        for preset, cron in preset_to_cron.items():
            assert sup.resolve_schedule(preset) == cron

    def test_passes_through_raw_cron(self):
        # A literal cron expression is returned unchanged.
        raw_cron = "0 6-23 * * *"
        assert sup.resolve_schedule(raw_cron) == raw_cron

    def test_unknown_name_passes_through(self):
        # An unrecognised word is treated like a raw expression, not an error.
        assert sup.resolve_schedule("quarterly") == "quarterly"
|
|
|
|
|
|
# ── is_due ────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestIsDue:
    """is_due(): compares the last success time against the most recent cron
    trigger. DB access is mocked via _get_last_success_time."""

    def _wf(self, schedule="daily", name="test_wf"):
        # Minimal workflow dict — is_due only reads these two keys here.
        return {"name": name, "schedule": schedule}

    def test_never_ran_is_due(self):
        # No recorded success → always due.
        conn = MagicMock()
        with patch.object(sup, "_get_last_success_time", return_value=None):
            assert sup.is_due(conn, self._wf()) is True

    def test_ran_after_last_trigger_is_not_due(self):
        """Last run was AFTER the most recent trigger — not due."""
        conn = MagicMock()
        # Use a daily schedule — trigger fires at 05:00 UTC
        # Simulate last success = today at 06:00, so trigger already covered
        # NOTE(review): before 06:00 UTC this last_success is slightly in the
        # future relative to now — appears to rely on is_due comparing only
        # against the last trigger time; confirm against the implementation.
        now = datetime.now(UTC)
        last_success = now.replace(hour=6, minute=0, second=0, microsecond=0)
        with patch.object(sup, "_get_last_success_time", return_value=last_success):
            assert sup.is_due(conn, self._wf(schedule="daily")) is False

    def test_ran_before_last_trigger_is_due(self):
        """Last run was BEFORE the most recent trigger — due again."""
        conn = MagicMock()
        # Monthly fires on the 1st at 04:00 — simulate running last month
        last_success = datetime.now(UTC) - timedelta(days=35)
        with patch.object(sup, "_get_last_success_time", return_value=last_success):
            assert sup.is_due(conn, self._wf(schedule="monthly")) is True
|
|
|
|
|
|
# ── topological_waves ─────────────────────────────────────────────
|
|
|
|
|
|
class TestTopologicalWaves:
    """topological_waves(): groups workflows into dependency-ordered waves,
    where every workflow in wave N depends only on earlier waves."""

    def _wf(self, name, depends_on=None):
        # Minimal workflow dict — topological_waves only reads these keys here.
        return {"name": name, "depends_on": depends_on or []}

    def test_no_deps_single_wave(self):
        # Independent workflows all land in one wave.
        wfs = [self._wf("a"), self._wf("b"), self._wf("c")]
        waves = sup.topological_waves(wfs)
        assert len(waves) == 1
        assert {w["name"] for w in waves[0]} == {"a", "b", "c"}

    def test_simple_chain_two_waves(self):
        wfs = [self._wf("a"), self._wf("b", depends_on=["a"])]
        waves = sup.topological_waves(wfs)
        assert len(waves) == 2
        assert waves[0][0]["name"] == "a"
        assert waves[1][0]["name"] == "b"

    def test_diamond_three_waves(self):
        """a → b,c → d"""
        wfs = [
            self._wf("a"),
            self._wf("b", depends_on=["a"]),
            self._wf("c", depends_on=["a"]),
            self._wf("d", depends_on=["b", "c"]),
        ]
        waves = sup.topological_waves(wfs)
        assert len(waves) == 3
        assert waves[0][0]["name"] == "a"
        # b and c share the middle wave; their relative order is unspecified.
        assert {w["name"] for w in waves[1]} == {"b", "c"}
        assert waves[2][0]["name"] == "d"

    def test_dep_outside_due_set_ignored(self):
        """Dependency not in the due set is treated as satisfied."""
        wfs = [self._wf("b", depends_on=["a"])]  # "a" not in due set
        waves = sup.topological_waves(wfs)
        assert len(waves) == 1
        assert waves[0][0]["name"] == "b"

    def test_circular_dep_raises(self):
        # A cycle can never be scheduled — must fail loudly.
        wfs = [
            self._wf("a", depends_on=["b"]),
            self._wf("b", depends_on=["a"]),
        ]
        with pytest.raises(AssertionError, match="Circular dependency"):
            sup.topological_waves(wfs)

    def test_empty_list_returns_empty(self):
        assert sup.topological_waves([]) == []

    def test_real_workflows_toml(self):
        """The actual workflows.toml in the repo parses and produces valid waves."""
        repo_root = Path(__file__).parent.parent.parent
        wf_path = repo_root / "infra" / "supervisor" / "workflows.toml"
        if not wf_path.exists():
            pytest.skip("workflows.toml not found")
        wfs = sup.load_workflows(wf_path)
        waves = sup.topological_waves(wfs)
        # playtomic_availability must come after playtomic_tenants
        all_names = [w["name"] for wave in waves for w in wave]
        tenants_idx = all_names.index("playtomic_tenants")
        avail_idx = all_names.index("playtomic_availability")
        assert tenants_idx < avail_idx
|
|
|
|
|
|
# ── proxy.py ─────────────────────────────────────────────────────
|
|
|
|
|
|
class TestFetchWebshareProxies:
    """fetch_webshare_proxies(): parsing and error handling of the Webshare
    download-API response (``ip:port:user:pass`` lines → proxy URLs)."""

    @staticmethod
    def _patched_urlopen(body: str):
        """Return a patcher making urllib.request.urlopen yield *body* as bytes.

        The mocked response supports both the context-manager protocol
        (``with urlopen(...) as resp``) and ``resp.read()``. Centralises the
        six-line mock setup previously duplicated across four tests.
        """
        mock_resp = MagicMock()
        mock_resp.read.return_value = body.encode("utf-8")
        mock_resp.__enter__ = lambda s: s  # `with` yields the response itself
        mock_resp.__exit__ = MagicMock(return_value=False)
        return patch("urllib.request.urlopen", return_value=mock_resp)

    def test_parses_ip_port_user_pass_format(self):
        raw = "1.2.3.4:1080:user1:pass1\n5.6.7.8:1080:user2:pass2\n"
        with self._patched_urlopen(raw):
            urls = fetch_webshare_proxies("http://example.com/proxy-list")
        assert urls == [
            "http://user1:pass1@1.2.3.4:1080",
            "http://user2:pass2@5.6.7.8:1080",
        ]

    def test_network_error_returns_empty(self):
        # Network failure must degrade to an empty tier, not raise.
        import urllib.error

        with patch("urllib.request.urlopen", side_effect=urllib.error.URLError("timeout")):
            result = fetch_webshare_proxies("http://example.com/proxy-list")
        assert result == []

    def test_malformed_lines_are_skipped(self):
        # Lines without exactly four colon-separated parts are dropped.
        raw = "bad_line\n1.2.3.4:1080:user:pass\nonly:three:parts\n"
        with self._patched_urlopen(raw):
            urls = fetch_webshare_proxies("http://example.com/proxy-list")
        assert urls == ["http://user:pass@1.2.3.4:1080"]

    def test_max_proxies_respected(self):
        lines = "\n".join(f"10.0.0.{i}:1080:u{i}:p{i}" for i in range(10))
        with self._patched_urlopen(lines):
            urls = fetch_webshare_proxies("http://example.com/proxy-list", max_proxies=3)
        assert len(urls) == 3

    def test_empty_lines_skipped(self):
        raw = "\n\n1.2.3.4:1080:user:pass\n\n"
        with self._patched_urlopen(raw):
            urls = fetch_webshare_proxies("http://example.com/proxy-list")
        assert urls == ["http://user:pass@1.2.3.4:1080"]
|
|
|
|
|
|
class TestLoadProxyTiers:
    """load_proxy_tiers(): tier assembly from WEBSHARE_DOWNLOAD_URL,
    PROXY_URLS_DATACENTER, and PROXY_URLS_RESIDENTIAL env vars."""

    def _clear_proxy_env(self, monkeypatch):
        # Start each test from a clean slate — CI/dev machines may have these set.
        for var in ("WEBSHARE_DOWNLOAD_URL", "PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"):
            monkeypatch.delenv(var, raising=False)

    def test_returns_empty_when_all_unset(self, monkeypatch):
        self._clear_proxy_env(monkeypatch)
        assert load_proxy_tiers() == []

    def test_single_datacenter_tier(self, monkeypatch):
        self._clear_proxy_env(monkeypatch)
        monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080,http://dc2:8080")
        tiers = load_proxy_tiers()
        assert len(tiers) == 1
        # Comma-separated value splits into individual proxy URLs.
        assert tiers[0] == ["http://dc1:8080", "http://dc2:8080"]

    def test_residential_only(self, monkeypatch):
        self._clear_proxy_env(monkeypatch)
        monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080")
        tiers = load_proxy_tiers()
        assert len(tiers) == 1
        assert tiers[0] == ["http://res1:8080"]

    def test_empty_tiers_skipped(self, monkeypatch):
        # An env var set to "" must not produce an empty tier entry.
        self._clear_proxy_env(monkeypatch)
        monkeypatch.setenv("PROXY_URLS_DATACENTER", "")
        monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080")
        tiers = load_proxy_tiers()
        assert len(tiers) == 1
        assert tiers[0] == ["http://res1:8080"]

    def test_three_tiers_correct_order(self, monkeypatch):
        # Escalation order: free (webshare) → datacenter → residential.
        self._clear_proxy_env(monkeypatch)
        with patch("padelnomics_extract.proxy.fetch_webshare_proxies", return_value=["http://user:pass@1.2.3.4:1080"]):
            monkeypatch.setenv("WEBSHARE_DOWNLOAD_URL", "http://example.com/list")
            monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080")
            monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080")
            tiers = load_proxy_tiers()
            assert len(tiers) == 3
            assert tiers[0] == ["http://user:pass@1.2.3.4:1080"]  # free
            assert tiers[1] == ["http://dc1:8080"]  # datacenter
            assert tiers[2] == ["http://res1:8080"]  # residential

    def test_webshare_fetch_failure_skips_tier(self, monkeypatch):
        # A failed webshare fetch (empty list) is skipped, not kept as a hole.
        self._clear_proxy_env(monkeypatch)
        with patch("padelnomics_extract.proxy.fetch_webshare_proxies", return_value=[]):
            monkeypatch.setenv("WEBSHARE_DOWNLOAD_URL", "http://example.com/list")
            monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080")
            tiers = load_proxy_tiers()
            assert len(tiers) == 1
            assert tiers[0] == ["http://dc1:8080"]
|
|
|
|
|
|
class TestRoundRobinCycler:
    """make_round_robin_cycler(): cycling order, empty input, thread safety."""

    def test_returns_none_callable_when_no_proxies(self):
        # Empty pool still yields a callable — it just always returns None.
        picker = make_round_robin_cycler([])
        assert picker() is None

    def test_cycles_through_proxies(self):
        pool = ["http://p1", "http://p2", "http://p3"]
        picker = make_round_robin_cycler(pool)
        seen = [picker() for _ in range(6)]
        # Two full laps around the pool, in order.
        assert seen == pool * 2

    def test_thread_safe_independent_calls(self):
        """Concurrent calls each get a proxy — no exceptions."""
        import threading

        pool = ["http://p1", "http://p2"]
        picker = make_round_robin_cycler(pool)
        collected = []
        guard = threading.Lock()

        def grab():
            value = picker()
            with guard:
                collected.append(value)

        workers = [threading.Thread(target=grab) for _ in range(10)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()

        assert len(collected) == 10
        assert all(v in pool for v in collected)
|
|
|
|
|
|
class TestStickySelectorProxy:
    """make_sticky_selector(): deterministic key → proxy mapping."""

    def test_returns_none_callable_when_no_proxies(self):
        select = make_sticky_selector([])
        assert select("any_key") is None

    def test_same_key_always_same_proxy(self):
        pool = ["http://p1", "http://p2", "http://p3"]
        select = make_sticky_selector(pool)
        first = select("tenant_abc")
        # Repeated lookups with one key must be stable.
        assert all(select("tenant_abc") == first for _ in range(10))

    def test_different_keys_can_map_to_different_proxies(self):
        pool = ["http://p1", "http://p2", "http://p3"]
        select = make_sticky_selector(pool)
        distinct = {select(f"key_{i}") for i in range(30)}
        assert len(distinct) > 1  # distribution across proxies

    def test_all_results_are_valid_proxies(self):
        pool = ["http://p1", "http://p2"]
        select = make_sticky_selector(pool)
        assert all(select(f"key_{i}") in pool for i in range(20))
|
|
|
|
|
|
class TestTieredCyclerNTier:
    """make_tiered_cycler(): N-tier escalation state machine. The cycler is a
    dict of closures: next_proxy, record_failure, record_success,
    active_tier_index, is_exhausted, tier_count. record_failure returns True
    exactly when the failure caused an escalation to the next tier."""

    def test_starts_on_first_tier(self):
        tiers = [["http://t0a", "http://t0b"], ["http://t1a"]]
        cycler = make_tiered_cycler(tiers, threshold=3)
        assert cycler["active_tier_index"]() == 0
        assert not cycler["is_exhausted"]()
        assert cycler["next_proxy"]() in tiers[0]

    def test_escalates_after_threshold(self):
        tiers = [["http://t0"], ["http://t1"]]
        cycler = make_tiered_cycler(tiers, threshold=3)
        # Two failures — stays on tier 0
        cycler["record_failure"]()
        cycler["record_failure"]()
        assert cycler["active_tier_index"]() == 0
        # Third failure — escalates
        escalated = cycler["record_failure"]()
        assert escalated is True
        assert cycler["active_tier_index"]() == 1
        assert cycler["next_proxy"]() == "http://t1"

    def test_escalates_through_all_tiers(self):
        tiers = [["http://t0"], ["http://t1"], ["http://t2"]]
        cycler = make_tiered_cycler(tiers, threshold=2)
        # Exhaust tier 0
        cycler["record_failure"]()
        cycler["record_failure"]()
        assert cycler["active_tier_index"]() == 1
        # Exhaust tier 1
        cycler["record_failure"]()
        cycler["record_failure"]()
        assert cycler["active_tier_index"]() == 2
        # Exhaust tier 2
        cycler["record_failure"]()
        cycler["record_failure"]()
        assert cycler["is_exhausted"]()
        assert cycler["next_proxy"]() is None

    def test_success_resets_counter(self):
        tiers = [["http://t0"], ["http://t1"]]
        cycler = make_tiered_cycler(tiers, threshold=3)
        cycler["record_failure"]()
        cycler["record_failure"]()
        cycler["record_success"]()
        # Counter reset — need threshold more failures to escalate
        cycler["record_failure"]()
        cycler["record_failure"]()
        assert cycler["active_tier_index"]() == 0  # still on tier 0
        cycler["record_failure"]()
        assert cycler["active_tier_index"]() == 1  # now escalated

    def test_counter_resets_on_escalation(self):
        """After escalating, failure counter resets so new tier gets a fresh start."""
        tiers = [["http://t0"], ["http://t1"], ["http://t2"]]
        cycler = make_tiered_cycler(tiers, threshold=2)
        # Exhaust tier 0
        cycler["record_failure"]()
        cycler["record_failure"]()
        assert cycler["active_tier_index"]() == 1
        # One failure on tier 1 — should NOT escalate yet (counter reset)
        cycler["record_failure"]()
        assert cycler["active_tier_index"]() == 1
        # Second failure on tier 1 — escalates to tier 2
        cycler["record_failure"]()
        assert cycler["active_tier_index"]() == 2

    def test_is_exhausted_false_when_tiers_remain(self):
        tiers = [["http://t0"], ["http://t1"]]
        cycler = make_tiered_cycler(tiers, threshold=1)
        assert not cycler["is_exhausted"]()
        cycler["record_failure"]()  # escalates to tier 1
        assert not cycler["is_exhausted"]()

    def test_is_exhausted_true_after_all_tiers_fail(self):
        tiers = [["http://t0"]]
        cycler = make_tiered_cycler(tiers, threshold=1)
        assert not cycler["is_exhausted"]()
        cycler["record_failure"]()
        assert cycler["is_exhausted"]()
        assert cycler["next_proxy"]() is None

    def test_empty_tiers_immediately_exhausted(self):
        # No tiers at all → exhausted from the start, never raises.
        cycler = make_tiered_cycler([], threshold=3)
        assert cycler["is_exhausted"]()
        assert cycler["next_proxy"]() is None
        assert cycler["tier_count"]() == 0

    def test_single_tier_cycles_within_tier(self):
        # Within a tier, proxies rotate round-robin in list order.
        tiers = [["http://p1", "http://p2", "http://p3"]]
        cycler = make_tiered_cycler(tiers, threshold=10)
        results = [cycler["next_proxy"]() for _ in range(6)]
        assert results == ["http://p1", "http://p2", "http://p3"] * 2

    def test_tier_count_reflects_input(self):
        assert make_tiered_cycler([], threshold=1)["tier_count"]() == 0
        assert make_tiered_cycler([["a"]], threshold=1)["tier_count"]() == 1
        assert make_tiered_cycler([["a"], ["b"], ["c"]], threshold=1)["tier_count"]() == 3

    def test_record_failure_noop_when_exhausted(self):
        tiers = [["http://t0"]]
        cycler = make_tiered_cycler(tiers, threshold=1)
        cycler["record_failure"]()  # exhausts
        assert cycler["is_exhausted"]()
        # Further failures are no-ops, not exceptions
        result = cycler["record_failure"]()
        assert result is False
        assert cycler["is_exhausted"]()

    def test_thread_safety(self):
        """Concurrent next_proxy and record calls do not raise or corrupt state."""
        import threading
        tiers = [["http://t0a", "http://t0b"], ["http://t1a", "http://t1b"]]
        cycler = make_tiered_cycler(tiers, threshold=5)
        errors = []
        lock = threading.Lock()

        def worker():
            try:
                for _ in range(20):
                    cycler["next_proxy"]()
                    cycler["record_failure"]()
                    cycler["record_success"]()
            except Exception as e:
                with lock:
                    errors.append(e)

        threads = [threading.Thread(target=worker) for _ in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Thread safety errors: {errors}"
|