beanflows/CLAUDE.md
Deeman d4e6c65f97 Fix SQLMesh command documentation
Corrected SQLMesh commands to show proper usage:
- Run from project root (not from transform/sqlmesh_materia/)
- Use -p flag to specify project directory
- Use uv run for all commands
- Use esc run for commands requiring secrets (plan, audit, ui)
- Clarified which commands need secrets vs local-only

This aligns with the actual working pattern and Pulumi ESC integration.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-21 21:46:41 +02:00


# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Materia is a commodity data analytics platform built on a modern data engineering stack. The project extracts agricultural commodity data from USDA PSD Online, transforms it through a layered SQL pipeline using SQLMesh, and stores it in DuckDB for analysis.
**Tech Stack:**
- Python 3.13 with `uv` package manager
- SQLMesh for SQL transformation and orchestration
- DuckDB as the analytical database
- Workspace structure with separate extract and transform packages
## Environment Setup
**Install dependencies:**
```bash
uv sync
```
**Setup pre-commit hooks:**
```bash
pre-commit install
```
**Add new dependencies:**
```bash
uv add <package-name>
```
## Secrets Management with Pulumi ESC
All secrets are managed via Pulumi ESC (Environment, Secrets, and Configuration). The production environment is `beanflows/prod`.
**Load secrets into your shell:**
```bash
# Login to Pulumi ESC (one-time)
esc login
# Load secrets as environment variables
eval $(esc env open beanflows/prod --format shell)
# Now all secrets are available as env vars
echo $R2_ENDPOINT # Example: access R2 endpoint
```
**Run commands with ESC secrets:**
```bash
# Run a command with secrets loaded
esc run beanflows/prod -- uv run extract_psd
# Run multiple commands (from the project root)
esc run beanflows/prod -- bash -c "
uv run extract_psd
uv run sqlmesh -p transform/sqlmesh_materia plan prod
"
```
**Available secrets in `beanflows/prod`:**
- R2 storage: `R2_ENDPOINT`, `R2_BUCKET`, `R2_ACCESS_KEY`, `R2_SECRET_KEY`
- Hetzner Cloud: `HETZNER_TOKEN`, SSH keys
- GitLab: `GITLAB_READ_TOKEN`
- Iceberg catalog credentials
**Note:** Never hardcode secrets! Always use Pulumi ESC or environment variables.
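As a minimal sketch of the "environment variables only" rule, Python code could read the R2 credentials like this. The four variable names come from the list above; `R2Settings` and `load_r2_settings` are hypothetical names for illustration and are not taken from the repository.

```python
import os
from dataclasses import dataclass

# Variable names as listed under "Available secrets" above.
R2_VARS = ("R2_ENDPOINT", "R2_BUCKET", "R2_ACCESS_KEY", "R2_SECRET_KEY")


@dataclass(frozen=True)
class R2Settings:  # hypothetical container, not part of the repository
    endpoint: str
    bucket: str
    access_key: str
    secret_key: str


def load_r2_settings(env=os.environ) -> R2Settings:
    """Read R2 credentials from the environment and fail loudly if any are missing."""
    missing = [name for name in R2_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing secrets: {', '.join(missing)}")
    return R2Settings(*(env[name] for name in R2_VARS))
```

Run under `esc run beanflows/prod -- ...`, the variables would already be present; without them the function raises instead of falling back to a hardcoded value.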
## Project Structure
This is a uv workspace with three main components:
### 1. Extract Layer (`extract/`)
Contains extraction packages for pulling data from external sources.
- **`extract/psdonline/`**: Extracts USDA PSD commodity data
  - Entry point: `extract_psd` CLI command (defined in `extract/psdonline/src/psdonline/execute.py`)
  - Checks latest available monthly snapshot (tries current month and 3 months back)
  - Uses ETags to avoid re-downloading unchanged files
  - Storage modes:
    - **Local mode** (no R2 credentials): Downloads to `extract/psdonline/src/psdonline/data/{etag}.zip`
    - **R2 mode** (R2 credentials present): Uploads to `s3://bucket/psd/{etag}.zip`
  - Flat structure: files named by ETag for natural deduplication
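The snapshot lookup and storage-mode selection described above can be sketched roughly as follows. This is an illustration, not the code in `execute.py`: the function names are hypothetical, and using `R2_BUCKET` as the bucket name and stripping the quotes from the ETag are assumptions.

```python
import os
from datetime import date

R2_VARS = ("R2_ENDPOINT", "R2_BUCKET", "R2_ACCESS_KEY", "R2_SECRET_KEY")


def candidate_months(today: date, lookback: int = 3) -> list[tuple[int, int]]:
    """Return (year, month) pairs: the current month first, then `lookback` earlier months."""
    months = []
    year, month = today.year, today.month
    for _ in range(lookback + 1):
        months.append((year, month))
        month -= 1
        if month == 0:  # wrap from January back to December of the previous year
            year, month = year - 1, 12
    return months


def destination(etag: str, env=os.environ) -> str:
    """Pick the target location for a snapshot, named by its ETag."""
    name = etag.strip('"')  # assumption: drop the quotes HTTP servers put around ETags
    if all(env.get(var) for var in R2_VARS):
        return f"s3://{env['R2_BUCKET']}/psd/{name}.zip"  # R2 mode
    return f"extract/psdonline/src/psdonline/data/{name}.zip"  # local mode
```

Because the file name is the ETag, an unchanged upstream file maps to the same destination and can be skipped if it already exists.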
**Run extraction:**
```bash
uv run extract_psd # Local mode (default)
# R2 mode (requires env vars: R2_ENDPOINT, R2_BUCKET, R2_ACCESS_KEY, R2_SECRET_KEY)
export R2_ENDPOINT=...
export R2_BUCKET=...
export R2_ACCESS_KEY=...
export R2_SECRET_KEY=...
uv run extract_psd
```
### 2. Transform Layer (`transform/sqlmesh_materia/`)
SQLMesh project implementing a layered data architecture.
**Working directory:** All SQLMesh commands must be run from the project root with `-p transform/sqlmesh_materia`
**Key commands:**
```bash
# Load secrets and run SQLMesh commands
# Always run from project root with -p flag and uv
# Local development (creates virtual environment)
esc run beanflows/prod -- uv run sqlmesh -p transform/sqlmesh_materia plan dev_<username>
# Production
esc run beanflows/prod -- uv run sqlmesh -p transform/sqlmesh_materia plan prod
# Run tests
uv run sqlmesh -p transform/sqlmesh_materia test
# Validate models
uv run sqlmesh -p transform/sqlmesh_materia validate
# Run audits (requires secrets)
esc run beanflows/prod -- uv run sqlmesh -p transform/sqlmesh_materia audit
# Format SQL
uv run sqlmesh -p transform/sqlmesh_materia format
# Start UI (requires secrets)
esc run beanflows/prod -- uv run sqlmesh -p transform/sqlmesh_materia ui
```
**Configuration:**
- Config: `transform/sqlmesh_materia/config.yaml`
- Single gateway: `prod` (connects to R2 Iceberg catalog)
- Uses virtual environments for dev isolation (e.g., `dev_deeman`)
- Production uses `prod` environment
- Auto-apply enabled, no interactive prompts
- DuckDB extensions: httpfs, iceberg
**Environment Strategy:**
- All environments connect to the same R2 Iceberg catalog
- Dev environments (e.g., `dev_deeman`) are isolated virtual environments
- SQLMesh manages environment isolation and table versioning
- No local DuckDB files needed
### 3. Core Package (`src/materia/`)
Currently minimal; main logic resides in workspace packages.
## Data Architecture
SQLMesh models follow a strict 4-layer architecture defined in `transform/sqlmesh_materia/models/README.md`:
### Layer 1: Raw (`models/raw/`)
- **Purpose:** Immutable archive of source data
- **Pattern:** Directly reads from extraction outputs
- **Example:** `raw.psd_alldata` reads zip files using DuckDB's `read_csv('zip://...')` function
- **Grain:** Defines unique keys for each raw table
### Layer 2: Staging (`models/staging/`)
- **Purpose:** Apply schema, cast types, basic cleansing
- **Pattern:** `stg_[source]__[entity]`
- **Example:** `stg_psdalldata__commodity.sql` casts raw strings to proper types, joins lookup tables
- **Features:**
  - Deduplication using hash keys
  - Extracts metadata (ingest_date) from file paths
  - 1:1 relationship with raw sources
### Layer 3: Cleaned (`models/cleaned/`)
- **Purpose:** Integration, business logic, unified models
- **Pattern:** `cln_[entity]` or `cln_[vault_component]_[entity]`
- **Example:** `cln_psdalldata__commodity_pivoted.sql` pivots commodity attributes into columns
### Layer 4: Serving (`models/serving/`)
- **Purpose:** Analytics-ready models (star schema, aggregates)
- **Patterns:**
  - `dim_[entity]` for dimensions
  - `fct_[process]` for facts
  - `agg_[description]` for aggregates
  - `obt_[description]` for one-big-tables
- **Example:** `obt_commodity_metrics.sql` provides wide table for analysis
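The naming conventions for layers 2 to 4 could be checked mechanically. The sketch below is a hypothetical helper, not something in the repository, and it assumes lowercase snake_case names without the `.sql` suffix; the raw layer has no pattern listed above, so it is not covered.

```python
import re

# Prefix patterns per layer, transcribed from the conventions above.
LAYER_PATTERNS = {
    "staging": re.compile(r"^stg_[a-z0-9]+__[a-z0-9_]+$"),
    "cleaned": re.compile(r"^cln_[a-z0-9_]+$"),
    "serving": re.compile(r"^(dim|fct|agg|obt)_[a-z0-9_]+$"),
}


def follows_convention(layer: str, model_name: str) -> bool:
    """Return True if `model_name` matches the naming pattern of its layer."""
    pattern = LAYER_PATTERNS.get(layer)
    return bool(pattern and pattern.match(model_name))
```

For example, `follows_convention("serving", "obt_commodity_metrics")` returns `True`, while a serving model named `commodity_metrics` would be rejected.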
## Model Development
**Incremental models:**
- Use `INCREMENTAL_BY_TIME_RANGE` kind
- Define `time_column` (usually `ingest_date`)
- Filter with `WHERE time_column BETWEEN @start_ds AND @end_ds`
**Full refresh models:**
- Use `FULL` kind for small lookup tables and raw sources
**Model properties:**
- `grain`: Define unique key columns for data quality
- `start`: Historical backfill start date (project default: 2025-07-07)
- `cron`: Schedule (project default: '@daily')
## Linting and Formatting
**Run linting:**
```bash
ruff check .
```
**Auto-fix issues:**
```bash
ruff check --fix .
```
**Format code:**
```bash
ruff format .
```
Pre-commit hooks automatically run ruff on commits.
## Testing
**Run SQLMesh tests:**
```bash
# Run from the project root
uv run sqlmesh -p transform/sqlmesh_materia test
```
**Run Python tests (if configured):**
```bash
pytest --cov=./ --cov-report=xml
```
## CI/CD Pipeline and Production Architecture
### CI/CD Pipeline (`.gitlab-ci.yml`)
**3 Stages: Lint → Test → Deploy**
#### 1. Lint Stage
- Runs `ruff check` on every commit
- Validates code quality
#### 2. Test Stage
- **`test:cli`**: Runs pytest on materia CLI with 71% coverage
  - Tests secrets management (Pulumi ESC integration)
  - Tests worker lifecycle (create, list, destroy)
  - Tests pipeline execution (extract, transform)
  - Exports coverage reports to GitLab
- **`test:sqlmesh`**: Runs SQLMesh model tests in transform layer
#### 3. Deploy Stage (only on master branch)
- **`deploy:infra`**: Runs `pulumi up` to ensure supervisor instance exists
  - Runs on every master push
  - Creates/updates Hetzner CPX11 supervisor instance (~€4.49/mo)
  - Uses Pulumi ESC (`beanflows/prod`) for all secrets
- **`deploy:supervisor`**: Bootstraps and monitors supervisor
  - Checks if supervisor is already bootstrapped (`test -d /opt/materia/.git`)
  - If not bootstrapped: Runs `infra/bootstrap_supervisor.sh` automatically
  - If already bootstrapped: Verifies service status
  - After bootstrap: Supervisor auto-updates via `git pull` every 15 minutes
**Note:** No build artifacts! Supervisor pulls code directly from git and runs via `uv`.
### Production Architecture: Git-Based Deployment with Ephemeral Workers
**Design Philosophy:**
- No always-on workers (cost optimization)
- Supervisor pulls latest code from git (no artifact builds)
- Supervisor dynamically creates/destroys workers on-demand
- Simple, inspectable, easy to test locally
- Multi-cloud abstraction for pricing optimization
**Components:**
#### 1. Supervisor Instance (Small Hetzner VM)
- Runs `supervisor.sh` - continuous orchestration loop (inspired by TigerBeetle's CFO supervisor)
- Hetzner CPX11: 2 vCPU (shared), 2GB RAM (~€4.49/mo)
- Always-on, minimal resource usage
- Git-based deployment: `git pull` every 15 minutes for auto-updates
- Runs pipelines on schedule:
  - Extract: Daily at 2 AM UTC
  - Transform: Daily at 3 AM UTC
- Uses systemd service for automatic restart on failure
- Pulls secrets from Pulumi ESC
**Bootstrap:**
Bootstrapping happens automatically in CI/CD (`deploy:supervisor` stage). The pipeline:
1. Checks if supervisor is already bootstrapped
2. If not: Runs `infra/bootstrap_supervisor.sh` with secrets injected
3. If yes: Verifies systemd service status
Manual bootstrap (if needed):
```bash
# Look up the supervisor IP (the subshell keeps the working directory at the project root)
(cd infra && pulumi stack output supervisor_ip -s prod)
export PULUMI_ACCESS_TOKEN=<your-token>
ssh root@<supervisor-ip> 'bash -s' < infra/bootstrap_supervisor.sh
```
#### 2. Ephemeral Workers (On-Demand)
- Created for each pipeline execution by materia CLI
- Receive secrets via SSH environment variable injection
- Destroyed immediately after job completion
- Different instance types per pipeline:
  - Extract: `ccx12` (2 vCPU, 8GB RAM)
  - Transform: `ccx22` (4 vCPU, 16GB RAM)
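The create, run, destroy lifecycle can be sketched with a context manager so that the worker is removed even when the pipeline fails. This is only an illustration of the pattern; `ephemeral_worker` and the `create`/`destroy` callables are hypothetical stand-ins for whatever the materia CLI actually calls.

```python
from contextlib import contextmanager

# Instance types per pipeline, as listed above.
INSTANCE_TYPES = {"extract": "ccx12", "transform": "ccx22"}


@contextmanager
def ephemeral_worker(pipeline, create, destroy):
    """Create a worker for one pipeline run and always destroy it afterwards.

    `create` takes an instance type and returns a worker handle;
    `destroy` takes that handle. Both are hypothetical callables.
    """
    worker = create(INSTANCE_TYPES[pipeline])
    try:
        yield worker
    finally:
        destroy(worker)  # runs even if the pipeline raises


# Usage with in-memory fakes instead of real cloud calls.
events = []
with ephemeral_worker("extract", lambda t: f"worker-{t}", events.append) as w:
    events.append(f"run on {w}")
print(events)  # ['run on worker-ccx12', 'worker-ccx12']
```

Putting the teardown in `finally` is what keeps a failed job from leaving a billed instance behind.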
#### 3. Secrets Flow
```
Pulumi ESC (beanflows/prod)
    ↓
Supervisor Instance (via esc CLI)
    ↓
Workers (injected as env vars via SSH)
```
#### 4. Code Deployment Flow
```
GitLab (master branch)
    ↓
Supervisor: git pull origin master (every 15 min)
    ↓
Supervisor: uv sync (update dependencies)
    ↓
Supervisor: uv run materia pipeline run <pipeline>
```
#### 5. Data Storage
- **All environments**: DuckDB in-memory + Cloudflare R2 Data Catalog (Iceberg REST API)
  - ACID transactions on object storage
  - No persistent database on workers
  - Virtual environments for dev isolation (e.g., `dev_deeman`)
**Execution Flow:**
1. Supervisor loop wakes up every 15 minutes
2. Runs `git fetch` and checks for new commits on master
3. If updates available: `git pull && uv sync`
4. Checks if current time matches pipeline schedule (e.g., 2 AM for extract)
5. If scheduled: `uv run materia pipeline run extract`
6. CLI creates Hetzner worker with SSH key
7. CLI injects secrets via SSH and executes pipeline
8. Pipeline executes, writes to R2 Iceberg catalog
9. Worker destroyed (entire lifecycle ~5-10 minutes)
10. Supervisor logs results and continues loop
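Step 4 of the flow above (matching the current time against the schedule) can be sketched as follows. The real loop lives in `supervisor.sh`; this Python version only illustrates the logic, and the `last_run` bookkeeping is an assumption added so that the four wake-ups within a scheduled hour do not trigger the same pipeline repeatedly.

```python
from datetime import date, datetime, timezone

# Daily schedule in UTC hours, from the supervisor description above.
SCHEDULE = {"extract": 2, "transform": 3}


def due_pipelines(now: datetime, last_run: dict[str, date]) -> list[str]:
    """Return pipelines whose scheduled hour is now and that have not run today.

    `now` must be timezone-aware; `last_run` maps pipeline name to the UTC
    date of its most recent run (hypothetical state kept by the caller).
    """
    now = now.astimezone(timezone.utc)
    return [
        name
        for name, hour in SCHEDULE.items()
        if now.hour == hour and last_run.get(name) != now.date()
    ]
```

A loop that wakes every 15 minutes would call this once per wake-up and record today's date in `last_run` after each successful run.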
**Multi-Cloud Provider Abstraction:**
- Protocol-based interface (data-oriented design, no OOP)
- Providers: Hetzner (implemented), OVH, Scaleway, Oracle (stubs)
- Allows switching providers for cost optimization
- Each provider implements: `create_instance`, `destroy_instance`, `list_instances`, `wait_for_ssh`
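A protocol-based provider interface might look roughly like the sketch below. Only the four operation names come from the list above; every signature, the `Instance` record, and the in-memory fake are assumptions for illustration. The fake is built from plain functions over a dict, in keeping with the data-oriented style.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Protocol


@dataclass(frozen=True)
class Instance:  # hypothetical record; field names are assumptions
    id: str
    instance_type: str
    ip: str


class Provider(Protocol):
    """The four operations named above; all signatures are assumed."""

    def create_instance(self, name: str, instance_type: str) -> Instance: ...
    def destroy_instance(self, instance_id: str) -> None: ...
    def list_instances(self) -> list[Instance]: ...
    def wait_for_ssh(self, ip: str, timeout_s: float = 300.0) -> bool: ...


def make_fake_provider():
    """In-memory stand-in for local testing: plain functions, no cloud calls."""
    instances: dict[str, Instance] = {}

    def create_instance(name: str, instance_type: str) -> Instance:
        inst = Instance(id=name, instance_type=instance_type, ip="192.0.2.1")
        instances[inst.id] = inst
        return inst

    def destroy_instance(instance_id: str) -> None:
        instances.pop(instance_id, None)

    def list_instances() -> list[Instance]:
        return list(instances.values())

    def wait_for_ssh(ip: str, timeout_s: float = 300.0) -> bool:
        return True  # the fake is always reachable

    return SimpleNamespace(
        create_instance=create_instance,
        destroy_instance=destroy_instance,
        list_instances=list_instances,
        wait_for_ssh=wait_for_ssh,
    )
```

Because `Protocol` relies on structural typing, any object or module exposing these four callables would satisfy it, which is what makes swapping providers cheap.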
## Key Design Patterns
**Raw data ingestion:**
- DuckDB reads directly from zip archives using `read_csv('zip://...')`
- `filename=true` captures source file path for metadata
- `union_by_name=true` handles schema evolution
**Deduplication:**
- Use `hash()` function to create unique keys
- Use `any_value()` with `GROUP BY hkey` to deduplicate
- Preserve all metadata in hash key for change detection
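The idea behind the hash-key deduplication can be illustrated in plain Python. This is an analogue, not the SQL used in the models: SHA-256 stands in for DuckDB's `hash()`, and "keep the first row seen" stands in for `any_value()` per group.

```python
import hashlib


def hash_key(row: dict) -> str:
    """Stable key over every column, so any changed value yields a new key."""
    payload = "|".join(f"{col}={row[col]}" for col in sorted(row))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def deduplicate(rows: list[dict]) -> list[dict]:
    """Keep one row per hash key (the first seen)."""
    unique: dict[str, dict] = {}
    for row in rows:
        unique.setdefault(hash_key(row), row)
    return list(unique.values())
```

Two rows that agree on every column collapse into one, while a row whose value was revised gets a different key and survives as a separate record.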
**Date handling:**
- Extract ingest dates from file paths: `make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1)`
- Calculate market dates: `last_day(make_date(market_year, month, 1))`
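For readers less familiar with the DuckDB expressions, the two date rules above translate to Python roughly as follows. The example path layout (`.../<year>/<month>/<dir>/<file>`) is an assumption inferred from the `[-4]` and `[-3]` indices, and both function names are hypothetical.

```python
import calendar
from datetime import date


def ingest_date(filename: str) -> date:
    """First day of the ingest month, read from the 4th- and 3rd-from-last path segments."""
    parts = filename.split("/")
    return date(int(parts[-4]), int(parts[-3]), 1)


def market_date(market_year: int, month: int) -> date:
    """Last calendar day of the given month, like last_day(make_date(y, m, 1))."""
    return date(market_year, month, calendar.monthrange(market_year, month)[1])
```

With a hypothetical path such as `data/2025/07/abc/file.csv`, `ingest_date` yields 2025-07-01, and `market_date(2024, 2)` yields 2024-02-29.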
**SQLMesh best practices:**
- Always define `grain` for data quality validation
- Use meaningful model names following layer conventions
- Leverage SQLMesh's built-in time macros (`@start_ds`, `@end_ds`)
- Keep raw layer thin, push transformations to staging+
## Data Storage
All data is stored in Cloudflare R2 Data Catalog (Apache Iceberg) via REST API:
- **Production environment:** `prod`
- **Dev environments:** `dev_<username>` (virtual environments)
- SQLMesh manages environment isolation and table versioning
- No local database files needed
- We use a monorepo with uv workspaces
- The Pulumi ESC environment is called `beanflows/prod`
- NEVER hardcode secrets in plaintext
- Never add SSH keys to the git repo!
- If there is a simpler, more direct solution with no other tradeoff, always choose the simpler solution