diff --git a/web/src/beanflows/analytics.py b/web/src/beanflows/analytics.py index c054c9c..409b309 100644 --- a/web/src/beanflows/analytics.py +++ b/web/src/beanflows/analytics.py @@ -531,3 +531,185 @@ async def get_country_comparison( """, [commodity_code, *country_codes], ) + + +# ============================================================================= +# Weather Queries +# ============================================================================= + +# Columns safe for user-facing weather series queries (prevents SQL injection) +ALLOWED_WEATHER_METRICS = frozenset({ + "temp_min_c", + "temp_max_c", + "temp_mean_c", + "precipitation_mm", + "humidity_max_pct", + "et0_mm", + "vpd_max_kpa", + "precip_sum_7d_mm", + "precip_sum_30d_mm", + "temp_mean_30d_c", + "temp_anomaly_c", + "water_balance_mm", + "water_balance_7d_mm", + "drought_streak_days", + "heat_streak_days", + "vpd_streak_days", + "crop_stress_index", +}) + + +def _validate_weather_metrics(metrics: list[str]) -> list[str]: + valid = [m for m in metrics if m in ALLOWED_WEATHER_METRICS] + assert valid, f"No valid weather metrics in {metrics}. Allowed: {sorted(ALLOWED_WEATHER_METRICS)}" + return valid + + +async def get_weather_locations() -> list[dict]: + """12 locations with latest-day stress index and flags, sorted by severity. + + Used by the map and location card grid (overview mode). + """ + return await fetch_analytics( + """ + WITH latest AS ( + SELECT MAX(observation_date) AS max_date + FROM serving.weather_daily + ) + SELECT + w.location_id, + w.location_name, + w.country, + w.lat, + w.lon, + w.variety, + w.observation_date, + w.crop_stress_index, + w.drought_streak_days, + w.heat_streak_days, + w.vpd_streak_days, + w.precip_sum_7d_mm, + w.precip_sum_30d_mm, + w.temp_anomaly_c, + w.is_frost, + w.is_heat_stress, + w.is_drought, + w.is_high_vpd + FROM serving.weather_daily AS w, latest + WHERE w.observation_date = latest.max_date + ORDER BY w.crop_stress_index DESC + """ + ) + + +async def get_weather_location_series( + location_id: str, + metrics: list[str] | None = None, + days: int = 365, +) -> list[dict]: + """Time series for one location. limit defaults to 1 year of daily data.""" + assert 1 <= days <= 3650, "days must be between 1 and 3650" + if metrics is None: + metrics = [ + "temp_mean_c", "precipitation_mm", "crop_stress_index", + "precip_sum_7d_mm", "water_balance_7d_mm", "temp_anomaly_c", + ] + metrics = _validate_weather_metrics(metrics) + cols = ", ".join(metrics) + + return await fetch_analytics( + f""" + SELECT observation_date, location_id, location_name, country, + is_frost, is_heat_stress, is_drought, is_high_vpd, + {cols} + FROM serving.weather_daily + WHERE location_id = ? + ORDER BY observation_date DESC + LIMIT ? + """, + [location_id, days], + ) + + +async def get_weather_stress_latest() -> dict | None: + """Global stress snapshot for the latest day. + + Returns avg/max stress index, count of locations under stress (index > 20), + worst location name, and observation date. Aggregates 12 rows in DuckDB. + """ + rows = await fetch_analytics( + """ + WITH latest AS ( + SELECT MAX(observation_date) AS max_date + FROM serving.weather_daily + ) + SELECT + latest.max_date AS observation_date, + ROUND(AVG(w.crop_stress_index), 1) AS avg_crop_stress_index, + MAX(w.crop_stress_index) AS max_crop_stress_index, + COUNT(*) FILTER (WHERE w.crop_stress_index > 20) AS locations_under_stress, + ARG_MAX(w.location_name, w.crop_stress_index) AS worst_location_name, + ARG_MAX(w.country, w.crop_stress_index) AS worst_location_country, + COUNT(*) FILTER (WHERE w.is_frost) AS frost_count, + COUNT(*) FILTER (WHERE w.is_drought) AS drought_count + FROM serving.weather_daily AS w, latest + WHERE w.observation_date = latest.max_date + """ + ) + return rows[0] if rows else None + + +async def get_weather_stress_trend(days: int = 365) -> list[dict]: + """Daily global stress time series — avg and max across all 12 locations. + + Used by the global stress chart and Pulse sparkline. + """ + assert 1 <= days <= 3650, "days must be between 1 and 3650" + return await fetch_analytics( + """ + SELECT + observation_date, + ROUND(AVG(crop_stress_index), 1) AS avg_crop_stress_index, + MAX(crop_stress_index) AS max_crop_stress_index, + COUNT(*) FILTER (WHERE crop_stress_index > 20) AS locations_under_stress + FROM serving.weather_daily + GROUP BY observation_date + ORDER BY observation_date DESC + LIMIT ? + """, + [days], + ) + + +async def get_weather_active_alerts() -> list[dict]: + """Locations with active stress flags on the latest observation date. + + Sorted by crop_stress_index descending. + """ + return await fetch_analytics( + """ + WITH latest AS ( + SELECT MAX(observation_date) AS max_date + FROM serving.weather_daily + ) + SELECT + w.location_id, + w.location_name, + w.country, + w.variety, + w.observation_date, + w.crop_stress_index, + w.drought_streak_days, + w.heat_streak_days, + w.vpd_streak_days, + w.precip_sum_7d_mm, + w.is_frost, + w.is_heat_stress, + w.is_drought, + w.is_high_vpd + FROM serving.weather_daily AS w, latest + WHERE w.observation_date = latest.max_date + AND (w.is_frost OR w.is_heat_stress OR w.is_drought OR w.is_high_vpd) + ORDER BY w.crop_stress_index DESC + """ + )