Files
beanflows/transform/sqlmesh_materia/macros/__init__.py
Deeman 08e74665bb feat(extract): add OpenWeatherMap daily weather extractor
Adds extract/openweathermap package with daily weather extraction for 8
coffee-growing regions in Brazil, Vietnam, Colombia, Ethiopia, Honduras,
Guatemala, and Indonesia. Feeds the crop-stress signal for the commodity sentiment score.

Extractor:
- OWM One Call API 3.0 / Day Summary — one JSON.gz per (location, date)
- extract_weather: daily, fetches yesterday + today (16 calls max)
- extract_weather_backfill: fills 2020-01-01 to yesterday, capped at 500
  calls/run with resume cursor '{location_id}:{date}' for crash safety
- Full idempotency via file existence check; state tracking via extract_core

SQLMesh:
- seeds.weather_locations (8 regions with lat/lon/variety)
- foundation.fct_weather_daily: INCREMENTAL_BY_TIME_RANGE, grain
  (location_id, observation_date), dedup via hash key, crop stress flags:
  is_frost (<2°C), is_heat_stress (>35°C), is_drought (<1mm), in_growing_season

Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 22:40:27 +01:00

57 lines
2.1 KiB
Python

import os
from sqlmesh import macro
def _landing_dir(evaluator) -> str:
    """Resolve the landing-zone root directory used by the glob macros.

    Precedence: the SQLMesh variable ``LANDING_DIR``, then the
    ``LANDING_DIR`` environment variable, then ``"data/landing"``.
    An empty value at any level is treated as unset. This matters for the
    env var: ``os.environ.get`` returns ``""`` rather than its default when
    the variable exists but is empty, which would otherwise make every glob
    root-relative (``/psd/...``).

    Trailing slashes are stripped so joined globs never contain ``//``.
    """
    landing_dir = (
        evaluator.var("LANDING_DIR")
        or os.environ.get("LANDING_DIR")
        or "data/landing"
    )
    return str(landing_dir).rstrip("/")


def _quoted_glob(evaluator, pattern: str) -> str:
    """Return ``'<landing_dir>/<pattern>'`` as a single-quoted SQL string literal.

    Args:
        evaluator: the SQLMesh macro evaluator passed to the calling macro.
        pattern: glob relative to the landing directory, without a leading slash.

    Single quotes inside the path are doubled (standard SQL escaping), so a
    directory name containing ``'`` cannot terminate the literal early.
    """
    path = f"{_landing_dir(evaluator)}/{pattern}"
    escaped = path.replace("'", "''")
    return f"'{escaped}'"


@macro()
def psd_glob(evaluator) -> str:
    """Return a quoted glob path for all PSD CSV gzip files under LANDING_DIR."""
    return _quoted_glob(evaluator, "psd/**/*.csv.gzip")


@macro()
def cot_glob(evaluator) -> str:
    """Return a quoted glob path for all COT CSV gzip files under LANDING_DIR."""
    return _quoted_glob(evaluator, "cot/**/*.csv.gzip")


@macro()
def prices_glob(evaluator) -> str:
    """Return a quoted glob path for all coffee price CSV gzip files under LANDING_DIR."""
    return _quoted_glob(evaluator, "prices/coffee_kc/**/*.csv.gzip")


@macro()
def ice_stocks_glob(evaluator) -> str:
    """Return a quoted glob path for all ICE warehouse stock CSV gzip files under LANDING_DIR."""
    return _quoted_glob(evaluator, "ice_stocks/**/*.csv.gzip")


@macro()
def ice_aging_glob(evaluator) -> str:
    """Return a quoted glob path for all ICE aging report CSV gzip files under LANDING_DIR."""
    return _quoted_glob(evaluator, "ice_aging/**/*.csv.gzip")


@macro()
def ice_stocks_by_port_glob(evaluator) -> str:
    """Return a quoted glob path for all ICE historical by-port CSV gzip files under LANDING_DIR."""
    return _quoted_glob(evaluator, "ice_stocks_by_port/**/*.csv.gzip")


@macro()
def weather_glob(evaluator) -> str:
    """Return a quoted glob path for all OWM weather JSON gzip files under LANDING_DIR.

    Pattern: weather/{location_id}/{year}/{date}.json.gz
    The double-star matches the nested location_id and year subdirectories.
    """
    return _quoted_glob(evaluator, "weather/**/*.json.gz")