diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 4a830f5..e67f643 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -3,7 +3,6 @@ image: python:3.13 stages: - lint - test - - build - deploy variables: @@ -31,7 +30,6 @@ lint: script: - uv sync - uv run ruff check . - - uv run ruff format --check . test:cli: stage: test @@ -55,83 +53,6 @@ test:sqlmesh: - uv sync - cd transform/sqlmesh_materia && uv run sqlmesh test -build:extract: - stage: build - before_script: - - *uv_setup - script: - - uv sync - - mkdir -p dist - - uv build --package psdonline --out-dir dist/extract - - cd dist/extract && tar -czf ../materia-extract-latest.tar.gz . - artifacts: - paths: - - dist/materia-extract-latest.tar.gz - expire_in: 1 week - rules: - - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH - -build:transform: - stage: build - before_script: - - *uv_setup - script: - - uv sync - - mkdir -p dist - - uv build --package sqlmesh_materia --out-dir dist/transform - - cd dist/transform && tar -czf ../materia-transform-latest.tar.gz . - artifacts: - paths: - - dist/materia-transform-latest.tar.gz - expire_in: 1 week - rules: - - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH - -build:cli: - stage: build - before_script: - - *uv_setup - script: - - uv sync - - mkdir -p dist - - uv build --out-dir dist/cli - - cd dist/cli && tar -czf ../materia-cli-latest.tar.gz . 
- artifacts: - paths: - - dist/materia-cli-latest.tar.gz - expire_in: 1 week - rules: - - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH - -deploy:r2: - stage: deploy - image: rclone/rclone:latest - before_script: - - apk add --no-cache curl unzip - - curl -fsSL https://get.pulumi.com/esc/install.sh | sh - - export PATH="$HOME/.pulumi/bin:$PATH" - - esc login --token ${PULUMI_ACCESS_TOKEN} - - eval $(esc env open beanflows/prod --format shell) - - | - mkdir -p ~/.config/rclone - cat > ~/.config/rclone/rclone.conf < ~/.ssh/id_rsa + chmod 600 ~/.ssh/id_rsa + ssh-keyscan -H $SUPERVISOR_IP >> ~/.ssh/known_hosts + + # Wait for SSH to be ready (new instance may take a moment) + echo "Waiting for SSH to be ready..." + for i in $(seq 1 30); do + if ssh -o ConnectTimeout=5 root@${SUPERVISOR_IP} "echo 'SSH ready'"; then + break + fi + echo "Attempt $i/30 failed, retrying..." + sleep 10 + done + + # Check if supervisor is bootstrapped + if ssh root@${SUPERVISOR_IP} "test -d /opt/materia/.git"; then + echo "Supervisor already bootstrapped and will auto-update" + ssh root@${SUPERVISOR_IP} "systemctl status materia-supervisor --no-pager" + else + echo "Bootstrapping supervisor for the first time..." + # Export secrets and run bootstrap + ssh root@${SUPERVISOR_IP} "export PULUMI_ACCESS_TOKEN='${PULUMI_ACCESS_TOKEN}' GITLAB_READ_TOKEN='${GITLAB_READ_TOKEN}' && bash -s" < infra/bootstrap_supervisor.sh + echo "Bootstrap complete!" + fi + dependencies: + - deploy:infra + rules: + - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH diff --git a/CLAUDE.md b/CLAUDE.md index eecad36..9f97d58 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -55,8 +55,11 @@ SQLMesh project implementing a layered data architecture. 
```bash cd transform/sqlmesh_materia -# Plan changes (no prompts, auto-apply enabled in config) -sqlmesh plan +# Local development (creates virtual environment) +sqlmesh plan dev_ + +# Production +sqlmesh plan prod # Run tests sqlmesh test @@ -76,10 +79,17 @@ sqlmesh ui **Configuration:** - Config: `transform/sqlmesh_materia/config.yaml` -- Default gateway: `dev` (uses `materia_dev.db`) -- Production gateway: `prod` (uses `materia_prod.db`) +- Single gateway: `prod` (connects to R2 Iceberg catalog) +- Uses virtual environments for dev isolation (e.g., `dev_deeman`) +- Production uses `prod` environment - Auto-apply enabled, no interactive prompts -- DuckDB extensions: zipfs, httpfs, iceberg +- DuckDB extensions: httpfs, iceberg + +**Environment Strategy:** +- All environments connect to the same R2 Iceberg catalog +- Dev environments (e.g., `dev_deeman`) are isolated virtual environments +- SQLMesh manages environment isolation and table versioning +- No local DuckDB files needed ### 3. Core Package (`src/materia/`) Currently minimal; main logic resides in workspace packages. @@ -168,11 +178,11 @@ pytest --cov=./ --cov-report=xml ### CI/CD Pipeline (`.gitlab-ci.yml`) -**4 Stages: Lint → Test → Build → Deploy** +**3 Stages: Lint → Test → Deploy** #### 1. Lint Stage -- Runs `ruff check` and `ruff format --check` -- Validates code quality on every commit +- Runs `ruff check` on every commit +- Validates code quality #### 2. Test Stage - **`test:cli`**: Runs pytest on materia CLI with 71% coverage @@ -182,41 +192,51 @@ pytest --cov=./ --cov-report=xml - Exports coverage reports to GitLab - **`test:sqlmesh`**: Runs SQLMesh model tests in transform layer -#### 3. 
Build Stage (only on master branch) -Creates separate artifacts for each workspace package: -- **`build:extract`**: Builds `materia-extract-latest.tar.gz` (psdonline package) -- **`build:transform`**: Builds `materia-transform-latest.tar.gz` (sqlmesh_materia package) -- **`build:cli`**: Builds `materia-cli-latest.tar.gz` (materia management CLI) +#### 3. Deploy Stage (only on master branch) +- **`deploy:infra`**: Runs `pulumi up` to ensure supervisor instance exists + - Runs on every master push + - Creates/updates Hetzner CPX11 supervisor instance (~€4.49/mo) + - Uses Pulumi ESC (`beanflows/prod`) for all secrets +- **`deploy:supervisor`**: Checks supervisor status + - Verifies supervisor is bootstrapped + - Supervisor auto-updates via `git pull` every 15 minutes (no CI/CD deployment needed) -Each artifact is a self-contained tarball with all dependencies. +**Note:** No build artifacts! Supervisor pulls code directly from git and runs via `uv`. -#### 4. Deploy Stage (only on master branch) -- **`deploy:r2`**: Uploads artifacts to Cloudflare R2 using rclone - - Loads secrets from Pulumi ESC (`beanflows/prod`) - - Only requires `PULUMI_ACCESS_TOKEN` in GitLab variables - - All other secrets (R2 credentials, SSH keys, API tokens) come from ESC -- **`deploy:infra`**: Runs `pulumi up` to deploy infrastructure changes - - Only triggers when `infra/**/*` files change - -### Production Architecture: Ephemeral Worker Model +### Production Architecture: Git-Based Deployment with Ephemeral Workers **Design Philosophy:** - No always-on workers (cost optimization) -- Supervisor instance dynamically creates/destroys workers on-demand -- Language-agnostic artifacts enable future migration to C/Rust/Go +- Supervisor pulls latest code from git (no artifact builds) +- Supervisor dynamically creates/destroys workers on-demand +- Simple, inspectable, easy to test locally - Multi-cloud abstraction for pricing optimization **Components:** #### 1. 
Supervisor Instance (Small Hetzner VM) -- Runs the `materia` management CLI -- Small, always-on instance (cheap) +- Runs `supervisor.sh` - continuous orchestration loop (inspired by TigerBeetle's CFO supervisor) +- Hetzner CPX11: 2 vCPU (shared), 2GB RAM (~€4.49/mo) +- Always-on, minimal resource usage +- Git-based deployment: `git pull` every 15 minutes for auto-updates +- Runs pipelines on schedule: + - Extract: Daily at 2 AM UTC + - Transform: Daily at 3 AM UTC +- Uses systemd service for automatic restart on failure - Pulls secrets from Pulumi ESC -- Orchestrates worker lifecycle via cloud provider APIs + +**Bootstrap (one-time):** +```bash +# Get supervisor IP from Pulumi +cd infra && pulumi stack output supervisor_ip -s prod + +# Run bootstrap script +export PULUMI_ACCESS_TOKEN= +ssh root@ 'bash -s' < infra/bootstrap_supervisor.sh +``` #### 2. Ephemeral Workers (On-Demand) -- Created for each pipeline execution -- Downloads pre-built artifacts from R2 (no git, no uv on worker) +- Created for each pipeline execution by materia CLI - Receives secrets via SSH environment variable injection - Destroyed immediately after job completion - Different instance types per pipeline: @@ -227,34 +247,39 @@ Each artifact is a self-contained tarball with all dependencies. ``` Pulumi ESC (beanflows/prod) ↓ -Supervisor Instance (materia CLI) +Supervisor Instance (via esc CLI) ↓ Workers (injected as env vars via SSH) ``` -#### 4. Artifact Flow +#### 4. Code Deployment Flow ``` -GitLab CI: uv build → tar.gz +GitLab (master branch) ↓ -Cloudflare R2 (artifact storage) +Supervisor: git pull origin master (every 15 min) ↓ -Worker: curl → extract → execute +Supervisor: uv sync (update dependencies) + ↓ +Supervisor: uv run materia pipeline run ``` #### 5. 
Data Storage -- **Dev**: Local DuckDB file (`materia_dev.db`) -- **Prod**: DuckDB in-memory + Cloudflare R2 Data Catalog (Iceberg REST API) +- **All environments**: DuckDB in-memory + Cloudflare R2 Data Catalog (Iceberg REST API) - ACID transactions on object storage - No persistent database on workers + - Virtual environments for dev isolation (e.g., `dev_deeman`) **Execution Flow:** -1. Supervisor receives schedule trigger (cron/manual) -2. CLI runs: `materia pipeline run extract` -3. Creates Hetzner worker with SSH key -4. Worker downloads `materia-extract-latest.tar.gz` from R2 -5. CLI injects secrets via SSH: `export R2_ACCESS_KEY_ID=... && ./extract_psd` -6. Pipeline executes, writes to R2 Iceberg catalog -7. Worker destroyed (entire lifecycle ~5-10 minutes) +1. Supervisor loop wakes up every 15 minutes +2. Runs `git fetch` and checks if new commits on master +3. If updates available: `git pull && uv sync` +4. Checks if current time matches pipeline schedule (e.g., 2 AM for extract) +5. If scheduled: `uv run materia pipeline run extract` +6. CLI creates Hetzner worker with SSH key +7. CLI injects secrets via SSH and executes pipeline +8. Pipeline executes, writes to R2 Iceberg catalog +9. Worker destroyed (entire lifecycle ~5-10 minutes) +10. Supervisor logs results and continues loop **Multi-Cloud Provider Abstraction:** - Protocol-based interface (data-oriented design, no OOP) @@ -284,11 +309,15 @@ Worker: curl → extract → execute - Leverage SQLMesh's built-in time macros (`@start_ds`, `@end_ds`) - Keep raw layer thin, push transformations to staging+ -## Database Location +## Data Storage -- **Dev database:** `materia_dev.db` (13GB, in project root) -- **Prod database:** `materia_prod.db` (not yet created) - -Note: The dev database is large and should not be committed to git (.gitignore already configured). 
+All data is stored in Cloudflare R2 Data Catalog (Apache Iceberg) via REST API: +- **Production environment:** `prod` +- **Dev environments:** `dev_` (virtual environments) +- SQLMesh manages environment isolation and table versioning +- No local database files needed - We use a monorepo with uv workspaces - The pulumi env is called beanflows/prod +- NEVER hardcode secrets in plaintext +- Never add ssh keys to the git repo! +- If there is a simpler more direct solution and there is no other tradeoff, always choose the simpler solution \ No newline at end of file diff --git a/beanflows_ssh b/beanflows_ssh deleted file mode 100644 index 89c5f73..0000000 --- a/beanflows_ssh +++ /dev/null @@ -1,7 +0,0 @@ ------BEGIN OPENSSH PRIVATE KEY----- -b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW -QyNTUxOQAAACCfGESotAKXA3uc2Mu90jYfpbwqZyRF+VytareVIN3PkgAAAJjG2ri3xtq4 -twAAAAtzc2gtZWQyNTUxOQAAACCfGESotAKXA3uc2Mu90jYfpbwqZyRF+VytareVIN3Pkg -AAAECiPTY1dlijk3nvQcqZckzW2RddBhlqRTp4CMqrqj4oLJ8YRKi0ApcDe5zYy73SNh+l -vCpnJEX5XK1qt5Ug3c+SAAAAD2RlZW1hbkBEZWVtYW5QQwECAwQFBg== ------END OPENSSH PRIVATE KEY----- diff --git a/beanflows_ssh.pub b/beanflows_ssh.pub deleted file mode 100644 index 1f6e925..0000000 --- a/beanflows_ssh.pub +++ /dev/null @@ -1 +0,0 @@ -ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJ8YRKi0ApcDe5zYy73SNh+lvCpnJEX5XK1qt5Ug3c+S deeman@DeemanPC diff --git a/infra/FUTURE_ENHANCEMENTS.md b/infra/FUTURE_ENHANCEMENTS.md new file mode 100644 index 0000000..6c91687 --- /dev/null +++ b/infra/FUTURE_ENHANCEMENTS.md @@ -0,0 +1,295 @@ +# Future Enhancement Ideas + +This document captures ideas for future improvements to the Materia infrastructure and pipeline orchestration. 
+
+## Smart SQLMesh-Aware Scheduling
+
+### Current State
+- Extract pipeline runs on a fixed schedule (daily at 2 AM UTC)
+- Transform pipeline runs on a fixed schedule (daily at 3 AM UTC)
+- No awareness of whether SQLMesh models actually need refreshing
+
+### Enhancement Idea
+Add `materia pipeline check <pipeline>` command that uses SQLMesh's internal scheduler to determine if work is needed:
+
+```bash
+# Check if any models need refresh
+materia pipeline check transform
+
+# Returns:
+# - Exit code 0 if work needed
+# - Exit code 1 if nothing to do
+# - Prints list of models that would be updated
+```
+
+**Implementation:**
+```python
+# src/materia/pipelines.py
+def check_pipeline(pipeline_name: str) -> PipelineCheckResult:
+    """Check if pipeline has work to do without creating a worker."""
+    if pipeline_name == "transform":
+        # Run sqlmesh plan --dry-run locally
+        # Parse output to see what models need refresh
+        # Return list of models + their tags
+        pass
+```
+
+**Supervisor Integration:**
+```bash
+# In supervisor.sh
+if materia pipeline check transform; then
+    echo "Transform has work to do, scheduling..."
+    materia pipeline run transform
+else
+    echo "Transform up to date, skipping"
+fi
+```
+
+---
+
+## Model Tags for Worker Sizing
+
+### Current State
+- All transform pipelines run on the same worker type (ccx22)
+- No way to specify that certain models need more resources
+
+### Enhancement Idea
+Use SQLMesh model tags to hint at resource requirements:
+
+```sql
+-- models/serving/obt_commodity_metrics.sql
+MODEL (
+    name serving.obt_commodity_metrics,
+    kind INCREMENTAL_BY_TIME_RANGE,
+    cron '@daily',
+    tags ['heavy', 'long_running']  -- Supervisor uses this!
+) +``` + +**Pipeline Check Enhancement:** +```python +@dataclass +class PipelineCheckResult: + has_work: bool + models: list[str] + tags: set[str] # Aggregated tags from all models needing refresh + +# Check if any heavy models need refresh +result = check_pipeline("transform") +if "heavy" in result.tags: + worker_type = "ccx32" # Use bigger worker +else: + worker_type = "ccx22" # Default worker +``` + +**Supervisor Integration:** +```bash +# In supervisor.sh +RESULT=$(materia pipeline check transform --format json) + +if [ "$(echo $RESULT | jq -r '.has_work')" = "true" ]; then + TAGS=$(echo $RESULT | jq -r '.tags | join(",")') + + if echo "$TAGS" | grep -q "heavy"; then + WORKER_TYPE="ccx32" + else + WORKER_TYPE="ccx22" + fi + + materia pipeline run transform --worker $WORKER_TYPE +fi +``` + +**Tag Conventions:** +- `heavy`: Requires larger worker (ccx32+) +- `long_running`: Expected to take >1 hour +- `gpu`: Requires GPU instance (future) +- `distributed`: Could benefit from multi-node execution (future) + +--- + +## Multi-Pipeline Support + +### Current State +- Only extract and transform pipelines +- Extract doesn't use SQLMesh scheduling + +### Enhancement Idea +Make extract pipeline also SQLMesh-aware, add more pipeline types: + +**New Pipelines:** +- `dbt`: For dbt models (if we add dbt support) +- `ml`: For machine learning model training +- `export`: For exporting data to external systems + +**Extract with Scheduling:** +Instead of fixed schedule, use SQLMesh to track last successful extract: + +```sql +-- models/raw/psd_alldata.sql +MODEL ( + name raw.psd_alldata, + kind INCREMENTAL_BY_TIME_RANGE, + cron '@daily', -- SQLMesh tracks this! + tags ['extract'] +) +``` + +Then supervisor checks if extract needs running based on SQLMesh state. 
+ +--- + +## Distributed Transform Execution + +### Current State +- All SQLMesh models run on a single worker +- Large transforms can be slow + +### Enhancement Idea +For models tagged `distributed`, split execution across multiple workers: + +```python +# Detect heavy models that would benefit from parallelization +result = check_pipeline("transform") + +if "distributed" in result.tags: + # Split models into batches based on dependency graph + batches = split_execution_graph(result.models) + + # Create one worker per batch + workers = [create_worker(f"transform-{i}", "ccx22") for i in range(len(batches))] + + # Execute in parallel + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(run_pipeline_on_worker, worker, batch) + for worker, batch in zip(workers, batches) + ] +``` + +--- + +## Cost Optimization with Multi-Cloud + +### Current State +- Only supports Hetzner Cloud +- Provider abstraction exists but not utilized + +### Enhancement Idea +Check spot prices across multiple providers and use cheapest: + +```python +# src/materia/providers/pricing.py +def get_cheapest_provider(worker_type: str) -> str: + prices = { + "hetzner": get_hetzner_spot_price(worker_type), + "ovh": get_ovh_spot_price(worker_type), + "scaleway": get_scaleway_spot_price(worker_type), + } + return min(prices, key=prices.get) + +# In supervisor.sh or materia CLI +PROVIDER=$(materia provider cheapest --type ccx22) +materia pipeline run transform --provider $PROVIDER +``` + +--- + +## Observability Enhancements + +### Current State +- Logs go to systemd journal +- No metrics or alerting + +### Enhancement Ideas + +**1. Prometheus Metrics:** +```bash +# Expose metrics endpoint on supervisor +curl http://supervisor:9090/metrics + +# Example metrics: +materia_pipeline_runs_total{pipeline="extract",status="success"} 42 +materia_pipeline_duration_seconds{pipeline="transform"} 1234 +materia_worker_cost_euros{pipeline="extract"} 0.05 +``` + +**2. 
Alerting:** +```yaml +# alerts/pipelines.yml +- alert: PipelineFailure + expr: rate(materia_pipeline_runs_total{status="failure"}[1h]) > 0 + for: 5m + annotations: + summary: Pipeline {{ $labels.pipeline }} is failing +``` + +**3. Web Dashboard:** +Simple web UI showing: +- Current supervisor status +- Recent pipeline runs +- Active workers +- Cost tracking + +--- + +## Self-Healing Capabilities + +### Enhancement Idea +Supervisor automatically handles common failure modes: + +```bash +# In supervisor.sh +run_with_retry() { + local pipeline=$1 + local max_retries=3 + local retry_count=0 + + while [ $retry_count -lt $max_retries ]; do + if materia pipeline run $pipeline; then + return 0 + fi + + retry_count=$((retry_count + 1)) + log "Pipeline $pipeline failed, retry $retry_count/$max_retries" + + # Exponential backoff + sleep $((2 ** retry_count * 60)) + done + + log_error "Pipeline $pipeline failed after $max_retries retries" + # Send alert + return 1 +} +``` + +--- + +## Priority Queuing + +### Current State +- Pipelines run sequentially based on schedule + +### Enhancement Idea +Add priority system for pipeline execution: + +```python +@dataclass +class PipelineConfig: + priority: int = 0 # Higher = more important + +PIPELINES = { + "extract": PipelineConfig(priority=10, ...), # Highest + "transform": PipelineConfig(priority=5, ...), + "export": PipelineConfig(priority=1, ...), # Lowest +} +``` + +Supervisor maintains a priority queue and executes highest-priority work first. 
+ +--- + +## Notes +- These are ideas for future consideration, not immediate requirements +- Implement incrementally as needs arise +- Keep simplicity as a guiding principle diff --git a/infra/Pulumi.prod.yaml b/infra/Pulumi.prod.yaml new file mode 100644 index 0000000..322a354 --- /dev/null +++ b/infra/Pulumi.prod.yaml @@ -0,0 +1,7 @@ +# Production stack configuration +# All secrets come from Pulumi ESC environment: beanflows/prod +environment: + - beanflows/prod + +config: + materia-infrastructure:hetzner_location: "nbg1" # Nuremberg, Germany diff --git a/infra/__main__.py b/infra/__main__.py index 85e6054..f4672df 100644 --- a/infra/__main__.py +++ b/infra/__main__.py @@ -1,108 +1,76 @@ """ BeanFlows.coffee Infrastructure -Cloudflare R2 + Iceberg + Hetzner compute stack +Hetzner compute stack for ephemeral worker orchestration + +Note: R2 buckets are managed manually in Cloudflare dashboard +- beanflows-artifacts: Stores CLI and pipeline artifacts +- beanflows-data-prod: Iceberg data lakehouse """ import pulumi -import pulumi_cloudflare as cloudflare import pulumi_hcloud as hcloud # Load configuration config = pulumi.Config() -cloudflare_account_id = config.require("cloudflare_account_id") hetzner_location = config.get("hetzner_location") or "nbg1" # Nuremberg datacenter # ============================================================ -# Cloudflare R2 Storage + Data Catalog (Iceberg) +# R2 Bucket Names (managed manually in Cloudflare R2 UI) # ============================================================ +# R2 buckets cannot be managed via Pulumi as they require R2-specific tokens +# that don't work with the Cloudflare Pulumi provider. +# These are defined here for documentation purposes only. 
-# R2 bucket for raw data (extraction outputs) -raw_bucket = cloudflare.R2Bucket( - "materia-raw", - account_id=cloudflare_account_id, - name="materia-raw", - location="weur", # Western Europe -) - -# R2 bucket for lakehouse (Iceberg tables) -lakehouse_bucket = cloudflare.R2Bucket( - "materia-lakehouse", - account_id=cloudflare_account_id, - name="materia-lakehouse", - location="weur", -) - -# TODO: Enable R2 Data Catalog (Iceberg) on lakehouse bucket -# Note: As of Oct 2025, R2 Data Catalog is in public beta -# May need to enable via Cloudflare dashboard or API once SDK supports it -# For now, document manual step in README - -# API token for R2 access (needs R2 + Data Catalog permissions) -# Note: Create this manually in Cloudflare dashboard and store in Pulumi config -# pulumi config set --secret cloudflare_r2_token +ARTIFACTS_BUCKET = "beanflows-artifacts" # CLI + extract/transform packages +LAKEHOUSE_BUCKET = "beanflows-data-prod" # Iceberg tables (EEUR region) # ============================================================ # Hetzner Cloud Infrastructure # ============================================================ -# SSH key for server access +# SSH key for server access (imported from existing key) ssh_key = hcloud.SshKey( "materia-ssh-key", - name="materia-deployment-key", + name="deeman@DeemanPC", public_key=config.require_secret("ssh_public_key"), + opts=pulumi.ResourceOptions(protect=True), ) -# Small CCX instance for scheduler/orchestrator -# This runs the cron scheduler + lightweight tasks -scheduler_server = hcloud.Server( - "materia-scheduler", - name="materia-scheduler", - server_type="ccx12", # 2 vCPU, 8GB RAM, ~€6/mo +# Small CPX instance for supervisor (runs materia CLI to orchestrate pipelines) +# This is an always-on instance that creates/destroys ephemeral workers on-demand +supervisor_server = hcloud.Server( + "materia-supervisor", + name="materia-supervisor", + server_type="cpx11", # 2 vCPU (shared), 2GB RAM, ~€4.49/mo (cheapest option) 
image="ubuntu-24.04", location=hetzner_location, ssh_keys=[ssh_key.id], labels={ - "role": "scheduler", + "role": "supervisor", "project": "materia", }, user_data="""#!/bin/bash +set -e + # Basic server setup apt-get update -apt-get install -y python3.13 python3-pip git curl +apt-get install -y python3.13 python3-pip curl unzip -# Install uv -curl -LsSf https://astral.sh/uv/install.sh | sh +# Install Pulumi ESC CLI +curl -fsSL https://get.pulumi.com/esc/install.sh | sh +export PATH="$HOME/.pulumi/bin:$PATH" +echo 'export PATH="$HOME/.pulumi/bin:$PATH"' >> /root/.bashrc + +# Create deployment directory +mkdir -p /opt/materia # Configure environment -echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc +echo 'Setup complete. Materia CLI will be deployed via CI/CD.' > /opt/materia/README.txt """, ) -# Larger CCX instance for heavy SQLMesh workloads -# This gets spun up on-demand for big transformations -worker_server = hcloud.Server( - "materia-worker-01", - name="materia-worker-01", - server_type="ccx22", # 4 vCPU, 16GB RAM, ~€24/mo - image="ubuntu-24.04", - location=hetzner_location, - ssh_keys=[ssh_key.id], - labels={ - "role": "worker", - "project": "materia", - }, - user_data="""#!/bin/bash -# Basic server setup -apt-get update -apt-get install -y python3.13 python3-pip git curl - -# Install uv -curl -LsSf https://astral.sh/uv/install.sh | sh - -# Configure environment -echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc -""", -) +# Note: Workers are created on-demand by the materia CLI +# No always-on worker instances in this architecture # Firewall for servers (restrict to SSH + outbound only) firewall = hcloud.Firewall( @@ -132,36 +100,17 @@ firewall = hcloud.Firewall( ], ) -# Apply firewall to all servers -scheduler_firewall = hcloud.FirewallAttachment( - "scheduler-firewall", +# Apply firewall to supervisor +supervisor_firewall = hcloud.FirewallAttachment( + "supervisor-firewall", firewall_id=firewall.id, - server_ids=[scheduler_server.id], 
-) - -worker_firewall = hcloud.FirewallAttachment( - "worker-firewall", - firewall_id=firewall.id, - server_ids=[worker_server.id], + server_ids=[supervisor_server.id], ) # ============================================================ # Outputs # ============================================================ -pulumi.export("raw_bucket_name", raw_bucket.name) -pulumi.export("lakehouse_bucket_name", lakehouse_bucket.name) -pulumi.export("scheduler_ip", scheduler_server.ipv4_address) -pulumi.export("worker_ip", worker_server.ipv4_address) - -# Export connection info for DuckDB -pulumi.export( - "duckdb_r2_config", - pulumi.Output.all(cloudflare_account_id, lakehouse_bucket.name).apply( - lambda args: { - "account_id": args[0], - "bucket": args[1], - "catalog_uri": f"https://catalog.cloudflarestorage.com/{args[0]}/r2-data-catalog", - } - ), -) +pulumi.export("supervisor_ip", supervisor_server.ipv4_address) +pulumi.export("artifacts_bucket_name", ARTIFACTS_BUCKET) +pulumi.export("lakehouse_bucket_name", LAKEHOUSE_BUCKET) diff --git a/infra/bootstrap_supervisor.sh b/infra/bootstrap_supervisor.sh new file mode 100755 index 0000000..3c71497 --- /dev/null +++ b/infra/bootstrap_supervisor.sh @@ -0,0 +1,111 @@ +#!/bin/bash +# Bootstrap script for Materia supervisor instance +# Run this once on a new supervisor to set it up +# +# Usage: +# From CI/CD or locally: +# ssh root@ 'bash -s' < infra/bootstrap_supervisor.sh +# +# Or on the supervisor itself: +# curl -fsSL | bash + +set -euo pipefail + +echo "=== Materia Supervisor Bootstrap ===" +echo "This script will:" +echo " 1. Install dependencies (git, uv, esc)" +echo " 2. Clone the materia repository" +echo " 3. Setup systemd service" +echo " 4. 
Start the supervisor" +echo "" + +# Check if we're root +if [ "$EUID" -ne 0 ]; then + echo "ERROR: This script must be run as root" + exit 1 +fi + +# Configuration +REPO_DIR="/opt/materia" +GITLAB_PROJECT="deemanone/materia" + +# GITLAB_READ_TOKEN should be set in Pulumi ESC (beanflows/prod) +if [ -z "${GITLAB_READ_TOKEN:-}" ]; then + echo "ERROR: GITLAB_READ_TOKEN environment variable not set" + echo "Please add it to Pulumi ESC (beanflows/prod) first" + exit 1 +fi + +REPO_URL="https://gitlab-ci-token:${GITLAB_READ_TOKEN}@gitlab.com/${GITLAB_PROJECT}.git" + +echo "--- Installing system dependencies ---" +apt-get update +apt-get install -y git curl python3-pip + +echo "--- Installing uv ---" +if ! command -v uv &> /dev/null; then + curl -LsSf https://astral.sh/uv/install.sh | sh + export PATH="$HOME/.cargo/bin:$PATH" + echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc +fi + +echo "--- Installing Pulumi ESC ---" +if ! command -v esc &> /dev/null; then + curl -fsSL https://get.pulumi.com/esc/install.sh | sh + export PATH="$HOME/.pulumi/bin:$PATH" + echo 'export PATH="$HOME/.pulumi/bin:$PATH"' >> /root/.bashrc +fi + +echo "--- Setting up Pulumi ESC authentication ---" +if [ -z "${PULUMI_ACCESS_TOKEN:-}" ]; then + echo "ERROR: PULUMI_ACCESS_TOKEN environment variable not set" + echo "Please set it before running this script:" + echo " export PULUMI_ACCESS_TOKEN=" + exit 1 +fi + +esc login --token "$PULUMI_ACCESS_TOKEN" + +echo "--- Loading secrets from Pulumi ESC ---" +eval $(esc env open beanflows/prod --format shell) + +echo "--- Cloning repository ---" +if [ -d "$REPO_DIR" ]; then + echo "Repository already exists, pulling latest..." 
+ cd "$REPO_DIR" + git pull origin master +else + git clone "$REPO_URL" "$REPO_DIR" + cd "$REPO_DIR" +fi + +echo "--- Installing Python dependencies ---" +uv sync + +echo "--- Creating environment file ---" +cat > "$REPO_DIR/.env" < + +# Production +sqlmesh plan prod + +# Run tests +sqlmesh test + +# Format SQL +sqlmesh format +``` + +## Architecture + +### Gateway Configuration + +**Single Gateway:** All environments connect to Cloudflare R2 Data Catalog (Apache Iceberg) +- **Production:** `sqlmesh plan prod` +- **Development:** `sqlmesh plan dev_` (isolated virtual environment) + +SQLMesh manages environment isolation automatically - no need for separate local databases. + +### 4-Layer Data Model + +See `models/README.md` for detailed architecture documentation: + +1. **Raw** - Immutable source data +2. **Staging** - Schema, types, basic cleansing +3. **Cleaned** - Business logic, integration +4. **Serving** - Analytics-ready (facts, dimensions, aggregates) + +## Configuration + +**Config:** `config.yaml` +- DuckDB in-memory with R2 Iceberg catalog +- Extensions: httpfs, iceberg +- Auto-apply enabled (no prompts) +- Initialization hooks for R2 secret/catalog attachment + +## Commands + +```bash +# Plan changes for dev environment +sqlmesh plan dev_yourname + +# Plan changes for prod +sqlmesh plan prod + +# Run tests +sqlmesh test + +# Validate models +sqlmesh validate + +# Run audits +sqlmesh audit + +# Format SQL files +sqlmesh format + +# Start web UI +sqlmesh ui +``` + +## Environment Variables (Prod) + +Required for production R2 Iceberg catalog: +- `CLOUDFLARE_API_TOKEN` - R2 API token +- `ICEBERG_REST_URI` - R2 catalog REST endpoint +- `R2_WAREHOUSE_NAME` - Warehouse name (default: "materia") + +These are injected via Pulumi ESC (`beanflows/prod`) on the supervisor instance. + +## Development Workflow + +1. Make changes to models in `models/` +2. Test locally: `sqlmesh test` +3. Plan changes: `sqlmesh plan dev_yourname` +4. Review and apply changes +5. 
Commit and push to trigger CI/CD + +SQLMesh will handle environment isolation, table versioning, and incremental updates automatically. diff --git a/transform/sqlmesh_materia/config.yaml b/transform/sqlmesh_materia/config.yaml index 92c6d0f..7f5634d 100644 --- a/transform/sqlmesh_materia/config.yaml +++ b/transform/sqlmesh_materia/config.yaml @@ -1,18 +1,8 @@ # --- Gateway Connection --- +# Single gateway connecting to R2 Iceberg catalog +# Local dev uses virtual environments (e.g., dev_) +# Production uses the 'prod' environment gateways: - - dev: - connection: - # For more information on configuring the connection to your execution engine, visit: - # https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#connection - # https://sqlmesh.readthedocs.io/en/stable/integrations/engines/duckdb/#connection-options - type: duckdb - database: materia_dev.db - extensions: - - name: zipfs - - name: httpfs - - name: iceberg - prod: connection: type: duckdb @@ -20,20 +10,26 @@ gateways: extensions: - name: httpfs - name: iceberg - init_script: | - CREATE SECRET IF NOT EXISTS r2_secret ( - TYPE ICEBERG, - TOKEN '{{ env_var("CLOUDFLARE_API_TOKEN") }}' - ); - ATTACH '{{ env_var("R2_WAREHOUSE_NAME", "materia") }}' AS catalog ( - TYPE ICEBERG, - ENDPOINT '{{ env_var("ICEBERG_REST_URI") }}' - ); - CREATE SCHEMA IF NOT EXISTS catalog.materia; - USE catalog.materia; - -default_gateway: dev +default_gateway: prod + +# --- Hooks --- +# Run initialization SQL before all plans/runs +# https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#execution-hooks + +before_all: + - | + CREATE SECRET IF NOT EXISTS r2_secret ( + TYPE ICEBERG, + TOKEN '@env_var("CLOUDFLARE_API_TOKEN")' + ) + - | + ATTACH '@env_var("R2_WAREHOUSE_NAME", "materia")' AS catalog ( + TYPE ICEBERG, + ENDPOINT '@env_var("ICEBERG_REST_URI")' + ) + - CREATE SCHEMA IF NOT EXISTS catalog.materia + - USE catalog.materia # --- Model Defaults --- # 
https://sqlmesh.readthedocs.io/en/stable/reference/model_configuration/#model-defaults