From 4e81987741c58e78cd55a91a51ffcb9240d3b06d Mon Sep 17 00:00:00 2001 From: Deeman Date: Tue, 10 Mar 2026 21:32:38 +0100 Subject: [PATCH] refactor: replace prod_query.py with unified prod.py CLI tool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds subcommands: query, sqlmesh-plan, export, deploy, status, logs. Fixes hardcoded DB paths (/opt/padelnomics/data/ → /data/padelnomics/). Streaming SSH for long-running ops, capture for query/status. Co-Authored-By: Claude Opus 4.6 --- scripts/prod.py | 266 ++++++++++++++++++++++++++++++++++++++++++ scripts/prod_query.py | 124 -------------------- 2 files changed, 266 insertions(+), 124 deletions(-) create mode 100644 scripts/prod.py delete mode 100644 scripts/prod_query.py diff --git a/scripts/prod.py b/scripts/prod.py new file mode 100644 index 0000000..cbd1c2b --- /dev/null +++ b/scripts/prod.py @@ -0,0 +1,266 @@ +""" +Unified prod server tool — query, pipeline ops, deploy, logs. + +Usage: + uv run python scripts/prod.py query "SELECT COUNT(*) FROM serving.location_profiles" + uv run python scripts/prod.py query --db lakehouse "SELECT 1" + uv run python scripts/prod.py query --json "SELECT * FROM serving.pseo_country_overview LIMIT 3" + uv run python scripts/prod.py sqlmesh-plan + uv run python scripts/prod.py sqlmesh-plan --dry-run + uv run python scripts/prod.py export + uv run python scripts/prod.py deploy + uv run python scripts/prod.py status + uv run python scripts/prod.py logs + uv run python scripts/prod.py logs -f + uv run python scripts/prod.py logs -n 50 +""" + +import argparse +import base64 +import subprocess +import sys + +# --- Constants --- + +SSH_HOST = "hetzner_root" +SSH_USER = "padelnomics_service" +REPO_DIR = "/opt/padelnomics" +DATA_DIR = "/data/padelnomics" +DB_PATHS = { + "analytics": f"{DATA_DIR}/analytics.duckdb", + "lakehouse": f"{DATA_DIR}/lakehouse.duckdb", +} +LANDING_DIR = f"{DATA_DIR}/landing" + +MAX_ROWS = 500 +QUERY_TIMEOUT_SECONDS = 40 +SQLMESH_TIMEOUT_SECONDS = 14400 # 4h — matches supervisor SUBPROCESS_TIMEOUT_SECONDS +EXPORT_TIMEOUT_SECONDS = 300 +DEPLOY_TIMEOUT_SECONDS = 600 +STATUS_TIMEOUT_SECONDS = 30 + +# Mutation keywords blocked (defense in depth — DB is read_only anyway) +BLOCKED_KEYWORDS = { + "CREATE", "DROP", "ALTER", "INSERT", "UPDATE", "DELETE", + "ATTACH", "COPY", "EXPORT", "INSTALL", "LOAD", +} + +# Remote Python script template for query subcommand. +# Receives SQL as base64 via {b64_sql}. +# Uses ATTACH + USE to alias the lakehouse catalog as "local" for SQLMesh view compat. +REMOTE_QUERY_SCRIPT = """\ +import duckdb, json, sys, base64 +db_path = "{db_path}" +sql = base64.b64decode("{b64_sql}").decode() +max_rows = {max_rows} +output_json = {output_json} +try: + if "lakehouse" in db_path: + con = duckdb.connect(":memory:") + con.execute(f"ATTACH '{{db_path}}' AS local (READ_ONLY)") + con.execute("USE local") + else: + con = duckdb.connect(db_path, read_only=True) + result = con.execute(sql) + cols = [d[0] for d in result.description] + rows = result.fetchmany(max_rows) + if output_json: + print(json.dumps({{"columns": cols, "rows": [list(r) for r in rows], "count": len(rows)}}, default=str)) + else: + print("\\t".join(cols)) + for row in rows: + print("\\t".join(str(v) if v is not None else "NULL" for v in row)) + if len(rows) == max_rows: + print(f"... truncated at {{max_rows}} rows", file=sys.stderr) +except Exception as e: + print(f"ERROR: {{e}}", file=sys.stderr) + sys.exit(1) +""" + + +# --- SSH execution --- + + +def run_ssh_shell(shell_cmd, *, timeout_seconds=None, capture=False): + """Run a shell command on the prod server via SSH, streaming or capturing output. + + Commands run as SSH_USER via sudo. Returns the remote exit code. + """ + ssh_cmd = [ + "ssh", SSH_HOST, + f"sudo -u {SSH_USER} bash -lc {_shell_quote(f'cd {REPO_DIR} && {shell_cmd}')}", + ] + if capture: + result = subprocess.run( + ssh_cmd, capture_output=True, text=True, timeout=timeout_seconds, + ) + if result.stdout: + print(result.stdout, end="") + if result.stderr: + print(result.stderr, end="", file=sys.stderr) + return result.returncode + else: + result = subprocess.run(ssh_cmd, timeout=timeout_seconds) + return result.returncode + + +def run_ssh_shell_as_root(shell_cmd, *, timeout_seconds=None): + """Run a shell command as root (not the service user). For journalctl etc.""" + ssh_cmd = ["ssh", SSH_HOST, shell_cmd] + result = subprocess.run(ssh_cmd, timeout=timeout_seconds) + return result.returncode + + +def run_ssh_python(script, *, timeout_seconds): + """Send a Python script to the prod server via SSH stdin and capture output.""" + ssh_cmd = [ + "ssh", SSH_HOST, + f"sudo -u {SSH_USER} bash -lc 'cd {REPO_DIR} && uv run python3 -'", + ] + return subprocess.run( + ssh_cmd, input=script, capture_output=True, text=True, + timeout=timeout_seconds, + ) + + +def _shell_quote(s): + """Single-quote a string for shell, escaping embedded single quotes.""" + return "'" + s.replace("'", "'\\''") + "'" + + +# --- Subcommands --- + + +def cmd_query(args): + sql = args.sql + if args.stdin or sql is None: + sql = sys.stdin.read().strip() + if not sql: + print("ERROR: No SQL provided", file=sys.stderr) + return 1 + + sql_upper = sql.upper() + for kw in BLOCKED_KEYWORDS: + if kw in sql_upper: + print(f"ERROR: Blocked keyword '{kw}' in query", file=sys.stderr) + return 1 + + b64_sql = base64.b64encode(sql.encode()).decode() + remote_script = REMOTE_QUERY_SCRIPT.format( + db_path=DB_PATHS[args.db], + b64_sql=b64_sql, + max_rows=args.max_rows, + output_json=args.json, + ) + + result = run_ssh_python(remote_script, timeout_seconds=QUERY_TIMEOUT_SECONDS) + if result.stdout: + print(result.stdout, end="") + if result.stderr: + print(result.stderr, end="", file=sys.stderr) + return result.returncode + + +def cmd_sqlmesh_plan(args): + auto_apply = "" if args.dry_run else " --auto-apply" + shell_cmd = ( + f"LANDING_DIR={LANDING_DIR} " + f"DUCKDB_PATH={DB_PATHS['lakehouse']} " + f"uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod{auto_apply}" + ) + return run_ssh_shell(shell_cmd, timeout_seconds=SQLMESH_TIMEOUT_SECONDS) + + +def cmd_export(args): + shell_cmd = ( + f"DUCKDB_PATH={DB_PATHS['lakehouse']} " + f"SERVING_DUCKDB_PATH={DB_PATHS['analytics']} " + "uv run python src/padelnomics/export_serving.py" + ) + return run_ssh_shell(shell_cmd, timeout_seconds=EXPORT_TIMEOUT_SECONDS) + + +def cmd_deploy(args): + return run_ssh_shell("bash deploy.sh", timeout_seconds=DEPLOY_TIMEOUT_SECONDS) + + +def cmd_status(args): + shell_cmd = "uv run python src/padelnomics/supervisor.py status" + return run_ssh_shell(shell_cmd, timeout_seconds=STATUS_TIMEOUT_SECONDS, capture=True) + + +def cmd_logs(args): + follow = " -f" if args.follow else "" + shell_cmd = f"journalctl -u padelnomics-supervisor --no-pager -n {args.lines}{follow}" + return run_ssh_shell_as_root(shell_cmd, timeout_seconds=None if args.follow else STATUS_TIMEOUT_SECONDS) + + +# --- CLI --- + + +def main(): + parser = argparse.ArgumentParser( + prog="prod", + description="Unified prod server tool — query, pipeline ops, deploy, logs", + ) + subparsers = parser.add_subparsers(dest="command", required=True) + + # query + p_query = subparsers.add_parser("query", help="Run a read-only DuckDB query") + p_query.add_argument("sql", nargs="?", help="SQL query to run") + p_query.add_argument("--stdin", action="store_true", help="Read SQL from stdin") + p_query.add_argument( + "--db", choices=list(DB_PATHS.keys()), default="analytics", + help="Which database (default: analytics)", + ) + p_query.add_argument( + "--max-rows", type=int, default=MAX_ROWS, + help=f"Max rows to return (default: {MAX_ROWS})", + ) + p_query.add_argument("--json", action="store_true", help="Output JSON instead of TSV") + p_query.set_defaults(func=cmd_query) + + # sqlmesh-plan + p_sqlmesh = subparsers.add_parser("sqlmesh-plan", help="Run SQLMesh plan prod") + p_sqlmesh.add_argument( + "--dry-run", action="store_true", + help="Show plan without applying (omits --auto-apply)", + ) + p_sqlmesh.set_defaults(func=cmd_sqlmesh_plan) + + # export + p_export = subparsers.add_parser("export", help="Export serving tables to analytics.duckdb") + p_export.set_defaults(func=cmd_export) + + # deploy + p_deploy = subparsers.add_parser("deploy", help="Run deploy.sh (blue/green swap)") + p_deploy.set_defaults(func=cmd_deploy) + + # status + p_status = subparsers.add_parser("status", help="Show supervisor status") + p_status.set_defaults(func=cmd_status) + + # logs + p_logs = subparsers.add_parser("logs", help="Show supervisor journal logs") + p_logs.add_argument( + "-f", "--follow", action="store_true", + help="Follow log output (Ctrl-C to stop)", + ) + p_logs.add_argument( + "-n", "--lines", type=int, default=100, + help="Number of log lines to show (default: 100)", + ) + p_logs.set_defaults(func=cmd_logs) + + args = parser.parse_args() + return args.func(args) + + +if __name__ == "__main__": + try: + sys.exit(main()) + except subprocess.TimeoutExpired as exc: + print(f"\nERROR: Timed out after {exc.timeout}s: {' '.join(str(a) for a in exc.cmd)}", file=sys.stderr) + sys.exit(124) + except KeyboardInterrupt: + sys.exit(130) diff --git a/scripts/prod_query.py b/scripts/prod_query.py deleted file mode 100644 index 0d7eee6..0000000 --- a/scripts/prod_query.py +++ /dev/null @@ -1,124 +0,0 @@ -""" -Run a read-only SQL query against the production DuckDB (analytics or lakehouse). - -Usage: - uv run python scripts/prod_query.py "SELECT COUNT(*) FROM serving.location_profiles" - uv run python scripts/prod_query.py --db lakehouse "SELECT * FROM foundation.dim_countries LIMIT 5" - echo "SELECT 1" | uv run python scripts/prod_query.py --stdin - -The script SSHes to the prod server, runs the query via Python/DuckDB, and prints -tab-separated results with a header row. Read-only: DuckDB is opened with read_only=True. - -For lakehouse queries, automatically aliases the catalog as "local" so SQLMesh views work. - -Designed for Claude Code to call without nested shell escaping nightmares. -""" - -import argparse -import base64 -import subprocess -import sys - -SSH_HOST = "hetzner_root" -SSH_USER = "padelnomics_service" -DB_PATHS = { - "analytics": "/opt/padelnomics/data/analytics.duckdb", - "lakehouse": "/opt/padelnomics/data/lakehouse.duckdb", -} -MAX_ROWS = 500 -TIMEOUT_SECONDS = 30 - -# Mutation keywords blocked (defense in depth — DB is read_only anyway) -BLOCKED_KEYWORDS = {"CREATE", "DROP", "ALTER", "INSERT", "UPDATE", "DELETE", "ATTACH", "COPY", "EXPORT", "INSTALL", "LOAD"} - -# Remote Python script template. Receives SQL as base64 via {b64_sql}. -# Uses ATTACH + USE to alias the lakehouse catalog as "local" for SQLMesh view compat. -REMOTE_SCRIPT = """\ -import duckdb, json, sys, base64 -db_path = "{db_path}" -sql = base64.b64decode("{b64_sql}").decode() -max_rows = {max_rows} -output_json = {output_json} -try: - if "lakehouse" in db_path: - con = duckdb.connect(":memory:") - con.execute(f"ATTACH '{db_path}' AS local (READ_ONLY)") - con.execute("USE local") - else: - con = duckdb.connect(db_path, read_only=True) - result = con.execute(sql) - cols = [d[0] for d in result.description] - rows = result.fetchmany(max_rows) - if output_json: - print(json.dumps({{"columns": cols, "rows": [list(r) for r in rows], "count": len(rows)}}, default=str)) - else: - print("\\t".join(cols)) - for row in rows: - print("\\t".join(str(v) if v is not None else "NULL" for v in row)) - if len(rows) == max_rows: - print(f"... truncated at {{max_rows}} rows", file=sys.stderr) -except Exception as e: - print(f"ERROR: {{e}}", file=sys.stderr) - sys.exit(1) -""" - - -def main(): - parser = argparse.ArgumentParser(description="Query prod DuckDB over SSH") - parser.add_argument("sql", nargs="?", help="SQL query to run") - parser.add_argument("--stdin", action="store_true", help="Read SQL from stdin") - parser.add_argument( - "--db", - choices=list(DB_PATHS.keys()), - default="analytics", - help="Which database (default: analytics)", - ) - parser.add_argument( - "--max-rows", type=int, default=MAX_ROWS, help=f"Max rows (default: {MAX_ROWS})" - ) - parser.add_argument("--json", action="store_true", help="Output JSON instead of TSV") - args = parser.parse_args() - - sql = args.sql - if args.stdin or sql is None: - sql = sys.stdin.read().strip() - if not sql: - print("ERROR: No SQL provided", file=sys.stderr) - sys.exit(1) - - sql_upper = sql.upper() - for kw in BLOCKED_KEYWORDS: - if kw in sql_upper: - print(f"ERROR: Blocked keyword '{kw}' in query", file=sys.stderr) - sys.exit(1) - - b64_sql = base64.b64encode(sql.encode()).decode() - remote_script = REMOTE_SCRIPT.format( - db_path=DB_PATHS[args.db], - b64_sql=b64_sql, - max_rows=args.max_rows, - output_json=args.json, - ) - - cmd = [ - "ssh", SSH_HOST, - f"sudo -u {SSH_USER} bash -lc 'cd /opt/padelnomics && uv run python3 -'", - ] - - result = subprocess.run( - cmd, - input=remote_script, - capture_output=True, - text=True, - timeout=TIMEOUT_SECONDS + 10, - ) - - if result.stdout: - print(result.stdout, end="") - if result.stderr: - print(result.stderr, end="", file=sys.stderr) - sys.exit(result.returncode) - - -if __name__ == "__main__": - main()