beanflows/infra/FUTURE_ENHANCEMENTS.md
Deeman f207fb441d Add supervisor deployment with continuous pipeline orchestration
Implements automated supervisor instance deployment that runs scheduled
pipelines using a TigerBeetle-inspired continuous orchestration pattern.

Infrastructure changes:
- Update Pulumi to use existing R2 buckets (beanflows-artifacts, beanflows-data-prod)
- Rename scheduler → supervisor, optimize to CCX11 (€4/mo)
- Remove always-on worker (workers are now ephemeral only)
- Add artifacts bucket resource for CLI/pipeline packages

Supervisor architecture:
- supervisor.sh: Continuous loop checking schedules every 15 minutes
- Self-updating: Checks for new CLI versions hourly
- Fixed schedules: Extract at 2 AM UTC, Transform at 3 AM UTC
- systemd service for automatic restart on failure
- Logs to systemd journal for observability

CI/CD changes:
- deploy:infra now runs on every master push (not just on changes)
- New deploy:supervisor job:
  * Deploys supervisor.sh and systemd service
  * Installs latest materia CLI from R2
  * Configures environment with Pulumi ESC secrets
  * Restarts supervisor service

Future enhancements documented:
- SQLMesh-aware scheduling (check models before running)
- Model tags for worker sizing (heavy/distributed hints)
- Multi-pipeline support, distributed execution
- Cost optimization with multi-cloud spot pricing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-12 22:23:55 +02:00


Future Enhancement Ideas

This document captures ideas for future improvements to the Materia infrastructure and pipeline orchestration.

Smart SQLMesh-Aware Scheduling

Current State

  • Extract pipeline runs on a fixed schedule (daily at 2 AM UTC)
  • Transform pipeline runs on a fixed schedule (daily at 3 AM UTC)
  • No awareness of whether SQLMesh models actually need refreshing

Enhancement Idea

Add a materia pipeline check <name> command that uses SQLMesh's internal scheduler to determine whether any work is needed:

# Check if any models need refresh
materia pipeline check transform

# Returns:
# - Exit code 0 if work needed
# - Exit code 1 if nothing to do
# - Prints list of models that would be updated

Implementation:

# src/materia/pipelines.py
def check_pipeline(pipeline_name: str) -> PipelineCheckResult:
    """Check if pipeline has work to do without creating a worker."""
    if pipeline_name == "transform":
        # Run sqlmesh plan --dry-run locally
        # Parse output to see what models need refresh
        # Return list of models + their tags
        pass

Supervisor Integration:

# In supervisor.sh
if materia pipeline check transform; then
  echo "Transform has work to do, scheduling..."
  materia pipeline run transform
else
  echo "Transform up to date, skipping"
fi

Model Tags for Worker Sizing

Current State

  • All transform pipelines run on the same worker type (ccx22)
  • No way to specify that certain models need more resources

Enhancement Idea

Use SQLMesh model tags to hint at resource requirements:

-- models/serving/obt_commodity_metrics.sql
MODEL (
  name serving.obt_commodity_metrics,
  kind INCREMENTAL_BY_TIME_RANGE,
  cron '@daily',
  tags ['heavy', 'long_running']  -- Supervisor uses this!
)

Pipeline Check Enhancement:

@dataclass
class PipelineCheckResult:
    has_work: bool
    models: list[str]
    tags: set[str]  # Aggregated tags from all models needing refresh

# Check if any heavy models need refresh
result = check_pipeline("transform")
if "heavy" in result.tags:
    worker_type = "ccx32"  # Use bigger worker
else:
    worker_type = "ccx22"  # Default worker

Supervisor Integration:

# In supervisor.sh
RESULT=$(materia pipeline check transform --format json)

if [ "$(echo "$RESULT" | jq -r '.has_work')" = "true" ]; then
  TAGS=$(echo "$RESULT" | jq -r '.tags | join(",")')

  if echo "$TAGS" | grep -q "heavy"; then
    WORKER_TYPE="ccx32"
  else
    WORKER_TYPE="ccx22"
  fi

  materia pipeline run transform --worker "$WORKER_TYPE"
fi

Tag Conventions:

  • heavy: Requires larger worker (ccx32+)
  • long_running: Expected to take >1 hour
  • gpu: Requires GPU instance (future)
  • distributed: Could benefit from multi-node execution (future)
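
The conventions above boil down to a small lookup. A minimal sketch of tag-to-worker mapping (TAG_WORKER_TYPES, select_worker_type, and the "gpu-instance" placeholder are hypothetical names, not part of the materia CLI):

```python
# Ordered by precedence: the most specialized tag wins.
TAG_WORKER_TYPES = [
    ("gpu", "gpu-instance"),  # hypothetical placeholder; GPU support is future work
    ("heavy", "ccx32"),       # larger worker for heavy models
]

def select_worker_type(tags: set[str], default: str = "ccx22") -> str:
    """Pick a worker type from aggregated model tags, falling back to the default."""
    for tag, worker in TAG_WORKER_TYPES:
        if tag in tags:
            return worker
    return default
```

Keeping the mapping as an ordered list makes precedence explicit when a model carries several sizing tags.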

Multi-Pipeline Support

Current State

  • Only extract and transform pipelines
  • Extract doesn't use SQLMesh scheduling

Enhancement Idea

Make the extract pipeline SQLMesh-aware as well, and add more pipeline types:

New Pipelines:

  • dbt: For dbt models (if we add dbt support)
  • ml: For machine learning model training
  • export: For exporting data to external systems

Extract with Scheduling: Instead of a fixed schedule, use SQLMesh to track the last successful extract:

-- models/raw/psd_alldata.sql
MODEL (
  name raw.psd_alldata,
  kind INCREMENTAL_BY_TIME_RANGE,
  cron '@daily',  -- SQLMesh tracks this!
  tags ['extract']
)

Then the supervisor checks whether extract needs to run based on SQLMesh state.


Distributed Transform Execution

Current State

  • All SQLMesh models run on a single worker
  • Large transforms can be slow

Enhancement Idea

For models tagged distributed, split execution across multiple workers:

from concurrent.futures import ThreadPoolExecutor

# Detect heavy models that would benefit from parallelization
result = check_pipeline("transform")

if "distributed" in result.tags:
    # Split models into batches based on dependency graph
    batches = split_execution_graph(result.models)

    # Create one worker per batch
    workers = [create_worker(f"transform-{i}", "ccx22") for i in range(len(batches))]

    # Execute in parallel and wait for every batch to finish
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(run_pipeline_on_worker, worker, batch)
            for worker, batch in zip(workers, batches)
        ]
        for future in futures:
            future.result()  # re-raises any worker failure

Cost Optimization with Multi-Cloud

Current State

  • Only supports Hetzner Cloud
  • Provider abstraction exists but not utilized

Enhancement Idea

Check spot prices across multiple providers and use cheapest:

# src/materia/providers/pricing.py
def get_cheapest_provider(worker_type: str) -> str:
    prices = {
        "hetzner": get_hetzner_spot_price(worker_type),
        "ovh": get_ovh_spot_price(worker_type),
        "scaleway": get_scaleway_spot_price(worker_type),
    }
    return min(prices, key=prices.get)

# In supervisor.sh or materia CLI
PROVIDER=$(materia provider cheapest --type ccx22)
materia pipeline run transform --provider $PROVIDER

Observability Enhancements

Current State

  • Logs go to systemd journal
  • No metrics or alerting

Enhancement Ideas

1. Prometheus Metrics:

# Expose metrics endpoint on supervisor
curl http://supervisor:9090/metrics

# Example metrics:
materia_pipeline_runs_total{pipeline="extract",status="success"} 42
materia_pipeline_duration_seconds{pipeline="transform"} 1234
materia_worker_cost_euros{pipeline="extract"} 0.05
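
The sample lines above follow the Prometheus text exposition format, which can be produced without pulling in a metrics library. A minimal sketch (render_metrics and its sample layout are hypothetical, not a materia API):

```python
def render_metrics(samples: dict[tuple[str, tuple[tuple[str, str], ...]], float]) -> str:
    """Render {(metric_name, ((label, value), ...)): sample} into the
    Prometheus text exposition format, one sample per line."""
    lines = []
    for (name, labels), value in sorted(samples.items()):
        label_str = ",".join(f'{key}="{val}"' for key, val in labels)
        lines.append(f"{name}{{{label_str}}} {value:g}")
    return "\n".join(lines)
```

In practice a client library (e.g. prometheus_client) would handle this, but the format itself is just labeled lines.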

2. Alerting:

# alerts/pipelines.yml
- alert: PipelineFailure
  expr: rate(materia_pipeline_runs_total{status="failure"}[1h]) > 0
  for: 5m
  annotations:
    summary: Pipeline {{ $labels.pipeline }} is failing

3. Web Dashboard: Simple web UI showing:

  • Current supervisor status
  • Recent pipeline runs
  • Active workers
  • Cost tracking

Self-Healing Capabilities

Enhancement Idea

Supervisor automatically handles common failure modes:

# In supervisor.sh
run_with_retry() {
  local pipeline=$1
  local max_retries=3
  local retry_count=0

  while [ $retry_count -lt $max_retries ]; do
    if materia pipeline run "$pipeline"; then
      return 0
    fi

    retry_count=$((retry_count + 1))
    log "Pipeline $pipeline failed, retry $retry_count/$max_retries"

    # Exponential backoff
    sleep $((2 ** retry_count * 60))
  done

  log_error "Pipeline $pipeline failed after $max_retries retries"
  # Send alert
  return 1
}
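
The sleep formula above grows exponentially per failure; a quick check of the schedule in Python (backoff_delay simply mirrors the script's 2 ** retry_count * 60):

```python
def backoff_delay(retry_count: int, base_seconds: int = 60) -> int:
    """Seconds slept after the Nth failure, matching 2 ** retry_count * base."""
    return 2 ** retry_count * base_seconds
```

With the defaults, three retries wait 2, 4, and 8 minutes respectively.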

Priority Queuing

Current State

  • Pipelines run sequentially based on schedule

Enhancement Idea

Add priority system for pipeline execution:

@dataclass
class PipelineConfig:
    priority: int = 0  # Higher = more important

PIPELINES = {
    "extract": PipelineConfig(priority=10, ...),  # Highest
    "transform": PipelineConfig(priority=5, ...),
    "export": PipelineConfig(priority=1, ...),    # Lowest
}

Supervisor maintains a priority queue and executes highest-priority work first.
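
The queue itself is a few lines with the standard library's heapq (a min-heap, so priorities are negated; PipelineQueue is a hypothetical name):

```python
import heapq

class PipelineQueue:
    """Pop pipelines highest-priority first; ties break by insertion order."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._counter = 0  # preserves FIFO order among equal priorities

    def push(self, name: str, priority: int) -> None:
        heapq.heappush(self._heap, (-priority, self._counter, name))
        self._counter += 1

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]
```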


Notes

  • These are ideas for future consideration, not immediate requirements
  • Implement incrementally as needs arise
  • Keep simplicity as a guiding principle