Refactor to local-first architecture on Hetzner NVMe

Remove distributed R2/Iceberg/SSH pipeline architecture in favor of
local subprocess execution with NVMe storage. Landing data backed up
to R2 via rclone timer.

- Strip Iceberg catalog, httpfs, boto3, paramiko, prefect, pyarrow
- Pipelines run via subprocess.run() with bounded timeouts
- Extract writes to {LANDING_DIR}/psd/{year}/{month}/{etag}.csv.gzip
- SQLMesh reads LANDING_DIR variable, writes to DUCKDB_PATH
- Delete unused provider stubs (ovh, scaleway, oracle)
- Add rclone systemd timer for R2 backup every 6h
- Update supervisor to run pipelines with env vars

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Deeman
2026-02-18 18:05:41 +01:00
parent 910424c956
commit c1d00dcdc4
25 changed files with 231 additions and 1807 deletions

View File

@@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Project Overview
Materia is a commodity data analytics platform (product: **BeanFlows.coffee**) for coffee traders. It's a uv workspace monorepo with three packages: extraction (USDA PSD data), SQL transformation (SQLMesh + DuckDB), and a CLI for orchestrating cloud workers and pipelines.
Materia is a commodity data analytics platform (product: **BeanFlows.coffee**) for coffee traders. It's a uv workspace monorepo with three packages: extraction (USDA PSD data), SQL transformation (SQLMesh + DuckDB), and a CLI for worker management and local pipeline execution.
## Commands
@@ -25,7 +25,7 @@ cd transform/sqlmesh_materia && uv run sqlmesh test # SQLMesh model tests
uv run pytest tests/test_cli.py::test_name -v
# Extract data
uv run extract_psd
LANDING_DIR=data/landing uv run extract_psd
# SQLMesh (from repo root)
uv run sqlmesh -p transform/sqlmesh_materia plan # Plans to dev_<username> by default
@@ -33,43 +33,45 @@ uv run sqlmesh -p transform/sqlmesh_materia plan prod # Production
uv run sqlmesh -p transform/sqlmesh_materia test # Run model tests
uv run sqlmesh -p transform/sqlmesh_materia format # Format SQL
# With production secrets
esc run beanflows/prod -- <command>
# CLI
uv run materia pipeline run extract|transform
uv run materia pipeline list
uv run materia worker create|destroy|list
uv run materia pipeline run
uv run materia secrets get
```
## Architecture
**Workspace packages** (`pyproject.toml` → `tool.uv.workspace`):
- `extract/psdonline/` — Downloads USDA PSD Online data, normalizes ZIP→gzip CSV, uploads to R2
- `transform/sqlmesh_materia/` — 4-layer SQL transformation pipeline (DuckDB + Iceberg)
- `src/materia/` — CLI (Typer) for worker management, pipeline orchestration, secrets
- `extract/psdonline/` — Downloads USDA PSD Online data, normalizes ZIP→gzip CSV, writes to local landing directory
- `transform/sqlmesh_materia/` — 4-layer SQL transformation pipeline (local DuckDB)
- `src/materia/` — CLI (Typer) for pipeline execution, worker management, secrets
- `web/` — Future web frontend
**Data flow:**
```
USDA API → extract (psdonline) → R2/local CSV → SQLMesh transforms → DuckDB/Iceberg
USDA API → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip
→ rclone cron syncs landing/ to R2
→ SQLMesh raw → staging → cleaned → serving → /data/materia/lakehouse.duckdb
→ Web app reads lakehouse.duckdb (read-only)
```
**SQLMesh 4-layer model structure** (`transform/sqlmesh_materia/models/`):
1. `raw/` — Immutable source reads (read_csv from extracted files)
1. `raw/` — Immutable source reads (read_csv from landing directory)
2. `staging/` — Type casting, lookup joins, basic cleansing
3. `cleaned/` — Business logic, pivoting, integration
4. `serving/` — Analytics-ready facts, dimensions, aggregates
**CLI modules** (`src/materia/`):
- `cli.py` — Typer app with subcommands: worker, pipeline, secrets, version
- `workers.py` — Ephemeral cloud instance management (Hetzner, with planned OVH/Scaleway/Oracle)
- `pipelines.py` — SSH-based pipeline execution on workers (download artifact, run, destroy)
- `workers.py` — Hetzner cloud instance management (for ad-hoc compute)
- `pipelines.py` — Local subprocess pipeline execution with bounded timeouts
- `secrets.py` — Pulumi ESC integration for environment secrets
**Infrastructure** (`infra/`):
- Pulumi IaC for Cloudflare R2 buckets and Hetzner compute
- Supervisor systemd service for always-on orchestration (pulls git every 15 min)
- Supervisor systemd service for always-on orchestration (pulls git, runs pipelines)
- rclone systemd timer for landing data backup to R2
## Coding Philosophy
@@ -87,7 +89,14 @@ Read `coding_philosophy.md` for the full guide. Key points:
- **Python 3.13** (`.python-version`)
- **Ruff**: double quotes, spaces, E501 ignored (formatter handles line length)
- **SQLMesh**: DuckDB dialect, `@daily` cron, start date `2025-07-07`, default env `dev_{{ user() }}`
- **Storage**: Cloudflare R2 with Iceberg catalog (zero egress cost)
- **Storage**: Local NVMe (`LANDING_DIR`, `DUCKDB_PATH`), R2 for backup via rclone
- **Secrets**: Pulumi ESC (`esc run beanflows/prod -- <cmd>`)
- **CI**: GitLab CI (`.gitlab/.gitlab-ci.yml`) — runs pytest and sqlmesh test on push/MR
- **Pre-commit hooks**: installed via `pre-commit install`
## Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `LANDING_DIR` | `data/landing` | Root directory for extracted landing data |
| `DUCKDB_PATH` | `local.duckdb` | Path to the DuckDB lakehouse database |
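How the two variables resolve with their documented defaults can be sketched as follows (the helper is illustrative, not a function in the repo):

```python
import os
import pathlib


def resolve_paths(env=None) -> tuple[pathlib.Path, pathlib.Path]:
    """Resolve LANDING_DIR and DUCKDB_PATH using the documented defaults."""
    env = os.environ if env is None else env
    landing_dir = pathlib.Path(env.get("LANDING_DIR", "data/landing"))
    duckdb_path = pathlib.Path(env.get("DUCKDB_PATH", "local.duckdb"))
    return landing_dir, duckdb_path


landing, db = resolve_paths(env={})
print(landing, db)  # data/landing local.duckdb
```

On the server, the supervisor unit overrides both to the `/data/materia/` paths.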

View File

@@ -2,16 +2,13 @@
name = "psdonline"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
authors = [
{ name = "Deeman", email = "hendriknote@gmail.com" }
]
requires-python = ">=3.13"
dependencies = [
"boto3>=1.40.55",
"niquests>=3.14.1",
"pendulum>=3.1.0",
]
[project.scripts]
extract_psd = "psdonline.execute:extract_psd_dataset"

View File

@@ -5,63 +5,32 @@ import pathlib
import sys
from datetime import datetime
import boto3
import niquests
from botocore.exceptions import ClientError
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.StreamHandler(sys.stdout)
]
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("PSDOnline Extractor")
OUTPUT_DIR = pathlib.Path(__file__).parent / "data"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
logger.info(f"Output dir: {OUTPUT_DIR}")
# R2 configuration from environment
R2_ENDPOINT = os.getenv('R2_ENDPOINT')
R2_BUCKET = os.getenv('R2_BUCKET')
R2_ACCESS_KEY = os.getenv('R2_ACCESS_KEY') or os.getenv('R2_ADMIN_ACCESS_KEY_ID')
R2_SECRET_KEY = os.getenv('R2_SECRET_KEY') or os.getenv('R2_ADMIN_SECRET_ACCESS_KEY')
LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing"))
LANDING_DIR.mkdir(parents=True, exist_ok=True)
logger.info(f"Landing dir: {LANDING_DIR}")
PSD_HISTORICAL_URL = "https://apps.fas.usda.gov/psdonline/downloads/archives/{year}/{month:02d}/psd_alldata_csv.zip"
FIRST_YEAR = 2006
FIRST_MONTH = 8
def check_r2_file_exists(etag: str, s3_client) -> bool:
"""Check if file exists in R2."""
r2_key = f"landing/psd/{etag}.csv.gzip"
try:
s3_client.head_object(Bucket=R2_BUCKET, Key=r2_key)
logger.info(f"File {r2_key} already exists in R2, skipping")
return True
except ClientError as e:
if e.response['Error']['Code'] == '404':
return False
raise
HTTP_TIMEOUT_SECONDS = 60
def upload_to_r2(content: bytes, etag: str, s3_client):
"""Upload file content to R2."""
r2_key = f"landing/psd/{etag}.csv.gzip"
logger.info(f"Uploading to R2: {r2_key}")
s3_client.put_object(Bucket=R2_BUCKET, Key=r2_key, Body=content)
logger.info("Upload complete")
def extract_psd_file(url: str, extract_to_path: pathlib.Path, http_session: niquests.Session, s3_client=None):
"""
Extract PSD file either to local storage or R2.
If s3_client is provided, uploads to R2 only (no local storage).
If s3_client is None, downloads to local storage.
"""
def extract_psd_file(url: str, year: int, month: int, http_session: niquests.Session):
"""Extract PSD file to local year/month subdirectory."""
logger.info(f"Requesting file {url} ...")
response = http_session.head(url)
response = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS)
if response.status_code == 404:
logger.error("File doesn't exist on server, received status code 404 Not Found")
return
@@ -69,55 +38,31 @@ def extract_psd_file(url: str, extract_to_path: pathlib.Path, http_session: niqu
logger.error(f"Status code not ok, STATUS={response.status_code}")
return
etag = response.headers.get("etag").replace('"',"").replace(":","_")
etag = response.headers.get("etag", "").replace('"', "").replace(":", "_")
assert etag, "USDA response missing etag header"
# R2 mode: check R2 and upload if needed
if s3_client:
if check_r2_file_exists(etag, s3_client):
return
response = http_session.get(url)
normalized_content = normalize_zipped_csv(response.content)
upload_to_r2(normalized_content, etag, s3_client)
return
# Local mode: check local and download if needed
extract_to_path = LANDING_DIR / "psd" / str(year) / f"{month:02d}"
local_file = extract_to_path / f"{etag}.csv.gzip"
if local_file.exists():
logger.info(f"File {etag}.zip already exists locally, skipping")
logger.info(f"File {etag}.csv.gzip already exists locally, skipping")
return
response = http_session.get(url)
response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS)
logger.info(f"Storing file to {local_file}")
extract_to_path.mkdir(parents=True, exist_ok=True)
normalized_content = normalize_zipped_csv(response.content)
local_file.write_bytes(normalized_content)
assert local_file.exists(), f"File was not written: {local_file}"
logger.info("Download complete")
def extract_psd_dataset():
today = datetime.now()
# Check if R2 credentials are configured
use_r2 = all([R2_ENDPOINT, R2_BUCKET, R2_ACCESS_KEY, R2_SECRET_KEY])
if use_r2:
logger.info("R2 credentials found, uploading to R2")
s3_client = boto3.client(
's3',
endpoint_url=R2_ENDPOINT,
aws_access_key_id=R2_ACCESS_KEY,
aws_secret_access_key=R2_SECRET_KEY
)
else:
logger.info("R2 credentials not found, downloading to local storage")
s3_client = None
# Try current month and previous 3 months (USDA data is published with lag)
with niquests.Session() as session:
for months_back in range(4):
year = today.year
month = today.month - months_back
# Handle year rollover
while month < 1:
month += 12
year -= 1
@@ -125,11 +70,10 @@ def extract_psd_dataset():
url = PSD_HISTORICAL_URL.format(year=year, month=month)
logger.info(f"Trying {year}-{month:02d}...")
# Check if URL exists
response = session.head(url)
response = session.head(url, timeout=HTTP_TIMEOUT_SECONDS)
if response.status_code == 200:
logger.info(f"Found latest data at {year}-{month:02d}")
extract_psd_file(url=url, http_session=session, extract_to_path=OUTPUT_DIR, s3_client=s3_client)
extract_psd_file(url=url, year=year, month=month, http_session=session)
return
elif response.status_code == 404:
logger.info(f"Month {year}-{month:02d} not found, trying earlier...")
@@ -141,5 +85,3 @@ def extract_psd_dataset():
if __name__ == "__main__":
extract_psd_dataset()
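The month-rollover arithmetic in `extract_psd_dataset` can be isolated as a small pure function (hypothetical name, for illustration) to show which `(year, month)` candidates the loop visits:

```python
from datetime import datetime


def candidate_month(year: int, month: int, months_back: int) -> tuple[int, int]:
    """Step months_back months into the past, rolling the year over as needed."""
    month -= months_back
    while month < 1:
        month += 12
        year -= 1
    return year, month


today = datetime(2026, 2, 18)
print([candidate_month(today.year, today.month, b) for b in range(4)])
# [(2026, 2), (2026, 1), (2025, 12), (2025, 11)]
```

Trying the current month plus the previous three covers USDA's publication lag without hardcoding dates.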

View File

@@ -0,0 +1,9 @@
[Unit]
Description=Materia Landing Data Backup to R2
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
ExecStart=/usr/bin/rclone sync /data/materia/landing/ r2:materia-raw/landing/ --log-level INFO
TimeoutStartSec=1800

View File

@@ -0,0 +1,10 @@
[Unit]
Description=Materia Landing Data Backup Timer
[Timer]
OnCalendar=*-*-* 00/6:00:00
RandomizedDelaySec=300
Persistent=true
[Install]
WantedBy=timers.target

View File

@@ -0,0 +1,14 @@
# Cloudflare R2 remote for landing data backup
# Copy to /root/.config/rclone/rclone.conf and fill in credentials
#
# Get credentials from: Cloudflare Dashboard → R2 → Manage R2 API Tokens
# Or from Pulumi ESC: esc env open beanflows/prod --format shell
[r2]
type = s3
provider = Cloudflare
access_key_id = <R2_ACCESS_KEY_ID>
secret_access_key = <R2_SECRET_ACCESS_KEY>
endpoint = https://<CLOUDFLARE_ACCOUNT_ID>.r2.cloudflarestorage.com
acl = private
no_check_bucket = true

View File

@@ -79,6 +79,9 @@ else
cd "$REPO_DIR"
fi
echo "--- Creating data directories ---"
mkdir -p /data/materia/landing/psd
echo "--- Installing Python dependencies ---"
uv sync
@@ -88,6 +91,8 @@ cat > "$REPO_DIR/.env" <<EOF
# Loaded from Pulumi ESC: beanflows/prod
PULUMI_ACCESS_TOKEN=${PULUMI_ACCESS_TOKEN}
PATH=/root/.cargo/bin:/root/.pulumi/bin:/usr/local/bin:/usr/bin:/bin
LANDING_DIR=/data/materia/landing
DUCKDB_PATH=/data/materia/lakehouse.duckdb
EOF
echo "--- Setting up systemd service ---"

View File

@@ -1,80 +0,0 @@
services:
postgres:
image: postgres:14
environment:
POSTGRES_USER: prefect
POSTGRES_PASSWORD: prefect
POSTGRES_DB: prefect
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U prefect"]
interval: 5s
timeout: 5s
retries: 5
dragonfly:
image: 'docker.dragonflydb.io/dragonflydb/dragonfly'
ulimits:
memlock: -1
volumes:
- dragonflydata:/data
healthcheck:
test: ["CMD-SHELL", "redis-cli ping"]
interval: 5s
timeout: 5s
retries: 5
prefect-server:
image: prefecthq/prefect:3-latest
depends_on:
postgres:
condition: service_healthy
dragonfly:
condition: service_healthy
environment:
PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect
PREFECT_SERVER_API_HOST: 0.0.0.0
PREFECT_UI_API_URL: http://localhost:4200/api
PREFECT_MESSAGING_BROKER: prefect_redis.messaging
PREFECT_MESSAGING_CACHE: prefect_redis.messaging
PREFECT_REDIS_MESSAGING_HOST: dragonfly
PREFECT_REDIS_MESSAGING_PORT: 6379
PREFECT_REDIS_MESSAGING_DB: 0
command: prefect server start --no-services
ports:
- "4200:4200"
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request as u; u.urlopen('http://localhost:4200/api/health', timeout=1)"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
prefect-services:
image: prefecthq/prefect:3-latest
depends_on:
prefect-server:
condition: service_healthy
environment:
PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect
PREFECT_MESSAGING_BROKER: prefect_redis.messaging
PREFECT_MESSAGING_CACHE: prefect_redis.messaging
PREFECT_REDIS_MESSAGING_HOST: dragonfly
PREFECT_REDIS_MESSAGING_PORT: 6379
PREFECT_REDIS_MESSAGING_DB: 0
command: prefect server services start
prefect-worker:
image: prefecthq/prefect:3-latest
depends_on:
prefect-server:
condition: service_healthy
environment:
PREFECT_API_URL: http://prefect-server:4200/api
command: prefect worker start --pool local-pool
restart: on-failure
volumes:
postgres_data:
dragonflydata:

View File

@@ -1,161 +1,85 @@
# Materia Infrastructure
Pulumi-managed infrastructure for BeanFlows.coffee
Single-server local-first setup for BeanFlows.coffee on Hetzner NVMe.
## Stack Overview
## Architecture
- **Storage:** Cloudflare R2 buckets with Iceberg Data Catalog
- **Compute:** Hetzner Cloud CCX dedicated vCPU instances
- **Orchestration:** Custom Python scheduler (see `src/orchestrator/`)
```
Hetzner Server (NVMe)
├── /opt/materia/ # Git repo, code, uv environment
├── /data/materia/landing/ # Extracted USDA data (year/month subdirs)
├── /data/materia/lakehouse.duckdb # SQLMesh output database
└── systemd services:
├── materia-supervisor # Pulls git, runs extract + transform daily
└── materia-backup.timer # Syncs landing/ to R2 every 6 hours
```
## Prerequisites
## Data Flow
1. **Cloudflare Account**
- Sign up at https://dash.cloudflare.com
- Create API token with R2 + Data Catalog permissions
- Get your Account ID from dashboard
1. **Extract**: USDA API → `/data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip`
2. **Transform**: SQLMesh reads landing CSVs → writes to `/data/materia/lakehouse.duckdb`
3. **Backup**: rclone syncs `/data/materia/landing/` → R2 `materia-raw/landing/`
4. **Web**: Reads `lakehouse.duckdb` (read-only)
2. **Hetzner Cloud Account**
- Sign up at https://console.hetzner.cloud
- Create API token with Read & Write permissions
## Setup
3. **Pulumi Account** (optional, can use local state)
- Sign up at https://app.pulumi.com
- Or use local state with `pulumi login --local`
### Prerequisites
4. **SSH Key**
- Generate if needed: `ssh-keygen -t ed25519 -C "materia-deploy"`
- Hetzner server with NVMe storage
- Pulumi ESC configured (`beanflows/prod` environment)
- `GITLAB_READ_TOKEN` and `PULUMI_ACCESS_TOKEN` set
## Initial Setup
### Bootstrap
```bash
# From local machine or CI:
ssh root@<server_ip> 'bash -s' < infra/bootstrap_supervisor.sh
```
This installs dependencies, clones the repo, creates data directories, and starts the supervisor service.
### R2 Backup
1. Install rclone: `apt install rclone`
2. Copy and configure: `cp infra/backup/rclone.conf.example /root/.config/rclone/rclone.conf`
3. Fill in R2 credentials from Pulumi ESC
4. Install systemd units:
```bash
cp infra/backup/materia-backup.service /etc/systemd/system/
cp infra/backup/materia-backup.timer /etc/systemd/system/
systemctl daemon-reload
systemctl enable --now materia-backup.timer
```
## Pulumi IaC
Still manages Cloudflare R2 buckets and can provision Hetzner instances:
```bash
cd infra
# Login to Pulumi (local or cloud)
pulumi login # or: pulumi login --local
# Initialize the stack
pulumi stack init dev
# Configure secrets
pulumi config set --secret cloudflare:apiToken <your-cloudflare-token>
pulumi config set cloudflare_account_id <your-account-id>
pulumi config set --secret hcloud:token <your-hetzner-token>
pulumi config set --secret ssh_public_key "$(cat ~/.ssh/id_ed25519.pub)"
# Preview changes
pulumi preview
# Deploy infrastructure
pulumi login
pulumi stack select prod
pulumi up
```
## What Gets Provisioned
### Cloudflare R2 Buckets
1. **materia-raw** - Raw data from extraction (immutable archives)
2. **materia-lakehouse** - Iceberg tables for SQLMesh (ACID transactions)
### Hetzner Cloud Servers
1. **materia-scheduler** (CCX12: 2 vCPU, 8GB RAM)
- Runs cron scheduler
- Lightweight orchestration tasks
- Always-on, low cost (~€6/mo)
2. **materia-worker-01** (CCX22: 4 vCPU, 16GB RAM)
- Heavy SQLMesh transformations
- Can be stopped when not in use
- Scale up to CCX32/CCX42 for larger workloads (~€24-90/mo)
3. **materia-firewall**
- SSH access (port 22)
- All outbound traffic allowed
- No inbound HTTP/HTTPS (we're not running web services yet)
## Enabling R2 Data Catalog (Iceberg)
As of October 2025, R2 Data Catalog is in public beta. Enable it manually:
1. Go to Cloudflare Dashboard → R2
2. Select the `materia-lakehouse` bucket
3. Navigate to Settings → Data Catalog
4. Click "Enable Data Catalog"
Once enabled, you can connect DuckDB to the Iceberg REST catalog:
```python
import duckdb
# Get catalog URI from Pulumi outputs
# pulumi stack output duckdb_r2_config
conn = duckdb.connect()
conn.execute("INSTALL iceberg; LOAD iceberg;")
conn.execute(f"""
ATTACH 'iceberg_rest://catalog.cloudflarestorage.com/<account_id>/r2-data-catalog'
AS lakehouse (
TYPE ICEBERG_REST,
SECRET '<r2_api_token>'
);
""")
```
## Server Access
Get server IPs from Pulumi outputs:
## Monitoring
```bash
pulumi stack output scheduler_ip
pulumi stack output worker_ip
# Supervisor status and logs
systemctl status materia-supervisor
journalctl -u materia-supervisor -f
# Backup timer status
systemctl list-timers materia-backup.timer
journalctl -u materia-backup -f
```
SSH into servers:
```bash
ssh root@<scheduler_ip>
ssh root@<worker_ip>
```
## Cost Estimates (Monthly)
## Cost
| Resource | Type | Cost |
|----------|------|------|
| R2 Storage | 10 GB | $0.15 |
| R2 Operations | 1M reads | $0.36 |
| R2 Egress | Unlimited | $0.00 (zero egress!) |
| Scheduler | CCX12 | €6.00 |
| Worker (on-demand) | CCX22 | €24.00 |
| **Total** | | **~€30/mo (~$33)** |
Compare to AWS equivalent: ~$300-500/mo with S3 + EC2 + egress fees.
## Scaling Workers
To add more worker capacity or different instance sizes:
1. Edit `infra/__main__.py` to add new server resources
2. Update worker config in `src/orchestrator/workers.yaml`
3. Run `pulumi up` to provision
Example worker sizes:
- CCX12: 2 vCPU, 8GB RAM (light workloads)
- CCX22: 4 vCPU, 16GB RAM (medium workloads)
- CCX32: 8 vCPU, 32GB RAM (heavy workloads)
- CCX42: 16 vCPU, 64GB RAM (very heavy workloads)
## Destroying Infrastructure
```bash
cd infra
pulumi destroy
```
**Warning:** This will delete all buckets and servers. Backup data first!
## Next Steps
1. Deploy orchestrator to scheduler server (see `src/orchestrator/README.md`)
2. Configure SQLMesh to use R2 lakehouse (see `transform/sqlmesh_materia/config.yaml`)
3. Set up CI/CD pipeline to deploy on push (see `.gitlab-ci.yml`)
| Hetzner Server | CCX22 (4 vCPU, 16GB) | ~€24/mo |
| R2 Storage | Backup (~10 GB) | $0.15/mo |
| R2 Egress | Zero | $0.00 |
| **Total** | | **~€24/mo (~$26)** |

View File

@@ -11,6 +11,8 @@ ExecStart=/opt/materia/infra/supervisor/supervisor.sh
Restart=always
RestartSec=10
EnvironmentFile=/opt/materia/.env
Environment=LANDING_DIR=/data/materia/landing
Environment=DUCKDB_PATH=/data/materia/lakehouse.duckdb
# Resource limits
LimitNOFILE=65536

View File

@@ -24,9 +24,14 @@ do
git switch --discard-changes --detach origin/master
uv sync
# Run pipelines (SQLMesh handles scheduling)
#uv run materia pipeline run extract
#uv run materia pipeline run transform
# Run pipelines
LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
uv run materia pipeline run extract
LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
uv run materia pipeline run transform
) || sleep 600 # Sleep 10 min on failure to avoid busy-loop retries
done

View File

@@ -9,14 +9,11 @@ authors = [
]
requires-python = ">=3.13"
dependencies = [
"pyarrow>=20.0.0",
"python-dotenv>=1.1.0",
"typer>=0.15.0",
"paramiko>=3.5.0",
"pyyaml>=6.0.2",
"niquests>=3.15.2",
"hcloud>=2.8.0",
"prefect>=3.6.15",
]
[project.scripts]
@@ -130,4 +127,5 @@ force-single-line = false
# Allow print statements and other rules in scripts
"scripts/*" = ["T201"]
[tool.pytest.ini_options]
testpaths = ["tests"]

View File

@@ -80,15 +80,12 @@ app.add_typer(pipeline_app, name="pipeline")
@pipeline_app.command("run")
def pipeline_run(
name: Annotated[str, typer.Argument(help="Pipeline name (extract, transform)")],
worker_type: Annotated[str | None, typer.Option("--worker", "-w")] = None,
provider: Annotated[str, typer.Option("--provider", "-p")] = "hetzner",
keep: Annotated[bool, typer.Option("--keep", help="Keep worker after completion")] = False,
):
"""Run a pipeline on an ephemeral worker."""
"""Run a pipeline locally."""
from materia.pipelines import run_pipeline
typer.echo(f"Running pipeline '{name}'...")
result = run_pipeline(name, worker_type, auto_destroy=not keep, provider=provider)
result = run_pipeline(name)
if result.success:
typer.echo(result.output)
@@ -105,7 +102,8 @@ def pipeline_list():
typer.echo("Available pipelines:")
for name, config in PIPELINES.items():
typer.echo(f"{name:<15} (worker: {config.worker_type}, artifact: {config.artifact})")
cmd = " ".join(config["command"])
typer.echo(f"{name:<15} (command: {cmd}, timeout: {config['timeout_seconds']}s)")
secrets_app = typer.Typer(help="Manage secrets via Pulumi ESC")

View File

@@ -1,21 +1,8 @@
"""Pipeline execution on ephemeral workers."""
"""Pipeline execution via local subprocess."""
import contextlib
import subprocess
from dataclasses import dataclass
import paramiko
from materia.secrets import get_secret
from materia.workers import create_worker, destroy_worker
@dataclass
class PipelineConfig:
worker_type: str
artifact: str
command: str
secrets: list[str]
@dataclass
class PipelineResult:
@@ -25,56 +12,20 @@ class PipelineResult:
PIPELINES = {
"extract": PipelineConfig(
worker_type="ccx12",
artifact="materia-extract-latest.tar.gz",
command="./extract_psd",
secrets=["R2_ACCESS_KEY_ID", "R2_SECRET_ACCESS_KEY", "R2_ENDPOINT", "R2_ARTIFACTS_BUCKET"],
),
"transform": PipelineConfig(
worker_type="ccx22",
artifact="materia-transform-latest.tar.gz",
command="cd sqlmesh_materia && ./sqlmesh plan prod",
secrets=[
"CLOUDFLARE_API_TOKEN",
"ICEBERG_REST_URI",
"R2_WAREHOUSE_NAME",
],
),
"extract": {
"command": ["uv", "run", "--package", "psdonline", "extract_psd"],
"timeout_seconds": 1800,
},
"transform": {
"command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"],
"timeout_seconds": 3600,
},
}
def _execute_ssh_command(ip: str, command: str, env_vars: dict[str, str]) -> tuple[str, str, int]:
ssh_key_path = get_secret("SSH_PRIVATE_KEY_PATH")
if not ssh_key_path:
raise ValueError("SSH_PRIVATE_KEY_PATH not found in secrets")
def run_pipeline(pipeline_name: str) -> PipelineResult:
assert pipeline_name, "pipeline_name must not be empty"
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
pkey = paramiko.RSAKey.from_private_key_file(ssh_key_path)
client.connect(ip, username="root", pkey=pkey)
env_string = " ".join([f"export {k}='{v}' &&" for k, v in env_vars.items()])
full_command = f"{env_string} {command}" if env_vars else command
stdin, stdout, stderr = client.exec_command(full_command)
exit_code = stdout.channel.recv_exit_status()
output = stdout.read().decode()
error = stderr.read().decode()
client.close()
return output, error, exit_code
def run_pipeline(
pipeline_name: str,
worker_type: str | None = None,
auto_destroy: bool = True,
provider: str = "hetzner",
) -> PipelineResult:
if pipeline_name not in PIPELINES:
return PipelineResult(
success=False,
@@ -82,58 +33,24 @@ def run_pipeline(
error=f"Unknown pipeline: {pipeline_name}. Available: {', '.join(PIPELINES.keys())}",
)
pipeline_config = PIPELINES[pipeline_name]
worker_type = worker_type or pipeline_config.worker_type
worker_name = f"materia-{pipeline_name}-worker"
pipeline = PIPELINES[pipeline_name]
timeout_seconds = pipeline["timeout_seconds"]
r2_bucket = get_secret("R2_ARTIFACTS_BUCKET") or "materia-artifacts"
r2_endpoint = get_secret("R2_ENDPOINT")
if not r2_endpoint:
try:
result = subprocess.run(
pipeline["command"],
capture_output=True,
text=True,
timeout=timeout_seconds,
)
return PipelineResult(
success=result.returncode == 0,
output=result.stdout,
error=result.stderr if result.returncode != 0 else None,
)
except subprocess.TimeoutExpired:
return PipelineResult(
success=False,
output="",
error="R2_ENDPOINT not configured in secrets",
error=f"Pipeline '{pipeline_name}' timed out after {timeout_seconds} seconds",
)
try:
worker = create_worker(worker_name, worker_type, provider)
artifact_url = f"https://{r2_endpoint}/{r2_bucket}/{pipeline_config.artifact}"
bootstrap_commands = [
f"curl -fsSL -o artifact.tar.gz {artifact_url}",
"tar -xzf artifact.tar.gz",
"chmod +x -R .",
]
for cmd in bootstrap_commands:
_, error, exit_code = _execute_ssh_command(worker.ip, cmd, {})
if exit_code != 0:
return PipelineResult(
success=False,
output="",
error=f"Bootstrap failed: {error}",
)
env_vars = {}
for secret_key in pipeline_config.secrets:
value = get_secret(secret_key)
if value:
env_vars[secret_key] = value
command = pipeline_config.command
output, error, exit_code = _execute_ssh_command(worker.ip, command, env_vars)
success = exit_code == 0
return PipelineResult(
success=success,
output=output,
error=error if not success else None,
)
finally:
if auto_destroy:
with contextlib.suppress(Exception):
destroy_worker(worker_name, provider)
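The bounded-timeout behavior of the new `run_pipeline` can be exercised in isolation. This sketch returns a plain dict rather than the repo's `PipelineResult` dataclass, and the helper name is illustrative:

```python
import subprocess
import sys


def run_with_timeout(command: list[str], timeout_seconds: int) -> dict:
    """Run a command, capturing output; convert a timeout into a failed result."""
    try:
        result = subprocess.run(
            command, capture_output=True, text=True, timeout=timeout_seconds
        )
        return {
            "success": result.returncode == 0,
            "output": result.stdout,
            "error": result.stderr if result.returncode != 0 else None,
        }
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child before raising, so nothing leaks
        return {"success": False, "output": "", "error": f"timed out after {timeout_seconds}s"}


ok = run_with_timeout([sys.executable, "-c", "print('done')"], timeout_seconds=30)
slow = run_with_timeout([sys.executable, "-c", "import time; time.sleep(5)"], timeout_seconds=1)
print(ok["success"], slow["error"])  # True timed out after 1s
```

Catching `TimeoutExpired` at the call site is what keeps a hung USDA download or SQLMesh plan from wedging the supervisor loop.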

View File

@@ -1,7 +1,6 @@
"""Cloud provider abstraction for worker management."""
"""Cloud provider for worker management."""
from dataclasses import dataclass
from typing import Protocol
@dataclass
@@ -14,35 +13,10 @@ class Instance:
type: str
class ProviderModule(Protocol):
def create_instance(
self: str,
instance_type: str,
ssh_key: str,
location: str | None = None,
) -> Instance: ...
def destroy_instance(self: str) -> None: ...
def list_instances(self: str | None = None) -> list[Instance]: ...
def get_instance(self: str) -> Instance | None: ...
def wait_for_ssh(self: str, timeout: int = 300) -> bool: ...
def get_provider(provider_name: str) -> ProviderModule:
def get_provider(provider_name: str):
if provider_name == "hetzner":
from materia.providers import hetzner
return hetzner
elif provider_name == "ovh":
from materia.providers import ovh
return ovh
elif provider_name == "scaleway":
from materia.providers import scaleway
return scaleway
elif provider_name == "oracle":
from materia.providers import oracle
return oracle
else:
raise ValueError(f"Unknown provider: {provider_name}")

View File

@@ -1,28 +0,0 @@
"""Oracle Cloud provider implementation."""
from materia.providers import Instance
def create_instance(
name: str,
instance_type: str,
ssh_key: str,
location: str | None = None,
) -> Instance:
raise NotImplementedError("Oracle Cloud provider not yet implemented")
def destroy_instance(instance_id: str) -> None:
raise NotImplementedError("Oracle Cloud provider not yet implemented")
def list_instances(label: str | None = None) -> list[Instance]:
raise NotImplementedError("Oracle Cloud provider not yet implemented")
def get_instance(name: str) -> Instance | None:
raise NotImplementedError("Oracle Cloud provider not yet implemented")
def wait_for_ssh(ip: str, timeout: int = 300) -> bool:
raise NotImplementedError("Oracle Cloud provider not yet implemented")

View File

@@ -1,28 +0,0 @@
"""OVH Cloud provider implementation."""
from materia.providers import Instance
def create_instance(
name: str,
instance_type: str,
ssh_key: str,
location: str | None = None,
) -> Instance:
raise NotImplementedError("OVH provider not yet implemented")
def destroy_instance(instance_id: str) -> None:
raise NotImplementedError("OVH provider not yet implemented")
def list_instances(label: str | None = None) -> list[Instance]:
raise NotImplementedError("OVH provider not yet implemented")
def get_instance(name: str) -> Instance | None:
raise NotImplementedError("OVH provider not yet implemented")
def wait_for_ssh(ip: str, timeout: int = 300) -> bool:
raise NotImplementedError("OVH provider not yet implemented")

View File

@@ -1,28 +0,0 @@
"""Scaleway provider implementation."""
from materia.providers import Instance
def create_instance(
name: str,
instance_type: str,
ssh_key: str,
location: str | None = None,
) -> Instance:
raise NotImplementedError("Scaleway provider not yet implemented")
def destroy_instance(instance_id: str) -> None:
raise NotImplementedError("Scaleway provider not yet implemented")
def list_instances(label: str | None = None) -> list[Instance]:
raise NotImplementedError("Scaleway provider not yet implemented")
def get_instance(name: str) -> Instance | None:
raise NotImplementedError("Scaleway provider not yet implemented")
def wait_for_ssh(ip: str, timeout: int = 300) -> bool:
raise NotImplementedError("Scaleway provider not yet implemented")

View File

@@ -13,16 +13,9 @@ def mock_esc_env(tmp_path):
return {
"HETZNER_API_TOKEN": "test-hetzner-token",
"R2_ACCESS_KEY_ID": "test-r2-key",
"R2_SECRET_ACCESS_KEY": "test-r2-secret",
"R2_ENDPOINT": "test.r2.cloudflarestorage.com",
"R2_ARTIFACTS_BUCKET": "test-artifacts",
"SSH_PUBLIC_KEY": "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAITest",
"SSH_PRIVATE_KEY": "-----BEGIN OPENSSH PRIVATE KEY-----\ntest\n-----END OPENSSH PRIVATE KEY-----",
"SSH_PRIVATE_KEY_PATH": str(ssh_key_path),
"CLOUDFLARE_API_TOKEN": "test-cf-token",
"ICEBERG_REST_URI": "https://api.cloudflare.com/test",
"R2_WAREHOUSE_NAME": "test-warehouse",
}
@@ -67,33 +60,3 @@ def mock_ssh_wait():
"""Mock SSH wait function to return immediately."""
with patch("materia.providers.hetzner.wait_for_ssh", return_value=True):
yield
@pytest.fixture
def mock_ssh_connection():
"""Mock paramiko SSH connection."""
with patch("materia.pipelines.paramiko.SSHClient") as mock_ssh_class, \
patch("materia.pipelines.paramiko.RSAKey.from_private_key_file") as mock_key:
ssh_instance = Mock()
mock_ssh_class.return_value = ssh_instance
mock_key.return_value = Mock()
ssh_instance.connect = Mock()
ssh_instance.set_missing_host_key_policy = Mock()
mock_channel = Mock()
mock_channel.recv_exit_status.return_value = 0
mock_stdout = Mock()
mock_stdout.read.return_value = b"Success\n"
mock_stdout.channel = mock_channel
mock_stderr = Mock()
mock_stderr.read.return_value = b""
ssh_instance.exec_command = Mock(
return_value=(Mock(), mock_stdout, mock_stderr)
)
ssh_instance.close = Mock()
yield ssh_instance

View File

@@ -1,5 +1,7 @@
"""End-to-end tests for the materia CLI."""
from unittest.mock import patch
from typer.testing import CliRunner
from materia.cli import app
@@ -33,7 +35,6 @@ def test_secrets_list_command(mock_secrets):
result = runner.invoke(app, ["secrets", "list"])
assert result.exit_code == 0
assert "HETZNER_API_TOKEN" in result.stdout
assert "R2_ACCESS_KEY_ID" in result.stdout
def test_worker_list_empty(mock_secrets, mock_hcloud_client):
@@ -98,46 +99,55 @@ def test_worker_destroy(mock_secrets, mock_hcloud_client):
assert "Worker destroyed" in result.stdout
def test_pipeline_list(mock_secrets):
def test_pipeline_list():
"""Test pipeline list command."""
result = runner.invoke(app, ["pipeline", "list"])
assert result.exit_code == 0
assert "extract" in result.stdout
assert "transform" in result.stdout
assert "ccx12" in result.stdout
assert "ccx22" in result.stdout
assert "1800" in result.stdout
assert "3600" in result.stdout
def test_pipeline_run_extract(
mock_secrets, mock_hcloud_client, mock_ssh_wait, mock_ssh_connection
):
def test_pipeline_run_extract():
"""Test running extract pipeline end-to-end."""
result = runner.invoke(app, ["pipeline", "run", "extract"])
with patch("materia.pipelines.subprocess.run") as mock_run:
mock_run.return_value.returncode = 0
mock_run.return_value.stdout = "Extracted successfully\n"
mock_run.return_value.stderr = ""
assert result.exit_code == 0
assert "Running pipeline" in result.stdout
assert "Pipeline completed successfully" in result.stdout
result = runner.invoke(app, ["pipeline", "run", "extract"])
mock_hcloud_client.servers.create.assert_called_once()
mock_ssh_connection.connect.assert_called()
mock_ssh_connection.exec_command.assert_called()
assert result.exit_code == 0
assert "Running pipeline" in result.stdout
assert "Pipeline completed successfully" in result.stdout
mock_run.assert_called_once()
call_args = mock_run.call_args
assert call_args[0][0] == ["uv", "run", "--package", "psdonline", "extract_psd"]
assert call_args[1]["timeout"] == 1800
def test_pipeline_run_transform(
mock_secrets, mock_hcloud_client, mock_ssh_wait, mock_ssh_connection
):
def test_pipeline_run_transform():
"""Test running transform pipeline end-to-end."""
result = runner.invoke(app, ["pipeline", "run", "transform"])
with patch("materia.pipelines.subprocess.run") as mock_run:
mock_run.return_value.returncode = 0
mock_run.return_value.stdout = "Transform complete\n"
mock_run.return_value.stderr = ""
assert result.exit_code == 0
assert "Running pipeline" in result.stdout
assert "Pipeline completed successfully" in result.stdout
result = runner.invoke(app, ["pipeline", "run", "transform"])
mock_hcloud_client.servers.create.assert_called_once()
mock_ssh_connection.connect.assert_called()
assert result.exit_code == 0
assert "Running pipeline" in result.stdout
assert "Pipeline completed successfully" in result.stdout
mock_run.assert_called_once()
call_args = mock_run.call_args
assert "sqlmesh" in call_args[0][0]
assert call_args[1]["timeout"] == 3600
def test_pipeline_run_invalid(mock_secrets):
def test_pipeline_run_invalid():
"""Test running an invalid pipeline."""
result = runner.invoke(app, ["pipeline", "run", "invalid-pipeline"])
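The subprocess-based runner these tests mock can be sketched roughly as follows. The registry shape, function name, and exact `transform` command are illustrative (the real definitions live in `materia.pipelines`); only the `extract` command and both timeouts are taken from the assertions above:

```python
import subprocess

# Hypothetical registry mirroring what the tests above exercise; names here
# are illustrative, not the actual materia.pipelines definitions.
PIPELINES = {
    "extract": {
        "cmd": ["uv", "run", "--package", "psdonline", "extract_psd"],
        "timeout": 1800,
    },
    "transform": {
        "cmd": ["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod"],
        "timeout": 3600,
    },
}


def run_pipeline(name: str) -> bool:
    """Run a pipeline locally with a bounded timeout; True on success."""
    spec = PIPELINES[name]
    result = subprocess.run(
        spec["cmd"],
        capture_output=True,
        text=True,
        timeout=spec["timeout"],  # raises subprocess.TimeoutExpired when exceeded
    )
    return result.returncode == 0
```

The bounded `timeout` is what replaces the old SSH-orchestrated remote execution: a hung pipeline now fails fast locally instead of leaving an orphaned worker.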

View File

@@ -1,5 +1,5 @@
# --- Gateway Connection ---
# Single gateway connecting to R2 Iceberg catalog
# Single local DuckDB gateway
# Local dev uses virtual environments (e.g., dev_<username>)
# Production uses the 'prod' environment
gateways:
@@ -7,48 +7,13 @@ gateways:
connection:
type: duckdb
catalogs:
local: 'local.duckdb'
cloudflare:
type: iceberg
path: '{{ env_var("ICEBERG_WAREHOUSE_NAME") }}'
connector_config:
endpoint: '{{ env_var("ICEBERG_CATALOG_URI") }}'
extensions:
- name: httpfs
- name: iceberg
secrets:
r2_secret:
type: iceberg
token: "{{ env_var('R2_ADMIN_API_TOKEN') }}"
r2_data_secret:
type: r2
key_id: "{{ env_var('R2_ADMIN_ACCESS_KEY_ID') }}"
secret: "{{ env_var('R2_ADMIN_SECRET_ACCESS_KEY') }}"
account_id: "{{ var('CLOUDFLARE_ACCOUNT_ID') }}"
region: 'eeur'
local: '{{ env_var("DUCKDB_PATH", "local.duckdb") }}'
default_gateway: duckdb
# --- Variables ---
# Make environment variables available to models
variables:
R2_BUCKET: beanflows-data-prod
CLOUDFLARE_ACCOUNT_ID: "{{ env_var('CLOUDFLARE_ACCOUNT_ID') }}"
# --- Catalog Configuration ---
# Attach R2 Iceberg catalog and configure default schema
# https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#execution-hooks
# https://developers.cloudflare.com/r2/data-catalog/config-examples/duckdb/
#before_all:
# - "ATTACH '{{ env_var('ICEBERG_WAREHOUSE_NAME') }}' AS catalog (TYPE ICEBERG, ENDPOINT '{{ env_var('ICEBERG_CATALOG_URI') }}', SECRET r2_secret);"
# Note: R2 data access is configured via r2_data_secret (TYPE R2)
# Models can use r2://bucket/path to read landing data
# Note: CREATE SCHEMA has a DuckDB/Iceberg bug (missing Content-Type header)
# Schema must be pre-created in R2 Data Catalog via Cloudflare dashboard or API
# For now, skip USE statement and rely on fully-qualified table names in models
LANDING_DIR: '{{ env_var("LANDING_DIR", "data/landing") }}'
# --- Model Defaults ---
# https://sqlmesh.readthedocs.io/en/stable/reference/model_configuration/#model-defaults
@@ -59,7 +24,6 @@ model_defaults:
cron: '@daily' # Run models daily at 12am UTC (can override per model)
# --- Linting Rules ---
# Enforce standards for your team
# https://sqlmesh.readthedocs.io/en/stable/guides/linter/
linter:
@@ -68,22 +32,8 @@ linter:
- ambiguousorinvalidcolumn
- invalidselectstarexpansion
# FLOW: Minimal prompts, automatic changes, summary output
# https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#plan
#plan:
# no_diff: true # Hide detailed text differences for changed models
# no_prompts: true # No interactive prompts
# auto_apply: true # Apply changes automatically
# --- Optional: Set a default target environment ---
# This is intended for local development to prevent users from accidentally applying plans to the prod environment.
# It is a development only config and should NOT be committed to your git repo.
# --- Default Target Environment ---
# Prevents accidentally applying plans to prod during local development.
# https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#default-target-environment
# Uncomment the following line to use a default target environment derived from the logged in user's name.
default_target_environment: dev_{{ user() }}
# Example usage:
# sqlmesh plan # Automatically resolves to: sqlmesh plan dev_yourname
# sqlmesh plan prod # Specify `prod` to apply changes to production

View File

@@ -21,4 +21,4 @@ MODEL (
)
);
select *
FROM read_csv('extract/psdonline/src/psdonline/data/*.csv.gzip', delim=',', encoding='utf-8', compression='gzip', max_line_size=10000000, header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true)
FROM read_csv('{{ var("LANDING_DIR") }}/psd/**/*.csv.gzip', delim=',', encoding='utf-8', compression='gzip', max_line_size=10000000, header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true)
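For reference, a minimal stdlib sketch of the landing layout the `read_csv` glob above expects. The function name and the unpadded month are illustrative choices, not the extractor's actual code (the model casts path segments to int, so zero-padding is optional):

```python
import csv
import glob
import gzip
import os
import tempfile


def write_landing_file(landing_dir, year, month, etag, rows):
    """Write one gzip-compressed CSV into the landing layout:
    {landing_dir}/psd/{year}/{month}/{etag}.csv.gzip
    """
    out_dir = os.path.join(landing_dir, "psd", str(year), str(month))
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"{etag}.csv.gzip")
    with gzip.open(path, "wt", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return path


# Demo: write a single file and confirm the model's recursive glob finds it.
demo_dir = tempfile.mkdtemp()
p = write_landing_file(demo_dir, 2026, 2, "abc123", [{"commodity_code": "0711100", "value": "1"}])
matched = glob.glob(os.path.join(demo_dir, "psd", "**", "*.csv.gzip"), recursive=True)
```

The `**` in the model's glob matches the `{year}/{month}` directory levels, so new landing partitions are picked up without any model change.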

View File

@@ -41,7 +41,7 @@ select
any_value(unit_name) as unit_name,
any_value(value) as value,
hash(commodity_code, commodity_name, country_code, country_name, market_year, calendar_year, month, attribute_id, attribute_name, unit_id, unit_name, value) as hkey,
any_value(make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1)) as ingest_date,
any_value(make_date(split(filename, '/')[-3]::int, split(filename, '/')[-2]::int, 1)) as ingest_date,
any_value(if(month!=0,last_day(make_date(market_year, month, 1)),null)) as market_date_month_end,
from cast_dtypes
group by hkey
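The index shift above (from `[-4]`/`[-3]` to `[-3]`/`[-2]`) just tracks the shorter landing path; a quick Python equivalent with an illustrative filename:

```python
import datetime

# Illustrative path in the new, flatter landing layout:
# {LANDING_DIR}/psd/{year}/{month}/{etag}.csv.gzip
filename = "data/landing/psd/2026/2/9f8e7d.csv.gzip"

# DuckDB's split(filename, '/')[-3] and [-2] use the same negative indexing
# as Python: year and month are now the 3rd- and 2nd-to-last path segments
# (previously [-4] and [-3], when the layout had one extra directory level).
parts = filename.split("/")
year, month = int(parts[-3]), int(parts[-2])
ingest_date = datetime.date(year, month, 1)
```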

View File

@@ -2,7 +2,6 @@
name = "sqlmesh_materia"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
authors = [
{ name = "Deeman", email = "hendriknote@gmail.com" }
]

uv.lock generated: diff suppressed because it is too large