diff --git a/transform/sqlmesh_materia/models/serving/weather_daily.sql b/transform/sqlmesh_materia/models/serving/weather_daily.sql
new file mode 100644
index 0000000..ec1bc25
--- /dev/null
+++ b/transform/sqlmesh_materia/models/serving/weather_daily.sql
@@ -0,0 +1,197 @@
+/* Serving mart: daily weather analytics for 12 coffee-growing regions. */
+/* Source: foundation.fct_weather_daily (already has seed join for location metadata). */
+/* Adds rolling aggregates, water balance, gaps-and-islands streak counters, */
+/* and a composite crop stress index (0–100) as a single severity gauge. */
+/* Grain: (location_id, observation_date) */
+/* Lookback 90: every run recomputes and overwrites the trailing 90 daily intervals. */
+/* Lookback widens the rewritten range but gives its earliest rows no history, so */
+/* the base CTE reads an extra 90-day warm-up before @start_ds. Rolling windows */
+/* (up to 30 days) and streak counters then see prior rows, and the final SELECT */
+/* drops the warm-up rows. A streak that began before the warm-up is counted from */
+/* the first warm-up row. */
+MODEL (
+  name serving.weather_daily,
+  kind INCREMENTAL_BY_TIME_RANGE (
+    time_column observation_date,
+    lookback 90
+  ),
+  grain (location_id, observation_date),
+  start '2020-01-01',
+  cron '@daily'
+);
+
+WITH base AS (
+  SELECT
+    observation_date,
+    location_id,
+    location_name,
+    country,
+    lat,
+    lon,
+    variety,
+    temp_min_c,
+    temp_max_c,
+    temp_mean_c,
+    precipitation_mm,
+    humidity_max_pct,
+    cloud_cover_mean_pct,
+    wind_max_speed_ms,
+    et0_mm,
+    vpd_max_kpa,
+    is_frost,
+    is_heat_stress,
+    is_drought,
+    is_high_vpd,
+    in_growing_season,
+    /* Rolling precipitation — w7 = trailing 7 rows, w30 = trailing 30 rows */
+    /* (assumes one row per location per day — TODO confirm upstream has no gaps) */
+    SUM(precipitation_mm) OVER w7 AS precip_sum_7d_mm,
+    SUM(precipitation_mm) OVER w30 AS precip_sum_30d_mm,
+    /* Rolling temperature baseline */
+    AVG(temp_mean_c) OVER w30 AS temp_mean_30d_c,
+    /* Temperature anomaly: today vs trailing 30-day mean */
+    temp_mean_c - AVG(temp_mean_c) OVER w30 AS temp_anomaly_c,
+    /* Water balance: net daily water gain/loss (precipitation minus evapotranspiration) */
+    precipitation_mm - et0_mm AS water_balance_mm,
+    SUM(precipitation_mm - et0_mm) OVER w7 AS water_balance_7d_mm,
+    /* Gaps-and-islands group markers for streak counting. */
+    /* ROW_NUMBER() minus the running count of TRUE rows is constant across a */
+    /* consecutive run of TRUE rows and grows with every non-TRUE row. The non-TRUE */
+    /* row directly before a run gets the same value as that run, so with_streaks */
+    /* partitions on the flag as well as on this marker. */
+    ROW_NUMBER() OVER (
+      PARTITION BY location_id
+      ORDER BY observation_date
+    ) - SUM(
+      CASE WHEN is_drought THEN 1 ELSE 0 END
+    ) OVER (
+      PARTITION BY location_id
+      ORDER BY observation_date
+      ROWS UNBOUNDED PRECEDING
+    ) AS _drought_group,
+    ROW_NUMBER() OVER (
+      PARTITION BY location_id
+      ORDER BY observation_date
+    ) - SUM(
+      CASE WHEN is_heat_stress THEN 1 ELSE 0 END
+    ) OVER (
+      PARTITION BY location_id
+      ORDER BY observation_date
+      ROWS UNBOUNDED PRECEDING
+    ) AS _heat_group,
+    ROW_NUMBER() OVER (
+      PARTITION BY location_id
+      ORDER BY observation_date
+    ) - SUM(
+      CASE WHEN is_high_vpd THEN 1 ELSE 0 END
+    ) OVER (
+      PARTITION BY location_id
+      ORDER BY observation_date
+      ROWS UNBOUNDED PRECEDING
+    ) AS _vpd_group
+  FROM foundation.fct_weather_daily
+  WHERE
+    /* Read 90 days before @start_ds as warm-up for windows and streaks. */
+    observation_date BETWEEN (CAST(@start_ds AS DATE) - INTERVAL '90' DAY) AND CAST(@end_ds AS DATE)
+  WINDOW
+    w7 AS (
+      PARTITION BY location_id
+      ORDER BY observation_date
+      ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
+    ),
+    w30 AS (
+      PARTITION BY location_id
+      ORDER BY observation_date
+      ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
+    )
+), with_streaks AS (
+  SELECT
+    base.*,
+    /* Drought streak: number of consecutive drought days ending on observation_date. */
+    /* 0 when is_drought is FALSE or NULL. The flag is part of the partition key so */
+    /* the non-drought row preceding a run is not counted as its first day. */
+    CASE
+      WHEN is_drought
+      THEN ROW_NUMBER() OVER (
+        PARTITION BY location_id, is_drought, _drought_group
+        ORDER BY observation_date
+      )
+      ELSE 0
+    END AS drought_streak_days,
+    /* Heat stress streak: consecutive days with temp_max > 35°C */
+    CASE
+      WHEN is_heat_stress
+      THEN ROW_NUMBER() OVER (
+        PARTITION BY location_id, is_heat_stress, _heat_group
+        ORDER BY observation_date
+      )
+      ELSE 0
+    END AS heat_streak_days,
+    /* VPD stress streak: consecutive days with vpd_max > 1.5 kPa */
+    CASE
+      WHEN is_high_vpd
+      THEN ROW_NUMBER() OVER (
+        PARTITION BY location_id, is_high_vpd, _vpd_group
+        ORDER BY observation_date
+      )
+      ELSE 0
+    END AS vpd_streak_days
+  FROM base
+)
+SELECT
+  observation_date,
+  location_id,
+  location_name,
+  country,
+  lat,
+  lon,
+  variety,
+  temp_min_c,
+  temp_max_c,
+  temp_mean_c,
+  precipitation_mm,
+  humidity_max_pct,
+  cloud_cover_mean_pct,
+  wind_max_speed_ms,
+  et0_mm,
+  vpd_max_kpa,
+  is_frost,
+  is_heat_stress,
+  is_drought,
+  is_high_vpd,
+  in_growing_season,
+  ROUND(precip_sum_7d_mm, 2) AS precip_sum_7d_mm,
+  ROUND(precip_sum_30d_mm, 2) AS precip_sum_30d_mm,
+  ROUND(temp_mean_30d_c, 2) AS temp_mean_30d_c,
+  ROUND(temp_anomaly_c, 2) AS temp_anomaly_c,
+  ROUND(water_balance_mm, 2) AS water_balance_mm,
+  ROUND(water_balance_7d_mm, 2) AS water_balance_7d_mm,
+  drought_streak_days,
+  heat_streak_days,
+  vpd_streak_days,
+  /* Composite crop stress index (0–100).
+     Weights: drought streak 30%, water deficit 25%, heat streak 20%,
+              VPD streak 15%, frost (binary) 10%.
+     Each component is normalized to [0,1] then capped before weighting:
+       drought: 14 days = fully stressed
+       water:   20mm 7d deficit = fully stressed
+       heat:    7 days = fully stressed
+       vpd:     7 days = fully stressed
+       frost:   binary (Arabica highland catastrophic event) */
+  ROUND(
+    GREATEST(0.0, LEAST(100.0,
+      LEAST(1.0, drought_streak_days / 14.0) * 30.0
+      + LEAST(1.0, GREATEST(0.0, -water_balance_7d_mm) / 20.0) * 25.0
+      + LEAST(1.0, heat_streak_days / 7.0) * 20.0
+      + LEAST(1.0, vpd_streak_days / 7.0) * 15.0
+      + CASE WHEN is_frost THEN 10.0 ELSE 0.0 END
+    )),
+    1
+  ) AS crop_stress_index
+FROM with_streaks
+WHERE
+  /* Emit only the interval being processed; warm-up rows are context only. */
+  observation_date BETWEEN @start_ds AND @end_ds
+ORDER BY
+  location_id,
+  observation_date