/* Foundation fact: daily weather observations for 12 coffee-growing regions. */ /* Source: Open-Meteo (ERA5 reanalysis archive + forecast model for recent days) */ /* Landing: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz */ /* One file per (location_id, date). Content: flat Open-Meteo JSON per day. */ /* Open-Meteo returns parallel arrays; execute.py splits them into per-day files. */ /* Grain: (location_id, observation_date) — one row per location per day. */ /* Dedup key: hash(location_id, date) — past weather is immutable. */ /* location_id is parsed from filename: split(filename, '/')[-3] */ /* Path structure: .../weather/{location_id}/{year}/{date}.json.gz */ /* Crop stress flags: */ /* is_frost — temp_min_c < 2.0°C (ICO Arabica frost damage threshold) */ /* is_heat_stress — temp_max_c > 35.0°C (photosynthesis impairment) */ /* is_drought — precipitation_mm < 1.0 (agronomic dry day) */ /* is_high_vpd — vpd_max_kpa > 1.5 (significant plant water stress) */ /* in_growing_season — simplified month-range flag by variety */ MODEL ( name foundation.fct_weather_daily, kind INCREMENTAL_BY_TIME_RANGE ( time_column observation_date ), grain (location_id, observation_date), start '2020-01-01', cron '@daily' ); WITH src AS ( /* Open-Meteo files are flat JSON: all variables at top level (no nested structs). */ /* read_json(format='auto') infers column types directly from the numeric values. */ SELECT * FROM READ_JSON(@weather_glob(), format = 'auto', compression = 'gzip', filename = TRUE) ), located AS ( SELECT src.*, STR_SPLIT(filename, '/')[-3] AS location_id, /* location_id is the 3rd-from-last path segment: */ /* e.g. .../weather/brazil_minas_gerais/2024/2024-01-15.json.gz → 'brazil_minas_gerais' */ TRY_CAST(src."date" AS DATE) AS observation_date FROM src ), cast_and_clean AS ( SELECT location_id, observation_date, /* Temperature (°C) */ TRY_CAST(located.temperature_2m_min AS DOUBLE) AS temp_min_c, TRY_CAST(located.temperature_2m_max AS DOUBLE) AS temp_max_c, TRY_CAST(located.temperature_2m_mean AS DOUBLE) AS temp_mean_c, /* Precipitation (mm total for the day) */ COALESCE(TRY_CAST(located.precipitation_sum AS DOUBLE), 0.0) AS precipitation_mm, /* Humidity (% — daily max) */ TRY_CAST(located.relative_humidity_2m_max AS DOUBLE) AS humidity_max_pct, /* Cloud cover (% — daily mean) */ TRY_CAST(located.cloud_cover_mean AS DOUBLE) AS cloud_cover_mean_pct, /* Wind (m/s max — Open-Meteo requested with wind_speed_unit=ms) */ TRY_CAST(located.wind_speed_10m_max AS DOUBLE) AS wind_max_speed_ms, /* ET₀ (mm/day — FAO Penman-Monteith; direct crop water demand signal) */ TRY_CAST(located.et0_fao_evapotranspiration AS DOUBLE) AS et0_mm, /* VPD (kPa — max; >1.5 kPa = significant plant water stress) */ TRY_CAST(located.vapour_pressure_deficit_max AS DOUBLE) AS vpd_max_kpa, /* Crop stress flags */ TRY_CAST(located.temperature_2m_min AS DOUBLE) < 2.0 AS is_frost, TRY_CAST(located.temperature_2m_max AS DOUBLE) > 35.0 AS is_heat_stress, COALESCE(TRY_CAST(located.precipitation_sum AS DOUBLE), 0.0) < 1.0 AS is_drought, TRY_CAST(located.vapour_pressure_deficit_max AS DOUBLE) > 1.5 AS is_high_vpd, HASH(location_id, located."date") AS hkey, filename FROM located WHERE NOT observation_date IS NULL AND NOT location_id IS NULL AND location_id <> '' ), deduplicated AS ( SELECT ANY_VALUE(location_id) AS location_id, ANY_VALUE(observation_date) AS observation_date, ANY_VALUE(temp_min_c) AS temp_min_c, ANY_VALUE(temp_max_c) AS temp_max_c, ANY_VALUE(temp_mean_c) AS temp_mean_c, ANY_VALUE(precipitation_mm) AS precipitation_mm, ANY_VALUE(humidity_max_pct) AS humidity_max_pct, ANY_VALUE(cloud_cover_mean_pct) AS cloud_cover_mean_pct, ANY_VALUE(wind_max_speed_ms) AS wind_max_speed_ms, ANY_VALUE(et0_mm) AS et0_mm, ANY_VALUE(vpd_max_kpa) AS vpd_max_kpa, ANY_VALUE(is_frost) AS is_frost, ANY_VALUE(is_heat_stress) AS is_heat_stress, ANY_VALUE(is_drought) AS is_drought, ANY_VALUE(is_high_vpd) AS is_high_vpd, hkey FROM cast_and_clean GROUP BY hkey ) SELECT d.observation_date, d.location_id, loc.name AS location_name, loc.country, loc.lat, loc.lon, loc.variety, d.temp_min_c, d.temp_max_c, d.temp_mean_c, d.precipitation_mm, d.humidity_max_pct, d.cloud_cover_mean_pct, d.wind_max_speed_ms, d.et0_mm, d.vpd_max_kpa, d.is_frost, d.is_heat_stress, d.is_drought, d.is_high_vpd, CASE loc.variety WHEN 'Arabica' THEN EXTRACT(MONTH FROM d.observation_date) BETWEEN 4 AND 10 WHEN 'Robusta' THEN EXTRACT(MONTH FROM d.observation_date) BETWEEN 4 AND 11 ELSE FALSE END AS in_growing_season /* Growing season: simplified month-range flag by variety. */ /* Arabica: Apr–Oct (covers northern + southern hemisphere risk windows). */ /* Robusta: Apr–Nov (Vietnam/Indonesia main cycle). */ FROM deduplicated AS d LEFT JOIN seeds.weather_locations AS loc ON d.location_id = loc.location_id WHERE d.observation_date BETWEEN @start_ds AND @end_ds