Files
Deeman 07b813198a feat(transform): add serving.weather_daily with rolling analytics and crop stress index
Incremental serving model for 12 coffee-growing locations. Adds:
- Rolling aggregates: precip_sum_7d/30d, temp_mean_30d, temp_anomaly, water_balance_7d
- Gaps-and-islands streak counters: drought_streak_days, heat_streak_days, vpd_streak_days
- Composite crop_stress_index 0–100 (drought 30%, water deficit 25%, heat 20%, VPD 15%, frost 10%)
- lookback 90: ensures rolling windows and streak counters see sufficient history on daily runs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 02:39:07 +01:00

188 lines
5.9 KiB
SQL
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/* Serving mart: daily weather analytics for 12 coffee-growing regions. */
/* Source: foundation.fct_weather_daily (already has seed join for location metadata). */
/* Adds rolling aggregates, water balance, gaps-and-islands streak counters, */
/* and a composite crop stress index (0-100) as a single severity gauge. */
/* Grain: (location_id, observation_date) */
/* Lookback 90: rolling windows reach up to 30 days, streak counters can extend */
/* up to ~90 days; without lookback a daily run sees only 1 row and all window */
/* functions degrade to single-row values. */
MODEL (
  name serving.weather_daily,
  /* Incremental on observation_date; lookback 90 is set so rolling windows and */
  /* streak counters have prior history available on daily runs. */
  kind INCREMENTAL_BY_TIME_RANGE (
    lookback 90,
    time_column observation_date
  ),
  /* Scheduled daily, with history starting on 2020-01-01. */
  cron '@daily',
  start '2020-01-01',
  /* One row per location per day. */
  grain (location_id, observation_date)
);
/* Query outline:                                                                  */
/*   base         - source rows plus rolling aggregates and streak group markers.  */
/*                  Reads a 90-day warm-up before @start_ds so that the first rows */
/*                  of the processed interval still see full trailing history.     */
/*   with_streaks - turns the group markers into consecutive-day streak counters.  */
/*   final SELECT - rounds the metrics, derives crop_stress_index, and trims the   */
/*                  warm-up rows so only @start_ds..@end_ds is emitted.            */
WITH base AS (
  SELECT
    observation_date,
    location_id,
    location_name,
    country,
    lat,
    lon,
    variety,
    temp_min_c,
    temp_max_c,
    temp_mean_c,
    precipitation_mm,
    humidity_max_pct,
    cloud_cover_mean_pct,
    wind_max_speed_ms,
    et0_mm,
    vpd_max_kpa,
    is_frost,
    is_heat_stress,
    is_drought,
    is_high_vpd,
    in_growing_season,
    /* Rolling precipitation: w7 = current row + 6 preceding, w30 = current row + 29 preceding. */
    /* NOTE(review): frames are ROWS-based, so "7d"/"30d" assumes exactly one row per */
    /* location per day with no missing dates - confirm against the upstream model. */
    SUM(precipitation_mm) OVER w7 AS precip_sum_7d_mm,
    SUM(precipitation_mm) OVER w30 AS precip_sum_30d_mm,
    /* Rolling temperature baseline */
    AVG(temp_mean_c) OVER w30 AS temp_mean_30d_c,
    /* Temperature anomaly: today vs trailing 30-row mean (the mean includes today) */
    temp_mean_c - AVG(temp_mean_c) OVER w30 AS temp_anomaly_c,
    /* Water balance: net daily water gain/loss (precipitation minus evapotranspiration) */
    precipitation_mm - et0_mm AS water_balance_mm,
    SUM(precipitation_mm - et0_mm) OVER w7 AS water_balance_7d_mm,
    /* Gaps-and-islands group markers for streak counting. */
    /* marker = ROW_NUMBER() - running count of TRUE flags, i.e. the number of */
    /* non-TRUE rows seen so far. It is constant across a consecutive run of TRUE. */
    /* A non-TRUE row gets the SAME marker as the TRUE run that immediately follows */
    /* it, so the marker alone does not isolate a run; with_streaks therefore also */
    /* partitions by the flag itself. */
    ROW_NUMBER() OVER (
      PARTITION BY location_id
      ORDER BY observation_date
    ) - SUM(
      CASE WHEN is_drought THEN 1 ELSE 0 END
    ) OVER (
      PARTITION BY location_id
      ORDER BY observation_date
      ROWS UNBOUNDED PRECEDING
    ) AS _drought_group,
    ROW_NUMBER() OVER (
      PARTITION BY location_id
      ORDER BY observation_date
    ) - SUM(
      CASE WHEN is_heat_stress THEN 1 ELSE 0 END
    ) OVER (
      PARTITION BY location_id
      ORDER BY observation_date
      ROWS UNBOUNDED PRECEDING
    ) AS _heat_group,
    ROW_NUMBER() OVER (
      PARTITION BY location_id
      ORDER BY observation_date
    ) - SUM(
      CASE WHEN is_high_vpd THEN 1 ELSE 0 END
    ) OVER (
      PARTITION BY location_id
      ORDER BY observation_date
      ROWS UNBOUNDED PRECEDING
    ) AS _vpd_group
  FROM foundation.fct_weather_daily
  WHERE
    /* Warm-up: start 90 days before the processed interval. Without it, rows at the */
    /* beginning of the interval are computed from truncated frames and restarted */
    /* streaks. The warm-up rows themselves are dropped again in the final SELECT. */
    observation_date >= CAST(@start_ds AS DATE) - INTERVAL '90' DAY
    AND observation_date <= CAST(@end_ds AS DATE)
  WINDOW
    w7 AS (
      PARTITION BY location_id
      ORDER BY observation_date
      ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ),
    w30 AS (
      PARTITION BY location_id
      ORDER BY observation_date
      ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
    )
), with_streaks AS (
  SELECT
    base.*,
    /* Drought streak: number of consecutive drought-flagged days ending on */
    /* observation_date; 0 when the flag is FALSE or NULL. */
    /* The flag is part of the partition so the non-TRUE row that shares the group */
    /* marker is not counted (otherwise every streak after the first gap is +1). */
    CASE
      WHEN is_drought
      THEN ROW_NUMBER() OVER (
        PARTITION BY location_id, is_drought, _drought_group
        ORDER BY observation_date
      )
      ELSE 0
    END AS drought_streak_days,
    /* Heat stress streak: consecutive heat-stress-flagged days (flag is computed upstream). */
    CASE
      WHEN is_heat_stress
      THEN ROW_NUMBER() OVER (
        PARTITION BY location_id, is_heat_stress, _heat_group
        ORDER BY observation_date
      )
      ELSE 0
    END AS heat_streak_days,
    /* VPD stress streak: consecutive high-VPD-flagged days (flag is computed upstream). */
    CASE
      WHEN is_high_vpd
      THEN ROW_NUMBER() OVER (
        PARTITION BY location_id, is_high_vpd, _vpd_group
        ORDER BY observation_date
      )
      ELSE 0
    END AS vpd_streak_days
  FROM base
)
SELECT
  observation_date,
  location_id,
  location_name,
  country,
  lat,
  lon,
  variety,
  temp_min_c,
  temp_max_c,
  temp_mean_c,
  precipitation_mm,
  humidity_max_pct,
  cloud_cover_mean_pct,
  wind_max_speed_ms,
  et0_mm,
  vpd_max_kpa,
  is_frost,
  is_heat_stress,
  is_drought,
  is_high_vpd,
  in_growing_season,
  ROUND(precip_sum_7d_mm, 2) AS precip_sum_7d_mm,
  ROUND(precip_sum_30d_mm, 2) AS precip_sum_30d_mm,
  ROUND(temp_mean_30d_c, 2) AS temp_mean_30d_c,
  ROUND(temp_anomaly_c, 2) AS temp_anomaly_c,
  ROUND(water_balance_mm, 2) AS water_balance_mm,
  ROUND(water_balance_7d_mm, 2) AS water_balance_7d_mm,
  drought_streak_days,
  heat_streak_days,
  vpd_streak_days,
  /* Composite crop stress index (0-100).
     Weights: drought streak 30%, water deficit 25%, heat streak 20%,
     VPD streak 15%, frost (binary) 10%.
     Each component is normalized to [0,1] then capped before weighting:
       drought: 14 days = fully stressed
       water:   20mm 7d deficit = fully stressed
       heat:    7 days = fully stressed
       vpd:     7 days = fully stressed
       frost:   binary (Arabica highland catastrophic event) */
  ROUND(
    GREATEST(0.0, LEAST(100.0,
      LEAST(1.0, drought_streak_days / 14.0) * 30.0
      + LEAST(1.0, GREATEST(0.0, -water_balance_7d_mm) / 20.0) * 25.0
      + LEAST(1.0, heat_streak_days / 7.0) * 20.0
      + LEAST(1.0, vpd_streak_days / 7.0) * 15.0
      + CASE WHEN is_frost THEN 10.0 ELSE 0.0 END
    )),
    1
  ) AS crop_stress_index
FROM with_streaks
WHERE
  observation_date BETWEEN @start_ds AND @end_ds
ORDER BY
  location_id,
  observation_date