Commit 67c048485b by Deeman: Add Phase 1A-C + ICE warehouse stocks: prices, methodology, pipeline automation
Phase 1A — KC=F Coffee Futures Prices:
- New extract/coffee_prices/ package (yfinance): downloads KC=F daily OHLCV,
  stores as gzip CSV with SHA256-based idempotency
- SQLMesh models: raw/coffee_prices → foundation/fct_coffee_prices →
  serving/coffee_prices (with 20d/50d SMA, 52-week high/low, daily return %)
- Dashboard: 4 metric cards + dual-line chart (close, 20d MA, 50d MA)
- API: GET /commodities/<ticker>/prices

Phase 1B — Data Methodology Page:
- New /methodology route with full-page template (base.html)
- 6 anchored sections: USDA PSD, CFTC COT, KC=F price, ICE warehouse stocks,
  data quality model, update schedule table
- "Methodology" link added to marketing footer

Phase 1C — Automated Pipeline:
- supervisor.sh updated: runs extract_cot, extract_prices, extract_ice in
  sequence before transform
- Webhook failure alerting via ALERT_WEBHOOK_URL env var (ntfy/Slack/Telegram)

ICE Warehouse Stocks:
- New extract/ice_stocks/ package (niquests): normalizes ICE Report Center CSV
  to canonical schema, hash-based idempotency, soft-fail on 404 with guidance
- SQLMesh models: raw/ice_warehouse_stocks → foundation/fct_ice_warehouse_stocks
  → serving/ice_warehouse_stocks (30d avg, WoW change, 52w drawdown)
- Dashboard: 4 metric cards + line chart (certified bags + 30d avg)
- API: GET /commodities/<code>/stocks

Foundation:
- dim_commodity: added ticker (KC=F) and ice_stock_report_code (COFFEE-C) columns
- macros/__init__.py: added prices_glob() and ice_stocks_glob()
- pipelines.py: added extract_prices and extract_ice entries

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 11:41:43 +01:00

Materia SQLMesh Transform Layer

Data transformation pipeline using SQLMesh and DuckDB, implementing a 4-layer architecture.

Quick Start

cd transform/sqlmesh_materia

# Local development (isolated SQLMesh virtual environment)
sqlmesh plan dev_<username>

# Production
sqlmesh plan prod

# Run tests
sqlmesh test

# Format SQL
sqlmesh format

Architecture

Gateway Configuration

Single Gateway: all environments connect to the Cloudflare R2 Data Catalog (Apache Iceberg).

  • Production: sqlmesh plan prod
  • Development: sqlmesh plan dev_<username> (isolated virtual environment)

SQLMesh manages environment isolation automatically; there is no need for separate local databases.

4-Layer Data Model

See models/README.md for detailed architecture documentation:

  1. Raw - Immutable source data
  2. Staging - Schema, types, basic cleansing
  3. Cleaned - Business logic, integration
  4. Serving - Analytics-ready (facts, dimensions, aggregates)
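To make the layer conventions concrete, here is a minimal sketch of what a staging model in models/ could look like. This is an illustration only: the model name, source table, and columns are assumptions, not actual models from this repo.

```sql
-- Hypothetical staging model: enforce types on raw source data.
MODEL (
  name staging.stg_coffee_prices,
  kind FULL
);

SELECT
  CAST(trade_date AS DATE)  AS trade_date,   -- normalize to DATE
  CAST(close AS DOUBLE)     AS close_price   -- normalize to DOUBLE
FROM raw.coffee_prices;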

Configuration

Config: config.yaml

  • DuckDB in-memory with R2 Iceberg catalog
  • Extensions: httpfs, iceberg
  • Auto-apply enabled (no prompts)
  • Initialization hooks for R2 secret/catalog attachment
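As an illustration, a SQLMesh config.yaml expressing these properties could look roughly like the following. This is a sketch under assumptions (gateway name, layout), not a copy of the repo's actual config.yaml:

```yaml
gateways:
  r2:
    connection:
      type: duckdb            # in-memory DuckDB; data lives in the R2 catalog
      extensions:
        - httpfs
        - iceberg

default_gateway: r2

model_defaults:
  dialect: duckdb
```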

Commands

# Plan changes for dev environment
sqlmesh plan dev_yourname

# Plan changes for prod
sqlmesh plan prod

# Run tests
sqlmesh test

# Validate models
sqlmesh validate

# Run audits
sqlmesh audit

# Format SQL files
sqlmesh format

# Start web UI
sqlmesh ui

Environment Variables (Prod)

Required for production R2 Iceberg catalog:

  • CLOUDFLARE_API_TOKEN - R2 API token
  • ICEBERG_REST_URI - R2 catalog REST endpoint
  • R2_WAREHOUSE_NAME - Warehouse name (default: "materia")

These are injected via Pulumi ESC (beanflows/prod) on the supervisor instance.
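A small helper along these lines can fail fast when a required variable is missing. This is a hypothetical sketch, not code from this repo; only the variable names and the "materia" default come from the list above.

```python
import os

# Required by the production R2 Iceberg catalog (see list above).
REQUIRED = ("CLOUDFLARE_API_TOKEN", "ICEBERG_REST_URI")
# Optional, with the documented default.
DEFAULTS = {"R2_WAREHOUSE_NAME": "materia"}


def resolve_r2_settings(env=None):
    """Collect R2 catalog settings, raising if a required variable is unset."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    settings = {name: env[name] for name in REQUIRED}
    for name, default in DEFAULTS.items():
        settings[name] = env.get(name, default)
    return settings
```

Calling this once at startup surfaces a clear error locally, where Pulumi ESC has not injected the variables.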

Development Workflow

  1. Make changes to models in models/
  2. Test locally: sqlmesh test
  3. Plan changes: sqlmesh plan dev_yourname
  4. Review and apply changes
  5. Commit and push to trigger CI/CD

SQLMesh will handle environment isolation, table versioning, and incremental updates automatically.