refactor: rename materia → beanflows throughout codebase
Some checks failed
CI / test-cli (push) Failing after 5s
CI / test-sqlmesh (push) Failing after 4s
CI / test-web (push) Failing after 5s
CI / tag (push) Has been skipped

- Rename src/materia/ → src/beanflows/ (Python package)
- Rename transform/sqlmesh_materia/ → transform/sqlmesh_beanflows/
- Rename infra/supervisor/materia-supervisor.service → beanflows-supervisor.service
- Rename infra/backup/materia-backup.{service,timer} → beanflows-backup.{service,timer}
- Update all path strings: /opt/materia → /opt/beanflows, /data/materia → /data/beanflows
- Update pyproject.toml: project name, CLI entrypoint, workspace source key
- Update all internal imports from materia.* → beanflows.*
- Update infra scripts: REPO_DIR, service names, systemctl references
- Fix docker-compose.prod.yml: /data/materia → /data/beanflows (bind mount path)

Intentionally left unchanged: Pulumi stack name (materia-infrastructure) and
Hetzner resource names ("materia-key", "managed_by: materia") — these reference
live cloud infrastructure and require separate cloud-side renames.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-28 23:00:52 +01:00
parent 9ea4f09600
commit d14990bb01
58 changed files with 128 additions and 93 deletions

View File

@@ -0,0 +1,63 @@
/*
  Serving mart: KC=F Coffee C futures prices, analytics-ready.
  Adds daily return, moving averages (20-day, 50-day SMA) and 52-week high/low range.
  Filtered to trading days only (NULL close rows excluded upstream).
  Grain: one row per trade_date.

  Incremental processing: every derived column is a trailing window of up to
  252 rows. The base CTE therefore reads a warm-up buffer of 400 calendar days
  before @start_ds so the windows are populated for the rows inside the run
  interval, and the final SELECT trims the output back to @start_ds..@end_ds.
  Without the buffer a daily run sees a single row: LAG returns NULL and the
  AVG/MAX/MIN windows collapse to that row's own values.
*/
MODEL (
  name serving.coffee_prices,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column trade_date
  ),
  grain (
    trade_date
  ),
  start '1971-08-16',
  cron '@daily'
);

WITH base AS (
  SELECT
    f.trade_date,
    f.open,
    f.high,
    f.low,
    f.close,
    f.adj_close,
    f.volume,
    /* Daily return: (close - prev_close) / prev_close * 100; NULL when prev_close is 0 or missing */
    ROUND(
      (
        f.close - LAG(f.close, 1) OVER by_day
      ) / NULLIF(LAG(f.close, 1) OVER by_day, 0) * 100,
      4
    ) AS daily_return_pct,
    /* 20-day simple moving average (1 trading month) */
    ROUND(AVG(f.close) OVER trailing_20, 4) AS sma_20d,
    /* 50-day simple moving average (2.5 trading months) */
    ROUND(AVG(f.close) OVER trailing_50, 4) AS sma_50d,
    /* 52-week high / low (approximately 252 trading days) */
    MAX(f.high) OVER trailing_252 AS high_52w,
    MIN(f.low) OVER trailing_252 AS low_52w
  FROM foundation.fct_coffee_prices AS f
  WHERE
    /* 400 calendar days comfortably covers 252 trading rows of warm-up history */
    f.trade_date >= CAST(@start_ds AS DATE) - INTERVAL '400' DAY
    AND f.trade_date <= @end_ds
  WINDOW
    by_day AS (ORDER BY f.trade_date),
    trailing_20 AS (ORDER BY f.trade_date ROWS BETWEEN 19 PRECEDING AND CURRENT ROW),
    trailing_50 AS (ORDER BY f.trade_date ROWS BETWEEN 49 PRECEDING AND CURRENT ROW),
    trailing_252 AS (ORDER BY f.trade_date ROWS BETWEEN 251 PRECEDING AND CURRENT ROW)
)
SELECT
  b.trade_date,
  d.commodity_name,
  d.ticker,
  b.open,
  b.high,
  b.low,
  b.close,
  b.adj_close,
  b.volume,
  b.daily_return_pct,
  b.sma_20d,
  b.sma_50d,
  b.high_52w,
  b.low_52w
FROM base AS b
CROSS JOIN foundation.dim_commodity AS d
WHERE
  d.ticker = 'KC=F'
  /* Drop the warm-up rows: only the run interval is written */
  AND b.trade_date BETWEEN @start_ds AND @end_ds
ORDER BY
  b.trade_date

View File

@@ -0,0 +1,51 @@
/*
  Serving mart: ICE certified Coffee C stock aging report, analytics-ready.
  Shows the age distribution of certified stocks across delivery ports.
  Age buckets represent how long coffee has been in certified storage.
  Older stock approaching certificate limits is a supply quality signal.
  Source: ICE Certified Stock Aging Report (monthly)
  Grain: one row per (report_date, age_bucket).
*/
MODEL (
  name serving.ice_aging_stocks,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (report_date, age_bucket),
  start '2020-01-01',
  cron '@daily'
);

WITH coffee AS (
  /* Commodity attributes stamped onto every output row */
  SELECT
    d.commodity_name,
    d.ice_stock_report_code
  FROM foundation.dim_commodity AS d
  WHERE
    d.ice_stock_report_code = 'COFFEE-C'
), aging AS (
  /* Fact rows for the run interval, plus numeric bucket bounds for sorting */
  SELECT
    f.report_date,
    f.age_bucket,
    /* Parse the age range from the "0000 to 0120" label; TRY_CAST yields NULL if a part is not an integer */
    TRY_CAST(SPLIT_PART(f.age_bucket, ' to ', 1) AS INT) AS age_bucket_start_days,
    TRY_CAST(SPLIT_PART(f.age_bucket, ' to ', 2) AS INT) AS age_bucket_end_days,
    f.antwerp_bags,
    f.hamburg_bremen_bags,
    f.houston_bags,
    f.miami_bags,
    f.new_orleans_bags,
    f.new_york_bags,
    f.total_bags,
    f.source_file
  FROM foundation.fct_ice_aging_stocks AS f
  WHERE
    f.report_date >= @start_ds
    AND f.report_date <= @end_ds
)
SELECT
  a.report_date,
  c.commodity_name,
  c.ice_stock_report_code,
  a.age_bucket,
  a.age_bucket_start_days,
  a.age_bucket_end_days,
  a.antwerp_bags,
  a.hamburg_bremen_bags,
  a.houston_bags,
  a.miami_bags,
  a.new_orleans_bags,
  a.new_york_bags,
  a.total_bags,
  a.source_file
FROM aging AS a
CROSS JOIN coffee AS c
ORDER BY
  a.report_date,
  a.age_bucket_start_days

View File

@@ -0,0 +1,53 @@
/*
  Serving mart: ICE certified Coffee C warehouse stocks, analytics-ready.
  Adds 30-row rolling average, change versus the previous report, and drawdown
  from the trailing 365-row high. Physical supply indicator used alongside S/D
  and positioning.
  "Certified stocks" = coffee graded and stamped as eligible for delivery
  against ICE Coffee C futures — traders watch this as a squeeze indicator.
  Grain: one row per report_date.

  Incremental processing: the derived columns are trailing windows of up to
  365 rows. The base CTE reads a warm-up buffer of 550 calendar days before
  @start_ds (enough for 365 rows even if reports exist only on business days)
  and the final SELECT trims the output back to @start_ds..@end_ds. Without the
  buffer a daily run sees one row: the change is NULL, the average and the high
  equal the current value, and the drawdown is always 0.
*/
MODEL (
  name serving.ice_warehouse_stocks,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (
    report_date
  ),
  start '2000-01-01',
  cron '@daily'
);

WITH base AS (
  SELECT
    f.report_date,
    f.total_certified_bags,
    f.pending_grading_bags,
    /* Change versus the previous report row (LAG 1). */
    /* NOTE(review): the column is named "wow" but LAG(1) on daily data is a day-over-day change — confirm intended semantics with consumers before renaming or changing the offset. */
    f.total_certified_bags - LAG(f.total_certified_bags, 1) OVER by_report AS wow_change_bags,
    /* 30-row rolling average (smooths daily noise) */
    ROUND(AVG(f.total_certified_bags::DOUBLE) OVER trailing_30, 0) AS avg_30d_bags,
    /* 52-week high proxy: maximum over a 365-row window */
    MAX(f.total_certified_bags) OVER trailing_365 AS high_52w_bags,
    /* Drawdown from the trailing high (pct below peak — squeeze indicator); NULL when the high is 0 */
    ROUND(
      (
        f.total_certified_bags::DOUBLE - CAST(MAX(f.total_certified_bags) OVER trailing_365 AS DOUBLE)
      ) / NULLIF(CAST(MAX(f.total_certified_bags) OVER trailing_365 AS DOUBLE), 0) * 100,
      2
    ) AS drawdown_from_52w_high_pct
  FROM foundation.fct_ice_warehouse_stocks AS f
  WHERE
    f.report_date >= CAST(@start_ds AS DATE) - INTERVAL '550' DAY
    AND f.report_date <= @end_ds
  WINDOW
    by_report AS (ORDER BY f.report_date),
    trailing_30 AS (ORDER BY f.report_date ROWS BETWEEN 29 PRECEDING AND CURRENT ROW),
    trailing_365 AS (ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW)
)
SELECT
  b.report_date,
  d.commodity_name,
  d.ice_stock_report_code,
  b.total_certified_bags,
  b.pending_grading_bags,
  b.wow_change_bags,
  b.avg_30d_bags,
  b.high_52w_bags,
  b.drawdown_from_52w_high_pct
FROM base AS b
CROSS JOIN foundation.dim_commodity AS d
WHERE
  d.ice_stock_report_code = 'COFFEE-C'
  /* Drop the warm-up rows: only the run interval is written */
  AND b.report_date BETWEEN @start_ds AND @end_ds
ORDER BY
  b.report_date

View File

@@ -0,0 +1,64 @@
/*
  Serving mart: ICE certified Coffee C warehouse stocks by port, analytics-ready.
  End-of-month certified stock levels broken down by delivery port.
  Covers November 1996 to present (~30 years). Useful for understanding
  geographic shifts in the certified supply base over time.
  Source: ICE historical by-port XLS (EOM_KC_cert_stox_by_port_nov96-present.xls)
  Grain: one row per report_date (end-of-month).

  Incremental processing: month-over-month change and the 12-row average look
  back at earlier rows. The base CTE reads a warm-up buffer of 400 calendar days
  before @start_ds (12+ monthly rows) and the final SELECT trims the output back
  to @start_ds..@end_ds. Without the buffer a daily run sees at most one row, so
  the changes are NULL and the "12-month" average equals the current month.
*/
MODEL (
  name serving.ice_warehouse_stocks_by_port,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (
    report_date
  ),
  start '1996-11-01',
  cron '@daily'
);

WITH base AS (
  SELECT
    f.report_date,
    f.new_york_bags,
    f.new_orleans_bags,
    f.houston_bags,
    f.miami_bags,
    f.antwerp_bags,
    f.hamburg_bremen_bags,
    f.barcelona_bags,
    f.virginia_bags,
    f.total_bags,
    /* Month-over-month change in total certified bags (previous row = previous month-end) */
    f.total_bags - LAG(f.total_bags, 1) OVER by_month AS mom_change_bags,
    /* Month-over-month percent change; NULL when the previous total is 0 or missing */
    ROUND(
      (
        f.total_bags::DOUBLE - CAST(LAG(f.total_bags, 1) OVER by_month AS DOUBLE)
      ) / NULLIF(CAST(LAG(f.total_bags, 1) OVER by_month AS DOUBLE), 0) * 100,
      2
    ) AS mom_change_pct,
    /* 12-month rolling average (current row plus the 11 preceding rows) */
    ROUND(AVG(f.total_bags::DOUBLE) OVER trailing_12, 0) AS avg_12m_bags,
    f.source_file
  FROM foundation.fct_ice_warehouse_stocks_by_port AS f
  WHERE
    f.report_date >= CAST(@start_ds AS DATE) - INTERVAL '400' DAY
    AND f.report_date <= @end_ds
  WINDOW
    by_month AS (ORDER BY f.report_date),
    trailing_12 AS (ORDER BY f.report_date ROWS BETWEEN 11 PRECEDING AND CURRENT ROW)
)
SELECT
  b.report_date,
  d.commodity_name,
  d.ice_stock_report_code,
  b.new_york_bags,
  b.new_orleans_bags,
  b.houston_bags,
  b.miami_bags,
  b.antwerp_bags,
  b.hamburg_bremen_bags,
  b.barcelona_bags,
  b.virginia_bags,
  b.total_bags,
  b.mom_change_bags,
  b.mom_change_pct,
  b.avg_12m_bags,
  b.source_file
FROM base AS b
CROSS JOIN foundation.dim_commodity AS d
WHERE
  d.ice_stock_report_code = 'COFFEE-C'
  /* Drop the warm-up rows: only the run interval is written */
  AND b.report_date BETWEEN @start_ds AND @end_ds
ORDER BY
  b.report_date

View File

@@ -0,0 +1,126 @@
/*
  Serving mart: supply/demand metrics per commodity, for each country and for a
  synthetic 'Global' row that sums all countries.
  Derived columns: Net_Supply, Trade_Balance, Supply_Demand_Balance,
  Stock_to_Use_Ratio_pct and Production_YoY_pct.
  Output rows: (commodity_code, country_code, market_year, ingest_date), with
  country_code NULL on the 'Global' rows.
*/
MODEL (
  name serving.commodity_metrics,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column ingest_date
  ),
  start '2006-08-01',
  cron '@daily'
);

WITH country_metrics AS (
  /* Country-level rows with derived metrics */
  SELECT
    commodity_code,
    commodity_name,
    country_code,
    country_name,
    market_year,
    ingest_date,
    Production,
    Imports,
    Exports,
    Total_Distribution,
    Ending_Stocks,
    Production + Imports - Exports AS Net_Supply,
    Exports - Imports AS Trade_Balance,
    Production + Imports - Exports - Total_Distribution AS Supply_Demand_Balance,
    /* NULLIF guards the division: ratio is NULL when Total_Distribution is 0 */
    Ending_Stocks / NULLIF(Total_Distribution, 0) * 100 AS Stock_to_Use_Ratio_pct,
    /* Percent change versus the previous row of the same commodity and country. */
    /* LAG defaults to 0 on the first row, which NULLIF turns into a NULL result. */
    /* NOTE(review): "previous row" is ordered by (market_year, ingest_date); if a market_year has several ingest_dates this compares revisions, not years — confirm the source grain. */
    (
      Production - LAG(Production, 1, 0) OVER prior_row
    ) / NULLIF(LAG(Production, 1, 0) OVER prior_row, 0) * 100 AS Production_YoY_pct
  FROM cleaned.psdalldata__commodity_pivoted
  WINDOW
    prior_row AS (PARTITION BY commodity_code, country_code ORDER BY market_year, ingest_date)
), global_aggregates AS (
  /* One row per commodity / market_year / ingest_date, summed over all countries */
  SELECT
    commodity_code,
    commodity_name,
    CAST(NULL AS TEXT) AS country_code, /* NULL marks the global aggregate */
    'Global' AS country_name,
    market_year,
    ingest_date,
    SUM(Production) AS Production,
    SUM(Imports) AS Imports,
    SUM(Exports) AS Exports,
    SUM(Total_Distribution) AS Total_Distribution,
    SUM(Ending_Stocks) AS Ending_Stocks
  FROM cleaned.psdalldata__commodity_pivoted
  GROUP BY
    commodity_code,
    commodity_name,
    market_year,
    ingest_date
), global_metrics AS (
  /* Same derived metrics as country_metrics, computed on the global sums */
  SELECT
    commodity_code,
    commodity_name,
    country_code,
    country_name,
    market_year,
    ingest_date,
    Production,
    Imports,
    Exports,
    Total_Distribution,
    Ending_Stocks,
    Production + Imports - Exports AS Net_Supply,
    Exports - Imports AS Trade_Balance,
    Production + Imports - Exports - Total_Distribution AS Supply_Demand_Balance,
    Ending_Stocks / NULLIF(Total_Distribution, 0) * 100 AS Stock_to_Use_Ratio_pct,
    (
      Production - LAG(Production, 1, 0) OVER prior_row
    ) / NULLIF(LAG(Production, 1, 0) OVER prior_row, 0) * 100 AS Production_YoY_pct
  FROM global_aggregates
  WINDOW
    prior_row AS (PARTITION BY commodity_code ORDER BY market_year, ingest_date)
), combined_data AS (
  /* Stack country-level and global rows; both sides list the same columns in the same order */
  SELECT
    commodity_code,
    commodity_name,
    country_code,
    country_name,
    market_year,
    ingest_date,
    Production,
    Imports,
    Exports,
    Total_Distribution,
    Ending_Stocks,
    Net_Supply,
    Trade_Balance,
    Supply_Demand_Balance,
    Stock_to_Use_Ratio_pct,
    Production_YoY_pct
  FROM country_metrics
  UNION ALL
  SELECT
    commodity_code,
    commodity_name,
    country_code,
    country_name,
    market_year,
    ingest_date,
    Production,
    Imports,
    Exports,
    Total_Distribution,
    Ending_Stocks,
    Net_Supply,
    Trade_Balance,
    Supply_Demand_Balance,
    Stock_to_Use_Ratio_pct,
    Production_YoY_pct
  FROM global_metrics
)
SELECT
  commodity_code,
  commodity_name,
  country_code,
  country_name,
  market_year,
  ingest_date,
  Production,
  Imports,
  Exports,
  Total_Distribution,
  Ending_Stocks,
  Net_Supply,
  Trade_Balance,
  Supply_Demand_Balance,
  Stock_to_Use_Ratio_pct,
  Production_YoY_pct
FROM combined_data
ORDER BY
  commodity_name,
  country_name,
  market_year,
  ingest_date

View File

@@ -0,0 +1,148 @@
/*
  Serving mart: COT positioning for Coffee C futures, analytics-ready.
  Joins foundation.fct_cot_positioning with foundation.dim_commodity so
  the coffee filter is driven by the dimension (not a hardcoded CFTC code).
  Adds derived analytics used by the dashboard and API:
    - Normalized positioning (% of open interest)
    - Long/short ratio
    - Week-over-week momentum
    - COT Index over 26-week and 52-week trailing windows (0=bearish, 100=bullish)
  Grain: one row per report_date for Coffee C futures.
  Latest revision per date: the row with the highest ingest_date is kept to
  deduplicate CFTC corrections.

  Incremental processing: the COT indexes and the week-over-week delta look back
  up to 52 rows. latest_revision therefore reads a warm-up buffer of 400 calendar
  days before @start_ds (52+ weekly reports) and the final SELECT trims the
  output back to @start_ds..@end_ds. Without the buffer a daily run sees at most
  one report, so both indexes are always 50.0 and managed_money_net_wow is NULL.
*/
MODEL (
  name serving.cot_positioning,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (
    report_date
  ),
  start '2006-06-13',
  cron '@daily'
);

WITH latest_revision AS (
  /* Pick the most recently ingested row when CFTC issues corrections */
  SELECT
    f.*
  FROM foundation.fct_cot_positioning AS f
  INNER JOIN foundation.dim_commodity AS d
    ON f.cftc_commodity_code = d.cftc_commodity_code
  WHERE
    d.commodity_name = 'Coffee, Green'
    AND f.report_type = 'FutOnly'
    /* Interval plus warm-up history for the trailing windows below */
    AND f.report_date >= CAST(@start_ds AS DATE) - INTERVAL '400' DAY
    AND f.report_date <= @end_ds
  QUALIFY
    ROW_NUMBER() OVER (
      PARTITION BY f.report_date, f.cftc_contract_market_code
      ORDER BY f.ingest_date DESC
    ) = 1
), with_derived AS (
  /* NOTE(review): the windows below are ordered by report_date only; this assumes a single contract market code per report_date — confirm against dim_commodity. */
  SELECT
    report_date,
    market_and_exchange_name,
    cftc_commodity_code,
    cftc_contract_market_code,
    contract_units,
    ingest_date,
    /* Absolute positions (contracts) */
    open_interest,
    managed_money_long,
    managed_money_short,
    managed_money_spread,
    managed_money_net,
    prod_merc_long,
    prod_merc_short,
    prod_merc_net,
    swap_long,
    swap_short,
    swap_spread,
    swap_net,
    other_reportable_long,
    other_reportable_short,
    other_reportable_spread,
    other_reportable_net,
    nonreportable_long,
    nonreportable_short,
    nonreportable_net,
    /* Normalized: managed money net as % of open interest. */
    /* Removes size effects and makes cross-period comparison meaningful. */
    ROUND(managed_money_net::REAL / NULLIF(open_interest, 0) * 100, 2) AS managed_money_net_pct_of_oi,
    /* Long/short ratio: >1 = more bulls than bears in managed money */
    ROUND(managed_money_long::REAL / NULLIF(managed_money_short, 0), 3) AS managed_money_long_short_ratio,
    /* Weekly changes */
    change_open_interest,
    change_managed_money_long,
    change_managed_money_short,
    change_managed_money_net,
    change_prod_merc_long,
    change_prod_merc_short,
    /* Week-over-week momentum in managed money net (previous report via LAG) */
    managed_money_net - LAG(managed_money_net, 1) OVER (ORDER BY report_date) AS managed_money_net_wow,
    /* Concentration */
    concentration_top4_long_pct,
    concentration_top4_short_pct,
    concentration_top8_long_pct,
    concentration_top8_short_pct,
    /* Trader counts */
    traders_total,
    traders_managed_money_long,
    traders_managed_money_short,
    traders_managed_money_spread,
    /* COT Index (26-week): where is current net vs. trailing 26 weeks? */
    /* 0 = most bearish extreme, 100 = most bullish extreme; 50.0 when the range is flat. */
    /* Industry-standard sentiment gauge (equivalent to RSI for positioning). */
    CASE
      WHEN MAX(managed_money_net) OVER w26 = MIN(managed_money_net) OVER w26
      THEN 50.0
      ELSE ROUND(
        (
          managed_money_net - MIN(managed_money_net) OVER w26
        )::REAL / (
          MAX(managed_money_net) OVER w26 - MIN(managed_money_net) OVER w26
        ) * 100,
        1
      )
    END AS cot_index_26w,
    /* COT Index (52-week): longer-term positioning context */
    CASE
      WHEN MAX(managed_money_net) OVER w52 = MIN(managed_money_net) OVER w52
      THEN 50.0
      ELSE ROUND(
        (
          managed_money_net - MIN(managed_money_net) OVER w52
        )::REAL / (
          MAX(managed_money_net) OVER w52 - MIN(managed_money_net) OVER w52
        ) * 100,
        1
      )
    END AS cot_index_52w
  FROM latest_revision
  WINDOW
    w26 AS (ORDER BY report_date ROWS BETWEEN 25 PRECEDING AND CURRENT ROW),
    w52 AS (ORDER BY report_date ROWS BETWEEN 51 PRECEDING AND CURRENT ROW)
)
SELECT
  report_date,
  market_and_exchange_name,
  cftc_commodity_code,
  cftc_contract_market_code,
  contract_units,
  ingest_date,
  open_interest,
  managed_money_long,
  managed_money_short,
  managed_money_spread,
  managed_money_net,
  prod_merc_long,
  prod_merc_short,
  prod_merc_net,
  swap_long,
  swap_short,
  swap_spread,
  swap_net,
  other_reportable_long,
  other_reportable_short,
  other_reportable_spread,
  other_reportable_net,
  nonreportable_long,
  nonreportable_short,
  nonreportable_net,
  managed_money_net_pct_of_oi,
  managed_money_long_short_ratio,
  change_open_interest,
  change_managed_money_long,
  change_managed_money_short,
  change_managed_money_net,
  change_prod_merc_long,
  change_prod_merc_short,
  managed_money_net_wow,
  concentration_top4_long_pct,
  concentration_top4_short_pct,
  concentration_top8_long_pct,
  concentration_top8_short_pct,
  traders_total,
  traders_managed_money_long,
  traders_managed_money_short,
  traders_managed_money_spread,
  cot_index_26w,
  cot_index_52w
FROM with_derived
WHERE
  /* Drop the warm-up rows: only the run interval is written */
  report_date BETWEEN @start_ds AND @end_ds
ORDER BY
  report_date

View File

@@ -0,0 +1,148 @@
/*
  Serving mart: COT positioning (combined futures+options) for Coffee C futures.
  Same analytics as serving.cot_positioning, but filtered to the combined
  report variant (report_type = 'Combined'). Positions include
  options delta-equivalent exposure, showing total directional market bet.
  Grain: one row per report_date for Coffee C futures.
  Latest revision per date: the row with the highest ingest_date is kept to
  deduplicate CFTC corrections.

  Incremental processing: the COT indexes and the week-over-week delta look back
  up to 52 rows. latest_revision therefore reads a warm-up buffer of 400 calendar
  days before @start_ds (52+ weekly reports) and the final SELECT trims the
  output back to @start_ds..@end_ds. Without the buffer a daily run sees at most
  one report, so both indexes are always 50.0 and managed_money_net_wow is NULL.
*/
MODEL (
  name serving.cot_positioning_combined,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (
    report_date
  ),
  start '2006-06-13',
  cron '@daily'
);

WITH latest_revision AS (
  /* Pick the most recently ingested row when CFTC issues corrections */
  SELECT
    f.*
  FROM foundation.fct_cot_positioning AS f
  INNER JOIN foundation.dim_commodity AS d
    ON f.cftc_commodity_code = d.cftc_commodity_code
  WHERE
    d.commodity_name = 'Coffee, Green'
    AND f.report_type = 'Combined'
    /* Interval plus warm-up history for the trailing windows below */
    AND f.report_date >= CAST(@start_ds AS DATE) - INTERVAL '400' DAY
    AND f.report_date <= @end_ds
  QUALIFY
    ROW_NUMBER() OVER (
      PARTITION BY f.report_date, f.cftc_contract_market_code
      ORDER BY f.ingest_date DESC
    ) = 1
), with_derived AS (
  /* NOTE(review): the windows below are ordered by report_date only; this assumes a single contract market code per report_date — confirm against dim_commodity. */
  SELECT
    report_date,
    market_and_exchange_name,
    cftc_commodity_code,
    cftc_contract_market_code,
    contract_units,
    ingest_date,
    /* Absolute positions (contracts, delta-equivalent for options) */
    open_interest,
    managed_money_long,
    managed_money_short,
    managed_money_spread,
    managed_money_net,
    prod_merc_long,
    prod_merc_short,
    prod_merc_net,
    swap_long,
    swap_short,
    swap_spread,
    swap_net,
    other_reportable_long,
    other_reportable_short,
    other_reportable_spread,
    other_reportable_net,
    nonreportable_long,
    nonreportable_short,
    nonreportable_net,
    /* Normalized: managed money net as % of open interest. */
    /* Removes size effects and makes cross-period comparison meaningful. */
    ROUND(managed_money_net::REAL / NULLIF(open_interest, 0) * 100, 2) AS managed_money_net_pct_of_oi,
    /* Long/short ratio: >1 = more bulls than bears in managed money */
    ROUND(managed_money_long::REAL / NULLIF(managed_money_short, 0), 3) AS managed_money_long_short_ratio,
    /* Weekly changes */
    change_open_interest,
    change_managed_money_long,
    change_managed_money_short,
    change_managed_money_net,
    change_prod_merc_long,
    change_prod_merc_short,
    /* Week-over-week momentum in managed money net (previous report via LAG) */
    managed_money_net - LAG(managed_money_net, 1) OVER (ORDER BY report_date) AS managed_money_net_wow,
    /* Concentration */
    concentration_top4_long_pct,
    concentration_top4_short_pct,
    concentration_top8_long_pct,
    concentration_top8_short_pct,
    /* Trader counts */
    traders_total,
    traders_managed_money_long,
    traders_managed_money_short,
    traders_managed_money_spread,
    /* COT Index (26-week): where is current net vs. trailing 26 weeks? */
    /* 0 = most bearish extreme, 100 = most bullish extreme; 50.0 when the range is flat. */
    /* Includes options delta-equivalent exposure. */
    CASE
      WHEN MAX(managed_money_net) OVER w26 = MIN(managed_money_net) OVER w26
      THEN 50.0
      ELSE ROUND(
        (
          managed_money_net - MIN(managed_money_net) OVER w26
        )::REAL / (
          MAX(managed_money_net) OVER w26 - MIN(managed_money_net) OVER w26
        ) * 100,
        1
      )
    END AS cot_index_26w,
    /* COT Index (52-week): longer-term positioning context */
    CASE
      WHEN MAX(managed_money_net) OVER w52 = MIN(managed_money_net) OVER w52
      THEN 50.0
      ELSE ROUND(
        (
          managed_money_net - MIN(managed_money_net) OVER w52
        )::REAL / (
          MAX(managed_money_net) OVER w52 - MIN(managed_money_net) OVER w52
        ) * 100,
        1
      )
    END AS cot_index_52w
  FROM latest_revision
  WINDOW
    w26 AS (ORDER BY report_date ROWS BETWEEN 25 PRECEDING AND CURRENT ROW),
    w52 AS (ORDER BY report_date ROWS BETWEEN 51 PRECEDING AND CURRENT ROW)
)
SELECT
  report_date,
  market_and_exchange_name,
  cftc_commodity_code,
  cftc_contract_market_code,
  contract_units,
  ingest_date,
  open_interest,
  managed_money_long,
  managed_money_short,
  managed_money_spread,
  managed_money_net,
  prod_merc_long,
  prod_merc_short,
  prod_merc_net,
  swap_long,
  swap_short,
  swap_spread,
  swap_net,
  other_reportable_long,
  other_reportable_short,
  other_reportable_spread,
  other_reportable_net,
  nonreportable_long,
  nonreportable_short,
  nonreportable_net,
  managed_money_net_pct_of_oi,
  managed_money_long_short_ratio,
  change_open_interest,
  change_managed_money_long,
  change_managed_money_short,
  change_managed_money_net,
  change_prod_merc_long,
  change_prod_merc_short,
  managed_money_net_wow,
  concentration_top4_long_pct,
  concentration_top4_short_pct,
  concentration_top8_long_pct,
  concentration_top8_short_pct,
  traders_total,
  traders_managed_money_long,
  traders_managed_money_short,
  traders_managed_money_spread,
  cot_index_26w,
  cot_index_52w
FROM with_derived
WHERE
  /* Drop the warm-up rows: only the run interval is written */
  report_date BETWEEN @start_ds AND @end_ds
ORDER BY
  report_date

View File

@@ -0,0 +1,187 @@
/* Serving mart: daily weather analytics for 12 coffee-growing regions. */
/* Source: foundation.fct_weather_daily (already has seed join for location metadata). */
/* Adds rolling aggregates, water balance, gaps-and-islands streak counters, */
/* and a composite crop stress index (0-100) as a single severity gauge. */
/* Grain: (location_id, observation_date) */
/* Window context: rolling windows reach back 30 days and streak counters need */
/* the preceding run of days. The base CTE therefore reads a 90-day warm-up */
/* buffer before @start_ds and the final SELECT trims the output back to */
/* @start_ds..@end_ds, so every written row (including the oldest row of a */
/* lookback reprocess) is computed with history behind it. Streaks are */
/* consequently counted from at most 90 days before the run interval. */
/* lookback 90 is kept unchanged so recent days keep being reprocessed each run. */
MODEL (
  name serving.weather_daily,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column observation_date,
    lookback 90
  ),
  grain (location_id, observation_date),
  start '2020-01-01',
  cron '@daily'
);

WITH base AS (
  SELECT
    observation_date,
    location_id,
    location_name,
    country,
    lat,
    lon,
    variety,
    temp_min_c,
    temp_max_c,
    temp_mean_c,
    precipitation_mm,
    humidity_max_pct,
    cloud_cover_mean_pct,
    wind_max_speed_ms,
    et0_mm,
    vpd_max_kpa,
    is_frost,
    is_heat_stress,
    is_drought,
    is_high_vpd,
    in_growing_season,
    /* Rolling precipitation — w7 = trailing 7 rows, w30 = trailing 30 rows per location */
    SUM(precipitation_mm) OVER w7 AS precip_sum_7d_mm,
    SUM(precipitation_mm) OVER w30 AS precip_sum_30d_mm,
    /* Rolling temperature baseline */
    AVG(temp_mean_c) OVER w30 AS temp_mean_30d_c,
    /* Temperature anomaly: today vs trailing 30-day mean */
    temp_mean_c - AVG(temp_mean_c) OVER w30 AS temp_anomaly_c,
    /* Water balance: net daily water gain/loss (precipitation minus evapotranspiration) */
    precipitation_mm - et0_mm AS water_balance_mm,
    SUM(precipitation_mm - et0_mm) OVER w7 AS water_balance_7d_mm,
    /* Gaps-and-islands group markers for streak counting. */
    /* ROW_NUMBER() - running_count_of_true is constant across each consecutive */
    /* run of TRUE rows and differs between runs. The non-TRUE row immediately */
    /* before a run gets the SAME value as that run, so with_streaks must also */
    /* partition by the flag to keep that row out of the count. */
    ROW_NUMBER() OVER running - SUM(CASE WHEN is_drought THEN 1 ELSE 0 END) OVER running AS _drought_group,
    ROW_NUMBER() OVER running - SUM(CASE WHEN is_heat_stress THEN 1 ELSE 0 END) OVER running AS _heat_group,
    ROW_NUMBER() OVER running - SUM(CASE WHEN is_high_vpd THEN 1 ELSE 0 END) OVER running AS _vpd_group
  FROM foundation.fct_weather_daily
  WHERE
    /* Run interval plus 90 days of warm-up history for the windows above */
    observation_date >= CAST(@start_ds AS DATE) - INTERVAL '90' DAY
    AND observation_date <= @end_ds
  WINDOW
    w7 AS (
      PARTITION BY location_id
      ORDER BY observation_date
      ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ),
    w30 AS (
      PARTITION BY location_id
      ORDER BY observation_date
      ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
    ),
    running AS (
      PARTITION BY location_id
      ORDER BY observation_date
      ROWS UNBOUNDED PRECEDING
    )
), with_streaks AS (
  SELECT
    base.*,
    /* Drought streak: number of consecutive drought days ending on observation_date. */
    /* 0 when the flag is FALSE or NULL. The flag is part of the partition so the */
    /* non-drought day preceding a run is not counted (first drought day = 1). */
    CASE
      WHEN is_drought
      THEN ROW_NUMBER() OVER (
        PARTITION BY location_id, is_drought, _drought_group
        ORDER BY observation_date
      )
      ELSE 0
    END AS drought_streak_days,
    /* Heat stress streak: consecutive days flagged is_heat_stress (temp_max > 35°C) */
    CASE
      WHEN is_heat_stress
      THEN ROW_NUMBER() OVER (
        PARTITION BY location_id, is_heat_stress, _heat_group
        ORDER BY observation_date
      )
      ELSE 0
    END AS heat_streak_days,
    /* VPD stress streak: consecutive days flagged is_high_vpd (vpd_max > 1.5 kPa) */
    CASE
      WHEN is_high_vpd
      THEN ROW_NUMBER() OVER (
        PARTITION BY location_id, is_high_vpd, _vpd_group
        ORDER BY observation_date
      )
      ELSE 0
    END AS vpd_streak_days
  FROM base
)
SELECT
  observation_date,
  location_id,
  location_name,
  country,
  lat,
  lon,
  variety,
  temp_min_c,
  temp_max_c,
  temp_mean_c,
  precipitation_mm,
  humidity_max_pct,
  cloud_cover_mean_pct,
  wind_max_speed_ms,
  et0_mm,
  vpd_max_kpa,
  is_frost,
  is_heat_stress,
  is_drought,
  is_high_vpd,
  in_growing_season,
  ROUND(precip_sum_7d_mm, 2) AS precip_sum_7d_mm,
  ROUND(precip_sum_30d_mm, 2) AS precip_sum_30d_mm,
  ROUND(temp_mean_30d_c, 2) AS temp_mean_30d_c,
  ROUND(temp_anomaly_c, 2) AS temp_anomaly_c,
  ROUND(water_balance_mm, 2) AS water_balance_mm,
  ROUND(water_balance_7d_mm, 2) AS water_balance_7d_mm,
  drought_streak_days,
  heat_streak_days,
  vpd_streak_days,
  /* Composite crop stress index (0-100).
     Weights: drought streak 30%, water deficit 25%, heat streak 20%,
     VPD streak 15%, frost (binary) 10%.
     Each component is normalized to [0,1] then capped before weighting:
       drought: 14 days = fully stressed
       water:   20mm 7d deficit = fully stressed
       heat:    7 days = fully stressed
       vpd:     7 days = fully stressed
       frost:   binary (Arabica highland catastrophic event) */
  ROUND(
    GREATEST(0.0, LEAST(100.0,
      LEAST(1.0, drought_streak_days / 14.0) * 30.0
      + LEAST(1.0, GREATEST(0.0, -water_balance_7d_mm) / 20.0) * 25.0
      + LEAST(1.0, heat_streak_days / 7.0) * 20.0
      + LEAST(1.0, vpd_streak_days / 7.0) * 15.0
      + CASE WHEN is_frost THEN 10.0 ELSE 0.0 END
    )),
    1
  ) AS crop_stress_index
FROM with_streaks
WHERE
  /* Drop the warm-up rows: only the run interval is written */
  observation_date BETWEEN @start_ds AND @end_ds
ORDER BY
  location_id,
  observation_date