fix: export_serving — Arrow-based copy, rename to analytics.duckdb
Two bugs fixed: 1. Cross-connection COPY: DuckDB doesn't support referencing another connection's tables as src.serving.table. Replace with Arrow as intermediate: src reads to Arrow, dst.register() + CREATE TABLE. 2. Catalog/schema name collision: naming the export file serving.duckdb made DuckDB assign catalog name "serving" — same as the schema we create inside it. Every serving.table query became ambiguous. Rename to analytics.duckdb (catalog "analytics", schema "serving" = no clash). SERVING_DUCKDB_PATH values updated: serving.duckdb → analytics.duckdb in supervisor, service, bootstrap, dev_run.sh, .env.example, docker-compose. 3. Temp file: use _export.duckdb (not serving.duckdb.tmp) to avoid the same catalog collision during the write phase. Verified: 6 tables exported, serving.* queries work read-only. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -93,7 +93,7 @@ PULUMI_ACCESS_TOKEN=${PULUMI_ACCESS_TOKEN}
|
|||||||
PATH=/root/.cargo/bin:/root/.pulumi/bin:/usr/local/bin:/usr/bin:/bin
|
PATH=/root/.cargo/bin:/root/.pulumi/bin:/usr/local/bin:/usr/bin:/bin
|
||||||
LANDING_DIR=/data/materia/landing
|
LANDING_DIR=/data/materia/landing
|
||||||
DUCKDB_PATH=/data/materia/lakehouse.duckdb
|
DUCKDB_PATH=/data/materia/lakehouse.duckdb
|
||||||
SERVING_DUCKDB_PATH=/data/materia/serving.duckdb
|
SERVING_DUCKDB_PATH=/data/materia/analytics.duckdb
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
echo "--- Setting up systemd service ---"
|
echo "--- Setting up systemd service ---"
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ RestartSec=10
|
|||||||
EnvironmentFile=/opt/materia/.env
|
EnvironmentFile=/opt/materia/.env
|
||||||
Environment=LANDING_DIR=/data/materia/landing
|
Environment=LANDING_DIR=/data/materia/landing
|
||||||
Environment=DUCKDB_PATH=/data/materia/lakehouse.duckdb
|
Environment=DUCKDB_PATH=/data/materia/lakehouse.duckdb
|
||||||
Environment=SERVING_DUCKDB_PATH=/data/materia/serving.duckdb
|
Environment=SERVING_DUCKDB_PATH=/data/materia/analytics.duckdb
|
||||||
|
|
||||||
# Resource limits
|
# Resource limits
|
||||||
LimitNOFILE=65536
|
LimitNOFILE=65536
|
||||||
|
|||||||
@@ -52,11 +52,11 @@ do
|
|||||||
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
|
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
|
||||||
uv run materia pipeline run transform
|
uv run materia pipeline run transform
|
||||||
|
|
||||||
# Export serving tables to serving.duckdb (atomic swap).
|
# Export serving tables to analytics.duckdb (atomic swap).
|
||||||
# The web app reads from SERVING_DUCKDB_PATH and picks up the new file
|
# The web app reads from SERVING_DUCKDB_PATH and picks up the new file
|
||||||
# automatically via inode-based connection reopen — no restart needed.
|
# automatically via inode-based connection reopen — no restart needed.
|
||||||
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
|
DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \
|
||||||
SERVING_DUCKDB_PATH="${SERVING_DUCKDB_PATH:-/data/materia/serving.duckdb}" \
|
SERVING_DUCKDB_PATH="${SERVING_DUCKDB_PATH:-/data/materia/analytics.duckdb}" \
|
||||||
uv run materia pipeline run export_serving
|
uv run materia pipeline run export_serving
|
||||||
|
|
||||||
) || {
|
) || {
|
||||||
|
|||||||
@@ -29,12 +29,16 @@ def export_serving() -> None:
|
|||||||
assert serving_path, "SERVING_DUCKDB_PATH must be set"
|
assert serving_path, "SERVING_DUCKDB_PATH must be set"
|
||||||
assert os.path.exists(pipeline_path), f"Pipeline DB not found: {pipeline_path}"
|
assert os.path.exists(pipeline_path), f"Pipeline DB not found: {pipeline_path}"
|
||||||
|
|
||||||
tmp_path = serving_path + ".tmp"
|
# Temp path must not start with "serving" — DuckDB names the catalog after
|
||||||
|
# the filename stem, so "serving.duckdb.tmp" → catalog "serving", which
|
||||||
|
# clashes with the schema we create inside it.
|
||||||
|
tmp_path = os.path.join(os.path.dirname(os.path.abspath(serving_path)), "_export.duckdb")
|
||||||
|
|
||||||
src = duckdb.connect(pipeline_path, read_only=True)
|
src = duckdb.connect(pipeline_path, read_only=True)
|
||||||
try:
|
try:
|
||||||
tables = src.sql(
|
tables = src.sql(
|
||||||
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'serving' ORDER BY table_name"
|
"SELECT table_name FROM information_schema.tables"
|
||||||
|
" WHERE table_schema = 'serving' ORDER BY table_name"
|
||||||
).fetchall()
|
).fetchall()
|
||||||
assert tables, f"No tables found in serving schema of {pipeline_path}"
|
assert tables, f"No tables found in serving schema of {pipeline_path}"
|
||||||
logger.info(f"Exporting {len(tables)} serving tables: {[t[0] for t in tables]}")
|
logger.info(f"Exporting {len(tables)} serving tables: {[t[0] for t in tables]}")
|
||||||
@@ -43,10 +47,11 @@ def export_serving() -> None:
|
|||||||
try:
|
try:
|
||||||
dst.execute("CREATE SCHEMA IF NOT EXISTS serving")
|
dst.execute("CREATE SCHEMA IF NOT EXISTS serving")
|
||||||
for (table,) in tables:
|
for (table,) in tables:
|
||||||
dst.execute(
|
# Read via Arrow so there is no cross-connection catalog ambiguity.
|
||||||
f"CREATE OR REPLACE TABLE serving.{table} AS "
|
arrow_data = src.sql(f"SELECT * FROM serving.{table}").arrow()
|
||||||
f"SELECT * FROM src.serving.{table}",
|
dst.register("_src", arrow_data)
|
||||||
)
|
dst.execute(f"CREATE OR REPLACE TABLE serving.{table} AS SELECT * FROM _src")
|
||||||
|
dst.unregister("_src")
|
||||||
row_count = dst.sql(f"SELECT count(*) FROM serving.{table}").fetchone()[0]
|
row_count = dst.sql(f"SELECT count(*) FROM serving.{table}").fetchone()[0]
|
||||||
logger.info(f" serving.{table}: {row_count:,} rows")
|
logger.info(f" serving.{table}: {row_count:,} rows")
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
@@ -8,10 +8,10 @@ ADMIN_EMAILS=admin@beanflows.coffee
|
|||||||
# Database
|
# Database
|
||||||
DATABASE_PATH=data/app.db
|
DATABASE_PATH=data/app.db
|
||||||
# DUCKDB_PATH points to the full pipeline DB (lakehouse.duckdb) — used by SQLMesh and export_serving.
|
# DUCKDB_PATH points to the full pipeline DB (lakehouse.duckdb) — used by SQLMesh and export_serving.
|
||||||
# SERVING_DUCKDB_PATH points to the serving-only export (serving.duckdb) — used by the web app.
|
# SERVING_DUCKDB_PATH points to the serving-only export (analytics.duckdb) — used by the web app.
|
||||||
# Run `uv run materia pipeline run export_serving` after each SQLMesh transform to populate it.
|
# Run `uv run materia pipeline run export_serving` after each SQLMesh transform to populate it.
|
||||||
DUCKDB_PATH=../local.duckdb
|
DUCKDB_PATH=../local.duckdb
|
||||||
SERVING_DUCKDB_PATH=../serving.duckdb
|
SERVING_DUCKDB_PATH=../analytics.duckdb
|
||||||
|
|
||||||
# Auth
|
# Auth
|
||||||
MAGIC_LINK_EXPIRY_MINUTES=15
|
MAGIC_LINK_EXPIRY_MINUTES=15
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ services:
|
|||||||
env_file: .env
|
env_file: .env
|
||||||
environment:
|
environment:
|
||||||
- DATABASE_PATH=/app/data/app.db
|
- DATABASE_PATH=/app/data/app.db
|
||||||
- SERVING_DUCKDB_PATH=/app/duckdb/serving.duckdb
|
- SERVING_DUCKDB_PATH=/app/duckdb/analytics.duckdb
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
|
test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
|
||||||
interval: 30s
|
interval: 30s
|
||||||
|
|||||||
@@ -55,14 +55,14 @@ make css-build
|
|||||||
ok "CSS built"
|
ok "CSS built"
|
||||||
|
|
||||||
# -- Pipeline (first-time only) ----------------------------------------------
|
# -- Pipeline (first-time only) ----------------------------------------------
|
||||||
# Runs extract → transform → export_serving from the repo root if serving.duckdb
|
# Runs extract → transform → export_serving from the repo root if analytics.duckdb
|
||||||
# does not exist yet. Subsequent dev_run.sh invocations skip this — delete
|
# does not exist yet. Subsequent dev_run.sh invocations skip this — delete
|
||||||
# serving.duckdb from the repo root to force a full re-run.
|
# analytics.duckdb from the repo root to force a full re-run.
|
||||||
|
|
||||||
REPO_ROOT="$(cd .. && pwd)"
|
REPO_ROOT="$(cd .. && pwd)"
|
||||||
PIPELINE_LANDING="$REPO_ROOT/data/landing"
|
PIPELINE_LANDING="$REPO_ROOT/data/landing"
|
||||||
PIPELINE_DUCKDB="$REPO_ROOT/local.duckdb"
|
PIPELINE_DUCKDB="$REPO_ROOT/local.duckdb"
|
||||||
PIPELINE_SERVING="$REPO_ROOT/serving.duckdb"
|
PIPELINE_SERVING="$REPO_ROOT/analytics.duckdb"
|
||||||
|
|
||||||
if [ ! -f "$PIPELINE_SERVING" ]; then
|
if [ ! -f "$PIPELINE_SERVING" ]; then
|
||||||
info "First run — fetching and transforming data (this may take a few minutes)"
|
info "First run — fetching and transforming data (this may take a few minutes)"
|
||||||
|
|||||||
Reference in New Issue
Block a user