implement cli/infra update cicd
This commit is contained in:
139
src/materia/pipelines.py
Normal file
139
src/materia/pipelines.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""Pipeline execution on ephemeral workers."""
|
||||
|
||||
import paramiko
|
||||
from dataclasses import dataclass
|
||||
|
||||
from materia.workers import create_worker, destroy_worker
|
||||
from materia.secrets import get_secret
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineConfig:
|
||||
worker_type: str
|
||||
artifact: str
|
||||
command: str
|
||||
secrets: list[str]
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineResult:
|
||||
success: bool
|
||||
output: str
|
||||
error: str | None = None
|
||||
|
||||
|
||||
PIPELINES = {
|
||||
"extract": PipelineConfig(
|
||||
worker_type="ccx12",
|
||||
artifact="materia-extract-latest.tar.gz",
|
||||
command="./extract_psd",
|
||||
secrets=["R2_ACCESS_KEY_ID", "R2_SECRET_ACCESS_KEY", "R2_ENDPOINT", "R2_ARTIFACTS_BUCKET"],
|
||||
),
|
||||
"transform": PipelineConfig(
|
||||
worker_type="ccx22",
|
||||
artifact="materia-transform-latest.tar.gz",
|
||||
command="cd sqlmesh_materia && ./sqlmesh plan prod",
|
||||
secrets=[
|
||||
"CLOUDFLARE_API_TOKEN",
|
||||
"ICEBERG_REST_URI",
|
||||
"R2_WAREHOUSE_NAME",
|
||||
],
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def _execute_ssh_command(ip: str, command: str, env_vars: dict[str, str]) -> tuple[str, str, int]:
|
||||
ssh_key_path = get_secret("SSH_PRIVATE_KEY_PATH")
|
||||
if not ssh_key_path:
|
||||
raise ValueError("SSH_PRIVATE_KEY_PATH not found in secrets")
|
||||
|
||||
client = paramiko.SSHClient()
|
||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
|
||||
pkey = paramiko.RSAKey.from_private_key_file(ssh_key_path)
|
||||
client.connect(ip, username="root", pkey=pkey)
|
||||
|
||||
env_string = " ".join([f"export {k}='{v}' &&" for k, v in env_vars.items()])
|
||||
full_command = f"{env_string} {command}" if env_vars else command
|
||||
|
||||
stdin, stdout, stderr = client.exec_command(full_command)
|
||||
exit_code = stdout.channel.recv_exit_status()
|
||||
|
||||
output = stdout.read().decode()
|
||||
error = stderr.read().decode()
|
||||
|
||||
client.close()
|
||||
|
||||
return output, error, exit_code
|
||||
|
||||
|
||||
def run_pipeline(
|
||||
pipeline_name: str,
|
||||
worker_type: str | None = None,
|
||||
auto_destroy: bool = True,
|
||||
provider: str = "hetzner",
|
||||
) -> PipelineResult:
|
||||
if pipeline_name not in PIPELINES:
|
||||
return PipelineResult(
|
||||
success=False,
|
||||
output="",
|
||||
error=f"Unknown pipeline: {pipeline_name}. Available: {', '.join(PIPELINES.keys())}",
|
||||
)
|
||||
|
||||
pipeline_config = PIPELINES[pipeline_name]
|
||||
worker_type = worker_type or pipeline_config.worker_type
|
||||
worker_name = f"materia-{pipeline_name}-worker"
|
||||
|
||||
r2_bucket = get_secret("R2_ARTIFACTS_BUCKET") or "materia-artifacts"
|
||||
r2_endpoint = get_secret("R2_ENDPOINT")
|
||||
|
||||
if not r2_endpoint:
|
||||
return PipelineResult(
|
||||
success=False,
|
||||
output="",
|
||||
error="R2_ENDPOINT not configured in secrets",
|
||||
)
|
||||
|
||||
try:
|
||||
worker = create_worker(worker_name, worker_type, provider)
|
||||
|
||||
artifact_url = f"https://{r2_endpoint}/{r2_bucket}/{pipeline_config.artifact}"
|
||||
|
||||
bootstrap_commands = [
|
||||
f"curl -fsSL -o artifact.tar.gz {artifact_url}",
|
||||
"tar -xzf artifact.tar.gz",
|
||||
"chmod +x -R .",
|
||||
]
|
||||
|
||||
for cmd in bootstrap_commands:
|
||||
_, error, exit_code = _execute_ssh_command(worker.ip, cmd, {})
|
||||
if exit_code != 0:
|
||||
return PipelineResult(
|
||||
success=False,
|
||||
output="",
|
||||
error=f"Bootstrap failed: {error}",
|
||||
)
|
||||
|
||||
env_vars = {}
|
||||
for secret_key in pipeline_config.secrets:
|
||||
value = get_secret(secret_key)
|
||||
if value:
|
||||
env_vars[secret_key] = value
|
||||
|
||||
command = pipeline_config.command
|
||||
output, error, exit_code = _execute_ssh_command(worker.ip, command, env_vars)
|
||||
|
||||
success = exit_code == 0
|
||||
|
||||
return PipelineResult(
|
||||
success=success,
|
||||
output=output,
|
||||
error=error if not success else None,
|
||||
)
|
||||
|
||||
finally:
|
||||
if auto_destroy:
|
||||
try:
|
||||
destroy_worker(worker_name, provider)
|
||||
except Exception:
|
||||
pass
|
||||
Reference in New Issue
Block a user