feat(extract): replace OpenWeatherMap with Open-Meteo weather extractor

Replaced the OWM extractor (8 locations, API key required, 14,600-call
backfill spread over 30+ days) with Open-Meteo (12 locations, no API key,
ERA5 reanalysis, full backfill in 12 API calls, ~30 seconds).

- Rename extract/openweathermap → extract/openmeteo (git mv)
- Rewrite api.py: fetch_archive (ERA5, date-range) + fetch_recent (forecast,
  past_days=10 to cover ERA5 lag); 9 daily variables incl. et0 and VPD
- Rewrite execute.py: _split_and_write() unzips parallel arrays into per-day
  flat JSON; no cursor / rate limiting / call cap needed
- Update pipelines.py: --package openmeteo, timeout 120s (was 1200s)
- Update fct_weather_daily.sql: flat Open-Meteo field names (temperature_2m_*
  etc.), remove pressure_afternoon_hpa, add et0_mm + vpd_max_kpa + is_high_vpd
- Remove OPENWEATHERMAP_API_KEY from CLAUDE.md env vars table

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Author: Deeman
Date: 2026-02-26 00:59:54 +01:00
Parent: 4817f7de2f
Commit: 9de3a3ba01
13 changed files with 412 additions and 458 deletions

@@ -44,7 +44,7 @@ uv run materia secrets get
 **Workspace packages** (`pyproject.toml` → `tool.uv.workspace`):
 - `extract/psdonline/` — Downloads USDA PSD Online data, normalizes ZIP→gzip CSV, writes to local landing directory
-- `extract/openweathermap/` — Daily weather for 8 coffee-growing regions (OWM One Call API 3.0)
+- `extract/openmeteo/` — Daily weather for 12 coffee-growing regions (Open-Meteo, ERA5 reanalysis, no API key)
 - `transform/sqlmesh_materia/` — 3-layer SQL transformation pipeline (local DuckDB)
 - `src/materia/` — CLI (Typer) for pipeline execution, worker management, secrets
 - `web/` — Future web frontend
@@ -52,7 +52,7 @@ uv run materia secrets get
 **Data flow:**
 ```
 USDA API → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip
-OWM API → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz
+Open-Meteo → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz
 → rclone cron syncs landing/ to R2
 → SQLMesh staging → foundation → serving → /data/materia/lakehouse.duckdb
 → Web app reads lakehouse.duckdb (read-only)
@@ -101,4 +101,3 @@ Read `coding_philosophy.md` for the full guide. Key points:
 |----------|---------|-------------|
 | `LANDING_DIR` | `data/landing` | Root directory for extracted landing data |
 | `DUCKDB_PATH` | `local.duckdb` | Path to the DuckDB lakehouse database |
-| `OPENWEATHERMAP_API_KEY` | — | OWM One Call API 3.0 key (required for weather extraction) |


@@ -0,0 +1,20 @@
[project]
name = "openmeteo"
version = "0.1.0"
description = "Open-Meteo daily weather extractor for coffee-growing regions"
requires-python = ">=3.13"
dependencies = [
    "extract_core",
    "niquests>=3.14.1",
]

[project.scripts]
extract_weather = "openmeteo.execute:extract_weather"
extract_weather_backfill = "openmeteo.execute:extract_weather_backfill"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/openmeteo"]


@@ -0,0 +1 @@


@@ -0,0 +1,116 @@
"""Open-Meteo weather API client.
Two endpoints:
fetch_archive(session, lat, lon, start_date, end_date) -> dict
ERA5 reanalysis data — consistent, scientifically validated.
Available from 1940 to ~5 days ago (reanalysis processing lag).
Use for historical backfill.
fetch_recent(session, lat, lon, past_days) -> dict
Forecast model blended with recent observations.
Covers the last N days + today (fills the ERA5 lag window).
Use for daily updates.
Both return the same structure:
{
"daily": {
"time": ["2020-01-01", "2020-01-02", ...],
"temperature_2m_max": [28.5, 27.1, ...],
...
}
}
No API key required. No rate limits for reasonable usage (~12 calls/day).
"""
import niquests
ARCHIVE_URL = "https://archive-api.open-meteo.com/v1/archive"
FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
HTTP_TIMEOUT_SECONDS = 60
MAX_RESPONSE_BYTES = 2_000_000 # multi-year response ~200 KB; 2 MB is generous
# Variables fetched for each location. All metric units.
# wind_speed_unit=ms ensures m/s (Open-Meteo default is km/h).
DAILY_VARIABLES = ",".join([
"temperature_2m_max",
"temperature_2m_min",
"temperature_2m_mean",
"precipitation_sum",
"wind_speed_10m_max",
"relative_humidity_2m_max",
"cloud_cover_mean",
"et0_fao_evapotranspiration", # FAO Penman-Monteith ET — direct crop water demand signal
"vapour_pressure_deficit_max", # VPD >1.5 kPa = significant plant water stress
])
def _get(session: niquests.Session, url: str, params: dict) -> dict:
"""GET url, validate HTTP 200, return parsed JSON dict."""
response = session.get(url, params=params, timeout=HTTP_TIMEOUT_SECONDS)
assert response.status_code == 200, (
f"Open-Meteo returned HTTP {response.status_code}: {response.text[:300]}"
)
assert len(response.content) <= MAX_RESPONSE_BYTES, (
f"Open-Meteo response unexpectedly large: {len(response.content):,} bytes"
)
data = response.json()
assert isinstance(data, dict), f"Expected dict, got {type(data)}"
# Open-Meteo signals some errors as HTTP 200 with error=true in body
if data.get("error"):
raise ValueError(f"Open-Meteo API error: {data.get('reason', data)}")
assert "daily" in data, f"Open-Meteo response missing 'daily' key: {list(data.keys())}"
assert "time" in data["daily"], "Open-Meteo 'daily' missing 'time' array"
return data
def fetch_archive(
session: niquests.Session,
lat: float,
lon: float,
start_date: str,
end_date: str,
) -> dict:
"""Fetch ERA5 reanalysis daily data for a date range (YYYY-MM-DD strings)."""
assert start_date and len(start_date) == 10, f"start_date must be YYYY-MM-DD, got {start_date!r}"
assert end_date and len(end_date) == 10, f"end_date must be YYYY-MM-DD, got {end_date!r}"
assert start_date <= end_date, f"start_date {start_date} must be <= end_date {end_date}"
return _get(session, ARCHIVE_URL, {
"latitude": lat,
"longitude": lon,
"start_date": start_date,
"end_date": end_date,
"daily": DAILY_VARIABLES,
"wind_speed_unit": "ms",
"timezone": "UTC",
})
def fetch_recent(
session: niquests.Session,
lat: float,
lon: float,
past_days: int = 10,
) -> dict:
"""Fetch recent weather via Open-Meteo forecast model (fills ERA5 lag window).
past_days=10 captures the ~5-day ERA5 lag plus buffer for missed daily runs.
"""
assert 1 <= past_days <= 92, f"past_days must be 1-92, got {past_days}"
return _get(session, FORECAST_URL, {
"latitude": lat,
"longitude": lon,
"daily": DAILY_VARIABLES,
"wind_speed_unit": "ms",
"timezone": "UTC",
"past_days": past_days,
"forecast_days": 1,
})
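The 200-with-error check in `_get` can be exercised offline with a minimal sketch; `check_body` below is an illustrative stand-in that mirrors the body-level validation, not part of the module's API:

```python
def check_body(data: dict) -> dict:
    # Mirrors _get's body validation: Open-Meteo may return HTTP 200
    # with {"error": true, "reason": "..."} instead of a non-200 status.
    if data.get("error"):
        raise ValueError(f"Open-Meteo API error: {data.get('reason', data)}")
    assert "daily" in data, "missing 'daily' key"
    assert "time" in data["daily"], "'daily' missing 'time' array"
    return data

ok = check_body({"daily": {"time": ["2020-01-01"], "temperature_2m_max": [28.5]}})
print(ok["daily"]["time"][0])  # 2020-01-01

try:
    check_body({"error": True, "reason": "Invalid date range"})
except ValueError as exc:
    print(exc)  # Open-Meteo API error: Invalid date range
```

Surfacing body-level errors as exceptions keeps the callers (`fetch_archive`, `fetch_recent`) free of per-call status handling.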


@@ -0,0 +1,212 @@
"""Open-Meteo daily weather extraction for coffee-growing regions.
Two entry points:
extract_weather()
Daily run: fetches the last 10 days for all 12 locations.
10 days covers the ~5-day ERA5 reanalysis lag plus buffer for missed runs.
Uses the forecast API (fills the recent window not yet in ERA5 archive).
12 API calls total. Completes in ~10 seconds.
extract_weather_backfill()
Historical fill: fetches 2020-01-01 → yesterday for all 12 locations.
Uses the archive API. One date-range request per location = 12 total calls.
Completes in ~30 seconds. No cursor needed.
Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz
Each file is a flat JSON object with Open-Meteo variable names:
{"date": "2020-01-01", "temperature_2m_max": 28.5, "precipitation_sum": 12.5, ...}
No API key required. No rate limiting. Fully idempotent (file existence check).
"""
import gzip
import json
import logging
import os
import sys
import time
from datetime import date, timedelta
from pathlib import Path
import niquests
from extract_core import end_run, landing_path, open_state_db, start_run, write_bytes_atomic
from openmeteo.api import fetch_archive, fetch_recent
from openmeteo.locations import LOCATIONS
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("Open-Meteo Extractor")
LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
LANDING_SUBDIR = "weather"
EXTRACTOR_DAILY = "openmeteo_daily"
EXTRACTOR_BACKFILL = "openmeteo_backfill"
BACKFILL_START = date(2020, 1, 1)
# Small sleep between location calls — polite usage of the free community API.
SLEEP_BETWEEN_LOCATIONS_SECONDS = 0.5
# ── helpers ───────────────────────────────────────────────────────────────────
def _write_day_file(location_id: str, date_str: str, record: dict) -> int:
"""Write one day's weather record as gzipped JSON. Returns bytes written, or 0 if skipped."""
assert location_id and date_str and record
year = date_str[:4]
dest_dir = landing_path(LANDING_DIR, LANDING_SUBDIR, location_id, year)
local_file = dest_dir / f"{date_str}.json.gz"
if local_file.exists():
return 0
compressed = gzip.compress(json.dumps(record, separators=(",", ":")).encode("utf-8"))
bytes_written = write_bytes_atomic(local_file, compressed)
logger.debug(f"Stored {local_file} ({bytes_written:,} bytes)")
return bytes_written
def _split_and_write(location_id: str, response: dict) -> tuple[int, int, int]:
"""Split an Open-Meteo array response into per-day JSON.gz files.
Open-Meteo returns parallel arrays per variable under response['daily']['time'].
We zip these into one flat dict per day and write each as a separate file.
Returns (files_written, files_skipped, bytes_written).
"""
daily = response["daily"]
dates = daily["time"]
variables = [k for k in daily if k != "time"]
files_written = 0
files_skipped = 0
bytes_written_total = 0
for i, date_str in enumerate(dates):
if not date_str:
continue
record = {"date": date_str}
for var in variables:
values = daily[var]
record[var] = values[i] if i < len(values) else None
bw = _write_day_file(location_id, date_str, record)
if bw > 0:
files_written += 1
bytes_written_total += bw
else:
files_skipped += 1
return files_written, files_skipped, bytes_written_total
# ── daily extractor ───────────────────────────────────────────────────────────
def extract_weather() -> None:
"""Fetch the last 10 days of weather for all 12 locations.
Uses Open-Meteo forecast API (past_days=10). 12 API calls. ~10 seconds.
"""
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, EXTRACTOR_DAILY)
files_written = 0
files_skipped = 0
bytes_written_total = 0
try:
with niquests.Session() as session:
for loc in LOCATIONS:
logger.info(f"Fetching recent: {loc['id']} ({loc['country']})")
response = fetch_recent(session, loc["lat"], loc["lon"], past_days=10)
w, s, bw = _split_and_write(loc["id"], response)
files_written += w
files_skipped += s
bytes_written_total += bw
time.sleep(SLEEP_BETWEEN_LOCATIONS_SECONDS)
end_run(
conn, run_id,
status="success",
files_written=files_written,
files_skipped=files_skipped,
bytes_written=bytes_written_total,
cursor_value=date.today().isoformat(),
)
logger.info(f"Daily weather complete: {files_written} new, {files_skipped} skipped")
except Exception as e:
end_run(conn, run_id, status="failed", error_message=str(e))
raise
finally:
conn.close()
# ── backfill extractor ────────────────────────────────────────────────────────
def extract_weather_backfill() -> None:
"""Fetch full weather history (2020-01-01 → yesterday) for all 12 locations.
Uses Open-Meteo archive API (ERA5 reanalysis). One date-range request per
location = 12 calls total. Completes in ~30 seconds.
Idempotent: per-day files already on disk are skipped when splitting the
response. Safe to re-run at any time — will skip what already exists.
"""
yesterday = (date.today() - timedelta(days=1)).isoformat()
start_date = BACKFILL_START.isoformat()
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, EXTRACTOR_BACKFILL)
files_written = 0
files_skipped = 0
bytes_written_total = 0
try:
with niquests.Session() as session:
for loc in LOCATIONS:
logger.info(
f"Backfill {loc['id']} ({loc['country']}) "
f"{start_date}{yesterday}"
)
response = fetch_archive(
session, loc["lat"], loc["lon"],
start_date=start_date,
end_date=yesterday,
)
w, s, bw = _split_and_write(loc["id"], response)
files_written += w
files_skipped += s
bytes_written_total += bw
logger.info(f" {loc['id']}: {w} new, {s} already existed")
time.sleep(SLEEP_BETWEEN_LOCATIONS_SECONDS)
end_run(
conn, run_id,
status="success",
files_written=files_written,
files_skipped=files_skipped,
bytes_written=bytes_written_total,
cursor_value=yesterday,
)
logger.info(
f"Backfill complete: {files_written} new files, "
f"{files_skipped} already existed"
)
except Exception as e:
end_run(conn, run_id, status="failed", error_message=str(e))
raise
finally:
conn.close()
if __name__ == "__main__":
extract_weather()
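The parallel-array split performed by `_split_and_write` can be illustrated without any file I/O; `split_daily` below is a hypothetical name for this sketch, not part of the package:

```python
def split_daily(response: dict) -> list[dict]:
    # Zip Open-Meteo's parallel arrays (one list per variable, aligned on
    # daily["time"]) into one flat dict per day. Short arrays pad with None.
    daily = response["daily"]
    dates = daily["time"]
    variables = [k for k in daily if k != "time"]
    records = []
    for i, date_str in enumerate(dates):
        record = {"date": date_str}
        for var in variables:
            values = daily[var]
            record[var] = values[i] if i < len(values) else None
        records.append(record)
    return records

sample = {"daily": {"time": ["2020-01-01", "2020-01-02"],
                    "temperature_2m_max": [28.5, 27.1],
                    "precipitation_sum": [0.0]}}  # deliberately short array
print(split_daily(sample)[1])
# {'date': '2020-01-02', 'temperature_2m_max': 27.1, 'precipitation_sum': None}
```

Each resulting dict matches the flat per-day JSON layout the landing files use, which is why the downstream SQL model can read them without struct navigation.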


@@ -1,20 +0,0 @@
[project]
name = "openweathermap"
version = "0.1.0"
description = "OpenWeatherMap daily weather extractor for coffee-growing regions"
requires-python = ">=3.13"
dependencies = [
    "extract_core",
    "niquests>=3.14.1",
]

[project.scripts]
extract_weather = "openweathermap.execute:extract_weather"
extract_weather_backfill = "openweathermap.execute:extract_weather_backfill"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/openweathermap"]


@@ -1,76 +0,0 @@
"""Thin client for the OpenWeatherMap One Call API 3.0 — Day Summary endpoint.

Endpoint: GET https://api.openweathermap.org/data/3.0/onecall/day_summary
Docs: https://openweathermap.org/api/one-call-3#history_daily_aggregation

Returns one JSON object per (lat, lon, date) with daily aggregates:

    temperature.{min,max,morning,afternoon,evening,night}
    precipitation.total
    humidity.afternoon
    cloud_cover.afternoon
    wind.max.{speed,direction}
    pressure.afternoon

This module contains only the HTTP call and basic response validation.
All business logic (file storage, rate limiting, cursor tracking) lives in execute.py.
"""
import niquests

OWM_BASE_URL = "https://api.openweathermap.org/data/3.0/onecall/day_summary"
HTTP_TIMEOUT_SECONDS = 30
MAX_RESPONSE_BYTES = 10_000  # Day summary is ~500 bytes; 10 KB is a generous bound


class RateLimitError(Exception):
    """Raised when OWM returns HTTP 429 (rate limit exceeded)."""


def fetch_day_summary(
    session: niquests.Session,
    lat: float,
    lon: float,
    date_str: str,
    api_key: str,
) -> dict:
    """Fetch the OWM One Call 3.0 day summary for a single (lat, lon, date).

    date_str must be YYYY-MM-DD format.
    Returns the parsed JSON dict on success.
    Raises RateLimitError on HTTP 429 — caller is responsible for sleeping and retrying.
    Raises AssertionError on any other non-200 status.
    """
    assert api_key, "api_key must not be empty"
    assert date_str and len(date_str) == 10, f"date_str must be YYYY-MM-DD, got {date_str!r}"
    assert -90.0 <= lat <= 90.0, f"lat out of range: {lat}"
    assert -180.0 <= lon <= 180.0, f"lon out of range: {lon}"
    response = session.get(
        OWM_BASE_URL,
        params={
            "lat": lat,
            "lon": lon,
            "date": date_str,
            "appid": api_key,
            "units": "metric",
        },
        timeout=HTTP_TIMEOUT_SECONDS,
    )
    if response.status_code == 429:
        raise RateLimitError(f"OWM rate limit hit for lat={lat} lon={lon} date={date_str}")
    assert response.status_code == 200, (
        f"OWM API returned HTTP {response.status_code} for "
        f"lat={lat} lon={lon} date={date_str}: {response.text[:200]}"
    )
    assert len(response.content) <= MAX_RESPONSE_BYTES, (
        f"OWM response unexpectedly large ({len(response.content)} bytes) for {date_str}"
    )
    data = response.json()
    assert isinstance(data, dict), f"Expected dict response, got {type(data)}"
    assert "date" in data, f"OWM response missing 'date' field: {list(data.keys())}"
    return data


@@ -1,330 +0,0 @@
"""OpenWeatherMap daily weather extraction for coffee-growing regions.

Two entry points:

    extract_weather()
        Daily run: fetches yesterday + today for all 8 locations (16 calls max).
        Yesterday is included to cover the midnight edge case — if the daily job
        fires just after midnight UTC, today's OWM data may still be partial.
        Idempotent: skips if the landing file already exists.

    extract_weather_backfill()
        Historical fill: iterates (date, location) pairs from 2020-01-01 to
        yesterday. Bounded to MAX_CALLS_PER_BACKFILL_RUN per run; re-run daily
        to advance. Resumes from cursor on restart.

Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz

Idempotency: file existence check. Past weather is immutable — (location_id, date)
uniquely identifies a file that never changes once written.

Backfill cursor format: '{location_id}:{date}' (e.g. 'brazil_parana:2022-07-15').
Encodes both dimensions so a mid-run crash resumes at the exact (location, date) pair.
"""
import gzip
import json
import logging
import os
import sys
import time
from datetime import date, timedelta
from pathlib import Path

import niquests

from extract_core import end_run, get_last_cursor, landing_path, open_state_db, start_run, write_bytes_atomic
from openweathermap.api import RateLimitError, fetch_day_summary
from openweathermap.locations import LOCATIONS

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("OWM Weather Extractor")

LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
LANDING_SUBDIR = "weather"
EXTRACTOR_DAILY = "owm_weather_daily"
EXTRACTOR_BACKFILL = "owm_weather_backfill"

# Rate limiting: OWM free tier = 1000 calls/day (~0.7/s).
# 1.5s between calls stays comfortably below the limit for the daily run.
# 2.0s for backfill (more conservative, many sequential calls).
SLEEP_BETWEEN_CALLS_SECONDS = 1.5
SLEEP_BETWEEN_BACKFILL_CALLS_SECONDS = 2.0

# On 429: wait 60s, then one retry. If still 429, abort the run.
SLEEP_ON_RATE_LIMIT_SECONDS = 60
MAX_RATE_LIMIT_RETRIES = 1

# Cap backfill at 500 calls per run (~17 min at 2s/call).
# 5-year backfill = 14,600 calls → ~30 runs. Re-run daily until complete.
MAX_CALLS_PER_BACKFILL_RUN = 500


# ── helpers ──────────────────────────────────────────────────────────────────
def _write_weather_file(location_id: str, date_str: str, payload: dict) -> int:
    """Gzip-compress payload JSON and write atomically to the landing zone.

    Returns bytes_written, or 0 if the file already exists (idempotent skip).
    Path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz
    """
    assert location_id, "location_id must not be empty"
    assert date_str and len(date_str) == 10, f"date_str must be YYYY-MM-DD, got {date_str!r}"
    assert isinstance(payload, dict) and payload, "payload must be a non-empty dict"
    year = date_str[:4]
    dest_dir = landing_path(LANDING_DIR, LANDING_SUBDIR, location_id, year)
    local_file = dest_dir / f"{date_str}.json.gz"
    if local_file.exists():
        logger.debug(f"Already exists, skipping: {local_file}")
        return 0
    compressed = gzip.compress(json.dumps(payload, separators=(",", ":")).encode("utf-8"))
    bytes_written = write_bytes_atomic(local_file, compressed)
    logger.info(f"Stored {local_file} ({bytes_written:,} bytes)")
    return bytes_written


def _fetch_with_retry(session: niquests.Session, loc: dict, date_str: str, api_key: str) -> dict | None:
    """Fetch OWM day summary with one 429-retry.

    Returns the JSON dict on success, or None if rate limit persists after retry.
    """
    for attempt in range(MAX_RATE_LIMIT_RETRIES + 1):
        try:
            return fetch_day_summary(session, loc["lat"], loc["lon"], date_str, api_key)
        except RateLimitError:
            if attempt < MAX_RATE_LIMIT_RETRIES:
                logger.warning(
                    f"Rate limit hit for {loc['id']} {date_str} — "
                    f"sleeping {SLEEP_ON_RATE_LIMIT_SECONDS}s before retry"
                )
                time.sleep(SLEEP_ON_RATE_LIMIT_SECONDS)
            else:
                logger.error(f"Rate limit persisted after retry for {loc['id']} {date_str}")
                return None
    return None  # unreachable; satisfies type checker


def _file_exists(location_id: str, date_str: str) -> bool:
    year = date_str[:4]
    return (LANDING_DIR / LANDING_SUBDIR / location_id / year / f"{date_str}.json.gz").exists()


# ── daily extractor ───────────────────────────────────────────────────────────
def extract_weather() -> None:
    """Fetch yesterday + today weather for all 8 coffee-growing locations.

    Up to 16 API calls. Both days are skipped if files already exist,
    so re-running costs zero API calls (fully idempotent).
    """
    api_key = os.environ.get("OPENWEATHERMAP_API_KEY", "")
    assert api_key, "OPENWEATHERMAP_API_KEY environment variable must be set"
    today = date.today()
    yesterday = today - timedelta(days=1)
    dates_to_fetch = [yesterday.isoformat(), today.isoformat()]
    conn = open_state_db(LANDING_DIR)
    run_id = start_run(conn, EXTRACTOR_DAILY)
    files_written = 0
    files_skipped = 0
    bytes_written_total = 0
    try:
        with niquests.Session() as session:
            for date_str in dates_to_fetch:
                for loc in LOCATIONS:
                    if _file_exists(loc["id"], date_str):
                        logger.info(f"Already exists: {loc['id']} {date_str}")
                        files_skipped += 1
                        continue
                    data = _fetch_with_retry(session, loc, date_str, api_key)
                    if data is None:
                        logger.error(f"Skipping {loc['id']} {date_str} after persistent rate limit")
                        continue
                    bw = _write_weather_file(loc["id"], date_str, data)
                    if bw > 0:
                        files_written += 1
                        bytes_written_total += bw
                    else:
                        files_skipped += 1
                    time.sleep(SLEEP_BETWEEN_CALLS_SECONDS)
        end_run(
            conn, run_id,
            status="success",
            files_written=files_written,
            files_skipped=files_skipped,
            bytes_written=bytes_written_total,
            cursor_value=today.isoformat(),
        )
        logger.info(f"Daily weather complete: {files_written} new, {files_skipped} skipped")
    except Exception as e:
        end_run(conn, run_id, status="failed", error_message=str(e))
        raise
    finally:
        conn.close()


# ── backfill extractor ────────────────────────────────────────────────────────
def extract_weather_backfill() -> None:
    """Fill historical weather data from 2020-01-01 to yesterday.

    Iterates (date, location) pairs in date-ascending, LOCATIONS-list order.
    Bounded to MAX_CALLS_PER_BACKFILL_RUN per run — re-run daily to advance.

    Cursor format: '{location_id}:{date}' (e.g. 'brazil_parana:2022-07-15').
    Encodes both dimensions: on resume, all pairs at or before the cursor are
    skipped (via cursor comparison first, then file-existence check).

    5-year backfill (2020-2025) = 14,600 calls. At 500/run = ~30 runs.

    429 handling: sleep 60s, one retry. If still 429, save cursor and exit
    with status='failed' so the cursor does not advance beyond the last
    successfully written pair. Safe to re-run the next day.
    """
    api_key = os.environ.get("OPENWEATHERMAP_API_KEY", "")
    assert api_key, "OPENWEATHERMAP_API_KEY environment variable must be set"
    start = date(2020, 1, 1)
    end = date.today() - timedelta(days=1)  # never fetch today in backfill
    conn = open_state_db(LANDING_DIR)
    run_id = start_run(conn, EXTRACTOR_BACKFILL)
    files_written = 0
    files_skipped = 0
    bytes_written_total = 0
    calls_made = 0
    last_cursor: str | None = None
    # Load resume cursor from last successful run
    resume_cursor = get_last_cursor(conn, EXTRACTOR_BACKFILL)
    if resume_cursor:
        logger.info(f"Resuming backfill from cursor: {resume_cursor}")
    else:
        logger.info(f"Starting fresh backfill from {start.isoformat()}")
    # Parse cursor into (location_id, date_str) for skip comparison
    resume_location_id: str | None = None
    resume_date_str: str | None = None
    if resume_cursor and ":" in resume_cursor:
        resume_location_id, resume_date_str = resume_cursor.split(":", 1)
    location_ids = [loc["id"] for loc in LOCATIONS]
    resume_loc_idx = -1
    if resume_location_id and resume_location_id in location_ids:
        resume_loc_idx = location_ids.index(resume_location_id)
    try:
        with niquests.Session() as session:
            current = start
            while current <= end:
                date_str = current.isoformat()
                for loc in LOCATIONS:
                    loc_idx = location_ids.index(loc["id"])
                    # Cursor-based skip: (date, loc_idx) <= (resume_date, resume_loc_idx)
                    # This skips everything already processed in previous runs.
                    if resume_date_str:
                        if date_str < resume_date_str:
                            files_skipped += 1
                            continue
                        if date_str == resume_date_str and loc_idx <= resume_loc_idx:
                            files_skipped += 1
                            continue
                    # File-existence check: idempotency guard for files already on disk
                    # (e.g. written by the daily extractor, or a previous partial run)
                    if _file_exists(loc["id"], date_str):
                        files_skipped += 1
                        last_cursor = f"{loc['id']}:{date_str}"
                        continue
                    # Per-run call cap
                    if calls_made >= MAX_CALLS_PER_BACKFILL_RUN:
                        logger.info(
                            f"Reached cap of {MAX_CALLS_PER_BACKFILL_RUN} calls. "
                            f"Re-run to continue from {last_cursor or resume_cursor}"
                        )
                        end_run(
                            conn, run_id,
                            status="success",
                            files_written=files_written,
                            files_skipped=files_skipped,
                            bytes_written=bytes_written_total,
                            cursor_value=last_cursor or resume_cursor,
                        )
                        return
                    data = _fetch_with_retry(session, loc, date_str, api_key)
                    calls_made += 1
                    if data is None:
                        logger.warning(f"Persistent rate limit at {loc['id']} {date_str} — stopping run")
                        end_run(
                            conn, run_id,
                            status="failed",
                            files_written=files_written,
                            files_skipped=files_skipped,
                            bytes_written=bytes_written_total,
                            cursor_value=last_cursor or resume_cursor,
                            error_message="Persistent rate limit — resume from cursor",
                        )
                        return
                    bw = _write_weather_file(loc["id"], date_str, data)
                    if bw > 0:
                        files_written += 1
                        bytes_written_total += bw
                    else:
                        files_skipped += 1
                    last_cursor = f"{loc['id']}:{date_str}"
                    time.sleep(SLEEP_BETWEEN_BACKFILL_CALLS_SECONDS)
                current += timedelta(days=1)
        final_cursor = last_cursor or resume_cursor or end.isoformat()
        logger.info(
            f"Backfill complete: {files_written} written, "
            f"{files_skipped} skipped, {calls_made} API calls"
        )
        end_run(
            conn, run_id,
            status="success",
            files_written=files_written,
            files_skipped=files_skipped,
            bytes_written=bytes_written_total,
            cursor_value=final_cursor,
        )
    except Exception as e:
        end_run(
            conn, run_id,
            status="failed",
            files_written=files_written,
            files_skipped=files_skipped,
            bytes_written=bytes_written_total,
            cursor_value=last_cursor or resume_cursor,
            error_message=str(e),
        )
        raise
    finally:
        conn.close()


if __name__ == "__main__":
    extract_weather()


@@ -46,7 +46,7 @@ sqlmesh_materia = {workspace = true }
 cftc_cot = {workspace = true }
 coffee_prices = {workspace = true }
 ice_stocks = {workspace = true }
-openweathermap = {workspace = true }
+openmeteo = {workspace = true }
 [tool.uv.workspace]
 members = [
     "extract/*",

@@ -41,12 +41,12 @@ PIPELINES = {
         "timeout_seconds": 1800,
     },
     "extract_weather": {
-        "command": ["uv", "run", "--package", "openweathermap", "extract_weather"],
-        "timeout_seconds": 300,
+        "command": ["uv", "run", "--package", "openmeteo", "extract_weather"],
+        "timeout_seconds": 120,
     },
     "extract_weather_backfill": {
-        "command": ["uv", "run", "--package", "openweathermap", "extract_weather_backfill"],
-        "timeout_seconds": 1200,
+        "command": ["uv", "run", "--package", "openmeteo", "extract_weather_backfill"],
+        "timeout_seconds": 120,
     },
     "extract_all": {
         "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"],

@@ -1,4 +1,18 @@
-/* Foundation fact: daily weather observations for 8 coffee-growing regions. */
-/* Source: OpenWeatherMap One Call API 3.0 / Day Summary */
-/* Landing: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz */
-/* One file per (location_id, date). Content: raw OWM day summary JSON. */
-/* Each file is a single JSON object (not newline-delimited), so format='auto'. */
-/* Grain: (location_id, observation_date) — one row per location per day. */
-/* Dedup key: hash(location_id, date) — past weather is immutable. */
-/* location_id is parsed from the filename path: split(filename, '/')[-3] */
-/* Path structure: .../weather/{location_id}/{year}/{date}.json.gz */
-/* Crop stress flags (agronomic thresholds for Arabica coffee): */
-/*   is_frost — temp_min_c < 2.0°C (ICO frost damage threshold) */
-/*   is_heat_stress — temp_max_c > 35.0°C (photosynthesis impairment) */
-/*   is_drought — precipitation_mm < 1.0 (dry day; OWM omits field when 0) */
-/*   in_growing_season — simplified month-range flag by variety */
+/* Foundation fact: daily weather observations for 12 coffee-growing regions. */
+/* Source: Open-Meteo (ERA5 reanalysis archive + forecast model for recent days) */
+/* Landing: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz */
+/* One file per (location_id, date). Content: flat Open-Meteo JSON per day. */
+/* Open-Meteo returns parallel arrays; execute.py splits them into per-day files. */
+/* Grain: (location_id, observation_date) — one row per location per day. */
+/* Dedup key: hash(location_id, date) — past weather is immutable. */
+/* location_id is parsed from filename: split(filename, '/')[-3] */
+/* Path structure: .../weather/{location_id}/{year}/{date}.json.gz */
+/* Crop stress flags: */
+/*   is_frost — temp_min_c < 2.0°C (ICO Arabica frost damage threshold) */
+/*   is_heat_stress — temp_max_c > 35.0°C (photosynthesis impairment) */
+/*   is_drought — precipitation_mm < 1.0 (agronomic dry day) */
+/*   is_high_vpd — vpd_max_kpa > 1.5 (significant plant water stress) */
+/*   in_growing_season — simplified month-range flag by variety */
 MODEL (
   name foundation.fct_weather_daily,
   kind INCREMENTAL_BY_TIME_RANGE (
@@ -10,31 +24,43 @@ MODEL (
 );
 WITH src AS (
-  /* Each file is a single JSON object with nested fields: */
-  /*   temperature.{min,max,afternoon,morning,evening,night} */
-  /*   precipitation.total (absent when 0 — COALESCE to 0 downstream) */
-  /*   humidity.afternoon */
-  /*   cloud_cover.afternoon */
-  /*   wind.max.{speed,direction} */
-  /*   pressure.afternoon */
-  /* DuckDB read_json(format='auto') creates STRUCT columns for nested objects; */
-  /* fields are accessed with dot notation (temperature.min, wind.max.speed). */
+  /* Open-Meteo files are flat JSON: all variables at top level (no nested structs). */
+  /* read_json(format='auto') infers column types directly from the numeric values. */
   SELECT
     *
   FROM READ_JSON(@weather_glob(), format = 'auto', compression = 'gzip', filename = TRUE)
 ), located AS (
   SELECT
     src.*,
-    STR_SPLIT(filename, '/')[-3] AS location_id, /* location_id is the 3rd-from-last path segment: */ /* e.g. .../weather/brazil_minas_gerais/2024/2024-01-15.json.gz → 'brazil_minas_gerais' */
+    STR_SPLIT(filename, '/')[-3] AS location_id, /* location_id is the 3rd-from-last path segment: */
+    /* e.g. .../weather/brazil_minas_gerais/2024/2024-01-15.json.gz → 'brazil_minas_gerais' */
     TRY_CAST(src."date" AS DATE) AS observation_date
   FROM src
 ), cast_and_clean AS (
   SELECT
     location_id,
     observation_date,
-    TRY_CAST(located.temperature.min AS DOUBLE) AS temp_min_c, /* Temperature (°C, metric units) */
-    TRY_CAST(located.temperature.max AS DOUBLE) AS temp_max_c,
-    TRY_CAST(located.temperature.afternoon AS DOUBLE) AS temp_afternoon_c,
-    COALESCE(TRY_CAST(located.precipitation.total AS DOUBLE), 0.0) AS precipitation_mm, /* Precipitation (mm total for the day; OWM omits field when 0) */
-    TRY_CAST(located.humidity.afternoon AS DOUBLE) AS humidity_afternoon_pct, /* Humidity (% afternoon reading) */
-    TRY_CAST(located.cloud_cover.afternoon AS DOUBLE) AS cloud_cover_afternoon_pct, /* Cloud cover (% afternoon) */
-    TRY_CAST(located.wind.max.speed AS DOUBLE) AS wind_max_speed_ms, /* Wind (m/s max speed, degrees direction) */
-    TRY_CAST(located.pressure.afternoon AS DOUBLE) AS pressure_afternoon_hpa, /* Pressure (hPa afternoon) */
-    /* Crop stress flags */
-    TRY_CAST(located.temperature.min AS DOUBLE) < 2.0 AS is_frost,
-    TRY_CAST(located.temperature.max AS DOUBLE) > 35.0 AS is_heat_stress,
-    COALESCE(TRY_CAST(located.precipitation.total AS DOUBLE), 0.0) < 1.0 AS is_drought,
+    /* Temperature (°C) */
+    TRY_CAST(located.temperature_2m_min AS DOUBLE) AS temp_min_c,
+    TRY_CAST(located.temperature_2m_max AS DOUBLE) AS temp_max_c,
+    TRY_CAST(located.temperature_2m_mean AS DOUBLE) AS temp_mean_c,
+    /* Precipitation (mm total for the day) */
+    COALESCE(TRY_CAST(located.precipitation_sum AS DOUBLE), 0.0) AS precipitation_mm,
+    /* Humidity (% — daily max) */
+    TRY_CAST(located.relative_humidity_2m_max AS DOUBLE) AS humidity_max_pct,
+    /* Cloud cover (% — daily mean) */
+    TRY_CAST(located.cloud_cover_mean AS DOUBLE) AS cloud_cover_mean_pct,
+    /* Wind (m/s max — Open-Meteo requested with wind_speed_unit=ms) */
+    TRY_CAST(located.wind_speed_10m_max AS DOUBLE) AS wind_max_speed_ms,
+    /* ET₀ (mm/day — FAO Penman-Monteith; direct crop water demand signal) */
+    TRY_CAST(located.et0_fao_evapotranspiration AS DOUBLE) AS et0_mm,
+    /* VPD (kPa — max; >1.5 kPa = significant plant water stress) */
+    TRY_CAST(located.vapour_pressure_deficit_max AS DOUBLE) AS vpd_max_kpa,
+    /* Crop stress flags */
+    TRY_CAST(located.temperature_2m_min AS DOUBLE) < 2.0 AS is_frost,
+    TRY_CAST(located.temperature_2m_max AS DOUBLE) > 35.0 AS is_heat_stress,
+    COALESCE(TRY_CAST(located.precipitation_sum AS DOUBLE), 0.0) < 1.0 AS is_drought,
+    TRY_CAST(located.vapour_pressure_deficit_max AS DOUBLE) > 1.5 AS is_high_vpd,
     HASH(location_id, src."date") AS hkey,
filename filename
FROM located FROM located
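
The two conventions this hunk relies on, the flat per-day JSON layout and the path-derived location_id, can be sketched in Python. The record values and the landing path below are invented for illustration; only the field names and thresholds come from the model:

```python
# Illustrative per-day record as the extractor writes it (flat keys,
# no nested structs); the numeric values here are invented.
record = {
    "date": "2024-01-15",
    "temperature_2m_min": 14.2,
    "temperature_2m_max": 27.9,
    "precipitation_sum": 0.0,
    "vapour_pressure_deficit_max": 1.8,
}

# Same threshold as the is_high_vpd flag in the SQL (> 1.5 kPa).
is_high_vpd = record["vapour_pressure_deficit_max"] > 1.5

# Mirror of STR_SPLIT(filename, '/')[-3]: location_id is the
# third-from-last path segment. The 'landing/' prefix is a guess.
filename = "landing/weather/brazil_minas_gerais/2024/2024-01-15.json.gz"
location_id = filename.split("/")[-3]

print(location_id, is_high_vpd)  # → brazil_minas_gerais True
```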
@@ -46,15 +72,17 @@ WITH src AS (
   ANY_VALUE(observation_date) AS observation_date,
   ANY_VALUE(temp_min_c) AS temp_min_c,
   ANY_VALUE(temp_max_c) AS temp_max_c,
-  ANY_VALUE(temp_afternoon_c) AS temp_afternoon_c,
+  ANY_VALUE(temp_mean_c) AS temp_mean_c,
   ANY_VALUE(precipitation_mm) AS precipitation_mm,
-  ANY_VALUE(humidity_afternoon_pct) AS humidity_afternoon_pct,
-  ANY_VALUE(cloud_cover_afternoon_pct) AS cloud_cover_afternoon_pct,
+  ANY_VALUE(humidity_max_pct) AS humidity_max_pct,
+  ANY_VALUE(cloud_cover_mean_pct) AS cloud_cover_mean_pct,
   ANY_VALUE(wind_max_speed_ms) AS wind_max_speed_ms,
-  ANY_VALUE(pressure_afternoon_hpa) AS pressure_afternoon_hpa,
+  ANY_VALUE(et0_mm) AS et0_mm,
+  ANY_VALUE(vpd_max_kpa) AS vpd_max_kpa,
   ANY_VALUE(is_frost) AS is_frost,
   ANY_VALUE(is_heat_stress) AS is_heat_stress,
   ANY_VALUE(is_drought) AS is_drought,
+  ANY_VALUE(is_high_vpd) AS is_high_vpd,
   hkey
 FROM cast_and_clean
 GROUP BY
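
The GROUP BY on hkey with ANY_VALUE in this hunk collapses rows that were extracted more than once for the same location and date. A rough Python equivalent, keeping one arbitrary row per key (the sample rows are invented):

```python
# Hedged sketch of the dedup the SQL performs: group rows by a key
# derived from (location_id, date) and keep one arbitrary row per key,
# as GROUP BY hkey + ANY_VALUE(...) does in DuckDB.
rows = [
    {"location_id": "brazil_minas_gerais", "date": "2024-01-15", "temp_max_c": 27.9},
    {"location_id": "brazil_minas_gerais", "date": "2024-01-15", "temp_max_c": 27.9},  # re-extracted duplicate
    {"location_id": "vietnam_dak_lak", "date": "2024-01-15", "temp_max_c": 31.4},
]

deduplicated = {}
for row in rows:
    hkey = (row["location_id"], row["date"])  # stand-in for HASH(location_id, "date")
    deduplicated.setdefault(hkey, row)        # ANY_VALUE: first row wins here

print(len(deduplicated))  # → 2
```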
@@ -70,22 +98,26 @@ SELECT
   loc.variety,
   d.temp_min_c,
   d.temp_max_c,
-  d.temp_afternoon_c,
+  d.temp_mean_c,
   d.precipitation_mm,
-  d.humidity_afternoon_pct,
-  d.cloud_cover_afternoon_pct,
+  d.humidity_max_pct,
+  d.cloud_cover_mean_pct,
   d.wind_max_speed_ms,
-  d.pressure_afternoon_hpa,
+  d.et0_mm,
+  d.vpd_max_kpa,
   d.is_frost,
   d.is_heat_stress,
   d.is_drought,
+  d.is_high_vpd,
   CASE loc.variety
     WHEN 'Arabica'
     THEN EXTRACT(MONTH FROM d.observation_date) BETWEEN 4 AND 10
     WHEN 'Robusta'
     THEN EXTRACT(MONTH FROM d.observation_date) BETWEEN 4 AND 11
     ELSE FALSE
-  END AS in_growing_season /* Growing season: simplified month-range flag by variety. */ /* Arabica: Apr–Oct (covers northern + southern hemisphere risk windows). */ /* Robusta: Apr–Nov (Vietnam/Indonesia main cycle). */
+  END AS in_growing_season /* Growing season: simplified month-range flag by variety. */
+  /* Arabica: Apr–Oct (covers northern + southern hemisphere risk windows). */
+  /* Robusta: Apr–Nov (Vietnam/Indonesia main cycle). */
 FROM deduplicated AS d
 LEFT JOIN seeds.weather_locations AS loc
   ON d.location_id = loc.location_id
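
The CASE expression above reduces the growing season to a month-range flag per variety. A Python mirror, assuming the same simplified ranges from the model:

```python
def in_growing_season(variety: str, month: int) -> bool:
    """Month-range growing-season flag, mirroring the SQL CASE expression."""
    if variety == "Arabica":
        return 4 <= month <= 10   # Apr–Oct
    if variety == "Robusta":
        return 4 <= month <= 11   # Apr–Nov
    return False                  # unknown variety → FALSE, as in the SQL

print(in_growing_season("Arabica", 11))  # → False
print(in_growing_season("Robusta", 11))  # → True
```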