# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

Materia is a commodity data analytics platform built on a modern data engineering stack. The project extracts agricultural commodity data from USDA PSD Online, transforms it through a layered SQL pipeline using SQLMesh, and stores it in DuckDB for analysis.

**Tech Stack:**

- Python 3.13 with the `uv` package manager
- SQLMesh for SQL transformation and orchestration
- DuckDB as the analytical database
- Workspace structure with separate extract and transform packages

## Environment Setup

**Install dependencies:**

```bash
uv sync
```

**Set up pre-commit hooks:**

```bash
pre-commit install
```

**Add new dependencies:**

```bash
uv add <package>
```

## Project Structure

This is a uv workspace with three main components:

### 1. Extract Layer (`extract/`)

Contains extraction packages for pulling data from external sources.

- **`extract/psdonline/`**: Extracts USDA PSD commodity data from archives dating back to 2006
  - Entry point: `extract_psd` CLI command (defined in `extract/psdonline/src/psdonline/execute.py`)
  - Downloads monthly zip archives to `extract/psdonline/src/psdonline/data/`
  - Uses ETags to avoid re-downloading unchanged files

**Run extraction:**

```bash
extract_psd
```

### 2. Transform Layer (`transform/sqlmesh_materia/`)

SQLMesh project implementing a layered data architecture.
**Working directory:** All SQLMesh commands must be run from `transform/sqlmesh_materia/`.

**Key commands:**

```bash
cd transform/sqlmesh_materia

# Plan changes (no prompts, auto-apply enabled in config)
sqlmesh plan

# Run tests
sqlmesh test

# Validate models
sqlmesh validate

# Run audits
sqlmesh audit

# Format SQL
sqlmesh format

# Start UI
sqlmesh ui
```

**Configuration:**

- Config: `transform/sqlmesh_materia/config.yaml`
- Default gateway: `dev` (uses `materia_dev.db`)
- Production gateway: `prod` (uses `materia_prod.db`)
- Auto-apply enabled, no interactive prompts
- DuckDB extensions: zipfs, httpfs, iceberg

### 3. Core Package (`src/materia/`)

Currently minimal; the main logic resides in the workspace packages.

## Data Architecture

SQLMesh models follow a strict four-layer architecture defined in `transform/sqlmesh_materia/models/README.md`:

### Layer 1: Raw (`models/raw/`)

- **Purpose:** Immutable archive of source data
- **Pattern:** Reads directly from extraction outputs
- **Example:** `raw.psd_alldata` reads zip files using DuckDB's `read_csv('zip://...')` function
- **Grain:** Defines unique keys for each raw table

### Layer 2: Staging (`models/staging/`)

- **Purpose:** Apply schema, cast types, basic cleansing
- **Pattern:** `stg_[source]__[entity]`
- **Example:** `stg_psdalldata__commodity.sql` casts raw strings to proper types and joins lookup tables
- **Features:**
  - Deduplication using hash keys
  - Extracts metadata (`ingest_date`) from file paths
  - 1:1 relationship with raw sources

### Layer 3: Cleaned (`models/cleaned/`)

- **Purpose:** Integration, business logic, unified models
- **Pattern:** `cln_[entity]` or `cln_[vault_component]_[entity]`
- **Example:** `cln_psdalldata__commodity_pivoted.sql` pivots commodity attributes into columns

### Layer 4: Serving (`models/serving/`)

- **Purpose:** Analytics-ready models (star schema, aggregates)
- **Patterns:**
  - `dim_[entity]` for dimensions
  - `fct_[process]` for facts
  - `agg_[description]` for aggregates
  - `obt_[description]` for one-big-tables
- **Example:** `obt_commodity_metrics.sql` provides a wide table for analysis

## Model Development

**Incremental models:**

- Use the `INCREMENTAL_BY_TIME_RANGE` kind
- Define `time_column` (usually `ingest_date`)
- Filter with `WHERE time_column BETWEEN @start_ds AND @end_ds`

**Full refresh models:**

- Use the `FULL` kind for small lookup tables and raw sources

**Model properties:**

- `grain`: Define unique key columns for data quality
- `start`: Historical backfill start date (project default: 2025-07-07)
- `cron`: Schedule (project default: `@daily`)

## Linting and Formatting

**Run linting:**

```bash
ruff check .
```

**Auto-fix issues:**

```bash
ruff check --fix .
```

**Format code:**

```bash
ruff format .
```

Pre-commit hooks automatically run ruff on commits.

## Testing

**Run SQLMesh tests:**

```bash
cd transform/sqlmesh_materia
sqlmesh test
```

**Run Python tests (if configured):**

```bash
pytest --cov=./ --cov-report=xml
```

## CI/CD Pipeline and Production Architecture

### CI/CD Pipeline (`.gitlab-ci.yml`)

**Three stages: Lint → Test → Deploy**

#### 1. Lint Stage

- Runs `ruff check` on every commit
- Validates code quality

#### 2. Test Stage

- **`test:cli`**: Runs pytest on the materia CLI with 71% coverage
  - Tests secrets management (Pulumi ESC integration)
  - Tests worker lifecycle (create, list, destroy)
  - Tests pipeline execution (extract, transform)
  - Exports coverage reports to GitLab
- **`test:sqlmesh`**: Runs SQLMesh model tests in the transform layer

#### 3. Deploy Stage (master branch only)

- **`deploy:infra`**: Runs `pulumi up` to ensure the supervisor instance exists
  - Runs on every master push
  - Creates/updates a Hetzner CPX11 supervisor instance (~€4.49/mo)
  - Uses Pulumi ESC (`beanflows/prod`) for all secrets
- **`deploy:supervisor`**: Checks supervisor status
  - Verifies the supervisor is bootstrapped
  - The supervisor auto-updates via `git pull` every 15 minutes (no CI/CD deployment needed)

**Note:** No build artifacts!
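The supervisor's 15-minute update-and-dispatch cycle can be sketched in Python. This is an illustrative sketch only; the actual supervisor is `supervisor.sh`, and the schedule table and helper names here are assumptions.

```python
import subprocess
from datetime import datetime, timezone

# Scheduled UTC hours per the docs: extract at 2 AM, transform at 3 AM.
PIPELINE_SCHEDULE = {"extract": 2, "transform": 3}


def behind_origin() -> bool:
    """True if origin/master has commits that local HEAD lacks."""
    subprocess.run(["git", "fetch", "origin", "master"], check=True)
    local = subprocess.run(["git", "rev-parse", "HEAD"],
                           capture_output=True, text=True, check=True).stdout.strip()
    remote = subprocess.run(["git", "rev-parse", "origin/master"],
                            capture_output=True, text=True, check=True).stdout.strip()
    return local != remote


def due_pipelines(now: datetime) -> list[str]:
    """Pipelines whose scheduled UTC hour matches the current wakeup."""
    return [name for name, hour in PIPELINE_SCHEDULE.items() if now.hour == hour]


def tick() -> None:
    """One 15-minute supervisor iteration: update code, then run due pipelines.

    A real loop also has to remember what already ran today, since four
    wakeups fall inside each scheduled hour; that bookkeeping is omitted here.
    """
    if behind_origin():
        subprocess.run(["git", "pull", "origin", "master"], check=True)
        subprocess.run(["uv", "sync"], check=True)
    for pipeline in due_pipelines(datetime.now(timezone.utc)):
        subprocess.run(["uv", "run", "materia", "pipeline", "run", pipeline], check=True)
```
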
The supervisor pulls code directly from git and runs it via `uv`.

### Production Architecture: Git-Based Deployment with Ephemeral Workers

**Design philosophy:**

- No always-on workers (cost optimization)
- The supervisor pulls the latest code from git (no artifact builds)
- The supervisor dynamically creates and destroys workers on demand
- Simple, inspectable, easy to test locally
- Multi-cloud abstraction for pricing optimization

**Components:**

#### 1. Supervisor Instance (Small Hetzner VM)

- Runs `supervisor.sh`, a continuous orchestration loop (inspired by TigerBeetle's CFO supervisor)
- Hetzner CPX11: 2 vCPU (shared), 2 GB RAM (~€4.49/mo)
- Always-on, minimal resource usage
- Git-based deployment: `git pull` every 15 minutes for auto-updates
- Runs pipelines on schedule:
  - Extract: daily at 2 AM UTC
  - Transform: daily at 3 AM UTC
- Uses a systemd service for automatic restart on failure
- Pulls secrets from Pulumi ESC

**Bootstrap (one-time):**

```bash
# Get supervisor IP from Pulumi
cd infra && pulumi stack output supervisor_ip -s prod

# Run bootstrap script
export PULUMI_ACCESS_TOKEN=<token>
ssh root@<supervisor_ip> 'bash -s' < infra/bootstrap_supervisor.sh
```

#### 2. Ephemeral Workers (On-Demand)

- Created for each pipeline execution by the materia CLI
- Receive secrets via SSH environment variable injection
- Destroyed immediately after job completion
- Different instance types per pipeline:
  - Extract: `ccx12` (2 vCPU, 8 GB RAM)
  - Transform: `ccx22` (4 vCPU, 16 GB RAM)

#### 3. Secrets Flow

```
Pulumi ESC (beanflows/prod)
  ↓
Supervisor Instance (via esc CLI)
  ↓
Workers (injected as env vars via SSH)
```

#### 4. Code Deployment Flow

```
GitLab (master branch)
  ↓
Supervisor: git pull origin master (every 15 min)
  ↓
Supervisor: uv sync (update dependencies)
  ↓
Supervisor: uv run materia pipeline run
```

#### 5. Data Storage

- **Dev**: Local DuckDB file (`materia_dev.db`)
- **Prod**: DuckDB in-memory + Cloudflare R2 Data Catalog (Iceberg REST API)
  - ACID transactions on object storage
  - No persistent database on workers

**Execution Flow:**

1. The supervisor loop wakes up every 15 minutes
2. Runs `git fetch` and checks for new commits on master
3. If updates are available: `git pull && uv sync`
4. Checks whether the current time matches a pipeline schedule (e.g., 2 AM for extract)
5. If scheduled: `uv run materia pipeline run extract`
6. The CLI creates a Hetzner worker with an SSH key
7. The CLI injects secrets via SSH and executes the pipeline
8. The pipeline runs and writes to the R2 Iceberg catalog
9. The worker is destroyed (entire lifecycle ~5-10 minutes)
10. The supervisor logs results and continues the loop

**Multi-Cloud Provider Abstraction:**

- Protocol-based interface (data-oriented design, no OOP)
- Providers: Hetzner (implemented); OVH, Scaleway, Oracle (stubs)
- Allows switching providers for cost optimization
- Each provider implements: `create_instance`, `destroy_instance`, `list_instances`, `wait_for_ssh`

## Key Design Patterns

**Raw data ingestion:**

- DuckDB reads directly from zip archives using `read_csv('zip://...')`
- `filename=true` captures the source file path for metadata
- `union_by_name=true` handles schema evolution

**Deduplication:**

- Use the `hash()` function to create unique keys
- Use `any_value()` with `GROUP BY hkey` to deduplicate
- Preserve all metadata in the hash key for change detection

**Date handling:**

- Extract ingest dates from file paths: `make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1)`
- Calculate market dates: `last_day(make_date(market_year, month, 1))`

**SQLMesh best practices:**

- Always define `grain` for data quality validation
- Use meaningful model names following the layer conventions
- Leverage SQLMesh's built-in time macros (`@start_ds`, `@end_ds`)
- Keep the raw layer thin; push transformations to staging and above

## Database Location

- **Dev database:** `materia_dev.db` (13 GB, in the project root)
- **Prod database:** `materia_prod.db` (not yet created)

Note: The dev database is large and should not be committed to git (`.gitignore` is already configured).

## Additional Notes

- We use a monorepo with uv workspaces
- The Pulumi env is called `beanflows/prod`
- NEVER hardcode secrets in plaintext
- Never add SSH keys to the git repo!
- If there is a simpler, more direct solution with no other tradeoff, always choose the simpler solution
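As a cross-check of the date-handling pattern under Key Design Patterns, the same logic can be expressed in plain Python; this is illustrative only, since the pipeline does it in DuckDB SQL.

```python
import calendar
from datetime import date


def ingest_date_from_path(filename: str) -> date:
    """Mirror make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1):
    year and month come from fixed positions in the archive path."""
    parts = filename.split("/")
    return date(int(parts[-4]), int(parts[-3]), 1)


def last_day(year: int, month: int) -> date:
    """Mirror DuckDB's last_day(make_date(year, month, 1)):
    the last calendar day of the given month."""
    return date(year, month, calendar.monthrange(year, month)[1])
```

For a path shaped like `data/2024/03/psd/archive.zip`, the fourth- and third-from-last components give the ingest year and month, and `last_day` handles month lengths and leap years the same way DuckDB does.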