Phase 1A — KC=F Coffee Futures Prices:
- New extract/coffee_prices/ package (yfinance): downloads KC=F daily OHLCV, stores as gzip CSV with SHA256-based idempotency (extraction sketched below)
- SQLMesh models: raw/coffee_prices → foundation/fct_coffee_prices → serving/coffee_prices (with 20d/50d SMA, 52-week high/low, daily return %; window logic sketched below)
- Dashboard: 4 metric cards + dual-line chart (close, 20d MA, 50d MA)
- API: GET /commodities/<ticker>/prices (endpoint sketched below)

Phase 1B — Data Methodology Page:
- New /methodology route with full-page template (base.html)
- 6 anchored sections: USDA PSD, CFTC COT, KC=F price, ICE warehouse stocks, data quality model, update schedule table
- "Methodology" link added to marketing footer

Phase 1C — Automated Pipeline:
- supervisor.sh updated: runs extract_cot, extract_prices, extract_ice in sequence before transform (full script below)
- Webhook failure alerting via ALERT_WEBHOOK_URL env var (ntfy/Slack/Telegram)

ICE Warehouse Stocks:
- New extract/ice_stocks/ package (niquests): normalizes the ICE Report Center CSV to a canonical schema, hash-based idempotency, soft-fail on 404 with guidance (extraction sketched below)
- SQLMesh models: raw/ice_warehouse_stocks → foundation/fct_ice_warehouse_stocks → serving/ice_warehouse_stocks (30d avg, WoW change, 52w drawdown)
- Dashboard: 4 metric cards + line chart (certified bags + 30d avg)
- API: GET /commodities/<code>/stocks

Foundation:
- dim_commodity: added ticker (KC=F) and ice_stock_report_code (COFFEE-C) columns
- macros/__init__.py: added prices_glob() and ice_stocks_glob()
- pipelines.py: added extract_prices and extract_ice entries

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
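A minimal sketch of the Phase 1A extraction step, assuming the SHA256 content hash is embedded in the landing filename so re-runs are idempotent; function and path names here are illustrative, not the actual extract/coffee_prices/ layout.

import gzip
import hashlib
from pathlib import Path

import yfinance as yf


def extract_coffee_prices(landing_dir: str, ticker: str = "KC=F") -> Path | None:
    """Download daily OHLCV for the ticker and land it as gzip CSV, skipping unchanged payloads."""
    df = yf.download(ticker, period="max", interval="1d", auto_adjust=False)
    csv_bytes = df.to_csv().encode("utf-8")
    digest = hashlib.sha256(csv_bytes).hexdigest()[:16]

    out_dir = Path(landing_dir) / "coffee_prices"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{ticker.replace('=', '_')}_{digest}.csv.gz"
    if out_path.exists():
        return None  # same content hash already landed; nothing to do
    out_path.write_bytes(gzip.compress(csv_bytes))
    return out_path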
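The serving-layer indicators are plain window functions. As an illustration only (the real model is a SQLMesh model; table and column names are assumed, and 252 trading days approximates a 52-week window), the 20d/50d SMAs, 52-week high/low, and daily return % can be computed against the lakehouse like this:

import duckdb

# Assumed table/column names: foundation.fct_coffee_prices(trade_date, close).
SERVING_SQL = """
SELECT
    trade_date,
    close,
    AVG(close) OVER w20  AS sma_20d,
    AVG(close) OVER w50  AS sma_50d,
    MAX(close) OVER w252 AS high_52w,
    MIN(close) OVER w252 AS low_52w,
    100.0 * (close / LAG(close) OVER (ORDER BY trade_date) - 1) AS daily_return_pct
FROM foundation.fct_coffee_prices
WINDOW
    w20  AS (ORDER BY trade_date ROWS BETWEEN 19 PRECEDING AND CURRENT ROW),
    w50  AS (ORDER BY trade_date ROWS BETWEEN 49 PRECEDING AND CURRENT ROW),
    w252 AS (ORDER BY trade_date ROWS BETWEEN 251 PRECEDING AND CURRENT ROW)
ORDER BY trade_date
"""

con = duckdb.connect("/data/materia/lakehouse.duckdb", read_only=True)
prices = con.execute(SERVING_SQL).df()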
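A sketch of the prices endpoint, assuming a Flask app and a serving.coffee_prices table; the actual web framework and schema names may differ.

import duckdb
from flask import Flask, jsonify

app = Flask(__name__)


@app.get("/commodities/<ticker>/prices")
def commodity_prices(ticker: str):
    # Open the lakehouse read-only and return the serving rows as JSON.
    con = duckdb.connect("/data/materia/lakehouse.duckdb", read_only=True)
    rows = con.execute(
        "SELECT * FROM serving.coffee_prices WHERE ticker = ? ORDER BY trade_date",
        [ticker],
    ).fetchall()
    cols = [d[0] for d in con.description]
    con.close()
    return jsonify([dict(zip(cols, row)) for row in rows])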
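And a sketch of the ICE warehouse stocks extraction, assuming the report URL comes from config; niquests is request/response compatible with requests, so the call shape mirrors it. Normalization to the canonical schema is omitted here; the point is the 404 soft-fail with guidance and the hash-based idempotency.

import gzip
import hashlib
from pathlib import Path

import niquests


def extract_ice_stocks(report_url: str, landing_dir: str) -> Path | None:
    """Fetch the ICE Report Center CSV and land it; soft-fail on 404 with guidance."""
    resp = niquests.get(report_url, timeout=60)
    if resp.status_code == 404:
        print(
            "ICE Report Center returned 404; the report URL may have moved. "
            "Check the COFFEE-C warehouse stocks report and update the extractor config."
        )
        return None
    resp.raise_for_status()

    digest = hashlib.sha256(resp.content).hexdigest()[:16]
    out_dir = Path(landing_dir) / "ice_stocks"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"ice_warehouse_stocks_{digest}.csv.gz"
    if out_path.exists():
        return None  # identical payload already landed (hash-based idempotency)
    out_path.write_bytes(gzip.compress(resp.content))
    return out_path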
supervisor.sh (62 lines, 2.2 KiB, Bash):
#!/bin/sh
# Materia Supervisor - Continuous pipeline orchestration
# Inspired by TigerBeetle's CFO supervisor: simple, resilient, easy to understand
# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh
#
# Environment variables (set in systemd EnvironmentFile):
#   LANDING_DIR       — local path for extracted landing data
#   DUCKDB_PATH       — path to DuckDB lakehouse file
#   ALERT_WEBHOOK_URL — optional ntfy.sh / Slack / Telegram webhook for failure alerts

set -eu

readonly REPO_DIR="/opt/materia"

while true
do
    (
        # Fail fast if the repository has not been bootstrapped yet
        if ! [ -d "$REPO_DIR/.git" ]
        then
            echo "Repository not found, bootstrap required!"
            exit 1
        fi

        cd "$REPO_DIR"

        # Update code from git
        git fetch origin master
        git switch --discard-changes --detach origin/master
        uv sync

        # Extract all data sources
        LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
        DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
            uv run materia pipeline run extract

        LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
        DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
            uv run materia pipeline run extract_cot

        LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
        DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
            uv run materia pipeline run extract_prices

        LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
        DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
            uv run materia pipeline run extract_ice

        # Transform all data sources
        LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
        DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
            uv run materia pipeline run transform

    ) || {
        # Notify on failure if webhook is configured, then sleep to avoid busy-loop
        if [ -n "${ALERT_WEBHOOK_URL:-}" ]; then
            curl -s -d "Materia pipeline failed at $(date)" "$ALERT_WEBHOOK_URL" 2>/dev/null || true
        fi
        sleep 600  # Sleep 10 min on failure
    }
done