feat(supervisor): port Python supervisor from padelnomics + workflows.toml
Port padelnomics' schedule-aware Python supervisor to materia: - src/materia/supervisor.py — croniter scheduling, topological wave execution (parallel independent workflows), tag-based git pull + deploy, status CLI subcommand - infra/supervisor/workflows.toml — workflow registry (psd daily, cot weekly, prices daily, ice daily, weather daily) - infra/supervisor/materia-supervisor.service — updated ExecStart to Python supervisor, added SUPERVISOR_GIT_PULL=1 Adaptations from padelnomics: - Uses extract_core.state.open_state_db (not padelnomics_extract.utils) - uv run sqlmesh -p transform/sqlmesh_materia run - uv run materia pipeline run export_serving - web/deploy.sh path (materia's deploy.sh is under web/) - Removed proxy_mode (not used in materia) Also: add croniter dependency to src/materia, delete old supervisor.sh. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
448
src/materia/supervisor.py
Normal file
448
src/materia/supervisor.py
Normal file
@@ -0,0 +1,448 @@
|
||||
"""Materia Supervisor — schedule-aware pipeline orchestration.
|
||||
|
||||
Reads a TOML workflow registry, runs extractors on cron-based schedules
|
||||
(with dependency ordering and parallel execution), then runs SQLMesh
|
||||
transform + export_serving. On production, polls for new git tags and
|
||||
deploys the web app automatically when a new tag appears.
|
||||
|
||||
Crash safety: the main loop catches all exceptions and backs off, matching
|
||||
the TigerBeetle CFO supervisor pattern. Combined with systemd Restart=always,
|
||||
the supervisor is effectively unkillable.
|
||||
|
||||
Usage:
|
||||
# Run the supervisor loop (production)
|
||||
LANDING_DIR=data/landing uv run python src/materia/supervisor.py
|
||||
|
||||
# Show workflow status
|
||||
LANDING_DIR=data/landing uv run python src/materia/supervisor.py status
|
||||
"""
|
||||
|
||||
import importlib
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import tomllib
|
||||
from collections import defaultdict
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
from croniter import croniter
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
TICK_INTERVAL_SECONDS = 60
|
||||
BACKOFF_SECONDS = 600 # 10 min on tick failure
|
||||
SUBPROCESS_TIMEOUT_SECONDS = 14400 # 4 hours max per subprocess
|
||||
REPO_DIR = Path(os.getenv("REPO_DIR", "/opt/materia"))
|
||||
LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
|
||||
DUCKDB_PATH = os.getenv("DUCKDB_PATH", "data/lakehouse.duckdb")
|
||||
SERVING_DUCKDB_PATH = os.getenv("SERVING_DUCKDB_PATH", "analytics.duckdb")
|
||||
ALERT_WEBHOOK_URL = os.getenv("ALERT_WEBHOOK_URL", "")
|
||||
WORKFLOWS_PATH = Path(os.getenv("WORKFLOWS_PATH", "infra/supervisor/workflows.toml"))
|
||||
|
||||
NAMED_SCHEDULES = {
|
||||
"hourly": "0 * * * *",
|
||||
"daily": "0 5 * * *",
|
||||
"weekly": "0 3 * * 1",
|
||||
"monthly": "0 4 1 * *",
|
||||
}
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(name)s %(levelname)s %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
handlers=[logging.StreamHandler(sys.stdout)],
|
||||
)
|
||||
logger = logging.getLogger("materia.supervisor")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# State DB helpers (reuse extraction state DB)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _open_state_db():
|
||||
"""Open the extraction state DB at {LANDING_DIR}/.state.sqlite."""
|
||||
from extract_core.state import open_state_db
|
||||
return open_state_db(LANDING_DIR)
|
||||
|
||||
|
||||
def _get_last_success_time(conn, workflow_name: str) -> datetime | None:
|
||||
"""Return the finish time of the last successful run, or None."""
|
||||
row = conn.execute(
|
||||
"SELECT MAX(finished_at) AS t FROM extraction_runs "
|
||||
"WHERE extractor = ? AND status = 'success'",
|
||||
(workflow_name,),
|
||||
).fetchone()
|
||||
if not row or not row["t"]:
|
||||
return None
|
||||
return datetime.fromisoformat(row["t"]).replace(tzinfo=UTC)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Workflow loading + scheduling
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_workflows(path: Path) -> list[dict]:
|
||||
"""Load workflow definitions from TOML file."""
|
||||
assert path.exists(), f"Workflows file not found: {path}"
|
||||
with open(path, "rb") as f:
|
||||
data = tomllib.load(f)
|
||||
|
||||
workflows = []
|
||||
for name, cfg in data.items():
|
||||
assert "module" in cfg, f"Workflow '{name}' missing 'module'"
|
||||
assert "schedule" in cfg, f"Workflow '{name}' missing 'schedule'"
|
||||
workflows.append({
|
||||
"name": name,
|
||||
"module": cfg["module"],
|
||||
"entry": cfg.get("entry", "main"),
|
||||
"schedule": cfg["schedule"],
|
||||
"depends_on": cfg.get("depends_on", []),
|
||||
})
|
||||
return workflows
|
||||
|
||||
|
||||
def resolve_schedule(schedule: str) -> str:
|
||||
"""Resolve a named schedule to a cron expression, or pass through raw cron."""
|
||||
return NAMED_SCHEDULES.get(schedule, schedule)
|
||||
|
||||
|
||||
def is_due(conn, workflow: dict) -> bool:
|
||||
"""Check if the most recent cron trigger hasn't been served yet."""
|
||||
cron_expr = resolve_schedule(workflow["schedule"])
|
||||
assert croniter.is_valid(cron_expr), f"Invalid cron: {cron_expr} for {workflow['name']}"
|
||||
|
||||
last_success = _get_last_success_time(conn, workflow["name"])
|
||||
if last_success is None:
|
||||
return True # never ran
|
||||
|
||||
now_naive = datetime.now(UTC).replace(tzinfo=None)
|
||||
prev_trigger = croniter(cron_expr, now_naive).get_prev(datetime).replace(tzinfo=UTC)
|
||||
return last_success < prev_trigger
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Topological ordering
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def topological_waves(workflows: list[dict]) -> list[list[dict]]:
|
||||
"""Group workflows into dependency waves for parallel execution.
|
||||
|
||||
Wave 0: no deps. Wave 1: depends only on wave 0. Etc.
|
||||
"""
|
||||
name_to_wf = {w["name"]: w for w in workflows}
|
||||
due_names = set(name_to_wf.keys())
|
||||
|
||||
in_degree: dict[str, int] = {}
|
||||
dependents: dict[str, list[str]] = defaultdict(list)
|
||||
for w in workflows:
|
||||
deps_in_scope = [d for d in w["depends_on"] if d in due_names]
|
||||
in_degree[w["name"]] = len(deps_in_scope)
|
||||
for d in deps_in_scope:
|
||||
dependents[d].append(w["name"])
|
||||
|
||||
waves = []
|
||||
remaining = set(due_names)
|
||||
max_iterations = len(workflows) + 1
|
||||
|
||||
for _ in range(max_iterations):
|
||||
if not remaining:
|
||||
break
|
||||
wave = [name_to_wf[n] for n in remaining if in_degree[n] == 0]
|
||||
assert wave, f"Circular dependency detected among: {remaining}"
|
||||
waves.append(wave)
|
||||
for w in wave:
|
||||
remaining.discard(w["name"])
|
||||
for dep in dependents[w["name"]]:
|
||||
if dep in remaining:
|
||||
in_degree[dep] -= 1
|
||||
|
||||
return waves
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Workflow execution
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def run_workflow(conn, workflow: dict) -> None:
|
||||
"""Run a single workflow by importing its module and calling the entry function."""
|
||||
module_name = workflow["module"]
|
||||
entry_name = workflow["entry"]
|
||||
|
||||
logger.info("Running workflow: %s (%s.%s)", workflow["name"], module_name, entry_name)
|
||||
|
||||
try:
|
||||
module = importlib.import_module(module_name)
|
||||
entry_fn = getattr(module, entry_name)
|
||||
entry_fn()
|
||||
logger.info("Workflow %s completed successfully", workflow["name"])
|
||||
except Exception:
|
||||
logger.exception("Workflow %s failed", workflow["name"])
|
||||
send_alert(f"Workflow '{workflow['name']}' failed")
|
||||
raise
|
||||
|
||||
|
||||
def run_due_workflows(conn, workflows: list[dict]) -> bool:
|
||||
"""Run all due workflows in dependency-wave order. Returns True if any ran."""
|
||||
due = [w for w in workflows if is_due(conn, w)]
|
||||
if not due:
|
||||
logger.info("No workflows due")
|
||||
return False
|
||||
|
||||
logger.info("Due workflows: %s", [w["name"] for w in due])
|
||||
waves = topological_waves(due)
|
||||
|
||||
for i, wave in enumerate(waves):
|
||||
wave_names = [w["name"] for w in wave]
|
||||
logger.info("Wave %d: %s", i, wave_names)
|
||||
|
||||
if len(wave) == 1:
|
||||
try:
|
||||
run_workflow(conn, wave[0])
|
||||
except Exception:
|
||||
pass # already logged in run_workflow
|
||||
else:
|
||||
with ThreadPoolExecutor(max_workers=len(wave)) as pool:
|
||||
futures = {pool.submit(run_workflow, conn, w): w for w in wave}
|
||||
for future in as_completed(futures):
|
||||
try:
|
||||
future.result()
|
||||
except Exception:
|
||||
pass # already logged in run_workflow
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Transform + Export + Deploy
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bool:
|
||||
"""Run a shell command. Returns True on success."""
|
||||
logger.info("Shell: %s", cmd)
|
||||
result = subprocess.run(
|
||||
cmd, shell=True, capture_output=True, text=True, timeout=timeout_seconds
|
||||
)
|
||||
if result.returncode != 0:
|
||||
logger.error(
|
||||
"Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s",
|
||||
result.returncode, cmd, result.stdout[-500:], result.stderr[-500:],
|
||||
)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def run_transform() -> None:
|
||||
"""Run SQLMesh — evaluates model staleness internally."""
|
||||
logger.info("Running SQLMesh transform")
|
||||
ok = run_shell("uv run sqlmesh -p transform/sqlmesh_materia run")
|
||||
if not ok:
|
||||
send_alert("SQLMesh transform failed")
|
||||
|
||||
|
||||
def run_export() -> None:
|
||||
"""Export serving tables to analytics.duckdb."""
|
||||
logger.info("Exporting serving tables")
|
||||
ok = run_shell(
|
||||
f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} "
|
||||
f"uv run materia pipeline run export_serving"
|
||||
)
|
||||
if not ok:
|
||||
send_alert("Serving export failed")
|
||||
|
||||
|
||||
def web_code_changed() -> bool:
|
||||
"""Check if web app code changed since last deploy."""
|
||||
result = subprocess.run(
|
||||
["git", "diff", "--name-only", "HEAD~1", "HEAD", "--", "web/", "web/Dockerfile"],
|
||||
capture_output=True, text=True, timeout=30,
|
||||
)
|
||||
return bool(result.stdout.strip())
|
||||
|
||||
|
||||
def current_deployed_tag() -> str | None:
|
||||
"""Return the tag currently checked out, or None if not on a tag."""
|
||||
result = subprocess.run(
|
||||
["git", "describe", "--tags", "--exact-match", "HEAD"],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
return result.stdout.strip() or None
|
||||
|
||||
|
||||
def latest_remote_tag() -> str | None:
|
||||
"""Fetch tags from origin and return the latest v<n> tag."""
|
||||
subprocess.run(
|
||||
["git", "fetch", "--tags", "--prune-tags", "origin"],
|
||||
capture_output=True, text=True, timeout=30,
|
||||
)
|
||||
result = subprocess.run(
|
||||
["git", "tag", "--list", "--sort=-version:refname", "v*"],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
tags = result.stdout.strip().splitlines()
|
||||
return tags[0] if tags else None
|
||||
|
||||
|
||||
def git_pull_and_sync() -> None:
|
||||
"""Checkout the latest passing release tag and sync dependencies.
|
||||
|
||||
A tag v<N> is created by CI only after tests pass, so presence of a new
|
||||
tag implies green CI. Skips if already on the latest tag.
|
||||
"""
|
||||
latest = latest_remote_tag()
|
||||
if not latest:
|
||||
logger.info("No release tags found — skipping pull")
|
||||
return
|
||||
|
||||
current = current_deployed_tag()
|
||||
if current == latest:
|
||||
logger.info("Already on latest tag %s — skipping pull", latest)
|
||||
return
|
||||
|
||||
logger.info("New tag %s available (current: %s) — deploying", latest, current)
|
||||
run_shell(f"git checkout --detach {latest}")
|
||||
run_shell("uv sync --all-packages")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Alerting
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def send_alert(message: str) -> None:
|
||||
"""Send alert via ntfy.sh (or any webhook accepting POST body)."""
|
||||
if not ALERT_WEBHOOK_URL:
|
||||
return
|
||||
timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
|
||||
try:
|
||||
subprocess.run(
|
||||
["curl", "-s", "-d", f"[{timestamp}] {message}", ALERT_WEBHOOK_URL],
|
||||
timeout=10, capture_output=True,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to send alert")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main loop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def tick() -> None:
|
||||
"""One cycle: git pull, run due extractors, transform, export, maybe deploy."""
|
||||
workflows = load_workflows(WORKFLOWS_PATH)
|
||||
conn = _open_state_db()
|
||||
|
||||
try:
|
||||
# Git pull + sync (production only — SUPERVISOR_GIT_PULL env var set in systemd service)
|
||||
if os.getenv("SUPERVISOR_GIT_PULL"):
|
||||
git_pull_and_sync()
|
||||
|
||||
# Run due extractors
|
||||
run_due_workflows(conn, workflows)
|
||||
|
||||
# SQLMesh always runs (evaluates model staleness internally)
|
||||
run_transform()
|
||||
|
||||
# Export serving tables to analytics.duckdb
|
||||
run_export()
|
||||
|
||||
# Deploy web app if code changed
|
||||
if os.getenv("SUPERVISOR_GIT_PULL") and web_code_changed():
|
||||
logger.info("Web code changed — deploying")
|
||||
ok = run_shell("./web/deploy.sh")
|
||||
if ok:
|
||||
send_alert("Deploy succeeded")
|
||||
else:
|
||||
send_alert("Deploy FAILED — check journalctl -u materia-supervisor")
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def supervisor_loop() -> None:
|
||||
"""Infinite supervisor loop — never exits unless killed."""
|
||||
logger.info("Supervisor starting (tick interval: %ds)", TICK_INTERVAL_SECONDS)
|
||||
logger.info("Workflows: %s", WORKFLOWS_PATH)
|
||||
logger.info("Landing dir: %s", LANDING_DIR)
|
||||
|
||||
while True:
|
||||
try:
|
||||
tick()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Supervisor stopped (KeyboardInterrupt)")
|
||||
break
|
||||
except Exception:
|
||||
logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS)
|
||||
send_alert("Supervisor tick failed")
|
||||
time.sleep(BACKOFF_SECONDS)
|
||||
else:
|
||||
time.sleep(TICK_INTERVAL_SECONDS)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Status CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def print_status() -> None:
|
||||
"""Print workflow status table."""
|
||||
workflows = load_workflows(WORKFLOWS_PATH)
|
||||
conn = _open_state_db()
|
||||
|
||||
now = datetime.now(UTC)
|
||||
|
||||
print(f"{'Workflow':<28} {'Schedule':<18} {'Last Run':<20} {'Status':<8} {'Next'}")
|
||||
print(f"{'─' * 28} {'─' * 18} {'─' * 20} {'─' * 8} {'─' * 12}")
|
||||
|
||||
for w in workflows:
|
||||
last_success = _get_last_success_time(conn, w["name"])
|
||||
cron_expr = resolve_schedule(w["schedule"])
|
||||
|
||||
if last_success:
|
||||
last_str = last_success.strftime("%Y-%m-%d %H:%M")
|
||||
status = "ok"
|
||||
else:
|
||||
last_str = "never"
|
||||
status = "pending"
|
||||
|
||||
row = conn.execute(
|
||||
"SELECT MAX(finished_at) AS t FROM extraction_runs "
|
||||
"WHERE extractor = ? AND status = 'failed'",
|
||||
(w["name"],),
|
||||
).fetchone()
|
||||
if row and row["t"]:
|
||||
last_fail = datetime.fromisoformat(row["t"]).replace(tzinfo=UTC)
|
||||
if last_success is None or last_fail > last_success:
|
||||
status = "FAILED"
|
||||
|
||||
now_naive = now.replace(tzinfo=None)
|
||||
next_trigger = croniter(cron_expr, now_naive).get_next(datetime)
|
||||
delta = next_trigger - now_naive
|
||||
if delta.total_seconds() < 3600:
|
||||
next_str = f"in {int(delta.total_seconds() / 60)}m"
|
||||
elif delta.total_seconds() < 86400:
|
||||
next_str = next_trigger.strftime("%H:%M")
|
||||
else:
|
||||
next_str = next_trigger.strftime("%b %d")
|
||||
|
||||
schedule_display = w["schedule"] if w["schedule"] in NAMED_SCHEDULES else cron_expr
|
||||
print(f"{w['name']:<28} {schedule_display:<18} {last_str:<20} {status:<8} {next_str}")
|
||||
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main() -> None:
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "status":
|
||||
print_status()
|
||||
else:
|
||||
supervisor_loop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user