From 562e2d1847bc5fae500236efbed7d7c0a716dcd6 Mon Sep 17 00:00:00 2001 From: Deeman Date: Sat, 21 Feb 2026 22:00:10 +0100 Subject: [PATCH] Add extract_all meta-pipeline: runs all four data source extractors in sequence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sequences: extract (PSD) → extract_cot (CFTC) → extract_prices (KC=F) → extract_ice_all (ICE) Stops and reports on first failure. META_PIPELINES dict makes it easy to add more meta-pipelines as sources expand. Co-Authored-By: Claude Sonnet 4.6 --- src/materia/pipelines.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/materia/pipelines.py b/src/materia/pipelines.py index 943ec1c..b193ad4 100644 --- a/src/materia/pipelines.py +++ b/src/materia/pipelines.py @@ -40,6 +40,10 @@ PIPELINES = { "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_all"], "timeout_seconds": 1800, }, + "extract_all": { + "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all"], + "timeout_seconds": 6300, + }, "transform": { "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"], "timeout_seconds": 3600, @@ -47,6 +51,11 @@ PIPELINES = { } +META_PIPELINES: dict[str, list[str]] = { + "extract_all": ["extract", "extract_cot", "extract_prices", "extract_ice_all"], +} + + def run_pipeline(pipeline_name: str) -> PipelineResult: assert pipeline_name, "pipeline_name must not be empty" @@ -57,6 +66,20 @@ def run_pipeline(pipeline_name: str) -> PipelineResult: error=f"Unknown pipeline: {pipeline_name}. Available: {', '.join(PIPELINES.keys())}", ) + # Meta-pipelines run a sequence of sub-pipelines, stopping on first failure + if pipeline_name in META_PIPELINES: + combined_output = [] + for sub in META_PIPELINES[pipeline_name]: + result = run_pipeline(sub) + combined_output.append(result.output) + if not result.success: + return PipelineResult( + success=False, + output="\n".join(combined_output), + error=f"Sub-pipeline '{sub}' failed: {result.error}", + ) + return PipelineResult(success=True, output="\n".join(combined_output)) + pipeline = PIPELINES[pipeline_name] timeout_seconds = pipeline["timeout_seconds"]