#!/usr/bin/env python3 """Query Gitea Actions CI pipeline status. Usage: uv run python scripts/ci.py # list last 5 runs uv run python scripts/ci.py --limit 10 # list last 10 runs uv run python scripts/ci.py --branch master # filter by branch uv run python scripts/ci.py --run-id 44 # inspect specific run + logs uv run python scripts/ci.py --logs # list mode with failed job logs uv run python scripts/ci.py --tail-lines 200 # more log context Requires: GITEA_TOKEN env var with read:repository + actions scopes. Remote URL is auto-detected from git remote origin (SSH format: ssh://git@HOST:PORT/OWNER/REPO.git). Override with GITEA_BASE_URL + GITEA_REPO (e.g. GITEA_REPO=owner/repo). """ import argparse import os import re import subprocess import sys from datetime import datetime from urllib.parse import urlparse import msgspec import niquests _REQUEST_TIMEOUT_SECONDS = 30 _ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*[a-zA-Z]") # ── API boundary types ──────────────────────────────────────────────────────── class GiteaRun(msgspec.Struct): id: int display_title: str status: str conclusion: str | None = None head_branch: str = "" head_sha: str = "" html_url: str = "" started_at: str | None = None completed_at: str | None = None class GiteaRunsResponse(msgspec.Struct): workflow_runs: list[GiteaRun] total_count: int class GiteaStep(msgspec.Struct): name: str status: str conclusion: str | None = None class GiteaJob(msgspec.Struct): id: int run_id: int name: str status: str conclusion: str | None = None steps: list[GiteaStep] | None = None # API returns null when job hasn't run yet class GiteaJobsResponse(msgspec.Struct): jobs: list[GiteaJob] total_count: int # ── Remote detection ────────────────────────────────────────────────────────── def parse_gitea_remote() -> tuple[str, str, str]: """Return (api_base_url, owner, repo) from git remote or env vars. Parses SSH remote URL: ssh://git@HOST:PORT/OWNER/REPO.git Falls back to GITEA_BASE_URL + GITEA_REPO env vars. """ base_url = os.environ.get("GITEA_BASE_URL", "") repo_slug = os.environ.get("GITEA_REPO", "") if base_url and repo_slug: parts = repo_slug.split("/", 1) assert len(parts) == 2, f"GITEA_REPO must be OWNER/REPO, got: {repo_slug}" return base_url.rstrip("/"), parts[0], parts[1] try: raw_url = subprocess.check_output( ["git", "remote", "get-url", "origin"], stderr=subprocess.DEVNULL, text=True, ).strip() except subprocess.CalledProcessError: print( "Error: could not get git remote URL. Set GITEA_BASE_URL and GITEA_REPO.", file=sys.stderr, ) sys.exit(1) parsed = urlparse(raw_url) host = parsed.hostname assert host, f"Could not parse host from remote URL: {raw_url}" path_parts = parsed.path.lstrip("/").split("/") assert len(path_parts) >= 2, f"Could not parse owner/repo from remote URL: {raw_url}" owner = path_parts[0] repo = path_parts[1].removesuffix(".git") assert owner, f"Empty owner in remote URL: {raw_url}" assert repo, f"Empty repo in remote URL: {raw_url}" return f"https://{host}", owner, repo # ── API calls ───────────────────────────────────────────────────────────────── def fetch_runs( session: niquests.Session, api_base: str, owner: str, repo: str, *, limit: int, branch: str, ) -> list[GiteaRun]: assert limit > 0 assert limit <= 20 # page=1 is required for limit to be respected by Gitea's Actions API params: dict[str, str | int] = {"limit": limit, "page": 1} if branch: params["branch"] = branch resp = session.get( f"{api_base}/api/v1/repos/{owner}/{repo}/actions/runs", params=params, timeout=_REQUEST_TIMEOUT_SECONDS, ) if resp.status_code == 404: print("Warning: Actions API endpoint not found — check Gitea version/config.", file=sys.stderr) return [] _check_response(resp, "fetch runs") return msgspec.json.decode(resp.content, type=GiteaRunsResponse).workflow_runs def fetch_jobs( session: niquests.Session, api_base: str, owner: str, repo: str, run_id: int, ) -> list[GiteaJob]: resp = session.get( f"{api_base}/api/v1/repos/{owner}/{repo}/actions/runs/{run_id}/jobs", timeout=_REQUEST_TIMEOUT_SECONDS, ) if resp.status_code == 404: print(f"Warning: run #{run_id} not found.", file=sys.stderr) return [] _check_response(resp, f"fetch jobs for run #{run_id}") return msgspec.json.decode(resp.content, type=GiteaJobsResponse).jobs def fetch_job_log( session: niquests.Session, api_base: str, owner: str, repo: str, job_id: int, tail_lines: int, ) -> str: assert tail_lines > 0 assert tail_lines <= 500 resp = session.get( f"{api_base}/api/v1/repos/{owner}/{repo}/actions/jobs/{job_id}/logs", timeout=_REQUEST_TIMEOUT_SECONDS, ) if resp.status_code == 403: # Gitea issue #36268: log access may require additional token scopes return "[log unavailable — 403 Forbidden; ensure token has actions read scope (see Gitea issue #36268)]" if resp.status_code == 404: return "[log not found — job may still be running or logs have expired]" _check_response(resp, f"fetch log for job #{job_id}") text = _ANSI_ESCAPE.sub("", resp.text) lines = text.splitlines() return "\n".join(lines[-tail_lines:]) def _check_response(resp: niquests.Response, context: str) -> None: if resp.status_code == 401: print(f"Error: 401 Unauthorized ({context}). Check GITEA_TOKEN.", file=sys.stderr) sys.exit(1) if resp.status_code == 403: print(f"Error: 403 Forbidden ({context}). Token may lack required scopes.", file=sys.stderr) sys.exit(1) resp.raise_for_status() # ── Formatting ──────────────────────────────────────────────────────────────── def format_run_summary(run: GiteaRun) -> str: icon = "[OK] " if run.conclusion == "success" else "[FAIL]" sha = run.head_sha[:7] if run.head_sha else "unknown" duration = _format_duration(run.started_at, run.completed_at) return f"{icon} #{run.id} \"{run.display_title}\" on {run.head_branch} ({sha}) -- {duration}" def format_job_detail(job: GiteaJob) -> str: icon = " [ok]" if job.conclusion == "success" else "[FAIL]" lines = [f"{icon} {job.name}"] failed_steps = [s.name for s in (job.steps or []) if s.conclusion not in (None, "success", "skipped")] if failed_steps: lines.append(f" Failed steps: {', '.join(failed_steps)}") return "\n".join(lines) def format_log_block(job_name: str, log_text: str) -> str: return f"--- LOG: {job_name} ---\n{log_text}\n--- END LOG: {job_name} ---" def _format_duration(started_at: str | None, completed_at: str | None) -> str: if not started_at or not completed_at: return "?" try: start = datetime.fromisoformat(started_at.replace("Z", "+00:00")) end = datetime.fromisoformat(completed_at.replace("Z", "+00:00")) total_seconds = int((end - start).total_seconds()) if total_seconds < 0: return "?" minutes, seconds = divmod(total_seconds, 60) return f"{minutes}m{seconds:02d}s" except (ValueError, AttributeError): return "?" # ── Orchestration ───────────────────────────────────────────────────────────── def _list_runs( session: niquests.Session, api_base: str, owner: str, repo: str, limit: int, branch: str, show_logs: bool, tail_lines: int, ) -> None: runs = fetch_runs(session, api_base, owner, repo, limit=limit, branch=branch) if not runs: print("No runs found.") return print(f"=== CI Pipeline Status (last {len(runs)} runs) ===\n") passed = 0 failed = 0 in_progress = 0 for run in runs: print(format_run_summary(run)) if run.conclusion == "success": passed += 1 elif run.conclusion is None: in_progress += 1 else: failed += 1 jobs = fetch_jobs(session, api_base, owner, repo, run.id) failed_jobs = [j for j in jobs if j.conclusion not in ("success", None, "skipped")] if failed_jobs: print(f" Failed jobs: {', '.join(j.name for j in failed_jobs)}") if show_logs: for job in failed_jobs: log = fetch_job_log(session, api_base, owner, repo, job.id, tail_lines) print(f"\n{format_log_block(job.name, log)}") parts = [f"{passed} passed", f"{failed} failed"] if in_progress: parts.append(f"{in_progress} in progress") print(f"\nSummary: {', '.join(parts)} out of {len(runs)} runs") def _inspect_run( session: niquests.Session, api_base: str, owner: str, repo: str, run_id: int, tail_lines: int, ) -> None: jobs = fetch_jobs(session, api_base, owner, repo, run_id) if not jobs: print(f"No jobs found for run #{run_id}.") return print(f"=== Run #{run_id} — {len(jobs)} job(s) ===\n") for job in jobs: print(format_job_detail(job)) failed_jobs = [j for j in jobs if j.conclusion not in ("success", None, "skipped")] if not failed_jobs: return print(f"\nFetching logs for {len(failed_jobs)} failed job(s)...") for job in failed_jobs: log = fetch_job_log(session, api_base, owner, repo, job.id, tail_lines) print(f"\n{format_log_block(job.name, log)}") # ── Entry point ─────────────────────────────────────────────────────────────── def main() -> None: token = os.environ.get("GITEA_TOKEN", "") if not token: print("Error: GITEA_TOKEN environment variable not set.", file=sys.stderr) print("Generate a token: Gitea → Settings → Applications → Access Tokens", file=sys.stderr) print("Required scopes: read:repository + actions read.", file=sys.stderr) sys.exit(1) parser = argparse.ArgumentParser( description="Query Gitea Actions CI pipeline status.", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--limit", type=int, default=5, help="Runs to show (max 20, default 5)") parser.add_argument("--branch", default="", help="Filter by branch name") parser.add_argument("--run-id", type=int, default=0, help="Inspect a specific run (all jobs + logs)") parser.add_argument("--logs", action="store_true", help="Include truncated logs for failed jobs") parser.add_argument("--tail-lines", type=int, default=150, help="Log lines per failed job (max 500, default 150)") args = parser.parse_args() assert args.limit > 0, f"--limit must be positive, got {args.limit}" assert args.limit <= 20, f"--limit must be ≤ 20, got {args.limit}" assert args.tail_lines > 0, f"--tail-lines must be positive, got {args.tail_lines}" assert args.tail_lines <= 500, f"--tail-lines must be ≤ 500, got {args.tail_lines}" api_base, owner, repo = parse_gitea_remote() session = niquests.Session() session.headers["Authorization"] = f"token {token}" if args.run_id: _inspect_run(session, api_base, owner, repo, args.run_id, args.tail_lines) else: _list_runs(session, api_base, owner, repo, args.limit, args.branch, args.logs, args.tail_lines) if __name__ == "__main__": main()