merge: SOPS migration + Python supervisor + docs (3 repos)

This commit is contained in:
Deeman
2026-02-26 12:15:35 +01:00
22 changed files with 1244 additions and 370 deletions

View File

@@ -106,7 +106,7 @@ def pipeline_list():
typer.echo(f"{name:<15} (command: {cmd}, timeout: {config['timeout_seconds']}s)")
secrets_app = typer.Typer(help="Manage secrets via Pulumi ESC")
secrets_app = typer.Typer(help="Manage secrets via SOPS + age")
app.add_typer(secrets_app, name="secrets")
@@ -142,15 +142,15 @@ def secrets_get(
@secrets_app.command("test")
def secrets_test():
"""Test ESC connection and authentication."""
"""Test sops decryption (verifies sops is installed and age key is present)."""
from materia.secrets import test_connection
typer.echo("Testing Pulumi ESC connection...")
typer.echo("Testing SOPS decryption...")
if test_connection():
typer.echo("ESC connection successful")
typer.echo("SOPS decryption successful")
else:
typer.echo("ESC connection failed", err=True)
typer.echo("\nMake sure you've run: esc login")
typer.echo("SOPS decryption failed", err=True)
typer.echo("\nMake sure sops is installed and your age key is at ~/.config/sops/age/keys.txt")
raise typer.Exit(1)

View File

@@ -1,44 +1,67 @@
"""Secrets management via Pulumi ESC."""
"""Secrets management via SOPS + age."""
import json
import subprocess
from functools import lru_cache
from pathlib import Path
# Default secrets file path (relative to repo root)
_DEFAULT_SECRETS_PATH = Path(__file__).parent.parent.parent / ".env.prod.sops"
def _parse_dotenv(text: str) -> dict[str, str]:
"""Parse dotenv-format text into a dict, skipping comments and blanks."""
result = {}
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, _, value = line.partition("=")
result[key.strip()] = value.strip()
return result
@lru_cache(maxsize=1)
def _load_environment() -> dict[str, str]:
"""Load secrets from Pulumi ESC environment."""
def _load_environment(secrets_path: str = None) -> dict[str, str]:
"""Decrypt and load secrets from a SOPS-encrypted dotenv file."""
path = Path(secrets_path) if secrets_path else _DEFAULT_SECRETS_PATH
assert path.exists(), f"Secrets file not found: {path}"
try:
result = subprocess.run(
["esc", "env", "open", "beanflows/prod", "--format", "json"],
["sops", "--input-type", "dotenv", "--output-type", "dotenv", "--decrypt", str(path)],
capture_output=True,
text=True,
check=True,
timeout=30,
)
data = json.loads(result.stdout)
return data.get("environmentVariables", {})
return _parse_dotenv(result.stdout)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed to load ESC environment: {e.stderr}")
raise RuntimeError(f"Failed to decrypt secrets: {e.stderr.strip()}")
except FileNotFoundError:
raise RuntimeError("ESC CLI not found. Install with: curl -fsSL https://get.pulumi.com/esc/install.sh | sh")
raise RuntimeError(
"sops not found. Install with: brew install sops "
"or see https://github.com/getsops/sops/releases"
)
def get_secret(key: str) -> str | None:
def get_secret(key: str, secrets_path: str = None) -> str | None:
"""Get a secret value by key."""
env = _load_environment()
env = _load_environment(secrets_path)
return env.get(key)
def list_secrets() -> list[str]:
def list_secrets(secrets_path: str = None) -> list[str]:
"""List all available secret keys."""
env = _load_environment()
env = _load_environment(secrets_path)
return list(env.keys())
def test_connection() -> bool:
"""Test ESC connection."""
def test_connection(secrets_path: str = None) -> bool:
"""Test that sops is available and can decrypt the secrets file."""
try:
_load_environment()
_load_environment.cache_clear()
_load_environment(secrets_path)
return True
except Exception:
return False

448
src/materia/supervisor.py Normal file
View File

@@ -0,0 +1,448 @@
"""Materia Supervisor — schedule-aware pipeline orchestration.
Reads a TOML workflow registry, runs extractors on cron-based schedules
(with dependency ordering and parallel execution), then runs SQLMesh
transform + export_serving. On production, polls for new git tags and
deploys the web app automatically when a new tag appears.
Crash safety: the main loop catches all exceptions and backs off, matching
the TigerBeetle CFO supervisor pattern. Combined with systemd Restart=always,
the supervisor is effectively unkillable.
Usage:
# Run the supervisor loop (production)
LANDING_DIR=data/landing uv run python src/materia/supervisor.py
# Show workflow status
LANDING_DIR=data/landing uv run python src/materia/supervisor.py status
"""
import importlib
import logging
import os
import subprocess
import sys
import time
import tomllib
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import UTC, datetime
from pathlib import Path
from croniter import croniter
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Cadence of the main loop; every tick re-evaluates schedules and runs due work.
TICK_INTERVAL_SECONDS = 60
BACKOFF_SECONDS = 600  # 10 min on tick failure
SUBPROCESS_TIMEOUT_SECONDS = 14400  # 4 hours max per subprocess
# All paths/URLs are env-overridable so dev and prod share one code path.
REPO_DIR = Path(os.getenv("REPO_DIR", "/opt/materia"))
LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
DUCKDB_PATH = os.getenv("DUCKDB_PATH", "data/lakehouse.duckdb")
SERVING_DUCKDB_PATH = os.getenv("SERVING_DUCKDB_PATH", "analytics.duckdb")
# Empty webhook URL disables alerting entirely (send_alert returns early).
ALERT_WEBHOOK_URL = os.getenv("ALERT_WEBHOOK_URL", "")
WORKFLOWS_PATH = Path(os.getenv("WORKFLOWS_PATH", "infra/supervisor/workflows.toml"))
# Friendly schedule aliases -> cron expressions; raw cron strings also accepted
# by resolve_schedule(). Evaluated against UTC wall-clock time in is_due().
NAMED_SCHEDULES = {
    "hourly": "0 * * * *",
    "daily": "0 5 * * *",
    "weekly": "0 3 * * 1",
    "monthly": "0 4 1 * *",
}
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    # Log to stdout — NOTE(review): presumably captured by journald under systemd.
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("materia.supervisor")
# ---------------------------------------------------------------------------
# State DB helpers (reuse extraction state DB)
# ---------------------------------------------------------------------------
def _open_state_db():
    """Return a connection to the shared extraction state DB (<LANDING_DIR>/.state.sqlite)."""
    # Local import keeps the project dependency off the module import path.
    from extract_core.state import open_state_db as _open

    return _open(LANDING_DIR)
def _get_last_success_time(conn, workflow_name: str) -> datetime | None:
"""Return the finish time of the last successful run, or None."""
row = conn.execute(
"SELECT MAX(finished_at) AS t FROM extraction_runs "
"WHERE extractor = ? AND status = 'success'",
(workflow_name,),
).fetchone()
if not row or not row["t"]:
return None
return datetime.fromisoformat(row["t"]).replace(tzinfo=UTC)
# ---------------------------------------------------------------------------
# Workflow loading + scheduling
# ---------------------------------------------------------------------------
def load_workflows(path: Path) -> list[dict]:
"""Load workflow definitions from TOML file."""
assert path.exists(), f"Workflows file not found: {path}"
with open(path, "rb") as f:
data = tomllib.load(f)
workflows = []
for name, cfg in data.items():
assert "module" in cfg, f"Workflow '{name}' missing 'module'"
assert "schedule" in cfg, f"Workflow '{name}' missing 'schedule'"
workflows.append({
"name": name,
"module": cfg["module"],
"entry": cfg.get("entry", "main"),
"schedule": cfg["schedule"],
"depends_on": cfg.get("depends_on", []),
})
return workflows
def resolve_schedule(schedule: str) -> str:
    """Map a named schedule alias to its cron expression.

    Unknown names are assumed to already be raw cron and pass through untouched.
    """
    if schedule in NAMED_SCHEDULES:
        return NAMED_SCHEDULES[schedule]
    return schedule
def is_due(conn, workflow: dict) -> bool:
    """Return True when the most recent cron trigger has not been served yet.

    A workflow that has never succeeded is always due. Otherwise it is due
    when its last success predates the most recent cron trigger.

    Raises:
        ValueError: if the workflow's schedule is not a valid cron expression.
    """
    cron_expr = resolve_schedule(workflow["schedule"])
    # Raise (not assert) so a typo'd schedule still fails loudly under `python -O`.
    if not croniter.is_valid(cron_expr):
        raise ValueError(f"Invalid cron: {cron_expr} for {workflow['name']}")
    last_success = _get_last_success_time(conn, workflow["name"])
    if last_success is None:
        return True  # never ran
    # croniter works with naive datetimes; strip tzinfo, then re-tag the result
    # as UTC so the comparison against the (UTC) last-success time is apples-to-apples.
    now_naive = datetime.now(UTC).replace(tzinfo=None)
    prev_trigger = croniter(cron_expr, now_naive).get_prev(datetime).replace(tzinfo=UTC)
    return last_success < prev_trigger
# ---------------------------------------------------------------------------
# Topological ordering
# ---------------------------------------------------------------------------
def topological_waves(workflows: list[dict]) -> list[list[dict]]:
    """Group workflows into dependency waves for parallel execution.

    Wave 0: no in-scope deps. Wave 1: depends only on wave 0. Etc.
    Dependencies on workflows not present in *workflows* are ignored
    (they are not part of this batch).

    Raises:
        ValueError: if the dependency graph contains a cycle.
    """
    name_to_wf = {w["name"]: w for w in workflows}
    due_names = set(name_to_wf)
    in_degree: dict[str, int] = {}
    dependents: dict[str, list[str]] = defaultdict(list)
    for w in workflows:
        deps_in_scope = [d for d in w["depends_on"] if d in due_names]
        in_degree[w["name"]] = len(deps_in_scope)
        for d in deps_in_scope:
            dependents[d].append(w["name"])
    waves: list[list[dict]] = []
    remaining = set(due_names)
    while remaining:
        wave = [name_to_wf[n] for n in remaining if in_degree[n] == 0]
        if not wave:
            # Every remaining node still has an unmet dependency -> cycle.
            # Raise (not assert): under `python -O` an assert would vanish and the
            # old bounded loop silently returned an incomplete wave list.
            raise ValueError(f"Circular dependency detected among: {remaining}")
        waves.append(wave)
        for w in wave:
            remaining.discard(w["name"])
            for dep in dependents[w["name"]]:
                if dep in remaining:
                    in_degree[dep] -= 1
    return waves
# ---------------------------------------------------------------------------
# Workflow execution
# ---------------------------------------------------------------------------
def run_workflow(conn, workflow: dict) -> None:
    """Import a workflow's module and invoke its entry-point function.

    Failures are logged, alerted, and re-raised so the caller can account
    for them; success is logged.
    """
    wf_name = workflow["name"]
    target_module = workflow["module"]
    target_entry = workflow["entry"]
    logger.info("Running workflow: %s (%s.%s)", wf_name, target_module, target_entry)
    try:
        entry = getattr(importlib.import_module(target_module), target_entry)
        entry()
        logger.info("Workflow %s completed successfully", wf_name)
    except Exception:
        logger.exception("Workflow %s failed", wf_name)
        send_alert(f"Workflow '{wf_name}' failed")
        raise
def run_due_workflows(conn, workflows: list[dict]) -> bool:
    """Run all due workflows in dependency-wave order.

    Workflows within a wave run in parallel threads; failures are swallowed
    here because run_workflow already logs and alerts. Returns True if any
    workflow was due and attempted.
    """
    due = [wf for wf in workflows if is_due(conn, wf)]
    if not due:
        logger.info("No workflows due")
        return False
    logger.info("Due workflows: %s", [wf["name"] for wf in due])
    for wave_idx, wave in enumerate(topological_waves(due)):
        logger.info("Wave %d: %s", wave_idx, [wf["name"] for wf in wave])
        if len(wave) == 1:
            # Single workflow: no thread pool needed.
            try:
                run_workflow(conn, wave[0])
            except Exception:
                pass  # already logged in run_workflow
        else:
            with ThreadPoolExecutor(max_workers=len(wave)) as executor:
                pending = [executor.submit(run_workflow, conn, wf) for wf in wave]
                for fut in as_completed(pending):
                    try:
                        fut.result()
                    except Exception:
                        pass  # already logged in run_workflow
    return True
# ---------------------------------------------------------------------------
# Transform + Export + Deploy
# ---------------------------------------------------------------------------
def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bool:
    """Run *cmd* through the shell and report success as a bool.

    On failure, logs the return code plus the last 500 chars of each stream.
    NOTE(review): shell=True is deliberate here — commands include env-var
    prefixes and pipelines; all inputs are supervisor-internal, not user data.
    """
    logger.info("Shell: %s", cmd)
    proc = subprocess.run(
        cmd,
        shell=True,
        capture_output=True,
        text=True,
        timeout=timeout_seconds,
    )
    if proc.returncode == 0:
        return True
    logger.error(
        "Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s",
        proc.returncode,
        cmd,
        proc.stdout[-500:],
        proc.stderr[-500:],
    )
    return False
def run_transform() -> None:
    """Invoke the SQLMesh run; SQLMesh decides internally which models are stale."""
    logger.info("Running SQLMesh transform")
    if not run_shell("uv run sqlmesh -p transform/sqlmesh_materia run"):
        send_alert("SQLMesh transform failed")
def run_export() -> None:
    """Run the export_serving pipeline to refresh the serving DuckDB file."""
    logger.info("Exporting serving tables")
    cmd = (
        f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} "
        f"uv run materia pipeline run export_serving"
    )
    if not run_shell(cmd):
        send_alert("Serving export failed")
def web_code_changed() -> bool:
    """Return True when the latest commit touched anything under web/."""
    diff = subprocess.run(
        ["git", "diff", "--name-only", "HEAD~1", "HEAD", "--", "web/", "web/Dockerfile"],
        capture_output=True,
        text=True,
        timeout=30,
    )
    # Any listed filename means web code changed between HEAD~1 and HEAD.
    return diff.stdout.strip() != ""
def current_deployed_tag() -> str | None:
    """Return the tag HEAD is exactly on, or None when not on a tag.

    `git describe --exact-match` prints nothing (and fails) off-tag, so an
    empty stdout maps to None.
    """
    describe = subprocess.run(
        ["git", "describe", "--tags", "--exact-match", "HEAD"],
        capture_output=True,
        text=True,
        timeout=10,
    )
    tag = describe.stdout.strip()
    return tag if tag else None
def latest_remote_tag() -> str | None:
    """Fetch tags from origin and return the newest v* tag by version sort.

    Returns None when no v* tags exist. Fetch errors are ignored; the local
    tag list is used as-is.
    """
    subprocess.run(
        ["git", "fetch", "--tags", "--prune-tags", "origin"],
        capture_output=True,
        text=True,
        timeout=30,
    )
    listing = subprocess.run(
        ["git", "tag", "--list", "--sort=-version:refname", "v*"],
        capture_output=True,
        text=True,
        timeout=10,
    )
    tags = listing.stdout.strip().splitlines()
    # Descending version sort: first entry is the newest tag.
    return next(iter(tags), None)
def git_pull_and_sync() -> None:
    """Checkout the latest passing release tag and sync dependencies.

    A tag v<N> is created by CI only after tests pass, so presence of a new
    tag implies green CI. Skips if already on the latest tag.
    """
    latest = latest_remote_tag()
    if not latest:
        logger.info("No release tags found — skipping pull")
        return
    current = current_deployed_tag()
    if current == latest:
        logger.info("Already on latest tag %s — skipping pull", latest)
        return
    logger.info("New tag %s available (current: %s) — deploying", latest, current)
    # Detached checkout of the tag; failures are logged by run_shell but not
    # fatal here — the next tick will retry.
    run_shell(f"git checkout --detach {latest}")
    # Sync the whole uv workspace so any new/updated deps get installed.
    run_shell("uv sync --all-packages")
# ---------------------------------------------------------------------------
# Alerting
# ---------------------------------------------------------------------------
def send_alert(message: str) -> None:
    """POST *message*, prefixed with a UTC timestamp, to the alert webhook.

    Silently a no-op when ALERT_WEBHOOK_URL is unset. Alerting must never
    crash the supervisor, so any failure is only logged.
    """
    if not ALERT_WEBHOOK_URL:
        return
    stamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
    body = f"[{stamp}] {message}"
    try:
        subprocess.run(
            ["curl", "-s", "-d", body, ALERT_WEBHOOK_URL],
            timeout=10,
            capture_output=True,
        )
    except Exception:
        logger.exception("Failed to send alert")
# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------
def tick() -> None:
    """One supervisor cycle: pull, run due extractors, transform, export, deploy."""
    workflows = load_workflows(WORKFLOWS_PATH)
    conn = _open_state_db()
    try:
        # SUPERVISOR_GIT_PULL is set only by the production systemd unit.
        production = bool(os.getenv("SUPERVISOR_GIT_PULL"))
        if production:
            git_pull_and_sync()
        # Extractors whose cron trigger has fired since their last success.
        run_due_workflows(conn, workflows)
        # SQLMesh always runs — it evaluates model staleness internally.
        run_transform()
        # Refresh the serving DuckDB for the web app.
        run_export()
        # Redeploy the web app when its code changed in the last commit.
        if production and web_code_changed():
            logger.info("Web code changed — deploying")
            if run_shell("./web/deploy.sh"):
                send_alert("Deploy succeeded")
            else:
                send_alert("Deploy FAILED — check journalctl -u materia-supervisor")
    finally:
        conn.close()
def supervisor_loop() -> None:
    """Run tick() forever, backing off on failure; exits only on Ctrl-C.

    Crash safety: every exception from tick() is caught, alerted, and followed
    by a long back-off, so the process itself never dies on a bad cycle.
    """
    logger.info("Supervisor starting (tick interval: %ds)", TICK_INTERVAL_SECONDS)
    logger.info("Workflows: %s", WORKFLOWS_PATH)
    logger.info("Landing dir: %s", LANDING_DIR)
    while True:
        try:
            tick()
        except KeyboardInterrupt:
            logger.info("Supervisor stopped (KeyboardInterrupt)")
            return
        except Exception:
            logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS)
            send_alert("Supervisor tick failed")
            time.sleep(BACKOFF_SECONDS)
        else:
            time.sleep(TICK_INTERVAL_SECONDS)
# ---------------------------------------------------------------------------
# Status CLI
# ---------------------------------------------------------------------------
def print_status() -> None:
    """Print a per-workflow status table: schedule, last run, health, next due."""
    workflows = load_workflows(WORKFLOWS_PATH)
    conn = _open_state_db()
    try:
        now = datetime.now(UTC)
        print(f"{'Workflow':<28} {'Schedule':<18} {'Last Run':<20} {'Status':<8} {'Next'}")
        # Fix: the separator row multiplied the EMPTY string ('' * n), which
        # printed a blank line; use '-' so an actual rule appears under the header.
        print(f"{'-' * 28} {'-' * 18} {'-' * 20} {'-' * 8} {'-' * 12}")
        for w in workflows:
            last_success = _get_last_success_time(conn, w["name"])
            cron_expr = resolve_schedule(w["schedule"])
            if last_success:
                last_str = last_success.strftime("%Y-%m-%d %H:%M")
                status = "ok"
            else:
                last_str = "never"
                status = "pending"
            # A failure newer than the last success flips the status to FAILED.
            row = conn.execute(
                "SELECT MAX(finished_at) AS t FROM extraction_runs "
                "WHERE extractor = ? AND status = 'failed'",
                (w["name"],),
            ).fetchone()
            if row and row["t"]:
                last_fail = datetime.fromisoformat(row["t"]).replace(tzinfo=UTC)
                if last_success is None or last_fail > last_success:
                    status = "FAILED"
            # croniter works with naive datetimes; compute the next trigger and
            # render it compactly: minutes if <1h, time-of-day if <1d, else date.
            now_naive = now.replace(tzinfo=None)
            next_trigger = croniter(cron_expr, now_naive).get_next(datetime)
            delta = next_trigger - now_naive
            if delta.total_seconds() < 3600:
                next_str = f"in {int(delta.total_seconds() / 60)}m"
            elif delta.total_seconds() < 86400:
                next_str = next_trigger.strftime("%H:%M")
            else:
                next_str = next_trigger.strftime("%b %d")
            schedule_display = w["schedule"] if w["schedule"] in NAMED_SCHEDULES else cron_expr
            print(f"{w['name']:<28} {schedule_display:<18} {last_str:<20} {status:<8} {next_str}")
    finally:
        # Close even on error — consistent with tick()'s try/finally.
        conn.close()
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: `status` prints the workflow table, anything else loops."""
    args = sys.argv[1:]
    if args and args[0] == "status":
        print_status()
    else:
        supervisor_loop()


if __name__ == "__main__":
    main()