# Future Enhancement Ideas
This document captures ideas for future improvements to the Materia infrastructure and pipeline orchestration.
## Smart SQLMesh-Aware Scheduling

### Current State
- Extract pipeline runs on a fixed schedule (daily at 2 AM UTC)
- Transform pipeline runs on a fixed schedule (daily at 3 AM UTC)
- No awareness of whether SQLMesh models actually need refreshing
### Enhancement Idea

Add a `materia pipeline check <name>` command that uses SQLMesh's internal scheduler to determine whether work is needed:

```bash
# Check if any models need refresh
materia pipeline check transform

# Returns:
# - Exit code 0 if work needed
# - Exit code 1 if nothing to do
# - Prints the list of models that would be updated
```
Implementation:

```python
# src/materia/pipelines.py
def check_pipeline(pipeline_name: str) -> PipelineCheckResult:
    """Check if pipeline has work to do without creating a worker."""
    if pipeline_name == "transform":
        # Run `sqlmesh plan --dry-run` locally,
        # parse the output to see which models need refresh,
        # and return the list of models plus their tags.
        pass
```
Supervisor Integration:

```bash
# In supervisor.sh
if materia pipeline check transform; then
    echo "Transform has work to do, scheduling..."
    materia pipeline run transform
else
    echo "Transform up to date, skipping"
fi
```
## Model Tags for Worker Sizing

### Current State
- All transform pipelines run on the same worker type (ccx22)
- No way to specify that certain models need more resources
### Enhancement Idea

Use SQLMesh model tags to hint at resource requirements:

```sql
-- models/serving/obt_commodity_metrics.sql
MODEL (
  name serving.obt_commodity_metrics,
  kind INCREMENTAL_BY_TIME_RANGE,
  cron '@daily',
  tags ['heavy', 'long_running'] -- Supervisor uses this!
)
```
Pipeline Check Enhancement:

```python
from dataclasses import dataclass

@dataclass
class PipelineCheckResult:
    has_work: bool
    models: list[str]
    tags: set[str]  # Aggregated tags from all models needing refresh

# Check if any heavy models need refresh
result = check_pipeline("transform")
if "heavy" in result.tags:
    worker_type = "ccx32"  # Use a bigger worker
else:
    worker_type = "ccx22"  # Default worker
```
Supervisor Integration:

```bash
# In supervisor.sh
RESULT=$(materia pipeline check transform --format json)
if [ "$(echo "$RESULT" | jq -r '.has_work')" = "true" ]; then
    TAGS=$(echo "$RESULT" | jq -r '.tags | join(",")')
    if echo "$TAGS" | grep -q "heavy"; then
        WORKER_TYPE="ccx32"
    else
        WORKER_TYPE="ccx22"
    fi
    materia pipeline run transform --worker "$WORKER_TYPE"
fi
```
Tag Conventions:

- `heavy`: Requires larger worker (ccx32+)
- `long_running`: Expected to take >1 hour
- `gpu`: Requires GPU instance (future)
- `distributed`: Could benefit from multi-node execution (future)
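These conventions could back a small helper that the supervisor calls after aggregating tags. This is a hypothetical sketch: `pick_worker_type`, the precedence order (gpu before heavy), and the `gpu-instance` placeholder name are assumptions, not existing code:

```python
# Hypothetical helper: map aggregated model tags to a worker type.
# The precedence order and the "gpu-instance" placeholder are
# illustrative assumptions.
_TAG_PRECEDENCE = [
    ("gpu", "gpu-instance"),  # no GPU worker type exists yet
    ("heavy", "ccx32"),
]

def pick_worker_type(tags: set[str], default: str = "ccx22") -> str:
    """Return the worker type implied by the most demanding tag present."""
    for tag, worker in _TAG_PRECEDENCE:
        if tag in tags:
            return worker
    return default
```

Tags like `long_running` don't change sizing on their own; they only affect timeouts, so the helper ignores them.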
## Multi-Pipeline Support

### Current State
- Only extract and transform pipelines
- Extract doesn't use SQLMesh scheduling
### Enhancement Idea

Make the extract pipeline SQLMesh-aware as well, and add more pipeline types.

New Pipelines:

- `dbt`: For dbt models (if we add dbt support)
- `ml`: For machine learning model training
- `export`: For exporting data to external systems
Extract with Scheduling: Instead of a fixed schedule, use SQLMesh to track the last successful extract:

```sql
-- models/raw/psd_alldata.sql
MODEL (
  name raw.psd_alldata,
  kind INCREMENTAL_BY_TIME_RANGE,
  cron '@daily', -- SQLMesh tracks this!
  tags ['extract']
)
```

The supervisor then checks whether extract needs to run based on SQLMesh state.
## Distributed Transform Execution

### Current State
- All SQLMesh models run on a single worker
- Large transforms can be slow
### Enhancement Idea

For models tagged `distributed`, split execution across multiple workers:

```python
from concurrent.futures import ThreadPoolExecutor

# Detect heavy models that would benefit from parallelization
result = check_pipeline("transform")
if "distributed" in result.tags:
    # Split models into batches based on the dependency graph
    batches = split_execution_graph(result.models)

    # Create one worker per batch
    workers = [create_worker(f"transform-{i}", "ccx22") for i in range(len(batches))]

    # Execute in parallel
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(run_pipeline_on_worker, worker, batch)
            for worker, batch in zip(workers, batches)
        ]
```
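The `split_execution_graph` call above is left undefined. One plausible sketch batches models by topological level, so models within a batch are independent of each other and each batch only depends on earlier batches; the dict-of-upstream-dependencies input shape is an assumption for illustration:

```python
def split_execution_graph(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group models into batches by topological level (Kahn-style).

    `deps` maps each model to the set of models it depends on. Models in
    the same batch have no dependencies on each other and can run in
    parallel; batch i+1 may depend only on batches 0..i.
    """
    remaining = {m: set(d) for m, d in deps.items()}
    batches: list[list[str]] = []
    while remaining:
        # Models whose dependencies have all been scheduled already
        ready = sorted(m for m, d in remaining.items() if not d)
        if not ready:
            raise ValueError("dependency cycle detected")
        batches.append(ready)
        for m in ready:
            del remaining[m]
        for d in remaining.values():
            d.difference_update(ready)
    return batches
```

In practice the dependency graph would come from SQLMesh's DAG rather than being hand-built.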
## Cost Optimization with Multi-Cloud

### Current State

- Only supports Hetzner Cloud
- Provider abstraction exists but is not utilized
### Enhancement Idea

Check spot prices across multiple providers and use the cheapest:

```python
# src/materia/providers/pricing.py
def get_cheapest_provider(worker_type: str) -> str:
    prices = {
        "hetzner": get_hetzner_spot_price(worker_type),
        "ovh": get_ovh_spot_price(worker_type),
        "scaleway": get_scaleway_spot_price(worker_type),
    }
    return min(prices, key=prices.get)
```

```bash
# In supervisor.sh or the materia CLI
PROVIDER=$(materia provider cheapest --type ccx22)
materia pipeline run transform --provider "$PROVIDER"
```
## Observability Enhancements

### Current State
- Logs go to systemd journal
- No metrics or alerting
### Enhancement Ideas

1. Prometheus Metrics:

```bash
# Expose a metrics endpoint on the supervisor
curl http://supervisor:9090/metrics
```

Example metrics:

```
materia_pipeline_runs_total{pipeline="extract",status="success"} 42
materia_pipeline_duration_seconds{pipeline="transform"} 1234
materia_worker_cost_euros{pipeline="extract"} 0.05
```
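The sample lines above follow the Prometheus text exposition format (`name{label="value"} value`); a tiny formatter makes the shape concrete. This is an illustrative helper, not the real `prometheus_client` library:

```python
def format_metric(name: str, labels: dict[str, str], value: float) -> str:
    """Render one sample in Prometheus text exposition format,
    e.g. materia_pipeline_runs_total{pipeline="extract"} 42."""
    # Labels are sorted for a stable, diff-friendly output
    label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    return f"{name}{{{label_str}}} {value:g}"
```

In a real setup the supervisor would register counters and gauges with `prometheus_client` and let it handle exposition.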
2. Alerting:

```yaml
# alerts/pipelines.yml
- alert: PipelineFailure
  expr: rate(materia_pipeline_runs_total{status="failure"}[1h]) > 0
  for: 5m
  annotations:
    summary: Pipeline {{ $labels.pipeline }} is failing
```
3. Web Dashboard: Simple web UI showing:
- Current supervisor status
- Recent pipeline runs
- Active workers
- Cost tracking
## Self-Healing Capabilities

### Enhancement Idea

The supervisor automatically handles common failure modes:

```bash
# In supervisor.sh
run_with_retry() {
    local pipeline=$1
    local max_retries=3
    local retry_count=0

    while [ $retry_count -lt $max_retries ]; do
        if materia pipeline run "$pipeline"; then
            return 0
        fi
        retry_count=$((retry_count + 1))
        log "Pipeline $pipeline failed, retry $retry_count/$max_retries"
        # Exponential backoff: 2, 4, then 8 minutes
        sleep $((2 ** retry_count * 60))
    done

    log_error "Pipeline $pipeline failed after $max_retries retries"
    # Send alert
    return 1
}
```
## Priority Queuing

### Current State

- Pipelines run sequentially based on schedule

### Enhancement Idea

Add a priority system for pipeline execution:

```python
from dataclasses import dataclass

@dataclass
class PipelineConfig:
    priority: int = 0  # Higher = more important

PIPELINES = {
    "extract": PipelineConfig(priority=10, ...),    # Highest
    "transform": PipelineConfig(priority=5, ...),
    "export": PipelineConfig(priority=1, ...),      # Lowest
}
```

The supervisor maintains a priority queue and executes the highest-priority work first.
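The queue itself is straightforward with the standard library: `heapq` is a min-heap, so priorities are negated, and an insertion counter keeps same-priority pipelines in FIFO order. A minimal sketch:

```python
import heapq

class PipelineQueue:
    """Pops highest-priority pipelines first; FIFO among equals."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._counter = 0  # tie-breaker preserves insertion order

    def push(self, pipeline: str, priority: int) -> None:
        # Negate priority: heapq always pops the smallest tuple first
        heapq.heappush(self._heap, (-priority, self._counter, pipeline))
        self._counter += 1

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]
```

The supervisor would push work as schedules fire and pop from the queue whenever it has worker capacity.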
## Notes
- These are ideas for future consideration, not immediate requirements
- Implement incrementally as needs arise
- Keep simplicity as a guiding principle