feat(web): add weather analytics query functions to analytics.py

Adds ALLOWED_WEATHER_METRICS frozenset and 5 new query functions:
- get_weather_locations(): 12 locations with latest stress index for map/cards
- get_weather_location_series(): time series for one location (dynamic metrics)
- get_weather_stress_latest(): global snapshot for Pulse metric card
- get_weather_stress_trend(): daily global avg/max for chart and sparkline
- get_weather_active_alerts(): locations with active stress flags

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-26 02:39:12 +01:00
parent 07b813198a
commit 127881f7d8

View File

@@ -531,3 +531,185 @@ async def get_country_comparison(
""", """,
[commodity_code, *country_codes], [commodity_code, *country_codes],
) )
# =============================================================================
# Weather Queries
# =============================================================================
# Columns safe for user-facing weather series queries (prevents SQL injection)
ALLOWED_WEATHER_METRICS = frozenset({
"temp_min_c",
"temp_max_c",
"temp_mean_c",
"precipitation_mm",
"humidity_max_pct",
"et0_mm",
"vpd_max_kpa",
"precip_sum_7d_mm",
"precip_sum_30d_mm",
"temp_mean_30d_c",
"temp_anomaly_c",
"water_balance_mm",
"water_balance_7d_mm",
"drought_streak_days",
"heat_streak_days",
"vpd_streak_days",
"crop_stress_index",
})
def _validate_weather_metrics(metrics: list[str]) -> list[str]:
valid = [m for m in metrics if m in ALLOWED_WEATHER_METRICS]
assert valid, f"No valid weather metrics in {metrics}. Allowed: {sorted(ALLOWED_WEATHER_METRICS)}"
return valid
async def get_weather_locations() -> list[dict]:
"""12 locations with latest-day stress index and flags, sorted by severity.
Used by the map and location card grid (overview mode).
"""
return await fetch_analytics(
"""
WITH latest AS (
SELECT MAX(observation_date) AS max_date
FROM serving.weather_daily
)
SELECT
w.location_id,
w.location_name,
w.country,
w.lat,
w.lon,
w.variety,
w.observation_date,
w.crop_stress_index,
w.drought_streak_days,
w.heat_streak_days,
w.vpd_streak_days,
w.precip_sum_7d_mm,
w.precip_sum_30d_mm,
w.temp_anomaly_c,
w.is_frost,
w.is_heat_stress,
w.is_drought,
w.is_high_vpd
FROM serving.weather_daily AS w, latest
WHERE w.observation_date = latest.max_date
ORDER BY w.crop_stress_index DESC
"""
)
async def get_weather_location_series(
location_id: str,
metrics: list[str] | None = None,
days: int = 365,
) -> list[dict]:
"""Time series for one location. limit defaults to 1 year of daily data."""
assert 1 <= days <= 3650, "days must be between 1 and 3650"
if metrics is None:
metrics = [
"temp_mean_c", "precipitation_mm", "crop_stress_index",
"precip_sum_7d_mm", "water_balance_7d_mm", "temp_anomaly_c",
]
metrics = _validate_weather_metrics(metrics)
cols = ", ".join(metrics)
return await fetch_analytics(
f"""
SELECT observation_date, location_id, location_name, country,
is_frost, is_heat_stress, is_drought, is_high_vpd,
{cols}
FROM serving.weather_daily
WHERE location_id = ?
ORDER BY observation_date DESC
LIMIT ?
""",
[location_id, days],
)
async def get_weather_stress_latest() -> dict | None:
"""Global stress snapshot for the latest day.
Returns avg/max stress index, count of locations under stress (index > 20),
worst location name, and observation date. Aggregates 12 rows in DuckDB.
"""
rows = await fetch_analytics(
"""
WITH latest AS (
SELECT MAX(observation_date) AS max_date
FROM serving.weather_daily
)
SELECT
latest.max_date AS observation_date,
ROUND(AVG(w.crop_stress_index), 1) AS avg_crop_stress_index,
MAX(w.crop_stress_index) AS max_crop_stress_index,
COUNT(*) FILTER (WHERE w.crop_stress_index > 20) AS locations_under_stress,
ARG_MAX(w.location_name, w.crop_stress_index) AS worst_location_name,
ARG_MAX(w.country, w.crop_stress_index) AS worst_location_country,
COUNT(*) FILTER (WHERE w.is_frost) AS frost_count,
COUNT(*) FILTER (WHERE w.is_drought) AS drought_count
FROM serving.weather_daily AS w, latest
WHERE w.observation_date = latest.max_date
"""
)
return rows[0] if rows else None
async def get_weather_stress_trend(days: int = 365) -> list[dict]:
"""Daily global stress time series — avg and max across all 12 locations.
Used by the global stress chart and Pulse sparkline.
"""
assert 1 <= days <= 3650, "days must be between 1 and 3650"
return await fetch_analytics(
"""
SELECT
observation_date,
ROUND(AVG(crop_stress_index), 1) AS avg_crop_stress_index,
MAX(crop_stress_index) AS max_crop_stress_index,
COUNT(*) FILTER (WHERE crop_stress_index > 20) AS locations_under_stress
FROM serving.weather_daily
GROUP BY observation_date
ORDER BY observation_date DESC
LIMIT ?
""",
[days],
)
async def get_weather_active_alerts() -> list[dict]:
"""Locations with active stress flags on the latest observation date.
Sorted by crop_stress_index descending.
"""
return await fetch_analytics(
"""
WITH latest AS (
SELECT MAX(observation_date) AS max_date
FROM serving.weather_daily
)
SELECT
w.location_id,
w.location_name,
w.country,
w.variety,
w.observation_date,
w.crop_stress_index,
w.drought_streak_days,
w.heat_streak_days,
w.vpd_streak_days,
w.precip_sum_7d_mm,
w.is_frost,
w.is_heat_stress,
w.is_drought,
w.is_high_vpd
FROM serving.weather_daily AS w, latest
WHERE w.observation_date = latest.max_date
AND (w.is_frost OR w.is_heat_stress OR w.is_drought OR w.is_high_vpd)
ORDER BY w.crop_stress_index DESC
"""
)