# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

Materia is a commodity data analytics platform built on a modern data engineering stack. The project extracts agricultural commodity data from USDA PSD Online, transforms it through a layered SQL pipeline using SQLMesh, and stores it in DuckDB for analysis.

**Tech Stack:**

- Python 3.13 with `uv` package manager
- SQLMesh for SQL transformation and orchestration
- DuckDB as the analytical database
- Workspace structure with separate extract and transform packages

## Environment Setup

**Install dependencies:**

```bash
uv sync
```

**Set up pre-commit hooks:**

```bash
pre-commit install
```

**Add new dependencies:**

```bash
uv add <package>
```

## Project Structure

This is a uv workspace with three main components:

### 1. Extract Layer (`extract/`)

Contains extraction packages for pulling data from external sources.

- **`extract/psdonline/`**: Extracts USDA PSD commodity data from archives dating back to 2006
  - Entry point: `extract_psd` CLI command (defined in `extract/psdonline/src/psdonline/execute.py`)
  - Downloads monthly zip archives to `extract/psdonline/src/psdonline/data/`
  - Uses ETags to avoid re-downloading unchanged files

**Run extraction:**

```bash
extract_psd
```

### 2. Transform Layer (`transform/sqlmesh_materia/`)

SQLMesh project implementing a layered data architecture.
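The ETag caching used by the extract layer (described above) could be sketched as follows. This is an illustrative sketch, not the actual psdonline implementation: the helper names and the JSON cache file are assumptions.

```python
import json
import urllib.error
import urllib.request
from pathlib import Path


def load_etags(cache_file: Path) -> dict:
    """Load the URL -> ETag map from a JSON cache file (empty if absent)."""
    if cache_file.exists():
        return json.loads(cache_file.read_text())
    return {}


def save_etags(cache_file: Path, etags: dict) -> None:
    """Persist the URL -> ETag map back to disk."""
    cache_file.write_text(json.dumps(etags))


def download_if_changed(url: str, dest: Path, etags: dict) -> bool:
    """Fetch url unless the server reports the cached ETag is still current.

    Returns True if a new copy was downloaded, False on 304 Not Modified.
    """
    req = urllib.request.Request(url)
    if url in etags:
        # Ask the server to respond 304 if the content is unchanged
        req.add_header("If-None-Match", etags[url])
    try:
        with urllib.request.urlopen(req) as resp:
            dest.write_bytes(resp.read())
            etags[url] = resp.headers.get("ETag", "")
            return True
    except urllib.error.HTTPError as err:
        if err.code == 304:  # Not Modified: keep the cached archive
            return False
        raise
```

Persisting the ETag map between runs is what lets `extract_psd` skip archives that have not changed since the last extraction.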
**Working directory:** All SQLMesh commands must be run from `transform/sqlmesh_materia/`.

**Key commands:**

```bash
cd transform/sqlmesh_materia

# Plan changes (no prompts, auto-apply enabled in config)
sqlmesh plan

# Run tests
sqlmesh test

# Validate models
sqlmesh validate

# Run audits
sqlmesh audit

# Format SQL
sqlmesh format

# Start UI
sqlmesh ui
```

**Configuration:**

- Config: `transform/sqlmesh_materia/config.yaml`
- Default gateway: `dev` (uses `materia_dev.db`)
- Production gateway: `prod` (uses `materia_prod.db`)
- Auto-apply enabled, no interactive prompts
- DuckDB extensions: zipfs, httpfs, iceberg

### 3. Core Package (`src/materia/`)

Currently minimal; the main logic resides in the workspace packages.

## Data Architecture

SQLMesh models follow a strict four-layer architecture defined in `transform/sqlmesh_materia/models/README.md`:

### Layer 1: Raw (`models/raw/`)

- **Purpose:** Immutable archive of source data
- **Pattern:** Reads directly from extraction outputs
- **Example:** `raw.psd_alldata` reads zip files using DuckDB's `read_csv('zip://...')` function
- **Grain:** Defines unique keys for each raw table

### Layer 2: Staging (`models/staging/`)

- **Purpose:** Apply schema, cast types, perform basic cleansing
- **Pattern:** `stg_[source]__[entity]`
- **Example:** `stg_psdalldata__commodity.sql` casts raw strings to proper types and joins lookup tables
- **Features:**
  - Deduplication using hash keys
  - Extracts metadata (`ingest_date`) from file paths
  - 1:1 relationship with raw sources

### Layer 3: Cleaned (`models/cleaned/`)

- **Purpose:** Integration, business logic, unified models
- **Pattern:** `cln_[entity]` or `cln_[vault_component]_[entity]`
- **Example:** `cln_psdalldata__commodity_pivoted.sql` pivots commodity attributes into columns

### Layer 4: Serving (`models/serving/`)

- **Purpose:** Analytics-ready models (star schema, aggregates)
- **Patterns:**
  - `dim_[entity]` for dimensions
  - `fct_[process]` for facts
  - `agg_[description]` for aggregates
  - `obt_[description]` for one-big-tables
- **Example:** `obt_commodity_metrics.sql` provides a wide table for analysis

## Model Development

**Incremental models:**

- Use `INCREMENTAL_BY_TIME_RANGE` kind
- Define `time_column` (usually `ingest_date`)
- Filter with `WHERE time_column BETWEEN @start_ds AND @end_ds`

**Full refresh models:**

- Use `FULL` kind for small lookup tables and raw sources

**Model properties:**

- `grain`: Define unique key columns for data quality
- `start`: Historical backfill start date (project default: 2025-07-07)
- `cron`: Schedule (project default: `'@daily'`)

## Linting and Formatting

**Run linting:**

```bash
ruff check .
```

**Auto-fix issues:**

```bash
ruff check --fix .
```

**Format code:**

```bash
ruff format .
```

Pre-commit hooks automatically run ruff on commits.

## Testing

**Run SQLMesh tests:**

```bash
cd transform/sqlmesh_materia
sqlmesh test
```

**Run Python tests (if configured):**

```bash
pytest --cov=./ --cov-report=xml
```

## CI/CD Pipeline and Production Architecture

### CI/CD Pipeline (`.gitlab-ci.yml`)

**Four stages: Lint → Test → Build → Deploy**

#### 1. Lint Stage

- Runs `ruff check` and `ruff format --check`
- Validates code quality on every commit

#### 2. Test Stage

- **`test:cli`**: Runs pytest on the materia CLI with 71% coverage
  - Tests secrets management (Pulumi ESC integration)
  - Tests worker lifecycle (create, list, destroy)
  - Tests pipeline execution (extract, transform)
  - Exports coverage reports to GitLab
- **`test:sqlmesh`**: Runs SQLMesh model tests in the transform layer

#### 3. Build Stage (only on master branch)

Creates separate artifacts for each workspace package:

- **`build:extract`**: Builds `materia-extract-latest.tar.gz` (psdonline package)
- **`build:transform`**: Builds `materia-transform-latest.tar.gz` (sqlmesh_materia package)
- **`build:cli`**: Builds `materia-cli-latest.tar.gz` (materia management CLI)

Each artifact is a self-contained tarball with all dependencies.

#### 4. Deploy Stage (only on master branch)

- **`deploy:r2`**: Uploads artifacts to Cloudflare R2 using rclone
  - Loads secrets from Pulumi ESC (`beanflows/prod`)
  - Only requires `PULUMI_ACCESS_TOKEN` in GitLab variables
  - All other secrets (R2 credentials, SSH keys, API tokens) come from ESC
- **`deploy:infra`**: Runs `pulumi up` to ensure the supervisor instance exists
  - Runs on every master push (not just on infra changes)
  - Creates/updates the Hetzner CCX11 supervisor instance
  - Configures Cloudflare R2 buckets (`beanflows-artifacts`, `beanflows-data-prod`)
- **`deploy:supervisor`**: Deploys the supervisor script and materia CLI
  - Runs after `deploy:r2` and `deploy:infra`
  - Copies `supervisor.sh` and the systemd service to the supervisor instance
  - Downloads and installs the latest materia CLI from R2
  - Restarts the supervisor service to pick up changes

### Production Architecture: Ephemeral Worker Model

**Design philosophy:**

- No always-on workers (cost optimization)
- The supervisor instance dynamically creates and destroys workers on demand
- Language-agnostic artifacts enable a future migration to C/Rust/Go
- Multi-cloud abstraction for pricing optimization

**Components:**

#### 1. Supervisor Instance (Small Hetzner VM)

- Runs `supervisor.sh`, a continuous orchestration loop (inspired by TigerBeetle's CFO supervisor)
- Hetzner CCX11: 2 vCPU, 4 GB RAM (~€4/mo)
- Always on, minimal resource usage
- Checks for new CLI versions every hour (self-updating)
- Runs pipelines on schedule:
  - Extract: daily at 2 AM UTC
  - Transform: daily at 3 AM UTC
- Uses a systemd service for automatic restart on failure
- Pulls secrets from Pulumi ESC and passes them to workers

#### 2. Ephemeral Workers (On-Demand)

- Created for each pipeline execution
- Download pre-built artifacts from R2 (no git, no uv on the worker)
- Receive secrets via SSH environment variable injection
- Destroyed immediately after job completion
- Different instance types per pipeline:
  - Extract: `ccx12` (2 vCPU, 8 GB RAM)
  - Transform: `ccx22` (4 vCPU, 16 GB RAM)

#### 3. Secrets Flow

```
Pulumi ESC (beanflows/prod)
  ↓
Supervisor Instance (materia CLI)
  ↓
Workers (injected as env vars via SSH)
```

#### 4. Artifact Flow

```
GitLab CI: uv build → tar.gz
  ↓
Cloudflare R2 (artifact storage)
  ↓
Worker: curl → extract → execute
```

#### 5. Data Storage

- **Dev**: Local DuckDB file (`materia_dev.db`)
- **Prod**: DuckDB in-memory + Cloudflare R2 Data Catalog (Iceberg REST API)
  - ACID transactions on object storage
  - No persistent database on workers

**Execution flow:**

1. The supervisor loop wakes up every 15 minutes
2. Checks whether the current time matches a pipeline schedule (e.g., 2 AM for extract)
3. Checks for CLI updates (hourly) and self-updates if needed
4. The CLI runs `materia pipeline run extract`
5. Creates a Hetzner worker with an SSH key
6. The worker downloads `materia-extract-latest.tar.gz` from R2
7. The CLI injects secrets via SSH: `export R2_ACCESS_KEY_ID=... && ./extract_psd`
8. The pipeline executes and writes to the R2 Iceberg catalog
9. The worker is destroyed (entire lifecycle ~5-10 minutes)
10. The supervisor logs results and continues the loop

**Multi-cloud provider abstraction:**

- Protocol-based interface (data-oriented design, no OOP)
- Providers: Hetzner (implemented); OVH, Scaleway, Oracle (stubs)
- Allows switching providers for cost optimization
- Each provider implements: `create_instance`, `destroy_instance`, `list_instances`, `wait_for_ssh`

## Key Design Patterns

**Raw data ingestion:**

- DuckDB reads directly from zip archives using `read_csv('zip://...')`
- `filename=true` captures the source file path for metadata
- `union_by_name=true` handles schema evolution

**Deduplication:**

- Use the `hash()` function to create unique keys
- Use `any_value()` with `GROUP BY hkey` to deduplicate
- Preserve all metadata in the hash key for change detection

**Date handling:**

- Extract ingest dates from file paths: `make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1)`
- Calculate market dates: `last_day(make_date(market_year, month, 1))`

**SQLMesh best practices:**

- Always define `grain` for data quality validation
- Use meaningful model names following the layer conventions
- Leverage SQLMesh's built-in time macros (`@start_ds`, `@end_ds`)
- Keep the raw layer thin; push transformations to staging and beyond

## Database Location

- **Dev database:** `materia_dev.db` (13 GB, in the project root)
- **Prod database:** `materia_prod.db` (not yet created)

Note: The dev database is large and must not be committed to git (`.gitignore` is already configured).

- We use a monorepo with uv workspaces
- The Pulumi environment is called `beanflows/prod`
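The multi-cloud provider abstraction described above can be sketched with `typing.Protocol`. The four method names come from this document; the signatures and the in-memory `FakeProvider` are assumptions for illustration, not the actual materia CLI code.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class CloudProvider(Protocol):
    """Structural interface every provider (Hetzner, OVH, ...) must satisfy."""

    def create_instance(self, name: str, instance_type: str) -> str: ...
    def destroy_instance(self, instance_id: str) -> None: ...
    def list_instances(self) -> list[str]: ...
    def wait_for_ssh(self, instance_id: str, timeout: float = 300.0) -> bool: ...


class FakeProvider:
    """In-memory stand-in used here only to show the protocol shape.

    No inheritance needed: any class with matching methods satisfies
    CloudProvider, which is the data-oriented, no-OOP style noted above.
    """

    def __init__(self) -> None:
        self._instances: dict[str, str] = {}

    def create_instance(self, name: str, instance_type: str) -> str:
        self._instances[name] = instance_type
        return name

    def destroy_instance(self, instance_id: str) -> None:
        self._instances.pop(instance_id, None)

    def list_instances(self) -> list[str]:
        return list(self._instances)

    def wait_for_ssh(self, instance_id: str, timeout: float = 300.0) -> bool:
        return instance_id in self._instances
```

Because the protocol is structural, swapping Hetzner for another provider only requires a new class with the same four methods; no shared base class is involved.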
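The date-handling SQL expressions above translate directly into Python, which can help when writing tests for them. A minimal sketch; the sample archive path is hypothetical:

```python
import calendar
from datetime import date


def ingest_date_from_path(filename: str) -> date:
    """Python equivalent of
    make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1).

    Year and month sit four and three segments from the end of the path,
    since DuckDB appends the inner CSV name to the zip archive path.
    """
    parts = filename.split("/")
    return date(int(parts[-4]), int(parts[-3]), 1)


def market_date(market_year: int, month: int) -> date:
    """Python equivalent of last_day(make_date(market_year, month, 1))."""
    last = calendar.monthrange(market_year, month)[1]  # days in that month
    return date(market_year, month, last)


# Hypothetical archive path with .../<year>/<month>/<zip>/<csv> layout:
print(ingest_date_from_path("zip://data/2006/07/psd.zip/psd_alldata.csv"))  # 2006-07-01
print(market_date(2024, 2))  # 2024-02-29 (handles leap years)
```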