diff --git a/infra/bootstrap_supervisor.sh b/infra/bootstrap_supervisor.sh index bb33c7b..d29330b 100755 --- a/infra/bootstrap_supervisor.sh +++ b/infra/bootstrap_supervisor.sh @@ -93,6 +93,7 @@ PULUMI_ACCESS_TOKEN=${PULUMI_ACCESS_TOKEN} PATH=/root/.cargo/bin:/root/.pulumi/bin:/usr/local/bin:/usr/bin:/bin LANDING_DIR=/data/materia/landing DUCKDB_PATH=/data/materia/lakehouse.duckdb +SERVING_DUCKDB_PATH=/data/materia/serving.duckdb EOF echo "--- Setting up systemd service ---" diff --git a/infra/supervisor/materia-supervisor.service b/infra/supervisor/materia-supervisor.service index aaaa55c..bf087c6 100644 --- a/infra/supervisor/materia-supervisor.service +++ b/infra/supervisor/materia-supervisor.service @@ -13,6 +13,7 @@ RestartSec=10 EnvironmentFile=/opt/materia/.env Environment=LANDING_DIR=/data/materia/landing Environment=DUCKDB_PATH=/data/materia/lakehouse.duckdb +Environment=SERVING_DUCKDB_PATH=/data/materia/serving.duckdb # Resource limits LimitNOFILE=65536 diff --git a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh index 701fab4..f890968 100644 --- a/infra/supervisor/supervisor.sh +++ b/infra/supervisor/supervisor.sh @@ -4,9 +4,10 @@ # https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh # # Environment variables (set in systemd EnvironmentFile): -# LANDING_DIR — local path for extracted landing data -# DUCKDB_PATH — path to DuckDB lakehouse file -# ALERT_WEBHOOK_URL — optional ntfy.sh / Slack / Telegram webhook for failure alerts +# LANDING_DIR — local path for extracted landing data +# DUCKDB_PATH — path to DuckDB lakehouse file (SQLMesh pipeline DB) +# SERVING_DUCKDB_PATH — path to serving-only DuckDB (web app reads from here) +# ALERT_WEBHOOK_URL — optional ntfy.sh / Slack / Telegram webhook for failure alerts set -eu @@ -51,6 +52,13 @@ do DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ uv run materia pipeline run transform + # Export serving tables to serving.duckdb (atomic swap). 
+ # The web app reads from SERVING_DUCKDB_PATH and picks up the new file + # automatically via inode-based connection reopen — no restart needed. + DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ + SERVING_DUCKDB_PATH="${SERVING_DUCKDB_PATH:-/data/materia/serving.duckdb}" \ + uv run materia pipeline run export_serving + ) || { # Notify on failure if webhook is configured, then sleep to avoid busy-loop if [ -n "${ALERT_WEBHOOK_URL:-}" ]; then diff --git a/src/materia/export_serving.py b/src/materia/export_serving.py new file mode 100644 index 0000000..6c308f6 --- /dev/null +++ b/src/materia/export_serving.py @@ -0,0 +1,59 @@ +""" +Export serving tables from lakehouse.duckdb to serving.duckdb (atomic swap). + +Called by the supervisor after each SQLMesh transform run. Reads all tables in +the 'serving' schema from the pipeline DB (DUCKDB_PATH / lakehouse.duckdb), +writes them to a temp file, then atomically renames it to the serving DB path +(SERVING_DUCKDB_PATH / serving.duckdb). + +The web app's _get_conn() detects the inode change on the next query and +reopens the connection automatically — no restart or signal required. 
"""
Export serving tables from lakehouse.duckdb to serving.duckdb (atomic swap).

Called by the supervisor after each SQLMesh transform run. Reads all tables in
the 'serving' schema from the pipeline DB (DUCKDB_PATH / lakehouse.duckdb),
writes them to a temp file, then atomically renames it to the serving DB path
(SERVING_DUCKDB_PATH / serving.duckdb).

The web app's _get_conn() detects the inode change on the next query and
reopens the connection automatically — no restart or signal required.

Usage:
    DUCKDB_PATH=lakehouse.duckdb SERVING_DUCKDB_PATH=serving.duckdb \
        uv run materia pipeline run export_serving
"""
import logging
import os

import duckdb

logger = logging.getLogger(__name__)


def export_serving() -> None:
    """Copy all serving.* tables from the pipeline DB to the serving DB atomically.

    Builds the export under ``<serving_path>.tmp`` and then renames it over the
    destination, so readers never observe a partially-written file.

    Raises:
        RuntimeError: if DUCKDB_PATH / SERVING_DUCKDB_PATH are unset, the
            pipeline DB does not exist, or its serving schema has no tables.
    """
    pipeline_path = os.getenv("DUCKDB_PATH", "")
    serving_path = os.getenv("SERVING_DUCKDB_PATH", "")
    # Raise instead of assert: asserts are stripped under `python -O`, which
    # would let a misconfigured supervisor run continue silently.
    if not pipeline_path:
        raise RuntimeError("DUCKDB_PATH must be set")
    if not serving_path:
        raise RuntimeError("SERVING_DUCKDB_PATH must be set")
    if not os.path.exists(pipeline_path):
        raise RuntimeError(f"Pipeline DB not found: {pipeline_path}")

    tmp_path = serving_path + ".tmp"
    # A stale .tmp left by a crashed run could hold leftover tables that the
    # CREATE OR REPLACE loop below would never remove — start from scratch.
    if os.path.exists(tmp_path):
        os.remove(tmp_path)

    dst = duckdb.connect(tmp_path)
    try:
        # Attach the pipeline DB into THIS connection so `src.serving.*`
        # resolves. A separate duckdb.connect() to the pipeline file would
        # NOT create a `src` catalog visible to `dst` — without this ATTACH
        # the copy below fails with "Catalog 'src' does not exist".
        dst.execute(f"ATTACH '{pipeline_path}' AS src (READ_ONLY)")
        tables = dst.sql(
            "SELECT table_name FROM duckdb_tables() "
            "WHERE database_name = 'src' AND schema_name = 'serving' "
            "ORDER BY table_name"
        ).fetchall()
        if not tables:
            raise RuntimeError(f"No tables found in serving schema of {pipeline_path}")
        logger.info(
            "Exporting %d serving tables: %s", len(tables), [t[0] for t in tables]
        )

        dst.execute("CREATE SCHEMA IF NOT EXISTS serving")
        for (table,) in tables:
            # Quote identifiers so table names needing escaping still work.
            dst.execute(
                f'CREATE OR REPLACE TABLE serving."{table}" AS '
                f'SELECT * FROM src.serving."{table}"'
            )
            row_count = dst.sql(f'SELECT count(*) FROM serving."{table}"').fetchone()[0]
            logger.info("  serving.%s: %s rows", table, f"{row_count:,}")
    finally:
        # Close before renaming so the file is fully flushed to disk.
        dst.close()

    # os.replace() is atomic when both paths are on the same filesystem and,
    # unlike os.rename(), also overwrites an existing destination on Windows.
    os.replace(tmp_path, serving_path)
    logger.info("Serving DB atomically updated: %s", serving_path)
DUCKDB_PATH=../local.duckdb +SERVING_DUCKDB_PATH=../serving.duckdb # Auth MAGIC_LINK_EXPIRY_MINUTES=15 diff --git a/web/docker-compose.yml b/web/docker-compose.yml index a29a6d4..dc58d4f 100644 --- a/web/docker-compose.yml +++ b/web/docker-compose.yml @@ -10,7 +10,7 @@ services: env_file: .env environment: - DATABASE_PATH=/app/data/app.db - - DUCKDB_PATH=/app/duckdb/local.duckdb + - SERVING_DUCKDB_PATH=/app/duckdb/serving.duckdb healthcheck: test: ["CMD", "curl", "-f", "http://localhost:5000/health"] interval: 30s diff --git a/web/src/beanflows/analytics.py b/web/src/beanflows/analytics.py index 11a75ff..374a1a0 100644 --- a/web/src/beanflows/analytics.py +++ b/web/src/beanflows/analytics.py @@ -2,10 +2,41 @@ DuckDB analytics data access layer. Bridge between the async Quart app and sync DuckDB reads. -All queries run via asyncio.to_thread() against a read-only connection. +All queries run via asyncio.to_thread() against a per-thread read-only connection. + +## Connection architecture + +The web app reads from `serving.duckdb` (SERVING_DUCKDB_PATH), a small file containing +only the serving-layer tables. The full pipeline DB (DUCKDB_PATH / lakehouse.duckdb) is +SQLMesh's exclusive domain — the web app never opens it. + +After each SQLMesh run, an export script copies all serving.* tables from lakehouse.duckdb +to serving_new.duckdb, then atomically renames it to serving.duckdb. The inode-based +reopen in _get_conn() picks up the new file automatically on the next query. + +Per-thread connections (threading.local) are used because: +- DuckDB read_only=True allows multiple concurrent connections to the same file (fine). +- DuckDB connections are NOT thread-safe for concurrent use by multiple threads — each + thread needs its own connection object. +- asyncio.to_thread() dispatches to a thread pool; concurrent dashboard queries run in + parallel threads, so each must have its own connection. 
+ +## Future scaling path + +At tens of GBs of serving data, atomic file swap becomes impractical. Migration options: +- DuckLake: DuckDB extension (duckdb/ducklake, released May 2025) providing concurrent + multi-process read/write via an external SQL catalog (PostgreSQL/SQLite). Data stored + as Parquet files + catalog metadata. Experimental as of 2025. When mature, replace + _get_conn() / fetch_analytics() with DuckLake connections. +- Raw Parquet: Export serving tables as per-table Parquet files. Web app queries via + read_parquet(). Zero lock contention. ~10-20 line change in this file. +Both migrations are isolated to open_analytics_db() and _get_conn() — all 18+ query +functions above stay unchanged. """ import asyncio import os +import threading +from pathlib import Path import duckdb @@ -32,35 +63,56 @@ ALLOWED_METRICS = frozenset({ "production_yoy_pct", }) -_conn: duckdb.DuckDBPyConnection | None = None +_local = threading.local() +_db_path: str = "" def open_analytics_db() -> None: - """Open read-only DuckDB connection. No-op if the database file does not exist.""" - global _conn - import pathlib - db_path = os.getenv("DUCKDB_PATH", "") - assert db_path, "DUCKDB_PATH environment variable must be set" - if not pathlib.Path(db_path).exists(): - print(f"[analytics] DuckDB not found at {db_path!r} — analytics disabled") + """Store the serving DB path. 
Connections are created per-thread on first use.""" + global _db_path + db_path = os.getenv("SERVING_DUCKDB_PATH", "") + assert db_path, "SERVING_DUCKDB_PATH environment variable must be set" + if not Path(db_path).exists(): + print(f"[analytics] Serving DuckDB not found at {db_path!r} — analytics disabled") return - _conn = duckdb.connect(db_path, read_only=True) + _db_path = db_path def close_analytics_db() -> None: - """Close DuckDB connection.""" - global _conn - if _conn: - _conn.close() - _conn = None + """Close this thread's DuckDB connection if open.""" + conn = getattr(_local, "conn", None) + if conn: + conn.close() + _local.conn = None + global _db_path + _db_path = "" + + +def _get_conn() -> duckdb.DuckDBPyConnection: + """Return a read-only DuckDB connection for the current thread. + + Opens a new connection on first call per thread, or when the file's inode + changes (i.e. the export script atomically swapped serving.duckdb). + Cost: one os.stat() per call (~1 µs). + """ + current_ino = os.stat(_db_path).st_ino + conn = getattr(_local, "conn", None) + conn_ino = getattr(_local, "ino", 0) + if conn is None or conn_ino != current_ino: + if conn: + conn.close() + _local.conn = duckdb.connect(_db_path, read_only=True) + _local.ino = current_ino + return _local.conn async def fetch_analytics(sql: str, params: list | None = None) -> list[dict]: """Run a read-only DuckDB query off the event loop. 
Returns list of dicts.""" - assert _conn is not None, "Analytics DB not initialized — call open_analytics_db() first" + assert _db_path, "Analytics DB not configured — call open_analytics_db() first" def _query(): - cursor = _conn.cursor() + conn = _get_conn() + cursor = conn.cursor() result = cursor.execute(sql, params or []) columns = [desc[0] for desc in result.description] return [dict(zip(columns, row)) for row in result.fetchall()] diff --git a/web/src/beanflows/app.py b/web/src/beanflows/app.py index b364725..546b2f9 100644 --- a/web/src/beanflows/app.py +++ b/web/src/beanflows/app.py @@ -34,7 +34,7 @@ def create_app() -> Quart: @app.before_serving async def startup(): await init_db() - if os.getenv("DUCKDB_PATH"): + if os.getenv("SERVING_DUCKDB_PATH"): open_analytics_db() @app.after_serving @@ -105,7 +105,7 @@ def create_app() -> Quart: # Health check @app.route("/health") async def health(): - from .analytics import _conn as duckdb_conn + from .analytics import _db_path as serving_db_path from .analytics import fetch_analytics from .core import fetch_one result = {"status": "healthy", "sqlite": "ok", "duckdb": "ok"} @@ -114,7 +114,7 @@ def create_app() -> Quart: except Exception as e: result["sqlite"] = str(e) result["status"] = "unhealthy" - if duckdb_conn is not None: + if serving_db_path: try: await fetch_analytics("SELECT 1") except Exception as e: diff --git a/web/src/beanflows/dashboard/templates/settings.html b/web/src/beanflows/dashboard/templates/settings.html index eb2fda4..80dcda7 100644 --- a/web/src/beanflows/dashboard/templates/settings.html +++ b/web/src/beanflows/dashboard/templates/settings.html @@ -107,8 +107,8 @@