feat: DuckDB two-file architecture — resolve SQLMesh/web-app lock contention

Split the single lakehouse.duckdb into two files to eliminate the exclusive
write-lock conflict between SQLMesh (pipeline) and the Quart web app (reader):

  lakehouse.duckdb  — SQLMesh exclusive (all pipeline layers)
  serving.duckdb    — web app reads (serving tables only, atomically swapped)
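
  The swap/detect mechanism in miniature (a standalone sketch with hypothetical
  paths, not code from this commit):

    import os

    old_ino = os.stat("serving.duckdb").st_ino
    os.rename("serving.duckdb.tmp", "serving.duckdb")  # atomic on one filesystem
    assert os.stat("serving.duckdb").st_ino != old_ino
    # Readers notice the inode change and reopen; a reader still holding the
    # old file open keeps a consistent snapshot until then, since rename never
    # exposes a half-written file at the path.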

Changes:

web/src/beanflows/analytics.py
- Replace persistent global _conn with per-thread connections (threading.local)
- Add _get_conn(): opens a read_only=True connection on first call per thread,
  reopens automatically on inode change (one ~1μs os.stat per query) to pick up
  atomic file swaps
- Switch env var from DUCKDB_PATH → SERVING_DUCKDB_PATH
- Add module docstring documenting architecture + DuckLake migration path

web/src/beanflows/app.py
- Startup check: use SERVING_DUCKDB_PATH
- Health check: use _db_path instead of _conn

src/materia/export_serving.py (new)
- Reads all serving.* tables from lakehouse.duckdb (read_only)
- Writes to a temp file (serving.duckdb.tmp), then os.rename → serving.duckdb (atomic)
- ~60 lines; runs after each SQLMesh transform

src/materia/pipelines.py
- Add export_serving pipeline entry (uv run python -c ...)

infra/supervisor/supervisor.sh
- Add SERVING_DUCKDB_PATH env var comment
- Add export step: uv run materia pipeline run export_serving

infra/supervisor/materia-supervisor.service
- Add Environment=SERVING_DUCKDB_PATH=/data/materia/serving.duckdb

infra/bootstrap_supervisor.sh
- Add SERVING_DUCKDB_PATH to .env template

web/.env.example + web/docker-compose.yml
- Document both env vars; switch web service to SERVING_DUCKDB_PATH

web/src/beanflows/dashboard/templates/settings.html
- Minor settings page fix from prior session

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Author: Deeman
Date:   2026-02-22 11:06:55 +01:00
Commit: b899bcbad4 (parent ca7b2ab18b)
10 changed files with 159 additions and 26 deletions

infra/bootstrap_supervisor.sh

@@ -93,6 +93,7 @@ PULUMI_ACCESS_TOKEN=${PULUMI_ACCESS_TOKEN}
 PATH=/root/.cargo/bin:/root/.pulumi/bin:/usr/local/bin:/usr/bin:/bin
 LANDING_DIR=/data/materia/landing
 DUCKDB_PATH=/data/materia/lakehouse.duckdb
+SERVING_DUCKDB_PATH=/data/materia/serving.duckdb
 EOF
 echo "--- Setting up systemd service ---"

infra/supervisor/materia-supervisor.service

@@ -13,6 +13,7 @@ RestartSec=10
 EnvironmentFile=/opt/materia/.env
 Environment=LANDING_DIR=/data/materia/landing
 Environment=DUCKDB_PATH=/data/materia/lakehouse.duckdb
+Environment=SERVING_DUCKDB_PATH=/data/materia/serving.duckdb
 # Resource limits
 LimitNOFILE=65536

infra/supervisor/supervisor.sh

@@ -5,7 +5,8 @@
 #
 # Environment variables (set in systemd EnvironmentFile):
 #   LANDING_DIR — local path for extracted landing data
-#   DUCKDB_PATH — path to DuckDB lakehouse file
+#   DUCKDB_PATH — path to DuckDB lakehouse file (SQLMesh pipeline DB)
+#   SERVING_DUCKDB_PATH — path to serving-only DuckDB (web app reads from here)
 #   ALERT_WEBHOOK_URL — optional ntfy.sh / Slack / Telegram webhook for failure alerts
 
 set -eu
@@ -51,6 +52,13 @@ do
     DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
       uv run materia pipeline run transform
 
+    # Export serving tables to serving.duckdb (atomic swap).
+    # The web app reads from SERVING_DUCKDB_PATH and picks up the new file
+    # automatically via inode-based connection reopen — no restart needed.
+    DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
+    SERVING_DUCKDB_PATH="${SERVING_DUCKDB_PATH:-/data/materia/serving.duckdb}" \
+      uv run materia pipeline run export_serving
+
   ) || {
     # Notify on failure if webhook is configured, then sleep to avoid busy-loop
     if [ -n "${ALERT_WEBHOOK_URL:-}" ]; then

src/materia/export_serving.py (new file)

@@ -0,0 +1,67 @@
+"""
+Export serving tables from lakehouse.duckdb to serving.duckdb (atomic swap).
+
+Called by the supervisor after each SQLMesh transform run. Reads all tables in
+the 'serving' schema from the pipeline DB (DUCKDB_PATH / lakehouse.duckdb),
+writes them to a temp file, then atomically renames it to the serving DB path
+(SERVING_DUCKDB_PATH / serving.duckdb).
+
+The web app's _get_conn() detects the inode change on the next query and
+reopens the connection automatically — no restart or signal required.
+
+Usage:
+    DUCKDB_PATH=lakehouse.duckdb SERVING_DUCKDB_PATH=serving.duckdb \
+        uv run materia pipeline run export_serving
+"""
+
+import logging
+import os
+
+import duckdb
+
+logger = logging.getLogger(__name__)
+
+
+def export_serving() -> None:
+    """Copy all serving.* tables from the pipeline DB to the serving DB atomically."""
+    pipeline_path = os.getenv("DUCKDB_PATH", "")
+    serving_path = os.getenv("SERVING_DUCKDB_PATH", "")
+    assert pipeline_path, "DUCKDB_PATH must be set"
+    assert serving_path, "SERVING_DUCKDB_PATH must be set"
+    assert os.path.exists(pipeline_path), f"Pipeline DB not found: {pipeline_path}"
+
+    tmp_path = serving_path + ".tmp"
+    if os.path.exists(tmp_path):
+        os.remove(tmp_path)  # discard a stale temp file from a crashed run
+
+    src = duckdb.connect(pipeline_path, read_only=True)
+    try:
+        tables = src.sql(
+            "SELECT table_name FROM information_schema.tables "
+            "WHERE table_schema = 'serving' ORDER BY table_name"
+        ).fetchall()
+        assert tables, f"No tables found in serving schema of {pipeline_path}"
+        logger.info(f"Exporting {len(tables)} serving tables: {[t[0] for t in tables]}")
+
+        dst = duckdb.connect(tmp_path)
+        try:
+            # The pipeline DB must be attached in THIS connection so the
+            # cross-database CTAS below can resolve src.serving.<table>.
+            dst.execute(f"ATTACH '{pipeline_path}' AS src (READ_ONLY)")
+            dst.execute("CREATE SCHEMA IF NOT EXISTS serving")
+            for (table,) in tables:
+                dst.execute(
+                    f"CREATE OR REPLACE TABLE serving.{table} AS "
+                    f"SELECT * FROM src.serving.{table}"
+                )
+                row_count = dst.sql(f"SELECT count(*) FROM serving.{table}").fetchone()[0]
+                logger.info(f"  serving.{table}: {row_count:,} rows")
+        finally:
+            dst.close()
+    finally:
+        src.close()
+
+    # Atomic rename — on Linux, rename() is atomic when source and destination
+    # are on the same filesystem, so readers see the old or new file, never a mix.
+    os.rename(tmp_path, serving_path)
+    logger.info(f"Serving DB atomically updated: {serving_path}")

src/materia/pipelines.py

@@ -48,6 +48,14 @@ PIPELINES = {
         "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"],
         "timeout_seconds": 3600,
     },
+    # Copies serving.* tables from lakehouse.duckdb → serving.duckdb (atomic swap).
+    # Run after every transform. Requires both DUCKDB_PATH and SERVING_DUCKDB_PATH.
+    "export_serving": {
+        "command": ["uv", "run", "python", "-c",
+                    "import logging; logging.basicConfig(level=logging.INFO); "
+                    "from materia.export_serving import export_serving; export_serving()"],
+        "timeout_seconds": 300,
+    },
 }

web/.env.example

@@ -7,7 +7,11 @@ ADMIN_EMAILS=admin@beanflows.coffee
 
 # Database
 DATABASE_PATH=data/app.db
+# DUCKDB_PATH points to the full pipeline DB (lakehouse.duckdb) — used by SQLMesh and export_serving.
+# SERVING_DUCKDB_PATH points to the serving-only export (serving.duckdb) — used by the web app.
+# Run `uv run materia pipeline run export_serving` after each SQLMesh transform to populate it.
 DUCKDB_PATH=../local.duckdb
+SERVING_DUCKDB_PATH=../serving.duckdb
 
 # Auth
 MAGIC_LINK_EXPIRY_MINUTES=15

web/docker-compose.yml

@@ -10,7 +10,7 @@ services:
     env_file: .env
     environment:
       - DATABASE_PATH=/app/data/app.db
-      - DUCKDB_PATH=/app/duckdb/local.duckdb
+      - SERVING_DUCKDB_PATH=/app/duckdb/serving.duckdb
     healthcheck:
       test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
       interval: 30s

web/src/beanflows/analytics.py

@@ -2,10 +2,41 @@
 DuckDB analytics data access layer.
 
 Bridge between the async Quart app and sync DuckDB reads.
-All queries run via asyncio.to_thread() against a read-only connection.
+All queries run via asyncio.to_thread() against a per-thread read-only connection.
+
+## Connection architecture
+
+The web app reads from `serving.duckdb` (SERVING_DUCKDB_PATH), a small file containing
+only the serving-layer tables. The full pipeline DB (DUCKDB_PATH / lakehouse.duckdb) is
+SQLMesh's exclusive domain — the web app never opens it.
+
+After each SQLMesh run, an export script copies all serving.* tables from lakehouse.duckdb
+to a temp file, then atomically renames it to serving.duckdb. The inode-based
+reopen in _get_conn() picks up the new file automatically on the next query.
+
+Per-thread connections (threading.local) are used because:
+- DuckDB read_only=True allows multiple concurrent connections to the same file (fine).
+- DuckDB connections are NOT thread-safe for concurrent use by multiple threads — each
+  thread needs its own connection object.
+- asyncio.to_thread() dispatches to a thread pool; concurrent dashboard queries run in
+  parallel threads, so each must have its own connection.
+
+## Future scaling path
+At tens of GBs of serving data, the atomic file swap becomes impractical. Migration options:
+- DuckLake: DuckDB extension (duckdb/ducklake, released May 2025) providing concurrent
+  multi-process read/write via an external SQL catalog (PostgreSQL/SQLite). Data stored
+  as Parquet files + catalog metadata. Experimental as of 2025. When mature, replace
+  _get_conn() / fetch_analytics() with DuckLake connections.
+- Raw Parquet: Export serving tables as per-table Parquet files. Web app queries via
+  read_parquet(). Zero lock contention. ~10-20 line change in this file.
+
+Both migrations are isolated to open_analytics_db() and _get_conn() — all 18+ query
+functions below stay unchanged.
 """
 
 import asyncio
 import os
+import threading
+from pathlib import Path
 
 import duckdb
@@ -32,35 +63,56 @@ ALLOWED_METRICS = frozenset({
     "production_yoy_pct",
 })
 
-_conn: duckdb.DuckDBPyConnection | None = None
+_local = threading.local()
+_db_path: str = ""
 
 
 def open_analytics_db() -> None:
-    """Open read-only DuckDB connection. No-op if the database file does not exist."""
-    global _conn
-    import pathlib
-    db_path = os.getenv("DUCKDB_PATH", "")
-    assert db_path, "DUCKDB_PATH environment variable must be set"
-    if not pathlib.Path(db_path).exists():
-        print(f"[analytics] DuckDB not found at {db_path!r} — analytics disabled")
+    """Store the serving DB path. Connections are created per-thread on first use."""
+    global _db_path
+    db_path = os.getenv("SERVING_DUCKDB_PATH", "")
+    assert db_path, "SERVING_DUCKDB_PATH environment variable must be set"
+    if not Path(db_path).exists():
+        print(f"[analytics] Serving DuckDB not found at {db_path!r} — analytics disabled")
         return
-    _conn = duckdb.connect(db_path, read_only=True)
+    _db_path = db_path
 
 
 def close_analytics_db() -> None:
-    """Close DuckDB connection."""
-    global _conn
-    if _conn:
-        _conn.close()
-        _conn = None
+    """Close this thread's DuckDB connection if open."""
+    conn = getattr(_local, "conn", None)
+    if conn:
+        conn.close()
+        _local.conn = None
+    global _db_path
+    _db_path = ""
+
+
+def _get_conn() -> duckdb.DuckDBPyConnection:
+    """Return a read-only DuckDB connection for the current thread.
+
+    Opens a new connection on first call per thread, or when the file's inode
+    changes (i.e. the export script atomically swapped serving.duckdb).
+    Cost: one os.stat() per call (~1 µs).
+    """
+    current_ino = os.stat(_db_path).st_ino
+    conn = getattr(_local, "conn", None)
+    conn_ino = getattr(_local, "ino", 0)
+    if conn is None or conn_ino != current_ino:
+        if conn:
+            conn.close()
+        _local.conn = duckdb.connect(_db_path, read_only=True)
+        _local.ino = current_ino
+    return _local.conn
 
 
 async def fetch_analytics(sql: str, params: list | None = None) -> list[dict]:
     """Run a read-only DuckDB query off the event loop. Returns list of dicts."""
-    assert _conn is not None, "Analytics DB not initialized — call open_analytics_db() first"
+    assert _db_path, "Analytics DB not configured — call open_analytics_db() first"
 
     def _query():
-        cursor = _conn.cursor()
+        conn = _get_conn()
+        cursor = conn.cursor()
         result = cursor.execute(sql, params or [])
         columns = [desc[0] for desc in result.description]
         return [dict(zip(columns, row)) for row in result.fetchall()]
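
For reference, the raw-Parquet fallback sketched in the docstring above could
look roughly like this (hypothetical directory and helper name, not part of
this commit):

    import duckdb

    SERVING_PARQUET_DIR = "/data/materia/serving"  # hypothetical export dir

    def _query_parquet(table: str) -> list[dict]:
        # read_parquet() takes no file lock, so there is nothing to swap or
        # reopen; the exporter would write <table>.parquet.tmp and rename it.
        con = duckdb.connect()  # in-memory connection
        rel = con.sql(
            f"SELECT * FROM read_parquet('{SERVING_PARQUET_DIR}/{table}.parquet')"
        )
        return [dict(zip(rel.columns, row)) for row in rel.fetchall()]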

web/src/beanflows/app.py

@@ -34,7 +34,7 @@ def create_app() -> Quart:
     @app.before_serving
     async def startup():
         await init_db()
-        if os.getenv("DUCKDB_PATH"):
+        if os.getenv("SERVING_DUCKDB_PATH"):
             open_analytics_db()
 
     @app.after_serving
@@ -105,7 +105,7 @@ def create_app() -> Quart:
     # Health check
     @app.route("/health")
     async def health():
-        from .analytics import _conn as duckdb_conn
+        from .analytics import _db_path as serving_db_path
         from .analytics import fetch_analytics
         from .core import fetch_one
         result = {"status": "healthy", "sqlite": "ok", "duckdb": "ok"}
@@ -114,7 +114,7 @@ def create_app() -> Quart:
         except Exception as e:
             result["sqlite"] = str(e)
             result["status"] = "unhealthy"
-        if duckdb_conn is not None:
+        if serving_db_path:
             try:
                 await fetch_analytics("SELECT 1")
             except Exception as e:

web/src/beanflows/dashboard/templates/settings.html

@@ -107,8 +107,8 @@
         <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
         <div class="mb-4">
-          <label for="key-name" class="form-label">Key Name</label>
-          <input type="text" id="key-name" name="name" class="form-input" placeholder="My API Key" required>
+          <label for="key-name" class="form-label">Key Name <span class="text-stone font-normal">(optional)</span></label>
+          <input type="text" id="key-name" name="name" class="form-input" placeholder="My API Key">
         </div>
         <input type="hidden" name="scopes" value="read">