Fix serving export: cross-connection COPY and catalog/schema name collision

Three fixes:

1. Cross-connection COPY: DuckDB doesn't support referencing another
   connection's tables as src.serving.table. Replaced with Arrow as an
   intermediate: the source connection reads each table into Arrow, then
   dst.register() + CREATE TABLE on the destination connection.

2. Catalog/schema name collision: naming the export file serving.duckdb
   made DuckDB assign the catalog name "serving", the same name as the
   schema we create inside it, so every serving.table query became
   ambiguous. Renamed to analytics.duckdb (catalog "analytics", schema
   "serving": no clash). SERVING_DUCKDB_PATH updated from serving.duckdb
   to analytics.duckdb in supervisor, service, bootstrap, dev_run.sh,
   .env.example, and docker-compose.

3. Temp file: write to _export.duckdb (not serving.duckdb.tmp) to avoid
   the same catalog collision during the write phase.

Verified: 6 tables exported, serving.* queries work read-only.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
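The export itself runs in Python; a minimal sketch of the fixed path, assuming the duckdb Python API (the file paths, the table-listing query, and the src/dst names are illustrative, not the actual materia pipeline code):

    import os
    import duckdb

    LAKEHOUSE = "/data/materia/lakehouse.duckdb"
    SERVING = "/data/materia/analytics.duckdb"
    TMP = "/data/materia/_export.duckdb"  # catalog "_export": no clash with schema "serving"

    src = duckdb.connect(LAKEHOUSE, read_only=True)
    dst = duckdb.connect(TMP)
    dst.execute("CREATE SCHEMA IF NOT EXISTS serving")

    tables = [name for (name,) in src.execute(
        "SELECT table_name FROM information_schema.tables"
        " WHERE table_schema = 'serving'"
    ).fetchall()]

    for table in tables:
        # Fix 1: no cross-connection COPY, so go through Arrow instead.
        arrow_table = src.execute(f"SELECT * FROM serving.{table}").arrow()
        dst.register("arrow_table", arrow_table)
        dst.execute(f"CREATE TABLE serving.{table} AS SELECT * FROM arrow_table")
        dst.unregister("arrow_table")

    src.close()
    dst.close()
    os.replace(TMP, SERVING)  # atomic swap; readers reopen on inode change

os.replace() is atomic when source and destination live on the same filesystem, which is what makes the swap safe for concurrent readers.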
70 lines
2.7 KiB
Bash
#!/bin/sh
# Materia Supervisor - Continuous pipeline orchestration
# Inspired by TigerBeetle's CFO supervisor: simple, resilient, easy to understand
# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh
#
# Environment variables (set in systemd EnvironmentFile):
# LANDING_DIR — local path for extracted landing data
# DUCKDB_PATH — path to DuckDB lakehouse file (SQLMesh pipeline DB)
# SERVING_DUCKDB_PATH — path to serving-only DuckDB (web app reads from here)
# ALERT_WEBHOOK_URL — optional ntfy.sh / Slack / Telegram webhook for failure alerts
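#
# Example EnvironmentFile (illustrative values; the webhook URL is a placeholder):
#   LANDING_DIR=/data/materia/landing
#   DUCKDB_PATH=/data/materia/lakehouse.duckdb
#   SERVING_DUCKDB_PATH=/data/materia/analytics.duckdb
#   ALERT_WEBHOOK_URL=https://ntfy.sh/materia-alerts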

set -eu

readonly REPO_DIR="/opt/materia"

while true
do
    (
        # Repo must already exist; bootstrap is responsible for cloning it
        if ! [ -d "$REPO_DIR/.git" ]
        then
            echo "Repository not found, bootstrap required!"
            exit 1
        fi

cd "$REPO_DIR"
|
|
|
|
# Update code from git
|
|
git fetch origin master
|
|
git switch --discard-changes --detach origin/master
|
|
uv sync
|
|
|
|
# Extract all data sources
|
|
LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
|
|
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
|
|
uv run materia pipeline run extract
|
|
|
|
LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
|
|
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
|
|
uv run materia pipeline run extract_cot
|
|
|
|
LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
|
|
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
|
|
uv run materia pipeline run extract_prices
|
|
|
|
LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
|
|
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
|
|
uv run materia pipeline run extract_ice
|
|
|
|
# Transform all data sources
|
|
LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \
|
|
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
|
|
uv run materia pipeline run transform
|
|
|
|
# Export serving tables to analytics.duckdb (atomic swap).
|
|
# The web app reads from SERVING_DUCKDB_PATH and picks up the new file
|
|
# automatically via inode-based connection reopen — no restart needed.
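        # The exporter writes to _export.duckdb first (catalog "_export", so the
        # catalog name can't collide with the "serving" schema) and then renames
        # the file into place.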
        DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
        SERVING_DUCKDB_PATH="${SERVING_DUCKDB_PATH:-/data/materia/analytics.duckdb}" \
        uv run materia pipeline run export_serving

    ) || {
        # Notify on failure if webhook is configured, then sleep to avoid busy-loop
        if [ -n "${ALERT_WEBHOOK_URL:-}" ]; then
            curl -s -d "Materia pipeline failed at $(date)" "$ALERT_WEBHOOK_URL" 2>/dev/null || true
        fi
        sleep 600 # Sleep 10 min on failure
    }
done
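For reference, the "inode-based connection reopen" mentioned in the export comment lives on the web-app side. A hedged sketch of what such a reader could look like (class and method names are hypothetical, not the actual materia app):

    import os
    import duckdb

    class ServingConnection:
        """Read-only DuckDB handle that reopens whenever the file is atomically
        replaced (the inode changes), so the app sees fresh exports."""

        def __init__(self, path):
            self.path = path
            self._inode = None
            self._con = None

        def get(self):
            inode = os.stat(self.path).st_ino
            if self._con is None or inode != self._inode:
                if self._con is not None:
                    self._con.close()
                self._con = duckdb.connect(self.path, read_only=True)
                self._inode = inode
            return self._con

Because os.replace() never leaves a half-written file at the target path, a stat-and-compare like this is enough; no locking between the supervisor and the reader is required.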