Add supervisor deployment with continuous pipeline orchestration

Implements automated supervisor instance deployment that runs scheduled
pipelines using a TigerBeetle-inspired continuous orchestration pattern.

Infrastructure changes:
- Update Pulumi to use existing R2 buckets (beanflows-artifacts, beanflows-data-prod)
- Rename scheduler → supervisor, optimize to CCX11 (€4/mo)
- Remove always-on worker (workers are now ephemeral only)
- Add artifacts bucket resource for CLI/pipeline packages

Supervisor architecture:
- supervisor.sh: Continuous loop checking schedules every 15 minutes
- Self-updating: Checks for new CLI versions hourly
- Fixed schedules: Extract at 2 AM UTC, Transform at 3 AM UTC
- systemd service for automatic restart on failure
- Logs to systemd journal for observability

CI/CD changes:
- deploy:infra now runs on every master push (not just on changes)
- New deploy:supervisor job:
  * Deploys supervisor.sh and systemd service
  * Installs latest materia CLI from R2
  * Configures environment with Pulumi ESC secrets
  * Restarts supervisor service

Future enhancements documented:
- SQLMesh-aware scheduling (check models before running)
- Model tags for worker sizing (heavy/distributed hints)
- Multi-pipeline support, distributed execution
- Cost optimization with multi-cloud spot pricing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Deeman
2025-10-12 22:23:55 +02:00
parent 7e6ff29dea
commit f207fb441d
6 changed files with 648 additions and 79 deletions


@@ -0,0 +1,295 @@
# Future Enhancement Ideas
This document captures ideas for future improvements to the Materia infrastructure and pipeline orchestration.
## Smart SQLMesh-Aware Scheduling
### Current State
- Extract pipeline runs on a fixed schedule (daily at 2 AM UTC)
- Transform pipeline runs on a fixed schedule (daily at 3 AM UTC)
- No awareness of whether SQLMesh models actually need refreshing
### Enhancement Idea
Add `materia pipeline check <name>` command that uses SQLMesh's internal scheduler to determine if work is needed:
```bash
# Check if any models need refresh
materia pipeline check transform
# Returns:
# - Exit code 0 if work needed
# - Exit code 1 if nothing to do
# - Prints list of models that would be updated
```
**Implementation:**
```python
# src/materia/pipelines.py
def check_pipeline(pipeline_name: str) -> PipelineCheckResult:
    """Check if pipeline has work to do without creating a worker."""
    if pipeline_name == "transform":
        # Run sqlmesh plan --dry-run locally
        # Parse output to see what models need refresh
        # Return list of models + their tags
        pass
```
**Supervisor Integration:**
```bash
# In supervisor.sh
if materia pipeline check transform; then
    echo "Transform has work to do, scheduling..."
    materia pipeline run transform
else
    echo "Transform up to date, skipping"
fi
```
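On the CLI side, the exit-code convention above maps naturally onto a tiny helper. This is a hedged sketch, not the actual `materia` implementation; `PipelineCheckResult` here is a local stand-in mirroring the dataclass sketched later in this document:

```python
from dataclasses import dataclass, field

@dataclass
class PipelineCheckResult:
    """Stand-in for the check result type sketched elsewhere in this doc."""
    has_work: bool
    models: list[str] = field(default_factory=list)

def check_exit_code(result: PipelineCheckResult) -> int:
    """Map a check result to the convention: 0 = work needed, 1 = nothing to do."""
    for model in result.models:
        print(model)  # the list of models that would be updated
    return 0 if result.has_work else 1
```

The CLI entry point would end with something like `sys.exit(check_exit_code(result))`, which is what lets the supervisor's plain `if materia pipeline check transform; then` test work.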
---
## Model Tags for Worker Sizing
### Current State
- All transform pipelines run on the same worker type (ccx22)
- No way to specify that certain models need more resources
### Enhancement Idea
Use SQLMesh model tags to hint at resource requirements:
```sql
-- models/serving/obt_commodity_metrics.sql
MODEL (
    name serving.obt_commodity_metrics,
    kind INCREMENTAL_BY_TIME_RANGE,
    cron '@daily',
    tags ['heavy', 'long_running']  -- Supervisor uses this!
)
```
**Pipeline Check Enhancement:**
```python
@dataclass
class PipelineCheckResult:
    has_work: bool
    models: list[str]
    tags: set[str]  # Aggregated tags from all models needing refresh

# Check if any heavy models need refresh
result = check_pipeline("transform")
if "heavy" in result.tags:
    worker_type = "ccx32"  # Use bigger worker
else:
    worker_type = "ccx22"  # Default worker
```
**Supervisor Integration:**
```bash
# In supervisor.sh
RESULT=$(materia pipeline check transform --format json)
if [ "$(echo "$RESULT" | jq -r '.has_work')" = "true" ]; then
    TAGS=$(echo "$RESULT" | jq -r '.tags | join(",")')
    if echo "$TAGS" | grep -q "heavy"; then
        WORKER_TYPE="ccx32"
    else
        WORKER_TYPE="ccx22"
    fi
    materia pipeline run transform --worker "$WORKER_TYPE"
fi
```
**Tag Conventions:**
- `heavy`: Requires larger worker (ccx32+)
- `long_running`: Expected to take >1 hour
- `gpu`: Requires GPU instance (future)
- `distributed`: Could benefit from multi-node execution (future)
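Under those conventions, tag-to-worker resolution could be a small pure function. A minimal sketch, assuming the worker type names used elsewhere in this document (`ccx22` default, `ccx32` for `heavy`); `worker_type_for` is a hypothetical helper, not an existing `materia` API:

```python
def worker_type_for(tags: set[str]) -> str:
    """Resolve aggregated model tags to a worker size; the most demanding hint wins."""
    if "gpu" in tags:
        # GPU instances are listed as a future idea above, so fail loudly for now.
        raise NotImplementedError("GPU workers are not supported yet")
    if "heavy" in tags:
        return "ccx32"  # larger worker, per the `heavy` convention
    return "ccx22"  # default transform worker; `long_running` affects timeouts, not size
```

Keeping this a pure function makes the sizing policy trivially unit-testable, independent of any cloud API.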
---
## Multi-Pipeline Support
### Current State
- Only extract and transform pipelines
- Extract doesn't use SQLMesh scheduling
### Enhancement Idea
Make extract pipeline also SQLMesh-aware, add more pipeline types:
**New Pipelines:**
- `dbt`: For dbt models (if we add dbt support)
- `ml`: For machine learning model training
- `export`: For exporting data to external systems
**Extract with Scheduling:**
Instead of fixed schedule, use SQLMesh to track last successful extract:
```sql
-- models/raw/psd_alldata.sql
MODEL (
    name raw.psd_alldata,
    kind INCREMENTAL_BY_TIME_RANGE,
    cron '@daily',  -- SQLMesh tracks this!
    tags ['extract']
)
```
Then supervisor checks if extract needs running based on SQLMesh state.
---
## Distributed Transform Execution
### Current State
- All SQLMesh models run on a single worker
- Large transforms can be slow
### Enhancement Idea
For models tagged `distributed`, split execution across multiple workers:
```python
from concurrent.futures import ThreadPoolExecutor

# Detect heavy models that would benefit from parallelization
result = check_pipeline("transform")
if "distributed" in result.tags:
    # Split models into batches based on dependency graph
    batches = split_execution_graph(result.models)

    # Create one worker per batch
    workers = [create_worker(f"transform-{i}", "ccx22") for i in range(len(batches))]

    # Execute in parallel
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(run_pipeline_on_worker, worker, batch)
            for worker, batch in zip(workers, batches)
        ]
```
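The `split_execution_graph` helper above is hypothetical; one plausible shape for it is a level-by-level topological sort (Kahn's algorithm), taking a dependency map rather than a flat model list. A sketch under that assumption:

```python
def split_execution_graph(deps: dict[str, set[str]]) -> list[list[str]]:
    """Batch models so each batch depends only on models in earlier batches.

    deps maps each model name to the set of upstream models it depends on.
    """
    remaining = {model: set(upstream) for model, upstream in deps.items()}
    batches: list[list[str]] = []
    while remaining:
        # A model is ready once all of its upstreams have been scheduled.
        ready = sorted(m for m, upstream in remaining.items() if not upstream)
        if not ready:
            raise ValueError("cycle detected in model dependency graph")
        batches.append(ready)
        for m in ready:
            del remaining[m]
        for upstream in remaining.values():
            upstream.difference_update(ready)
    return batches
```

Models within one batch are safe to run on separate workers; batches themselves must run in order.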
---
## Cost Optimization with Multi-Cloud
### Current State
- Only supports Hetzner Cloud
- Provider abstraction exists but not utilized
### Enhancement Idea
Check spot prices across multiple providers and use cheapest:
```python
# src/materia/providers/pricing.py
def get_cheapest_provider(worker_type: str) -> str:
    prices = {
        "hetzner": get_hetzner_spot_price(worker_type),
        "ovh": get_ovh_spot_price(worker_type),
        "scaleway": get_scaleway_spot_price(worker_type),
    }
    return min(prices, key=prices.get)
```

```bash
# In supervisor.sh or materia CLI
PROVIDER=$(materia provider cheapest --type ccx22)
materia pipeline run transform --provider "$PROVIDER"
```
---
## Observability Enhancements
### Current State
- Logs go to systemd journal
- No metrics or alerting
### Enhancement Ideas
**1. Prometheus Metrics:**
```bash
# Expose metrics endpoint on supervisor
curl http://supervisor:9090/metrics
# Example metrics:
materia_pipeline_runs_total{pipeline="extract",status="success"} 42
materia_pipeline_duration_seconds{pipeline="transform"} 1234
materia_worker_cost_euros{pipeline="extract"} 0.05
```
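A real deployment would likely use a client library such as `prometheus_client`, but the text exposition format shown above is simple enough to render directly. A dependency-free sketch of that rendering (function name and input shape are assumptions for illustration):

```python
def render_metrics(run_counts: dict[tuple[str, str], int]) -> str:
    """Render pipeline run counts in the Prometheus text exposition format."""
    lines = []
    for (pipeline, status), count in sorted(run_counts.items()):
        # One sample per (pipeline, status) label pair, e.g.
        # materia_pipeline_runs_total{pipeline="extract",status="success"} 42
        lines.append(
            f'materia_pipeline_runs_total{{pipeline="{pipeline}",status="{status}"}} {count}'
        )
    return "\n".join(lines)
```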
**2. Alerting:**
```yaml
# alerts/pipelines.yml
- alert: PipelineFailure
  expr: rate(materia_pipeline_runs_total{status="failure"}[1h]) > 0
  for: 5m
  annotations:
    summary: Pipeline {{ $labels.pipeline }} is failing
```
**3. Web Dashboard:**
Simple web UI showing:
- Current supervisor status
- Recent pipeline runs
- Active workers
- Cost tracking
---
## Self-Healing Capabilities
### Enhancement Idea
Supervisor automatically handles common failure modes:
```bash
# In supervisor.sh
run_with_retry() {
    local pipeline=$1
    local max_retries=3
    local retry_count=0

    while [ $retry_count -lt $max_retries ]; do
        if materia pipeline run "$pipeline"; then
            return 0
        fi
        retry_count=$((retry_count + 1))
        log "Pipeline $pipeline failed, retry $retry_count/$max_retries"
        # Exponential backoff
        sleep $((2 ** retry_count * 60))
    done

    log_error "Pipeline $pipeline failed after $max_retries retries"
    # Send alert
    return 1
}
```
---
## Priority Queuing
### Current State
- Pipelines run sequentially based on schedule
### Enhancement Idea
Add priority system for pipeline execution:
```python
@dataclass
class PipelineConfig:
    priority: int = 0  # Higher = more important

PIPELINES = {
    "extract": PipelineConfig(priority=10, ...),   # Highest
    "transform": PipelineConfig(priority=5, ...),
    "export": PipelineConfig(priority=1, ...),     # Lowest
}
```
Supervisor maintains a priority queue and executes highest-priority work first.
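The queue itself can be sketched with the standard library's `heapq` (a min-heap, so priorities are negated to pop the highest first); `drain_by_priority` is a hypothetical helper, not existing supervisor code:

```python
import heapq
from dataclasses import dataclass

@dataclass
class PipelineConfig:
    priority: int = 0  # Higher = more important

def drain_by_priority(pending: dict[str, PipelineConfig]) -> list[str]:
    """Return pipeline names highest-priority first.

    heapq is a min-heap, so we push negated priorities to pop the largest.
    """
    heap = [(-cfg.priority, name) for name, cfg in pending.items()]
    heapq.heapify(heap)
    return [heapq.heappop(heap)[1] for _ in range(len(heap))]
```

Ties break alphabetically by name, since tuples compare element-wise.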
---
## Notes
- These are ideas for future consideration, not immediate requirements
- Implement incrementally as needs arise
- Keep simplicity as a guiding principle


@@ -16,31 +16,26 @@ hetzner_location = config.get("hetzner_location") or "nbg1" # Nuremberg datacen
# Cloudflare R2 Storage + Data Catalog (Iceberg)
# ============================================================
# R2 bucket for raw data (extraction outputs)
raw_bucket = cloudflare.R2Bucket(
"materia-raw",
# R2 bucket for artifacts (CLI + extract/transform packages)
# Note: Import existing bucket with:
# pulumi import cloudflare:index/r2Bucket:R2Bucket materia-artifacts <account_id>/beanflows-artifacts
artifacts_bucket = cloudflare.R2Bucket(
"materia-artifacts",
account_id=cloudflare_account_id,
name="materia-raw",
name="beanflows-artifacts",
location="weur", # Western Europe
)
# R2 bucket for lakehouse (Iceberg tables)
# Note: Import existing bucket with:
# pulumi import cloudflare:index/r2Bucket:R2Bucket materia-lakehouse <account_id>/beanflows-data-prod
lakehouse_bucket = cloudflare.R2Bucket(
"materia-lakehouse",
account_id=cloudflare_account_id,
name="materia-lakehouse",
name="beanflows-data-prod",
location="weur",
)
# TODO: Enable R2 Data Catalog (Iceberg) on lakehouse bucket
# Note: As of Oct 2025, R2 Data Catalog is in public beta
# May need to enable via Cloudflare dashboard or API once SDK supports it
# For now, document manual step in README
# API token for R2 access (needs R2 + Data Catalog permissions)
# Note: Create this manually in Cloudflare dashboard and store in Pulumi config
# pulumi config set --secret cloudflare_r2_token <token>
# ============================================================
# Hetzner Cloud Infrastructure
# ============================================================
@@ -52,57 +47,41 @@ ssh_key = hcloud.SshKey(
public_key=config.require_secret("ssh_public_key"),
)
# Small CCX instance for scheduler/orchestrator
# This runs the cron scheduler + lightweight tasks
scheduler_server = hcloud.Server(
"materia-scheduler",
name="materia-scheduler",
server_type="ccx12", # 2 vCPU, 8GB RAM, ~€6/mo
# Small CCX instance for supervisor (runs materia CLI to orchestrate pipelines)
# This is an always-on instance that creates/destroys ephemeral workers on-demand
supervisor_server = hcloud.Server(
"materia-supervisor",
name="materia-supervisor",
server_type="ccx11", # 2 vCPU, 4GB RAM, ~€4/mo (cheapest option)
image="ubuntu-24.04",
location=hetzner_location,
ssh_keys=[ssh_key.id],
labels={
"role": "scheduler",
"role": "supervisor",
"project": "materia",
},
user_data="""#!/bin/bash
set -e
# Basic server setup
apt-get update
apt-get install -y python3.13 python3-pip git curl
apt-get install -y python3.13 python3-pip curl unzip
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh
# Install Pulumi ESC CLI
curl -fsSL https://get.pulumi.com/esc/install.sh | sh
export PATH="$HOME/.pulumi/bin:$PATH"
echo 'export PATH="$HOME/.pulumi/bin:$PATH"' >> /root/.bashrc
# Create deployment directory
mkdir -p /opt/materia
# Configure environment
echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc
echo 'Setup complete. Materia CLI will be deployed via CI/CD.' > /opt/materia/README.txt
""",
)
# Larger CCX instance for heavy SQLMesh workloads
# This gets spun up on-demand for big transformations
worker_server = hcloud.Server(
"materia-worker-01",
name="materia-worker-01",
server_type="ccx22", # 4 vCPU, 16GB RAM, ~€24/mo
image="ubuntu-24.04",
location=hetzner_location,
ssh_keys=[ssh_key.id],
labels={
"role": "worker",
"project": "materia",
},
user_data="""#!/bin/bash
# Basic server setup
apt-get update
apt-get install -y python3.13 python3-pip git curl
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh
# Configure environment
echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc
""",
)
# Note: Workers are created on-demand by the materia CLI
# No always-on worker instances in this architecture
# Firewall for servers (restrict to SSH + outbound only)
firewall = hcloud.Firewall(
@@ -132,27 +111,20 @@ firewall = hcloud.Firewall(
],
)
# Apply firewall to all servers
scheduler_firewall = hcloud.FirewallAttachment(
"scheduler-firewall",
# Apply firewall to supervisor
supervisor_firewall = hcloud.FirewallAttachment(
"supervisor-firewall",
firewall_id=firewall.id,
server_ids=[scheduler_server.id],
)
worker_firewall = hcloud.FirewallAttachment(
"worker-firewall",
firewall_id=firewall.id,
server_ids=[worker_server.id],
server_ids=[supervisor_server.id],
)
# ============================================================
# Outputs
# ============================================================
pulumi.export("raw_bucket_name", raw_bucket.name)
pulumi.export("artifacts_bucket_name", artifacts_bucket.name)
pulumi.export("lakehouse_bucket_name", lakehouse_bucket.name)
pulumi.export("scheduler_ip", scheduler_server.ipv4_address)
pulumi.export("worker_ip", worker_server.ipv4_address)
pulumi.export("supervisor_ip", supervisor_server.ipv4_address)
# Export connection info for DuckDB
pulumi.export(


@@ -0,0 +1,29 @@
[Unit]
Description=Materia Supervisor - Pipeline Orchestration
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=root
WorkingDirectory=/opt/materia
Environment="PATH=/usr/local/bin:/usr/bin:/bin:/root/.pulumi/bin"
EnvironmentFile=/opt/materia/.env
# Restart policy
Restart=always
RestartSec=10
# Resource limits
LimitNOFILE=65536
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=materia-supervisor
# Execute supervisor script
ExecStart=/opt/materia/supervisor.sh
[Install]
WantedBy=multi-user.target


@@ -0,0 +1,184 @@
#!/bin/bash
# Materia Supervisor - Continuous pipeline orchestration
# Inspired by TigerBeetle's CFO supervisor pattern
# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh
set -euo pipefail
# Configuration
readonly CHECK_INTERVAL=900 # 15 minutes
readonly CLI_VERSION_CHECK_INTERVAL=3600 # 1 hour
readonly MATERIA_DIR="/opt/materia"
readonly R2_ARTIFACTS_URL="https://${R2_ENDPOINT}/${R2_ARTIFACTS_BUCKET}"
readonly CLI_ARTIFACT="materia-cli-latest.tar.gz"
# Schedules (cron-style times in UTC)
readonly EXTRACT_SCHEDULE_HOUR=2 # 02:00 UTC
readonly TRANSFORM_SCHEDULE_HOUR=3 # 03:00 UTC
# State tracking
last_extract_run=""
last_transform_run=""
last_cli_check=0
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*"
}

log_error() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] ERROR: $*" >&2
}
# Check if CLI needs updating
check_cli_update() {
    local now
    now=$(date +%s)

    # Only check once per hour
    if (( now - last_cli_check < CLI_VERSION_CHECK_INTERVAL )); then
        return 0
    fi
    last_cli_check=$now

    log "Checking for CLI updates..."

    # Download new version
    local temp_file="${MATERIA_DIR}/cli-new.tar.gz"
    if ! curl -fsSL -o "$temp_file" "${R2_ARTIFACTS_URL}/${CLI_ARTIFACT}"; then
        log_error "Failed to download CLI artifact"
        return 1
    fi

    # Compare checksums
    local old_checksum=""
    local new_checksum
    if [ -f "${MATERIA_DIR}/${CLI_ARTIFACT}" ]; then
        old_checksum=$(sha256sum "${MATERIA_DIR}/${CLI_ARTIFACT}" | awk '{print $1}')
    fi
    new_checksum=$(sha256sum "$temp_file" | awk '{print $1}')

    if [ "$old_checksum" = "$new_checksum" ]; then
        log "CLI is up to date"
        rm -f "$temp_file"
        return 0
    fi

    log "New CLI version detected, updating..."

    # Install new version
    mv "$temp_file" "${MATERIA_DIR}/${CLI_ARTIFACT}"
    cd "$MATERIA_DIR"
    rm -rf cli && mkdir -p cli
    tar -xzf "$CLI_ARTIFACT" -C cli/
    if pip3 install --force-reinstall cli/*.whl; then
        log "CLI updated successfully"
        materia version
    else
        log_error "Failed to install CLI"
        return 1
    fi
}
# Check if we should run extract pipeline (daily at specified hour)
should_run_extract() {
    local current_hour
    local current_date
    current_hour=$(date -u +%H)
    current_date=$(date -u +%Y-%m-%d)

    # Only run at the scheduled hour. Compare numerically with a forced base-10
    # prefix: %H is zero-padded ("02"), so a string compare against "2" would
    # never match and the pipeline would never run.
    if (( 10#$current_hour != EXTRACT_SCHEDULE_HOUR )); then
        return 1
    fi

    # Only run once per day
    if [ "$last_extract_run" = "$current_date" ]; then
        return 1
    fi

    return 0
}

# Check if we should run transform pipeline (daily at specified hour)
should_run_transform() {
    local current_hour
    local current_date
    current_hour=$(date -u +%H)
    current_date=$(date -u +%Y-%m-%d)

    # Only run at the scheduled hour (numeric compare; see should_run_extract)
    if (( 10#$current_hour != TRANSFORM_SCHEDULE_HOUR )); then
        return 1
    fi

    # Only run once per day
    if [ "$last_transform_run" = "$current_date" ]; then
        return 1
    fi

    return 0
}
# Run extract pipeline
run_extract() {
    log "Starting extract pipeline..."
    if materia pipeline run extract; then
        log "Extract pipeline completed successfully"
        last_extract_run=$(date -u +%Y-%m-%d)
    else
        log_error "Extract pipeline failed"
        return 1
    fi
}

# Run transform pipeline
run_transform() {
    log "Starting transform pipeline..."
    if materia pipeline run transform; then
        log "Transform pipeline completed successfully"
        last_transform_run=$(date -u +%Y-%m-%d)
    else
        log_error "Transform pipeline failed"
        return 1
    fi
}
# Main supervisor loop
main() {
    log "Materia supervisor starting..."
    log "Extract schedule: daily at ${EXTRACT_SCHEDULE_HOUR}:00 UTC"
    log "Transform schedule: daily at ${TRANSFORM_SCHEDULE_HOUR}:00 UTC"
    log "Check interval: ${CHECK_INTERVAL}s"

    # Initial CLI check
    check_cli_update || log_error "Initial CLI check failed, continuing anyway"

    while true; do
        # Check for CLI updates
        check_cli_update || true

        # Check and run extract pipeline
        if should_run_extract; then
            run_extract || true
        fi

        # Check and run transform pipeline
        if should_run_transform; then
            run_transform || true
        fi

        sleep "$CHECK_INTERVAL"
    done
}
# Run main loop
main