feat(pseo): write _serving_meta.json after atomic serving DB swap
Records an exported_at_utc timestamp and per-table row counts in _serving_meta.json immediately after export_serving.py atomically swaps in the new serving DB. The pSEO Engine dashboard reads this file to show data freshness without relying on file mtimes (which are unreliable after rclone syncs). Also moves the inline `import re` to the top-level imports.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -24,8 +24,12 @@ Usage:
|
|||||||
uv run python -m padelnomics.export_serving
|
uv run python -m padelnomics.export_serving
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
import duckdb
|
import duckdb
|
||||||
|
|
||||||
@@ -44,6 +48,8 @@ def export_serving() -> None:
|
|||||||
# (rename across filesystems is not atomic on Linux).
|
# (rename across filesystems is not atomic on Linux).
|
||||||
tmp_path = os.path.join(os.path.dirname(os.path.abspath(serving_path)), "_export.duckdb")
|
tmp_path = os.path.join(os.path.dirname(os.path.abspath(serving_path)), "_export.duckdb")
|
||||||
|
|
||||||
|
table_counts: dict[str, int] = {}
|
||||||
|
|
||||||
src = duckdb.connect(pipeline_path, read_only=True)
|
src = duckdb.connect(pipeline_path, read_only=True)
|
||||||
try:
|
try:
|
||||||
# SQLMesh creates serving views that reference "local".sqlmesh__serving.*
|
# SQLMesh creates serving views that reference "local".sqlmesh__serving.*
|
||||||
@@ -60,7 +66,6 @@ def export_serving() -> None:
|
|||||||
for view_name, view_sql in view_rows:
|
for view_name, view_sql in view_rows:
|
||||||
# Pattern: ... FROM "local".sqlmesh__serving.serving__name__hash;
|
# Pattern: ... FROM "local".sqlmesh__serving.serving__name__hash;
|
||||||
# Strip the "local". prefix to get schema.table
|
# Strip the "local". prefix to get schema.table
|
||||||
import re
|
|
||||||
match = re.search(r'FROM\s+"local"\.(sqlmesh__serving\.\S+)', view_sql)
|
match = re.search(r'FROM\s+"local"\.(sqlmesh__serving\.\S+)', view_sql)
|
||||||
assert match, f"Cannot parse view definition for {view_name}: {view_sql[:200]}"
|
assert match, f"Cannot parse view definition for {view_name}: {view_sql[:200]}"
|
||||||
physical_tables.append((view_name, match.group(1)))
|
physical_tables.append((view_name, match.group(1)))
|
||||||
@@ -81,6 +86,7 @@ def export_serving() -> None:
|
|||||||
dst.execute(f"CREATE OR REPLACE TABLE serving.{logical_name} AS SELECT * FROM _src")
|
dst.execute(f"CREATE OR REPLACE TABLE serving.{logical_name} AS SELECT * FROM _src")
|
||||||
dst.unregister("_src")
|
dst.unregister("_src")
|
||||||
row_count = dst.sql(f"SELECT count(*) FROM serving.{logical_name}").fetchone()[0]
|
row_count = dst.sql(f"SELECT count(*) FROM serving.{logical_name}").fetchone()[0]
|
||||||
|
table_counts[logical_name] = row_count
|
||||||
logger.info(f" serving.{logical_name}: {row_count:,} rows")
|
logger.info(f" serving.{logical_name}: {row_count:,} rows")
|
||||||
finally:
|
finally:
|
||||||
dst.close()
|
dst.close()
|
||||||
@@ -91,6 +97,16 @@ def export_serving() -> None:
|
|||||||
os.rename(tmp_path, serving_path)
|
os.rename(tmp_path, serving_path)
|
||||||
logger.info(f"Serving DB atomically updated: {serving_path}")
|
logger.info(f"Serving DB atomically updated: {serving_path}")
|
||||||
|
|
||||||
|
# Write freshness metadata so the pSEO dashboard can show data age without
|
||||||
|
# querying file mtimes (which are unreliable after rclone syncs).
|
||||||
|
meta_path = Path(serving_path).parent / "_serving_meta.json"
|
||||||
|
meta = {
|
||||||
|
"exported_at_utc": datetime.now(tz=UTC).isoformat(),
|
||||||
|
"tables": {name: {"row_count": count} for name, count in table_counts.items()},
|
||||||
|
}
|
||||||
|
meta_path.write_text(json.dumps(meta))
|
||||||
|
logger.info("Wrote serving metadata: %s", meta_path)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
||||||
|
|||||||
Reference in New Issue
Block a user