""" Unit tests for supervisor.py and proxy.py. Tests cover pure-Python logic only — no DB, no subprocesses, no network. DB-dependent functions (is_due, _get_last_success_time) are tested via mocks. supervisor.py lives in src/padelnomics/ (not a uv workspace package), so we add src/ to sys.path before importing. """ # Load supervisor.py directly by path — avoids clashing with the web app's # 'padelnomics' namespace (which is the installed web package). import importlib.util as _ilu import textwrap from datetime import UTC, datetime, timedelta from pathlib import Path from unittest.mock import MagicMock, patch import pytest _SUP_PATH = Path(__file__).parent.parent.parent / "src" / "padelnomics" / "supervisor.py" _spec = _ilu.spec_from_file_location("padelnomics_supervisor", _SUP_PATH) sup = _ilu.module_from_spec(_spec) _spec.loader.exec_module(sup) from padelnomics_extract.proxy import ( # noqa: E402 fetch_webshare_proxies, load_proxy_tiers, make_round_robin_cycler, make_sticky_selector, make_tiered_cycler, ) # ── load_workflows ──────────────────────────────────────────────── class TestLoadWorkflows: def test_loads_all_fields(self, tmp_path): toml = tmp_path / "workflows.toml" toml.write_text(textwrap.dedent("""\ [extract_a] module = "mypkg.extract_a" schedule = "daily" [extract_b] module = "mypkg.extract_b" schedule = "weekly" entry = "run" depends_on = ["extract_a"] proxy_mode = "sticky" """)) wfs = sup.load_workflows(toml) assert len(wfs) == 2 a = next(w for w in wfs if w["name"] == "extract_a") assert a["module"] == "mypkg.extract_a" assert a["schedule"] == "daily" assert a["entry"] == "main" # default assert a["depends_on"] == [] # default assert a["proxy_mode"] == "round-robin" # default b = next(w for w in wfs if w["name"] == "extract_b") assert b["entry"] == "run" assert b["depends_on"] == ["extract_a"] assert b["proxy_mode"] == "sticky" def test_raises_on_missing_module(self, tmp_path): toml = tmp_path / "bad.toml" toml.write_text("[wf]\nschedule = 'daily'\n") with pytest.raises(AssertionError, match="missing 'module'"): sup.load_workflows(toml) def test_raises_on_missing_schedule(self, tmp_path): toml = tmp_path / "bad.toml" toml.write_text("[wf]\nmodule = 'mypkg.wf'\n") with pytest.raises(AssertionError, match="missing 'schedule'"): sup.load_workflows(toml) def test_raises_if_file_missing(self, tmp_path): with pytest.raises(AssertionError, match="not found"): sup.load_workflows(tmp_path / "nonexistent.toml") # ── resolve_schedule ────────────────────────────────────────────── class TestResolveSchedule: def test_maps_named_presets(self): assert sup.resolve_schedule("hourly") == "0 * * * *" assert sup.resolve_schedule("daily") == "0 5 * * *" assert sup.resolve_schedule("weekly") == "0 3 * * 1" assert sup.resolve_schedule("monthly") == "0 4 1 * *" def test_passes_through_raw_cron(self): expr = "0 6-23 * * *" assert sup.resolve_schedule(expr) == expr def test_unknown_name_passes_through(self): assert sup.resolve_schedule("quarterly") == "quarterly" # ── is_due ──────────────────────────────────────────────────────── class TestIsDue: def _wf(self, schedule="daily", name="test_wf"): return {"name": name, "schedule": schedule} def test_never_ran_is_due(self): conn = MagicMock() with patch.object(sup, "_get_last_success_time", return_value=None): assert sup.is_due(conn, self._wf()) is True def test_ran_after_last_trigger_is_not_due(self): """Last run was AFTER the most recent trigger — not due.""" conn = MagicMock() # Use a daily schedule — trigger fires at 05:00 UTC # Simulate last success = today at 06:00, so trigger already covered now = datetime.now(UTC) last_success = now.replace(hour=6, minute=0, second=0, microsecond=0) with patch.object(sup, "_get_last_success_time", return_value=last_success): assert sup.is_due(conn, self._wf(schedule="daily")) is False def test_ran_before_last_trigger_is_due(self): """Last run was BEFORE the most recent trigger — due again.""" conn = MagicMock() # Monthly fires on the 1st at 04:00 — simulate running last month last_success = datetime.now(UTC) - timedelta(days=35) with patch.object(sup, "_get_last_success_time", return_value=last_success): assert sup.is_due(conn, self._wf(schedule="monthly")) is True # ── topological_waves ───────────────────────────────────────────── class TestTopologicalWaves: def _wf(self, name, depends_on=None): return {"name": name, "depends_on": depends_on or []} def test_no_deps_single_wave(self): wfs = [self._wf("a"), self._wf("b"), self._wf("c")] waves = sup.topological_waves(wfs) assert len(waves) == 1 assert {w["name"] for w in waves[0]} == {"a", "b", "c"} def test_simple_chain_two_waves(self): wfs = [self._wf("a"), self._wf("b", depends_on=["a"])] waves = sup.topological_waves(wfs) assert len(waves) == 2 assert waves[0][0]["name"] == "a" assert waves[1][0]["name"] == "b" def test_diamond_three_waves(self): """a → b,c → d""" wfs = [ self._wf("a"), self._wf("b", depends_on=["a"]), self._wf("c", depends_on=["a"]), self._wf("d", depends_on=["b", "c"]), ] waves = sup.topological_waves(wfs) assert len(waves) == 3 assert waves[0][0]["name"] == "a" assert {w["name"] for w in waves[1]} == {"b", "c"} assert waves[2][0]["name"] == "d" def test_dep_outside_due_set_ignored(self): """Dependency not in the due set is treated as satisfied.""" wfs = [self._wf("b", depends_on=["a"])] # "a" not in due set waves = sup.topological_waves(wfs) assert len(waves) == 1 assert waves[0][0]["name"] == "b" def test_circular_dep_raises(self): wfs = [ self._wf("a", depends_on=["b"]), self._wf("b", depends_on=["a"]), ] with pytest.raises(AssertionError, match="Circular dependency"): sup.topological_waves(wfs) def test_empty_list_returns_empty(self): assert sup.topological_waves([]) == [] def test_real_workflows_toml(self): """The actual workflows.toml in the repo parses and produces valid waves.""" repo_root = Path(__file__).parent.parent.parent wf_path = repo_root / "infra" / "supervisor" / "workflows.toml" if not wf_path.exists(): pytest.skip("workflows.toml not found") wfs = sup.load_workflows(wf_path) waves = sup.topological_waves(wfs) # playtomic_availability must come after playtomic_tenants all_names = [w["name"] for wave in waves for w in wave] tenants_idx = all_names.index("playtomic_tenants") avail_idx = all_names.index("playtomic_availability") assert tenants_idx < avail_idx # ── proxy.py ───────────────────────────────────────────────────── class TestFetchWebshareProxies: def test_parses_ip_port_user_pass_format(self): raw = "1.2.3.4:1080:user1:pass1\n5.6.7.8:1080:user2:pass2\n" with patch("urllib.request.urlopen") as mock_open: mock_resp = MagicMock() mock_resp.read.return_value = raw.encode("utf-8") mock_resp.__enter__ = lambda s: s mock_resp.__exit__ = MagicMock(return_value=False) mock_open.return_value = mock_resp urls = fetch_webshare_proxies("http://example.com/proxy-list") assert urls == [ "http://user1:pass1@1.2.3.4:1080", "http://user2:pass2@5.6.7.8:1080", ] def test_network_error_returns_empty(self): import urllib.error with patch("urllib.request.urlopen", side_effect=urllib.error.URLError("timeout")): result = fetch_webshare_proxies("http://example.com/proxy-list") assert result == [] def test_malformed_lines_are_skipped(self): raw = "bad_line\n1.2.3.4:1080:user:pass\nonly:three:parts\n" with patch("urllib.request.urlopen") as mock_open: mock_resp = MagicMock() mock_resp.read.return_value = raw.encode("utf-8") mock_resp.__enter__ = lambda s: s mock_resp.__exit__ = MagicMock(return_value=False) mock_open.return_value = mock_resp urls = fetch_webshare_proxies("http://example.com/proxy-list") assert urls == ["http://user:pass@1.2.3.4:1080"] def test_max_proxies_respected(self): lines = "\n".join(f"10.0.0.{i}:1080:u{i}:p{i}" for i in range(10)) with patch("urllib.request.urlopen") as mock_open: mock_resp = MagicMock() mock_resp.read.return_value = lines.encode("utf-8") mock_resp.__enter__ = lambda s: s mock_resp.__exit__ = MagicMock(return_value=False) mock_open.return_value = mock_resp urls = fetch_webshare_proxies("http://example.com/proxy-list", max_proxies=3) assert len(urls) == 3 def test_empty_lines_skipped(self): raw = "\n\n1.2.3.4:1080:user:pass\n\n" with patch("urllib.request.urlopen") as mock_open: mock_resp = MagicMock() mock_resp.read.return_value = raw.encode("utf-8") mock_resp.__enter__ = lambda s: s mock_resp.__exit__ = MagicMock(return_value=False) mock_open.return_value = mock_resp urls = fetch_webshare_proxies("http://example.com/proxy-list") assert urls == ["http://user:pass@1.2.3.4:1080"] class TestLoadProxyTiers: def _clear_proxy_env(self, monkeypatch): for var in ("WEBSHARE_DOWNLOAD_URL", "PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"): monkeypatch.delenv(var, raising=False) def test_returns_empty_when_all_unset(self, monkeypatch): self._clear_proxy_env(monkeypatch) assert load_proxy_tiers() == [] def test_single_datacenter_tier(self, monkeypatch): self._clear_proxy_env(monkeypatch) monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080,http://dc2:8080") tiers = load_proxy_tiers() assert len(tiers) == 1 assert tiers[0] == ["http://dc1:8080", "http://dc2:8080"] def test_residential_only(self, monkeypatch): self._clear_proxy_env(monkeypatch) monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080") tiers = load_proxy_tiers() assert len(tiers) == 1 assert tiers[0] == ["http://res1:8080"] def test_empty_tiers_skipped(self, monkeypatch): self._clear_proxy_env(monkeypatch) monkeypatch.setenv("PROXY_URLS_DATACENTER", "") monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080") tiers = load_proxy_tiers() assert len(tiers) == 1 assert tiers[0] == ["http://res1:8080"] def test_three_tiers_correct_order(self, monkeypatch): self._clear_proxy_env(monkeypatch) with patch("padelnomics_extract.proxy.fetch_webshare_proxies", return_value=["http://user:pass@1.2.3.4:1080"]): monkeypatch.setenv("WEBSHARE_DOWNLOAD_URL", "http://example.com/list") monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080") monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080") tiers = load_proxy_tiers() assert len(tiers) == 3 assert tiers[0] == ["http://user:pass@1.2.3.4:1080"] # free assert tiers[1] == ["http://dc1:8080"] # datacenter assert tiers[2] == ["http://res1:8080"] # residential def test_webshare_fetch_failure_skips_tier(self, monkeypatch): self._clear_proxy_env(monkeypatch) with patch("padelnomics_extract.proxy.fetch_webshare_proxies", return_value=[]): monkeypatch.setenv("WEBSHARE_DOWNLOAD_URL", "http://example.com/list") monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080") tiers = load_proxy_tiers() assert len(tiers) == 1 assert tiers[0] == ["http://dc1:8080"] class TestRoundRobinCycler: def test_returns_none_callable_when_no_proxies(self): fn = make_round_robin_cycler([]) assert fn() is None def test_cycles_through_proxies(self): urls = ["http://p1", "http://p2", "http://p3"] fn = make_round_robin_cycler(urls) results = [fn() for _ in range(6)] assert results == ["http://p1", "http://p2", "http://p3"] * 2 def test_thread_safe_independent_calls(self): """Concurrent calls each get a proxy — no exceptions.""" import threading urls = ["http://p1", "http://p2"] fn = make_round_robin_cycler(urls) results = [] lock = threading.Lock() def worker(): proxy = fn() with lock: results.append(proxy) threads = [threading.Thread(target=worker) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() assert len(results) == 10 assert all(r in urls for r in results) class TestStickySelectorProxy: def test_returns_none_callable_when_no_proxies(self): fn = make_sticky_selector([]) assert fn("any_key") is None def test_same_key_always_same_proxy(self): urls = ["http://p1", "http://p2", "http://p3"] fn = make_sticky_selector(urls) proxy = fn("tenant_abc") for _ in range(10): assert fn("tenant_abc") == proxy def test_different_keys_can_map_to_different_proxies(self): urls = ["http://p1", "http://p2", "http://p3"] fn = make_sticky_selector(urls) results = {fn(f"key_{i}") for i in range(30)} assert len(results) > 1 # distribution across proxies def test_all_results_are_valid_proxies(self): urls = ["http://p1", "http://p2"] fn = make_sticky_selector(urls) for i in range(20): assert fn(f"key_{i}") in urls class TestTieredCyclerNTier: def test_starts_on_first_tier(self): tiers = [["http://t0a", "http://t0b"], ["http://t1a"]] cycler = make_tiered_cycler(tiers, threshold=3) assert cycler["active_tier_index"]() == 0 assert not cycler["is_exhausted"]() assert cycler["next_proxy"]() in tiers[0] def test_escalates_after_threshold(self): tiers = [["http://t0"], ["http://t1"]] cycler = make_tiered_cycler(tiers, threshold=3) # Two failures — stays on tier 0 cycler["record_failure"]() cycler["record_failure"]() assert cycler["active_tier_index"]() == 0 # Third failure — escalates escalated = cycler["record_failure"]() assert escalated is True assert cycler["active_tier_index"]() == 1 assert cycler["next_proxy"]() == "http://t1" def test_escalates_through_all_tiers(self): tiers = [["http://t0"], ["http://t1"], ["http://t2"]] cycler = make_tiered_cycler(tiers, threshold=2) # Exhaust tier 0 cycler["record_failure"]() cycler["record_failure"]() assert cycler["active_tier_index"]() == 1 # Exhaust tier 1 cycler["record_failure"]() cycler["record_failure"]() assert cycler["active_tier_index"]() == 2 # Exhaust tier 2 cycler["record_failure"]() cycler["record_failure"]() assert cycler["is_exhausted"]() assert cycler["next_proxy"]() is None def test_success_resets_counter(self): tiers = [["http://t0"], ["http://t1"]] cycler = make_tiered_cycler(tiers, threshold=3) cycler["record_failure"]() cycler["record_failure"]() cycler["record_success"]() # Counter reset — need threshold more failures to escalate cycler["record_failure"]() cycler["record_failure"]() assert cycler["active_tier_index"]() == 0 # still on tier 0 cycler["record_failure"]() assert cycler["active_tier_index"]() == 1 # now escalated def test_counter_resets_on_escalation(self): """After escalating, failure counter resets so new tier gets a fresh start.""" tiers = [["http://t0"], ["http://t1"], ["http://t2"]] cycler = make_tiered_cycler(tiers, threshold=2) # Exhaust tier 0 cycler["record_failure"]() cycler["record_failure"]() assert cycler["active_tier_index"]() == 1 # One failure on tier 1 — should NOT escalate yet (counter reset) cycler["record_failure"]() assert cycler["active_tier_index"]() == 1 # Second failure on tier 1 — escalates to tier 2 cycler["record_failure"]() assert cycler["active_tier_index"]() == 2 def test_is_exhausted_false_when_tiers_remain(self): tiers = [["http://t0"], ["http://t1"]] cycler = make_tiered_cycler(tiers, threshold=1) assert not cycler["is_exhausted"]() cycler["record_failure"]() # escalates to tier 1 assert not cycler["is_exhausted"]() def test_is_exhausted_true_after_all_tiers_fail(self): tiers = [["http://t0"]] cycler = make_tiered_cycler(tiers, threshold=1) assert not cycler["is_exhausted"]() cycler["record_failure"]() assert cycler["is_exhausted"]() assert cycler["next_proxy"]() is None def test_empty_tiers_immediately_exhausted(self): cycler = make_tiered_cycler([], threshold=3) assert cycler["is_exhausted"]() assert cycler["next_proxy"]() is None assert cycler["tier_count"]() == 0 def test_single_tier_cycles_within_tier(self): tiers = [["http://p1", "http://p2", "http://p3"]] cycler = make_tiered_cycler(tiers, threshold=10) results = [cycler["next_proxy"]() for _ in range(6)] assert results == ["http://p1", "http://p2", "http://p3"] * 2 def test_tier_count_reflects_input(self): assert make_tiered_cycler([], threshold=1)["tier_count"]() == 0 assert make_tiered_cycler([["a"]], threshold=1)["tier_count"]() == 1 assert make_tiered_cycler([["a"], ["b"], ["c"]], threshold=1)["tier_count"]() == 3 def test_record_failure_noop_when_exhausted(self): tiers = [["http://t0"]] cycler = make_tiered_cycler(tiers, threshold=1) cycler["record_failure"]() # exhausts assert cycler["is_exhausted"]() # Further failures are no-ops, not exceptions result = cycler["record_failure"]() assert result is False assert cycler["is_exhausted"]() def test_thread_safety(self): """Concurrent next_proxy and record calls do not raise or corrupt state.""" import threading tiers = [["http://t0a", "http://t0b"], ["http://t1a", "http://t1b"]] cycler = make_tiered_cycler(tiers, threshold=5) errors = [] lock = threading.Lock() def worker(): try: for _ in range(20): cycler["next_proxy"]() cycler["record_failure"]() cycler["record_success"]() except Exception as e: with lock: errors.append(e) threads = [threading.Thread(target=worker) for _ in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Thread safety errors: {errors}"