""" Unit tests for supervisor.py and proxy.py. Tests cover pure-Python logic only — no DB, no subprocesses, no network. DB-dependent functions (is_due, _get_last_success_time) are tested via mocks. supervisor.py lives in src/padelnomics/ (not a uv workspace package), so we add src/ to sys.path before importing. """ import sys import textwrap import tomllib from datetime import UTC, datetime, timedelta from pathlib import Path from unittest.mock import MagicMock, patch import pytest # Load supervisor.py directly by path — avoids clashing with the web app's # 'padelnomics' namespace (which is the installed web package). import importlib.util as _ilu _SUP_PATH = Path(__file__).parent.parent.parent / "src" / "padelnomics" / "supervisor.py" _spec = _ilu.spec_from_file_location("padelnomics_supervisor", _SUP_PATH) sup = _ilu.module_from_spec(_spec) _spec.loader.exec_module(sup) from padelnomics_extract.proxy import ( load_proxy_urls, make_round_robin_cycler, make_sticky_selector, ) # ── load_workflows ──────────────────────────────────────────────── class TestLoadWorkflows: def test_loads_all_fields(self, tmp_path): toml = tmp_path / "workflows.toml" toml.write_text(textwrap.dedent("""\ [extract_a] module = "mypkg.extract_a" schedule = "daily" [extract_b] module = "mypkg.extract_b" schedule = "weekly" entry = "run" depends_on = ["extract_a"] proxy_mode = "sticky" """)) wfs = sup.load_workflows(toml) assert len(wfs) == 2 a = next(w for w in wfs if w["name"] == "extract_a") assert a["module"] == "mypkg.extract_a" assert a["schedule"] == "daily" assert a["entry"] == "main" # default assert a["depends_on"] == [] # default assert a["proxy_mode"] == "round-robin" # default b = next(w for w in wfs if w["name"] == "extract_b") assert b["entry"] == "run" assert b["depends_on"] == ["extract_a"] assert b["proxy_mode"] == "sticky" def test_raises_on_missing_module(self, tmp_path): toml = tmp_path / "bad.toml" toml.write_text("[wf]\nschedule = 'daily'\n") with pytest.raises(AssertionError, match="missing 'module'"): sup.load_workflows(toml) def test_raises_on_missing_schedule(self, tmp_path): toml = tmp_path / "bad.toml" toml.write_text("[wf]\nmodule = 'mypkg.wf'\n") with pytest.raises(AssertionError, match="missing 'schedule'"): sup.load_workflows(toml) def test_raises_if_file_missing(self, tmp_path): with pytest.raises(AssertionError, match="not found"): sup.load_workflows(tmp_path / "nonexistent.toml") # ── resolve_schedule ────────────────────────────────────────────── class TestResolveSchedule: def test_maps_named_presets(self): assert sup.resolve_schedule("hourly") == "0 * * * *" assert sup.resolve_schedule("daily") == "0 5 * * *" assert sup.resolve_schedule("weekly") == "0 3 * * 1" assert sup.resolve_schedule("monthly") == "0 4 1 * *" def test_passes_through_raw_cron(self): expr = "0 6-23 * * *" assert sup.resolve_schedule(expr) == expr def test_unknown_name_passes_through(self): assert sup.resolve_schedule("quarterly") == "quarterly" # ── is_due ──────────────────────────────────────────────────────── class TestIsDue: def _wf(self, schedule="daily", name="test_wf"): return {"name": name, "schedule": schedule} def test_never_ran_is_due(self): conn = MagicMock() with patch.object(sup, "_get_last_success_time", return_value=None): assert sup.is_due(conn, self._wf()) is True def test_ran_after_last_trigger_is_not_due(self): """Last run was AFTER the most recent trigger — not due.""" conn = MagicMock() # Use a daily schedule — trigger fires at 05:00 UTC # Simulate last success = today at 06:00, so trigger already covered now = datetime.now(UTC) last_success = now.replace(hour=6, minute=0, second=0, microsecond=0) with patch.object(sup, "_get_last_success_time", return_value=last_success): assert sup.is_due(conn, self._wf(schedule="daily")) is False def test_ran_before_last_trigger_is_due(self): """Last run was BEFORE the most recent trigger — due again.""" conn = MagicMock() # Monthly fires on the 1st at 04:00 — simulate running last month last_success = datetime.now(UTC) - timedelta(days=35) with patch.object(sup, "_get_last_success_time", return_value=last_success): assert sup.is_due(conn, self._wf(schedule="monthly")) is True # ── topological_waves ───────────────────────────────────────────── class TestTopologicalWaves: def _wf(self, name, depends_on=None): return {"name": name, "depends_on": depends_on or []} def test_no_deps_single_wave(self): wfs = [self._wf("a"), self._wf("b"), self._wf("c")] waves = sup.topological_waves(wfs) assert len(waves) == 1 assert {w["name"] for w in waves[0]} == {"a", "b", "c"} def test_simple_chain_two_waves(self): wfs = [self._wf("a"), self._wf("b", depends_on=["a"])] waves = sup.topological_waves(wfs) assert len(waves) == 2 assert waves[0][0]["name"] == "a" assert waves[1][0]["name"] == "b" def test_diamond_three_waves(self): """a → b,c → d""" wfs = [ self._wf("a"), self._wf("b", depends_on=["a"]), self._wf("c", depends_on=["a"]), self._wf("d", depends_on=["b", "c"]), ] waves = sup.topological_waves(wfs) assert len(waves) == 3 assert waves[0][0]["name"] == "a" assert {w["name"] for w in waves[1]} == {"b", "c"} assert waves[2][0]["name"] == "d" def test_dep_outside_due_set_ignored(self): """Dependency not in the due set is treated as satisfied.""" wfs = [self._wf("b", depends_on=["a"])] # "a" not in due set waves = sup.topological_waves(wfs) assert len(waves) == 1 assert waves[0][0]["name"] == "b" def test_circular_dep_raises(self): wfs = [ self._wf("a", depends_on=["b"]), self._wf("b", depends_on=["a"]), ] with pytest.raises(AssertionError, match="Circular dependency"): sup.topological_waves(wfs) def test_empty_list_returns_empty(self): assert sup.topological_waves([]) == [] def test_real_workflows_toml(self): """The actual workflows.toml in the repo parses and produces valid waves.""" repo_root = Path(__file__).parent.parent.parent wf_path = repo_root / "infra" / "supervisor" / "workflows.toml" if not wf_path.exists(): pytest.skip("workflows.toml not found") wfs = sup.load_workflows(wf_path) waves = sup.topological_waves(wfs) # playtomic_availability must come after playtomic_tenants all_names = [w["name"] for wave in waves for w in wave] tenants_idx = all_names.index("playtomic_tenants") avail_idx = all_names.index("playtomic_availability") assert tenants_idx < avail_idx # ── proxy.py ───────────────────────────────────────────────────── class TestLoadProxyUrls: def test_returns_empty_when_unset(self, monkeypatch): monkeypatch.delenv("PROXY_URLS", raising=False) assert load_proxy_urls() == [] def test_parses_comma_separated_urls(self, monkeypatch): monkeypatch.setenv( "PROXY_URLS", "http://p1:8080,http://p2:8080,http://p3:8080", ) urls = load_proxy_urls() assert urls == ["http://p1:8080", "http://p2:8080", "http://p3:8080"] def test_strips_whitespace(self, monkeypatch): monkeypatch.setenv("PROXY_URLS", " http://p1:8080 , http://p2:8080 ") urls = load_proxy_urls() assert urls == ["http://p1:8080", "http://p2:8080"] def test_ignores_empty_segments(self, monkeypatch): monkeypatch.setenv("PROXY_URLS", "http://p1:8080,,http://p2:8080,") urls = load_proxy_urls() assert urls == ["http://p1:8080", "http://p2:8080"] class TestRoundRobinCycler: def test_returns_none_callable_when_no_proxies(self): fn = make_round_robin_cycler([]) assert fn() is None def test_cycles_through_proxies(self): urls = ["http://p1", "http://p2", "http://p3"] fn = make_round_robin_cycler(urls) results = [fn() for _ in range(6)] assert results == ["http://p1", "http://p2", "http://p3"] * 2 def test_thread_safe_independent_calls(self): """Concurrent calls each get a proxy — no exceptions.""" import threading urls = ["http://p1", "http://p2"] fn = make_round_robin_cycler(urls) results = [] lock = threading.Lock() def worker(): proxy = fn() with lock: results.append(proxy) threads = [threading.Thread(target=worker) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() assert len(results) == 10 assert all(r in urls for r in results) class TestStickySelectorProxy: def test_returns_none_callable_when_no_proxies(self): fn = make_sticky_selector([]) assert fn("any_key") is None def test_same_key_always_same_proxy(self): urls = ["http://p1", "http://p2", "http://p3"] fn = make_sticky_selector(urls) proxy = fn("tenant_abc") for _ in range(10): assert fn("tenant_abc") == proxy def test_different_keys_can_map_to_different_proxies(self): urls = ["http://p1", "http://p2", "http://p3"] fn = make_sticky_selector(urls) results = {fn(f"key_{i}") for i in range(30)} assert len(results) > 1 # distribution across proxies def test_all_results_are_valid_proxies(self): urls = ["http://p1", "http://p2"] fn = make_sticky_selector(urls) for i in range(20): assert fn(f"key_{i}") in urls