/* Serving mart: daily weather analytics for 12 coffee-growing regions. */ /* Source: foundation.fct_weather_daily (already has seed join for location metadata). */ /* Adds rolling aggregates, water balance, gaps-and-islands streak counters, */ /* and a composite crop stress index (0–100) as a single severity gauge. */ /* Grain: (location_id, observation_date) */ /* Lookback 90: rolling windows reach up to 30 days, streak counters can extend */ /* up to ~90 days; without lookback a daily run sees only 1 row and all window */ /* functions degrade to single-row values. */ MODEL ( name serving.weather_daily, kind INCREMENTAL_BY_TIME_RANGE ( time_column observation_date, lookback 90 ), grain (location_id, observation_date), start '2020-01-01', cron '@daily' ); WITH base AS ( SELECT observation_date, location_id, location_name, country, lat, lon, variety, temp_min_c, temp_max_c, temp_mean_c, precipitation_mm, humidity_max_pct, cloud_cover_mean_pct, wind_max_speed_ms, et0_mm, vpd_max_kpa, is_frost, is_heat_stress, is_drought, is_high_vpd, in_growing_season, /* Rolling precipitation — w7 = trailing 7 days, w30 = trailing 30 days */ SUM(precipitation_mm) OVER w7 AS precip_sum_7d_mm, SUM(precipitation_mm) OVER w30 AS precip_sum_30d_mm, /* Rolling temperature baseline */ AVG(temp_mean_c) OVER w30 AS temp_mean_30d_c, /* Temperature anomaly: today vs trailing 30-day mean */ temp_mean_c - AVG(temp_mean_c) OVER w30 AS temp_anomaly_c, /* Water balance: net daily water gain/loss (precipitation minus evapotranspiration) */ precipitation_mm - et0_mm AS water_balance_mm, SUM(precipitation_mm - et0_mm) OVER w7 AS water_balance_7d_mm, /* Gaps-and-islands group markers for streak counting. */ /* Pattern: ROW_NUMBER() - running_count_of_true creates a stable group ID */ /* for each consecutive run of TRUE. Rows where flag=FALSE get a unique group ID */ /* (so their streak length stays 0 after the CASE in with_streaks). */ ROW_NUMBER() OVER ( PARTITION BY location_id ORDER BY observation_date ) - SUM( CASE WHEN is_drought THEN 1 ELSE 0 END ) OVER ( PARTITION BY location_id ORDER BY observation_date ROWS UNBOUNDED PRECEDING ) AS _drought_group, ROW_NUMBER() OVER ( PARTITION BY location_id ORDER BY observation_date ) - SUM( CASE WHEN is_heat_stress THEN 1 ELSE 0 END ) OVER ( PARTITION BY location_id ORDER BY observation_date ROWS UNBOUNDED PRECEDING ) AS _heat_group, ROW_NUMBER() OVER ( PARTITION BY location_id ORDER BY observation_date ) - SUM( CASE WHEN is_high_vpd THEN 1 ELSE 0 END ) OVER ( PARTITION BY location_id ORDER BY observation_date ROWS UNBOUNDED PRECEDING ) AS _vpd_group FROM foundation.fct_weather_daily WHERE observation_date BETWEEN @start_ds AND @end_ds WINDOW w7 AS ( PARTITION BY location_id ORDER BY observation_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW ), w30 AS ( PARTITION BY location_id ORDER BY observation_date ROWS BETWEEN 29 PRECEDING AND CURRENT ROW ) ), with_streaks AS ( SELECT base.*, /* Drought streak: number of consecutive dry days ending on observation_date. */ /* Returns 0 when flag is FALSE (not a drought day). */ CASE WHEN NOT is_drought THEN 0 ELSE ROW_NUMBER() OVER ( PARTITION BY location_id, _drought_group ORDER BY observation_date ) END AS drought_streak_days, /* Heat stress streak: consecutive days with temp_max > 35°C */ CASE WHEN NOT is_heat_stress THEN 0 ELSE ROW_NUMBER() OVER ( PARTITION BY location_id, _heat_group ORDER BY observation_date ) END AS heat_streak_days, /* VPD stress streak: consecutive days with vpd_max > 1.5 kPa */ CASE WHEN NOT is_high_vpd THEN 0 ELSE ROW_NUMBER() OVER ( PARTITION BY location_id, _vpd_group ORDER BY observation_date ) END AS vpd_streak_days FROM base ) SELECT observation_date, location_id, location_name, country, lat, lon, variety, temp_min_c, temp_max_c, temp_mean_c, precipitation_mm, humidity_max_pct, cloud_cover_mean_pct, wind_max_speed_ms, et0_mm, vpd_max_kpa, is_frost, is_heat_stress, is_drought, is_high_vpd, in_growing_season, ROUND(precip_sum_7d_mm, 2) AS precip_sum_7d_mm, ROUND(precip_sum_30d_mm, 2) AS precip_sum_30d_mm, ROUND(temp_mean_30d_c, 2) AS temp_mean_30d_c, ROUND(temp_anomaly_c, 2) AS temp_anomaly_c, ROUND(water_balance_mm, 2) AS water_balance_mm, ROUND(water_balance_7d_mm, 2) AS water_balance_7d_mm, drought_streak_days, heat_streak_days, vpd_streak_days, /* Composite crop stress index (0–100). Weights: drought streak 30%, water deficit 25%, heat streak 20%, VPD streak 15%, frost (binary) 10%. Each component is normalized to [0,1] then capped before weighting: drought: 14 days = fully stressed water: 20mm 7d deficit = fully stressed heat: 7 days = fully stressed vpd: 7 days = fully stressed frost: binary (Arabica highland catastrophic event) */ ROUND( GREATEST(0.0, LEAST(100.0, LEAST(1.0, drought_streak_days / 14.0) * 30.0 + LEAST(1.0, GREATEST(0.0, -water_balance_7d_mm) / 20.0) * 25.0 + LEAST(1.0, heat_streak_days / 7.0) * 20.0 + LEAST(1.0, vpd_streak_days / 7.0) * 15.0 + CASE WHEN is_frost THEN 10.0 ELSE 0.0 END )), 1 ) AS crop_stress_index FROM with_streaks ORDER BY location_id, observation_date