Merge branch 'feature/supervisor-deployment' into 'master'

Add supervisor deployment with continuous pipeline orchestration

See merge request deemanone/materia!7
Hendrik Dreesmann
2025-10-13 21:51:05 +02:00
12 changed files with 766 additions and 256 deletions

.gitlab-ci.yml

@@ -3,7 +3,6 @@ image: python:3.13
 stages:
   - lint
   - test
-  - build
   - deploy
 
 variables:
@@ -31,7 +30,6 @@ lint:
   script:
     - uv sync
     - uv run ruff check .
-    - uv run ruff format --check .
 
 test:cli:
   stage: test
@@ -55,83 +53,6 @@ test:sqlmesh:
     - uv sync
     - cd transform/sqlmesh_materia && uv run sqlmesh test
 
-build:extract:
-  stage: build
-  before_script:
-    - *uv_setup
-  script:
-    - uv sync
-    - mkdir -p dist
-    - uv build --package psdonline --out-dir dist/extract
-    - cd dist/extract && tar -czf ../materia-extract-latest.tar.gz .
-  artifacts:
-    paths:
-      - dist/materia-extract-latest.tar.gz
-    expire_in: 1 week
-  rules:
-    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
-
-build:transform:
-  stage: build
-  before_script:
-    - *uv_setup
-  script:
-    - uv sync
-    - mkdir -p dist
-    - uv build --package sqlmesh_materia --out-dir dist/transform
-    - cd dist/transform && tar -czf ../materia-transform-latest.tar.gz .
-  artifacts:
-    paths:
-      - dist/materia-transform-latest.tar.gz
-    expire_in: 1 week
-  rules:
-    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
-
-build:cli:
-  stage: build
-  before_script:
-    - *uv_setup
-  script:
-    - uv sync
-    - mkdir -p dist
-    - uv build --out-dir dist/cli
-    - cd dist/cli && tar -czf ../materia-cli-latest.tar.gz .
-  artifacts:
-    paths:
-      - dist/materia-cli-latest.tar.gz
-    expire_in: 1 week
-  rules:
-    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
-
-deploy:r2:
-  stage: deploy
-  image: rclone/rclone:latest
-  before_script:
-    - apk add --no-cache curl unzip
-    - curl -fsSL https://get.pulumi.com/esc/install.sh | sh
-    - export PATH="$HOME/.pulumi/bin:$PATH"
-    - esc login --token ${PULUMI_ACCESS_TOKEN}
-    - eval $(esc env open beanflows/prod --format shell)
-    - |
-      mkdir -p ~/.config/rclone
-      cat > ~/.config/rclone/rclone.conf <<EOF
-      [r2]
-      type = s3
-      provider = Cloudflare
-      access_key_id = ${R2_ACCESS_KEY_ID}
-      secret_access_key = ${R2_SECRET_ACCESS_KEY}
-      endpoint = https://${R2_ENDPOINT}
-      acl = private
-      EOF
-  script:
-    - rclone copy dist/*.tar.gz r2:${R2_ARTIFACTS_BUCKET}/ -v
-  dependencies:
-    - build:extract
-    - build:transform
-    - build:cli
-  rules:
-    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
-
 deploy:infra:
   stage: deploy
   image: pulumi/pulumi:latest
@@ -143,5 +64,65 @@ deploy:infra:
     - pulumi up --yes
   rules:
     - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
-      changes:
-        - infra/**/*
+
+deploy:supervisor:
+  stage: deploy
+  image: alpine:latest
+  before_script:
+    - apk add --no-cache openssh-client curl bash
+    - curl -fsSL https://get.pulumi.com/esc/install.sh | sh
+    - export PATH="$HOME/.pulumi/bin:$PATH"
+    - esc login --token ${PULUMI_ACCESS_TOKEN}
+    - eval $(esc env open beanflows/prod --format shell)
+    # Install Pulumi CLI to get stack outputs
+    - |
+      apk add --no-cache pulumi-bin || {
+        curl -fsSL https://get.pulumi.com/install.sh | sh
+        export PATH="$HOME/.pulumi/bin:$PATH"
+      }
+    - pulumi login --token ${PULUMI_ACCESS_TOKEN}
+  script:
+    - |
+      # Get supervisor IP from Pulumi
+      cd infra
+      SUPERVISOR_IP=$(pulumi stack output supervisor_ip -s prod)
+      cd ..
+
+      # Check if supervisor exists
+      if [ -z "$SUPERVISOR_IP" ] || [ "$SUPERVISOR_IP" = "null" ]; then
+        echo "No supervisor instance found. Run 'pulumi up' first."
+        exit 1
+      fi
+
+      echo "Connecting to supervisor at ${SUPERVISOR_IP}..."
+
+      # Setup SSH
+      mkdir -p ~/.ssh
+      echo "$SSH_PRIVATE_KEY" > ~/.ssh/id_rsa
+      chmod 600 ~/.ssh/id_rsa
+      ssh-keyscan -H $SUPERVISOR_IP >> ~/.ssh/known_hosts
+
+      # Wait for SSH to be ready (new instance may take a moment)
+      echo "Waiting for SSH to be ready..."
+      for i in $(seq 1 30); do
+        if ssh -o ConnectTimeout=5 root@${SUPERVISOR_IP} "echo 'SSH ready'"; then
+          break
+        fi
+        echo "Attempt $i/30 failed, retrying..."
+        sleep 10
+      done
+
+      # Check if supervisor is bootstrapped
+      if ssh root@${SUPERVISOR_IP} "test -d /opt/materia/.git"; then
+        echo "Supervisor already bootstrapped and will auto-update"
+        ssh root@${SUPERVISOR_IP} "systemctl status materia-supervisor --no-pager"
+      else
+        echo "Bootstrapping supervisor for the first time..."
+        # Export secrets and run bootstrap
+        ssh root@${SUPERVISOR_IP} "export PULUMI_ACCESS_TOKEN='${PULUMI_ACCESS_TOKEN}' GITLAB_READ_TOKEN='${GITLAB_READ_TOKEN}' && bash -s" < infra/bootstrap_supervisor.sh
+        echo "Bootstrap complete!"
+      fi
+  dependencies:
+    - deploy:infra
+  rules:
+    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH

CLAUDE.md

@@ -55,8 +55,11 @@ SQLMesh project implementing a layered data architecture.
 ```bash
 cd transform/sqlmesh_materia
 
-# Plan changes (no prompts, auto-apply enabled in config)
-sqlmesh plan
+# Local development (creates virtual environment)
+sqlmesh plan dev_<username>
+
+# Production
+sqlmesh plan prod
 
 # Run tests
 sqlmesh test
@@ -76,10 +79,17 @@ sqlmesh ui
 **Configuration:**
 - Config: `transform/sqlmesh_materia/config.yaml`
-- Default gateway: `dev` (uses `materia_dev.db`)
-- Production gateway: `prod` (uses `materia_prod.db`)
+- Single gateway: `prod` (connects to R2 Iceberg catalog)
+- Uses virtual environments for dev isolation (e.g., `dev_deeman`)
+- Production uses `prod` environment
 - Auto-apply enabled, no interactive prompts
-- DuckDB extensions: zipfs, httpfs, iceberg
+- DuckDB extensions: httpfs, iceberg
+
+**Environment Strategy:**
+- All environments connect to the same R2 Iceberg catalog
+- Dev environments (e.g., `dev_deeman`) are isolated virtual environments
+- SQLMesh manages environment isolation and table versioning
+- No local DuckDB files needed
 
 ### 3. Core Package (`src/materia/`)
 Currently minimal; main logic resides in workspace packages.
@@ -168,11 +178,11 @@ pytest --cov=./ --cov-report=xml
 ### CI/CD Pipeline (`.gitlab-ci.yml`)
 
-**4 Stages: Lint → Test → Build → Deploy**
+**3 Stages: Lint → Test → Deploy**
 
 #### 1. Lint Stage
-- Runs `ruff check` and `ruff format --check`
-- Validates code quality on every commit
+- Runs `ruff check` on every commit
+- Validates code quality
 
 #### 2. Test Stage
 - **`test:cli`**: Runs pytest on materia CLI with 71% coverage
@@ -182,41 +192,51 @@ pytest --cov=./ --cov-report=xml
 - Exports coverage reports to GitLab
 - **`test:sqlmesh`**: Runs SQLMesh model tests in transform layer
 
-#### 3. Build Stage (only on master branch)
-Creates separate artifacts for each workspace package:
-- **`build:extract`**: Builds `materia-extract-latest.tar.gz` (psdonline package)
-- **`build:transform`**: Builds `materia-transform-latest.tar.gz` (sqlmesh_materia package)
-- **`build:cli`**: Builds `materia-cli-latest.tar.gz` (materia management CLI)
-
-Each artifact is a self-contained tarball with all dependencies.
-
-#### 4. Deploy Stage (only on master branch)
-- **`deploy:r2`**: Uploads artifacts to Cloudflare R2 using rclone
-  - Loads secrets from Pulumi ESC (`beanflows/prod`)
-  - Only requires `PULUMI_ACCESS_TOKEN` in GitLab variables
-  - All other secrets (R2 credentials, SSH keys, API tokens) come from ESC
-- **`deploy:infra`**: Runs `pulumi up` to deploy infrastructure changes
-  - Only triggers when `infra/**/*` files change
-
-### Production Architecture: Ephemeral Worker Model
+#### 3. Deploy Stage (only on master branch)
+- **`deploy:infra`**: Runs `pulumi up` to ensure supervisor instance exists
+  - Runs on every master push
+  - Creates/updates Hetzner CPX11 supervisor instance (~€4.49/mo)
+  - Uses Pulumi ESC (`beanflows/prod`) for all secrets
+- **`deploy:supervisor`**: Checks supervisor status
+  - Verifies supervisor is bootstrapped
+  - Supervisor auto-updates via `git pull` every 15 minutes (no CI/CD deployment needed)
+
+**Note:** No build artifacts! Supervisor pulls code directly from git and runs via `uv`.
+
+### Production Architecture: Git-Based Deployment with Ephemeral Workers
 
 **Design Philosophy:**
 - No always-on workers (cost optimization)
-- Supervisor instance dynamically creates/destroys workers on-demand
-- Language-agnostic artifacts enable future migration to C/Rust/Go
+- Supervisor pulls latest code from git (no artifact builds)
+- Supervisor dynamically creates/destroys workers on-demand
+- Simple, inspectable, easy to test locally
 - Multi-cloud abstraction for pricing optimization
 
 **Components:**
 
 #### 1. Supervisor Instance (Small Hetzner VM)
-- Runs the `materia` management CLI
-- Small, always-on instance (cheap)
+- Runs `supervisor.sh` - continuous orchestration loop (inspired by TigerBeetle's CFO supervisor)
+- Hetzner CPX11: 2 vCPU (shared), 2GB RAM (~€4.49/mo)
+- Always-on, minimal resource usage
+- Git-based deployment: `git pull` every 15 minutes for auto-updates
+- Runs pipelines on schedule:
+  - Extract: Daily at 2 AM UTC
+  - Transform: Daily at 3 AM UTC
+- Uses systemd service for automatic restart on failure
 - Pulls secrets from Pulumi ESC
-- Orchestrates worker lifecycle via cloud provider APIs
+
+**Bootstrap (one-time):**
+```bash
+# Get supervisor IP from Pulumi
+cd infra && pulumi stack output supervisor_ip -s prod
+
+# Run bootstrap script
+export PULUMI_ACCESS_TOKEN=<your-token>
+ssh root@<supervisor-ip> 'bash -s' < infra/bootstrap_supervisor.sh
+```
 
 #### 2. Ephemeral Workers (On-Demand)
-- Created for each pipeline execution
-- Downloads pre-built artifacts from R2 (no git, no uv on worker)
+- Created for each pipeline execution by materia CLI
 - Receives secrets via SSH environment variable injection
 - Destroyed immediately after job completion
 - Different instance types per pipeline:
@@ -227,34 +247,39 @@ Each artifact is a self-contained tarball with all dependencies.
 ```
 Pulumi ESC (beanflows/prod)
-Supervisor Instance (materia CLI)
+Supervisor Instance (via esc CLI)
 Workers (injected as env vars via SSH)
 ```
 
-#### 4. Artifact Flow
+#### 4. Code Deployment Flow
 ```
-GitLab CI: uv build → tar.gz
-Cloudflare R2 (artifact storage)
-Worker: curl → extract → execute
+GitLab (master branch)
+Supervisor: git pull origin master (every 15 min)
+Supervisor: uv sync (update dependencies)
+Supervisor: uv run materia pipeline run <pipeline>
 ```
 
 #### 5. Data Storage
-- **Dev**: Local DuckDB file (`materia_dev.db`)
-- **Prod**: DuckDB in-memory + Cloudflare R2 Data Catalog (Iceberg REST API)
+- **All environments**: DuckDB in-memory + Cloudflare R2 Data Catalog (Iceberg REST API)
 - ACID transactions on object storage
 - No persistent database on workers
+- Virtual environments for dev isolation (e.g., `dev_deeman`)
 
 **Execution Flow:**
-1. Supervisor receives schedule trigger (cron/manual)
-2. CLI runs: `materia pipeline run extract`
-3. Creates Hetzner worker with SSH key
-4. Worker downloads `materia-extract-latest.tar.gz` from R2
-5. CLI injects secrets via SSH: `export R2_ACCESS_KEY_ID=... && ./extract_psd`
-6. Pipeline executes, writes to R2 Iceberg catalog
-7. Worker destroyed (entire lifecycle ~5-10 minutes)
+1. Supervisor loop wakes up every 15 minutes
+2. Runs `git fetch` and checks if new commits on master
+3. If updates available: `git pull && uv sync`
+4. Checks if current time matches pipeline schedule (e.g., 2 AM for extract)
+5. If scheduled: `uv run materia pipeline run extract`
+6. CLI creates Hetzner worker with SSH key
+7. CLI injects secrets via SSH and executes pipeline
+8. Pipeline executes, writes to R2 Iceberg catalog
+9. Worker destroyed (entire lifecycle ~5-10 minutes)
+10. Supervisor logs results and continues loop
 
 **Multi-Cloud Provider Abstraction:**
 - Protocol-based interface (data-oriented design, no OOP)
@@ -284,11 +309,15 @@ Worker: curl → extract → execute
 - Leverage SQLMesh's built-in time macros (`@start_ds`, `@end_ds`)
 - Keep raw layer thin, push transformations to staging+
 
-## Database Location
-- **Dev database:** `materia_dev.db` (13GB, in project root)
-- **Prod database:** `materia_prod.db` (not yet created)
-
-Note: The dev database is large and should not be committed to git (.gitignore already configured).
+## Data Storage
+All data is stored in Cloudflare R2 Data Catalog (Apache Iceberg) via REST API:
+- **Production environment:** `prod`
+- **Dev environments:** `dev_<username>` (virtual environments)
+- SQLMesh manages environment isolation and table versioning
+- No local database files needed
 
 - We use a monorepo with uv workspaces
 - The pulumi env is called beanflows/prod
+- NEVER hardcode secrets in plaintext
+- Never add ssh keys to the git repo!
+- If there is a simpler more direct solution and there is no other tradeoff, always choose the simpler solution


@@ -1,7 +0,0 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
QyNTUxOQAAACCfGESotAKXA3uc2Mu90jYfpbwqZyRF+VytareVIN3PkgAAAJjG2ri3xtq4
twAAAAtzc2gtZWQyNTUxOQAAACCfGESotAKXA3uc2Mu90jYfpbwqZyRF+VytareVIN3Pkg
AAAECiPTY1dlijk3nvQcqZckzW2RddBhlqRTp4CMqrqj4oLJ8YRKi0ApcDe5zYy73SNh+l
vCpnJEX5XK1qt5Ug3c+SAAAAD2RlZW1hbkBEZWVtYW5QQwECAwQFBg==
-----END OPENSSH PRIVATE KEY-----


@@ -1 +0,0 @@
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJ8YRKi0ApcDe5zYy73SNh+lvCpnJEX5XK1qt5Ug3c+S deeman@DeemanPC


@@ -0,0 +1,295 @@
# Future Enhancement Ideas
This document captures ideas for future improvements to the Materia infrastructure and pipeline orchestration.
## Smart SQLMesh-Aware Scheduling
### Current State
- Extract pipeline runs on a fixed schedule (daily at 2 AM UTC)
- Transform pipeline runs on a fixed schedule (daily at 3 AM UTC)
- No awareness of whether SQLMesh models actually need refreshing
### Enhancement Idea
Add `materia pipeline check <name>` command that uses SQLMesh's internal scheduler to determine if work is needed:
```bash
# Check if any models need refresh
materia pipeline check transform
# Returns:
# - Exit code 0 if work needed
# - Exit code 1 if nothing to do
# - Prints list of models that would be updated
```
**Implementation:**
```python
# src/materia/pipelines.py
def check_pipeline(pipeline_name: str) -> PipelineCheckResult:
    """Check if pipeline has work to do without creating a worker."""
    if pipeline_name == "transform":
        # Run sqlmesh plan --dry-run locally
        # Parse output to see what models need refresh
        # Return list of models + their tags
        pass
```
**Supervisor Integration:**
```bash
# In supervisor.sh
if materia pipeline check transform; then
    echo "Transform has work to do, scheduling..."
    materia pipeline run transform
else
    echo "Transform up to date, skipping"
fi
```
---
## Model Tags for Worker Sizing
### Current State
- All transform pipelines run on the same worker type (ccx22)
- No way to specify that certain models need more resources
### Enhancement Idea
Use SQLMesh model tags to hint at resource requirements:
```sql
-- models/serving/obt_commodity_metrics.sql
MODEL (
  name serving.obt_commodity_metrics,
  kind INCREMENTAL_BY_TIME_RANGE,
  cron '@daily',
  tags ['heavy', 'long_running']  -- Supervisor uses this!
)
```
**Pipeline Check Enhancement:**
```python
from dataclasses import dataclass

@dataclass
class PipelineCheckResult:
    has_work: bool
    models: list[str]
    tags: set[str]  # Aggregated tags from all models needing refresh

# Check if any heavy models need refresh
result = check_pipeline("transform")
if "heavy" in result.tags:
    worker_type = "ccx32"  # Use bigger worker
else:
    worker_type = "ccx22"  # Default worker
```
**Supervisor Integration:**
```bash
# In supervisor.sh
RESULT=$(materia pipeline check transform --format json)
if [ "$(echo $RESULT | jq -r '.has_work')" = "true" ]; then
    TAGS=$(echo $RESULT | jq -r '.tags | join(",")')
    if echo "$TAGS" | grep -q "heavy"; then
        WORKER_TYPE="ccx32"
    else
        WORKER_TYPE="ccx22"
    fi
    materia pipeline run transform --worker $WORKER_TYPE
fi
```
**Tag Conventions:**
- `heavy`: Requires larger worker (ccx32+)
- `long_running`: Expected to take >1 hour
- `gpu`: Requires GPU instance (future)
- `distributed`: Could benefit from multi-node execution (future)
---
## Multi-Pipeline Support
### Current State
- Only extract and transform pipelines
- Extract doesn't use SQLMesh scheduling
### Enhancement Idea
Make the extract pipeline SQLMesh-aware as well, and add more pipeline types:
**New Pipelines:**
- `dbt`: For dbt models (if we add dbt support)
- `ml`: For machine learning model training
- `export`: For exporting data to external systems
**Extract with Scheduling:**
Instead of a fixed schedule, use SQLMesh to track the last successful extract:
```sql
-- models/raw/psd_alldata.sql
MODEL (
  name raw.psd_alldata,
  kind INCREMENTAL_BY_TIME_RANGE,
  cron '@daily',  -- SQLMesh tracks this!
  tags ['extract']
)
```
Then supervisor checks if extract needs running based on SQLMesh state.
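If the `materia pipeline check` command from the scheduling idea above existed for extract too, the supervisor hook could mirror the transform example (hypothetical sketch):
```bash
# In supervisor.sh (hypothetical -- depends on the `check` command proposed above)
if materia pipeline check extract; then
    echo "Extract models are stale, running extract..."
    materia pipeline run extract
else
    echo "Extract up to date, skipping"
fi
```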
---
## Distributed Transform Execution
### Current State
- All SQLMesh models run on a single worker
- Large transforms can be slow
### Enhancement Idea
For models tagged `distributed`, split execution across multiple workers:
```python
from concurrent.futures import ThreadPoolExecutor

# Detect heavy models that would benefit from parallelization
result = check_pipeline("transform")
if "distributed" in result.tags:
    # Split models into batches based on dependency graph
    batches = split_execution_graph(result.models)

    # Create one worker per batch
    workers = [create_worker(f"transform-{i}", "ccx22") for i in range(len(batches))]

    # Execute in parallel
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(run_pipeline_on_worker, worker, batch)
            for worker, batch in zip(workers, batches)
        ]
```
---
## Cost Optimization with Multi-Cloud
### Current State
- Only supports Hetzner Cloud
- Provider abstraction exists but not utilized
### Enhancement Idea
Check spot prices across multiple providers and use cheapest:
```python
# src/materia/providers/pricing.py
def get_cheapest_provider(worker_type: str) -> str:
    prices = {
        "hetzner": get_hetzner_spot_price(worker_type),
        "ovh": get_ovh_spot_price(worker_type),
        "scaleway": get_scaleway_spot_price(worker_type),
    }
    return min(prices, key=prices.get)
```

```bash
# In supervisor.sh or materia CLI
PROVIDER=$(materia provider cheapest --type ccx22)
materia pipeline run transform --provider $PROVIDER
```
---
## Observability Enhancements
### Current State
- Logs go to systemd journal
- No metrics or alerting
### Enhancement Ideas
**1. Prometheus Metrics:**
```bash
# Expose metrics endpoint on supervisor
curl http://supervisor:9090/metrics
# Example metrics:
materia_pipeline_runs_total{pipeline="extract",status="success"} 42
materia_pipeline_duration_seconds{pipeline="transform"} 1234
materia_worker_cost_euros{pipeline="extract"} 0.05
```
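One way to expose these from the supervisor would be the `prometheus_client` Python package; a minimal sketch (only the metric names come from the examples above, the label sets and values are illustrative):
```python
from prometheus_client import Counter, Gauge, start_http_server

# Counter is exported as materia_pipeline_runs_total (client appends _total)
pipeline_runs = Counter("materia_pipeline_runs", "Pipeline runs", ["pipeline", "status"])
pipeline_duration = Gauge(
    "materia_pipeline_duration_seconds", "Duration of last pipeline run", ["pipeline"]
)

start_http_server(9090)  # serves /metrics on :9090
pipeline_runs.labels(pipeline="extract", status="success").inc()
pipeline_duration.labels(pipeline="transform").set(1234)
```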
**2. Alerting:**
```yaml
# alerts/pipelines.yml
- alert: PipelineFailure
  expr: rate(materia_pipeline_runs_total{status="failure"}[1h]) > 0
  for: 5m
  annotations:
    summary: Pipeline {{ $labels.pipeline }} is failing
```
**3. Web Dashboard:**
Simple web UI showing:
- Current supervisor status
- Recent pipeline runs
- Active workers
- Cost tracking
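Even a single JSON status endpoint on the supervisor would cover most of this; a stdlib-only sketch (all field names illustrative):
```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class StatusHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Illustrative payload; real values would come from supervisor state
        body = json.dumps({
            "supervisor": "running",
            "active_workers": 0,
            "recent_runs": [{"pipeline": "extract", "status": "success"}],
        }).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

HTTPServer(("127.0.0.1", 8080), StatusHandler).serve_forever()
```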
---
## Self-Healing Capabilities
### Enhancement Idea
Supervisor automatically handles common failure modes:
```bash
# In supervisor.sh
run_with_retry() {
    local pipeline=$1
    local max_retries=3
    local retry_count=0

    while [ $retry_count -lt $max_retries ]; do
        if materia pipeline run $pipeline; then
            return 0
        fi
        retry_count=$((retry_count + 1))
        log "Pipeline $pipeline failed, retry $retry_count/$max_retries"
        # Exponential backoff
        sleep $((2 ** retry_count * 60))
    done

    log_error "Pipeline $pipeline failed after $max_retries retries"
    # Send alert
    return 1
}
```
---
## Priority Queuing
### Current State
- Pipelines run sequentially based on schedule
### Enhancement Idea
Add priority system for pipeline execution:
```python
from dataclasses import dataclass

@dataclass
class PipelineConfig:
    priority: int = 0  # Higher = more important

PIPELINES = {
    "extract": PipelineConfig(priority=10, ...),   # Highest
    "transform": PipelineConfig(priority=5, ...),
    "export": PipelineConfig(priority=1, ...),     # Lowest
}
```
Supervisor maintains a priority queue and executes highest-priority work first.
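A sketch of that queue using Python's `heapq` (a min-heap, so priority is negated; reuses the `PIPELINES` mapping above, and the runner is hypothetical):
```python
import heapq

# heapq pops the smallest item first, so negate priority for max-first order
queue = [(-cfg.priority, name) for name, cfg in PIPELINES.items()]
heapq.heapify(queue)

while queue:
    _, name = heapq.heappop(queue)
    run_pipeline(name)  # hypothetical runner; extract before transform before export
```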
---
## Notes
- These are ideas for future consideration, not immediate requirements
- Implement incrementally as needs arise
- Keep simplicity as a guiding principle

infra/Pulumi.prod.yaml

@@ -0,0 +1,7 @@
# Production stack configuration
# All secrets come from Pulumi ESC environment: beanflows/prod
environment:
  - beanflows/prod

config:
  materia-infrastructure:hetzner_location: "nbg1"  # Nuremberg, Germany


@@ -1,108 +1,76 @@
 """
 BeanFlows.coffee Infrastructure
-Cloudflare R2 + Iceberg + Hetzner compute stack
+Hetzner compute stack for ephemeral worker orchestration
+
+Note: R2 buckets are managed manually in Cloudflare dashboard
+- beanflows-artifacts: Stores CLI and pipeline artifacts
+- beanflows-data-prod: Iceberg data lakehouse
 """
 
 import pulumi
-import pulumi_cloudflare as cloudflare
 import pulumi_hcloud as hcloud
 
 # Load configuration
 config = pulumi.Config()
-cloudflare_account_id = config.require("cloudflare_account_id")
 hetzner_location = config.get("hetzner_location") or "nbg1"  # Nuremberg datacenter
 
 # ============================================================
-# Cloudflare R2 Storage + Data Catalog (Iceberg)
+# R2 Bucket Names (managed manually in Cloudflare R2 UI)
 # ============================================================
+# R2 buckets cannot be managed via Pulumi as they require R2-specific tokens
+# that don't work with the Cloudflare Pulumi provider.
+# These are defined here for documentation purposes only.
 
-# R2 bucket for raw data (extraction outputs)
-raw_bucket = cloudflare.R2Bucket(
-    "materia-raw",
-    account_id=cloudflare_account_id,
-    name="materia-raw",
-    location="weur",  # Western Europe
-)
-
-# R2 bucket for lakehouse (Iceberg tables)
-lakehouse_bucket = cloudflare.R2Bucket(
-    "materia-lakehouse",
-    account_id=cloudflare_account_id,
-    name="materia-lakehouse",
-    location="weur",
-)
-
-# TODO: Enable R2 Data Catalog (Iceberg) on lakehouse bucket
-# Note: As of Oct 2025, R2 Data Catalog is in public beta
-# May need to enable via Cloudflare dashboard or API once SDK supports it
-# For now, document manual step in README
-
-# API token for R2 access (needs R2 + Data Catalog permissions)
-# Note: Create this manually in Cloudflare dashboard and store in Pulumi config
-# pulumi config set --secret cloudflare_r2_token <token>
+ARTIFACTS_BUCKET = "beanflows-artifacts"  # CLI + extract/transform packages
+LAKEHOUSE_BUCKET = "beanflows-data-prod"  # Iceberg tables (EEUR region)
 
 # ============================================================
 # Hetzner Cloud Infrastructure
 # ============================================================
 
-# SSH key for server access
+# SSH key for server access (imported from existing key)
 ssh_key = hcloud.SshKey(
     "materia-ssh-key",
-    name="materia-deployment-key",
+    name="deeman@DeemanPC",
    public_key=config.require_secret("ssh_public_key"),
+    opts=pulumi.ResourceOptions(protect=True),
 )
 
-# Small CCX instance for scheduler/orchestrator
-# This runs the cron scheduler + lightweight tasks
-scheduler_server = hcloud.Server(
-    "materia-scheduler",
-    name="materia-scheduler",
-    server_type="ccx12",  # 2 vCPU, 8GB RAM, ~€6/mo
+# Small CPX instance for supervisor (runs materia CLI to orchestrate pipelines)
+# This is an always-on instance that creates/destroys ephemeral workers on-demand
+supervisor_server = hcloud.Server(
+    "materia-supervisor",
+    name="materia-supervisor",
+    server_type="cpx11",  # 2 vCPU (shared), 2GB RAM, ~€4.49/mo (cheapest option)
     image="ubuntu-24.04",
     location=hetzner_location,
     ssh_keys=[ssh_key.id],
     labels={
-        "role": "scheduler",
+        "role": "supervisor",
         "project": "materia",
     },
     user_data="""#!/bin/bash
+set -e
+
 # Basic server setup
 apt-get update
-apt-get install -y python3.13 python3-pip git curl
+apt-get install -y python3.13 python3-pip curl unzip
 
-# Install uv
-curl -LsSf https://astral.sh/uv/install.sh | sh
+# Install Pulumi ESC CLI
+curl -fsSL https://get.pulumi.com/esc/install.sh | sh
+export PATH="$HOME/.pulumi/bin:$PATH"
+echo 'export PATH="$HOME/.pulumi/bin:$PATH"' >> /root/.bashrc
+
+# Create deployment directory
+mkdir -p /opt/materia
 
 # Configure environment
-echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc
+echo 'Setup complete. Materia CLI will be deployed via CI/CD.' > /opt/materia/README.txt
 """,
 )
 
-# Larger CCX instance for heavy SQLMesh workloads
-# This gets spun up on-demand for big transformations
-worker_server = hcloud.Server(
-    "materia-worker-01",
-    name="materia-worker-01",
-    server_type="ccx22",  # 4 vCPU, 16GB RAM, ~€24/mo
-    image="ubuntu-24.04",
-    location=hetzner_location,
-    ssh_keys=[ssh_key.id],
-    labels={
-        "role": "worker",
-        "project": "materia",
-    },
-    user_data="""#!/bin/bash
-# Basic server setup
-apt-get update
-apt-get install -y python3.13 python3-pip git curl
-
-# Install uv
-curl -LsSf https://astral.sh/uv/install.sh | sh
-
-# Configure environment
-echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc
-""",
-)
+# Note: Workers are created on-demand by the materia CLI
+# No always-on worker instances in this architecture
 
 # Firewall for servers (restrict to SSH + outbound only)
 firewall = hcloud.Firewall(
@@ -132,36 +100,17 @@ firewall = hcloud.Firewall(
     ],
 )
 
-# Apply firewall to all servers
-scheduler_firewall = hcloud.FirewallAttachment(
-    "scheduler-firewall",
+# Apply firewall to supervisor
+supervisor_firewall = hcloud.FirewallAttachment(
+    "supervisor-firewall",
     firewall_id=firewall.id,
-    server_ids=[scheduler_server.id],
-)
-
-worker_firewall = hcloud.FirewallAttachment(
-    "worker-firewall",
-    firewall_id=firewall.id,
-    server_ids=[worker_server.id],
+    server_ids=[supervisor_server.id],
 )
 
 # ============================================================
 # Outputs
 # ============================================================
-pulumi.export("raw_bucket_name", raw_bucket.name)
-pulumi.export("lakehouse_bucket_name", lakehouse_bucket.name)
-pulumi.export("scheduler_ip", scheduler_server.ipv4_address)
-pulumi.export("worker_ip", worker_server.ipv4_address)
-
-# Export connection info for DuckDB
-pulumi.export(
-    "duckdb_r2_config",
-    pulumi.Output.all(cloudflare_account_id, lakehouse_bucket.name).apply(
-        lambda args: {
-            "account_id": args[0],
-            "bucket": args[1],
-            "catalog_uri": f"https://catalog.cloudflarestorage.com/{args[0]}/r2-data-catalog",
-        }
-    ),
-)
+pulumi.export("supervisor_ip", supervisor_server.ipv4_address)
+pulumi.export("artifacts_bucket_name", ARTIFACTS_BUCKET)
+pulumi.export("lakehouse_bucket_name", LAKEHOUSE_BUCKET)

infra/bootstrap_supervisor.sh

@@ -0,0 +1,111 @@
#!/bin/bash
# Bootstrap script for Materia supervisor instance
# Run this once on a new supervisor to set it up
#
# Usage:
# From CI/CD or locally:
# ssh root@<supervisor_ip> 'bash -s' < infra/bootstrap_supervisor.sh
#
# Or on the supervisor itself:
# curl -fsSL <url-to-this-script> | bash
set -euo pipefail
echo "=== Materia Supervisor Bootstrap ==="
echo "This script will:"
echo " 1. Install dependencies (git, uv, esc)"
echo " 2. Clone the materia repository"
echo " 3. Setup systemd service"
echo " 4. Start the supervisor"
echo ""
# Check if we're root
if [ "$EUID" -ne 0 ]; then
    echo "ERROR: This script must be run as root"
    exit 1
fi

# Configuration
REPO_DIR="/opt/materia"
GITLAB_PROJECT="deemanone/materia"

# GITLAB_READ_TOKEN should be set in Pulumi ESC (beanflows/prod)
if [ -z "${GITLAB_READ_TOKEN:-}" ]; then
    echo "ERROR: GITLAB_READ_TOKEN environment variable not set"
    echo "Please add it to Pulumi ESC (beanflows/prod) first"
    exit 1
fi
REPO_URL="https://gitlab-ci-token:${GITLAB_READ_TOKEN}@gitlab.com/${GITLAB_PROJECT}.git"
echo "--- Installing system dependencies ---"
apt-get update
apt-get install -y git curl python3-pip
echo "--- Installing uv ---"
if ! command -v uv &> /dev/null; then
curl -LsSf https://astral.sh/uv/install.sh | sh
export PATH="$HOME/.cargo/bin:$PATH"
echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc
fi
echo "--- Installing Pulumi ESC ---"
if ! command -v esc &> /dev/null; then
curl -fsSL https://get.pulumi.com/esc/install.sh | sh
export PATH="$HOME/.pulumi/bin:$PATH"
echo 'export PATH="$HOME/.pulumi/bin:$PATH"' >> /root/.bashrc
fi
echo "--- Setting up Pulumi ESC authentication ---"
if [ -z "${PULUMI_ACCESS_TOKEN:-}" ]; then
echo "ERROR: PULUMI_ACCESS_TOKEN environment variable not set"
echo "Please set it before running this script:"
echo " export PULUMI_ACCESS_TOKEN=<your-token>"
exit 1
fi
esc login --token "$PULUMI_ACCESS_TOKEN"
echo "--- Loading secrets from Pulumi ESC ---"
eval $(esc env open beanflows/prod --format shell)
echo "--- Cloning repository ---"
if [ -d "$REPO_DIR" ]; then
echo "Repository already exists, pulling latest..."
cd "$REPO_DIR"
git pull origin master
else
git clone "$REPO_URL" "$REPO_DIR"
cd "$REPO_DIR"
fi
echo "--- Installing Python dependencies ---"
uv sync
echo "--- Creating environment file ---"
cat > "$REPO_DIR/.env" <<EOF
# Environment variables for supervisor
# Loaded from Pulumi ESC: beanflows/prod
PULUMI_ACCESS_TOKEN=${PULUMI_ACCESS_TOKEN}
PATH=/root/.cargo/bin:/root/.pulumi/bin:/usr/local/bin:/usr/bin:/bin
EOF
echo "--- Setting up systemd service ---"
cp "$REPO_DIR/infra/supervisor/materia-supervisor.service" /etc/systemd/system/materia-supervisor.service
echo "--- Enabling and starting service ---"
systemctl daemon-reload
systemctl enable materia-supervisor
systemctl start materia-supervisor
echo ""
echo "=== Bootstrap complete! ==="
echo ""
echo "Supervisor is now running. Check status with:"
echo " systemctl status materia-supervisor"
echo ""
echo "View logs with:"
echo " journalctl -u materia-supervisor -f"
echo ""
echo "Repository location: $REPO_DIR"
echo "Current commit: $(cd $REPO_DIR && git rev-parse --short HEAD)"

infra/supervisor/materia-supervisor.service

@@ -0,0 +1,24 @@
[Unit]
Description=Materia Supervisor - Pipeline Orchestration
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=root
WorkingDirectory=/opt/materia
ExecStart=/opt/materia/infra/supervisor/supervisor.sh
Restart=always
RestartSec=10
EnvironmentFile=/opt/materia/.env

# Resource limits
LimitNOFILE=65536

# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=materia-supervisor

[Install]
WantedBy=multi-user.target

infra/supervisor/supervisor.sh

@@ -0,0 +1,34 @@
#!/bin/sh
# Materia Supervisor - Continuous pipeline orchestration
# Inspired by TigerBeetle's CFO supervisor: simple, resilient, easy to understand
# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh
set -eu

readonly REPO_DIR="/opt/materia"

while true
do
    (
        # Clone repo if missing
        if ! [ -d "$REPO_DIR/.git" ]
        then
            echo "Repository not found, bootstrap required!"
            exit 1
        fi
        cd "$REPO_DIR"

        # Update code from git
        git fetch origin master
        git switch --discard-changes --detach origin/master
        uv sync

        # Run pipelines (SQLMesh handles scheduling)
        uv run materia pipeline run extract
        uv run materia pipeline run transform
    ) || sleep 600  # Sleep 10 min on failure to avoid busy-loop retries
    sleep 3600  # Run pipelines every hour
done


@@ -0,0 +1,92 @@
# Materia SQLMesh Transform Layer
Data transformation pipeline using SQLMesh and DuckDB, implementing a 4-layer architecture.
## Quick Start
```bash
cd transform/sqlmesh_materia
# Local development (virtual environment)
sqlmesh plan dev_<username>
# Production
sqlmesh plan prod
# Run tests
sqlmesh test
# Format SQL
sqlmesh format
```
## Architecture
### Gateway Configuration
**Single Gateway:** All environments connect to Cloudflare R2 Data Catalog (Apache Iceberg)
- **Production:** `sqlmesh plan prod`
- **Development:** `sqlmesh plan dev_<username>` (isolated virtual environment)
SQLMesh manages environment isolation automatically - no need for separate local databases.
### 4-Layer Data Model
See `models/README.md` for detailed architecture documentation:
1. **Raw** - Immutable source data
2. **Staging** - Schema, types, basic cleansing
3. **Cleaned** - Business logic, integration
4. **Serving** - Analytics-ready (facts, dimensions, aggregates)
## Configuration
**Config:** `config.yaml`
- DuckDB in-memory with R2 Iceberg catalog
- Extensions: httpfs, iceberg
- Auto-apply enabled (no prompts)
- Initialization hooks for R2 secret/catalog attachment
## Commands
```bash
# Plan changes for dev environment
sqlmesh plan dev_yourname
# Plan changes for prod
sqlmesh plan prod
# Run tests
sqlmesh test
# Validate models
sqlmesh validate
# Run audits
sqlmesh audit
# Format SQL files
sqlmesh format
# Start web UI
sqlmesh ui
```
## Environment Variables (Prod)
Required for production R2 Iceberg catalog:
- `CLOUDFLARE_API_TOKEN` - R2 API token
- `ICEBERG_REST_URI` - R2 catalog REST endpoint
- `R2_WAREHOUSE_NAME` - Warehouse name (default: "materia")
These are injected via Pulumi ESC (`beanflows/prod`) on the supervisor instance.
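To load the same variables into a local shell for debugging, the `esc` commands used by CI and the supervisor work identically (requires a Pulumi access token):
```bash
esc login --token ${PULUMI_ACCESS_TOKEN}
eval $(esc env open beanflows/prod --format shell)

# Verify the catalog settings are present
echo $ICEBERG_REST_URI
```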
## Development Workflow
1. Make changes to models in `models/`
2. Test locally: `sqlmesh test`
3. Plan changes: `sqlmesh plan dev_yourname`
4. Review and apply changes
5. Commit and push to trigger CI/CD
SQLMesh will handle environment isolation, table versioning, and incremental updates automatically.

transform/sqlmesh_materia/config.yaml

@@ -1,18 +1,8 @@
 # --- Gateway Connection ---
+# Single gateway connecting to R2 Iceberg catalog
+# Local dev uses virtual environments (e.g., dev_<username>)
+# Production uses the 'prod' environment
 gateways:
-  dev:
-    connection:
-      # For more information on configuring the connection to your execution engine, visit:
-      # https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#connection
-      # https://sqlmesh.readthedocs.io/en/stable/integrations/engines/duckdb/#connection-options
-      type: duckdb
-      database: materia_dev.db
-      extensions:
-        - name: zipfs
-        - name: httpfs
-        - name: iceberg
   prod:
     connection:
       type: duckdb
@@ -20,20 +10,26 @@ gateways:
       extensions:
         - name: httpfs
         - name: iceberg
-      init_script: |
-        CREATE SECRET IF NOT EXISTS r2_secret (
-          TYPE ICEBERG,
-          TOKEN '{{ env_var("CLOUDFLARE_API_TOKEN") }}'
-        );
-        ATTACH '{{ env_var("R2_WAREHOUSE_NAME", "materia") }}' AS catalog (
-          TYPE ICEBERG,
-          ENDPOINT '{{ env_var("ICEBERG_REST_URI") }}'
-        );
-        CREATE SCHEMA IF NOT EXISTS catalog.materia;
-        USE catalog.materia;
 
-default_gateway: dev
+default_gateway: prod
+
+# --- Hooks ---
+# Run initialization SQL before all plans/runs
+# https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#execution-hooks
+before_all:
+  - |
+    CREATE SECRET IF NOT EXISTS r2_secret (
+      TYPE ICEBERG,
+      TOKEN '@env_var("CLOUDFLARE_API_TOKEN")'
+    )
+  - |
+    ATTACH '@env_var("R2_WAREHOUSE_NAME", "materia")' AS catalog (
+      TYPE ICEBERG,
+      ENDPOINT '@env_var("ICEBERG_REST_URI")'
+    )
+  - CREATE SCHEMA IF NOT EXISTS catalog.materia
+  - USE catalog.materia
 
 # --- Model Defaults ---
 # https://sqlmesh.readthedocs.io/en/stable/reference/model_configuration/#model-defaults