feat: copier update v0.9.0 → v0.10.0
Pulls in template changes: export_serving.py for atomic DuckDB swap, supervisor export step, SQLMesh glob macro, server provisioning script, imprint template, and formatting improvements. Template scaffold SQL models excluded (padelnomics has real models). Web app routes/analytics unchanged (padelnomics-specific customizations). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
79
src/padelnomics/export_serving.py
Normal file
79
src/padelnomics/export_serving.py
Normal file
@@ -0,0 +1,79 @@
|
||||
"""
|
||||
Export serving tables from the pipeline DuckDB to the serving DuckDB (atomic swap).
|
||||
|
||||
Called by the supervisor after each SQLMesh transform run. Reads all tables in
|
||||
the 'serving' schema from the pipeline DB (DUCKDB_PATH), writes them to a temp
|
||||
file, then atomically renames it to the serving DB path (SERVING_DUCKDB_PATH).
|
||||
|
||||
The web app's analytics connection detects the inode change on the next query
|
||||
and reopens the connection automatically — no restart or signal required.
|
||||
|
||||
Why two files?
|
||||
SQLMesh holds an exclusive write lock on DUCKDB_PATH during plan/run.
|
||||
The web app needs read-only access at all times. Two separate files allow
|
||||
both to operate concurrently: SQLMesh writes to the pipeline DB, the web
|
||||
app reads from the serving DB, and this script swaps them atomically.
|
||||
|
||||
The temp file is named _export.duckdb (not serving.duckdb.tmp) because DuckDB
|
||||
names its catalog after the filename stem. A file named serving.* would create
|
||||
a catalog named 'serving', which conflicts with the schema named 'serving'
|
||||
inside the file, making all queries ambiguous.
|
||||
|
||||
Usage:
|
||||
DUCKDB_PATH=local.duckdb SERVING_DUCKDB_PATH=analytics.duckdb \\
|
||||
uv run python -m padelnomics.export_serving
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
import duckdb
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def export_serving() -> None:
    """Copy all serving.* tables from the pipeline DB to the serving DB atomically.

    Reads every table in the 'serving' schema of the pipeline DuckDB
    (env var DUCKDB_PATH), writes them to a temp file next to the serving DB,
    then atomically replaces the serving DB (env var SERVING_DUCKDB_PATH).

    Raises:
        RuntimeError: if a required environment variable is unset, the
            pipeline DB file is missing, or the serving schema has no tables.
    """
    pipeline_path = os.getenv("DUCKDB_PATH", "")
    serving_path = os.getenv("SERVING_DUCKDB_PATH", "")
    # Validate with explicit raises, not assert — asserts are stripped under
    # `python -O`, which would silently skip these checks.
    if not pipeline_path:
        raise RuntimeError("DUCKDB_PATH must be set")
    if not serving_path:
        raise RuntimeError("SERVING_DUCKDB_PATH must be set")
    if not os.path.exists(pipeline_path):
        raise RuntimeError(f"Pipeline DB not found: {pipeline_path}")

    # Temp path in the same directory as the serving DB so the final rename is
    # atomic (rename across filesystems is not atomic on Linux). The stem
    # '_export' (not 'serving') matters: DuckDB names its catalog after the
    # filename stem, and a 'serving' catalog would clash with the 'serving'
    # schema inside the file.
    tmp_path = os.path.join(os.path.dirname(os.path.abspath(serving_path)), "_export.duckdb")
    # A temp file left behind by a crashed prior run would be reopened as-is
    # and could carry stale tables — always start from a clean slate.
    if os.path.exists(tmp_path):
        os.remove(tmp_path)

    try:
        src = duckdb.connect(pipeline_path, read_only=True)
        try:
            tables = src.sql(
                "SELECT table_name FROM information_schema.tables"
                " WHERE table_schema = 'serving' ORDER BY table_name"
            ).fetchall()
            if not tables:
                raise RuntimeError(f"No tables found in serving schema of {pipeline_path}")
            logger.info("Exporting %d serving tables: %s", len(tables), [t[0] for t in tables])

            dst = duckdb.connect(tmp_path)
            try:
                dst.execute("CREATE SCHEMA IF NOT EXISTS serving")
                for (table,) in tables:
                    # Read via Arrow to avoid cross-connection catalog ambiguity.
                    # Identifiers are double-quoted so table names that need
                    # quoting (mixed case, punctuation) survive the round trip.
                    arrow_data = src.sql(f'SELECT * FROM serving."{table}"').arrow()
                    dst.register("_src", arrow_data)
                    dst.execute(f'CREATE OR REPLACE TABLE serving."{table}" AS SELECT * FROM _src')
                    dst.unregister("_src")
                    row_count = dst.sql(f'SELECT count(*) FROM serving."{table}"').fetchone()[0]
                    logger.info("  serving.%s: %s rows", table, f"{row_count:,}")
            finally:
                dst.close()
        finally:
            src.close()
    except Exception:
        # Don't leave a half-written temp DB behind on failure.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

    # os.replace() is atomic when src and dst share a filesystem, and — unlike
    # os.rename() — also overwrites an existing destination on Windows.
    os.replace(tmp_path, serving_path)
    logger.info("Serving DB atomically updated: %s", serving_path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
||||
export_serving()
|
||||
Reference in New Issue
Block a user