"""Unified prod server tool — query, pipeline ops, deploy, logs.

Usage:
    uv run python scripts/prod.py query "SELECT COUNT(*) FROM serving.location_profiles"
    uv run python scripts/prod.py query --db lakehouse "SELECT 1"
    uv run python scripts/prod.py query --json "SELECT * FROM serving.pseo_country_overview LIMIT 3"
    uv run python scripts/prod.py sqlmesh-plan
    uv run python scripts/prod.py sqlmesh-plan --dry-run
    uv run python scripts/prod.py export
    uv run python scripts/prod.py deploy
    uv run python scripts/prod.py status
    uv run python scripts/prod.py logs
    uv run python scripts/prod.py logs -f
    uv run python scripts/prod.py logs -n 50
"""

import argparse
import base64
import subprocess
import sys

# --- Constants ---

# SSH alias of the prod host (presumably resolved via ~/.ssh/config — confirm)
# and the unprivileged service account that remote commands are sudo'd to.
SSH_HOST = "hetzner_root"
SSH_USER = "padelnomics_service"

REPO_DIR = "/opt/padelnomics"   # remote checkout; all shell commands cd here first
DATA_DIR = "/data/padelnomics"  # remote persistent data root

# DuckDB database files addressable via `query --db ...`.
DB_PATHS = {
    "analytics": f"{DATA_DIR}/analytics.duckdb",
    "lakehouse": f"{DATA_DIR}/lakehouse.duckdb",
}
LANDING_DIR = f"{DATA_DIR}/landing"

# Default cap on rows returned by the query subcommand.
MAX_ROWS = 500

# Per-subcommand SSH timeouts, in seconds.
QUERY_TIMEOUT_SECONDS = 40
SQLMESH_TIMEOUT_SECONDS = 14400  # 4h — matches supervisor SUBPROCESS_TIMEOUT_SECONDS
EXPORT_TIMEOUT_SECONDS = 300
DEPLOY_TIMEOUT_SECONDS = 600
STATUS_TIMEOUT_SECONDS = 30

# Mutation keywords blocked (defense in depth — DB is read_only anyway)
BLOCKED_KEYWORDS = {
    "CREATE",
    "DROP",
    "ALTER",
    "INSERT",
    "UPDATE",
    "DELETE",
    "ATTACH",
    "COPY",
    "EXPORT",
    "INSTALL",
    "LOAD",
}

# Remote Python script template for query subcommand.
# Receives SQL as base64 via {b64_sql}.
# Uses ATTACH + USE to alias the lakehouse catalog as "local" for SQLMesh view compat.
REMOTE_QUERY_SCRIPT = """\ import duckdb, json, sys, base64 db_path = "{db_path}" sql = base64.b64decode("{b64_sql}").decode() max_rows = {max_rows} output_json = {output_json} try: if "lakehouse" in db_path: con = duckdb.connect(":memory:") con.execute(f"ATTACH '{{db_path}}' AS local (READ_ONLY)") con.execute("USE local") else: con = duckdb.connect(db_path, read_only=True) result = con.execute(sql) cols = [d[0] for d in result.description] rows = result.fetchmany(max_rows) if output_json: print(json.dumps({{"columns": cols, "rows": [list(r) for r in rows], "count": len(rows)}}, default=str)) else: print("\\t".join(cols)) for row in rows: print("\\t".join(str(v) if v is not None else "NULL" for v in row)) if len(rows) == max_rows: print(f"... truncated at {{max_rows}} rows", file=sys.stderr) except Exception as e: print(f"ERROR: {{e}}", file=sys.stderr) sys.exit(1) """ # --- SSH execution --- def run_ssh_shell(shell_cmd, *, timeout_seconds=None, capture=False): """Run a shell command on the prod server via SSH, streaming or capturing output. Commands run as SSH_USER via sudo. Returns the remote exit code. """ ssh_cmd = [ "ssh", SSH_HOST, f"sudo -u {SSH_USER} bash -lc {_shell_quote(f'cd {REPO_DIR} && {shell_cmd}')}", ] if capture: result = subprocess.run( ssh_cmd, capture_output=True, text=True, timeout=timeout_seconds, ) if result.stdout: print(result.stdout, end="") if result.stderr: print(result.stderr, end="", file=sys.stderr) return result.returncode else: result = subprocess.run(ssh_cmd, timeout=timeout_seconds) return result.returncode def run_ssh_shell_as_root(shell_cmd, *, timeout_seconds=None): """Run a shell command as root (not the service user). 
For journalctl etc.""" ssh_cmd = ["ssh", SSH_HOST, shell_cmd] result = subprocess.run(ssh_cmd, timeout=timeout_seconds) return result.returncode def run_ssh_python(script, *, timeout_seconds): """Send a Python script to the prod server via SSH stdin and capture output.""" ssh_cmd = [ "ssh", SSH_HOST, f"sudo -u {SSH_USER} bash -lc 'cd {REPO_DIR} && uv run python3 -'", ] return subprocess.run( ssh_cmd, input=script, capture_output=True, text=True, timeout=timeout_seconds, ) def _shell_quote(s): """Single-quote a string for shell, escaping embedded single quotes.""" return "'" + s.replace("'", "'\\''") + "'" # --- Subcommands --- def cmd_query(args): sql = args.sql if args.stdin or sql is None: sql = sys.stdin.read().strip() if not sql: print("ERROR: No SQL provided", file=sys.stderr) return 1 sql_upper = sql.upper() for kw in BLOCKED_KEYWORDS: if kw in sql_upper: print(f"ERROR: Blocked keyword '{kw}' in query", file=sys.stderr) return 1 b64_sql = base64.b64encode(sql.encode()).decode() remote_script = REMOTE_QUERY_SCRIPT.format( db_path=DB_PATHS[args.db], b64_sql=b64_sql, max_rows=args.max_rows, output_json=args.json, ) result = run_ssh_python(remote_script, timeout_seconds=QUERY_TIMEOUT_SECONDS) if result.stdout: print(result.stdout, end="") if result.stderr: print(result.stderr, end="", file=sys.stderr) return result.returncode def cmd_sqlmesh_plan(args): auto_apply = "" if args.dry_run else " --auto-apply" shell_cmd = ( f"LANDING_DIR={LANDING_DIR} " f"DUCKDB_PATH={DB_PATHS['lakehouse']} " f"uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod{auto_apply}" ) return run_ssh_shell(shell_cmd, timeout_seconds=SQLMESH_TIMEOUT_SECONDS) def cmd_export(args): shell_cmd = ( f"DUCKDB_PATH={DB_PATHS['lakehouse']} " f"SERVING_DUCKDB_PATH={DB_PATHS['analytics']} " "uv run python src/padelnomics/export_serving.py" ) return run_ssh_shell(shell_cmd, timeout_seconds=EXPORT_TIMEOUT_SECONDS) def cmd_deploy(args): return run_ssh_shell("bash deploy.sh", 
timeout_seconds=DEPLOY_TIMEOUT_SECONDS) def cmd_status(args): shell_cmd = "uv run python src/padelnomics/supervisor.py status" return run_ssh_shell(shell_cmd, timeout_seconds=STATUS_TIMEOUT_SECONDS, capture=True) def cmd_logs(args): follow = " -f" if args.follow else "" shell_cmd = f"journalctl -u padelnomics-supervisor --no-pager -n {args.lines}{follow}" return run_ssh_shell_as_root(shell_cmd, timeout_seconds=None if args.follow else STATUS_TIMEOUT_SECONDS) # --- CLI --- def main(): parser = argparse.ArgumentParser( prog="prod", description="Unified prod server tool — query, pipeline ops, deploy, logs", ) subparsers = parser.add_subparsers(dest="command", required=True) # query p_query = subparsers.add_parser("query", help="Run a read-only DuckDB query") p_query.add_argument("sql", nargs="?", help="SQL query to run") p_query.add_argument("--stdin", action="store_true", help="Read SQL from stdin") p_query.add_argument( "--db", choices=list(DB_PATHS.keys()), default="analytics", help="Which database (default: analytics)", ) p_query.add_argument( "--max-rows", type=int, default=MAX_ROWS, help=f"Max rows to return (default: {MAX_ROWS})", ) p_query.add_argument("--json", action="store_true", help="Output JSON instead of TSV") p_query.set_defaults(func=cmd_query) # sqlmesh-plan p_sqlmesh = subparsers.add_parser("sqlmesh-plan", help="Run SQLMesh plan prod") p_sqlmesh.add_argument( "--dry-run", action="store_true", help="Show plan without applying (omits --auto-apply)", ) p_sqlmesh.set_defaults(func=cmd_sqlmesh_plan) # export p_export = subparsers.add_parser("export", help="Export serving tables to analytics.duckdb") p_export.set_defaults(func=cmd_export) # deploy p_deploy = subparsers.add_parser("deploy", help="Run deploy.sh (blue/green swap)") p_deploy.set_defaults(func=cmd_deploy) # status p_status = subparsers.add_parser("status", help="Show supervisor status") p_status.set_defaults(func=cmd_status) # logs p_logs = subparsers.add_parser("logs", help="Show supervisor 
journal logs") p_logs.add_argument( "-f", "--follow", action="store_true", help="Follow log output (Ctrl-C to stop)", ) p_logs.add_argument( "-n", "--lines", type=int, default=100, help="Number of log lines to show (default: 100)", ) p_logs.set_defaults(func=cmd_logs) args = parser.parse_args() return args.func(args) if __name__ == "__main__": try: sys.exit(main()) except subprocess.TimeoutExpired as exc: print(f"\nERROR: Timed out after {exc.timeout}s: {' '.join(str(a) for a in exc.cmd)}", file=sys.stderr) sys.exit(124) except KeyboardInterrupt: sys.exit(130)