"""Pipeline execution via local subprocess.""" import subprocess from dataclasses import dataclass @dataclass class PipelineResult: success: bool output: str error: str | None = None PIPELINES = { "extract": { "command": ["uv", "run", "--package", "psdonline", "extract_psd"], "timeout_seconds": 1800, }, "extract_cot": { "command": ["uv", "run", "--package", "cftc_cot", "extract_cot"], "timeout_seconds": 1800, }, "extract_prices": { "command": ["uv", "run", "--package", "coffee_prices", "extract_prices"], "timeout_seconds": 300, }, "extract_ice": { "command": ["uv", "run", "--package", "ice_stocks", "extract_ice"], "timeout_seconds": 600, }, "extract_ice_aging": { "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_aging"], "timeout_seconds": 600, }, "extract_ice_historical": { "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_historical"], "timeout_seconds": 600, }, "extract_ice_all": { "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_all"], "timeout_seconds": 1800, }, "extract_weather": { "command": ["uv", "run", "--package", "openmeteo", "extract_weather"], "timeout_seconds": 120, }, "extract_weather_backfill": { "command": ["uv", "run", "--package", "openmeteo", "extract_weather_backfill"], "timeout_seconds": 120, }, "extract_all": { "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"], "timeout_seconds": 6600, }, "transform": { "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"], "timeout_seconds": 3600, }, # Copies serving.* tables from lakehouse.duckdb → serving.duckdb (atomic swap). # Run after every transform. Requires both DUCKDB_PATH and SERVING_DUCKDB_PATH. "export_serving": { "command": ["uv", "run", "python", "-c", "import logging; logging.basicConfig(level=logging.INFO); " "from materia.export_serving import export_serving; export_serving()"], "timeout_seconds": 300, }, } META_PIPELINES: dict[str, list[str]] = { "extract_all": ["extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"], } def run_pipeline(pipeline_name: str) -> PipelineResult: assert pipeline_name, "pipeline_name must not be empty" if pipeline_name not in PIPELINES: return PipelineResult( success=False, output="", error=f"Unknown pipeline: {pipeline_name}. Available: {', '.join(PIPELINES.keys())}", ) # Meta-pipelines run a sequence of sub-pipelines, stopping on first failure if pipeline_name in META_PIPELINES: combined_output = [] for sub in META_PIPELINES[pipeline_name]: result = run_pipeline(sub) combined_output.append(result.output) if not result.success: return PipelineResult( success=False, output="\n".join(combined_output), error=f"Sub-pipeline '{sub}' failed: {result.error}", ) return PipelineResult(success=True, output="\n".join(combined_output)) pipeline = PIPELINES[pipeline_name] timeout_seconds = pipeline["timeout_seconds"] try: result = subprocess.run( pipeline["command"], capture_output=True, text=True, timeout=timeout_seconds, ) return PipelineResult( success=result.returncode == 0, output=result.stdout, error=result.stderr if result.returncode != 0 else None, ) except subprocess.TimeoutExpired: return PipelineResult( success=False, output="", error=f"Pipeline '{pipeline_name}' timed out after {timeout_seconds} seconds", )