refactor: replace prod_query.py with unified prod.py CLI tool

Adds subcommands: query, sqlmesh-plan, export, deploy, status, logs.
Fixes hardcoded DB paths (/opt/padelnomics/data/ → /data/padelnomics/).
Streaming SSH for long-running ops, capture for query/status.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-03-10 21:32:38 +01:00
parent 236f0d1061
commit 4e81987741
2 changed files with 266 additions and 124 deletions

266
scripts/prod.py Normal file
View File

@@ -0,0 +1,266 @@
"""
Unified prod server tool — query, pipeline ops, deploy, logs.
Usage:
uv run python scripts/prod.py query "SELECT COUNT(*) FROM serving.location_profiles"
uv run python scripts/prod.py query --db lakehouse "SELECT 1"
uv run python scripts/prod.py query --json "SELECT * FROM serving.pseo_country_overview LIMIT 3"
uv run python scripts/prod.py sqlmesh-plan
uv run python scripts/prod.py sqlmesh-plan --dry-run
uv run python scripts/prod.py export
uv run python scripts/prod.py deploy
uv run python scripts/prod.py status
uv run python scripts/prod.py logs
uv run python scripts/prod.py logs -f
uv run python scripts/prod.py logs -n 50
"""
import argparse
import base64
import re
import subprocess
import sys
# --- Constants ---
# SSH destination passed verbatim to `ssh` (an alias — presumably resolved in
# the local ssh config; TODO confirm) and the unprivileged user that remote
# commands run as via sudo.
SSH_HOST = "hetzner_root"
SSH_USER = "padelnomics_service"
REPO_DIR = "/opt/padelnomics"  # repo checkout on the server; commands `cd` here first
DATA_DIR = "/data/padelnomics"  # data root: DuckDB files and landing zone
DB_PATHS = {
    "analytics": f"{DATA_DIR}/analytics.duckdb",
    "lakehouse": f"{DATA_DIR}/lakehouse.duckdb",
}
LANDING_DIR = f"{DATA_DIR}/landing"
# Default row cap for `query` output and per-subcommand SSH timeouts (seconds).
MAX_ROWS = 500
QUERY_TIMEOUT_SECONDS = 40
SQLMESH_TIMEOUT_SECONDS = 14400  # 4h — matches supervisor SUBPROCESS_TIMEOUT_SECONDS
EXPORT_TIMEOUT_SECONDS = 300
DEPLOY_TIMEOUT_SECONDS = 600
STATUS_TIMEOUT_SECONDS = 30
# Mutation keywords blocked (defense in depth — DB is read_only anyway)
BLOCKED_KEYWORDS = {
    "CREATE", "DROP", "ALTER", "INSERT", "UPDATE", "DELETE",
    "ATTACH", "COPY", "EXPORT", "INSTALL", "LOAD",
}
# Remote Python script template for query subcommand.
# Receives SQL as base64 via {b64_sql}.
# Uses ATTACH + USE to alias the lakehouse catalog as "local" for SQLMesh view compat.
REMOTE_QUERY_SCRIPT = """\
import duckdb, json, sys, base64
db_path = "{db_path}"
sql = base64.b64decode("{b64_sql}").decode()
max_rows = {max_rows}
output_json = {output_json}
try:
if "lakehouse" in db_path:
con = duckdb.connect(":memory:")
con.execute(f"ATTACH '{{db_path}}' AS local (READ_ONLY)")
con.execute("USE local")
else:
con = duckdb.connect(db_path, read_only=True)
result = con.execute(sql)
cols = [d[0] for d in result.description]
rows = result.fetchmany(max_rows)
if output_json:
print(json.dumps({{"columns": cols, "rows": [list(r) for r in rows], "count": len(rows)}}, default=str))
else:
print("\\t".join(cols))
for row in rows:
print("\\t".join(str(v) if v is not None else "NULL" for v in row))
if len(rows) == max_rows:
print(f"... truncated at {{max_rows}} rows", file=sys.stderr)
except Exception as e:
print(f"ERROR: {{e}}", file=sys.stderr)
sys.exit(1)
"""
# --- SSH execution ---
def run_ssh_shell(shell_cmd, *, timeout_seconds=None, capture=False):
    """Execute *shell_cmd* on the prod server over SSH and return its exit code.

    The command runs as SSH_USER (via sudo) inside REPO_DIR. With
    ``capture=True`` stdout/stderr are buffered and echoed locally; otherwise
    the remote output streams straight to the terminal.
    """
    remote = _shell_quote(f"cd {REPO_DIR} && {shell_cmd}")
    ssh_cmd = ["ssh", SSH_HOST, f"sudo -u {SSH_USER} bash -lc {remote}"]
    if not capture:
        # Streaming mode: inherit our stdout/stderr for long-running ops.
        return subprocess.run(ssh_cmd, timeout=timeout_seconds).returncode
    proc = subprocess.run(
        ssh_cmd, capture_output=True, text=True, timeout=timeout_seconds,
    )
    if proc.stdout:
        print(proc.stdout, end="")
    if proc.stderr:
        print(proc.stderr, end="", file=sys.stderr)
    return proc.returncode
def run_ssh_shell_as_root(shell_cmd, *, timeout_seconds=None):
    """Run *shell_cmd* as root on the prod host (for journalctl etc.), streaming output.

    Returns the remote exit code.
    """
    completed = subprocess.run(["ssh", SSH_HOST, shell_cmd], timeout=timeout_seconds)
    return completed.returncode
def run_ssh_python(script, *, timeout_seconds):
    """Pipe *script* to `uv run python3 -` on the prod server and capture output.

    Runs as SSH_USER inside REPO_DIR; returns the CompletedProcess with
    decoded stdout/stderr.
    """
    remote = f"sudo -u {SSH_USER} bash -lc 'cd {REPO_DIR} && uv run python3 -'"
    return subprocess.run(
        ["ssh", SSH_HOST, remote],
        input=script,
        capture_output=True,
        text=True,
        timeout=timeout_seconds,
    )
def _shell_quote(s):
"""Single-quote a string for shell, escaping embedded single quotes."""
return "'" + s.replace("'", "'\\''") + "'"
# --- Subcommands ---
def cmd_query(args):
    """Run a read-only SQL query against a prod DuckDB and print the result.

    SQL comes from the positional argument or, with --stdin (or no argument),
    from standard input. Mutation keywords are rejected locally before any
    bytes are sent to the server. Returns the remote exit code, or 1 on
    local validation failure.
    """
    sql = args.sql
    if args.stdin or sql is None:
        sql = sys.stdin.read().strip()
    if not sql:
        print("ERROR: No SQL provided", file=sys.stderr)
        return 1
    sql_upper = sql.upper()
    for kw in BLOCKED_KEYWORDS:
        # Word-boundary match, not substring: a plain `kw in sql_upper` test
        # wrongly rejects harmless identifiers such as created_at ("CREATE"),
        # last_updated ("UPDATE") or deleted_flag ("DELETE").
        if re.search(rf"\b{re.escape(kw)}\b", sql_upper):
            print(f"ERROR: Blocked keyword '{kw}' in query", file=sys.stderr)
            return 1
    # Base64 sidesteps all quoting/escaping issues when embedding the SQL
    # into the remote script template.
    b64_sql = base64.b64encode(sql.encode()).decode()
    remote_script = REMOTE_QUERY_SCRIPT.format(
        db_path=DB_PATHS[args.db],
        b64_sql=b64_sql,
        max_rows=args.max_rows,
        output_json=args.json,
    )
    result = run_ssh_python(remote_script, timeout_seconds=QUERY_TIMEOUT_SECONDS)
    if result.stdout:
        print(result.stdout, end="")
    if result.stderr:
        print(result.stderr, end="", file=sys.stderr)
    return result.returncode
def cmd_sqlmesh_plan(args):
    """Run `sqlmesh plan prod` on the server; auto-applies unless --dry-run."""
    suffix = "" if args.dry_run else " --auto-apply"
    cmd = (
        f"LANDING_DIR={LANDING_DIR} "
        f"DUCKDB_PATH={DB_PATHS['lakehouse']} "
        f"uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod{suffix}"
    )
    return run_ssh_shell(cmd, timeout_seconds=SQLMESH_TIMEOUT_SECONDS)
def cmd_export(args):
    """Export serving tables from the lakehouse into analytics.duckdb on prod."""
    pieces = [
        f"DUCKDB_PATH={DB_PATHS['lakehouse']}",
        f"SERVING_DUCKDB_PATH={DB_PATHS['analytics']}",
        "uv run python src/padelnomics/export_serving.py",
    ]
    return run_ssh_shell(" ".join(pieces), timeout_seconds=EXPORT_TIMEOUT_SECONDS)
def cmd_deploy(args):
    """Run the server-side deploy script (blue/green swap)."""
    return run_ssh_shell("bash deploy.sh", timeout_seconds=DEPLOY_TIMEOUT_SECONDS)
def cmd_status(args):
    """Show supervisor status; captured so the output is echoed locally."""
    return run_ssh_shell(
        "uv run python src/padelnomics/supervisor.py status",
        timeout_seconds=STATUS_TIMEOUT_SECONDS,
        capture=True,
    )
def cmd_logs(args):
    """Show supervisor journald logs; -f follows indefinitely (no timeout)."""
    cmd = f"journalctl -u padelnomics-supervisor --no-pager -n {args.lines}"
    if args.follow:
        cmd += " -f"
        timeout = None  # a follow session runs until the user interrupts it
    else:
        timeout = STATUS_TIMEOUT_SECONDS
    return run_ssh_shell_as_root(cmd, timeout_seconds=timeout)
# --- CLI ---
def main():
    """Build the CLI, parse arguments, and dispatch to the chosen subcommand.

    Returns the subcommand handler's exit code.
    """
    parser = argparse.ArgumentParser(
        prog="prod",
        description="Unified prod server tool — query, pipeline ops, deploy, logs",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    # query — read-only SQL against a prod DuckDB
    query = sub.add_parser("query", help="Run a read-only DuckDB query")
    query.add_argument("sql", nargs="?", help="SQL query to run")
    query.add_argument("--stdin", action="store_true", help="Read SQL from stdin")
    query.add_argument(
        "--db", choices=list(DB_PATHS.keys()), default="analytics",
        help="Which database (default: analytics)",
    )
    query.add_argument(
        "--max-rows", type=int, default=MAX_ROWS,
        help=f"Max rows to return (default: {MAX_ROWS})",
    )
    query.add_argument("--json", action="store_true", help="Output JSON instead of TSV")
    query.set_defaults(func=cmd_query)

    # sqlmesh-plan — apply (or preview) the SQLMesh prod plan
    sqlmesh = sub.add_parser("sqlmesh-plan", help="Run SQLMesh plan prod")
    sqlmesh.add_argument(
        "--dry-run", action="store_true",
        help="Show plan without applying (omits --auto-apply)",
    )
    sqlmesh.set_defaults(func=cmd_sqlmesh_plan)

    # export / deploy / status — no extra options
    export = sub.add_parser("export", help="Export serving tables to analytics.duckdb")
    export.set_defaults(func=cmd_export)
    deploy = sub.add_parser("deploy", help="Run deploy.sh (blue/green swap)")
    deploy.set_defaults(func=cmd_deploy)
    status = sub.add_parser("status", help="Show supervisor status")
    status.set_defaults(func=cmd_status)

    # logs — journald tail, optionally following
    logs = sub.add_parser("logs", help="Show supervisor journal logs")
    logs.add_argument(
        "-f", "--follow", action="store_true",
        help="Follow log output (Ctrl-C to stop)",
    )
    logs.add_argument(
        "-n", "--lines", type=int, default=100,
        help="Number of log lines to show (default: 100)",
    )
    logs.set_defaults(func=cmd_logs)

    parsed = parser.parse_args()
    return parsed.func(parsed)
if __name__ == "__main__":
    try:
        sys.exit(main())
    except subprocess.TimeoutExpired as exc:
        # Map remote timeouts to exit code 124, matching coreutils `timeout`.
        cmd_text = " ".join(str(part) for part in exc.cmd)
        print(f"\nERROR: Timed out after {exc.timeout}s: {cmd_text}", file=sys.stderr)
        sys.exit(124)
    except KeyboardInterrupt:
        # 130 = 128 + SIGINT, the conventional Ctrl-C exit status.
        sys.exit(130)