Files
beanflows/src/materia/pipelines.py
Deeman 08e74665bb feat(extract): add OpenWeatherMap daily weather extractor
Adds extract/openweathermap package with daily weather extraction for 8
coffee-growing regions (spanning Brazil, Vietnam, Colombia, Ethiopia, Honduras,
Guatemala, and Indonesia). Feeds crop stress signal for commodity sentiment score.

Extractor:
- OWM One Call API 3.0 / Day Summary — one JSON.gz per (location, date)
- extract_weather: daily, fetches yesterday + today (16 calls max)
- extract_weather_backfill: fills 2020-01-01 to yesterday, capped at 500
  calls/run with resume cursor '{location_id}:{date}' for crash safety
- Full idempotency via file existence check; state tracking via extract_core

SQLMesh:
- seeds.weather_locations (8 regions with lat/lon/variety)
- foundation.fct_weather_daily: INCREMENTAL_BY_TIME_RANGE, grain
  (location_id, observation_date), dedup via hash key, crop stress flags:
  is_frost (<2°C), is_heat_stress (>35°C), is_drought (<1mm), in_growing_season

Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 22:40:27 +01:00

120 lines
4.1 KiB
Python

"""Pipeline execution via local subprocess."""
import subprocess
from dataclasses import dataclass
@dataclass
class PipelineResult:
    """Outcome of a single pipeline run (or a meta-pipeline sequence)."""

    # True when the subprocess exited with return code 0
    # (for meta-pipelines: when every sub-pipeline succeeded).
    success: bool
    # Captured stdout; meta-pipelines join each sub-pipeline's output with "\n".
    output: str
    # stderr or a human-readable failure description; None on success.
    error: str | None = None
# Registry of runnable pipelines: name -> subprocess command + timeout.
# Commands are argv lists (shell=False) executed by run_pipeline().
PIPELINES = {
    "extract": {
        "command": ["uv", "run", "--package", "psdonline", "extract_psd"],
        "timeout_seconds": 1800,
    },
    "extract_cot": {
        "command": ["uv", "run", "--package", "cftc_cot", "extract_cot"],
        "timeout_seconds": 1800,
    },
    "extract_prices": {
        "command": ["uv", "run", "--package", "coffee_prices", "extract_prices"],
        "timeout_seconds": 300,
    },
    "extract_ice": {
        "command": ["uv", "run", "--package", "ice_stocks", "extract_ice"],
        "timeout_seconds": 600,
    },
    "extract_ice_aging": {
        "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_aging"],
        "timeout_seconds": 600,
    },
    "extract_ice_historical": {
        "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_historical"],
        "timeout_seconds": 600,
    },
    "extract_ice_all": {
        "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_all"],
        "timeout_seconds": 1800,
    },
    "extract_weather": {
        "command": ["uv", "run", "--package", "openweathermap", "extract_weather"],
        "timeout_seconds": 300,
    },
    "extract_weather_backfill": {
        "command": ["uv", "run", "--package", "openweathermap", "extract_weather_backfill"],
        "timeout_seconds": 1200,
    },
    # NOTE: run_pipeline() checks META_PIPELINES before reaching this entry's
    # "command", so the placeholder argv below is never executed; the sequence
    # in META_PIPELINES["extract_all"] is what actually runs.
    "extract_all": {
        "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"],
        "timeout_seconds": 6600,
    },
    "transform": {
        "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"],
        "timeout_seconds": 3600,
    },
    # Copies serving.* tables from lakehouse.duckdb → serving.duckdb (atomic swap).
    # Run after every transform. Requires both DUCKDB_PATH and SERVING_DUCKDB_PATH.
    "export_serving": {
        "command": ["uv", "run", "python", "-c",
            "import logging; logging.basicConfig(level=logging.INFO); "
            "from materia.export_serving import export_serving; export_serving()"],
        "timeout_seconds": 300,
    },
}
# Meta-pipelines: name -> ordered list of sub-pipeline names from PIPELINES.
# run_pipeline() executes these sequentially, stopping at the first failure.
# A meta-pipeline name must also exist as a key in PIPELINES so that the
# unknown-pipeline check passes before the META_PIPELINES branch is taken.
META_PIPELINES: dict[str, list[str]] = {
    "extract_all": ["extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"],
}
def run_pipeline(pipeline_name: str) -> PipelineResult:
    """Run a named pipeline (or meta-pipeline) and return its result.

    Meta-pipelines (see META_PIPELINES) run their sub-pipelines in order,
    stopping at the first failure. Regular pipelines execute their
    configured argv via subprocess with a per-pipeline timeout.

    Never raises for expected failure modes (unknown name, non-zero exit,
    timeout, missing executable) — all are reported via PipelineResult.
    """
    # Validate explicitly rather than with `assert`: asserts are stripped
    # under `python -O`, and a bad name is an expected caller error, not a
    # programming invariant. An empty name falls through to the same path.
    if not pipeline_name or pipeline_name not in PIPELINES:
        return PipelineResult(
            success=False,
            output="",
            error=f"Unknown pipeline: {pipeline_name}. Available: {', '.join(PIPELINES.keys())}",
        )
    # Meta-pipelines run a sequence of sub-pipelines, stopping on first failure
    if pipeline_name in META_PIPELINES:
        combined_output = []
        for sub in META_PIPELINES[pipeline_name]:
            result = run_pipeline(sub)
            combined_output.append(result.output)
            if not result.success:
                return PipelineResult(
                    success=False,
                    output="\n".join(combined_output),
                    error=f"Sub-pipeline '{sub}' failed: {result.error}",
                )
        return PipelineResult(success=True, output="\n".join(combined_output))
    pipeline = PIPELINES[pipeline_name]
    timeout_seconds = pipeline["timeout_seconds"]
    try:
        result = subprocess.run(
            pipeline["command"],
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired as exc:
        # Preserve whatever stdout the process produced before the timeout —
        # discarding it makes hung pipelines much harder to debug.
        # (TimeoutExpired.stdout can be None, or bytes on some platforms
        # even with text=True — normalize defensively.)
        partial = exc.stdout or ""
        if isinstance(partial, bytes):
            partial = partial.decode(errors="replace")
        return PipelineResult(
            success=False,
            output=partial,
            error=f"Pipeline '{pipeline_name}' timed out after {timeout_seconds} seconds",
        )
    except OSError as exc:
        # e.g. FileNotFoundError when `uv` is not on PATH; report it like
        # every other failure instead of propagating an exception.
        return PipelineResult(
            success=False,
            output="",
            error=f"Failed to launch pipeline '{pipeline_name}': {exc}",
        )
    return PipelineResult(
        success=result.returncode == 0,
        output=result.stdout,
        error=result.stderr if result.returncode != 0 else None,
    )