fix(supervisor): improve alert messages with category prefix and error snippet
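Each alert now includes a neutral category tag ([extract], [transform], [export], [deploy], [supervisor]) and a short error snippet — for shell steps, the first non-empty line of stderr (or stdout if stderr is empty), truncated to 120 characters; for Python exceptions, the exception type name plus the first 100 characters of its message — so notifications are actionable without revealing tech-stack details on the public free ntfy tier.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>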
This commit is contained in:
@@ -192,9 +192,9 @@ def run_workflow(conn, workflow: dict) -> None:
|
|||||||
entry_fn = getattr(module, entry_name)
|
entry_fn = getattr(module, entry_name)
|
||||||
entry_fn()
|
entry_fn()
|
||||||
logger.info("Workflow %s completed successfully", workflow["name"])
|
logger.info("Workflow %s completed successfully", workflow["name"])
|
||||||
except Exception:
|
except Exception as exc:
|
||||||
logger.exception("Workflow %s failed", workflow["name"])
|
logger.exception("Workflow %s failed", workflow["name"])
|
||||||
send_alert(f"Workflow '{workflow['name']}' failed")
|
send_alert(f"[extract] {type(exc).__name__}: {str(exc)[:100]}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
@@ -233,8 +233,8 @@ def run_due_workflows(conn, workflows: list[dict]) -> bool:
|
|||||||
# Transform + Export + Deploy
|
# Transform + Export + Deploy
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bool:
|
def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> tuple[bool, str]:
|
||||||
"""Run a shell command. Returns True on success."""
|
"""Run a shell command. Returns (success, error_snippet)."""
|
||||||
logger.info("Shell: %s", cmd)
|
logger.info("Shell: %s", cmd)
|
||||||
result = subprocess.run(
|
result = subprocess.run(
|
||||||
cmd, shell=True, capture_output=True, text=True, timeout=timeout_seconds
|
cmd, shell=True, capture_output=True, text=True, timeout=timeout_seconds
|
||||||
@@ -242,29 +242,31 @@ def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bo
|
|||||||
if result.returncode != 0:
|
if result.returncode != 0:
|
||||||
logger.error("Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s",
|
logger.error("Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s",
|
||||||
result.returncode, cmd, result.stdout[-500:], result.stderr[-500:])
|
result.returncode, cmd, result.stdout[-500:], result.stderr[-500:])
|
||||||
return False
|
raw = (result.stderr or result.stdout).strip()
|
||||||
return True
|
snippet = next((ln.strip() for ln in raw.splitlines() if ln.strip()), raw)[:120]
|
||||||
|
return False, snippet
|
||||||
|
return True, ""
|
||||||
|
|
||||||
|
|
||||||
def run_transform() -> None:
|
def run_transform() -> None:
|
||||||
"""Run SQLMesh — it evaluates model staleness internally."""
|
"""Run SQLMesh — it evaluates model staleness internally."""
|
||||||
logger.info("Running SQLMesh transform")
|
logger.info("Running SQLMesh transform")
|
||||||
ok = run_shell(
|
ok, err = run_shell(
|
||||||
"uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply",
|
"uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply",
|
||||||
)
|
)
|
||||||
if not ok:
|
if not ok:
|
||||||
send_alert("SQLMesh transform failed")
|
send_alert(f"[transform] {err}")
|
||||||
|
|
||||||
|
|
||||||
def run_export() -> None:
|
def run_export() -> None:
|
||||||
"""Export serving tables to analytics.duckdb."""
|
"""Export serving tables to analytics.duckdb."""
|
||||||
logger.info("Exporting serving tables")
|
logger.info("Exporting serving tables")
|
||||||
ok = run_shell(
|
ok, err = run_shell(
|
||||||
f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} "
|
f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} "
|
||||||
f"uv run python src/padelnomics/export_serving.py"
|
f"uv run python src/padelnomics/export_serving.py"
|
||||||
)
|
)
|
||||||
if not ok:
|
if not ok:
|
||||||
send_alert("Serving export failed")
|
send_alert(f"[export] {err}")
|
||||||
|
|
||||||
|
|
||||||
def web_code_changed() -> bool:
|
def web_code_changed() -> bool:
|
||||||
@@ -365,11 +367,11 @@ def tick() -> None:
|
|||||||
# Deploy web app if code changed
|
# Deploy web app if code changed
|
||||||
if os.getenv("SUPERVISOR_GIT_PULL") and web_code_changed():
|
if os.getenv("SUPERVISOR_GIT_PULL") and web_code_changed():
|
||||||
logger.info("Web code changed — deploying")
|
logger.info("Web code changed — deploying")
|
||||||
ok = run_shell("./deploy.sh")
|
ok, err = run_shell("./deploy.sh")
|
||||||
if ok:
|
if ok:
|
||||||
send_alert("Deploy succeeded")
|
send_alert("[deploy] ok")
|
||||||
else:
|
else:
|
||||||
send_alert("Deploy FAILED — check journalctl -u padelnomics-supervisor")
|
send_alert(f"[deploy] failed: {err}")
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
@@ -386,9 +388,9 @@ def supervisor_loop() -> None:
|
|||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logger.info("Supervisor stopped (KeyboardInterrupt)")
|
logger.info("Supervisor stopped (KeyboardInterrupt)")
|
||||||
break
|
break
|
||||||
except Exception:
|
except Exception as exc:
|
||||||
logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS)
|
logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS)
|
||||||
send_alert("Supervisor tick failed")
|
send_alert(f"[supervisor] {type(exc).__name__}: {str(exc)[:100]}")
|
||||||
time.sleep(BACKOFF_SECONDS)
|
time.sleep(BACKOFF_SECONDS)
|
||||||
else:
|
else:
|
||||||
time.sleep(TICK_INTERVAL_SECONDS)
|
time.sleep(TICK_INTERVAL_SECONDS)
|
||||||
|
|||||||
Reference in New Issue
Block a user