CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

Project Overview

Materia is a commodity data analytics platform (product: BeanFlows.coffee) for coffee traders. It's a uv workspace monorepo: multiple extraction packages, a SQL transformation pipeline, a web app, and a CLI for local pipeline execution.

Commands

# Install dependencies
uv sync --all-packages

# Lint & format
ruff check .            # Check
ruff check --fix .      # Auto-fix
ruff format .           # Format

# Tests
uv run pytest tests/ -v --cov=src/materia         # CLI/Python tests
cd transform/sqlmesh_materia && uv run sqlmesh test  # SQLMesh model tests

# Run a single test
uv run pytest tests/test_cli.py::test_name -v

# SQLMesh (from repo root)
uv run sqlmesh -p transform/sqlmesh_materia plan              # Plans to dev_<username> by default
uv run sqlmesh -p transform/sqlmesh_materia plan prod          # Production
uv run sqlmesh -p transform/sqlmesh_materia test               # Run model tests
uv run sqlmesh -p transform/sqlmesh_materia format             # Format SQL

# CLI
uv run materia pipeline run <name>   # <name> is one of: extract, transform, export_serving
uv run materia pipeline list
uv run materia secrets list
uv run materia secrets test

# Supervisor status (production)
uv run python src/materia/supervisor.py status

# CSS (Tailwind)
make css-build     # one-shot build
make css-watch     # watch mode

# Secrets
make secrets-decrypt-dev   # decrypt .env.dev.sops → .env (local dev)
make secrets-decrypt-prod  # decrypt .env.prod.sops → .env
make secrets-edit-dev      # edit dev secrets in $EDITOR
make secrets-edit-prod     # edit prod secrets in $EDITOR

Architecture

Workspace packages (pyproject.toml[tool.uv.workspace]):

  • extract/extract_core/ — Shared extraction utilities: state tracking (SQLite), HTTP helpers, atomic file writes
  • extract/psdonline/ — USDA PSD Online data (ZIP → gzip CSV)
  • extract/cftc_cot/ — CFTC Commitments of Traders (weekly)
  • extract/coffee_prices/ — KC=F futures prices
  • extract/ice_stocks/ — ICE warehouse stocks + aging reports
  • extract/openmeteo/ — Daily weather for 12 coffee-growing regions (Open-Meteo ERA5, no API key)
  • transform/sqlmesh_materia/ — 3-layer SQL transformation pipeline (DuckDB)
  • src/materia/ — CLI (Typer): pipeline execution, secrets, version
  • web/ — Quart + HTMX web app (BeanFlows.coffee dashboard)

Data flow:

USDA API     → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip
CFTC API     → extract → /data/materia/landing/cot/{year}/{date}.csv.gz
Yahoo/prices → extract → /data/materia/landing/prices/{symbol}/{date}.json.gz
ICE API      → extract → /data/materia/landing/ice_stocks/{date}.csv.gz
Open-Meteo   → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz
         → rclone timer syncs landing/ to R2 every 6 hours
         → SQLMesh staging → foundation → serving → /data/materia/lakehouse.duckdb
         → export_serving pipeline → /data/materia/analytics.duckdb (web app)
         → Web app reads analytics.duckdb (read-only, per-thread)

SQLMesh 3-layer model structure (transform/sqlmesh_materia/models/):

  1. staging/ — Type casting, lookup joins, basic cleansing (reads landing directly)
  2. foundation/ — Business logic, pivoting, conformed dimensions (ontology), facts
  3. serving/ — Analytics-ready aggregates for the web app

Foundation layer is the ontology. dim_commodity conforms identifiers across all sources:

  • Each row = one commodity (e.g. Arabica coffee)
  • Columns: usda_commodity_code, cftc_contract_market_code, ice_stock_report_code, ticker (KC=F), etc.
  • New data sources add columns to existing dims, not new tables
  • Facts join to dims via surrogate keys (MD5 hash keys generated in staging)
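The surrogate keys above are generated in SQL in the staging layer, and the exact expression is not shown in this file. As a rough Python illustration of the idea only, the sketch below hashes the natural-key parts into a deterministic MD5 hex string. The helper name `surrogate_key` and the `|` delimiter are assumptions made for this example, not the project's code.

```python
import hashlib


def surrogate_key(*parts: object) -> str:
    """Hypothetical helper: build a deterministic MD5 key from natural-key parts."""
    # Normalize each part to text; None becomes an empty string so the key
    # is still defined when a source omits an identifier.
    normalized = ["" if part is None else str(part).strip() for part in parts]
    # Join with a delimiter so ("ab", "c") and ("a", "bc") hash differently.
    joined = "|".join(normalized).encode("utf-8")
    # MD5 is used here as a fingerprint, not for security.
    return hashlib.md5(joined, usedforsecurity=False).hexdigest()
```

The same inputs always produce the same key, which is what lets facts and dims built in separate models agree on a join key without a shared sequence.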

Two-DuckDB architecture:

  • lakehouse.duckdb (DUCKDB_PATH) — SQLMesh exclusive write; never opened by web app
  • analytics.duckdb (SERVING_DUCKDB_PATH) — read-only serving copy for web app
  • Why not serving.duckdb: DuckDB derives catalog name from filename stem — "serving" would collide with the "serving" schema inside
  • export_serving pipeline copies serving.* tables via Arrow + atomic rename after each transform
  • Web app uses per-thread connections (threading.local) with inode-based reopen on rotation
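The per-thread, inode-based reopen described above could look roughly like the following. This is a minimal sketch, not the web app's actual code: `get_connection` is a hypothetical name, and the `connect` callable stands in for whatever opens the database read-only (for DuckDB that would presumably be something like `duckdb.connect(path, read_only=True)`).

```python
import os
import threading

# One connection slot per thread; nothing is shared across threads.
_local = threading.local()


def get_connection(path, connect):
    """Return this thread's connection, reopening it if `path` was rotated.

    `connect` is any callable that takes a path and returns an object
    with a close() method (an assumption made for this sketch).
    """
    # An atomic rename puts a new file at the same path, so the inode changes.
    inode = os.stat(path).st_ino
    if getattr(_local, "inode", None) != inode:
        old = getattr(_local, "conn", None)
        if old is not None:
            old.close()
        _local.conn = connect(path)
        _local.inode = inode
    return _local.conn
```

This only detects rotation because the export replaces the file via rename; an in-place overwrite would keep the same inode and go unnoticed.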

Extraction pattern — one workspace package per data source:

  • All packages depend on extract_core (shared state tracking, HTTP, file writes)
  • Landing zone is immutable and content-addressed: {LANDING_DIR}/{source}/{partitions}/{hash}.ext
  • State tracked in SQLite at {LANDING_DIR}/.state.sqlite (WAL mode, OLTP — not DuckDB)
  • Query state: sqlite3 data/landing/.state.sqlite "SELECT * FROM extraction_runs ORDER BY run_id DESC LIMIT 20"
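To make the immutable, content-addressed landing pattern concrete, here is a minimal sketch of an atomic write. It is not the `extract_core` implementation: the function name `write_landing_file`, the SHA-256 digest, the 16-character truncation, and the `.csv.gz` extension are all assumptions chosen for illustration.

```python
import gzip
import hashlib
import os
import tempfile
from pathlib import Path


def write_landing_file(landing_dir: Path, source: str, partition: str, payload: bytes) -> Path:
    """Hypothetical helper: write payload to {landing_dir}/{source}/{partition}/{hash}.csv.gz."""
    # Content addressing: the file name is derived from the payload itself.
    digest = hashlib.sha256(payload).hexdigest()[:16]
    target = landing_dir / source / partition / f"{digest}.csv.gz"
    if target.exists():
        # Immutable landing zone: identical content is never rewritten.
        return target
    target.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temp file in the same directory, then rename into place,
    # so readers never observe a partially written file.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(gzip.compress(payload, mtime=0))
        os.replace(tmp_name, target)
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return target
```

Keeping the temp file in the target directory matters because `os.replace` is only atomic within a single filesystem.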

Adding a new data source:

# Create package
uv init --package extract/new_source
uv add --package new_source extract-core niquests

# Add entry function in extract/new_source/src/new_source/execute.py
# Register in infra/supervisor/workflows.toml
# Add staging + foundation models in transform/sqlmesh_materia/models/

Supervisor (src/materia/supervisor.py):

  • Croniter-based scheduling with named presets: hourly, daily, weekly, monthly
  • Workflow registry: infra/supervisor/workflows.toml
  • Dependency-wave execution: independent workflows run in parallel (ThreadPoolExecutor)
  • Each tick: git pull (tag-based) → due extractors → SQLMesh → export_serving → web deploy if changed
  • Crash-safe: systemd Restart=always + 10-minute backoff on tick failure
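Dependency-wave execution can be sketched as follows: group workflows into waves where every member's dependencies are already done, then run each wave in parallel. This is an illustration under assumptions, not the code in supervisor.py. The function names, the `depends_on` mapping shape, and the workflow names in the usage note are hypothetical, and the real schema of workflows.toml is not shown in this file.

```python
from concurrent.futures import ThreadPoolExecutor


def dependency_waves(depends_on: dict[str, list[str]]) -> list[list[str]]:
    """Group workflow names into waves; each wave depends only on earlier waves."""
    remaining = {name: set(deps) for name, deps in depends_on.items()}
    done: set[str] = set()
    waves: list[list[str]] = []
    while remaining:
        # A workflow is ready once all of its dependencies have completed.
        ready = sorted(name for name, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError(f"cycle or unknown dependency among: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return waves


def run_waves(waves: list[list[str]], run_one) -> dict:
    """Run each wave in parallel threads; waves themselves run in order."""
    results = {}
    for wave in waves:
        with ThreadPoolExecutor(max_workers=len(wave)) as pool:
            for name, result in zip(wave, pool.map(run_one, wave)):
                results[name] = result
    return results
```

For example, with `{"psd": [], "cot": [], "transform": ["psd", "cot"]}` the two extractors form the first wave and `transform` runs alone in the second.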

CI/CD (.gitlab/.gitlab-ci.yml) — pull-based, no SSH:

  • test stage: pytest, sqlmesh test, web pytest
  • tag stage: creates v${CI_PIPELINE_IID} tag after tests pass (master branch only)
  • Supervisor polls for new tags every 60s, checks out latest, runs uv sync
  • No SSH keys or deploy credentials in CI — only CI_JOB_TOKEN (built-in)

CLI modules (src/materia/):

  • cli.py — Typer app with subcommands: pipeline, secrets, version
  • pipelines.py — Local subprocess pipeline execution with bounded timeouts
  • secrets.py — SOPS+age integration (decrypts .env.prod.sops)
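As an illustration of "subprocess execution with bounded timeouts", the sketch below runs one command with a hard time limit and reports success or failure as plain data. The function name `run_with_timeout` and the `(ok, message)` return shape are assumptions for this example; pipelines.py may be structured differently.

```python
import subprocess
import sys


def run_with_timeout(command: list[str], timeout_seconds: float) -> tuple[bool, str]:
    """Run a command, returning (ok, message) instead of hanging indefinitely."""
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,  # the child is killed once this elapses
        )
    except subprocess.TimeoutExpired:
        return False, f"timed out after {timeout_seconds}s"
    if completed.returncode != 0:
        return False, completed.stderr.strip() or f"exit code {completed.returncode}"
    return True, completed.stdout.strip()


if __name__ == "__main__":
    # Demo with the current interpreter so the example has no external dependencies.
    print(run_with_timeout([sys.executable, "-c", "print('ok')"], timeout_seconds=30))
```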

Infrastructure (infra/):

  • Pulumi IaC for Cloudflare R2 buckets
  • Python supervisor + systemd service
  • rclone systemd timer for landing data backup to R2
  • setup_server.sh — one-time server init (age keypair generation)
  • bootstrap_supervisor.sh — full server setup from scratch

Secrets management (SOPS + age)

File             Purpose
.env.dev.sops    Dev defaults (safe values, local paths)
.env.prod.sops   Production secrets (encrypted)
.sops.yaml       Maps file patterns to age public keys
age-key.txt      Server age keypair (gitignored, generated by setup_server.sh)

make secrets-decrypt-dev   # decrypt dev secrets → .env (local dev)
make secrets-edit-prod     # edit prod secrets in $EDITOR

web/deploy.sh auto-decrypts .env.prod.sops → web/.env on each deploy. src/materia/secrets.py decrypts on demand via a subprocess call to sops.
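On-demand decryption via a subprocess call might look roughly like this. It is a sketch, not the contents of secrets.py: `decrypt_env` and `parse_dotenv` are hypothetical names, and the `--input-type dotenv` / `--output-type dotenv` flags are included on the assumption that sops does not infer the dotenv format from the `.sops` extension.

```python
import subprocess


def parse_dotenv(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    values: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        value = value.strip()
        # Drop one pair of matching surrounding quotes, if present.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        values[key.strip()] = value
    return values


def decrypt_env(path: str, timeout_seconds: float = 30) -> dict[str, str]:
    """Decrypt a SOPS-encrypted dotenv file in memory; requires sops and an age key."""
    result = subprocess.run(
        ["sops", "--decrypt", "--input-type", "dotenv", "--output-type", "dotenv", path],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if sops fails
        timeout=timeout_seconds,
    )
    return parse_dotenv(result.stdout)
```

Returning a dict keeps the plaintext out of the filesystem, in contrast to the Makefile targets that write a .env file.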

Adding the server key (new server setup):

  1. Run infra/setup_server.sh on the server — prints the age public key
  2. Add the public key to .sops.yaml on your workstation
  3. Run sops updatekeys .env.prod.sops
  4. Commit + push

uv workspace management

# Install everything (run from repo root)
uv sync --all-packages --all-groups

# Create a new extraction package
uv init --package extract/new_source
uv add --package new_source extract-core niquests

# Add a dependency to an existing package
uv add --package materia croniter
uv add --package beanflows duckdb

# Run a command in a specific package context
uv run --package new_source python -c "import new_source"

Always use uv CLI to manage dependencies — never edit pyproject.toml manually for dependency changes.

Coding Philosophy

Read coding_philosophy.md for the full guide. Key points:

  • Simple, procedural code — Functions over classes, no inheritance hierarchies, no "Manager" patterns
  • Data-oriented — Use dicts/lists/tuples, not objects hiding data behind getters
  • Keep logic in SQL — Let DuckDB do the heavy lifting, don't pull data into Python to transform it
  • Build minimum that works — No premature abstraction, three examples before generalizing
  • Explicit over implicit — No framework magic, no metaprogramming, no hidden behavior
  • Question every dependency — Can you write it simply yourself? Are you using 5% of a large framework?

Key Configuration

  • Python 3.13 (.python-version)
  • Ruff: double quotes, spaces, E501 ignored (formatter handles line length)
  • SQLMesh: DuckDB dialect, @daily cron, start date 2025-07-07, default env dev_{{ user() }}
  • Storage: Local NVMe (LANDING_DIR, DUCKDB_PATH, SERVING_DUCKDB_PATH), R2 for backup via rclone
  • Secrets: SOPS + age (.env.*.sops files, Makefile targets)
  • CI: GitLab CI — test → tag (pull-based deploy, no SSH)
  • Pre-commit hooks: installed via pre-commit install

Environment Variables

Variable               Default            Description
LANDING_DIR            data/landing       Root directory for extracted landing data
DUCKDB_PATH            local.duckdb       Path to the SQLMesh lakehouse database (exclusive write)
SERVING_DUCKDB_PATH    analytics.duckdb   Path to the serving DB (read by web app)
ALERT_WEBHOOK_URL      (empty)            ntfy.sh URL for supervisor failure alerts
SUPERVISOR_GIT_PULL    (unset)            Set to any value to enable tag-based git pull in supervisor
R2_ACCESS_KEY_ID       (empty)            Cloudflare R2 access key; the backup timer is enabled when all three R2 vars are set
R2_SECRET_ACCESS_KEY   (empty)            Cloudflare R2 secret key
R2_ENDPOINT            (empty)            Cloudflare account ID (used to construct R2 endpoint URL)