# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Materia is a commodity data analytics platform (product: BeanFlows.coffee) for coffee traders. It's a uv workspace monorepo: multiple extraction packages, a SQL transformation pipeline, a web app, and a CLI for local pipeline execution.
## Commands

```bash
# Install dependencies
uv sync --all-packages

# Lint & format
ruff check .          # Check
ruff check --fix .    # Auto-fix
ruff format .         # Format

# Tests
uv run pytest tests/ -v --cov=src/materia            # CLI/Python tests
cd transform/sqlmesh_materia && uv run sqlmesh test  # SQLMesh model tests

# Run a single test
uv run pytest tests/test_cli.py::test_name -v

# SQLMesh (from repo root)
uv run sqlmesh -p transform/sqlmesh_materia plan        # Plans to dev_<username> by default
uv run sqlmesh -p transform/sqlmesh_materia plan prod   # Production
uv run sqlmesh -p transform/sqlmesh_materia test        # Run model tests
uv run sqlmesh -p transform/sqlmesh_materia format      # Format SQL

# CLI
uv run materia pipeline run extract|transform|export_serving
uv run materia pipeline list
uv run materia secrets list
uv run materia secrets test

# Supervisor status (production)
uv run python src/materia/supervisor.py status

# CSS (Tailwind)
make css-build   # one-shot build
make css-watch   # watch mode

# Secrets
make secrets-decrypt-dev    # decrypt .env.dev.sops → .env (local dev)
make secrets-decrypt-prod   # decrypt .env.prod.sops → .env
make secrets-edit-dev       # edit dev secrets in $EDITOR
make secrets-edit-prod      # edit prod secrets in $EDITOR
```
## Architecture
Workspace packages (`pyproject.toml` → `[tool.uv.workspace]`):

- `extract/extract_core/` — Shared extraction utilities: state tracking (SQLite), HTTP helpers, atomic file writes
- `extract/psdonline/` — USDA PSD Online data (ZIP → gzip CSV)
- `extract/cftc_cot/` — CFTC Commitments of Traders (weekly)
- `extract/coffee_prices/` — KC=F futures prices
- `extract/ice_stocks/` — ICE warehouse stocks + aging reports
- `extract/openmeteo/` — Daily weather for 12 coffee-growing regions (Open-Meteo ERA5, no API key)
- `transform/sqlmesh_materia/` — 3-layer SQL transformation pipeline (DuckDB)
- `src/materia/` — CLI (Typer): pipeline execution, secrets, version
- `web/` — Quart + HTMX web app (BeanFlows.coffee dashboard)
Data flow:

```
USDA API     → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip
CFTC API     → extract → /data/materia/landing/cot/{year}/{date}.csv.gz
Yahoo/prices → extract → /data/materia/landing/prices/{symbol}/{date}.json.gz
ICE API      → extract → /data/materia/landing/ice_stocks/{date}.csv.gz
Open-Meteo   → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz
  → rclone timer syncs landing/ to R2 every 6 hours
  → SQLMesh staging → foundation → serving → /data/materia/lakehouse.duckdb
  → export_serving pipeline → /data/materia/analytics.duckdb (web app)
  → Web app reads analytics.duckdb (read-only, per-thread)
```
SQLMesh 3-layer model structure (`transform/sqlmesh_materia/models/`):

- `staging/` — Type casting, lookup joins, basic cleansing (reads landing directly)
- `foundation/` — Business logic, pivoting, conformed dimensions (ontology), facts
- `serving/` — Analytics-ready aggregates for the web app
Foundation layer is the ontology. `dim_commodity` conforms identifiers across all sources:

- Each row = one commodity (e.g. Arabica coffee)
- Columns: `usda_commodity_code`, `cftc_contract_market_code`, `ice_stock_report_code`, `ticker` (KC=F), etc.
- New data sources add columns to existing dims, not new tables
- Facts join to dims via surrogate keys (MD5 hash keys generated in staging), as in the query sketch below
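A rough illustration of how a fact resolves cross-source identifiers through the conformed dimension. Only `dim_commodity`, its `*_code` columns, and the MD5 surrogate-key idea come from this document; the `foundation` schema name, the fact table `fct_psd_supply_demand`, and the `commodity_key` column are assumptions for the sketch.

```python
import duckdb

# Ad-hoc, read-only inspection of the ontology join; in the pipeline this join
# lives in SQLMesh models, not in Python.
con = duckdb.connect("/data/materia/lakehouse.duckdb", read_only=True)
rows = con.execute(
    """
    SELECT d.ticker, d.usda_commodity_code, d.cftc_contract_market_code, f.*
    FROM foundation.fct_psd_supply_demand AS f       -- hypothetical fact table
    JOIN foundation.dim_commodity AS d
      ON f.commodity_key = d.commodity_key           -- MD5 surrogate key from staging
    WHERE d.ticker = 'KC=F'
    """
).fetchall()
```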
Two-DuckDB architecture:

- `lakehouse.duckdb` (`DUCKDB_PATH`) — SQLMesh exclusive write; never opened by the web app
- `analytics.duckdb` (`SERVING_DUCKDB_PATH`) — read-only serving copy for the web app
- Why not `serving.duckdb`: DuckDB derives the catalog name from the filename stem — "serving" would collide with the "serving" schema inside it
- `export_serving` pipeline copies `serving.*` tables via Arrow + atomic rename after each transform
- Web app uses per-thread connections (`threading.local`) with inode-based reopen on rotation (see the sketch below)
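A minimal sketch of the per-thread, inode-aware connection handling described in the last bullet. The mechanism (thread-local read-only connections, reopened when the file's inode changes after an atomic rename) is from this document; the function and variable names are illustrative, not the web app's actual code.

```python
import os
import threading

import duckdb

SERVING_DUCKDB_PATH = os.environ.get("SERVING_DUCKDB_PATH", "analytics.duckdb")
_local = threading.local()


def get_serving_connection() -> duckdb.DuckDBPyConnection:
    """Return this thread's read-only connection to analytics.duckdb,
    reopening it if export_serving atomically replaced the file (new inode)."""
    inode = os.stat(SERVING_DUCKDB_PATH).st_ino
    if getattr(_local, "inode", None) != inode:
        if getattr(_local, "conn", None) is not None:
            _local.conn.close()
        _local.conn = duckdb.connect(SERVING_DUCKDB_PATH, read_only=True)
        _local.inode = inode
    return _local.conn
```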
Extraction pattern — one workspace package per data source:

- All packages depend on `extract_core` (shared state tracking, HTTP, file writes)
- Landing zone is immutable and content-addressed: `{LANDING_DIR}/{source}/{partitions}/{hash}.ext` (see the sketch after this list)
- State tracked in SQLite at `{LANDING_DIR}/.state.sqlite` (WAL mode, OLTP — not DuckDB)
- Query state: `sqlite3 data/landing/.state.sqlite "SELECT * FROM extraction_runs ORDER BY run_id DESC LIMIT 20"`
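A minimal sketch of the content-addressed landing write, assuming nothing about `extract_core`'s real helpers: the function name, the SHA-256 truncation, and the gzip step are illustrative. Only the `{LANDING_DIR}/{source}/{partitions}/{hash}.ext` layout and the atomic-write convention come from this document.

```python
import gzip
import hashlib
import os
from pathlib import Path

LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing"))


def write_landing_file(source: str, partition: str, payload: bytes, ext: str = "csv.gz") -> Path:
    """Land a payload at {LANDING_DIR}/{source}/{partition}/{hash}.{ext}.
    Hashing the content makes re-extraction of identical data idempotent."""
    digest = hashlib.sha256(payload).hexdigest()[:16]
    target = LANDING_DIR / source / partition / f"{digest}.{ext}"
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp = target.with_name(target.name + ".tmp")
    tmp.write_bytes(gzip.compress(payload))
    tmp.rename(target)  # atomic rename so readers never see partial files
    return target
```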
Adding a new data source:

```bash
# Create package
uv init --package extract/new_source
uv add --package new_source extract-core niquests

# Add entry function in extract/new_source/src/new_source/execute.py
# Register in infra/supervisor/workflows.toml
# Add staging + foundation models in transform/sqlmesh_materia/models/
```
Supervisor (`src/materia/supervisor.py`):

- Croniter-based scheduling with named presets: `hourly`, `daily`, `weekly`, `monthly` (see the sketch after this list)
- Workflow registry: `infra/supervisor/workflows.toml`
- Dependency-wave execution: independent workflows run in parallel (ThreadPoolExecutor)
- Each tick: git pull (tag-based) → due extractors → SQLMesh → export_serving → web deploy if changed
- Crash-safe: systemd `Restart=always` + 10-minute backoff on tick failure
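A minimal sketch of the croniter-based due check behind the named presets. `croniter` is the real library named above; the preset cron strings, the function name, and the dict shape are assumptions.

```python
from datetime import datetime, timedelta

from croniter import croniter

# Hypothetical mapping of the named presets to cron expressions.
PRESETS = {
    "hourly": "0 * * * *",
    "daily": "0 6 * * *",
    "weekly": "0 6 * * 1",
    "monthly": "0 6 1 * *",
}


def is_due(schedule: str, last_run: datetime, now: datetime) -> bool:
    """A workflow is due when a cron occurrence falls between its last run and now."""
    cron = PRESETS.get(schedule, schedule)  # accept a preset name or a raw cron string
    return croniter(cron, last_run).get_next(datetime) <= now


# Example: a daily workflow last run 25 hours ago is due on this tick.
print(is_due("daily", datetime.now() - timedelta(hours=25), datetime.now()))
```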
CI/CD (`.gitlab/.gitlab-ci.yml`) — pull-based, no SSH:

- `test` stage: pytest, sqlmesh test, web pytest
- `tag` stage: creates a `v${CI_PIPELINE_IID}` tag after tests pass (master branch only)
- Supervisor polls for new tags every 60s, checks out the latest, runs `uv sync` (see the sketch after this list)
- No SSH keys or deploy credentials in CI — only `CI_JOB_TOKEN` (built-in)
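A minimal sketch of the pull-based deploy step the supervisor runs on each tick: fetch tags, check out the newest `v<N>` tag if it is not already deployed, then re-sync. The git and uv invocations are standard CLI calls; the function names and tag-comparison details are illustrative.

```python
import subprocess


def _git(*args: str) -> str:
    """Run a git command in the repo checkout and return trimmed stdout."""
    return subprocess.run(
        ["git", *args], check=True, capture_output=True, text=True
    ).stdout.strip()


def deploy_latest_tag() -> str | None:
    """Pull-based deploy: no SSH, the server fetches over its own git remote."""
    _git("fetch", "--tags", "--prune")
    tags = _git("tag", "--list", "v*", "--sort=-v:refname").splitlines()
    if not tags:
        return None
    latest = tags[0]
    if _git("rev-parse", f"{latest}^{{commit}}") == _git("rev-parse", "HEAD"):
        return None  # already on the newest release
    _git("checkout", latest)
    subprocess.run(["uv", "sync", "--all-packages"], check=True)
    return latest
```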
CLI modules (`src/materia/`):

- `cli.py` — Typer app with subcommands: pipeline, secrets, version
- `pipelines.py` — Local subprocess pipeline execution with bounded timeouts
- `secrets.py` — SOPS+age integration (decrypts `.env.prod.sops`)
Infrastructure (`infra/`):

- Pulumi IaC for Cloudflare R2 buckets
- Python supervisor + systemd service
- rclone systemd timer for landing data backup to R2
- `setup_server.sh` — one-time server init (age keypair generation)
- `bootstrap_supervisor.sh` — full server setup from scratch
## Secrets management (SOPS + age)
| File | Purpose |
|---|---|
| `.env.dev.sops` | Dev defaults (safe values, local paths) |
| `.env.prod.sops` | Production secrets (encrypted) |
| `.sops.yaml` | Maps file patterns to age public keys |
| `age-key.txt` | Server age keypair (gitignored, generated by `setup_server.sh`) |
```bash
make secrets-decrypt-dev   # decrypt dev secrets → .env (local dev)
make secrets-edit-prod     # edit prod secrets in $EDITOR
```

`web/deploy.sh` auto-decrypts `.env.prod.sops` → `web/.env` on each deploy. `src/materia/secrets.py` decrypts on demand via a subprocess call to `sops` (sketched below).
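A minimal sketch of on-demand decryption via a `sops` subprocess, assuming `sops` is on `PATH` and can find the age private key (e.g. via `SOPS_AGE_KEY_FILE` pointing at `age-key.txt` on the server). The function name and the dotenv parsing are illustrative, not the actual contents of `secrets.py`.

```python
import subprocess


def load_sops_env(path: str = ".env.prod.sops") -> dict[str, str]:
    """Decrypt a SOPS-encrypted dotenv file and return its keys/values."""
    result = subprocess.run(
        ["sops", "--decrypt", path], check=True, capture_output=True, text=True
    )
    env: dict[str, str] = {}
    for line in result.stdout.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip().strip('"')
    return env
```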
Adding the server key (new server setup):

- Run `infra/setup_server.sh` on the server — prints the age public key
- Add the public key to `.sops.yaml` on your workstation
- Run `sops updatekeys .env.prod.sops`
- Commit + push
## uv workspace management

```bash
# Install everything (run from repo root)
uv sync --all-packages --all-groups

# Create a new extraction package
uv init --package extract/new_source
uv add --package new_source extract-core niquests

# Add a dependency to an existing package
uv add --package materia croniter
uv add --package beanflows duckdb

# Run a command in a specific package context
uv run --package new_source python -c "import new_source"
```

Always use the uv CLI to manage dependencies — never edit `pyproject.toml` manually for dependency changes.
## Coding Philosophy

Read `coding_philosophy.md` for the full guide. Key points:
- Simple, procedural code — Functions over classes, no inheritance hierarchies, no "Manager" patterns
- Data-oriented — Use dicts/lists/tuples, not objects hiding data behind getters
- Keep logic in SQL — Let DuckDB do the heavy lifting, don't pull data into Python to transform it
- Build minimum that works — No premature abstraction, three examples before generalizing
- Explicit over implicit — No framework magic, no metaprogramming, no hidden behavior
- Question every dependency — Can you write it simply yourself? Are you using 5% of a large framework?
## Key Configuration

- Python 3.13 (`.python-version`)
- Ruff: double quotes, spaces, E501 ignored (formatter handles line length)
- SQLMesh: DuckDB dialect, `@daily` cron, start date `2025-07-07`, default env `dev_{{ user() }}`
- Storage: local NVMe (`LANDING_DIR`, `DUCKDB_PATH`, `SERVING_DUCKDB_PATH`), R2 for backup via rclone
- Secrets: SOPS + age (`.env.*.sops` files, Makefile targets)
- CI: GitLab CI — test → tag (pull-based deploy, no SSH)
- Pre-commit hooks: installed via `pre-commit install`
## Environment Variables

| Variable | Default | Description |
|---|---|---|
| `LANDING_DIR` | `data/landing` | Root directory for extracted landing data |
| `DUCKDB_PATH` | `local.duckdb` | Path to the SQLMesh lakehouse database (exclusive write) |
| `SERVING_DUCKDB_PATH` | `analytics.duckdb` | Path to the serving DB (read by web app) |
| `ALERT_WEBHOOK_URL` | (empty) | ntfy.sh URL for supervisor failure alerts |
| `SUPERVISOR_GIT_PULL` | (unset) | Set to any value to enable tag-based git pull in supervisor |