diff --git a/src/beanflows_pipeline/export_serving.py b/src/beanflows_pipeline/export_serving.py
index a1c129a..bc74176 100644
--- a/src/beanflows_pipeline/export_serving.py
+++ b/src/beanflows_pipeline/export_serving.py
@@ -9,12 +9,24 @@ writes them to a temp file, then atomically renames it to the serving DB path
 The web app's _get_conn() detects the inode change on the next query and
 reopens the connection automatically — no restart or signal required.
 
+Why two files?
+    SQLMesh holds an exclusive write lock on DUCKDB_PATH during plan/run.
+    The web app needs read-only access at all times. Two separate files allow
+    both to operate concurrently: SQLMesh writes to the pipeline DB, the web
+    app reads from the serving DB, and this script swaps them atomically.
+
+The temp file is named _export.duckdb (not serving.duckdb.tmp) because DuckDB
+names its catalog after the filename stem. A file named serving.* would create
+a catalog named 'serving', which conflicts with the schema named 'serving'
+inside the file, making all queries ambiguous.
+
 Usage:
-    DUCKDB_PATH=lakehouse.duckdb SERVING_DUCKDB_PATH=serving.duckdb \
+    DUCKDB_PATH=lakehouse.duckdb SERVING_DUCKDB_PATH=analytics.duckdb \\
         uv run beanflows pipeline run export_serving
 """
 import logging
 import os
+import re
 
 import duckdb
 
@@ -29,31 +41,46 @@ def export_serving() -> None:
     assert serving_path, "SERVING_DUCKDB_PATH must be set"
     assert os.path.exists(pipeline_path), f"Pipeline DB not found: {pipeline_path}"
 
-    # Temp path must not start with "serving" — DuckDB names the catalog after
-    # the filename stem, so "serving.duckdb.tmp" → catalog "serving", which
-    # clashes with the schema we create inside it.
+    # Temp path in the same directory as the serving DB so rename() is atomic
+    # (rename across filesystems is not atomic on Linux).
     tmp_path = os.path.join(os.path.dirname(os.path.abspath(serving_path)), "_export.duckdb")
 
     src = duckdb.connect(pipeline_path, read_only=True)
     try:
-        tables = src.sql(
-            "SELECT table_name FROM information_schema.tables"
-            " WHERE table_schema = 'serving' ORDER BY table_name"
+        # SQLMesh creates serving views that reference "local".sqlmesh__serving.*
+        # which fails when connecting directly. Resolve the physical table each
+        # view points to by parsing the view definition.
+        view_rows = src.execute(
+            "SELECT view_name, sql FROM duckdb_views()"
+            " WHERE schema_name = 'serving' ORDER BY view_name"
         ).fetchall()
-        assert tables, f"No tables found in serving schema of {pipeline_path}"
-        logger.info(f"Exporting {len(tables)} serving tables: {[t[0] for t in tables]}")
+        assert view_rows, f"No views found in serving schema of {pipeline_path}"
+
+        # Extract physical table reference from: CREATE VIEW ... AS SELECT * FROM "local".schema.table;
+        # Strip the "local". prefix to get schema.table
+        physical_tables: list[tuple[str, str]] = []  # (logical_name, physical_ref)
+        for view_name, view_sql in view_rows:
+            match = re.search(r'FROM\s+"local"\.(sqlmesh__serving\.\S+)', view_sql)
+            assert match, f"Cannot parse view definition for {view_name}: {view_sql[:200]}"
+            physical_tables.append((view_name, match.group(1)))
+
+        logger.info(
+            "Exporting %d serving tables: %s",
+            len(physical_tables),
+            [name for name, _ in physical_tables],
+        )
 
         dst = duckdb.connect(tmp_path)
         try:
             dst.execute("CREATE SCHEMA IF NOT EXISTS serving")
-            for (table,) in tables:
-                # Read via Arrow so there is no cross-connection catalog ambiguity.
-                arrow_data = src.sql(f"SELECT * FROM serving.{table}").arrow()
+            for logical_name, physical_ref in physical_tables:
+                # Read via Arrow to avoid cross-connection catalog ambiguity.
+                arrow_data = src.sql(f"SELECT * FROM {physical_ref}").arrow()
                 dst.register("_src", arrow_data)
-                dst.execute(f"CREATE OR REPLACE TABLE serving.{table} AS SELECT * FROM _src")
+                dst.execute(f"CREATE OR REPLACE TABLE serving.{logical_name} AS SELECT * FROM _src")
                 dst.unregister("_src")
-                row_count = dst.sql(f"SELECT count(*) FROM serving.{table}").fetchone()[0]
-                logger.info(f"  serving.{table}: {row_count:,} rows")
+                row_count = dst.sql(f"SELECT count(*) FROM serving.{logical_name}").fetchone()[0]
+                logger.info(f"  serving.{logical_name}: {row_count:,} rows")
         finally:
             dst.close()
     finally: