Implements automated supervisor instance deployment that runs scheduled pipelines using a TigerBeetle-inspired continuous orchestration pattern.

Infrastructure changes:
- Update Pulumi to use existing R2 buckets (beanflows-artifacts, beanflows-data-prod)
- Rename scheduler → supervisor, optimize to CCX11 (€4/mo)
- Remove always-on worker (workers are now ephemeral only)
- Add artifacts bucket resource for CLI/pipeline packages

Supervisor architecture:
- supervisor.sh: Continuous loop checking schedules every 15 minutes
- Self-updating: Checks for new CLI versions hourly
- Fixed schedules: Extract at 2 AM UTC, Transform at 3 AM UTC
- systemd service for automatic restart on failure
- Logs to systemd journal for observability

CI/CD changes:
- deploy:infra now runs on every master push (not just on changes)
- New deploy:supervisor job:
  * Deploys supervisor.sh and systemd service
  * Installs latest materia CLI from R2
  * Configures environment with Pulumi ESC secrets
  * Restarts supervisor service

Future enhancements documented:
- SQLMesh-aware scheduling (check models before running)
- Model tags for worker sizing (heavy/distributed hints)
- Multi-pipeline support, distributed execution
- Cost optimization with multi-cloud spot pricing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
185 lines
4.4 KiB
Bash
185 lines
4.4 KiB
Bash
#!/bin/bash
# Materia Supervisor - Continuous pipeline orchestration
# Inspired by TigerBeetle's CFO supervisor pattern
# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh
#
# Runs forever: every CHECK_INTERVAL seconds it checks for a new materia CLI
# artifact (at most once per CLI_VERSION_CHECK_INTERVAL) and runs the extract
# and transform pipelines at their scheduled UTC hours.
#
# Required environment:
#   R2_ENDPOINT          host (no scheme) used to build R2_ARTIFACTS_URL
#   R2_ARTIFACTS_BUCKET  bucket under R2_ENDPOINT that holds CLI_ARTIFACT

set -euo pipefail

# Fail fast with an explicit message. `set -u` already aborts on an unset
# variable, but an empty one would silently yield a URL like "https:///".
: "${R2_ENDPOINT:?R2_ENDPOINT must be set to the R2 endpoint host}"
: "${R2_ARTIFACTS_BUCKET:?R2_ARTIFACTS_BUCKET must be set to the artifacts bucket name}"

# Configuration
readonly CHECK_INTERVAL=900              # 15 minutes
readonly CLI_VERSION_CHECK_INTERVAL=3600 # 1 hour
readonly MATERIA_DIR="/opt/materia"
readonly R2_ARTIFACTS_URL="https://${R2_ENDPOINT}/${R2_ARTIFACTS_BUCKET}"
readonly CLI_ARTIFACT="materia-cli-latest.tar.gz"

# Schedules: hour of day (0-23), UTC
readonly EXTRACT_SCHEDULE_HOUR=2   # 02:00 UTC
readonly TRANSFORM_SCHEDULE_HOUR=3 # 03:00 UTC

# State tracking. Held in shell variables only, so it resets whenever the
# supervisor restarts (a restart inside a scheduled hour can therefore re-run
# that day's pipeline).
last_extract_run=""   # UTC date (YYYY-MM-DD) of the last successful extract
last_transform_run="" # UTC date (YYYY-MM-DD) of the last successful transform
last_cli_check=0      # epoch seconds of the last CLI update attempt
|
|
|
|
# Write an informational line to stdout as "[YYYY-MM-DD HH:MM:SS] <message>",
# using the host's local time. All arguments are joined with spaces.
log() {
  printf '[%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$*"
}
|
|
|
|
# Write an error line to stderr as "[YYYY-MM-DD HH:MM:SS] ERROR: <message>",
# using the host's local time. All arguments are joined with spaces.
log_error() {
  printf '[%s] ERROR: %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$*" >&2
}
|
|
|
|
# Check if CLI needs updating
#
# Rate-limited: does nothing unless CLI_VERSION_CHECK_INTERVAL seconds have
# passed since the previous attempt (successful or not). Otherwise downloads
# ${CLI_ARTIFACT} from R2, compares its SHA-256 with the copy kept in
# MATERIA_DIR from the last successful install, and when they differ unpacks
# it into ${MATERIA_DIR}/cli and reinstalls the bundled wheel(s) with pip3.
#
# The downloaded archive is promoted to ${MATERIA_DIR}/${CLI_ARTIFACT} only
# after pip3 succeeds, so a failed install is retried on the next check
# instead of being masked by a matching checksum.
#
# Globals:
#   MATERIA_DIR, R2_ARTIFACTS_URL, CLI_ARTIFACT,
#   CLI_VERSION_CHECK_INTERVAL (read); last_cli_check (written)
# Returns:
#   0 if skipped or already up to date; after an update, the status of
#   `materia version`; 1 on download, checksum, unpack or install failure.
check_cli_update() {
  local now
  now=$(date +%s)

  # Only check once per hour
  if (( now - last_cli_check < CLI_VERSION_CHECK_INTERVAL )); then
    return 0
  fi
  last_cli_check=$now

  log "Checking for CLI updates..."

  # Absolute paths and explicit checks on every step: when this function is
  # called from a condition or an `||` list, `set -e` is suspended, so a
  # failed step would otherwise let the following ones run regardless. No
  # `cd` either, so the caller's working directory is never changed.
  local -r installed="${MATERIA_DIR}/${CLI_ARTIFACT}"
  local -r temp_file="${MATERIA_DIR}/cli-new.tar.gz"
  local -r cli_dir="${MATERIA_DIR}/cli"

  # Download new version
  if ! curl -fsSL -o "$temp_file" "${R2_ARTIFACTS_URL}/${CLI_ARTIFACT}"; then
    log_error "Failed to download CLI artifact"
    rm -f -- "$temp_file" # drop any partial download
    return 1
  fi

  # Compare checksums (an absent installed copy means "always update")
  local old_checksum="" new_checksum=""
  if [[ -f "$installed" ]]; then
    old_checksum=$(sha256sum -- "$installed" | awk '{print $1}')
  fi
  if ! new_checksum=$(sha256sum -- "$temp_file" | awk '{print $1}') \
      || [[ -z "$new_checksum" ]]; then
    log_error "Failed to checksum CLI artifact"
    rm -f -- "$temp_file"
    return 1
  fi

  if [[ "$old_checksum" == "$new_checksum" ]]; then
    log "CLI is up to date"
    rm -f -- "$temp_file"
    return 0
  fi

  log "New CLI version detected, updating..."

  # Install new version straight from the downloaded file
  if ! rm -rf -- "$cli_dir" || ! mkdir -p -- "$cli_dir" \
      || ! tar -xzf "$temp_file" -C "$cli_dir"; then
    log_error "Failed to unpack CLI artifact"
    rm -f -- "$temp_file"
    return 1
  fi

  # Without nullglob an unmatched glob stays literal; the -f test catches it.
  local wheels=("$cli_dir"/*.whl)
  if [[ ! -f "${wheels[0]}" ]] \
      || ! pip3 install --force-reinstall "${wheels[@]}"; then
    log_error "Failed to install CLI"
    rm -f -- "$temp_file"
    return 1
  fi

  # Record what is now installed; this is what the next checksum compares to.
  if ! mv -f -- "$temp_file" "$installed"; then
    # Not fatal: the new CLI is installed, the next check will reinstall it.
    log_error "Failed to record installed CLI artifact"
  fi

  log "CLI updated successfully"
  materia version
}
|
|
|
|
# Check if we should run extract pipeline (daily at specified hour)
#
# Returns 0 when the current UTC hour equals EXTRACT_SCHEDULE_HOUR and
# last_extract_run is not today's UTC date, 1 otherwise.
#
# Globals: EXTRACT_SCHEDULE_HOUR, last_extract_run (read)
should_run_extract() {
  local now current_hour current_date

  # One `date` call so the hour and the date come from the same instant.
  now=$(date -u '+%H %Y-%m-%d')
  current_hour=${now%% *}
  current_date=${now#* }

  # Treat unexpected `date` output as "not due" rather than erroring.
  [[ "$current_hour" =~ ^[0-9]{1,2}$ ]] || return 1

  # Only run at the scheduled hour. `date +%H` is zero-padded ("02") while
  # the schedule is a plain integer (2), so compare numerically; the `10#`
  # prefix keeps "08"/"09" from being parsed as invalid octal.
  if (( 10#$current_hour != 10#$EXTRACT_SCHEDULE_HOUR )); then
    return 1
  fi

  # Only run once per day
  if [[ "$last_extract_run" == "$current_date" ]]; then
    return 1
  fi

  return 0
}
|
|
|
|
# Check if we should run transform pipeline (daily at specified hour)
#
# Returns 0 when the current UTC hour equals TRANSFORM_SCHEDULE_HOUR and
# last_transform_run is not today's UTC date, 1 otherwise.
#
# Globals: TRANSFORM_SCHEDULE_HOUR, last_transform_run (read)
should_run_transform() {
  local now current_hour current_date

  # One `date` call so the hour and the date come from the same instant.
  now=$(date -u '+%H %Y-%m-%d')
  current_hour=${now%% *}
  current_date=${now#* }

  # Treat unexpected `date` output as "not due" rather than erroring.
  [[ "$current_hour" =~ ^[0-9]{1,2}$ ]] || return 1

  # Only run at the scheduled hour. `date +%H` is zero-padded ("03") while
  # the schedule is a plain integer (3), so compare numerically; the `10#`
  # prefix keeps "08"/"09" from being parsed as invalid octal.
  if (( 10#$current_hour != 10#$TRANSFORM_SCHEDULE_HOUR )); then
    return 1
  fi

  # Only run once per day
  if [[ "$last_transform_run" == "$current_date" ]]; then
    return 1
  fi

  return 0
}
|
|
|
|
# Run extract pipeline
#
# Invokes `materia pipeline run extract`. On success, stores today's UTC date
# in last_extract_run. On failure, logs an error, leaves last_extract_run
# unchanged and returns 1.
run_extract() {
  log "Starting extract pipeline..."

  if ! materia pipeline run extract; then
    log_error "Extract pipeline failed"
    return 1
  fi

  log "Extract pipeline completed successfully"
  last_extract_run=$(date -u +%Y-%m-%d)
}
|
|
|
|
# Run transform pipeline
#
# Invokes `materia pipeline run transform`. On success, stores today's UTC
# date in last_transform_run. On failure, logs an error, leaves
# last_transform_run unchanged and returns 1.
run_transform() {
  log "Starting transform pipeline..."

  if ! materia pipeline run transform; then
    log_error "Transform pipeline failed"
    return 1
  fi

  log "Transform pipeline completed successfully"
  last_transform_run=$(date -u +%Y-%m-%d)
}
|
|
|
|
# Main supervisor loop
#
# Logs the active configuration and performs one CLI update check up front.
# It then loops forever: re-check the CLI, run each pipeline whose
# should_run_* predicate reports it as due, and sleep CHECK_INTERVAL seconds.
# A failing step never stops the loop; failures are logged by the steps
# themselves.
main() {
  log "Materia supervisor starting..."
  log "Extract schedule: daily at ${EXTRACT_SCHEDULE_HOUR}:00 UTC"
  log "Transform schedule: daily at ${TRANSFORM_SCHEDULE_HOUR}:00 UTC"
  log "Check interval: ${CHECK_INTERVAL}s"

  if ! check_cli_update; then
    log_error "Initial CLI check failed, continuing anyway"
  fi

  while :; do
    check_cli_update || :

    # A false predicate in a non-final `&&` position does not trigger
    # `set -e`, and `|| :` absorbs a failed pipeline run.
    should_run_extract && { run_extract || :; }
    should_run_transform && { run_transform || :; }

    sleep "$CHECK_INTERVAL"
  done
}
|
|
|
|
# Entry point: hand control to the supervisor loop (main never returns).
# Script arguments are forwarded by convention; main does not use them.
main "$@"
|