diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 59240fb..53f2be3 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -3,7 +3,6 @@ image: python:3.13 stages: - lint - test - - build - deploy variables: @@ -54,83 +53,6 @@ test:sqlmesh: - uv sync - cd transform/sqlmesh_materia && uv run sqlmesh test -build:extract: - stage: build - before_script: - - *uv_setup - script: - - uv sync - - mkdir -p dist - - uv build --package psdonline --out-dir dist/extract - - cd dist/extract && tar -czf ../materia-extract-latest.tar.gz . - artifacts: - paths: - - dist/materia-extract-latest.tar.gz - expire_in: 1 week - rules: - - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH - -build:transform: - stage: build - before_script: - - *uv_setup - script: - - uv sync - - mkdir -p dist - - uv build --package sqlmesh_materia --out-dir dist/transform - - cd dist/transform && tar -czf ../materia-transform-latest.tar.gz . - artifacts: - paths: - - dist/materia-transform-latest.tar.gz - expire_in: 1 week - rules: - - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH - -build:cli: - stage: build - before_script: - - *uv_setup - script: - - uv sync - - mkdir -p dist - - uv build --out-dir dist/cli - - cd dist/cli && tar -czf ../materia-cli-latest.tar.gz . - artifacts: - paths: - - dist/materia-cli-latest.tar.gz - expire_in: 1 week - rules: - - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH - -deploy:r2: - stage: deploy - image: rclone/rclone:latest - before_script: - - apk add --no-cache curl unzip - - curl -fsSL https://get.pulumi.com/esc/install.sh | sh - - export PATH="$HOME/.pulumi/bin:$PATH" - - esc login --token ${PULUMI_ACCESS_TOKEN} - - eval $(esc env open beanflows/prod --format shell) - - | - mkdir -p ~/.config/rclone - cat > ~/.config/rclone/rclone.conf <> ~/.ssh/known_hosts - # Deploy supervisor script and service - scp infra/supervisor/supervisor.sh root@${SUPERVISOR_IP}:/opt/materia/supervisor.sh - scp infra/supervisor/materia-supervisor.service root@${SUPERVISOR_IP}:/etc/systemd/system/materia-supervisor.service - - # Deploy to supervisor - ssh root@${SUPERVISOR_IP} bash <<'ENDSSH' - set -e - cd /opt/materia - - # Create environment file with secrets - cat > .env < +ssh root@ 'bash -s' < infra/bootstrap_supervisor.sh +``` #### 2. Ephemeral Workers (On-Demand) -- Created for each pipeline execution -- Downloads pre-built artifacts from R2 (no git, no uv on worker) +- Created for each pipeline execution by materia CLI - Receives secrets via SSH environment variable injection - Destroyed immediately after job completion - Different instance types per pipeline: @@ -239,18 +237,20 @@ Each artifact is a self-contained tarball with all dependencies. ``` Pulumi ESC (beanflows/prod) ↓ -Supervisor Instance (materia CLI) +Supervisor Instance (via esc CLI) ↓ Workers (injected as env vars via SSH) ``` -#### 4. Artifact Flow +#### 4. Code Deployment Flow ``` -GitLab CI: uv build → tar.gz +GitLab (master branch) ↓ -Cloudflare R2 (artifact storage) +Supervisor: git pull origin master (every 15 min) ↓ -Worker: curl → extract → execute +Supervisor: uv sync (update dependencies) + ↓ +Supervisor: uv run materia pipeline run ``` #### 5. Data Storage @@ -261,12 +261,12 @@ Worker: curl → extract → execute **Execution Flow:** 1. Supervisor loop wakes up every 15 minutes -2. Checks if current time matches pipeline schedule (e.g., 2 AM for extract) -3. Checks for CLI updates (hourly) and self-updates if needed -4. CLI runs: `materia pipeline run extract` -5. Creates Hetzner worker with SSH key -6. Worker downloads `materia-extract-latest.tar.gz` from R2 -7. CLI injects secrets via SSH: `export R2_ACCESS_KEY_ID=... && ./extract_psd` +2. Runs `git fetch` and checks if new commits on master +3. If updates available: `git pull && uv sync` +4. Checks if current time matches pipeline schedule (e.g., 2 AM for extract) +5. If scheduled: `uv run materia pipeline run extract` +6. CLI creates Hetzner worker with SSH key +7. CLI injects secrets via SSH and executes pipeline 8. Pipeline executes, writes to R2 Iceberg catalog 9. Worker destroyed (entire lifecycle ~5-10 minutes) 10. Supervisor logs results and continues loop diff --git a/beanflows_ssh b/beanflows_ssh deleted file mode 100644 index 89c5f73..0000000 --- a/beanflows_ssh +++ /dev/null @@ -1,7 +0,0 @@ ------BEGIN OPENSSH PRIVATE KEY----- -b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW -QyNTUxOQAAACCfGESotAKXA3uc2Mu90jYfpbwqZyRF+VytareVIN3PkgAAAJjG2ri3xtq4 -twAAAAtzc2gtZWQyNTUxOQAAACCfGESotAKXA3uc2Mu90jYfpbwqZyRF+VytareVIN3Pkg -AAAECiPTY1dlijk3nvQcqZckzW2RddBhlqRTp4CMqrqj4oLJ8YRKi0ApcDe5zYy73SNh+l -vCpnJEX5XK1qt5Ug3c+SAAAAD2RlZW1hbkBEZWVtYW5QQwECAwQFBg== ------END OPENSSH PRIVATE KEY----- diff --git a/beanflows_ssh.pub b/beanflows_ssh.pub deleted file mode 100644 index 1f6e925..0000000 --- a/beanflows_ssh.pub +++ /dev/null @@ -1 +0,0 @@ -ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJ8YRKi0ApcDe5zYy73SNh+lvCpnJEX5XK1qt5Ug3c+S deeman@DeemanPC diff --git a/infra/Pulumi.prod.yaml b/infra/Pulumi.prod.yaml index 008e1ad..322a354 100644 --- a/infra/Pulumi.prod.yaml +++ b/infra/Pulumi.prod.yaml @@ -1,5 +1,7 @@ +# Production stack configuration +# All secrets come from Pulumi ESC environment: beanflows/prod +environment: + - beanflows/prod + config: - hcloud:token: - secure: AAABAEdhCpoRPhSknCQDgJWRFUjqwyM7TIz60ICRfcpy2GcYeFH098aX/3/rPCJCuetsRma0Wa145Ff3XXIEgUHFJ4Xr9/fZTZtlAtfMROaEhukWL19k96Fh6m8JihMl - materia-infrastructure:ssh_public_key: - secure: AAABAERKCdqTMBjaxXE+AzlVlCCxUkF1R7+1kFo7c69gqQt1JQuuvzAL/16f099iMP0Ij97U45VBpKUrMtZfHy68d1w1hyCueMHwhoOsfN7bLpj4R/DdCsupXfs8Vx/bJtBjIvsPKbK7f+DygWM1RA== + materia-infrastructure:hetzner_location: "nbg1" # Nuremberg, Germany diff --git a/infra/bootstrap_supervisor.sh b/infra/bootstrap_supervisor.sh new file mode 100755 index 0000000..5556284 --- /dev/null +++ b/infra/bootstrap_supervisor.sh @@ -0,0 +1,130 @@ +#!/bin/bash +# Bootstrap script for Materia supervisor instance +# Run this once on a new supervisor to set it up +# +# Usage: +# From CI/CD or locally: +# ssh root@ 'bash -s' < infra/bootstrap_supervisor.sh +# +# Or on the supervisor itself: +# curl -fsSL | bash + +set -euo pipefail + +echo "=== Materia Supervisor Bootstrap ===" +echo "This script will:" +echo " 1. Install dependencies (git, uv, esc)" +echo " 2. Clone the materia repository" +echo " 3. Setup systemd service" +echo " 4. Start the supervisor" +echo "" + +# Check if we're root +if [ "$EUID" -ne 0 ]; then + echo "ERROR: This script must be run as root" + exit 1 +fi + +# Configuration +REPO_URL="${REPO_URL:-https://gitlab.com/YOUR_USERNAME/materia.git}" # TODO: Update this! +MATERIA_DIR="/opt/materia" +REPO_DIR="$MATERIA_DIR/repo" + +echo "--- Installing system dependencies ---" +apt-get update +apt-get install -y git curl python3-pip + +echo "--- Installing uv ---" +if ! command -v uv &> /dev/null; then + curl -LsSf https://astral.sh/uv/install.sh | sh + export PATH="$HOME/.cargo/bin:$PATH" + echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc +fi + +echo "--- Installing Pulumi ESC ---" +if ! command -v esc &> /dev/null; then + curl -fsSL https://get.pulumi.com/esc/install.sh | sh + export PATH="$HOME/.pulumi/bin:$PATH" + echo 'export PATH="$HOME/.pulumi/bin:$PATH"' >> /root/.bashrc +fi + +echo "--- Setting up Pulumi ESC authentication ---" +if [ -z "${PULUMI_ACCESS_TOKEN:-}" ]; then + echo "ERROR: PULUMI_ACCESS_TOKEN environment variable not set" + echo "Please set it before running this script:" + echo " export PULUMI_ACCESS_TOKEN=" + exit 1 +fi + +esc login --token "$PULUMI_ACCESS_TOKEN" + +echo "--- Loading secrets from Pulumi ESC ---" +eval $(esc env open beanflows/prod --format shell) + +echo "--- Cloning repository ---" +mkdir -p "$MATERIA_DIR" +if [ -d "$REPO_DIR" ]; then + echo "Repository already exists, pulling latest..." + cd "$REPO_DIR" + git pull origin master +else + cd "$MATERIA_DIR" + git clone "$REPO_URL" repo + cd repo +fi + +echo "--- Installing Python dependencies ---" +uv sync + +echo "--- Creating environment file ---" +cat > "$MATERIA_DIR/.env" < /etc/systemd/system/materia-supervisor.service <<'EOF' +[Unit] +Description=Materia Supervisor - Pipeline Orchestration +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +User=root +WorkingDirectory=/opt/materia/repo +ExecStart=/opt/materia/repo/infra/supervisor/supervisor.sh +Restart=always +RestartSec=10 +EnvironmentFile=/opt/materia/.env + +# Resource limits +LimitNOFILE=65536 + +# Logging +StandardOutput=journal +StandardError=journal +SyslogIdentifier=materia-supervisor + +[Install] +WantedBy=multi-user.target +EOF + +echo "--- Enabling and starting service ---" +systemctl daemon-reload +systemctl enable materia-supervisor +systemctl start materia-supervisor + +echo "" +echo "=== Bootstrap complete! ===" +echo "" +echo "Supervisor is now running. Check status with:" +echo " systemctl status materia-supervisor" +echo "" +echo "View logs with:" +echo " journalctl -u materia-supervisor -f" +echo "" +echo "Repository location: $REPO_DIR" +echo "Current commit: $(cd $REPO_DIR && git rev-parse --short HEAD)" diff --git a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh index 0a15430..71e9b7e 100644 --- a/infra/supervisor/supervisor.sh +++ b/infra/supervisor/supervisor.sh @@ -2,24 +2,22 @@ # Materia Supervisor - Continuous pipeline orchestration # Inspired by TigerBeetle's CFO supervisor pattern # https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh +# +# Git-based deployment: pulls latest code from master and runs pipelines via uv set -euo pipefail # Configuration readonly CHECK_INTERVAL=900 # 15 minutes -readonly CLI_VERSION_CHECK_INTERVAL=3600 # 1 hour -readonly MATERIA_DIR="/opt/materia" -readonly R2_ARTIFACTS_URL="https://${R2_ENDPOINT}/${R2_ARTIFACTS_BUCKET}" -readonly CLI_ARTIFACT="materia-cli-latest.tar.gz" +readonly MATERIA_REPO="/opt/materia/repo" +readonly STATE_DIR="/var/lib/materia" # Schedules (cron-style times in UTC) readonly EXTRACT_SCHEDULE_HOUR=2 # 02:00 UTC readonly TRANSFORM_SCHEDULE_HOUR=3 # 03:00 UTC -# State tracking -last_extract_run="" -last_transform_run="" -last_cli_check=0 +# Ensure state directory exists +mkdir -p "$STATE_DIR" log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" @@ -29,151 +27,121 @@ log_error() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] ERROR: $*" >&2 } -# Check if CLI needs updating -check_cli_update() { - local now - now=$(date +%s) +# Update code from git +update_code() { + log "Checking for code updates..." + cd "$MATERIA_REPO" - # Only check once per hour - if (( now - last_cli_check < CLI_VERSION_CHECK_INTERVAL )); then - return 0 - fi - - last_cli_check=$now - - log "Checking for CLI updates..." - - # Download new version - local temp_file="${MATERIA_DIR}/cli-new.tar.gz" - if ! curl -fsSL -o "$temp_file" "${R2_ARTIFACTS_URL}/${CLI_ARTIFACT}"; then - log_error "Failed to download CLI artifact" + # Fetch latest from master + if ! git fetch origin master 2>&1 | grep -v "^From"; then + log_error "Failed to fetch from git" return 1 fi - # Compare checksums - local old_checksum="" - local new_checksum + # Check if update available + LOCAL=$(git rev-parse HEAD) + REMOTE=$(git rev-parse origin/master) - if [ -f "${MATERIA_DIR}/${CLI_ARTIFACT}" ]; then - old_checksum=$(sha256sum "${MATERIA_DIR}/${CLI_ARTIFACT}" | awk '{print $1}') + if [ "$LOCAL" != "$REMOTE" ]; then + log "New version detected: $LOCAL -> $REMOTE" + + # Pull latest code + if git pull origin master; then + log "Code updated successfully" + + # Update dependencies + log "Updating dependencies with uv sync..." + if uv sync; then + log "Dependencies updated" + return 0 + else + log_error "Failed to update dependencies" + return 1 + fi + else + log_error "Failed to pull code" + return 1 + fi fi - new_checksum=$(sha256sum "$temp_file" | awk '{print $1}') + log "Already up to date at $(git rev-parse --short HEAD)" + return 1 # Return 1 to indicate no update (not an error) +} - if [ "$old_checksum" = "$new_checksum" ]; then - log "CLI is up to date" - rm -f "$temp_file" +# Run pipeline using materia CLI via uv +run_pipeline() { + local pipeline=$1 + local date=$(date -u +%Y-%m-%d) + local state_file="$STATE_DIR/${pipeline}_last_run" + + log "Running $pipeline pipeline..." + + cd "$MATERIA_REPO" + if uv run materia pipeline run "$pipeline"; then + log "$pipeline completed successfully" + echo "$date" > "$state_file" return 0 - fi - - log "New CLI version detected, updating..." - - # Install new version - mv "$temp_file" "${MATERIA_DIR}/${CLI_ARTIFACT}" - - cd "$MATERIA_DIR" - rm -rf cli && mkdir -p cli - tar -xzf "$CLI_ARTIFACT" -C cli/ - - if pip3 install --force-reinstall cli/*.whl; then - log "CLI updated successfully" - materia version else - log_error "Failed to install CLI" + log_error "$pipeline failed" return 1 fi } -# Check if we should run extract pipeline (daily at specified hour) -should_run_extract() { - local current_hour - local current_date - - current_hour=$(date -u +%H) - current_date=$(date -u +%Y-%m-%d) +# Check if pipeline should run today +should_run_pipeline() { + local pipeline=$1 + local schedule_hour=$2 + local current_hour=$(date -u +%H) + local current_date=$(date -u +%Y-%m-%d) + local state_file="$STATE_DIR/${pipeline}_last_run" # Only run at the scheduled hour - if [ "$current_hour" != "$EXTRACT_SCHEDULE_HOUR" ]; then + if [ "$current_hour" -ne "$schedule_hour" ]; then return 1 fi - # Only run once per day - if [ "$last_extract_run" = "$current_date" ]; then - return 1 + # Check if already ran today + if [ -f "$state_file" ]; then + local last_run=$(cat "$state_file") + if [ "$last_run" = "$current_date" ]; then + return 1 # Already ran today + fi fi - return 0 -} - -# Check if we should run transform pipeline (daily at specified hour) -should_run_transform() { - local current_hour - local current_date - - current_hour=$(date -u +%H) - current_date=$(date -u +%Y-%m-%d) - - # Only run at the scheduled hour - if [ "$current_hour" != "$TRANSFORM_SCHEDULE_HOUR" ]; then - return 1 - fi - - # Only run once per day - if [ "$last_transform_run" = "$current_date" ]; then - return 1 - fi - - return 0 -} - -# Run extract pipeline -run_extract() { - log "Starting extract pipeline..." - - if materia pipeline run extract; then - log "Extract pipeline completed successfully" - last_extract_run=$(date -u +%Y-%m-%d) - else - log_error "Extract pipeline failed" - return 1 - fi -} - -# Run transform pipeline -run_transform() { - log "Starting transform pipeline..." - - if materia pipeline run transform; then - log "Transform pipeline completed successfully" - last_transform_run=$(date -u +%Y-%m-%d) - else - log_error "Transform pipeline failed" - return 1 - fi + return 0 # Should run } # Main supervisor loop main() { log "Materia supervisor starting..." + log "Repository: $MATERIA_REPO" log "Extract schedule: daily at ${EXTRACT_SCHEDULE_HOUR}:00 UTC" log "Transform schedule: daily at ${TRANSFORM_SCHEDULE_HOUR}:00 UTC" log "Check interval: ${CHECK_INTERVAL}s" - # Initial CLI check - check_cli_update || log_error "Initial CLI check failed, continuing anyway" + # Ensure repo exists + if [ ! -d "$MATERIA_REPO/.git" ]; then + log_error "Repository not found at $MATERIA_REPO" + log_error "Run bootstrap script first!" + exit 1 + fi + + # Show initial version + cd "$MATERIA_REPO" + log "Starting at commit: $(git rev-parse --short HEAD)" while true; do - # Check for CLI updates - check_cli_update || true + # Check for code updates every loop + update_code || true - # Check and run extract pipeline - if should_run_extract; then - run_extract || true + # Check extract schedule + if should_run_pipeline "extract" "$EXTRACT_SCHEDULE_HOUR"; then + run_pipeline extract || true fi - # Check and run transform pipeline - if should_run_transform; then - run_transform || true + # Check transform schedule + if should_run_pipeline "transform" "$TRANSFORM_SCHEDULE_HOUR"; then + run_pipeline transform || true fi sleep "$CHECK_INTERVAL"