Add extract_all meta-pipeline: runs all four data source extractors in sequence

Sequences: extract (PSD) → extract_cot (CFTC) → extract_prices (KC=F) → extract_ice_all (ICE)
Stops and reports on first failure. META_PIPELINES dict makes it easy to
add more meta-pipelines as sources expand.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-21 22:00:10 +01:00
parent ff896685d2
commit 562e2d1847

View File

@@ -40,6 +40,10 @@ PIPELINES = {
"command": ["uv", "run", "--package", "ice_stocks", "extract_ice_all"], "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_all"],
"timeout_seconds": 1800, "timeout_seconds": 1800,
}, },
"extract_all": {
"command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all"],
"timeout_seconds": 6300,
},
"transform": { "transform": {
"command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"], "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"],
"timeout_seconds": 3600, "timeout_seconds": 3600,
@@ -47,6 +51,11 @@ PIPELINES = {
} }
META_PIPELINES: dict[str, list[str]] = {
"extract_all": ["extract", "extract_cot", "extract_prices", "extract_ice_all"],
}
def run_pipeline(pipeline_name: str) -> PipelineResult: def run_pipeline(pipeline_name: str) -> PipelineResult:
assert pipeline_name, "pipeline_name must not be empty" assert pipeline_name, "pipeline_name must not be empty"
@@ -57,6 +66,20 @@ def run_pipeline(pipeline_name: str) -> PipelineResult:
error=f"Unknown pipeline: {pipeline_name}. Available: {', '.join(PIPELINES.keys())}", error=f"Unknown pipeline: {pipeline_name}. Available: {', '.join(PIPELINES.keys())}",
) )
# Meta-pipelines run a sequence of sub-pipelines, stopping on first failure
if pipeline_name in META_PIPELINES:
combined_output = []
for sub in META_PIPELINES[pipeline_name]:
result = run_pipeline(sub)
combined_output.append(result.output)
if not result.success:
return PipelineResult(
success=False,
output="\n".join(combined_output),
error=f"Sub-pipeline '{sub}' failed: {result.error}",
)
return PipelineResult(success=True, output="\n".join(combined_output))
pipeline = PIPELINES[pipeline_name] pipeline = PIPELINES[pipeline_name]
timeout_seconds = pipeline["timeout_seconds"] timeout_seconds = pipeline["timeout_seconds"]