Files
padelnomics/web/tests/test_supervisor.py
Deeman e33b28025e fix: use SQLite-compatible space format in utcnow_iso(), fix credits ordering
utcnow_iso() now produces 'YYYY-MM-DD HH:MM:SS' (space separator) matching
SQLite's datetime('now'), so lexicographic string comparisons such as
published_at <= datetime('now') work correctly.

Also add `id DESC` tiebreaker to get_ledger() ORDER BY to preserve
insertion order when multiple credits are added within the same second.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 10:30:18 +01:00

282 lines
11 KiB
Python

"""
Unit tests for supervisor.py and proxy.py.
Tests cover pure-Python logic only — no DB, no subprocesses, no network.
DB-dependent functions (is_due, _get_last_success_time) are tested via mocks.
supervisor.py lives in src/padelnomics/ (not a uv workspace package), so we
add src/ to sys.path before importing.
"""
# Load supervisor.py directly by path — avoids clashing with the web app's
# 'padelnomics' namespace (which is the installed web package).
import importlib.util as _ilu
import textwrap
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
_SUP_PATH = Path(__file__).parent.parent.parent / "src" / "padelnomics" / "supervisor.py"
_spec = _ilu.spec_from_file_location("padelnomics_supervisor", _SUP_PATH)
sup = _ilu.module_from_spec(_spec)
_spec.loader.exec_module(sup)
from padelnomics_extract.proxy import (
load_proxy_urls,
make_round_robin_cycler,
make_sticky_selector,
)
# ── load_workflows ────────────────────────────────────────────────
class TestLoadWorkflows:
    """Tests for sup.load_workflows(): TOML parsing, defaults, and validation."""

    def test_loads_all_fields(self, tmp_path):
        cfg = tmp_path / "workflows.toml"
        cfg.write_text(textwrap.dedent("""\
            [extract_a]
            module = "mypkg.extract_a"
            schedule = "daily"
            [extract_b]
            module = "mypkg.extract_b"
            schedule = "weekly"
            entry = "run"
            depends_on = ["extract_a"]
            proxy_mode = "sticky"
        """))
        workflows = sup.load_workflows(cfg)
        assert len(workflows) == 2
        by_name = {wf["name"]: wf for wf in workflows}

        wf_a = by_name["extract_a"]
        assert wf_a["module"] == "mypkg.extract_a"
        assert wf_a["schedule"] == "daily"
        # Keys omitted from the TOML fall back to their defaults.
        assert wf_a["entry"] == "main"  # default
        assert wf_a["depends_on"] == []  # default
        assert wf_a["proxy_mode"] == "round-robin"  # default

        wf_b = by_name["extract_b"]
        assert wf_b["entry"] == "run"
        assert wf_b["depends_on"] == ["extract_a"]
        assert wf_b["proxy_mode"] == "sticky"

    def test_raises_on_missing_module(self, tmp_path):
        cfg = tmp_path / "bad.toml"
        cfg.write_text("[wf]\nschedule = 'daily'\n")
        with pytest.raises(AssertionError, match="missing 'module'"):
            sup.load_workflows(cfg)

    def test_raises_on_missing_schedule(self, tmp_path):
        cfg = tmp_path / "bad.toml"
        cfg.write_text("[wf]\nmodule = 'mypkg.wf'\n")
        with pytest.raises(AssertionError, match="missing 'schedule'"):
            sup.load_workflows(cfg)

    def test_raises_if_file_missing(self, tmp_path):
        with pytest.raises(AssertionError, match="not found"):
            sup.load_workflows(tmp_path / "nonexistent.toml")
# ── resolve_schedule ──────────────────────────────────────────────
class TestResolveSchedule:
    """Tests for sup.resolve_schedule(): named presets map to cron strings,
    anything else passes through untouched."""

    def test_maps_named_presets(self):
        expected = {
            "hourly": "0 * * * *",
            "daily": "0 5 * * *",
            "weekly": "0 3 * * 1",
            "monthly": "0 4 1 * *",
        }
        for preset, cron in expected.items():
            assert sup.resolve_schedule(preset) == cron

    def test_passes_through_raw_cron(self):
        raw = "0 6-23 * * *"
        assert sup.resolve_schedule(raw) == raw

    def test_unknown_name_passes_through(self):
        # Unrecognized preset names are not an error — returned verbatim.
        assert sup.resolve_schedule("quarterly") == "quarterly"
# ── is_due ────────────────────────────────────────────────────────
class TestIsDue:
    """Tests for sup.is_due(); _get_last_success_time is patched so no DB is hit."""

    def _wf(self, schedule="daily", name="test_wf"):
        # Minimal workflow dict — only the keys is_due() reads.
        return {"name": name, "schedule": schedule}

    def test_never_ran_is_due(self):
        fake_conn = MagicMock()
        with patch.object(sup, "_get_last_success_time", return_value=None):
            assert sup.is_due(fake_conn, self._wf()) is True

    def test_ran_after_last_trigger_is_not_due(self):
        """Last run was AFTER the most recent trigger — not due."""
        fake_conn = MagicMock()
        # Daily schedule fires at 05:00 UTC; a success at 06:00 today means the
        # most recent trigger has already been covered.
        today_six = datetime.now(UTC).replace(hour=6, minute=0, second=0, microsecond=0)
        with patch.object(sup, "_get_last_success_time", return_value=today_six):
            assert sup.is_due(fake_conn, self._wf(schedule="daily")) is False

    def test_ran_before_last_trigger_is_due(self):
        """Last run was BEFORE the most recent trigger — due again."""
        fake_conn = MagicMock()
        # Monthly fires on the 1st at 04:00 — 35 days ago is always before that.
        stale = datetime.now(UTC) - timedelta(days=35)
        with patch.object(sup, "_get_last_success_time", return_value=stale):
            assert sup.is_due(fake_conn, self._wf(schedule="monthly")) is True
# ── topological_waves ─────────────────────────────────────────────
class TestTopologicalWaves:
    """Tests for sup.topological_waves(): grouping due workflows into
    dependency-ordered execution waves."""

    def _wf(self, name, depends_on=None):
        return {"name": name, "depends_on": depends_on or []}

    def test_no_deps_single_wave(self):
        due = [self._wf(n) for n in ("a", "b", "c")]
        waves = sup.topological_waves(due)
        assert len(waves) == 1
        assert {wf["name"] for wf in waves[0]} == {"a", "b", "c"}

    def test_simple_chain_two_waves(self):
        due = [self._wf("a"), self._wf("b", depends_on=["a"])]
        waves = sup.topological_waves(due)
        assert len(waves) == 2
        assert waves[0][0]["name"] == "a"
        assert waves[1][0]["name"] == "b"

    def test_diamond_three_waves(self):
        """a → b,c → d"""
        due = [
            self._wf("a"),
            self._wf("b", depends_on=["a"]),
            self._wf("c", depends_on=["a"]),
            self._wf("d", depends_on=["b", "c"]),
        ]
        waves = sup.topological_waves(due)
        assert len(waves) == 3
        assert waves[0][0]["name"] == "a"
        assert {wf["name"] for wf in waves[1]} == {"b", "c"}
        assert waves[2][0]["name"] == "d"

    def test_dep_outside_due_set_ignored(self):
        """Dependency not in the due set is treated as satisfied."""
        due = [self._wf("b", depends_on=["a"])]  # "a" not in due set
        waves = sup.topological_waves(due)
        assert len(waves) == 1
        assert waves[0][0]["name"] == "b"

    def test_circular_dep_raises(self):
        cycle = [
            self._wf("a", depends_on=["b"]),
            self._wf("b", depends_on=["a"]),
        ]
        with pytest.raises(AssertionError, match="Circular dependency"):
            sup.topological_waves(cycle)

    def test_empty_list_returns_empty(self):
        assert sup.topological_waves([]) == []

    def test_real_workflows_toml(self):
        """The actual workflows.toml in the repo parses and produces valid waves."""
        repo_root = Path(__file__).parent.parent.parent
        wf_path = repo_root / "infra" / "supervisor" / "workflows.toml"
        if not wf_path.exists():
            pytest.skip("workflows.toml not found")
        waves = sup.topological_waves(sup.load_workflows(wf_path))
        flat_order = [wf["name"] for wave in waves for wf in wave]
        # playtomic_availability must come after playtomic_tenants
        assert flat_order.index("playtomic_tenants") < flat_order.index("playtomic_availability")
# ── proxy.py ─────────────────────────────────────────────────────
class TestLoadProxyUrls:
    """Tests for load_proxy_urls(): parsing the PROXY_URLS env var."""

    def test_returns_empty_when_unset(self, monkeypatch):
        monkeypatch.delenv("PROXY_URLS", raising=False)
        assert load_proxy_urls() == []

    def test_parses_comma_separated_urls(self, monkeypatch):
        monkeypatch.setenv(
            "PROXY_URLS",
            "http://p1:8080,http://p2:8080,http://p3:8080",
        )
        assert load_proxy_urls() == [
            "http://p1:8080",
            "http://p2:8080",
            "http://p3:8080",
        ]

    def test_strips_whitespace(self, monkeypatch):
        monkeypatch.setenv("PROXY_URLS", " http://p1:8080 , http://p2:8080 ")
        assert load_proxy_urls() == ["http://p1:8080", "http://p2:8080"]

    def test_ignores_empty_segments(self, monkeypatch):
        # Stray commas (leading, trailing, doubled) produce no empty entries.
        monkeypatch.setenv("PROXY_URLS", "http://p1:8080,,http://p2:8080,")
        assert load_proxy_urls() == ["http://p1:8080", "http://p2:8080"]
class TestRoundRobinCycler:
    """Tests for make_round_robin_cycler(): sequential proxy rotation."""

    def test_returns_none_callable_when_no_proxies(self):
        next_proxy = make_round_robin_cycler([])
        assert next_proxy() is None

    def test_cycles_through_proxies(self):
        pool = ["http://p1", "http://p2", "http://p3"]
        next_proxy = make_round_robin_cycler(pool)
        # Six calls wrap around the three-entry pool exactly twice.
        assert [next_proxy() for _ in range(6)] == pool * 2

    def test_thread_safe_independent_calls(self):
        """Concurrent calls each get a proxy — no exceptions."""
        import threading
        pool = ["http://p1", "http://p2"]
        next_proxy = make_round_robin_cycler(pool)
        seen = []
        guard = threading.Lock()

        def grab():
            value = next_proxy()
            with guard:
                seen.append(value)

        workers = [threading.Thread(target=grab) for _ in range(10)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        assert len(seen) == 10
        assert all(v in pool for v in seen)
class TestStickySelectorProxy:
    """Tests for make_sticky_selector(): deterministic key → proxy affinity."""

    def test_returns_none_callable_when_no_proxies(self):
        select = make_sticky_selector([])
        assert select("any_key") is None

    def test_same_key_always_same_proxy(self):
        select = make_sticky_selector(["http://p1", "http://p2", "http://p3"])
        first = select("tenant_abc")
        # Repeated lookups for the same key never move to another proxy.
        assert all(select("tenant_abc") == first for _ in range(10))

    def test_different_keys_can_map_to_different_proxies(self):
        select = make_sticky_selector(["http://p1", "http://p2", "http://p3"])
        distinct = {select(f"key_{i}") for i in range(30)}
        assert len(distinct) > 1  # distribution across proxies

    def test_all_results_are_valid_proxies(self):
        pool = ["http://p1", "http://p2"]
        select = make_sticky_selector(pool)
        assert all(select(f"key_{i}") in pool for i in range(20))