From 518b50d0f57a59e686943fca1dbb63be3bcb6fe3 Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 12:04:55 +0100 Subject: [PATCH] docs(claude+infra): expand CLAUDE.md + infra/readme.md for full architecture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CLAUDE.md additions: - List all 6 extractor packages + extract_core - Full data flow with all sources + dual-DuckDB - Foundation-as-ontology: dim_commodity conforms cross-source identifiers - Two-DuckDB architecture explanation (why not serving.duckdb) - Extraction pattern: one-package-per-source, state SQLite, adding new source - Supervisor: croniter scheduling, topological waves, tag-based deploy - CI/CD: pull-based via git tags, no SSH - Secrets management: SOPS+age section, file table, server key workflow - uv workspace management section - Remove Pulumi ESC references; update env vars table infra/readme.md: - Update architecture diagram (add analytics.duckdb, age-key.txt) - Rewrite setup flow: setup_server.sh → add key to SOPS → bootstrap - Secrets management section with file table - Deploy model: pull-based (no SSH/CI credentials) - Monitoring: add supervisor status + extraction state DB query Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 164 +++++++++++++++++++++++++++++++++++++++--------- infra/readme.md | 118 +++++++++++++++++++++++++--------- 2 files changed, 223 insertions(+), 59 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 1431444..9845bf4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,13 +4,13 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project Overview -Materia is a commodity data analytics platform (product: **BeanFlows.coffee**) for coffee traders. It's a uv workspace monorepo with three packages: extraction (USDA PSD data), SQL transformation (SQLMesh + DuckDB), and a CLI for worker management and local pipeline execution. +Materia is a commodity data analytics platform (product: **BeanFlows.coffee**) for coffee traders. It's a uv workspace monorepo: multiple extraction packages, a SQL transformation pipeline, a web app, and a CLI for local pipeline execution. ## Commands ```bash # Install dependencies -uv sync +uv sync --all-packages # Lint & format ruff check . # Check @@ -24,9 +24,6 @@ cd transform/sqlmesh_materia && uv run sqlmesh test # SQLMesh model tests # Run a single test uv run pytest tests/test_cli.py::test_name -v -# Extract data -LANDING_DIR=data/landing uv run extract_psd - # SQLMesh (from repo root) uv run sqlmesh -p transform/sqlmesh_materia plan # Plans to dev_ by default uv run sqlmesh -p transform/sqlmesh_materia plan prod # Production @@ -34,45 +31,153 @@ uv run sqlmesh -p transform/sqlmesh_materia test # Run model tests uv run sqlmesh -p transform/sqlmesh_materia format # Format SQL # CLI -uv run materia pipeline run extract|transform +uv run materia pipeline run extract|transform|export_serving uv run materia pipeline list -uv run materia worker create|destroy|list -uv run materia secrets get +uv run materia secrets list +uv run materia secrets test + +# Supervisor status (production) +uv run python src/materia/supervisor.py status + +# CSS (Tailwind) +make css-build # one-shot build +make css-watch # watch mode + +# Secrets +make secrets-decrypt-dev # decrypt .env.dev.sops → .env (local dev) +make secrets-decrypt-prod # decrypt .env.prod.sops → .env +make secrets-edit-dev # edit dev secrets in $EDITOR +make secrets-edit-prod # edit prod secrets in $EDITOR ``` ## Architecture -**Workspace packages** (`pyproject.toml` → `tool.uv.workspace`): -- `extract/psdonline/` — Downloads USDA PSD Online data, normalizes ZIP→gzip CSV, writes to local landing directory -- `extract/openmeteo/` — Daily weather for 12 coffee-growing regions (Open-Meteo, ERA5 reanalysis, no API key) -- `transform/sqlmesh_materia/` — 3-layer SQL transformation pipeline (local DuckDB) -- `src/materia/` — CLI (Typer) for pipeline execution, worker management, secrets -- `web/` — Future web frontend +**Workspace packages** (`pyproject.toml` → `[tool.uv.workspace]`): +- `extract/extract_core/` — Shared extraction utilities: state tracking (SQLite), HTTP helpers, atomic file writes +- `extract/psdonline/` — USDA PSD Online data (ZIP → gzip CSV) +- `extract/cftc_cot/` — CFTC Commitments of Traders (weekly) +- `extract/coffee_prices/` — KC=F futures prices +- `extract/ice_stocks/` — ICE warehouse stocks + aging reports +- `extract/openmeteo/` — Daily weather for 12 coffee-growing regions (Open-Meteo ERA5, no API key) +- `transform/sqlmesh_materia/` — 3-layer SQL transformation pipeline (DuckDB) +- `src/materia/` — CLI (Typer): pipeline execution, secrets, version +- `web/` — Quart + HTMX web app (BeanFlows.coffee dashboard) **Data flow:** ``` -USDA API → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip -Open-Meteo → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz - → rclone cron syncs landing/ to R2 +USDA API → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip +CFTC API → extract → /data/materia/landing/cot/{year}/{date}.csv.gz +Yahoo/prices → extract → /data/materia/landing/prices/{symbol}/{date}.json.gz +ICE API → extract → /data/materia/landing/ice_stocks/{date}.csv.gz +Open-Meteo → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz + → rclone timer syncs landing/ to R2 every 6 hours → SQLMesh staging → foundation → serving → /data/materia/lakehouse.duckdb - → Web app reads lakehouse.duckdb (read-only) + → export_serving pipeline → /data/materia/analytics.duckdb (web app) + → Web app reads analytics.duckdb (read-only, per-thread) ``` **SQLMesh 3-layer model structure** (`transform/sqlmesh_materia/models/`): 1. `staging/` — Type casting, lookup joins, basic cleansing (reads landing directly) -2. `foundation/` — Business logic, pivoting, dimensions, facts (also reads landing directly) +2. `foundation/` — Business logic, pivoting, **conformed dimensions** (ontology), facts 3. `serving/` — Analytics-ready aggregates for the web app +**Foundation layer is the ontology.** `dim_commodity` conforms identifiers across all sources: +- Each row = one commodity (e.g. Arabica coffee) +- Columns: `usda_commodity_code`, `cftc_contract_market_code`, `ice_stock_report_code`, `ticker` (KC=F), etc. +- New data sources add columns to existing dims, not new tables +- Facts join to dims via surrogate keys (MD5 hash keys generated in staging) + +**Two-DuckDB architecture:** +- `lakehouse.duckdb` (`DUCKDB_PATH`) — SQLMesh exclusive write; never opened by web app +- `analytics.duckdb` (`SERVING_DUCKDB_PATH`) — read-only serving copy for web app +- Why not `serving.duckdb`: DuckDB derives catalog name from filename stem — "serving" would collide with the "serving" schema inside +- `export_serving` pipeline copies `serving.*` tables via Arrow + atomic rename after each transform +- Web app uses per-thread connections (`threading.local`) with inode-based reopen on rotation + +**Extraction pattern** — one workspace package per data source: +- All packages depend on `extract_core` (shared state tracking, HTTP, file writes) +- Landing zone is immutable and content-addressed: `{LANDING_DIR}/{source}/{partitions}/{hash}.ext` +- State tracked in SQLite at `{LANDING_DIR}/.state.sqlite` (WAL mode, OLTP — not DuckDB) +- Query state: `sqlite3 data/landing/.state.sqlite "SELECT * FROM extraction_runs ORDER BY run_id DESC LIMIT 20"` + +**Adding a new data source:** +```bash +# Create package +uv init --package extract/new_source +uv add --package new_source extract-core niquests + +# Add entry function in extract/new_source/src/new_source/execute.py +# Register in infra/supervisor/workflows.toml +# Add staging + foundation models in transform/sqlmesh_materia/models/ +``` + +**Supervisor** (`src/materia/supervisor.py`): +- Croniter-based scheduling with named presets: `hourly`, `daily`, `weekly`, `monthly` +- Workflow registry: `infra/supervisor/workflows.toml` +- Dependency-wave execution: independent workflows run in parallel (ThreadPoolExecutor) +- Each tick: git pull (tag-based) → due extractors → SQLMesh → export_serving → web deploy if changed +- Crash-safe: systemd `Restart=always` + 10-minute backoff on tick failure + +**CI/CD** (`.gitlab/.gitlab-ci.yml`) — pull-based, no SSH: +- `test` stage: pytest, sqlmesh test, web pytest +- `tag` stage: creates `v${CI_PIPELINE_IID}` tag after tests pass (master branch only) +- Supervisor polls for new tags every 60s, checks out latest, runs `uv sync` +- No SSH keys or deploy credentials in CI — only `CI_JOB_TOKEN` (built-in) + **CLI modules** (`src/materia/`): -- `cli.py` — Typer app with subcommands: worker, pipeline, secrets, version -- `workers.py` — Hetzner cloud instance management (for ad-hoc compute) +- `cli.py` — Typer app with subcommands: pipeline, secrets, version - `pipelines.py` — Local subprocess pipeline execution with bounded timeouts -- `secrets.py` — Pulumi ESC integration for environment secrets +- `secrets.py` — SOPS+age integration (decrypts `.env.prod.sops`) **Infrastructure** (`infra/`): -- Pulumi IaC for Cloudflare R2 buckets and Hetzner compute -- Supervisor systemd service for always-on orchestration (pulls git, runs pipelines) +- Pulumi IaC for Cloudflare R2 buckets +- Python supervisor + systemd service - rclone systemd timer for landing data backup to R2 +- `setup_server.sh` — one-time server init (age keypair generation) +- `bootstrap_supervisor.sh` — full server setup from scratch + +## Secrets management (SOPS + age) + +| File | Purpose | +|------|---------| +| `.env.dev.sops` | Dev defaults (safe values, local paths) | +| `.env.prod.sops` | Production secrets (encrypted) | +| `.sops.yaml` | Maps file patterns to age public keys | +| `age-key.txt` | Server age keypair (gitignored, generated by `setup_server.sh`) | + +```bash +make secrets-decrypt-dev # decrypt dev secrets → .env (local dev) +make secrets-edit-prod # edit prod secrets in $EDITOR +``` + +`web/deploy.sh` auto-decrypts `.env.prod.sops` → `web/.env` on each deploy. +`src/materia/secrets.py` decrypts on-demand via subprocess call to `sops`. + +**Adding the server key (new server setup):** +1. Run `infra/setup_server.sh` on the server — prints the age public key +2. Add the public key to `.sops.yaml` on your workstation +3. Run `sops updatekeys .env.prod.sops` +4. Commit + push + +## uv workspace management + +```bash +# Install everything (run from repo root) +uv sync --all-packages --all-groups + +# Create a new extraction package +uv init --package extract/new_source +uv add --package new_source extract-core niquests + +# Add a dependency to an existing package +uv add --package materia croniter +uv add --package beanflows duckdb + +# Run a command in a specific package context +uv run --package new_source python -c "import new_source" +``` + +Always use `uv` CLI to manage dependencies — never edit `pyproject.toml` manually for dependency changes. ## Coding Philosophy @@ -90,9 +195,9 @@ Read `coding_philosophy.md` for the full guide. Key points: - **Python 3.13** (`.python-version`) - **Ruff**: double quotes, spaces, E501 ignored (formatter handles line length) - **SQLMesh**: DuckDB dialect, `@daily` cron, start date `2025-07-07`, default env `dev_{{ user() }}` -- **Storage**: Local NVMe (`LANDING_DIR`, `DUCKDB_PATH`), R2 for backup via rclone -- **Secrets**: Pulumi ESC (`esc run beanflows/prod -- `) -- **CI**: GitLab CI (`.gitlab/.gitlab-ci.yml`) — runs pytest and sqlmesh test on push/MR +- **Storage**: Local NVMe (`LANDING_DIR`, `DUCKDB_PATH`, `SERVING_DUCKDB_PATH`), R2 for backup via rclone +- **Secrets**: SOPS + age (`.env.*.sops` files, Makefile targets) +- **CI**: GitLab CI — test → tag (pull-based deploy, no SSH) - **Pre-commit hooks**: installed via `pre-commit install` ## Environment Variables @@ -100,4 +205,7 @@ Read `coding_philosophy.md` for the full guide. Key points: | Variable | Default | Description | |----------|---------|-------------| | `LANDING_DIR` | `data/landing` | Root directory for extracted landing data | -| `DUCKDB_PATH` | `local.duckdb` | Path to the DuckDB lakehouse database | +| `DUCKDB_PATH` | `local.duckdb` | Path to the SQLMesh lakehouse database (exclusive write) | +| `SERVING_DUCKDB_PATH` | `analytics.duckdb` | Path to the serving DB (read by web app) | +| `ALERT_WEBHOOK_URL` | _(empty)_ | ntfy.sh URL for supervisor failure alerts | +| `SUPERVISOR_GIT_PULL` | _(unset)_ | Set to any value to enable tag-based git pull in supervisor | diff --git a/infra/readme.md b/infra/readme.md index 9a8060c..a7319a2 100644 --- a/infra/readme.md +++ b/infra/readme.md @@ -6,63 +6,101 @@ Single-server local-first setup for BeanFlows.coffee on Hetzner NVMe. ``` Hetzner Server (NVMe) -├── /opt/materia/ # Git repo, code, uv environment -├── /data/materia/landing/ # Extracted USDA data (year/month subdirs) -├── /data/materia/lakehouse.duckdb # SQLMesh output database +├── /opt/materia/ # Git repo (checked out at latest release tag) +├── /opt/materia/age-key.txt # Server age keypair (chmod 600, gitignored) +├── /opt/materia/.env # Decrypted from .env.prod.sops at deploy time +├── /data/materia/landing/ # Extracted raw data (immutable, content-addressed) +├── /data/materia/lakehouse.duckdb # SQLMesh exclusive write +├── /data/materia/analytics.duckdb # Read-only serving copy for web app └── systemd services: - ├── materia-supervisor # Pulls git, runs extract + transform daily - └── materia-backup.timer # Syncs landing/ to R2 every 6 hours + ├── materia-supervisor # Python supervisor: extract → transform → export → deploy + └── materia-backup.timer # rclone: syncs landing/ to R2 every 6 hours ``` ## Data Flow -1. **Extract**: USDA API → `/data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip` -2. **Transform**: SQLMesh reads landing CSVs → writes to `/data/materia/lakehouse.duckdb` -3. **Backup**: rclone syncs `/data/materia/landing/` → R2 `materia-raw/landing/` -4. **Web**: Reads `lakehouse.duckdb` (read-only) +1. **Extract** — Supervisor runs due extractors per `infra/supervisor/workflows.toml` +2. **Transform** — SQLMesh reads landing → writes `lakehouse.duckdb` +3. **Export** — `export_serving` copies `serving.*` → `analytics.duckdb` (atomic rename) +4. **Backup** — rclone syncs `/data/materia/landing/` → R2 `materia-raw/landing/` +5. **Web** — Web app reads `analytics.duckdb` read-only (per-thread connections) -## Setup +## Setup (new server) -### Prerequisites - -- Hetzner server with NVMe storage -- Pulumi ESC configured (`beanflows/prod` environment) -- `GITLAB_READ_TOKEN` and `PULUMI_ACCESS_TOKEN` set - -### Bootstrap +### 1. Run setup_server.sh ```bash -# From local machine or CI: +bash infra/setup_server.sh +``` + +This creates data directories, installs age, and generates the server age keypair at `/opt/materia/age-key.txt`. It prints the server's age public key. + +### 2. Add the server key to SOPS + +On your workstation: + +```bash +# Add the server public key to .sops.yaml +# Then re-encrypt prod secrets to include the server key: +sops updatekeys .env.prod.sops +git add .sops.yaml .env.prod.sops +git commit -m "chore: add server age key" +git push +``` + +### 3. Bootstrap the supervisor + +```bash +# Requires GITLAB_READ_TOKEN (GitLab project access token, read-only) +export GITLAB_READ_TOKEN= ssh root@ 'bash -s' < infra/bootstrap_supervisor.sh ``` -This installs dependencies, clones the repo, creates data directories, and starts the supervisor service. +This installs uv + sops + age, clones the repo, decrypts secrets, installs Python dependencies, and starts the supervisor service. -### R2 Backup - -1. Install rclone: `apt install rclone` -2. Copy and configure: `cp infra/backup/rclone.conf.example /root/.config/rclone/rclone.conf` -3. Fill in R2 credentials from Pulumi ESC -4. Install systemd units: +### 4. Set up R2 backup ```bash +apt install rclone +cp infra/backup/rclone.conf.example /root/.config/rclone/rclone.conf +# Fill in R2 credentials from .env.prod.sops (ACCESS_KEY_ID, SECRET_ACCESS_KEY, bucket endpoint) cp infra/backup/materia-backup.service /etc/systemd/system/ cp infra/backup/materia-backup.timer /etc/systemd/system/ systemctl daemon-reload systemctl enable --now materia-backup.timer ``` -## Pulumi IaC +## Secrets management -Still manages Cloudflare R2 buckets and can provision Hetzner instances: +Secrets are stored as SOPS-encrypted dotenv files in the repo root: + +| File | Purpose | +|------|---------| +| `.env.dev.sops` | Dev defaults (safe values, local paths) | +| `.env.prod.sops` | Production secrets | +| `.sops.yaml` | Maps file patterns to age public keys | ```bash -cd infra -pulumi login -pulumi stack select prod -pulumi up +# Decrypt for local dev +make secrets-decrypt-dev + +# Edit prod secrets +make secrets-edit-prod ``` +`bootstrap_supervisor.sh` decrypts `.env.prod.sops` → `/opt/materia/.env` during setup. +`web/deploy.sh` re-decrypts on every deploy (so secret rotations take effect automatically). + +## Deploy model (pull-based) + +No SSH keys or deploy credentials in CI. + +1. CI runs tests (`test:cli`, `test:sqlmesh`, `test:web`) +2. On master, CI creates tag `v${CI_PIPELINE_IID}` using built-in `CI_JOB_TOKEN` +3. Supervisor polls for new tags every 60s +4. When a new tag appears: `git checkout --detach ` + `uv sync --all-packages` +5. If `web/` files changed: `./web/deploy.sh` (Docker blue/green + health check) + ## Monitoring ```bash @@ -70,9 +108,27 @@ pulumi up systemctl status materia-supervisor journalctl -u materia-supervisor -f +# Workflow status table +cd /opt/materia && uv run python src/materia/supervisor.py status + # Backup timer status systemctl list-timers materia-backup.timer journalctl -u materia-backup -f + +# Extraction state DB +sqlite3 /data/materia/landing/.state.sqlite \ + "SELECT extractor, status, finished_at FROM extraction_runs ORDER BY run_id DESC LIMIT 20" +``` + +## Pulumi IaC + +Still manages Cloudflare R2 buckets: + +```bash +cd infra +pulumi login +pulumi stack select prod +pulumi up ``` ## Cost