Convert the availability chain (stg_playtomic_availability → fct_availability_slot → fct_daily_availability) from FULL to INCREMENTAL_BY_TIME_RANGE so `sqlmesh run` processes only new daily intervals instead of re-reading all files.

Supervisor changes:
- run_transform(): `plan prod --auto-apply` → `run prod` (evaluates missing cron intervals, picks up new data)
- git_pull_and_sync(): add `plan prod --auto-apply` before re-exec so model code changes are applied on deploy
- supervisor.sh: same plan → run change

The staging model uses a date-scoped glob (@start_ds) to read only the current interval's files. snapshot_date is cast to DATE (was VARCHAR) as required by time_column. Also cleans up the redundant TRY_CAST(snapshot_date AS DATE) in venue_pricing_benchmarks, since it is already DATE from the foundation layer.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
58 lines
2.2 KiB
Bash
58 lines
2.2 KiB
Bash
#!/bin/sh
# Padelnomics Supervisor — continuous pipeline orchestration.
# Inspired by TigerBeetle's CFO supervisor: simple, resilient, easy to understand.
# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh
#
# Environment variables (set in systemd EnvironmentFile or .env):
#   LANDING_DIR         — local path for extracted landing data
#   DUCKDB_PATH         — path to DuckDB lakehouse (pipeline DB, SQLMesh exclusive)
#   SERVING_DUCKDB_PATH — path to serving-only DuckDB (web app reads from here)
#   ALERT_WEBHOOK_URL   — optional ntfy.sh / Slack / Telegram webhook for failures

# Abort on unhandled errors and unset variables (portable POSIX subset;
# pipefail is not available in every /bin/sh).
set -eu

# Checkout location the supervisor operates on; never redefined.
readonly REPO_DIR="/opt/padelnomics"
|
# Main supervisor loop: pull → extract → transform → export, once per day,
# with a 10-minute back-off and an optional webhook alert on any failure.
while true
do
  # NOTE: POSIX `set -e` is IGNORED inside a compound command whose exit
  # status is tested (here: the subshell on the left of `||`). Without the
  # explicit `|| exit 1` checks below, a failing `git fetch`, `cd`, or
  # pipeline step would NOT abort the subshell — later steps would still run
  # and only the last command's status would reach the error handler.
  (
    if ! [ -d "$REPO_DIR/.git" ]; then
      # Diagnostics go to stderr; bootstrap is a manual, one-time step.
      echo "Repository not found at $REPO_DIR — bootstrap required!" >&2
      exit 1
    fi

    cd "$REPO_DIR" || exit 1

    # Pull latest code (detached checkout of origin/master, discarding any
    # local changes so the working tree always matches the remote exactly).
    git fetch origin master || exit 1
    git switch --discard-changes --detach origin/master || exit 1
    uv sync || exit 1

    # Extract
    LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \
    DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
      uv run --package padelnomics_extract extract || exit 1

    # Transform — `run` evaluates missing daily intervals for incremental models.
    LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \
    DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
      uv run sqlmesh -p transform/sqlmesh_padelnomics run prod || exit 1

    # Export serving tables to analytics.duckdb (atomic swap).
    # The web app detects the inode change on next query — no restart needed.
    DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
    SERVING_DUCKDB_PATH="${SERVING_DUCKDB_PATH:-/data/padelnomics/analytics.duckdb}" \
      uv run python -m padelnomics.export_serving || exit 1
  ) || {
    if [ -n "${ALERT_WEBHOOK_URL:-}" ]; then
      # Best-effort alert: cap the request time so a hung webhook endpoint
      # cannot stall the supervisor; never let curl failure kill the loop.
      curl -s --max-time 30 -d "Padelnomics pipeline failed at $(date)" \
        "$ALERT_WEBHOOK_URL" 2>/dev/null || true
    fi
    sleep 600 # back off 10 min on failure
    continue
  }

  sleep 86400 # run once per day
done
|