chore: move ci.py to ~/.claude/scripts (uv inline script, no project dep)
Script now lives globally as a uv inline-dependency script. Removes per-project scripts/ci.py and the msgspec dev dependency. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
356
scripts/ci.py
356
scripts/ci.py
@@ -1,356 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Query Gitea Actions CI pipeline status.
|
||||
|
||||
Usage:
|
||||
uv run python scripts/ci.py # list last 5 runs
|
||||
uv run python scripts/ci.py --limit 10 # list last 10 runs
|
||||
uv run python scripts/ci.py --branch master # filter by branch
|
||||
uv run python scripts/ci.py --run-id 44 # inspect specific run + logs
|
||||
uv run python scripts/ci.py --logs # list mode with failed job logs
|
||||
uv run python scripts/ci.py --tail-lines 200 # more log context
|
||||
|
||||
Requires: GITEA_TOKEN env var with read:repository + actions scopes.
|
||||
Remote URL is auto-detected from git remote origin (SSH format:
|
||||
ssh://git@HOST:PORT/OWNER/REPO.git).
|
||||
Override with GITEA_BASE_URL + GITEA_REPO (e.g. GITEA_REPO=owner/repo).
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import msgspec
|
||||
import niquests
|
||||
|
||||
_REQUEST_TIMEOUT_SECONDS = 30
|
||||
_ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*[a-zA-Z]")
|
||||
|
||||
|
||||
# ── API boundary types ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class GiteaRun(msgspec.Struct):
|
||||
id: int
|
||||
display_title: str
|
||||
status: str
|
||||
conclusion: str | None = None
|
||||
head_branch: str = ""
|
||||
head_sha: str = ""
|
||||
html_url: str = ""
|
||||
started_at: str | None = None
|
||||
completed_at: str | None = None
|
||||
|
||||
|
||||
class GiteaRunsResponse(msgspec.Struct):
|
||||
workflow_runs: list[GiteaRun]
|
||||
total_count: int
|
||||
|
||||
|
||||
class GiteaStep(msgspec.Struct):
|
||||
name: str
|
||||
status: str
|
||||
conclusion: str | None = None
|
||||
|
||||
|
||||
class GiteaJob(msgspec.Struct):
|
||||
id: int
|
||||
run_id: int
|
||||
name: str
|
||||
status: str
|
||||
conclusion: str | None = None
|
||||
steps: list[GiteaStep] | None = None # API returns null when job hasn't run yet
|
||||
|
||||
|
||||
class GiteaJobsResponse(msgspec.Struct):
|
||||
jobs: list[GiteaJob]
|
||||
total_count: int
|
||||
|
||||
|
||||
# ── Remote detection ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def parse_gitea_remote() -> tuple[str, str, str]:
|
||||
"""Return (api_base_url, owner, repo) from git remote or env vars.
|
||||
|
||||
Parses SSH remote URL: ssh://git@HOST:PORT/OWNER/REPO.git
|
||||
Falls back to GITEA_BASE_URL + GITEA_REPO env vars.
|
||||
"""
|
||||
base_url = os.environ.get("GITEA_BASE_URL", "")
|
||||
repo_slug = os.environ.get("GITEA_REPO", "")
|
||||
if base_url and repo_slug:
|
||||
parts = repo_slug.split("/", 1)
|
||||
assert len(parts) == 2, f"GITEA_REPO must be OWNER/REPO, got: {repo_slug}"
|
||||
return base_url.rstrip("/"), parts[0], parts[1]
|
||||
|
||||
try:
|
||||
raw_url = subprocess.check_output(
|
||||
["git", "remote", "get-url", "origin"],
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
).strip()
|
||||
except subprocess.CalledProcessError:
|
||||
print(
|
||||
"Error: could not get git remote URL. Set GITEA_BASE_URL and GITEA_REPO.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
parsed = urlparse(raw_url)
|
||||
host = parsed.hostname
|
||||
assert host, f"Could not parse host from remote URL: {raw_url}"
|
||||
|
||||
path_parts = parsed.path.lstrip("/").split("/")
|
||||
assert len(path_parts) >= 2, f"Could not parse owner/repo from remote URL: {raw_url}"
|
||||
|
||||
owner = path_parts[0]
|
||||
repo = path_parts[1].removesuffix(".git")
|
||||
assert owner, f"Empty owner in remote URL: {raw_url}"
|
||||
assert repo, f"Empty repo in remote URL: {raw_url}"
|
||||
|
||||
return f"https://{host}", owner, repo
|
||||
|
||||
|
||||
# ── API calls ─────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def fetch_runs(
|
||||
session: niquests.Session,
|
||||
api_base: str,
|
||||
owner: str,
|
||||
repo: str,
|
||||
*,
|
||||
limit: int,
|
||||
branch: str,
|
||||
) -> list[GiteaRun]:
|
||||
assert limit > 0
|
||||
assert limit <= 20
|
||||
|
||||
# page=1 is required for limit to be respected by Gitea's Actions API
|
||||
params: dict[str, str | int] = {"limit": limit, "page": 1}
|
||||
if branch:
|
||||
params["branch"] = branch
|
||||
|
||||
resp = session.get(
|
||||
f"{api_base}/api/v1/repos/{owner}/{repo}/actions/runs",
|
||||
params=params,
|
||||
timeout=_REQUEST_TIMEOUT_SECONDS,
|
||||
)
|
||||
if resp.status_code == 404:
|
||||
print("Warning: Actions API endpoint not found — check Gitea version/config.", file=sys.stderr)
|
||||
return []
|
||||
_check_response(resp, "fetch runs")
|
||||
return msgspec.json.decode(resp.content, type=GiteaRunsResponse).workflow_runs
|
||||
|
||||
|
||||
def fetch_jobs(
|
||||
session: niquests.Session,
|
||||
api_base: str,
|
||||
owner: str,
|
||||
repo: str,
|
||||
run_id: int,
|
||||
) -> list[GiteaJob]:
|
||||
resp = session.get(
|
||||
f"{api_base}/api/v1/repos/{owner}/{repo}/actions/runs/{run_id}/jobs",
|
||||
timeout=_REQUEST_TIMEOUT_SECONDS,
|
||||
)
|
||||
if resp.status_code == 404:
|
||||
print(f"Warning: run #{run_id} not found.", file=sys.stderr)
|
||||
return []
|
||||
_check_response(resp, f"fetch jobs for run #{run_id}")
|
||||
return msgspec.json.decode(resp.content, type=GiteaJobsResponse).jobs
|
||||
|
||||
|
||||
def fetch_job_log(
|
||||
session: niquests.Session,
|
||||
api_base: str,
|
||||
owner: str,
|
||||
repo: str,
|
||||
job_id: int,
|
||||
tail_lines: int,
|
||||
) -> str:
|
||||
assert tail_lines > 0
|
||||
assert tail_lines <= 500
|
||||
|
||||
resp = session.get(
|
||||
f"{api_base}/api/v1/repos/{owner}/{repo}/actions/jobs/{job_id}/logs",
|
||||
timeout=_REQUEST_TIMEOUT_SECONDS,
|
||||
)
|
||||
if resp.status_code == 403:
|
||||
# Gitea issue #36268: log access may require additional token scopes
|
||||
return "[log unavailable — 403 Forbidden; ensure token has actions read scope (see Gitea issue #36268)]"
|
||||
if resp.status_code == 404:
|
||||
return "[log not found — job may still be running or logs have expired]"
|
||||
_check_response(resp, f"fetch log for job #{job_id}")
|
||||
|
||||
text = _ANSI_ESCAPE.sub("", resp.text)
|
||||
lines = text.splitlines()
|
||||
return "\n".join(lines[-tail_lines:])
|
||||
|
||||
|
||||
def _check_response(resp: niquests.Response, context: str) -> None:
|
||||
if resp.status_code == 401:
|
||||
print(f"Error: 401 Unauthorized ({context}). Check GITEA_TOKEN.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
if resp.status_code == 403:
|
||||
print(f"Error: 403 Forbidden ({context}). Token may lack required scopes.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
resp.raise_for_status()
|
||||
|
||||
|
||||
# ── Formatting ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def format_run_summary(run: GiteaRun) -> str:
|
||||
icon = "[OK] " if run.conclusion == "success" else "[FAIL]"
|
||||
sha = run.head_sha[:7] if run.head_sha else "unknown"
|
||||
duration = _format_duration(run.started_at, run.completed_at)
|
||||
return f"{icon} #{run.id} \"{run.display_title}\" on {run.head_branch} ({sha}) -- {duration}"
|
||||
|
||||
|
||||
def format_job_detail(job: GiteaJob) -> str:
|
||||
icon = " [ok]" if job.conclusion == "success" else "[FAIL]"
|
||||
lines = [f"{icon} {job.name}"]
|
||||
failed_steps = [s.name for s in (job.steps or []) if s.conclusion not in (None, "success", "skipped")]
|
||||
if failed_steps:
|
||||
lines.append(f" Failed steps: {', '.join(failed_steps)}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def format_log_block(job_name: str, log_text: str) -> str:
|
||||
return f"--- LOG: {job_name} ---\n{log_text}\n--- END LOG: {job_name} ---"
|
||||
|
||||
|
||||
def _format_duration(started_at: str | None, completed_at: str | None) -> str:
|
||||
if not started_at or not completed_at:
|
||||
return "?"
|
||||
try:
|
||||
start = datetime.fromisoformat(started_at.replace("Z", "+00:00"))
|
||||
end = datetime.fromisoformat(completed_at.replace("Z", "+00:00"))
|
||||
total_seconds = int((end - start).total_seconds())
|
||||
if total_seconds < 0:
|
||||
return "?"
|
||||
minutes, seconds = divmod(total_seconds, 60)
|
||||
return f"{minutes}m{seconds:02d}s"
|
||||
except (ValueError, AttributeError):
|
||||
return "?"
|
||||
|
||||
|
||||
# ── Orchestration ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _list_runs(
|
||||
session: niquests.Session,
|
||||
api_base: str,
|
||||
owner: str,
|
||||
repo: str,
|
||||
limit: int,
|
||||
branch: str,
|
||||
show_logs: bool,
|
||||
tail_lines: int,
|
||||
) -> None:
|
||||
runs = fetch_runs(session, api_base, owner, repo, limit=limit, branch=branch)
|
||||
if not runs:
|
||||
print("No runs found.")
|
||||
return
|
||||
|
||||
print(f"=== CI Pipeline Status (last {len(runs)} runs) ===\n")
|
||||
|
||||
passed = 0
|
||||
failed = 0
|
||||
in_progress = 0
|
||||
|
||||
for run in runs:
|
||||
print(format_run_summary(run))
|
||||
|
||||
if run.conclusion == "success":
|
||||
passed += 1
|
||||
elif run.conclusion is None:
|
||||
in_progress += 1
|
||||
else:
|
||||
failed += 1
|
||||
jobs = fetch_jobs(session, api_base, owner, repo, run.id)
|
||||
failed_jobs = [j for j in jobs if j.conclusion not in ("success", None, "skipped")]
|
||||
if failed_jobs:
|
||||
print(f" Failed jobs: {', '.join(j.name for j in failed_jobs)}")
|
||||
if show_logs:
|
||||
for job in failed_jobs:
|
||||
log = fetch_job_log(session, api_base, owner, repo, job.id, tail_lines)
|
||||
print(f"\n{format_log_block(job.name, log)}")
|
||||
|
||||
parts = [f"{passed} passed", f"{failed} failed"]
|
||||
if in_progress:
|
||||
parts.append(f"{in_progress} in progress")
|
||||
print(f"\nSummary: {', '.join(parts)} out of {len(runs)} runs")
|
||||
|
||||
|
||||
def _inspect_run(
|
||||
session: niquests.Session,
|
||||
api_base: str,
|
||||
owner: str,
|
||||
repo: str,
|
||||
run_id: int,
|
||||
tail_lines: int,
|
||||
) -> None:
|
||||
jobs = fetch_jobs(session, api_base, owner, repo, run_id)
|
||||
if not jobs:
|
||||
print(f"No jobs found for run #{run_id}.")
|
||||
return
|
||||
|
||||
print(f"=== Run #{run_id} — {len(jobs)} job(s) ===\n")
|
||||
for job in jobs:
|
||||
print(format_job_detail(job))
|
||||
|
||||
failed_jobs = [j for j in jobs if j.conclusion not in ("success", None, "skipped")]
|
||||
if not failed_jobs:
|
||||
return
|
||||
|
||||
print(f"\nFetching logs for {len(failed_jobs)} failed job(s)...")
|
||||
for job in failed_jobs:
|
||||
log = fetch_job_log(session, api_base, owner, repo, job.id, tail_lines)
|
||||
print(f"\n{format_log_block(job.name, log)}")
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def main() -> None:
|
||||
token = os.environ.get("GITEA_TOKEN", "")
|
||||
if not token:
|
||||
print("Error: GITEA_TOKEN environment variable not set.", file=sys.stderr)
|
||||
print("Generate a token: Gitea → Settings → Applications → Access Tokens", file=sys.stderr)
|
||||
print("Required scopes: read:repository + actions read.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Query Gitea Actions CI pipeline status.",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
)
|
||||
parser.add_argument("--limit", type=int, default=5, help="Runs to show (max 20, default 5)")
|
||||
parser.add_argument("--branch", default="", help="Filter by branch name")
|
||||
parser.add_argument("--run-id", type=int, default=0, help="Inspect a specific run (all jobs + logs)")
|
||||
parser.add_argument("--logs", action="store_true", help="Include truncated logs for failed jobs")
|
||||
parser.add_argument("--tail-lines", type=int, default=150, help="Log lines per failed job (max 500, default 150)")
|
||||
args = parser.parse_args()
|
||||
|
||||
assert args.limit > 0, f"--limit must be positive, got {args.limit}"
|
||||
assert args.limit <= 20, f"--limit must be ≤ 20, got {args.limit}"
|
||||
assert args.tail_lines > 0, f"--tail-lines must be positive, got {args.tail_lines}"
|
||||
assert args.tail_lines <= 500, f"--tail-lines must be ≤ 500, got {args.tail_lines}"
|
||||
|
||||
api_base, owner, repo = parse_gitea_remote()
|
||||
|
||||
session = niquests.Session()
|
||||
session.headers["Authorization"] = f"token {token}"
|
||||
|
||||
if args.run_id:
|
||||
_inspect_run(session, api_base, owner, repo, args.run_id, args.tail_lines)
|
||||
else:
|
||||
_list_runs(session, api_base, owner, repo, args.limit, args.branch, args.logs, args.tail_lines)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user