feat: Python supervisor + feature flags
Supervisor (replaces supervisor.sh): - supervisor.py — cron-based pipeline orchestration, reads workflows.toml on every tick, runs due extractors in topological waves with parallel execution, then SQLMesh transform + serving export - workflows.toml — workflow registry: overpass (monthly), eurostat (monthly), playtomic_tenants (weekly), playtomic_availability (daily), playtomic_recheck (hourly 6–23) - padelnomics-supervisor.service — updated ExecStart to Python supervisor Extraction enhancements: - proxy.py — optional round-robin/sticky proxy rotation via PROXY_URLS env - playtomic_availability.py — parallel fetch (EXTRACT_WORKERS), recheck mode (main_recheck) re-queries imminent slots for accurate occupancy measurement - _shared.py — realistic browser User-Agent on all extractor sessions - stg_playtomic_availability.sql — reads morning + recheck snapshots, tags each - fct_daily_availability.sql — prefers recheck over morning for same slot Feature flags (replaces WAITLIST_MODE env var): - migration 0019 — feature_flags table, 5 initial flags: markets (on), payments/planner_export/supplier_signup/lead_unlock (off) - core.py — is_flag_enabled() + feature_gate() decorator - routes — payments, markets, planner_export, supplier_signup, lead_unlock gated - admin flags UI — /admin/flags toggle page + nav link - app.py — flag() injected as Jinja2 global Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
0
src/padelnomics/__init__.py
Normal file
0
src/padelnomics/__init__.py
Normal file
416
src/padelnomics/supervisor.py
Normal file
416
src/padelnomics/supervisor.py
Normal file
@@ -0,0 +1,416 @@
|
||||
"""Padelnomics Supervisor — schedule-aware pipeline orchestration.
|
||||
|
||||
Replaces supervisor.sh with a Python supervisor that reads a TOML workflow
|
||||
registry, runs extractors on cron-based schedules (with dependency ordering
|
||||
and parallel execution), then runs SQLMesh transform + export.
|
||||
|
||||
Crash safety: the main loop catches all exceptions and backs off, matching
|
||||
the TigerBeetle CFO supervisor pattern. Combined with systemd Restart=always,
|
||||
the supervisor is effectively unkillable.
|
||||
|
||||
Usage:
|
||||
# Run the supervisor loop (production)
|
||||
LANDING_DIR=data/landing uv run python src/padelnomics/supervisor.py
|
||||
|
||||
# Show workflow status
|
||||
LANDING_DIR=data/landing uv run python src/padelnomics/supervisor.py status
|
||||
"""
|
||||
|
||||
import importlib
import logging
import os
import subprocess
import sys
import time
import tomllib
import urllib.request
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import UTC, datetime
from pathlib import Path

from croniter import croniter
|
||||
|
||||
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# How often the main loop wakes up to evaluate schedules.
TICK_INTERVAL_SECONDS = 60
BACKOFF_SECONDS = 600  # 10 min on tick failure (matches shell version)
SUBPROCESS_TIMEOUT_SECONDS = 14400  # 4 hours max per subprocess
# NOTE(review): REPO_DIR is read from the environment but not used in this
# module — presumably consumed elsewhere; verify before removing.
REPO_DIR = Path(os.getenv("REPO_DIR", "/opt/padelnomics"))
# Relative paths resolve against the process working directory.
LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
DUCKDB_PATH = os.getenv("DUCKDB_PATH", "data/lakehouse.duckdb")
SERVING_DUCKDB_PATH = os.getenv("SERVING_DUCKDB_PATH", "analytics.duckdb")
# Empty string disables alerting entirely (see send_alert).
ALERT_WEBHOOK_URL = os.getenv("ALERT_WEBHOOK_URL", "")
WORKFLOWS_PATH = Path(os.getenv("WORKFLOWS_PATH", "infra/supervisor/workflows.toml"))

# Shorthand schedule names mapped to cron expressions; raw cron strings in
# workflows.toml pass through unchanged (see resolve_schedule).
NAMED_SCHEDULES = {
    "hourly": "0 * * * *",
    "daily": "0 5 * * *",
    "weekly": "0 3 * * 1",
    "monthly": "0 4 1 * *",
}

# Log to stdout so systemd/journald captures everything.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("padelnomics.supervisor")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# State DB helpers (reuse extraction state DB)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _open_state_db():
    """Open the shared extraction state DB (the extractors' .state.sqlite)."""
    # Imported lazily so module import doesn't create a circular dependency.
    from padelnomics_extract.utils import open_state_db

    conn = open_state_db(LANDING_DIR)
    return conn
|
||||
|
||||
|
||||
def _get_last_success_time(conn, workflow_name: str) -> datetime | None:
|
||||
"""Return the finish time of the last successful run, or None."""
|
||||
row = conn.execute(
|
||||
"SELECT MAX(finished_at) AS t FROM extraction_runs "
|
||||
"WHERE extractor = ? AND status = 'success'",
|
||||
(workflow_name,),
|
||||
).fetchone()
|
||||
if not row or not row["t"]:
|
||||
return None
|
||||
return datetime.fromisoformat(row["t"]).replace(tzinfo=UTC)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Workflow loading + scheduling
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_workflows(path: Path) -> list[dict]:
|
||||
"""Load workflow definitions from TOML file."""
|
||||
assert path.exists(), f"Workflows file not found: {path}"
|
||||
with open(path, "rb") as f:
|
||||
data = tomllib.load(f)
|
||||
|
||||
workflows = []
|
||||
for name, cfg in data.items():
|
||||
assert "module" in cfg, f"Workflow '{name}' missing 'module'"
|
||||
assert "schedule" in cfg, f"Workflow '{name}' missing 'schedule'"
|
||||
workflows.append({
|
||||
"name": name,
|
||||
"module": cfg["module"],
|
||||
"entry": cfg.get("entry", "main"),
|
||||
"schedule": cfg["schedule"],
|
||||
"depends_on": cfg.get("depends_on", []),
|
||||
"proxy_mode": cfg.get("proxy_mode", "round-robin"),
|
||||
})
|
||||
return workflows
|
||||
|
||||
|
||||
def resolve_schedule(schedule: str) -> str:
    """Map a named schedule (e.g. "daily") to cron; raw cron passes through."""
    if schedule in NAMED_SCHEDULES:
        return NAMED_SCHEDULES[schedule]
    return schedule
|
||||
|
||||
|
||||
def is_due(conn, workflow: dict) -> bool:
    """Return True if the workflow's latest cron trigger hasn't been served.

    A workflow is due when it has never succeeded, or when its most recent
    success predates the most recent cron trigger time.

    Raises:
        ValueError: if the resolved schedule is not a valid cron expression.
    """
    cron_expr = resolve_schedule(workflow["schedule"])
    # Raise (not assert): schedule validation must survive `python -O`.
    if not croniter.is_valid(cron_expr):
        raise ValueError(f"Invalid cron: {cron_expr} for {workflow['name']}")

    last_success = _get_last_success_time(conn, workflow["name"])
    if last_success is None:
        return True  # never ran

    # croniter works with naive datetimes; everything here is UTC, so strip
    # the tzinfo for croniter and re-attach it for the comparison.
    now_naive = datetime.now(UTC).replace(tzinfo=None)
    prev_trigger = croniter(cron_expr, now_naive).get_prev(datetime).replace(tzinfo=UTC)
    return last_success < prev_trigger
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Topological ordering
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def topological_waves(workflows: list[dict]) -> list[list[dict]]:
    """Group workflows into dependency waves for parallel execution.

    Wave 0 has no dependencies (within the given set); wave 1 depends only
    on wave 0; and so on. Dependencies that are not themselves in the given
    set are ignored, so a workflow whose upstream isn't due runs immediately.

    Raises:
        ValueError: if the dependency graph contains a cycle. (Previously an
            `assert`, which `python -O` strips — leaving a silent partial
            result; an explicit raise keeps the check in all modes.)
    """
    name_to_wf = {w["name"]: w for w in workflows}
    due_names = set(name_to_wf)

    # Kahn's algorithm: in-degree counts only deps inside the due set.
    in_degree: dict[str, int] = {}
    dependents: dict[str, list[str]] = defaultdict(list)
    for w in workflows:
        deps_in_scope = [d for d in w["depends_on"] if d in due_names]
        in_degree[w["name"]] = len(deps_in_scope)
        for d in deps_in_scope:
            dependents[d].append(w["name"])

    waves: list[list[dict]] = []
    remaining = set(due_names)
    while remaining:
        # Each wave is everything currently dependency-free.
        wave = [name_to_wf[n] for n in remaining if in_degree[n] == 0]
        if not wave:
            raise ValueError(f"Circular dependency detected among: {remaining}")
        waves.append(wave)
        for w in wave:
            remaining.discard(w["name"])
            for dep in dependents[w["name"]]:
                if dep in remaining:
                    in_degree[dep] -= 1

    return waves
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Workflow execution
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def run_workflow(conn, workflow: dict) -> None:
    """Import the workflow's module and invoke its configured entry function.

    On failure: logs the traceback, fires an alert, and re-raises so the
    caller decides how the wave proceeds.
    """
    mod_name = workflow["module"]
    fn_name = workflow["entry"]

    logger.info("Running workflow: %s (%s.%s)", workflow["name"], mod_name, fn_name)

    try:
        target = getattr(importlib.import_module(mod_name), fn_name)
        target()
        logger.info("Workflow %s completed successfully", workflow["name"])
    except Exception:
        logger.exception("Workflow %s failed", workflow["name"])
        send_alert(f"Workflow '{workflow['name']}' failed")
        raise
|
||||
|
||||
|
||||
def run_due_workflows(conn, workflows: list[dict]) -> bool:
    """Execute every due workflow, wave by wave. Returns True if any ran.

    Workflows within a wave are independent, so they run concurrently.
    Failures are swallowed here — run_workflow already logged and alerted —
    so one broken extractor never blocks the rest of the tick.
    """
    due = [wf for wf in workflows if is_due(conn, wf)]
    if not due:
        logger.info("No workflows due")
        return False

    logger.info("Due workflows: %s", [wf["name"] for wf in due])

    for wave_idx, wave in enumerate(topological_waves(due)):
        logger.info("Wave %d: %s", wave_idx, [wf["name"] for wf in wave])

        if len(wave) == 1:
            # Single workflow: run inline, no thread pool needed.
            try:
                run_workflow(conn, wave[0])
            except Exception:
                pass  # already logged in run_workflow
            continue

        with ThreadPoolExecutor(max_workers=len(wave)) as pool:
            pending = [pool.submit(run_workflow, conn, wf) for wf in wave]
            for fut in as_completed(pending):
                try:
                    fut.result()
                except Exception:
                    pass  # already logged in run_workflow

    return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Transform + Export + Deploy
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bool:
    """Run a shell command. Returns True on success, False on any failure.

    Captures stdout/stderr and logs their tails when the command fails.
    A timeout now counts as a failure instead of raising TimeoutExpired —
    previously an uncaught TimeoutExpired would abort the entire tick,
    contradicting this function's bool-returning contract.
    """
    logger.info("Shell: %s", cmd)
    try:
        result = subprocess.run(
            cmd, shell=True, capture_output=True, text=True, timeout=timeout_seconds
        )
    except subprocess.TimeoutExpired:
        logger.error("Shell timed out after %ds: %s", timeout_seconds, cmd)
        return False
    if result.returncode != 0:
        logger.error(
            "Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s",
            result.returncode, cmd, result.stdout[-500:], result.stderr[-500:],
        )
        return False
    return True
|
||||
|
||||
|
||||
def run_transform() -> None:
    """Run the SQLMesh project; SQLMesh evaluates model staleness internally."""
    logger.info("Running SQLMesh transform")
    # Plain string literal — the previous f-string had no placeholders (F541).
    ok = run_shell("uv run sqlmesh -p transform/sqlmesh_padelnomics run")
    if not ok:
        send_alert("SQLMesh transform failed")
|
||||
|
||||
|
||||
def run_export() -> None:
    """Export serving tables to analytics.duckdb for the web app to read."""
    logger.info("Exporting serving tables")
    # Paths are handed to the export script via environment variables.
    cmd = (
        f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} "
        f"uv run python src/padelnomics/export_serving.py"
    )
    if not run_shell(cmd):
        send_alert("Serving export failed")
|
||||
|
||||
|
||||
def web_code_changed() -> bool:
    """Return True if web/ or the Dockerfile changed between HEAD~1 and HEAD.

    NOTE(review): this only inspects the last commit. A pull that fast-forwards
    over several commits could miss earlier web changes — confirm whether the
    deploy flow guarantees one commit per tick.
    """
    proc = subprocess.run(
        ["git", "diff", "--name-only", "HEAD~1", "HEAD", "--", "web/", "Dockerfile"],
        capture_output=True, text=True, timeout=30,
    )
    # Non-empty diff output means something under the watched paths changed.
    return proc.stdout.strip() != ""
|
||||
|
||||
|
||||
def git_pull_and_sync() -> None:
    """Fast-forward to origin/master (discarding local changes) and resync deps."""
    commands = (
        "git fetch origin master",
        "git switch --discard-changes --detach origin/master",
        "uv sync --all-packages",
    )
    for cmd in commands:
        run_shell(cmd)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Alerting
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def send_alert(message: str) -> None:
    """Send a best-effort failure alert via webhook (ntfy.sh / Slack / Telegram).

    No-op when ALERT_WEBHOOK_URL is unset. Never raises — alerting failures
    are logged and swallowed so they can't mask the original error.
    """
    if not ALERT_WEBHOOK_URL:
        return
    timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
    body = f"[{timestamp}] {message}".encode()
    try:
        # stdlib HTTP POST instead of shelling out to curl: no external binary
        # required, and HTTP error statuses surface as exceptions (and get
        # logged) instead of being silently ignored by `curl -s`.
        with urllib.request.urlopen(ALERT_WEBHOOK_URL, data=body, timeout=10):
            pass
    except Exception:
        logger.exception("Failed to send alert")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main loop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def tick() -> None:
    """One supervisor cycle: optional self-update, extract, transform, export, deploy."""
    workflows = load_workflows(WORKFLOWS_PATH)
    conn = _open_state_db()

    try:
        # Production only: pull latest code + sync deps before doing work.
        if os.getenv("SUPERVISOR_GIT_PULL"):
            git_pull_and_sync()

        # Extractors whose cron trigger has fired since their last success.
        run_due_workflows(conn, workflows)

        # SQLMesh runs every tick — it evaluates model staleness internally.
        run_transform()

        # Refresh the serving database for the web app.
        run_export()

        # Redeploy the web app when its code changed (production only).
        if os.getenv("SUPERVISOR_GIT_PULL") and web_code_changed():
            logger.info("Web code changed — deploying")
            run_shell("./deploy.sh")
    finally:
        conn.close()
|
||||
|
||||
|
||||
def supervisor_loop() -> None:
    """Run ticks forever; crash-proof via catch-all backoff.

    Combined with systemd Restart=always, the supervisor is effectively
    unkillable. Only Ctrl-C (KeyboardInterrupt) exits the loop cleanly.
    """
    logger.info("Supervisor starting (tick interval: %ds)", TICK_INTERVAL_SECONDS)
    logger.info("Workflows: %s", WORKFLOWS_PATH)
    logger.info("Landing dir: %s", LANDING_DIR)

    while True:
        try:
            tick()
        except KeyboardInterrupt:
            logger.info("Supervisor stopped (KeyboardInterrupt)")
            return
        except Exception:
            # Back off hard on failure so a broken tick can't busy-loop.
            logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS)
            send_alert("Supervisor tick failed")
            time.sleep(BACKOFF_SECONDS)
        else:
            time.sleep(TICK_INTERVAL_SECONDS)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Status CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def print_status() -> None:
    """Print a per-workflow status table to stdout.

    Columns: workflow name, schedule (named alias or raw cron), last
    successful run, status (ok / pending / FAILED), and next trigger time.
    'FAILED' overrides 'ok' when the most recent failure is newer than the
    most recent success.
    """
    workflows = load_workflows(WORKFLOWS_PATH)
    conn = _open_state_db()

    now = datetime.now(UTC)

    # Header
    print(f"{'Workflow':<28} {'Schedule':<18} {'Last Run':<20} {'Status':<8} {'Next'}")
    print(f"{'─' * 28} {'─' * 18} {'─' * 20} {'─' * 8} {'─' * 12}")

    for w in workflows:
        last_success = _get_last_success_time(conn, w["name"])
        cron_expr = resolve_schedule(w["schedule"])

        # Last run info
        if last_success:
            last_str = last_success.strftime("%Y-%m-%d %H:%M")
            status = "ok"
        else:
            last_str = "never"
            status = "pending"

        # Last failure check — a failure newer than the last success wins.
        row = conn.execute(
            "SELECT MAX(finished_at) AS t FROM extraction_runs "
            "WHERE extractor = ? AND status = 'failed'",
            (w["name"],),
        ).fetchone()
        if row and row["t"]:
            last_fail = datetime.fromisoformat(row["t"]).replace(tzinfo=UTC)
            if last_success is None or last_fail > last_success:
                status = "FAILED"

        # Next trigger (croniter returns naive datetimes — treat as UTC)
        now_naive = now.replace(tzinfo=None)
        next_trigger = croniter(cron_expr, now_naive).get_next(datetime)
        delta = next_trigger - now_naive
        # "in Xm" within the hour, clock time within a day, else a date.
        if delta.total_seconds() < 3600:
            next_str = f"in {int(delta.total_seconds() / 60)}m"
        elif delta.total_seconds() < 86400:
            next_str = next_trigger.strftime("%H:%M")
        else:
            next_str = next_trigger.strftime("%b %d")

        # Show the friendly alias when one was used, the raw cron otherwise.
        schedule_display = w["schedule"] if w["schedule"] in NAMED_SCHEDULES else cron_expr
        print(f"{w['name']:<28} {schedule_display:<18} {last_str:<20} {status:<8} {next_str}")

    conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main() -> None:
    """CLI entry point: `status` prints the table; anything else runs the loop."""
    args = sys.argv[1:]
    if args and args[0] == "status":
        print_status()
        return
    supervisor_loop()


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user