diff --git a/src/materia/pipelines.py b/src/materia/pipelines.py index 943ec1c..b193ad4 100644 --- a/src/materia/pipelines.py +++ b/src/materia/pipelines.py @@ -40,6 +40,10 @@ PIPELINES = { "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_all"], "timeout_seconds": 1800, }, + "extract_all": { + "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all"], + "timeout_seconds": 6300, + }, "transform": { "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"], "timeout_seconds": 3600, @@ -47,6 +51,11 @@ PIPELINES = { } +META_PIPELINES: dict[str, list[str]] = { + "extract_all": ["extract", "extract_cot", "extract_prices", "extract_ice_all"], +} + + def run_pipeline(pipeline_name: str) -> PipelineResult: assert pipeline_name, "pipeline_name must not be empty" @@ -57,6 +66,20 @@ def run_pipeline(pipeline_name: str) -> PipelineResult: error=f"Unknown pipeline: {pipeline_name}. Available: {', '.join(PIPELINES.keys())}", ) + # Meta-pipelines run a sequence of sub-pipelines, stopping on first failure + if pipeline_name in META_PIPELINES: + combined_output = [] + for sub in META_PIPELINES[pipeline_name]: + result = run_pipeline(sub) + combined_output.append(result.output) + if not result.success: + return PipelineResult( + success=False, + output="\n".join(combined_output), + error=f"Sub-pipeline '{sub}' failed: {result.error}", + ) + return PipelineResult(success=True, output="\n".join(combined_output)) + pipeline = PIPELINES[pipeline_name] timeout_seconds = pipeline["timeout_seconds"]