From 9de3a3ba0112d6a6cf4986b3cbcde60e725bb579 Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 00:59:54 +0100 Subject: [PATCH] feat(extract): replace OpenWeatherMap with Open-Meteo weather extractor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaced the OWM extractor (8 locations, API key required, 14,600-call backfill over 30+ days) with Open-Meteo (12 locations, no API key, ERA5 reanalysis, full backfill in 12 API calls ~30 seconds). - Rename extract/openweathermap → extract/openmeteo (git mv) - Rewrite api.py: fetch_archive (ERA5, date-range) + fetch_recent (forecast, past_days=10 to cover ERA5 lag); 9 daily variables incl. et0 and VPD - Rewrite execute.py: _split_and_write() unzips parallel arrays into per-day flat JSON; no cursor / rate limiting / call cap needed - Update pipelines.py: --package openmeteo, timeout 120s (was 1200s) - Update fct_weather_daily.sql: flat Open-Meteo field names (temperature_2m_* etc.), remove pressure_afternoon_hpa, add et0_mm + vpd_max_kpa + is_high_vpd - Remove OPENWEATHERMAP_API_KEY from CLAUDE.md env vars table Co-Authored-By: Claude Sonnet 4.6 --- CLAUDE.md | 5 +- extract/openmeteo/pyproject.toml | 20 ++ extract/openmeteo/src/openmeteo/__init__.py | 1 + extract/openmeteo/src/openmeteo/api.py | 116 ++++++ extract/openmeteo/src/openmeteo/execute.py | 212 +++++++++++ .../src/openmeteo}/locations.py | 0 extract/openweathermap/pyproject.toml | 20 -- .../src/openweathermap/__init__.py | 0 .../openweathermap/src/openweathermap/api.py | 76 ---- .../src/openweathermap/execute.py | 330 ------------------ pyproject.toml | 2 +- src/materia/pipelines.py | 8 +- .../models/foundation/fct_weather_daily.sql | 80 +++-- 13 files changed, 412 insertions(+), 458 deletions(-) create mode 100644 extract/openmeteo/pyproject.toml create mode 100644 extract/openmeteo/src/openmeteo/__init__.py create mode 100644 extract/openmeteo/src/openmeteo/api.py create mode 100644 extract/openmeteo/src/openmeteo/execute.py rename extract/{openweathermap/src/openweathermap => openmeteo/src/openmeteo}/locations.py (100%) delete mode 100644 extract/openweathermap/pyproject.toml delete mode 100644 extract/openweathermap/src/openweathermap/__init__.py delete mode 100644 extract/openweathermap/src/openweathermap/api.py delete mode 100644 extract/openweathermap/src/openweathermap/execute.py diff --git a/CLAUDE.md b/CLAUDE.md index 050d387..1431444 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -44,7 +44,7 @@ uv run materia secrets get **Workspace packages** (`pyproject.toml` → `tool.uv.workspace`): - `extract/psdonline/` — Downloads USDA PSD Online data, normalizes ZIP→gzip CSV, writes to local landing directory -- `extract/openweathermap/` — Daily weather for 8 coffee-growing regions (OWM One Call API 3.0) +- `extract/openmeteo/` — Daily weather for 12 coffee-growing regions (Open-Meteo, ERA5 reanalysis, no API key) - `transform/sqlmesh_materia/` — 3-layer SQL transformation pipeline (local DuckDB) - `src/materia/` — CLI (Typer) for pipeline execution, worker management, secrets - `web/` — Future web frontend @@ -52,7 +52,7 @@ uv run materia secrets get **Data flow:** ``` USDA API → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip -OWM API → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz +Open-Meteo → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz → rclone cron syncs landing/ to R2 → SQLMesh staging → foundation → serving → /data/materia/lakehouse.duckdb → Web app reads lakehouse.duckdb (read-only) @@ -101,4 +101,3 @@ Read `coding_philosophy.md` for the full guide. Key points: |----------|---------|-------------| | `LANDING_DIR` | `data/landing` | Root directory for extracted landing data | | `DUCKDB_PATH` | `local.duckdb` | Path to the DuckDB lakehouse database | -| `OPENWEATHERMAP_API_KEY` | — | OWM One Call API 3.0 key (required for weather extraction) | diff --git a/extract/openmeteo/pyproject.toml b/extract/openmeteo/pyproject.toml new file mode 100644 index 0000000..f4bf392 --- /dev/null +++ b/extract/openmeteo/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "openmeteo" +version = "0.1.0" +description = "Open-Meteo daily weather extractor for coffee-growing regions" +requires-python = ">=3.13" +dependencies = [ + "extract_core", + "niquests>=3.14.1", +] + +[project.scripts] +extract_weather = "openmeteo.execute:extract_weather" +extract_weather_backfill = "openmeteo.execute:extract_weather_backfill" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/openmeteo"] diff --git a/extract/openmeteo/src/openmeteo/__init__.py b/extract/openmeteo/src/openmeteo/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/extract/openmeteo/src/openmeteo/__init__.py @@ -0,0 +1 @@ + diff --git a/extract/openmeteo/src/openmeteo/api.py b/extract/openmeteo/src/openmeteo/api.py new file mode 100644 index 0000000..59b0199 --- /dev/null +++ b/extract/openmeteo/src/openmeteo/api.py @@ -0,0 +1,116 @@ +"""Open-Meteo weather API client. + +Two endpoints: + + fetch_archive(session, lat, lon, start_date, end_date) -> dict + ERA5 reanalysis data — consistent, scientifically validated. + Available from 1940 to ~5 days ago (reanalysis processing lag). + Use for historical backfill. + + fetch_recent(session, lat, lon, past_days) -> dict + Forecast model blended with recent observations. + Covers the last N days + today (fills the ERA5 lag window). + Use for daily updates. + +Both return the same structure: + { + "daily": { + "time": ["2020-01-01", "2020-01-02", ...], + "temperature_2m_max": [28.5, 27.1, ...], + ... + } + } + +No API key required. No rate limits for reasonable usage (~12 calls/day). +""" + +import niquests + +ARCHIVE_URL = "https://archive-api.open-meteo.com/v1/archive" +FORECAST_URL = "https://api.open-meteo.com/v1/forecast" +HTTP_TIMEOUT_SECONDS = 60 +MAX_RESPONSE_BYTES = 2_000_000 # multi-year response ~200 KB; 2 MB is generous + +# Variables fetched for each location. All metric units. +# wind_speed_unit=ms ensures m/s (Open-Meteo default is km/h). +DAILY_VARIABLES = ",".join([ + "temperature_2m_max", + "temperature_2m_min", + "temperature_2m_mean", + "precipitation_sum", + "wind_speed_10m_max", + "relative_humidity_2m_max", + "cloud_cover_mean", + "et0_fao_evapotranspiration", # FAO Penman-Monteith ET — direct crop water demand signal + "vapour_pressure_deficit_max", # VPD >1.5 kPa = significant plant water stress +]) + + +def _get(session: niquests.Session, url: str, params: dict) -> dict: + """GET url, validate HTTP 200, return parsed JSON dict.""" + response = session.get(url, params=params, timeout=HTTP_TIMEOUT_SECONDS) + + assert response.status_code == 200, ( + f"Open-Meteo returned HTTP {response.status_code}: {response.text[:300]}" + ) + assert len(response.content) <= MAX_RESPONSE_BYTES, ( + f"Open-Meteo response unexpectedly large: {len(response.content):,} bytes" + ) + + data = response.json() + assert isinstance(data, dict), f"Expected dict, got {type(data)}" + + # Open-Meteo signals some errors as HTTP 200 with error=true in body + if data.get("error"): + raise ValueError(f"Open-Meteo API error: {data.get('reason', data)}") + + assert "daily" in data, f"Open-Meteo response missing 'daily' key: {list(data.keys())}" + assert "time" in data["daily"], "Open-Meteo 'daily' missing 'time' array" + + return data + + +def fetch_archive( + session: niquests.Session, + lat: float, + lon: float, + start_date: str, + end_date: str, +) -> dict: + """Fetch ERA5 reanalysis daily data for a date range (YYYY-MM-DD strings).""" + assert start_date and len(start_date) == 10, f"start_date must be YYYY-MM-DD, got {start_date!r}" + assert end_date and len(end_date) == 10, f"end_date must be YYYY-MM-DD, got {end_date!r}" + assert start_date <= end_date, f"start_date {start_date} must be <= end_date {end_date}" + + return _get(session, ARCHIVE_URL, { + "latitude": lat, + "longitude": lon, + "start_date": start_date, + "end_date": end_date, + "daily": DAILY_VARIABLES, + "wind_speed_unit": "ms", + "timezone": "UTC", + }) + + +def fetch_recent( + session: niquests.Session, + lat: float, + lon: float, + past_days: int = 10, +) -> dict: + """Fetch recent weather via Open-Meteo forecast model (fills ERA5 lag window). + + past_days=10 captures the ~5-day ERA5 lag plus buffer for missed daily runs. + """ + assert 1 <= past_days <= 92, f"past_days must be 1-92, got {past_days}" + + return _get(session, FORECAST_URL, { + "latitude": lat, + "longitude": lon, + "daily": DAILY_VARIABLES, + "wind_speed_unit": "ms", + "timezone": "UTC", + "past_days": past_days, + "forecast_days": 1, + }) diff --git a/extract/openmeteo/src/openmeteo/execute.py b/extract/openmeteo/src/openmeteo/execute.py new file mode 100644 index 0000000..c722bc9 --- /dev/null +++ b/extract/openmeteo/src/openmeteo/execute.py @@ -0,0 +1,212 @@ +"""Open-Meteo daily weather extraction for coffee-growing regions. + +Two entry points: + + extract_weather() + Daily run: fetches the last 10 days for all 12 locations. + 10 days covers the ~5-day ERA5 reanalysis lag plus buffer for missed runs. + Uses the forecast API (fills the recent window not yet in ERA5 archive). + 12 API calls total. Completes in ~10 seconds. + + extract_weather_backfill() + Historical fill: fetches 2020-01-01 → yesterday for all 12 locations. + Uses the archive API. One date-range request per location = 12 total calls. + Completes in ~30 seconds. No cursor needed. + +Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz + +Each file is a flat JSON object with Open-Meteo variable names: + {"date": "2020-01-01", "temperature_2m_max": 28.5, "precipitation_sum": 12.5, ...} + +No API key required. No rate limiting. Fully idempotent (file existence check). +""" + +import gzip +import json +import logging +import os +import sys +import time +from datetime import date, timedelta +from pathlib import Path + +import niquests +from extract_core import end_run, landing_path, open_state_db, start_run, write_bytes_atomic + +from openmeteo.api import fetch_archive, fetch_recent +from openmeteo.locations import LOCATIONS + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("Open-Meteo Extractor") + +LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) +LANDING_SUBDIR = "weather" + +EXTRACTOR_DAILY = "openmeteo_daily" +EXTRACTOR_BACKFILL = "openmeteo_backfill" + +BACKFILL_START = date(2020, 1, 1) + +# Small sleep between location calls — polite usage of the free community API. +SLEEP_BETWEEN_LOCATIONS_SECONDS = 0.5 + + +# ── helpers ─────────────────────────────────────────────────────────────────── + +def _write_day_file(location_id: str, date_str: str, record: dict) -> int: + """Write one day's weather record as gzipped JSON. Returns bytes written, or 0 if skipped.""" + assert location_id and date_str and record + + year = date_str[:4] + dest_dir = landing_path(LANDING_DIR, LANDING_SUBDIR, location_id, year) + local_file = dest_dir / f"{date_str}.json.gz" + + if local_file.exists(): + return 0 + + compressed = gzip.compress(json.dumps(record, separators=(",", ":")).encode("utf-8")) + bytes_written = write_bytes_atomic(local_file, compressed) + logger.debug(f"Stored {local_file} ({bytes_written:,} bytes)") + return bytes_written + + +def _split_and_write(location_id: str, response: dict) -> tuple[int, int, int]: + """Split an Open-Meteo array response into per-day JSON.gz files. + + Open-Meteo returns parallel arrays per variable under response['daily']['time']. + We zip these into one flat dict per day and write each as a separate file. + + Returns (files_written, files_skipped, bytes_written). + """ + daily = response["daily"] + dates = daily["time"] + variables = [k for k in daily if k != "time"] + + files_written = 0 + files_skipped = 0 + bytes_written_total = 0 + + for i, date_str in enumerate(dates): + if not date_str: + continue + + record = {"date": date_str} + for var in variables: + values = daily[var] + record[var] = values[i] if i < len(values) else None + + bw = _write_day_file(location_id, date_str, record) + if bw > 0: + files_written += 1 + bytes_written_total += bw + else: + files_skipped += 1 + + return files_written, files_skipped, bytes_written_total + + +# ── daily extractor ─────────────────────────────────────────────────────────── + +def extract_weather() -> None: + """Fetch the last 10 days of weather for all 12 locations. + + Uses Open-Meteo forecast API (past_days=10). 12 API calls. ~10 seconds. + """ + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, EXTRACTOR_DAILY) + files_written = 0 + files_skipped = 0 + bytes_written_total = 0 + + try: + with niquests.Session() as session: + for loc in LOCATIONS: + logger.info(f"Fetching recent: {loc['id']} ({loc['country']})") + response = fetch_recent(session, loc["lat"], loc["lon"], past_days=10) + w, s, bw = _split_and_write(loc["id"], response) + files_written += w + files_skipped += s + bytes_written_total += bw + time.sleep(SLEEP_BETWEEN_LOCATIONS_SECONDS) + + end_run( + conn, run_id, + status="success", + files_written=files_written, + files_skipped=files_skipped, + bytes_written=bytes_written_total, + cursor_value=date.today().isoformat(), + ) + logger.info(f"Daily weather complete: {files_written} new, {files_skipped} skipped") + except Exception as e: + end_run(conn, run_id, status="failed", error_message=str(e)) + raise + finally: + conn.close() + + +# ── backfill extractor ──────────────────────────────────────────────────────── + +def extract_weather_backfill() -> None: + """Fetch full weather history (2020-01-01 → yesterday) for all 12 locations. + + Uses Open-Meteo archive API (ERA5 reanalysis). One date-range request per + location = 12 calls total. Completes in ~30 seconds. + + Idempotent: per-day files already on disk are skipped when splitting the + response. Safe to re-run at any time — will skip what already exists. + """ + yesterday = (date.today() - timedelta(days=1)).isoformat() + start_date = BACKFILL_START.isoformat() + + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, EXTRACTOR_BACKFILL) + files_written = 0 + files_skipped = 0 + bytes_written_total = 0 + + try: + with niquests.Session() as session: + for loc in LOCATIONS: + logger.info( + f"Backfill {loc['id']} ({loc['country']}) " + f"{start_date} → {yesterday}" + ) + response = fetch_archive( + session, loc["lat"], loc["lon"], + start_date=start_date, + end_date=yesterday, + ) + w, s, bw = _split_and_write(loc["id"], response) + files_written += w + files_skipped += s + bytes_written_total += bw + logger.info(f" {loc['id']}: {w} new, {s} already existed") + time.sleep(SLEEP_BETWEEN_LOCATIONS_SECONDS) + + end_run( + conn, run_id, + status="success", + files_written=files_written, + files_skipped=files_skipped, + bytes_written=bytes_written_total, + cursor_value=yesterday, + ) + logger.info( + f"Backfill complete: {files_written} new files, " + f"{files_skipped} already existed" + ) + except Exception as e: + end_run(conn, run_id, status="failed", error_message=str(e)) + raise + finally: + conn.close() + + +if __name__ == "__main__": + extract_weather() diff --git a/extract/openweathermap/src/openweathermap/locations.py b/extract/openmeteo/src/openmeteo/locations.py similarity index 100% rename from extract/openweathermap/src/openweathermap/locations.py rename to extract/openmeteo/src/openmeteo/locations.py diff --git a/extract/openweathermap/pyproject.toml b/extract/openweathermap/pyproject.toml deleted file mode 100644 index b23e3a0..0000000 --- a/extract/openweathermap/pyproject.toml +++ /dev/null @@ -1,20 +0,0 @@ -[project] -name = "openweathermap" -version = "0.1.0" -description = "OpenWeatherMap daily weather extractor for coffee-growing regions" -requires-python = ">=3.13" -dependencies = [ - "extract_core", - "niquests>=3.14.1", -] - -[project.scripts] -extract_weather = "openweathermap.execute:extract_weather" -extract_weather_backfill = "openweathermap.execute:extract_weather_backfill" - -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[tool.hatch.build.targets.wheel] -packages = ["src/openweathermap"] diff --git a/extract/openweathermap/src/openweathermap/__init__.py b/extract/openweathermap/src/openweathermap/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/extract/openweathermap/src/openweathermap/api.py b/extract/openweathermap/src/openweathermap/api.py deleted file mode 100644 index 8a48c34..0000000 --- a/extract/openweathermap/src/openweathermap/api.py +++ /dev/null @@ -1,76 +0,0 @@ -"""Thin client for the OpenWeatherMap One Call API 3.0 — Day Summary endpoint. - -Endpoint: GET https://api.openweathermap.org/data/3.0/onecall/day_summary -Docs: https://openweathermap.org/api/one-call-3#history_daily_aggregation - -Returns one JSON object per (lat, lon, date) with daily aggregates: - temperature.{min,max,morning,afternoon,evening,night} - precipitation.total - humidity.afternoon - cloud_cover.afternoon - wind.max.{speed,direction} - pressure.afternoon - -This module contains only the HTTP call and basic response validation. -All business logic (file storage, rate limiting, cursor tracking) lives in execute.py. -""" - -import niquests - -OWM_BASE_URL = "https://api.openweathermap.org/data/3.0/onecall/day_summary" -HTTP_TIMEOUT_SECONDS = 30 -MAX_RESPONSE_BYTES = 10_000 # Day summary is ~500 bytes; 10 KB is a generous bound - - -class RateLimitError(Exception): - """Raised when OWM returns HTTP 429 (rate limit exceeded).""" - - -def fetch_day_summary( - session: niquests.Session, - lat: float, - lon: float, - date_str: str, - api_key: str, -) -> dict: - """Fetch the OWM One Call 3.0 day summary for a single (lat, lon, date). - - date_str must be YYYY-MM-DD format. - Returns the parsed JSON dict on success. - - Raises RateLimitError on HTTP 429 — caller is responsible for sleeping and retrying. - Raises AssertionError on any other non-200 status. - """ - assert api_key, "api_key must not be empty" - assert date_str and len(date_str) == 10, f"date_str must be YYYY-MM-DD, got {date_str!r}" - assert -90.0 <= lat <= 90.0, f"lat out of range: {lat}" - assert -180.0 <= lon <= 180.0, f"lon out of range: {lon}" - - response = session.get( - OWM_BASE_URL, - params={ - "lat": lat, - "lon": lon, - "date": date_str, - "appid": api_key, - "units": "metric", - }, - timeout=HTTP_TIMEOUT_SECONDS, - ) - - if response.status_code == 429: - raise RateLimitError(f"OWM rate limit hit for lat={lat} lon={lon} date={date_str}") - - assert response.status_code == 200, ( - f"OWM API returned HTTP {response.status_code} for " - f"lat={lat} lon={lon} date={date_str}: {response.text[:200]}" - ) - assert len(response.content) <= MAX_RESPONSE_BYTES, ( - f"OWM response unexpectedly large ({len(response.content)} bytes) for {date_str}" - ) - - data = response.json() - assert isinstance(data, dict), f"Expected dict response, got {type(data)}" - assert "date" in data, f"OWM response missing 'date' field: {list(data.keys())}" - - return data diff --git a/extract/openweathermap/src/openweathermap/execute.py b/extract/openweathermap/src/openweathermap/execute.py deleted file mode 100644 index 6466c1f..0000000 --- a/extract/openweathermap/src/openweathermap/execute.py +++ /dev/null @@ -1,330 +0,0 @@ -"""OpenWeatherMap daily weather extraction for coffee-growing regions. - -Two entry points: - - extract_weather() - Daily run: fetches yesterday + today for all 8 locations (16 calls max). - Yesterday is included to cover the midnight edge case — if the daily job - fires just after midnight UTC, today's OWM data may still be partial. - Idempotent: skips if the landing file already exists. - - extract_weather_backfill() - Historical fill: iterates (date, location) pairs from 2020-01-01 to - yesterday. Bounded to MAX_CALLS_PER_BACKFILL_RUN per run; re-run daily - to advance. Resumes from cursor on restart. - -Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz - -Idempotency: file existence check. Past weather is immutable — (location_id, date) -uniquely identifies a file that never changes once written. - -Backfill cursor format: '{location_id}:{date}' (e.g. 'brazil_parana:2022-07-15'). -Encodes both dimensions so a mid-run crash resumes at the exact (location, date) pair. -""" - -import gzip -import json -import logging -import os -import sys -import time -from datetime import date, timedelta -from pathlib import Path - -import niquests -from extract_core import end_run, get_last_cursor, landing_path, open_state_db, start_run, write_bytes_atomic - -from openweathermap.api import RateLimitError, fetch_day_summary -from openweathermap.locations import LOCATIONS - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - handlers=[logging.StreamHandler(sys.stdout)], -) -logger = logging.getLogger("OWM Weather Extractor") - -LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) -LANDING_SUBDIR = "weather" - -EXTRACTOR_DAILY = "owm_weather_daily" -EXTRACTOR_BACKFILL = "owm_weather_backfill" - -# Rate limiting: OWM free tier = 1000 calls/day (~0.7/s). -# 1.5s between calls stays comfortably below the limit for the daily run. -# 2.0s for backfill (more conservative, many sequential calls). -SLEEP_BETWEEN_CALLS_SECONDS = 1.5 -SLEEP_BETWEEN_BACKFILL_CALLS_SECONDS = 2.0 - -# On 429: wait 60s, then one retry. If still 429, abort the run. -SLEEP_ON_RATE_LIMIT_SECONDS = 60 -MAX_RATE_LIMIT_RETRIES = 1 - -# Cap backfill at 500 calls per run (~17 min at 2s/call). -# 5-year backfill = 14,600 calls → ~30 runs. Re-run daily until complete. -MAX_CALLS_PER_BACKFILL_RUN = 500 - - -# ── helpers ────────────────────────────────────────────────────────────────── - -def _write_weather_file(location_id: str, date_str: str, payload: dict) -> int: - """Gzip-compress payload JSON and write atomically to the landing zone. - - Returns bytes_written, or 0 if the file already exists (idempotent skip). - Path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz - """ - assert location_id, "location_id must not be empty" - assert date_str and len(date_str) == 10, f"date_str must be YYYY-MM-DD, got {date_str!r}" - assert isinstance(payload, dict) and payload, "payload must be a non-empty dict" - - year = date_str[:4] - dest_dir = landing_path(LANDING_DIR, LANDING_SUBDIR, location_id, year) - local_file = dest_dir / f"{date_str}.json.gz" - - if local_file.exists(): - logger.debug(f"Already exists, skipping: {local_file}") - return 0 - - compressed = gzip.compress(json.dumps(payload, separators=(",", ":")).encode("utf-8")) - bytes_written = write_bytes_atomic(local_file, compressed) - logger.info(f"Stored {local_file} ({bytes_written:,} bytes)") - return bytes_written - - -def _fetch_with_retry(session: niquests.Session, loc: dict, date_str: str, api_key: str) -> dict | None: - """Fetch OWM day summary with one 429-retry. - - Returns the JSON dict on success, or None if rate limit persists after retry. - """ - for attempt in range(MAX_RATE_LIMIT_RETRIES + 1): - try: - return fetch_day_summary(session, loc["lat"], loc["lon"], date_str, api_key) - except RateLimitError: - if attempt < MAX_RATE_LIMIT_RETRIES: - logger.warning( - f"Rate limit hit for {loc['id']} {date_str} — " - f"sleeping {SLEEP_ON_RATE_LIMIT_SECONDS}s before retry" - ) - time.sleep(SLEEP_ON_RATE_LIMIT_SECONDS) - else: - logger.error(f"Rate limit persisted after retry for {loc['id']} {date_str}") - return None - return None # unreachable; satisfies type checker - - -def _file_exists(location_id: str, date_str: str) -> bool: - year = date_str[:4] - return (LANDING_DIR / LANDING_SUBDIR / location_id / year / f"{date_str}.json.gz").exists() - - -# ── daily extractor ─────────────────────────────────────────────────────────── - -def extract_weather() -> None: - """Fetch yesterday + today weather for all 8 coffee-growing locations. - - Up to 16 API calls. Both days are skipped if files already exist, - so re-running costs zero API calls (fully idempotent). - """ - api_key = os.environ.get("OPENWEATHERMAP_API_KEY", "") - assert api_key, "OPENWEATHERMAP_API_KEY environment variable must be set" - - today = date.today() - yesterday = today - timedelta(days=1) - dates_to_fetch = [yesterday.isoformat(), today.isoformat()] - - conn = open_state_db(LANDING_DIR) - run_id = start_run(conn, EXTRACTOR_DAILY) - files_written = 0 - files_skipped = 0 - bytes_written_total = 0 - - try: - with niquests.Session() as session: - for date_str in dates_to_fetch: - for loc in LOCATIONS: - if _file_exists(loc["id"], date_str): - logger.info(f"Already exists: {loc['id']} {date_str}") - files_skipped += 1 - continue - - data = _fetch_with_retry(session, loc, date_str, api_key) - if data is None: - logger.error(f"Skipping {loc['id']} {date_str} after persistent rate limit") - continue - - bw = _write_weather_file(loc["id"], date_str, data) - if bw > 0: - files_written += 1 - bytes_written_total += bw - else: - files_skipped += 1 - - time.sleep(SLEEP_BETWEEN_CALLS_SECONDS) - - end_run( - conn, run_id, - status="success", - files_written=files_written, - files_skipped=files_skipped, - bytes_written=bytes_written_total, - cursor_value=today.isoformat(), - ) - logger.info(f"Daily weather complete: {files_written} new, {files_skipped} skipped") - except Exception as e: - end_run(conn, run_id, status="failed", error_message=str(e)) - raise - finally: - conn.close() - - -# ── backfill extractor ──────────────────────────────────────────────────────── - -def extract_weather_backfill() -> None: - """Fill historical weather data from 2020-01-01 to yesterday. - - Iterates (date, location) pairs in date-ascending, LOCATIONS-list order. - Bounded to MAX_CALLS_PER_BACKFILL_RUN per run — re-run daily to advance. - - Cursor format: '{location_id}:{date}' (e.g. 'brazil_parana:2022-07-15'). - Encodes both dimensions: on resume, all pairs at or before the cursor are - skipped (via cursor comparison first, then file-existence check). - - 5-year backfill (2020–2025) = 14,600 calls. At 500/run = ~30 runs. - - 429 handling: sleep 60s, one retry. If still 429, save cursor and exit - with status='failed' so the cursor does not advance beyond the last - successfully written pair. Safe to re-run the next day. - """ - api_key = os.environ.get("OPENWEATHERMAP_API_KEY", "") - assert api_key, "OPENWEATHERMAP_API_KEY environment variable must be set" - - start = date(2020, 1, 1) - end = date.today() - timedelta(days=1) # never fetch today in backfill - - conn = open_state_db(LANDING_DIR) - run_id = start_run(conn, EXTRACTOR_BACKFILL) - files_written = 0 - files_skipped = 0 - bytes_written_total = 0 - calls_made = 0 - last_cursor: str | None = None - - # Load resume cursor from last successful run - resume_cursor = get_last_cursor(conn, EXTRACTOR_BACKFILL) - if resume_cursor: - logger.info(f"Resuming backfill from cursor: {resume_cursor}") - else: - logger.info(f"Starting fresh backfill from {start.isoformat()}") - - # Parse cursor into (location_id, date_str) for skip comparison - resume_location_id: str | None = None - resume_date_str: str | None = None - if resume_cursor and ":" in resume_cursor: - resume_location_id, resume_date_str = resume_cursor.split(":", 1) - - location_ids = [loc["id"] for loc in LOCATIONS] - resume_loc_idx = -1 - if resume_location_id and resume_location_id in location_ids: - resume_loc_idx = location_ids.index(resume_location_id) - - try: - with niquests.Session() as session: - current = start - while current <= end: - date_str = current.isoformat() - - for loc in LOCATIONS: - loc_idx = location_ids.index(loc["id"]) - - # Cursor-based skip: (date, loc_idx) <= (resume_date, resume_loc_idx) - # This skips everything already processed in previous runs. - if resume_date_str: - if date_str < resume_date_str: - files_skipped += 1 - continue - if date_str == resume_date_str and loc_idx <= resume_loc_idx: - files_skipped += 1 - continue - - # File-existence check: idempotency guard for files already on disk - # (e.g. written by the daily extractor, or a previous partial run) - if _file_exists(loc["id"], date_str): - files_skipped += 1 - last_cursor = f"{loc['id']}:{date_str}" - continue - - # Per-run call cap - if calls_made >= MAX_CALLS_PER_BACKFILL_RUN: - logger.info( - f"Reached cap of {MAX_CALLS_PER_BACKFILL_RUN} calls. " - f"Re-run to continue from {last_cursor or resume_cursor}" - ) - end_run( - conn, run_id, - status="success", - files_written=files_written, - files_skipped=files_skipped, - bytes_written=bytes_written_total, - cursor_value=last_cursor or resume_cursor, - ) - return - - data = _fetch_with_retry(session, loc, date_str, api_key) - calls_made += 1 - - if data is None: - logger.warning(f"Persistent rate limit at {loc['id']} {date_str} — stopping run") - end_run( - conn, run_id, - status="failed", - files_written=files_written, - files_skipped=files_skipped, - bytes_written=bytes_written_total, - cursor_value=last_cursor or resume_cursor, - error_message="Persistent rate limit — resume from cursor", - ) - return - - bw = _write_weather_file(loc["id"], date_str, data) - if bw > 0: - files_written += 1 - bytes_written_total += bw - else: - files_skipped += 1 - - last_cursor = f"{loc['id']}:{date_str}" - time.sleep(SLEEP_BETWEEN_BACKFILL_CALLS_SECONDS) - - current += timedelta(days=1) - - final_cursor = last_cursor or resume_cursor or end.isoformat() - logger.info( - f"Backfill complete: {files_written} written, " - f"{files_skipped} skipped, {calls_made} API calls" - ) - end_run( - conn, run_id, - status="success", - files_written=files_written, - files_skipped=files_skipped, - bytes_written=bytes_written_total, - cursor_value=final_cursor, - ) - except Exception as e: - end_run( - conn, run_id, - status="failed", - files_written=files_written, - files_skipped=files_skipped, - bytes_written=bytes_written_total, - cursor_value=last_cursor or resume_cursor, - error_message=str(e), - ) - raise - finally: - conn.close() - - -if __name__ == "__main__": - extract_weather() diff --git a/pyproject.toml b/pyproject.toml index 9904cab..1467931 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,7 @@ sqlmesh_materia = {workspace = true } cftc_cot = {workspace = true } coffee_prices = {workspace = true } ice_stocks = {workspace = true } -openweathermap = {workspace = true } +openmeteo = {workspace = true } [tool.uv.workspace] members = [ "extract/*", diff --git a/src/materia/pipelines.py b/src/materia/pipelines.py index 0f3d2f6..c7b9cee 100644 --- a/src/materia/pipelines.py +++ b/src/materia/pipelines.py @@ -41,12 +41,12 @@ PIPELINES = { "timeout_seconds": 1800, }, "extract_weather": { - "command": ["uv", "run", "--package", "openweathermap", "extract_weather"], - "timeout_seconds": 300, + "command": ["uv", "run", "--package", "openmeteo", "extract_weather"], + "timeout_seconds": 120, }, "extract_weather_backfill": { - "command": ["uv", "run", "--package", "openweathermap", "extract_weather_backfill"], - "timeout_seconds": 1200, + "command": ["uv", "run", "--package", "openmeteo", "extract_weather_backfill"], + "timeout_seconds": 120, }, "extract_all": { "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"], diff --git a/transform/sqlmesh_materia/models/foundation/fct_weather_daily.sql b/transform/sqlmesh_materia/models/foundation/fct_weather_daily.sql index 020504e..6595fd9 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_weather_daily.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_weather_daily.sql @@ -1,4 +1,18 @@ -/* Foundation fact: daily weather observations for 8 coffee-growing regions. */ /* Source: OpenWeatherMap One Call API 3.0 / Day Summary */ /* Landing: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz */ /* One file per (location_id, date). Content: raw OWM day summary JSON. */ /* Each file is a single JSON object (not newline-delimited), so format='auto'. */ /* Grain: (location_id, observation_date) — one row per location per day. */ /* Dedup key: hash(location_id, date) — past weather is immutable. */ /* location_id is parsed from the filename path: split(filename, '/')[-3] */ /* Path structure: .../weather/{location_id}/{year}/{date}.json.gz */ /* Crop stress flags (agronomic thresholds for Arabica coffee): */ /* is_frost — temp_min_c < 2.0°C (ICO frost damage threshold) */ /* is_heat_stress — temp_max_c > 35.0°C (photosynthesis impairment) */ /* is_drought — precipitation_mm < 1.0 (dry day; OWM omits field when 0) */ /* in_growing_season — simplified month-range flag by variety */ +/* Foundation fact: daily weather observations for 12 coffee-growing regions. */ +/* Source: Open-Meteo (ERA5 reanalysis archive + forecast model for recent days) */ +/* Landing: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz */ +/* One file per (location_id, date). Content: flat Open-Meteo JSON per day. */ +/* Open-Meteo returns parallel arrays; execute.py splits them into per-day files. */ +/* Grain: (location_id, observation_date) — one row per location per day. */ +/* Dedup key: hash(location_id, date) — past weather is immutable. */ +/* location_id is parsed from filename: split(filename, '/')[-3] */ +/* Path structure: .../weather/{location_id}/{year}/{date}.json.gz */ +/* Crop stress flags: */ +/* is_frost — temp_min_c < 2.0°C (ICO Arabica frost damage threshold) */ +/* is_heat_stress — temp_max_c > 35.0°C (photosynthesis impairment) */ +/* is_drought — precipitation_mm < 1.0 (agronomic dry day) */ +/* is_high_vpd — vpd_max_kpa > 1.5 (significant plant water stress) */ +/* in_growing_season — simplified month-range flag by variety */ MODEL ( name foundation.fct_weather_daily, kind INCREMENTAL_BY_TIME_RANGE ( @@ -10,31 +24,43 @@ MODEL ( ); WITH src AS ( - /* Each file is a single JSON object with nested fields: */ /* temperature.{min,max,afternoon,morning,evening,night} */ /* precipitation.total (absent when 0 — COALESCE to 0 downstream) */ /* humidity.afternoon */ /* cloud_cover.afternoon */ /* wind.max.{speed,direction} */ /* pressure.afternoon */ /* DuckDB read_json(format='auto') creates STRUCT columns for nested objects; */ /* fields are accessed with dot notation (temperature.min, wind.max.speed). */ + /* Open-Meteo files are flat JSON: all variables at top level (no nested structs). */ + /* read_json(format='auto') infers column types directly from the numeric values. */ SELECT * FROM READ_JSON(@weather_glob(), format = 'auto', compression = 'gzip', filename = TRUE) ), located AS ( SELECT src.*, - STR_SPLIT(filename, '/')[-3] AS location_id, /* location_id is the 3rd-from-last path segment: */ /* e.g. .../weather/brazil_minas_gerais/2024/2024-01-15.json.gz → 'brazil_minas_gerais' */ + STR_SPLIT(filename, '/')[-3] AS location_id, /* location_id is the 3rd-from-last path segment: */ + /* e.g. .../weather/brazil_minas_gerais/2024/2024-01-15.json.gz → 'brazil_minas_gerais' */ TRY_CAST(src."date" AS DATE) AS observation_date FROM src ), cast_and_clean AS ( SELECT location_id, observation_date, - TRY_CAST(located.temperature.min AS DOUBLE) AS temp_min_c, /* Temperature (°C, metric units) */ - TRY_CAST(located.temperature.max AS DOUBLE) AS temp_max_c, - TRY_CAST(located.temperature.afternoon AS DOUBLE) AS temp_afternoon_c, - COALESCE(TRY_CAST(located.precipitation.total AS DOUBLE), 0.0) AS precipitation_mm, /* Precipitation (mm total for the day; OWM omits field when 0) */ - TRY_CAST(located.humidity.afternoon AS DOUBLE) AS humidity_afternoon_pct, /* Humidity (% afternoon reading) */ - TRY_CAST(located.cloud_cover.afternoon AS DOUBLE) AS cloud_cover_afternoon_pct, /* Cloud cover (% afternoon) */ - TRY_CAST(located.wind.max.speed AS DOUBLE) AS wind_max_speed_ms, /* Wind (m/s max speed, degrees direction) */ - TRY_CAST(located.pressure.afternoon AS DOUBLE) AS pressure_afternoon_hpa, /* Pressure (hPa afternoon) */ - TRY_CAST(located.temperature.min AS DOUBLE) /* Crop stress flags */ < 2.0 AS is_frost, - TRY_CAST(located.temperature.max AS DOUBLE) > 35.0 AS is_heat_stress, - COALESCE(TRY_CAST(located.precipitation.total AS DOUBLE), 0.0) < 1.0 AS is_drought, + /* Temperature (°C) */ + TRY_CAST(located.temperature_2m_min AS DOUBLE) AS temp_min_c, + TRY_CAST(located.temperature_2m_max AS DOUBLE) AS temp_max_c, + TRY_CAST(located.temperature_2m_mean AS DOUBLE) AS temp_mean_c, + /* Precipitation (mm total for the day) */ + COALESCE(TRY_CAST(located.precipitation_sum AS DOUBLE), 0.0) AS precipitation_mm, + /* Humidity (% — daily max) */ + TRY_CAST(located.relative_humidity_2m_max AS DOUBLE) AS humidity_max_pct, + /* Cloud cover (% — daily mean) */ + TRY_CAST(located.cloud_cover_mean AS DOUBLE) AS cloud_cover_mean_pct, + /* Wind (m/s max — Open-Meteo requested with wind_speed_unit=ms) */ + TRY_CAST(located.wind_speed_10m_max AS DOUBLE) AS wind_max_speed_ms, + /* ET₀ (mm/day — FAO Penman-Monteith; direct crop water demand signal) */ + TRY_CAST(located.et0_fao_evapotranspiration AS DOUBLE) AS et0_mm, + /* VPD (kPa — max; >1.5 kPa = significant plant water stress) */ + TRY_CAST(located.vapour_pressure_deficit_max AS DOUBLE) AS vpd_max_kpa, + /* Crop stress flags */ + TRY_CAST(located.temperature_2m_min AS DOUBLE) < 2.0 AS is_frost, + TRY_CAST(located.temperature_2m_max AS DOUBLE) > 35.0 AS is_heat_stress, + COALESCE(TRY_CAST(located.precipitation_sum AS DOUBLE), 0.0) < 1.0 AS is_drought, + TRY_CAST(located.vapour_pressure_deficit_max AS DOUBLE) > 1.5 AS is_high_vpd, HASH(location_id, src."date") AS hkey, filename FROM located @@ -46,15 +72,17 @@ WITH src AS ( ANY_VALUE(observation_date) AS observation_date, ANY_VALUE(temp_min_c) AS temp_min_c, ANY_VALUE(temp_max_c) AS temp_max_c, - ANY_VALUE(temp_afternoon_c) AS temp_afternoon_c, + ANY_VALUE(temp_mean_c) AS temp_mean_c, ANY_VALUE(precipitation_mm) AS precipitation_mm, - ANY_VALUE(humidity_afternoon_pct) AS humidity_afternoon_pct, - ANY_VALUE(cloud_cover_afternoon_pct) AS cloud_cover_afternoon_pct, + ANY_VALUE(humidity_max_pct) AS humidity_max_pct, + ANY_VALUE(cloud_cover_mean_pct) AS cloud_cover_mean_pct, ANY_VALUE(wind_max_speed_ms) AS wind_max_speed_ms, - ANY_VALUE(pressure_afternoon_hpa) AS pressure_afternoon_hpa, + ANY_VALUE(et0_mm) AS et0_mm, + ANY_VALUE(vpd_max_kpa) AS vpd_max_kpa, ANY_VALUE(is_frost) AS is_frost, ANY_VALUE(is_heat_stress) AS is_heat_stress, ANY_VALUE(is_drought) AS is_drought, + ANY_VALUE(is_high_vpd) AS is_high_vpd, hkey FROM cast_and_clean GROUP BY @@ -70,24 +98,28 @@ SELECT loc.variety, d.temp_min_c, d.temp_max_c, - d.temp_afternoon_c, + d.temp_mean_c, d.precipitation_mm, - d.humidity_afternoon_pct, - d.cloud_cover_afternoon_pct, + d.humidity_max_pct, + d.cloud_cover_mean_pct, d.wind_max_speed_ms, - d.pressure_afternoon_hpa, + d.et0_mm, + d.vpd_max_kpa, d.is_frost, d.is_heat_stress, d.is_drought, + d.is_high_vpd, CASE loc.variety WHEN 'Arabica' THEN EXTRACT(MONTH FROM d.observation_date) BETWEEN 4 AND 10 WHEN 'Robusta' THEN EXTRACT(MONTH FROM d.observation_date) BETWEEN 4 AND 11 ELSE FALSE - END AS in_growing_season /* Growing season: simplified month-range flag by variety. */ /* Arabica: Apr–Oct (covers northern + southern hemisphere risk windows). */ /* Robusta: Apr–Nov (Vietnam/Indonesia main cycle). */ + END AS in_growing_season /* Growing season: simplified month-range flag by variety. */ + /* Arabica: Apr–Oct (covers northern + southern hemisphere risk windows). */ + /* Robusta: Apr–Nov (Vietnam/Indonesia main cycle). */ FROM deduplicated AS d LEFT JOIN seeds.weather_locations AS loc ON d.location_id = loc.location_id WHERE - d.observation_date BETWEEN @start_ds AND @end_ds \ No newline at end of file + d.observation_date BETWEEN @start_ds AND @end_ds