# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Materia is a commodity data analytics platform built on a modern data engineering stack. The project extracts agricultural commodity data from USDA PSD Online, transforms it through a layered SQL pipeline using SQLMesh, and stores it in DuckDB for analysis.
**Tech Stack:**
- Python 3.13 with `uv` package manager
- SQLMesh for SQL transformation and orchestration
- DuckDB as the analytical database
- Workspace structure with separate extract and transform packages
## Environment Setup
**Install dependencies:**
```bash
uv sync
```
**Setup pre-commit hooks:**
```bash
pre-commit install
```
**Add new dependencies:**
```bash
uv add <package-name>
```
## Project Structure
This is a uv workspace with three main components:
### 1. Extract Layer (`extract/`)
Contains extraction packages for pulling data from external sources.
- **`extract/psdonline/`**: Extracts USDA PSD commodity data from archives dating back to 2006
  - Entry point: `extract_psd` CLI command (defined in `extract/psdonline/src/psdonline/execute.py`)
  - Downloads monthly zip archives to `extract/psdonline/src/psdonline/data/`
  - Uses ETags to avoid re-downloading unchanged files
**Run extraction:**
```bash
extract_psd
```
### 2. Transform Layer (`transform/sqlmesh_materia/`)
SQLMesh project implementing a layered data architecture.
**Working directory:** All SQLMesh commands must be run from `transform/sqlmesh_materia/`
**Key commands:**
```bash
cd transform/sqlmesh_materia
# Plan changes (no prompts, auto-apply enabled in config)
sqlmesh plan
# Run tests
sqlmesh test
# Validate models
sqlmesh validate
# Run audits
sqlmesh audit
# Format SQL
sqlmesh format
# Start UI
sqlmesh ui
```
**Configuration:**
- Config: `transform/sqlmesh_materia/config.yaml`
- Default gateway: `dev` (uses `materia_dev.db`)
- Production gateway: `prod` (uses `materia_prod.db`)
- Auto-apply enabled, no interactive prompts
- DuckDB extensions: zipfs, httpfs, iceberg
### 3. Core Package (`src/materia/`)
Currently minimal; main logic resides in workspace packages.
## Data Architecture
SQLMesh models follow a strict 4-layer architecture defined in `transform/sqlmesh_materia/models/README.md`:
### Layer 1: Raw (`models/raw/`)
- **Purpose:** Immutable archive of source data
- **Pattern:** Directly reads from extraction outputs
- **Example:** `raw.psd_alldata` reads zip files using DuckDB's `read_csv('zip://...')` function
- **Grain:** Defines unique keys for each raw table
### Layer 2: Staging (`models/staging/`)
- **Purpose:** Apply schema, cast types, basic cleansing
- **Pattern:** `stg_[source]__[entity]`
- **Example:** `stg_psdalldata__commodity.sql` casts raw strings to proper types, joins lookup tables
- **Features:**
  - Deduplication using hash keys
  - Extracts metadata (ingest_date) from file paths
  - 1:1 relationship with raw sources
### Layer 3: Cleaned (`models/cleaned/`)
- **Purpose:** Integration, business logic, unified models
- **Pattern:** `cln_[entity]` or `cln_[vault_component]_[entity]`
- **Example:** `cln_psdalldata__commodity_pivoted.sql` pivots commodity attributes into columns
### Layer 4: Serving (`models/serving/`)
- **Purpose:** Analytics-ready models (star schema, aggregates)
- **Patterns:**
  - `dim_[entity]` for dimensions
  - `fct_[process]` for facts
  - `agg_[description]` for aggregates
  - `obt_[description]` for one-big-tables
- **Example:** `obt_commodity_metrics.sql` provides wide table for analysis
## Model Development
**Incremental models:**
- Use `INCREMENTAL_BY_TIME_RANGE` kind
- Define `time_column` (usually `ingest_date`)
- Filter with `WHERE time_column BETWEEN @start_ds AND @end_ds`
**Full refresh models:**
- Use `FULL` kind for small lookup tables and raw sources
**Model properties:**
- `grain`: Define unique key columns for data quality
- `start`: Historical backfill start date (project default: 2025-07-07)
- `cron`: Schedule (project default: '@daily')
## Linting and Formatting
**Run linting:**
```bash
ruff check .
```
**Auto-fix issues:**
```bash
ruff check --fix .
```
**Format code:**
```bash
ruff format .
```
Pre-commit hooks automatically run ruff on commits.
## Testing
**Run SQLMesh tests:**
```bash
cd transform/sqlmesh_materia
sqlmesh test
```
**Run Python tests (if configured):**
```bash
pytest --cov=./ --cov-report=xml
```
## CI/CD Pipeline and Production Architecture
### CI/CD Pipeline (`.gitlab-ci.yml`)
**4 Stages: Lint → Test → Build → Deploy**
#### 1. Lint Stage
- Runs `ruff check` and `ruff format --check`
- Validates code quality on every commit
#### 2. Test Stage
- **`test:cli`**: Runs pytest on materia CLI with 71% coverage
  - Tests secrets management (Pulumi ESC integration)
  - Tests worker lifecycle (create, list, destroy)
  - Tests pipeline execution (extract, transform)
  - Exports coverage reports to GitLab
- **`test:sqlmesh`**: Runs SQLMesh model tests in transform layer
#### 3. Build Stage (only on master branch)
Creates separate artifacts for each workspace package:
- **`build:extract`**: Builds `materia-extract-latest.tar.gz` (psdonline package)
- **`build:transform`**: Builds `materia-transform-latest.tar.gz` (sqlmesh_materia package)
- **`build:cli`**: Builds `materia-cli-latest.tar.gz` (materia management CLI)
Each artifact is a self-contained tarball with all dependencies.
#### 4. Deploy Stage (only on master branch)
- **`deploy:r2`**: Uploads artifacts to Cloudflare R2 using rclone
  - Loads secrets from Pulumi ESC (`beanflows/prod`)
  - Only requires `PULUMI_ACCESS_TOKEN` in GitLab variables
  - All other secrets (R2 credentials, SSH keys, API tokens) come from ESC
- **`deploy:infra`**: Runs `pulumi up` to deploy infrastructure changes
  - Only triggers when `infra/**/*` files change
### Production Architecture: Ephemeral Worker Model
**Design Philosophy:**
- No always-on workers (cost optimization)
- Supervisor instance dynamically creates/destroys workers on-demand
- Language-agnostic artifacts enable future migration to C/Rust/Go
- Multi-cloud abstraction for pricing optimization
**Components:**
#### 1. Supervisor Instance (Small Hetzner VM)
- Runs the `materia` management CLI
- Small, always-on instance (cheap)
- Pulls secrets from Pulumi ESC
- Orchestrates worker lifecycle via cloud provider APIs
#### 2. Ephemeral Workers (On-Demand)
- Created for each pipeline execution
- Downloads pre-built artifacts from R2 (no git, no uv on worker)
- Receives secrets via SSH environment variable injection
- Destroyed immediately after job completion
- Different instance types per pipeline:
  - Extract: `ccx12` (2 vCPU, 8GB RAM)
  - Transform: `ccx22` (4 vCPU, 16GB RAM)
#### 3. Secrets Flow
```
Pulumi ESC (beanflows/prod)
    ↓
Supervisor Instance (materia CLI)
    ↓
Workers (injected as env vars via SSH)
```
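
The last hop of this flow (supervisor to worker) can be sketched as below. This is an assumption-laden illustration, not the `materia` CLI's actual code: `build_remote_command`, `build_ssh_argv`, and the `root@` user are hypothetical names chosen for the example.

```python
import shlex


def build_remote_command(secrets: dict[str, str], command: str) -> str:
    """Prefix command with `export KEY=value` so secrets live only in the SSH session."""
    if not secrets:
        return command
    for name in secrets:
        if not name.isidentifier():
            raise ValueError(f"invalid environment variable name: {name!r}")
    # shlex.quote keeps values with spaces or shell metacharacters intact.
    assignments = " ".join(
        f"{name}={shlex.quote(value)}" for name, value in sorted(secrets.items())
    )
    return f"export {assignments} && {command}"


def build_ssh_argv(host: str, secrets: dict[str, str], command: str) -> list[str]:
    """Argument vector suitable for subprocess.run; user and host are placeholders."""
    return ["ssh", f"root@{host}", build_remote_command(secrets, command)]
```

Because the values are passed inline and never written to the worker's disk, they disappear when the SSH session ends. Quoting each value matters here, since an unquoted secret containing `$` or a space would be mangled by the remote shell.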
#### 4. Artifact Flow
```
GitLab CI: uv build → tar.gz
    ↓
Cloudflare R2 (artifact storage)
    ↓
Worker: curl → extract → execute
```
#### 5. Data Storage
- **Dev**: Local DuckDB file (`materia_dev.db`)
- **Prod**: DuckDB in-memory + Cloudflare R2 Data Catalog (Iceberg REST API)
  - ACID transactions on object storage
  - No persistent database on workers
**Execution Flow:**
1. Supervisor receives schedule trigger (cron/manual)
2. CLI runs: `materia pipeline run extract`
3. Creates Hetzner worker with SSH key
4. Worker downloads `materia-extract-latest.tar.gz` from R2
5. CLI injects secrets via SSH: `export R2_ACCESS_KEY_ID=... && ./extract_psd`
6. Pipeline executes, writes to R2 Iceberg catalog
7. Worker destroyed (entire lifecycle ~5-10 minutes)
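
The create, run, destroy sequence above can be sketched with a `try`/`finally` so that the worker is removed even when the pipeline fails. This is a simplified illustration: `run_on_ephemeral_worker` and its three callables are hypothetical stand-ins for the CLI's real provider and SSH calls.

```python
from typing import Callable


def run_on_ephemeral_worker(
    create: Callable[[], str],
    run: Callable[[str], int],
    destroy: Callable[[str], None],
) -> int:
    """Create a worker, run the job on it, and always destroy it afterwards."""
    worker_id = create()
    try:
        # Returns the job's exit code so the caller can report failure.
        return run(worker_id)
    finally:
        # Runs on success, on a non-zero exit code, and on exceptions alike,
        # so a failed pipeline cannot leave a billed instance behind.
        destroy(worker_id)
```

Passing the three steps in as plain functions keeps the lifecycle logic independent of any one cloud provider and makes it easy to exercise in tests without creating real instances.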
**Multi-Cloud Provider Abstraction:**
- Protocol-based interface (data-oriented design, no OOP)
- Providers: Hetzner (implemented), OVH, Scaleway, Oracle (stubs)
- Allows switching providers for cost optimization
- Each provider implements: `create_instance`, `destroy_instance`, `list_instances`, `wait_for_ssh`
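
One way to express such an interface is `typing.Protocol`, sketched below. Only the four function names come from the list above; the parameter lists, the `Instance` record, and the in-memory `make_fake_provider` are assumptions made for illustration and may differ from the repository's code.

```python
import itertools
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Protocol


@dataclass(frozen=True)
class Instance:
    """Plain data record describing a worker (hypothetical fields)."""
    id: str
    name: str
    ip: str


class Provider(Protocol):
    """Structural interface: anything exposing these four callables qualifies."""

    def create_instance(self, name: str, instance_type: str) -> Instance: ...
    def destroy_instance(self, instance_id: str) -> None: ...
    def list_instances(self) -> list[Instance]: ...
    def wait_for_ssh(self, ip: str, timeout_s: float) -> bool: ...


def make_fake_provider() -> Provider:
    """In-memory stand-in for tests; a real provider would call a cloud API."""
    instances: dict[str, Instance] = {}
    ids = itertools.count(1)

    def create_instance(name: str, instance_type: str) -> Instance:
        # 192.0.2.1 is a documentation-only address (RFC 5737).
        instance = Instance(id=str(next(ids)), name=name, ip="192.0.2.1")
        instances[instance.id] = instance
        return instance

    def destroy_instance(instance_id: str) -> None:
        instances.pop(instance_id, None)

    def list_instances() -> list[Instance]:
        return list(instances.values())

    def wait_for_ssh(ip: str, timeout_s: float) -> bool:
        return True  # the fake is reachable immediately

    # A namespace of plain functions satisfies the Protocol structurally,
    # without defining a provider class hierarchy.
    return SimpleNamespace(
        create_instance=create_instance,
        destroy_instance=destroy_instance,
        list_instances=list_instances,
        wait_for_ssh=wait_for_ssh,
    )
```

Because a Protocol is checked structurally, a provider can be a module or a bundle of functions rather than a subclass, which fits the "no OOP" constraint while still giving type checkers a contract to verify.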
## Key Design Patterns
**Raw data ingestion:**
- DuckDB reads directly from zip archives using `read_csv('zip://...')`
- `filename=true` captures source file path for metadata
- `union_by_name=true` handles schema evolution
**Deduplication:**
- Use `hash()` function to create unique keys
- Use `any_value()` with `GROUP BY hkey` to deduplicate
- Preserve all metadata in hash key for change detection
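
The same idea, expressed outside SQL, looks roughly like this. It is a sketch for intuition only: SHA-256 stands in for DuckDB's `hash()`, keeping the first row per key stands in for `any_value()` (which may return any row of the group), and the function names are hypothetical.

```python
import hashlib


def hash_key(row: dict[str, object]) -> str:
    """Stable key over every column, so any changed value yields a new key."""
    payload = "|".join(f"{name}={row[name]!r}" for name in sorted(row))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def deduplicate(rows: list[dict[str, object]]) -> list[dict[str, object]]:
    """Keep one row per hash key, analogous to any_value(...) with GROUP BY hkey."""
    unique: dict[str, dict[str, object]] = {}
    for row in rows:
        # setdefault keeps the first row seen for each key and ignores repeats.
        unique.setdefault(hash_key(row), row)
    return list(unique.values())
```

Including every column in the key means an exact re-delivery of a record collapses to one row, while a revised value for the same commodity and period produces a different key and is kept as a change.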
**Date handling:**
- Extract ingest dates from file paths: `make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1)`
- Calculate market dates: `last_day(make_date(market_year, month, 1))`
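
For readers less familiar with DuckDB's list indexing, the two expressions above behave like the Python sketch below. The example path layout (`.../YYYY/MM/<archive>/<file>`) is an assumption inferred from the `[-4]` and `[-3]` indices, and the function names are hypothetical.

```python
import calendar
from datetime import date


def ingest_date_from_path(filename: str) -> date:
    """First day of the ingest month, read from the year and month path segments."""
    parts = filename.split("/")
    # Negative indices count from the end, as in the DuckDB expression.
    return date(int(parts[-4]), int(parts[-3]), 1)


def market_date(market_year: int, month: int) -> date:
    """Last calendar day of the given month, like last_day(make_date(y, m, 1))."""
    last_day = calendar.monthrange(market_year, month)[1]
    return date(market_year, month, last_day)
```

Counting segments from the end of the path keeps the extraction independent of how deep the data directory sits, but it does rely on the number of segments after the month staying fixed.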
**SQLMesh best practices:**
- Always define `grain` for data quality validation
- Use meaningful model names following layer conventions
- Leverage SQLMesh's built-in time macros (`@start_ds`, `@end_ds`)
- Keep raw layer thin, push transformations to staging+
## Database Location
- **Dev database:** `materia_dev.db` (13GB, in project root)
- **Prod database:** `materia_prod.db` (not yet created)
Note: The dev database is large and should not be committed to git (.gitignore already configured).
- We use a monorepo with uv workspaces
- The Pulumi ESC environment is called `beanflows/prod`