Files
beanflows/src/beanflows_pipeline/cli.py
Deeman 9201a4dca9
Some checks failed
CI / test-cli (push) Failing after 10s
CI / test-sqlmesh (push) Failing after 9s
CI / test-web (push) Successful in 15s
CI / tag (push) Has been skipped
fix: rename secrets.py → vault.py to avoid shadowing stdlib secrets module
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 23:33:06 +01:00

159 lines
4.5 KiB
Python

"""Materia CLI - Management interface for BeanFlows.coffee infrastructure."""
from typing import Annotated
import typer
# Root Typer application. With no_args_is_help=True, invoking the CLI without
# any arguments shows the help text.
app = typer.Typer(
    no_args_is_help=True,
    name="beanflows",
    help="BeanFlows.coffee data platform management CLI",
)
# Registered directly on the root app. The docstring below doubles as the
# command's --help text, so it is kept verbatim.
@app.command()
def version():
    """Show Materia version."""
    banner = "Materia CLI v0.1.0"
    typer.echo(banner)
# Sub-application that collects the worker commands; it is mounted on the
# root app under the name "worker".
worker_app = typer.Typer(help="Manage worker instances")
app.add_typer(worker_app, name="worker")
# Prints a fixed-width table (name, IP, type, status) of the workers returned
# by list_workers() for the selected provider, or a short notice when the
# result is empty.
@worker_app.command("list")
def worker_list(
    provider: Annotated[str, typer.Option("--provider", "-p")] = "hetzner",
):
    """List all active worker instances."""
    # Function-scope import: the workers module is only loaded when this
    # command actually runs.
    from beanflows_pipeline.workers import list_workers

    active = list_workers(provider)
    if not active:
        typer.echo("No active workers")
        return

    # One template drives both the header and every data row so the columns
    # always line up.
    row_template = "{:<30} {:<15} {:<10} {:<10}"
    typer.echo(row_template.format("NAME", "IP", "TYPE", "STATUS"))
    typer.echo("-" * 70)
    for entry in active:
        typer.echo(
            row_template.format(entry.name, entry.ip, entry.type, entry.status)
        )
# Announces the request, delegates to create_worker(), then reports the IP of
# the object it returns. `location` is forwarded as-is (None when the option
# is omitted).
@worker_app.command("create")
def worker_create(
    name: Annotated[str, typer.Argument(help="Worker name")],
    server_type: Annotated[str, typer.Option("--type", "-t")] = "ccx22",
    provider: Annotated[str, typer.Option("--provider", "-p")] = "hetzner",
    location: Annotated[str | None, typer.Option("--location", "-l")] = None,
):
    """Create a new worker instance."""
    # Function-scope import: the workers module is only loaded when this
    # command actually runs.
    from beanflows_pipeline.workers import create_worker

    announcement = f"Creating worker '{name}' ({server_type}) on {provider}..."
    typer.echo(announcement)

    created = create_worker(name, server_type, provider, location)

    typer.echo(f"✓ Worker created: {created.ip}")
# Asks for interactive confirmation unless --force is given, then delegates
# to destroy_worker(). Declining the prompt raises typer.Abort.
@worker_app.command("destroy")
def worker_destroy(
    name: Annotated[str, typer.Argument(help="Worker name")],
    provider: Annotated[str, typer.Option("--provider", "-p")] = "hetzner",
    force: Annotated[bool, typer.Option("--force", "-f")] = False,
):
    """Destroy a worker instance."""
    # Function-scope import: the workers module is only loaded when this
    # command actually runs.
    from beanflows_pipeline.workers import destroy_worker

    # Short-circuit: the prompt is only shown when --force was not passed.
    if not force and not typer.confirm(f"Destroy worker '{name}'?"):
        raise typer.Abort()

    typer.echo(f"Destroying worker '{name}'...")
    destroy_worker(name, provider)
    typer.echo("✓ Worker destroyed")
# Sub-application that collects the pipeline commands; it is mounted on the
# root app under the name "pipeline".
pipeline_app = typer.Typer(help="Execute data pipelines")
app.add_typer(pipeline_app, name="pipeline")
# Runs the named pipeline through run_pipeline(). On success the result's
# output is printed followed by a confirmation line; on failure the result's
# error is written to stderr and the process exits with status 1.
@pipeline_app.command("run")
def pipeline_run(
    name: Annotated[str, typer.Argument(help="Pipeline name (extract, transform)")],
):
    """Run a pipeline locally."""
    # Function-scope import: the pipelines module is only loaded when this
    # command actually runs.
    from beanflows_pipeline.pipelines import run_pipeline

    typer.echo(f"Running pipeline '{name}'...")
    outcome = run_pipeline(name)

    # Handle the failure path first so the success path reads straight down.
    if not outcome.success:
        typer.echo(outcome.error, err=True)
        raise typer.Exit(1)

    typer.echo(outcome.output)
    typer.echo("\n✓ Pipeline completed successfully")
# Prints one line per entry of the PIPELINES mapping, showing the entry's
# "command" items joined by spaces and its "timeout_seconds" value.
@pipeline_app.command("list")
def pipeline_list():
    """List available pipelines."""
    # Function-scope import: the pipelines module is only loaded when this
    # command actually runs.
    from beanflows_pipeline.pipelines import PIPELINES

    typer.echo("Available pipelines:")
    for pipeline_name, settings in PIPELINES.items():
        command_line = " ".join(settings["command"])
        timeout = settings["timeout_seconds"]
        typer.echo(
            f"{pipeline_name:<15} (command: {command_line}, timeout: {timeout}s)"
        )
# Sub-application that collects the secrets commands; it is mounted on the
# root app under the name "secrets".
secrets_app = typer.Typer(help="Manage secrets via SOPS + age")
app.add_typer(secrets_app, name="secrets")
# Prints each item yielded by list_secrets() on its own line, or a short
# notice when the result is empty.
@secrets_app.command("list")
def secrets_list():
    """List available secrets (keys only)."""
    # Function-scope import: the vault module is only loaded when this
    # command actually runs.
    from beanflows_pipeline.vault import list_secrets

    # Named `available_keys` rather than `secrets` to avoid reusing the name
    # of the stdlib `secrets` module for a local variable.
    available_keys = list_secrets()
    if not available_keys:
        typer.echo("No secrets configured")
        return

    typer.echo("Available secrets:")
    for key in available_keys:
        typer.echo(f"{key}")
# Looks the key up with get_secret() and prints the value to stdout. When
# get_secret() returns None, a "not found" message goes to stderr and the
# process exits with status 1.
@secrets_app.command("get")
def secrets_get(
    key: Annotated[str, typer.Argument(help="Secret key")],
):
    """Get a secret value."""
    # Function-scope import: the vault module is only loaded when this
    # command actually runs.
    from beanflows_pipeline.vault import get_secret

    secret_value = get_secret(key)
    if secret_value is not None:
        typer.echo(secret_value)
        return

    typer.echo(f"Secret '{key}' not found", err=True)
    raise typer.Exit(1)
# Calls test_connection() and reports the outcome. A truthy result prints a
# success line to stdout. A falsy result writes the failure line AND the
# remediation hint to stderr, then exits with status 1.
@secrets_app.command("test")
def secrets_test():
    """Test sops decryption (verifies sops is installed and age key is present)."""
    # Function-scope import: the vault module is only loaded when this
    # command actually runs.
    from beanflows_pipeline.vault import test_connection

    typer.echo("Testing SOPS decryption...")
    if test_connection():
        typer.echo("✓ SOPS decryption successful")
        return

    typer.echo("✗ SOPS decryption failed", err=True)
    # The hint is part of the same diagnostic as the failure line, so it goes
    # to stderr as well; previously it was emitted on stdout, which split the
    # error report across two streams.
    typer.echo(
        "\nMake sure sops is installed and your age key is at ~/.config/sops/age/keys.txt",
        err=True,
    )
    raise typer.Exit(1)
# Script entry point: run the root Typer app when this module is executed
# directly rather than imported.
if __name__ == "__main__":
    app()