fix: DuckDB compat issues in Playtomic pipeline + export_serving
- Add maximum_object_size=128MB to read_json for 14K-venue tenants file
- Rewrite opening_hours to use UNION ALL unpivot (DuckDB struct dynamic access)
- Add seed file guard for availability model (empty result on first run)
- Fix snapshot_date VARCHAR→DATE comparison in venue_pricing_benchmarks
- Fix export_serving to resolve SQLMesh physical tables from view definitions
  (SQLMesh views reference "local" catalog unavailable outside its context)
- Add pyarrow dependency for Arrow-based cross-connection data transfer

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -46,24 +46,42 @@ def export_serving() -> None:
|
||||
|
||||
src = duckdb.connect(pipeline_path, read_only=True)
|
||||
try:
|
||||
tables = src.sql(
|
||||
"SELECT table_name FROM information_schema.tables"
|
||||
" WHERE table_schema = 'serving' ORDER BY table_name"
|
||||
# SQLMesh creates serving views that reference "local".sqlmesh__serving.*
|
||||
# which fails when connecting directly. Resolve the physical table each
|
||||
# view points to by parsing the view definition.
|
||||
view_rows = src.execute(
|
||||
"SELECT view_name, sql FROM duckdb_views()"
|
||||
" WHERE schema_name = 'serving' ORDER BY view_name"
|
||||
).fetchall()
|
||||
assert tables, f"No tables found in serving schema of {pipeline_path}"
|
||||
logger.info(f"Exporting {len(tables)} serving tables: {[t[0] for t in tables]}")
|
||||
assert view_rows, f"No views found in serving schema of {pipeline_path}"
|
||||
|
||||
# Extract physical table reference from: CREATE VIEW ... AS SELECT * FROM "local".schema.table;
|
||||
physical_tables: list[tuple[str, str]] = [] # (logical_name, physical_ref)
|
||||
for view_name, view_sql in view_rows:
|
||||
# Pattern: ... FROM "local".sqlmesh__serving.serving__name__hash;
|
||||
# Strip the "local". prefix to get schema.table
|
||||
import re
|
||||
match = re.search(r'FROM\s+"local"\.(sqlmesh__serving\.\S+)', view_sql)
|
||||
assert match, f"Cannot parse view definition for {view_name}: {view_sql[:200]}"
|
||||
physical_tables.append((view_name, match.group(1)))
|
||||
|
||||
logger.info(
|
||||
"Exporting %d serving tables: %s",
|
||||
len(physical_tables),
|
||||
[name for name, _ in physical_tables],
|
||||
)
|
||||
|
||||
dst = duckdb.connect(tmp_path)
|
||||
try:
|
||||
dst.execute("CREATE SCHEMA IF NOT EXISTS serving")
|
||||
for (table,) in tables:
|
||||
for logical_name, physical_ref in physical_tables:
|
||||
# Read via Arrow to avoid cross-connection catalog ambiguity.
|
||||
arrow_data = src.sql(f"SELECT * FROM serving.{table}").arrow()
|
||||
arrow_data = src.sql(f"SELECT * FROM {physical_ref}").arrow()
|
||||
dst.register("_src", arrow_data)
|
||||
dst.execute(f"CREATE OR REPLACE TABLE serving.{table} AS SELECT * FROM _src")
|
||||
dst.execute(f"CREATE OR REPLACE TABLE serving.{logical_name} AS SELECT * FROM _src")
|
||||
dst.unregister("_src")
|
||||
row_count = dst.sql(f"SELECT count(*) FROM serving.{table}").fetchone()[0]
|
||||
logger.info(f" serving.{table}: {row_count:,} rows")
|
||||
row_count = dst.sql(f"SELECT count(*) FROM serving.{logical_name}").fetchone()[0]
|
||||
logger.info(f" serving.{logical_name}: {row_count:,} rows")
|
||||
finally:
|
||||
dst.close()
|
||||
finally:
|
||||
|
||||
Reference in New Issue
Block a user