fix(export_serving): resolve physical tables from SQLMesh view definitions

SQLMesh creates views in the serving schema that reference
"local".sqlmesh__serving.* internally. Querying serving.table directly
fails with "Catalog 'local' does not exist" when connecting to the file
outside SQLMesh's ATTACH context. Parse each view's SQL to extract the
physical table ref and query it directly — same approach already used in
padelnomics.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
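
For illustration, a minimal sketch of the resolution step. The view body
below is a hypothetical example of the shape SQLMesh emits (table name and
suffix invented); the regex is the one added in this commit:

    import re

    # Hypothetical view definition as stored in the pipeline DB.
    view_sql = (
        'CREATE VIEW serving.daily_balances AS SELECT * FROM '
        '"local".sqlmesh__serving.daily_balances__123456'
    )
    match = re.search(r'FROM\s+"local"\.(sqlmesh__serving\.\S+)', view_sql)
    assert match
    physical_ref = match.group(1)  # sqlmesh__serving.daily_balances__123456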
@@ -9,12 +9,24 @@ writes them to a temp file, then atomically renames it to the serving DB path
 The web app's _get_conn() detects the inode change on the next query and
 reopens the connection automatically — no restart or signal required.
 
+Why two files?
+SQLMesh holds an exclusive write lock on DUCKDB_PATH during plan/run.
+The web app needs read-only access at all times. Two separate files allow
+both to operate concurrently: SQLMesh writes to the pipeline DB, the web
+app reads from the serving DB, and this script swaps them atomically.
+
+The temp file is named _export.duckdb (not serving.duckdb.tmp) because DuckDB
+names its catalog after the filename stem. A file named serving.* would create
+a catalog named 'serving', which conflicts with the schema named 'serving'
+inside the file, making all queries ambiguous.
+
 Usage:
-    DUCKDB_PATH=lakehouse.duckdb SERVING_DUCKDB_PATH=serving.duckdb \
+    DUCKDB_PATH=lakehouse.duckdb SERVING_DUCKDB_PATH=analytics.duckdb \\
     uv run beanflows pipeline run export_serving
 """
 import logging
 import os
+import re
 
 import duckdb
 
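The _get_conn() reconnect behavior described in the docstring is not part
of this diff; a minimal sketch of the pattern, assuming the web app caches
the connection together with the serving file's inode (names hypothetical):

    import os

    import duckdb

    _conn: duckdb.DuckDBPyConnection | None = None
    _inode: int | None = None

    def _get_conn(path: str) -> duckdb.DuckDBPyConnection:
        global _conn, _inode
        # The atomic rename swaps in a new file, so the inode changes.
        inode = os.stat(path).st_ino
        if _conn is None or inode != _inode:
            if _conn is not None:
                _conn.close()
            _conn = duckdb.connect(path, read_only=True)
            _inode = inode
        return _conn
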
@@ -29,31 +41,46 @@ def export_serving() -> None:
     assert serving_path, "SERVING_DUCKDB_PATH must be set"
     assert os.path.exists(pipeline_path), f"Pipeline DB not found: {pipeline_path}"
 
-    # Temp path must not start with "serving" — DuckDB names the catalog after
-    # the filename stem, so "serving.duckdb.tmp" → catalog "serving", which
-    # clashes with the schema we create inside it.
+    # Temp path in the same directory as the serving DB so rename() is atomic
+    # (rename across filesystems is not atomic on Linux).
     tmp_path = os.path.join(os.path.dirname(os.path.abspath(serving_path)), "_export.duckdb")
 
     src = duckdb.connect(pipeline_path, read_only=True)
     try:
-        tables = src.sql(
-            "SELECT table_name FROM information_schema.tables"
-            " WHERE table_schema = 'serving' ORDER BY table_name"
+        # SQLMesh creates serving views that reference "local".sqlmesh__serving.*
+        # which fails when connecting directly. Resolve the physical table each
+        # view points to by parsing the view definition.
+        view_rows = src.execute(
+            "SELECT view_name, sql FROM duckdb_views()"
+            " WHERE schema_name = 'serving' ORDER BY view_name"
         ).fetchall()
-        assert tables, f"No tables found in serving schema of {pipeline_path}"
-        logger.info(f"Exporting {len(tables)} serving tables: {[t[0] for t in tables]}")
+        assert view_rows, f"No views found in serving schema of {pipeline_path}"
+
+        # Extract physical table reference from: CREATE VIEW ... AS SELECT * FROM "local".schema.table;
+        # Strip the "local". prefix to get schema.table
+        physical_tables: list[tuple[str, str]] = []  # (logical_name, physical_ref)
+        for view_name, view_sql in view_rows:
+            match = re.search(r'FROM\s+"local"\.(sqlmesh__serving\.\S+)', view_sql)
+            assert match, f"Cannot parse view definition for {view_name}: {view_sql[:200]}"
+            physical_tables.append((view_name, match.group(1)))
+
+        logger.info(
+            "Exporting %d serving tables: %s",
+            len(physical_tables),
+            [name for name, _ in physical_tables],
+        )
 
         dst = duckdb.connect(tmp_path)
         try:
             dst.execute("CREATE SCHEMA IF NOT EXISTS serving")
-            for (table,) in tables:
-                # Read via Arrow so there is no cross-connection catalog ambiguity.
-                arrow_data = src.sql(f"SELECT * FROM serving.{table}").arrow()
+            for logical_name, physical_ref in physical_tables:
+                # Read via Arrow to avoid cross-connection catalog ambiguity.
+                arrow_data = src.sql(f"SELECT * FROM {physical_ref}").arrow()
                 dst.register("_src", arrow_data)
-                dst.execute(f"CREATE OR REPLACE TABLE serving.{table} AS SELECT * FROM _src")
+                dst.execute(f"CREATE OR REPLACE TABLE serving.{logical_name} AS SELECT * FROM _src")
                 dst.unregister("_src")
-                row_count = dst.sql(f"SELECT count(*) FROM serving.{table}").fetchone()[0]
-                logger.info(f"  serving.{table}: {row_count:,} rows")
+                row_count = dst.sql(f"SELECT count(*) FROM serving.{logical_name}").fetchone()[0]
+                logger.info(f"  serving.{logical_name}: {row_count:,} rows")
         finally:
             dst.close()
     finally:
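
The rename step itself falls outside the hunks shown above; a sketch of the
final swap, assuming it uses os.replace (atomic only when the temp file and
the serving DB live on the same filesystem, hence the tmp_path choice):

    import os

    def _swap_into_place(tmp_path: str, serving_path: str) -> None:
        # Readers see either the old file or the new one, never a partial write.
        os.replace(tmp_path, serving_path)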