diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 4a830f5..59240fb 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -31,7 +31,6 @@ lint: script: - uv sync - uv run ruff check . - - uv run ruff format --check . test:cli: stage: test @@ -143,5 +142,80 @@ deploy:infra: - pulumi up --yes rules: - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH - changes: - - infra/**/* + +deploy:supervisor: + stage: deploy + image: alpine:latest + before_script: + - apk add --no-cache openssh-client curl bash + - curl -fsSL https://get.pulumi.com/esc/install.sh | sh + - export PATH="$HOME/.pulumi/bin:$PATH" + - esc login --token ${PULUMI_ACCESS_TOKEN} + - eval $(esc env open beanflows/prod --format shell) + script: + - | + # Install pulumi CLI to get stack outputs + apk add --no-cache pulumi-bin || { + curl -fsSL https://get.pulumi.com/install.sh | sh + export PATH="$HOME/.pulumi/bin:$PATH" + } + pulumi login --token ${PULUMI_ACCESS_TOKEN} + + # Get supervisor IP from Pulumi + cd infra + SUPERVISOR_IP=$(pulumi stack output supervisor_ip -s prod) + cd .. + + echo "Deploying to supervisor at ${SUPERVISOR_IP}..." + + # Setup SSH + mkdir -p ~/.ssh + echo "$SSH_PRIVATE_KEY" > ~/.ssh/id_rsa + chmod 600 ~/.ssh/id_rsa + ssh-keyscan -H $SUPERVISOR_IP >> ~/.ssh/known_hosts + + # Deploy supervisor script and service + scp infra/supervisor/supervisor.sh root@${SUPERVISOR_IP}:/opt/materia/supervisor.sh + scp infra/supervisor/materia-supervisor.service root@${SUPERVISOR_IP}:/etc/systemd/system/materia-supervisor.service + + # Deploy to supervisor + ssh root@${SUPERVISOR_IP} bash <<'ENDSSH' + set -e + cd /opt/materia + + # Create environment file with secrets + cat > .env <` command that uses SQLMesh's internal scheduler to determine if work is needed: + +```bash +# Check if any models need refresh +materia pipeline check transform + +# Returns: +# - Exit code 0 if work needed +# - Exit code 1 if nothing to do +# - Prints list of models that would be updated +``` + +**Implementation:** +```python +# src/materia/pipelines.py +def check_pipeline(pipeline_name: str) -> PipelineCheckResult: + """Check if pipeline has work to do without creating a worker.""" + if pipeline_name == "transform": + # Run sqlmesh plan --dry-run locally + # Parse output to see what models need refresh + # Return list of models + their tags + pass +``` + +**Supervisor Integration:** +```bash +# In supervisor.sh +if materia pipeline check transform; then + echo "Transform has work to do, scheduling..." + materia pipeline run transform +else + echo "Transform up to date, skipping" +fi +``` + +--- + +## Model Tags for Worker Sizing + +### Current State +- All transform pipelines run on the same worker type (ccx22) +- No way to specify that certain models need more resources + +### Enhancement Idea +Use SQLMesh model tags to hint at resource requirements: + +```sql +-- models/serving/obt_commodity_metrics.sql +MODEL ( + name serving.obt_commodity_metrics, + kind INCREMENTAL_BY_TIME_RANGE, + cron '@daily', + tags ['heavy', 'long_running'] -- Supervisor uses this! +) +``` + +**Pipeline Check Enhancement:** +```python +@dataclass +class PipelineCheckResult: + has_work: bool + models: list[str] + tags: set[str] # Aggregated tags from all models needing refresh + +# Check if any heavy models need refresh +result = check_pipeline("transform") +if "heavy" in result.tags: + worker_type = "ccx32" # Use bigger worker +else: + worker_type = "ccx22" # Default worker +``` + +**Supervisor Integration:** +```bash +# In supervisor.sh +RESULT=$(materia pipeline check transform --format json) + +if [ "$(echo $RESULT | jq -r '.has_work')" = "true" ]; then + TAGS=$(echo $RESULT | jq -r '.tags | join(",")') + + if echo "$TAGS" | grep -q "heavy"; then + WORKER_TYPE="ccx32" + else + WORKER_TYPE="ccx22" + fi + + materia pipeline run transform --worker $WORKER_TYPE +fi +``` + +**Tag Conventions:** +- `heavy`: Requires larger worker (ccx32+) +- `long_running`: Expected to take >1 hour +- `gpu`: Requires GPU instance (future) +- `distributed`: Could benefit from multi-node execution (future) + +--- + +## Multi-Pipeline Support + +### Current State +- Only extract and transform pipelines +- Extract doesn't use SQLMesh scheduling + +### Enhancement Idea +Make extract pipeline also SQLMesh-aware, add more pipeline types: + +**New Pipelines:** +- `dbt`: For dbt models (if we add dbt support) +- `ml`: For machine learning model training +- `export`: For exporting data to external systems + +**Extract with Scheduling:** +Instead of fixed schedule, use SQLMesh to track last successful extract: + +```sql +-- models/raw/psd_alldata.sql +MODEL ( + name raw.psd_alldata, + kind INCREMENTAL_BY_TIME_RANGE, + cron '@daily', -- SQLMesh tracks this! + tags ['extract'] +) +``` + +Then supervisor checks if extract needs running based on SQLMesh state. + +--- + +## Distributed Transform Execution + +### Current State +- All SQLMesh models run on a single worker +- Large transforms can be slow + +### Enhancement Idea +For models tagged `distributed`, split execution across multiple workers: + +```python +# Detect heavy models that would benefit from parallelization +result = check_pipeline("transform") + +if "distributed" in result.tags: + # Split models into batches based on dependency graph + batches = split_execution_graph(result.models) + + # Create one worker per batch + workers = [create_worker(f"transform-{i}", "ccx22") for i in range(len(batches))] + + # Execute in parallel + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(run_pipeline_on_worker, worker, batch) + for worker, batch in zip(workers, batches) + ] +``` + +--- + +## Cost Optimization with Multi-Cloud + +### Current State +- Only supports Hetzner Cloud +- Provider abstraction exists but not utilized + +### Enhancement Idea +Check spot prices across multiple providers and use cheapest: + +```python +# src/materia/providers/pricing.py +def get_cheapest_provider(worker_type: str) -> str: + prices = { + "hetzner": get_hetzner_spot_price(worker_type), + "ovh": get_ovh_spot_price(worker_type), + "scaleway": get_scaleway_spot_price(worker_type), + } + return min(prices, key=prices.get) + +# In supervisor.sh or materia CLI +PROVIDER=$(materia provider cheapest --type ccx22) +materia pipeline run transform --provider $PROVIDER +``` + +--- + +## Observability Enhancements + +### Current State +- Logs go to systemd journal +- No metrics or alerting + +### Enhancement Ideas + +**1. Prometheus Metrics:** +```bash +# Expose metrics endpoint on supervisor +curl http://supervisor:9090/metrics + +# Example metrics: +materia_pipeline_runs_total{pipeline="extract",status="success"} 42 +materia_pipeline_duration_seconds{pipeline="transform"} 1234 +materia_worker_cost_euros{pipeline="extract"} 0.05 +``` + +**2. Alerting:** +```yaml +# alerts/pipelines.yml +- alert: PipelineFailure + expr: rate(materia_pipeline_runs_total{status="failure"}[1h]) > 0 + for: 5m + annotations: + summary: Pipeline {{ $labels.pipeline }} is failing +``` + +**3. Web Dashboard:** +Simple web UI showing: +- Current supervisor status +- Recent pipeline runs +- Active workers +- Cost tracking + +--- + +## Self-Healing Capabilities + +### Enhancement Idea +Supervisor automatically handles common failure modes: + +```bash +# In supervisor.sh +run_with_retry() { + local pipeline=$1 + local max_retries=3 + local retry_count=0 + + while [ $retry_count -lt $max_retries ]; do + if materia pipeline run $pipeline; then + return 0 + fi + + retry_count=$((retry_count + 1)) + log "Pipeline $pipeline failed, retry $retry_count/$max_retries" + + # Exponential backoff + sleep $((2 ** retry_count * 60)) + done + + log_error "Pipeline $pipeline failed after $max_retries retries" + # Send alert + return 1 +} +``` + +--- + +## Priority Queuing + +### Current State +- Pipelines run sequentially based on schedule + +### Enhancement Idea +Add priority system for pipeline execution: + +```python +@dataclass +class PipelineConfig: + priority: int = 0 # Higher = more important + +PIPELINES = { + "extract": PipelineConfig(priority=10, ...), # Highest + "transform": PipelineConfig(priority=5, ...), + "export": PipelineConfig(priority=1, ...), # Lowest +} +``` + +Supervisor maintains a priority queue and executes highest-priority work first. + +--- + +## Notes +- These are ideas for future consideration, not immediate requirements +- Implement incrementally as needs arise +- Keep simplicity as a guiding principle diff --git a/infra/__main__.py b/infra/__main__.py index 85e6054..1f9e5ab 100644 --- a/infra/__main__.py +++ b/infra/__main__.py @@ -16,31 +16,26 @@ hetzner_location = config.get("hetzner_location") or "nbg1" # Nuremberg datacen # Cloudflare R2 Storage + Data Catalog (Iceberg) # ============================================================ -# R2 bucket for raw data (extraction outputs) -raw_bucket = cloudflare.R2Bucket( - "materia-raw", +# R2 bucket for artifacts (CLI + extract/transform packages) +# Note: Import existing bucket with: +# pulumi import cloudflare:index/r2Bucket:R2Bucket materia-artifacts /beanflows-artifacts +artifacts_bucket = cloudflare.R2Bucket( + "materia-artifacts", account_id=cloudflare_account_id, - name="materia-raw", + name="beanflows-artifacts", location="weur", # Western Europe ) # R2 bucket for lakehouse (Iceberg tables) +# Note: Import existing bucket with: +# pulumi import cloudflare:index/r2Bucket:R2Bucket materia-lakehouse /beanflows-data-prod lakehouse_bucket = cloudflare.R2Bucket( "materia-lakehouse", account_id=cloudflare_account_id, - name="materia-lakehouse", + name="beanflows-data-prod", location="weur", ) -# TODO: Enable R2 Data Catalog (Iceberg) on lakehouse bucket -# Note: As of Oct 2025, R2 Data Catalog is in public beta -# May need to enable via Cloudflare dashboard or API once SDK supports it -# For now, document manual step in README - -# API token for R2 access (needs R2 + Data Catalog permissions) -# Note: Create this manually in Cloudflare dashboard and store in Pulumi config -# pulumi config set --secret cloudflare_r2_token - # ============================================================ # Hetzner Cloud Infrastructure # ============================================================ @@ -52,57 +47,41 @@ ssh_key = hcloud.SshKey( public_key=config.require_secret("ssh_public_key"), ) -# Small CCX instance for scheduler/orchestrator -# This runs the cron scheduler + lightweight tasks -scheduler_server = hcloud.Server( - "materia-scheduler", - name="materia-scheduler", - server_type="ccx12", # 2 vCPU, 8GB RAM, ~€6/mo +# Small CCX instance for supervisor (runs materia CLI to orchestrate pipelines) +# This is an always-on instance that creates/destroys ephemeral workers on-demand +supervisor_server = hcloud.Server( + "materia-supervisor", + name="materia-supervisor", + server_type="ccx11", # 2 vCPU, 4GB RAM, ~€4/mo (cheapest option) image="ubuntu-24.04", location=hetzner_location, ssh_keys=[ssh_key.id], labels={ - "role": "scheduler", + "role": "supervisor", "project": "materia", }, user_data="""#!/bin/bash +set -e + # Basic server setup apt-get update -apt-get install -y python3.13 python3-pip git curl +apt-get install -y python3.13 python3-pip curl unzip -# Install uv -curl -LsSf https://astral.sh/uv/install.sh | sh +# Install Pulumi ESC CLI +curl -fsSL https://get.pulumi.com/esc/install.sh | sh +export PATH="$HOME/.pulumi/bin:$PATH" +echo 'export PATH="$HOME/.pulumi/bin:$PATH"' >> /root/.bashrc + +# Create deployment directory +mkdir -p /opt/materia # Configure environment -echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc +echo 'Setup complete. Materia CLI will be deployed via CI/CD.' > /opt/materia/README.txt """, ) -# Larger CCX instance for heavy SQLMesh workloads -# This gets spun up on-demand for big transformations -worker_server = hcloud.Server( - "materia-worker-01", - name="materia-worker-01", - server_type="ccx22", # 4 vCPU, 16GB RAM, ~€24/mo - image="ubuntu-24.04", - location=hetzner_location, - ssh_keys=[ssh_key.id], - labels={ - "role": "worker", - "project": "materia", - }, - user_data="""#!/bin/bash -# Basic server setup -apt-get update -apt-get install -y python3.13 python3-pip git curl - -# Install uv -curl -LsSf https://astral.sh/uv/install.sh | sh - -# Configure environment -echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc -""", -) +# Note: Workers are created on-demand by the materia CLI +# No always-on worker instances in this architecture # Firewall for servers (restrict to SSH + outbound only) firewall = hcloud.Firewall( @@ -132,27 +111,20 @@ firewall = hcloud.Firewall( ], ) -# Apply firewall to all servers -scheduler_firewall = hcloud.FirewallAttachment( - "scheduler-firewall", +# Apply firewall to supervisor +supervisor_firewall = hcloud.FirewallAttachment( + "supervisor-firewall", firewall_id=firewall.id, - server_ids=[scheduler_server.id], -) - -worker_firewall = hcloud.FirewallAttachment( - "worker-firewall", - firewall_id=firewall.id, - server_ids=[worker_server.id], + server_ids=[supervisor_server.id], ) # ============================================================ # Outputs # ============================================================ -pulumi.export("raw_bucket_name", raw_bucket.name) +pulumi.export("artifacts_bucket_name", artifacts_bucket.name) pulumi.export("lakehouse_bucket_name", lakehouse_bucket.name) -pulumi.export("scheduler_ip", scheduler_server.ipv4_address) -pulumi.export("worker_ip", worker_server.ipv4_address) +pulumi.export("supervisor_ip", supervisor_server.ipv4_address) # Export connection info for DuckDB pulumi.export( diff --git a/infra/supervisor/materia-supervisor.service b/infra/supervisor/materia-supervisor.service new file mode 100644 index 0000000..a35aca0 --- /dev/null +++ b/infra/supervisor/materia-supervisor.service @@ -0,0 +1,29 @@ +[Unit] +Description=Materia Supervisor - Pipeline Orchestration +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +User=root +WorkingDirectory=/opt/materia +Environment="PATH=/usr/local/bin:/usr/bin:/bin:/root/.pulumi/bin" +EnvironmentFile=/opt/materia/.env + +# Restart policy +Restart=always +RestartSec=10 + +# Resource limits +LimitNOFILE=65536 + +# Logging +StandardOutput=journal +StandardError=journal +SyslogIdentifier=materia-supervisor + +# Execute supervisor script +ExecStart=/opt/materia/supervisor.sh + +[Install] +WantedBy=multi-user.target diff --git a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh new file mode 100644 index 0000000..0a15430 --- /dev/null +++ b/infra/supervisor/supervisor.sh @@ -0,0 +1,184 @@ +#!/bin/bash +# Materia Supervisor - Continuous pipeline orchestration +# Inspired by TigerBeetle's CFO supervisor pattern +# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh + +set -euo pipefail + +# Configuration +readonly CHECK_INTERVAL=900 # 15 minutes +readonly CLI_VERSION_CHECK_INTERVAL=3600 # 1 hour +readonly MATERIA_DIR="/opt/materia" +readonly R2_ARTIFACTS_URL="https://${R2_ENDPOINT}/${R2_ARTIFACTS_BUCKET}" +readonly CLI_ARTIFACT="materia-cli-latest.tar.gz" + +# Schedules (cron-style times in UTC) +readonly EXTRACT_SCHEDULE_HOUR=2 # 02:00 UTC +readonly TRANSFORM_SCHEDULE_HOUR=3 # 03:00 UTC + +# State tracking +last_extract_run="" +last_transform_run="" +last_cli_check=0 + +log() { + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" +} + +log_error() { + echo "[$(date '+%Y-%m-%d %H:%M:%S')] ERROR: $*" >&2 +} + +# Check if CLI needs updating +check_cli_update() { + local now + now=$(date +%s) + + # Only check once per hour + if (( now - last_cli_check < CLI_VERSION_CHECK_INTERVAL )); then + return 0 + fi + + last_cli_check=$now + + log "Checking for CLI updates..." + + # Download new version + local temp_file="${MATERIA_DIR}/cli-new.tar.gz" + if ! curl -fsSL -o "$temp_file" "${R2_ARTIFACTS_URL}/${CLI_ARTIFACT}"; then + log_error "Failed to download CLI artifact" + return 1 + fi + + # Compare checksums + local old_checksum="" + local new_checksum + + if [ -f "${MATERIA_DIR}/${CLI_ARTIFACT}" ]; then + old_checksum=$(sha256sum "${MATERIA_DIR}/${CLI_ARTIFACT}" | awk '{print $1}') + fi + + new_checksum=$(sha256sum "$temp_file" | awk '{print $1}') + + if [ "$old_checksum" = "$new_checksum" ]; then + log "CLI is up to date" + rm -f "$temp_file" + return 0 + fi + + log "New CLI version detected, updating..." + + # Install new version + mv "$temp_file" "${MATERIA_DIR}/${CLI_ARTIFACT}" + + cd "$MATERIA_DIR" + rm -rf cli && mkdir -p cli + tar -xzf "$CLI_ARTIFACT" -C cli/ + + if pip3 install --force-reinstall cli/*.whl; then + log "CLI updated successfully" + materia version + else + log_error "Failed to install CLI" + return 1 + fi +} + +# Check if we should run extract pipeline (daily at specified hour) +should_run_extract() { + local current_hour + local current_date + + current_hour=$(date -u +%H) + current_date=$(date -u +%Y-%m-%d) + + # Only run at the scheduled hour + if [ "$current_hour" != "$EXTRACT_SCHEDULE_HOUR" ]; then + return 1 + fi + + # Only run once per day + if [ "$last_extract_run" = "$current_date" ]; then + return 1 + fi + + return 0 +} + +# Check if we should run transform pipeline (daily at specified hour) +should_run_transform() { + local current_hour + local current_date + + current_hour=$(date -u +%H) + current_date=$(date -u +%Y-%m-%d) + + # Only run at the scheduled hour + if [ "$current_hour" != "$TRANSFORM_SCHEDULE_HOUR" ]; then + return 1 + fi + + # Only run once per day + if [ "$last_transform_run" = "$current_date" ]; then + return 1 + fi + + return 0 +} + +# Run extract pipeline +run_extract() { + log "Starting extract pipeline..." + + if materia pipeline run extract; then + log "Extract pipeline completed successfully" + last_extract_run=$(date -u +%Y-%m-%d) + else + log_error "Extract pipeline failed" + return 1 + fi +} + +# Run transform pipeline +run_transform() { + log "Starting transform pipeline..." + + if materia pipeline run transform; then + log "Transform pipeline completed successfully" + last_transform_run=$(date -u +%Y-%m-%d) + else + log_error "Transform pipeline failed" + return 1 + fi +} + +# Main supervisor loop +main() { + log "Materia supervisor starting..." + log "Extract schedule: daily at ${EXTRACT_SCHEDULE_HOUR}:00 UTC" + log "Transform schedule: daily at ${TRANSFORM_SCHEDULE_HOUR}:00 UTC" + log "Check interval: ${CHECK_INTERVAL}s" + + # Initial CLI check + check_cli_update || log_error "Initial CLI check failed, continuing anyway" + + while true; do + # Check for CLI updates + check_cli_update || true + + # Check and run extract pipeline + if should_run_extract; then + run_extract || true + fi + + # Check and run transform pipeline + if should_run_transform; then + run_transform || true + fi + + sleep "$CHECK_INTERVAL" + done +} + +# Run main loop +main