#!/bin/bash
# Materia Supervisor - Continuous pipeline orchestration
# Inspired by TigerBeetle's CFO supervisor pattern
# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh
#
# Git-based deployment: pulls latest code from master and runs pipelines via uv

set -euo pipefail

# Configuration
readonly CHECK_INTERVAL=900 # 15 minutes
readonly MATERIA_REPO="/opt/materia/repo"
readonly STATE_DIR="/var/lib/materia"

# Schedules (hour of day in UTC)
readonly EXTRACT_SCHEDULE_HOUR=2   # 02:00 UTC
readonly TRANSFORM_SCHEDULE_HOUR=3 # 03:00 UTC

# Ensure state directory exists
mkdir -p "$STATE_DIR"

#######################################
# Print a timestamped message to stdout.
# Arguments: message words (joined with spaces)
#######################################
log() {
  printf '[%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$*"
}

#######################################
# Print a timestamped error message to stderr.
# Arguments: message words (joined with spaces)
#######################################
log_error() {
  printf '[%s] ERROR: %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$*" >&2
}

#######################################
# Fetch origin/master and, if HEAD differs from it, pull and run `uv sync`.
# Globals:   MATERIA_REPO (read)
# Returns:   0 when new code was pulled and dependencies were synced;
#            1 when already up to date (not an error) or when any step failed.
#######################################
update_code() {
  local fetch_output local_rev remote_rev

  log "Checking for code updates..."

  # Callers invoke this as `update_code || true`, which disables `set -e`
  # inside the function, so a failed cd must be handled explicitly.
  if ! cd "$MATERIA_REPO"; then
    log_error "Cannot change directory to $MATERIA_REPO"
    return 1
  fi

  # Judge the fetch by git's own exit status. Piping straight into
  # `grep -v` would make the result depend on whether grep printed a line
  # (grep exits 1 when it selects nothing), reporting a false failure.
  if ! fetch_output=$(git fetch origin master 2>&1); then
    if [[ -n "$fetch_output" ]]; then
      printf '%s\n' "$fetch_output"
    fi
    log_error "Failed to fetch from git"
    return 1
  fi

  # Show fetch output without the "From <remote>" banner line.
  if [[ -n "$fetch_output" ]]; then
    grep -v '^From' <<<"$fetch_output" || true
  fi

  # Check if update available
  if ! local_rev=$(git rev-parse HEAD); then
    log_error "Failed to resolve HEAD"
    return 1
  fi
  if ! remote_rev=$(git rev-parse origin/master); then
    log_error "Failed to resolve origin/master"
    return 1
  fi

  if [[ "$local_rev" == "$remote_rev" ]]; then
    log "Already up to date at $(git rev-parse --short HEAD)"
    return 1 # Return 1 to indicate no update (not an error)
  fi

  log "New version detected: $local_rev -> $remote_rev"

  # Pull latest code
  if ! git pull origin master; then
    log_error "Failed to pull code"
    return 1
  fi
  log "Code updated successfully"

  # Update dependencies
  # NOTE(review): if `uv sync` fails here, the next iteration sees
  # HEAD == origin/master and will not retry the sync on its own.
  log "Updating dependencies with uv sync..."
  if ! uv sync; then
    log_error "Failed to update dependencies"
    return 1
  fi
  log "Dependencies updated"
  return 0
}

#######################################
# Run one pipeline through the materia CLI (via uv) and, on success,
# record the UTC date taken at the start of the run in the state file.
# Globals:   MATERIA_REPO (read), STATE_DIR (read)
# Arguments: $1 - pipeline name (e.g. "extract", "transform")
# Outputs:   writes $STATE_DIR/<pipeline>_last_run on success
# Returns:   0 on success, 1 on failure
#######################################
run_pipeline() {
  local pipeline=$1
  local state_file="$STATE_DIR/${pipeline}_last_run"
  local run_date

  # Declared separately from the assignment so a failing `date` is not
  # masked by the exit status of `local`.
  if ! run_date=$(date -u +%Y-%m-%d); then
    log_error "Failed to read current date"
    return 1
  fi

  log "Running $pipeline pipeline..."

  if ! cd "$MATERIA_REPO"; then
    log_error "Cannot change directory to $MATERIA_REPO"
    return 1
  fi

  if ! uv run materia pipeline run "$pipeline"; then
    log_error "$pipeline failed"
    return 1
  fi

  log "$pipeline completed successfully"
  printf '%s\n' "$run_date" >"$state_file"
  return 0
}

#######################################
# Decide whether a pipeline is due: the current UTC hour must equal the
# scheduled hour and the state file must not already hold today's UTC date.
# Globals:   STATE_DIR (read)
# Arguments: $1 - pipeline name
#            $2 - scheduled hour (0-23, UTC)
# Returns:   0 if the pipeline should run now, 1 otherwise
#######################################
should_run_pipeline() {
  local pipeline=$1
  local schedule_hour=$2
  local state_file="$STATE_DIR/${pipeline}_last_run"
  local now current_hour current_date last_run

  # Read hour and date from a single `date` call so the two values cannot
  # straddle midnight.
  now=$(date -u '+%H %Y-%m-%d') || return 1
  current_hour=${now%% *}
  current_date=${now#* }

  # Only run at the scheduled hour. %H is zero-padded; force base 10 so
  # "08" and "09" are not parsed as invalid octal literals.
  if ((10#$current_hour != 10#$schedule_hour)); then
    return 1
  fi

  # Check if already ran today
  if [[ -f "$state_file" ]]; then
    last_run=$(<"$state_file") || last_run=""
    if [[ "$last_run" == "$current_date" ]]; then
      return 1 # Already ran today
    fi
  fi

  return 0 # Should run
}

#######################################
# Supervisor entry point: log the configuration, verify the repository
# exists, then loop forever: update code, run due pipelines, sleep.
# Globals:   MATERIA_REPO, CHECK_INTERVAL, EXTRACT_SCHEDULE_HOUR,
#            TRANSFORM_SCHEDULE_HOUR (all read)
# Returns:   does not return normally; exits 1 if the repository is missing
#######################################
main() {
  log "Materia supervisor starting..."
  log "Repository: $MATERIA_REPO"
  log "Extract schedule: daily at ${EXTRACT_SCHEDULE_HOUR}:00 UTC"
  log "Transform schedule: daily at ${TRANSFORM_SCHEDULE_HOUR}:00 UTC"
  log "Check interval: ${CHECK_INTERVAL}s"

  # Ensure repo exists
  if [[ ! -d "$MATERIA_REPO/.git" ]]; then
    log_error "Repository not found at $MATERIA_REPO"
    log_error "Run bootstrap script first!"
    exit 1
  fi

  # Show initial version
  cd "$MATERIA_REPO" || exit 1
  log "Starting at commit: $(git rev-parse --short HEAD)"

  while true; do
    # Check for code updates every loop. A non-zero status means
    # "no update" or a logged failure; neither may stop the supervisor.
    update_code || true

    # Check extract schedule; a failed run is logged and does not stop
    # the loop.
    if should_run_pipeline "extract" "$EXTRACT_SCHEDULE_HOUR"; then
      run_pipeline extract || true
    fi

    # Check transform schedule
    if should_run_pipeline "transform" "$TRANSFORM_SCHEDULE_HOUR"; then
      run_pipeline transform || true
    fi

    sleep "$CHECK_INTERVAL"
  done
}

# Run main loop
main "$@"