# Future Enhancement Ideas

This document captures ideas for future improvements to the Materia infrastructure and pipeline orchestration.

## Smart SQLMesh-Aware Scheduling

### Current State

- Extract pipeline runs on a fixed schedule (daily at 2 AM UTC)
- Transform pipeline runs on a fixed schedule (daily at 3 AM UTC)
- No awareness of whether SQLMesh models actually need refreshing

### Enhancement Idea

Add a `materia pipeline check` command that uses SQLMesh's internal scheduler to determine if work is needed:

```bash
# Check if any models need refresh
materia pipeline check transform

# Returns:
# - Exit code 0 if work needed
# - Exit code 1 if nothing to do
# - Prints list of models that would be updated
```

**Implementation:**

```python
# src/materia/pipelines.py
def check_pipeline(pipeline_name: str) -> PipelineCheckResult:
    """Check if pipeline has work to do without creating a worker."""
    if pipeline_name == "transform":
        # Run sqlmesh plan --dry-run locally
        # Parse output to see what models need refresh
        # Return list of models + their tags
        pass
```

**Supervisor Integration:**

```bash
# In supervisor.sh
if materia pipeline check transform; then
    echo "Transform has work to do, scheduling..."
    materia pipeline run transform
else
    echo "Transform up to date, skipping"
fi
```

---

## Model Tags for Worker Sizing

### Current State

- All transform pipelines run on the same worker type (ccx22)
- No way to specify that certain models need more resources

### Enhancement Idea

Use SQLMesh model tags to hint at resource requirements:

```sql
-- models/serving/obt_commodity_metrics.sql
MODEL (
    name serving.obt_commodity_metrics,
    kind INCREMENTAL_BY_TIME_RANGE,
    cron '@daily',
    tags ['heavy', 'long_running'] -- Supervisor uses this!
)
```

**Pipeline Check Enhancement:**

```python
@dataclass
class PipelineCheckResult:
    has_work: bool
    models: list[str]
    tags: set[str]  # Aggregated tags from all models needing refresh

# Check if any heavy models need refresh
result = check_pipeline("transform")
if "heavy" in result.tags:
    worker_type = "ccx32"  # Use bigger worker
else:
    worker_type = "ccx22"  # Default worker
```

**Supervisor Integration:**

```bash
# In supervisor.sh
RESULT=$(materia pipeline check transform --format json)
if [ "$(echo "$RESULT" | jq -r '.has_work')" = "true" ]; then
    TAGS=$(echo "$RESULT" | jq -r '.tags | join(",")')
    if echo "$TAGS" | grep -q "heavy"; then
        WORKER_TYPE="ccx32"
    else
        WORKER_TYPE="ccx22"
    fi
    materia pipeline run transform --worker $WORKER_TYPE
fi
```

**Tag Conventions:**

- `heavy`: Requires a larger worker (ccx32+)
- `long_running`: Expected to take >1 hour
- `gpu`: Requires a GPU instance (future)
- `distributed`: Could benefit from multi-node execution (future)

---

## Multi-Pipeline Support

### Current State

- Only extract and transform pipelines exist
- Extract doesn't use SQLMesh scheduling

### Enhancement Idea

Make the extract pipeline SQLMesh-aware as well, and add more pipeline types.

**New Pipelines:**

- `dbt`: For dbt models (if we add dbt support)
- `ml`: For machine learning model training
- `export`: For exporting data to external systems

**Extract with Scheduling:**

Instead of a fixed schedule, use SQLMesh to track the last successful extract:

```sql
-- models/raw/psd_alldata.sql
MODEL (
    name raw.psd_alldata,
    kind INCREMENTAL_BY_TIME_RANGE,
    cron '@daily', -- SQLMesh tracks this!
    tags ['extract']
)
```

The supervisor then checks whether extract needs to run based on SQLMesh state.
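The tag-to-worker mapping described under "Model Tags for Worker Sizing" can be sketched as a pure function. This is a minimal sketch, not an existing API: `select_worker_type` and the fallback of `gpu` to the largest CPU worker are illustrative assumptions; the `PipelineCheckResult` shape and the tag names come from this document.

```python
from dataclasses import dataclass, field


@dataclass
class PipelineCheckResult:
    """Result of a hypothetical `materia pipeline check` call."""
    has_work: bool
    models: list[str] = field(default_factory=list)
    tags: set[str] = field(default_factory=set)


def select_worker_type(result: PipelineCheckResult, default: str = "ccx22") -> str:
    """Map aggregated model tags to a worker type.

    Placeholder policy: no GPU instance type exists yet, so `gpu`
    falls through to the largest CPU worker alongside `heavy`.
    """
    if not result.has_work:
        return default
    if "heavy" in result.tags or "gpu" in result.tags:
        return "ccx32"
    return default


result = PipelineCheckResult(
    has_work=True,
    models=["serving.obt_commodity_metrics"],
    tags={"heavy", "long_running"},
)
print(select_worker_type(result))  # heavy tag present -> ccx32
```

Keeping this logic in a pure function (rather than inline in `supervisor.sh`) would make the sizing policy unit-testable without spinning up workers.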
---

## Distributed Transform Execution

### Current State

- All SQLMesh models run on a single worker
- Large transforms can be slow

### Enhancement Idea

For models tagged `distributed`, split execution across multiple workers:

```python
from concurrent.futures import ThreadPoolExecutor

# Detect heavy models that would benefit from parallelization
result = check_pipeline("transform")

if "distributed" in result.tags:
    # Split models into batches based on the dependency graph
    batches = split_execution_graph(result.models)

    # Create one worker per batch
    workers = [create_worker(f"transform-{i}", "ccx22") for i in range(len(batches))]

    # Execute in parallel
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(run_pipeline_on_worker, worker, batch)
            for worker, batch in zip(workers, batches)
        ]
        for future in futures:
            future.result()  # Propagate any failures
```

---

## Cost Optimization with Multi-Cloud

### Current State

- Only supports Hetzner Cloud
- Provider abstraction exists but is not utilized

### Enhancement Idea

Check spot prices across multiple providers and use the cheapest:

```python
# src/materia/providers/pricing.py
def get_cheapest_provider(worker_type: str) -> str:
    prices = {
        "hetzner": get_hetzner_spot_price(worker_type),
        "ovh": get_ovh_spot_price(worker_type),
        "scaleway": get_scaleway_spot_price(worker_type),
    }
    return min(prices, key=prices.get)
```

```bash
# In supervisor.sh or the materia CLI
PROVIDER=$(materia provider cheapest --type ccx22)
materia pipeline run transform --provider $PROVIDER
```

---

## Observability Enhancements

### Current State

- Logs go to the systemd journal
- No metrics or alerting

### Enhancement Ideas

**1. Prometheus Metrics:**

```bash
# Expose metrics endpoint on supervisor
curl http://supervisor:9090/metrics

# Example metrics:
materia_pipeline_runs_total{pipeline="extract",status="success"} 42
materia_pipeline_duration_seconds{pipeline="transform"} 1234
materia_worker_cost_euros{pipeline="extract"} 0.05
```

**2. Alerting:**

```yaml
# alerts/pipelines.yml
- alert: PipelineFailure
  expr: rate(materia_pipeline_runs_total{status="failure"}[1h]) > 0
  for: 5m
  annotations:
    summary: Pipeline {{ $labels.pipeline }} is failing
```

**3. Web Dashboard:**

A simple web UI showing:

- Current supervisor status
- Recent pipeline runs
- Active workers
- Cost tracking

---

## Self-Healing Capabilities

### Enhancement Idea

The supervisor automatically handles common failure modes:

```bash
# In supervisor.sh
run_with_retry() {
    local pipeline=$1
    local max_retries=3
    local retry_count=0

    while [ $retry_count -lt $max_retries ]; do
        if materia pipeline run "$pipeline"; then
            return 0
        fi
        retry_count=$((retry_count + 1))
        log "Pipeline $pipeline failed, retry $retry_count/$max_retries"
        # Exponential backoff
        sleep $((2 ** retry_count * 60))
    done

    log_error "Pipeline $pipeline failed after $max_retries retries"
    # Send alert
    return 1
}
```

---

## Priority Queuing

### Current State

- Pipelines run sequentially based on schedule

### Enhancement Idea

Add a priority system for pipeline execution:

```python
@dataclass
class PipelineConfig:
    priority: int = 0  # Higher = more important

PIPELINES = {
    "extract": PipelineConfig(priority=10, ...),  # Highest
    "transform": PipelineConfig(priority=5, ...),
    "export": PipelineConfig(priority=1, ...),    # Lowest
}
```

The supervisor maintains a priority queue and executes the highest-priority work first.

---

## Notes

- These are ideas for future consideration, not immediate requirements
- Implement incrementally as needs arise
- Keep simplicity as a guiding principle
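As a closing sketch, the priority queue mentioned under "Priority Queuing" could be built on the stdlib `heapq` module. The pipeline names and priorities mirror the example above; `QueuedRun` and `drain_queue` are hypothetical names, and a real supervisor would pop and execute items as they arrive rather than draining the queue in one pass.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class QueuedRun:
    # heapq pops the smallest item first, so the sort key is the
    # negated priority, giving highest-priority-first behaviour.
    sort_key: int
    pipeline: str = field(compare=False)


def drain_queue(priorities: dict[str, int]) -> list[str]:
    """Return pipeline names in the order the supervisor would run them."""
    heap = [QueuedRun(-prio, name) for name, prio in priorities.items()]
    heapq.heapify(heap)
    order = []
    while heap:
        order.append(heapq.heappop(heap).pipeline)
    return order


print(drain_queue({"extract": 10, "transform": 5, "export": 1}))
# -> ['extract', 'transform', 'export']
```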