fix(analytics): directory bind mount + inode-based auto-reopen
- docker-compose.prod.yml: replace file bind mount for analytics.duckdb with directory bind mount (/opt/padelnomics/data:/app/data/pipeline:ro) so os.rename() on the host is visible inside the container - Override SERVING_DUCKDB_PATH to /app/data/pipeline/analytics.duckdb in all 6 blue/green services (removes dependency on .env value) - analytics.py: track file inode; call _check_and_reopen() at start of each query — transparently picks up new analytics.duckdb without restart when export_serving.py atomically replaces it after each pipeline run Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -59,10 +59,10 @@ services:
|
||||
env_file: ./.env
|
||||
environment:
|
||||
- DATABASE_PATH=/app/data/app.db
|
||||
- SERVING_DUCKDB_PATH=/app/data/analytics.duckdb
|
||||
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
|
||||
volumes:
|
||||
- app-data:/app/data
|
||||
- /data/padelnomics/analytics.duckdb:/app/data/analytics.duckdb:ro
|
||||
- /opt/padelnomics/data:/app/data/pipeline:ro
|
||||
networks:
|
||||
- net
|
||||
healthcheck:
|
||||
@@ -81,10 +81,10 @@ services:
|
||||
env_file: ./.env
|
||||
environment:
|
||||
- DATABASE_PATH=/app/data/app.db
|
||||
- SERVING_DUCKDB_PATH=/app/data/analytics.duckdb
|
||||
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
|
||||
volumes:
|
||||
- app-data:/app/data
|
||||
- /data/padelnomics/analytics.duckdb:/app/data/analytics.duckdb:ro
|
||||
- /opt/padelnomics/data:/app/data/pipeline:ro
|
||||
networks:
|
||||
- net
|
||||
|
||||
@@ -97,10 +97,10 @@ services:
|
||||
env_file: ./.env
|
||||
environment:
|
||||
- DATABASE_PATH=/app/data/app.db
|
||||
- SERVING_DUCKDB_PATH=/app/data/analytics.duckdb
|
||||
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
|
||||
volumes:
|
||||
- app-data:/app/data
|
||||
- /data/padelnomics/analytics.duckdb:/app/data/analytics.duckdb:ro
|
||||
- /opt/padelnomics/data:/app/data/pipeline:ro
|
||||
networks:
|
||||
- net
|
||||
|
||||
@@ -114,10 +114,10 @@ services:
|
||||
env_file: ./.env
|
||||
environment:
|
||||
- DATABASE_PATH=/app/data/app.db
|
||||
- SERVING_DUCKDB_PATH=/app/data/analytics.duckdb
|
||||
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
|
||||
volumes:
|
||||
- app-data:/app/data
|
||||
- /data/padelnomics/analytics.duckdb:/app/data/analytics.duckdb:ro
|
||||
- /opt/padelnomics/data:/app/data/pipeline:ro
|
||||
networks:
|
||||
- net
|
||||
healthcheck:
|
||||
@@ -136,10 +136,10 @@ services:
|
||||
env_file: ./.env
|
||||
environment:
|
||||
- DATABASE_PATH=/app/data/app.db
|
||||
- SERVING_DUCKDB_PATH=/app/data/analytics.duckdb
|
||||
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
|
||||
volumes:
|
||||
- app-data:/app/data
|
||||
- /data/padelnomics/analytics.duckdb:/app/data/analytics.duckdb:ro
|
||||
- /opt/padelnomics/data:/app/data/pipeline:ro
|
||||
networks:
|
||||
- net
|
||||
|
||||
@@ -152,10 +152,10 @@ services:
|
||||
env_file: ./.env
|
||||
environment:
|
||||
- DATABASE_PATH=/app/data/app.db
|
||||
- SERVING_DUCKDB_PATH=/app/data/analytics.duckdb
|
||||
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
|
||||
volumes:
|
||||
- app-data:/app/data
|
||||
- /data/padelnomics/analytics.duckdb:/app/data/analytics.duckdb:ro
|
||||
- /opt/padelnomics/data:/app/data/pipeline:ro
|
||||
networks:
|
||||
- net
|
||||
|
||||
|
||||
@@ -4,6 +4,10 @@ DuckDB read-only analytics reader.
|
||||
Opens a single long-lived DuckDB connection at startup (read_only=True).
|
||||
All queries run via asyncio.to_thread() to avoid blocking the event loop.
|
||||
|
||||
When export_serving.py atomically renames a new analytics.duckdb into place,
|
||||
_check_and_reopen() detects the inode change and transparently reopens —
|
||||
no app restart required.
|
||||
|
||||
Usage:
|
||||
from .analytics import fetch_analytics, execute_user_query
|
||||
|
||||
@@ -14,6 +18,7 @@ Usage:
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
@@ -21,6 +26,8 @@ from typing import Any
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_conn = None # duckdb.DuckDBPyConnection | None — lazy import
|
||||
_conn_inode: int | None = None
|
||||
_reopen_lock = threading.Lock()
|
||||
_DUCKDB_PATH = os.environ.get("SERVING_DUCKDB_PATH", "data/analytics.duckdb")
|
||||
|
||||
# DuckDB queries run in the asyncio thread pool. Cap them so a slow scan
|
||||
@@ -32,20 +39,67 @@ def open_analytics_db() -> None:
|
||||
"""Open the DuckDB connection. Call once at app startup."""
|
||||
import duckdb
|
||||
|
||||
global _conn
|
||||
global _conn, _conn_inode
|
||||
path = Path(_DUCKDB_PATH)
|
||||
if not path.exists():
|
||||
# Database doesn't exist yet — skip silently. Queries will return empty.
|
||||
return
|
||||
_conn = duckdb.connect(str(path), read_only=True)
|
||||
_conn_inode = path.stat().st_ino
|
||||
|
||||
|
||||
def close_analytics_db() -> None:
|
||||
"""Close the DuckDB connection. Call at app shutdown."""
|
||||
global _conn
|
||||
global _conn, _conn_inode
|
||||
if _conn is not None:
|
||||
_conn.close()
|
||||
_conn = None
|
||||
_conn_inode = None
|
||||
|
||||
|
||||
def _check_and_reopen() -> None:
|
||||
"""Reopen the connection if analytics.duckdb was atomically replaced (new inode).
|
||||
|
||||
Called at the start of each query. Requires a directory bind mount (not a file
|
||||
bind mount) so that os.stat() inside the container sees the new inode after rename.
|
||||
"""
|
||||
global _conn, _conn_inode
|
||||
import duckdb
|
||||
|
||||
path = Path(_DUCKDB_PATH)
|
||||
try:
|
||||
current_inode = path.stat().st_ino
|
||||
except OSError:
|
||||
return
|
||||
|
||||
if current_inode == _conn_inode:
|
||||
return # same file — nothing to do
|
||||
|
||||
with _reopen_lock:
|
||||
# Double-check under lock to avoid concurrent reopens.
|
||||
try:
|
||||
current_inode = path.stat().st_ino
|
||||
except OSError:
|
||||
return
|
||||
if current_inode == _conn_inode:
|
||||
return
|
||||
|
||||
old_conn = _conn
|
||||
try:
|
||||
new_conn = duckdb.connect(str(path), read_only=True)
|
||||
except Exception:
|
||||
logger.exception("Failed to reopen analytics DB after file change")
|
||||
return
|
||||
|
||||
_conn = new_conn
|
||||
_conn_inode = current_inode
|
||||
logger.info("Analytics DB reopened (inode changed to %d)", current_inode)
|
||||
|
||||
if old_conn is not None:
|
||||
try:
|
||||
old_conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str, Any]]:
|
||||
@@ -61,7 +115,11 @@ async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str
|
||||
return []
|
||||
|
||||
def _run() -> list[dict]:
|
||||
cur = _conn.cursor()
|
||||
_check_and_reopen()
|
||||
conn = _conn
|
||||
if conn is None:
|
||||
return []
|
||||
cur = conn.cursor()
|
||||
try:
|
||||
rel = cur.execute(sql, params or [])
|
||||
cols = [d[0] for d in rel.description]
|
||||
@@ -104,8 +162,12 @@ async def execute_user_query(
|
||||
return [], [], "Analytics database is not available.", 0.0
|
||||
|
||||
def _run() -> tuple[list[str], list[tuple], str | None, float]:
|
||||
_check_and_reopen()
|
||||
conn = _conn
|
||||
if conn is None:
|
||||
return [], [], "Analytics database is not available.", 0.0
|
||||
t0 = time.monotonic()
|
||||
cur = _conn.cursor()
|
||||
cur = conn.cursor()
|
||||
try:
|
||||
rel = cur.execute(sql)
|
||||
cols = [d[0] for d in rel.description]
|
||||
|
||||
Reference in New Issue
Block a user