# Future Enhancement Ideas

This document captures ideas for future improvements to the Materia infrastructure and pipeline orchestration.

## Smart SQLMesh-Aware Scheduling

### Current State
- Extract pipeline runs on a fixed schedule (daily at 2 AM UTC)
- Transform pipeline runs on a fixed schedule (daily at 3 AM UTC)
- No awareness of whether SQLMesh models actually need refreshing

### Enhancement Idea
Add a `materia pipeline check <name>` command that uses SQLMesh's internal scheduler to determine whether work is needed:

```bash
# Check if any models need refresh
materia pipeline check transform

# Returns:
# - Exit code 0 if work needed
# - Exit code 1 if nothing to do
# - Prints list of models that would be updated
```

**Implementation:**
```python
# src/materia/pipelines.py
def check_pipeline(pipeline_name: str) -> PipelineCheckResult:
    """Check if pipeline has work to do without creating a worker."""
    if pipeline_name == "transform":
        # Run sqlmesh plan --dry-run locally
        # Parse output to see what models need refresh
        # Return list of models + their tags
        pass
```
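
The exit-code contract can be sketched as a thin wrapper around the check result. This is a hypothetical helper (the `check_exit_code` name and the `PipelineCheckResult` defaults are illustrative, not existing code):

```python
import sys
from dataclasses import dataclass, field


@dataclass
class PipelineCheckResult:
    has_work: bool
    models: list[str] = field(default_factory=list)
    tags: set[str] = field(default_factory=set)


def check_exit_code(result: PipelineCheckResult) -> int:
    """Map a check result to the shell contract: 0 = work needed, 1 = nothing to do."""
    for model in result.models:
        print(model)  # List of models that would be updated
    return 0 if result.has_work else 1
```

The supervisor can then rely on the command's exit status alone, as in the integration snippet below.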

**Supervisor Integration:**
```bash
# In supervisor.sh
if materia pipeline check transform; then
    echo "Transform has work to do, scheduling..."
    materia pipeline run transform
else
    echo "Transform up to date, skipping"
fi
```

---

## Model Tags for Worker Sizing

### Current State
- All transform pipelines run on the same worker type (ccx22)
- No way to specify that certain models need more resources

### Enhancement Idea
Use SQLMesh model tags to hint at resource requirements:

```sql
-- models/serving/obt_commodity_metrics.sql
MODEL (
  name serving.obt_commodity_metrics,
  kind INCREMENTAL_BY_TIME_RANGE,
  cron '@daily',
  tags ['heavy', 'long_running'] -- Supervisor uses this!
)
```

**Pipeline Check Enhancement:**
```python
@dataclass
class PipelineCheckResult:
    has_work: bool
    models: list[str]
    tags: set[str]  # Aggregated tags from all models needing refresh

# Check if any heavy models need refresh
result = check_pipeline("transform")
if "heavy" in result.tags:
    worker_type = "ccx32"  # Use bigger worker
else:
    worker_type = "ccx22"  # Default worker
```

**Supervisor Integration:**
```bash
# In supervisor.sh
RESULT=$(materia pipeline check transform --format json)

if [ "$(echo "$RESULT" | jq -r '.has_work')" = "true" ]; then
    TAGS=$(echo "$RESULT" | jq -r '.tags | join(",")')

    if echo "$TAGS" | grep -q "heavy"; then
        WORKER_TYPE="ccx32"
    else
        WORKER_TYPE="ccx22"
    fi

    materia pipeline run transform --worker "$WORKER_TYPE"
fi
```

**Tag Conventions:**
- `heavy`: Requires larger worker (ccx32+)
- `long_running`: Expected to take >1 hour
- `gpu`: Requires GPU instance (future)
- `distributed`: Could benefit from multi-node execution (future)

---

## Multi-Pipeline Support

### Current State
- Only extract and transform pipelines exist
- Extract doesn't use SQLMesh scheduling

### Enhancement Idea
Make the extract pipeline SQLMesh-aware as well, and add more pipeline types:

**New Pipelines:**
- `dbt`: For dbt models (if we add dbt support)
- `ml`: For machine learning model training
- `export`: For exporting data to external systems

**Extract with Scheduling:**
Instead of a fixed schedule, use SQLMesh to track the last successful extract:

```sql
-- models/raw/psd_alldata.sql
MODEL (
  name raw.psd_alldata,
  kind INCREMENTAL_BY_TIME_RANGE,
  cron '@daily', -- SQLMesh tracks this!
  tags ['extract']
)
```

The supervisor then checks whether extract needs to run based on SQLMesh state.

---

## Distributed Transform Execution

### Current State
- All SQLMesh models run on a single worker
- Large transforms can be slow

### Enhancement Idea
For models tagged `distributed`, split execution across multiple workers:

```python
# Detect heavy models that would benefit from parallelization
result = check_pipeline("transform")

if "distributed" in result.tags:
    # Split models into batches based on dependency graph
    batches = split_execution_graph(result.models)

    # Create one worker per batch
    workers = [create_worker(f"transform-{i}", "ccx22") for i in range(len(batches))]

    # Execute in parallel
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(run_pipeline_on_worker, worker, batch)
            for worker, batch in zip(workers, batches)
        ]
```
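
`split_execution_graph` is not defined above; one plausible sketch batches models into dependency levels, so each batch only depends on earlier batches. Everything here is an assumption (in practice the dependency map would come from SQLMesh's DAG):

```python
def split_execution_graph(models: list[str],
                          deps: dict[str, set[str]]) -> list[list[str]]:
    """Group models into batches where each batch depends only on earlier batches."""
    remaining = set(models)
    batches: list[list[str]] = []
    while remaining:
        # A model is ready when none of its dependencies are still unscheduled
        ready = sorted(m for m in remaining if not (deps.get(m, set()) & remaining))
        if not ready:
            raise ValueError("Dependency cycle detected")
        batches.append(ready)
        remaining -= set(ready)
    return batches


# staging depends on raw; serving depends on staging
deps = {"staging.a": {"raw.x"}, "serving.b": {"staging.a"}}
print(split_execution_graph(["raw.x", "staging.a", "serving.b"], deps))
# [['raw.x'], ['staging.a'], ['serving.b']]
```

Models within one batch have no edges between them, so they can safely run on separate workers in parallel.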

---

## Cost Optimization with Multi-Cloud

### Current State
- Only supports Hetzner Cloud
- Provider abstraction exists but is not utilized

### Enhancement Idea
Check spot prices across multiple providers and use the cheapest:

```python
# src/materia/providers/pricing.py
def get_cheapest_provider(worker_type: str) -> str:
    prices = {
        "hetzner": get_hetzner_spot_price(worker_type),
        "ovh": get_ovh_spot_price(worker_type),
        "scaleway": get_scaleway_spot_price(worker_type),
    }
    return min(prices, key=prices.get)
```

```bash
# In supervisor.sh or the materia CLI
PROVIDER=$(materia provider cheapest --type ccx22)
materia pipeline run transform --provider "$PROVIDER"
```

---

## Observability Enhancements

### Current State
- Logs go to the systemd journal
- No metrics or alerting

### Enhancement Ideas

**1. Prometheus Metrics:**
```bash
# Expose metrics endpoint on supervisor
curl http://supervisor:9090/metrics

# Example metrics:
materia_pipeline_runs_total{pipeline="extract",status="success"} 42
materia_pipeline_duration_seconds{pipeline="transform"} 1234
materia_worker_cost_euros{pipeline="extract"} 0.05
```
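
A real implementation would most likely use the `prometheus_client` library; as a dependency-free sketch of the text format shown above, rendering labeled series looks like this (the `render_metrics` helper and its data shape are assumptions for illustration):

```python
def render_metrics(metrics: dict[str, dict[tuple[tuple[str, str], ...], float]]) -> str:
    """Render {metric_name: {((label, value), ...): sample}} in Prometheus text format."""
    lines = []
    for name, series in metrics.items():
        for labels, value in series.items():
            label_str = ",".join(f'{k}="{v}"' for k, v in labels)
            lines.append(f"{name}{{{label_str}}} {value:g}")
    return "\n".join(lines)


counters = {
    "materia_pipeline_runs_total": {
        (("pipeline", "extract"), ("status", "success")): 42,
    },
}
print(render_metrics(counters))
# materia_pipeline_runs_total{pipeline="extract",status="success"} 42
```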

**2. Alerting:**
```yaml
# alerts/pipelines.yml
- alert: PipelineFailure
  expr: rate(materia_pipeline_runs_total{status="failure"}[1h]) > 0
  for: 5m
  annotations:
    summary: Pipeline {{ $labels.pipeline }} is failing
```

**3. Web Dashboard:**
A simple web UI showing:
- Current supervisor status
- Recent pipeline runs
- Active workers
- Cost tracking

---

## Self-Healing Capabilities

### Enhancement Idea
The supervisor automatically handles common failure modes:

```bash
# In supervisor.sh
run_with_retry() {
    local pipeline=$1
    local max_retries=3
    local retry_count=0

    while [ $retry_count -lt $max_retries ]; do
        if materia pipeline run "$pipeline"; then
            return 0
        fi

        retry_count=$((retry_count + 1))
        log "Pipeline $pipeline failed, retry $retry_count/$max_retries"

        # Exponential backoff: 2, 4, 8 minutes
        sleep $((2 ** retry_count * 60))
    done

    log_error "Pipeline $pipeline failed after $max_retries retries"
    # Send alert
    return 1
}
```

---

## Priority Queuing

### Current State
- Pipelines run sequentially based on schedule

### Enhancement Idea
Add a priority system for pipeline execution:

```python
@dataclass
class PipelineConfig:
    priority: int = 0  # Higher = more important

PIPELINES = {
    "extract": PipelineConfig(priority=10, ...),   # Highest
    "transform": PipelineConfig(priority=5, ...),
    "export": PipelineConfig(priority=1, ...),     # Lowest
}
```

The supervisor maintains a priority queue and executes the highest-priority work first.
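
That queue can be sketched with the standard library's `heapq` (a min-heap, so priorities are negated to pop the highest first; the helper names are illustrative):

```python
import heapq

queue: list[tuple[int, str]] = []


def enqueue(pipeline: str, priority: int) -> None:
    # heapq is a min-heap, so negate priority to pop highest-priority work first
    heapq.heappush(queue, (-priority, pipeline))


def next_pipeline() -> str:
    return heapq.heappop(queue)[1]


enqueue("export", 1)
enqueue("extract", 10)
enqueue("transform", 5)
print(next_pipeline())  # extract
print(next_pipeline())  # transform
print(next_pipeline())  # export
```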

---

## Notes
- These are ideas for future consideration, not immediate requirements
- Implement incrementally as needs arise
- Keep simplicity as a guiding principle