diff --git a/transform/sqlmesh_materia/models/serving/ice_aging_stocks.sql b/transform/sqlmesh_materia/models/serving/ice_aging_stocks.sql new file mode 100644 index 0000000..6f3e4d2 --- /dev/null +++ b/transform/sqlmesh_materia/models/serving/ice_aging_stocks.sql @@ -0,0 +1,60 @@ +-- Serving mart: ICE certified Coffee C stock aging report, analytics-ready. +-- +-- Shows the age distribution of certified stocks across delivery ports. +-- Age buckets represent how long coffee has been in certified storage. +-- Older stock approaching certificate limits is a supply quality signal. +-- +-- Source: ICE Certified Stock Aging Report (monthly) +-- Grain: one row per (report_date, age_bucket). + +MODEL ( + name serving.ice_aging_stocks, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column report_date + ), + grain (report_date, age_bucket), + start '2020-01-01', + cron '@daily' +); + +WITH base AS ( + SELECT + f.report_date, + f.age_bucket, + + -- Parse age range from "0000 to 0120" format for correct sort order + TRY_CAST(split_part(f.age_bucket, ' to ', 1) AS int) AS age_bucket_start_days, + TRY_CAST(split_part(f.age_bucket, ' to ', 2) AS int) AS age_bucket_end_days, + + f.antwerp_bags, + f.hamburg_bremen_bags, + f.houston_bags, + f.miami_bags, + f.new_orleans_bags, + f.new_york_bags, + f.total_bags, + + f.source_file + FROM foundation.fct_ice_aging_stocks f + WHERE f.report_date BETWEEN @start_ds AND @end_ds +) + +SELECT + b.report_date, + d.commodity_name, + d.ice_stock_report_code, + b.age_bucket, + b.age_bucket_start_days, + b.age_bucket_end_days, + b.antwerp_bags, + b.hamburg_bremen_bags, + b.houston_bags, + b.miami_bags, + b.new_orleans_bags, + b.new_york_bags, + b.total_bags, + b.source_file +FROM base b +CROSS JOIN foundation.dim_commodity d +WHERE d.ice_stock_report_code = 'COFFEE-C' +ORDER BY b.report_date, b.age_bucket_start_days diff --git a/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks_by_port.sql b/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks_by_port.sql new file mode 100644 index 0000000..1573a5f --- /dev/null +++ b/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks_by_port.sql @@ -0,0 +1,78 @@ +-- Serving mart: ICE certified Coffee C warehouse stocks by port, analytics-ready. +-- +-- End-of-month certified stock levels broken down by delivery port. +-- Covers November 1996 to present (~30 years). Useful for understanding +-- geographic shifts in the certified supply base over time. +-- +-- Source: ICE historical by-port XLS (EOM_KC_cert_stox_by_port_nov96-present.xls) +-- Grain: one row per report_date (end-of-month). + +MODEL ( + name serving.ice_warehouse_stocks_by_port, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column report_date + ), + grain (report_date), + start '1996-11-01', + cron '@daily' +); + +WITH base AS ( + SELECT + f.report_date, + f.new_york_bags, + f.new_orleans_bags, + f.houston_bags, + f.miami_bags, + f.antwerp_bags, + f.hamburg_bremen_bags, + f.barcelona_bags, + f.virginia_bags, + f.total_bags, + + -- Month-over-month change in total certified bags + f.total_bags + - LAG(f.total_bags, 1) OVER (ORDER BY f.report_date) AS mom_change_bags, + + -- Month-over-month percent change + round( + (f.total_bags::double + - LAG(f.total_bags, 1) OVER (ORDER BY f.report_date)::double) + / NULLIF(LAG(f.total_bags, 1) OVER (ORDER BY f.report_date)::double, 0) * 100, + 2 + ) AS mom_change_pct, + + -- 12-month rolling average + round( + AVG(f.total_bags::double) OVER ( + ORDER BY f.report_date ROWS BETWEEN 11 PRECEDING AND CURRENT ROW + ), + 0 + ) AS avg_12m_bags, + + f.source_file + FROM foundation.fct_ice_warehouse_stocks_by_port f + WHERE f.report_date BETWEEN @start_ds AND @end_ds +) + +SELECT + b.report_date, + d.commodity_name, + d.ice_stock_report_code, + b.new_york_bags, + b.new_orleans_bags, + b.houston_bags, + b.miami_bags, + b.antwerp_bags, + b.hamburg_bremen_bags, + b.barcelona_bags, + b.virginia_bags, + b.total_bags, + b.mom_change_bags, + b.mom_change_pct, + b.avg_12m_bags, + b.source_file +FROM base b +CROSS JOIN foundation.dim_commodity d +WHERE d.ice_stock_report_code = 'COFFEE-C' +ORDER BY b.report_date diff --git a/web/src/beanflows/analytics.py b/web/src/beanflows/analytics.py index 35d322b..11a75ff 100644 --- a/web/src/beanflows/analytics.py +++ b/web/src/beanflows/analytics.py @@ -389,6 +389,72 @@ async def get_ice_stocks_latest() -> dict | None: return rows[0] if rows else None +async def get_ice_aging_latest() -> list[dict]: + """All age buckets for the most recent ICE aging report, sorted youngest first.""" + return await fetch_analytics( + """ + SELECT report_date, age_bucket, age_bucket_start_days, age_bucket_end_days, + antwerp_bags, hamburg_bremen_bags, houston_bags, miami_bags, + new_orleans_bags, new_york_bags, total_bags + FROM serving.ice_aging_stocks + WHERE report_date = (SELECT MAX(report_date) FROM serving.ice_aging_stocks) + ORDER BY age_bucket_start_days + """ + ) + + +async def get_ice_aging_trend(reports: int = 6) -> list[dict]: + """Age bucket distribution for the last N aging reports, sorted oldest report first.""" + assert 1 <= reports <= 12, "reports must be between 1 and 12" + return await fetch_analytics( + """ + SELECT report_date, age_bucket, age_bucket_start_days, age_bucket_end_days, + antwerp_bags, hamburg_bremen_bags, houston_bags, miami_bags, + new_orleans_bags, new_york_bags, total_bags + FROM serving.ice_aging_stocks + WHERE report_date IN ( + SELECT DISTINCT report_date + FROM serving.ice_aging_stocks + ORDER BY report_date DESC + LIMIT ? + ) + ORDER BY report_date, age_bucket_start_days + """, + [reports], + ) + + +async def get_ice_stocks_by_port_trend(months: int = 120) -> list[dict]: + """Monthly by-port warehouse stocks over the trailing N months.""" + assert 1 <= months <= 360, "months must be between 1 and 360" + return await fetch_analytics( + """ + SELECT report_date, new_york_bags, new_orleans_bags, houston_bags, miami_bags, + antwerp_bags, hamburg_bremen_bags, barcelona_bags, virginia_bags, + total_bags, mom_change_bags, mom_change_pct, avg_12m_bags + FROM serving.ice_warehouse_stocks_by_port + ORDER BY report_date DESC + LIMIT ? + """, + [months], + ) + + +async def get_ice_stocks_by_port_latest() -> dict | None: + """Latest month's by-port warehouse stock breakdown.""" + rows = await fetch_analytics( + """ + SELECT report_date, new_york_bags, new_orleans_bags, houston_bags, miami_bags, + antwerp_bags, hamburg_bremen_bags, barcelona_bags, virginia_bags, + total_bags, mom_change_bags, mom_change_pct, avg_12m_bags + FROM serving.ice_warehouse_stocks_by_port + ORDER BY report_date DESC + LIMIT 1 + """ + ) + return rows[0] if rows else None + + async def get_country_comparison( commodity_code: int, country_codes: list[str], diff --git a/web/src/beanflows/api/routes.py b/web/src/beanflows/api/routes.py index 24c4f39..a5ab769 100644 --- a/web/src/beanflows/api/routes.py +++ b/web/src/beanflows/api/routes.py @@ -247,6 +247,57 @@ async def commodity_ice_stocks(code: str): return jsonify({"commodity": code, "data": data}) +@bp.route("/commodities//stocks/aging") +@api_key_required(scopes=["read"]) +async def commodity_ice_aging(code: str): + """ICE certified stock aging report — age distribution across delivery ports. + + Query params: + reports — number of monthly snapshots to return (default 1, max 12) + start_date — ISO date filter (YYYY-MM-DD) + end_date — ISO date filter (YYYY-MM-DD) + """ + reports = min(int(request.args.get("reports", 1)), 12) + start_date = request.args.get("start_date") + end_date = request.args.get("end_date") + + if reports == 1: + data = await analytics.get_ice_aging_latest() + else: + data = await analytics.get_ice_aging_trend(reports=reports) + + if start_date: + data = [r for r in data if str(r["report_date"]) >= start_date] + if end_date: + data = [r for r in data if str(r["report_date"]) <= end_date] + + return jsonify({"commodity": code, "data": data}) + + +@bp.route("/commodities//stocks/by-port") +@api_key_required(scopes=["read"]) +async def commodity_ice_stocks_by_port(code: str): + """ICE certified warehouse stocks by delivery port — monthly, back to 1996. + + Query params: + months — trailing months (default 120, max 360) + start_date — ISO date filter (YYYY-MM-DD) + end_date — ISO date filter (YYYY-MM-DD) + """ + months = min(int(request.args.get("months", 120)), 360) + start_date = request.args.get("start_date") + end_date = request.args.get("end_date") + + data = await analytics.get_ice_stocks_by_port_trend(months=months) + + if start_date: + data = [r for r in data if str(r["report_date"]) >= start_date] + if end_date: + data = [r for r in data if str(r["report_date"]) <= end_date] + + return jsonify({"commodity": code, "data": data}) + + @bp.route("/commodities//metrics.csv") @api_key_required(scopes=["read"]) async def commodity_metrics_csv(code: int): diff --git a/web/src/beanflows/dashboard/routes.py b/web/src/beanflows/dashboard/routes.py index 61254c7..ca8009d 100644 --- a/web/src/beanflows/dashboard/routes.py +++ b/web/src/beanflows/dashboard/routes.py @@ -115,14 +115,18 @@ async def index(): analytics.get_price_latest(analytics.COFFEE_TICKER), analytics.get_ice_stocks_trend(days=365), analytics.get_ice_stocks_latest(), + analytics.get_ice_aging_latest(), + analytics.get_ice_stocks_by_port_trend(months=120), + analytics.get_ice_stocks_by_port_latest(), return_exceptions=True, ) - defaults = [[], [], [], [], [], None, [], [], None, [], None] + defaults = [[], [], [], [], [], None, [], [], None, [], None, [], [], None] ( time_series, top_producers, stu_trend, balance, yoy, cot_latest, cot_trend, price_series, price_latest, ice_stocks_trend, ice_stocks_latest, + ice_aging_latest, ice_stocks_by_port, ice_stocks_by_port_latest, ) = [ r if not isinstance(r, Exception) else ( current_app.logger.warning("Analytics query %d failed: %s", i, r) or defaults[i] @@ -134,6 +138,7 @@ async def index(): cot_latest, cot_trend = None, [] price_series, price_latest = [], None ice_stocks_trend, ice_stocks_latest = [], None + ice_aging_latest, ice_stocks_by_port, ice_stocks_by_port_latest = [], [], None # Latest global snapshot for key metric cards latest = time_series[-1] if time_series else {} @@ -163,6 +168,9 @@ async def index(): price_latest=price_latest, ice_stocks_trend=ice_stocks_trend, ice_stocks_latest=ice_stocks_latest, + ice_aging_latest=ice_aging_latest, + ice_stocks_by_port=ice_stocks_by_port, + ice_stocks_by_port_latest=ice_stocks_by_port_latest, ) diff --git a/web/src/beanflows/dashboard/templates/index.html b/web/src/beanflows/dashboard/templates/index.html index f4477d3..71a0ce3 100644 --- a/web/src/beanflows/dashboard/templates/index.html +++ b/web/src/beanflows/dashboard/templates/index.html @@ -221,6 +221,77 @@ {% endif %} + + {% if ice_aging_latest %} + {% set aging_total = ice_aging_latest | sum(attribute='total_bags') %} + {% set youngest = ice_aging_latest | selectattr('age_bucket_start_days', 'equalto', 0) | list %} + {% set oldest = ice_aging_latest | selectattr('age_bucket_start_days', 'ge', 720) | list %} + {% set youngest_pct = (youngest | sum(attribute='total_bags') / aging_total * 100) if aging_total > 0 else 0 %} + {% set oldest_pct = (oldest | sum(attribute='total_bags') / aging_total * 100) if aging_total > 0 else 0 %} +
+

Certified Stock Aging Report

+

Age distribution of certified Arabica stocks by delivery port · as of {{ ice_aging_latest[0].report_date }}

+
+
+
Total Certified
+
{{ "{:,.0f}".format(aging_total) }}
+
60-kg bags
+
+
+
Young Stock (0–120 days)
+
{{ "{:.1f}%".format(youngest_pct) }}
+
freshest certified supply
+
+
+
Old Stock (720+ days)
+
{{ "{:.1f}%".format(oldest_pct) }}
+
at risk of cert expiry
+
+
+
Age Buckets
+
{{ ice_aging_latest | length }}
+
tracking ranges
+
+
+ +
+ {% endif %} + + + {% if ice_stocks_by_port_latest %} +
+

Warehouse Stocks by Delivery Port

+

End-of-month certified stocks at each ICE delivery port · Nov 1996 to present · as of {{ ice_stocks_by_port_latest.report_date }}

+
+
+
Total Certified
+
{{ "{:,.0f}".format(ice_stocks_by_port_latest.total_bags) }}
+
60-kg bags (latest month)
+
+
+
Month-over-Month
+
+ {% if ice_stocks_by_port_latest.mom_change_bags is not none %} + {{ "{:+,d}".format(ice_stocks_by_port_latest.mom_change_bags | int) }} + {% else %}--{% endif %} +
+
bags vs prior month
+
+
+
12-Month Average
+
{{ "{:,.0f}".format(ice_stocks_by_port_latest.avg_12m_bags) }}
+
60-kg bags
+
+
+
History
+
30 yrs
+
since Nov 1996
+
+
+ +
+ {% endif %} +
Country Comparison @@ -449,6 +520,66 @@ if (iceRaw && iceRaw.length > 0) { }); } +// -- ICE Aging Report Chart -- +const agingRaw = {{ ice_aging_latest | tojson }}; +if (agingRaw && agingRaw.length > 0) { + const ports = ['antwerp_bags', 'hamburg_bremen_bags', 'houston_bags', 'miami_bags', 'new_orleans_bags', 'new_york_bags']; + const portLabels = ['Antwerp', 'Hamburg/Bremen', 'Houston', 'Miami', 'New Orleans', 'New York']; + new Chart(document.getElementById('iceAgingChart'), { + type: 'bar', + data: { + labels: agingRaw.map(r => r.age_bucket), + datasets: ports.map((col, i) => ({ + label: portLabels[i], + data: agingRaw.map(r => r[col] || 0), + backgroundColor: CHART_PALETTE[i], + borderWidth: 0, + })) + }, + options: { + responsive: true, + interaction: {mode: 'index', intersect: false}, + plugins: {legend: {position: 'bottom'}}, + scales: { + x: {stacked: true, ticks: {maxRotation: 45}}, + y: {stacked: true, title: {display: true, text: '60-kg bags'}, beginAtZero: true} + } + } + }); +} + +// -- ICE Historical Stocks by Port Chart -- +const byPortRaw = {{ ice_stocks_by_port | tojson }}; +if (byPortRaw && byPortRaw.length > 0) { + const byPortData = [...byPortRaw].reverse(); + const portCols = ['antwerp_bags', 'hamburg_bremen_bags', 'new_york_bags', 'new_orleans_bags', 'houston_bags', 'miami_bags', 'barcelona_bags', 'virginia_bags']; + const portNames = ['Antwerp', 'Hamburg/Bremen', 'New York', 'New Orleans', 'Houston', 'Miami', 'Barcelona', 'Virginia']; + new Chart(document.getElementById('iceByPortChart'), { + type: 'line', + data: { + labels: byPortData.map(r => r.report_date), + datasets: portCols.map((col, i) => ({ + label: portNames[i], + data: byPortData.map(r => r[col] || 0), + borderColor: CHART_PALETTE[i], + backgroundColor: CHART_PALETTE[i] + '22', + fill: true, + tension: 0.2, + pointRadius: 0, + })) + }, + options: { + responsive: true, + interaction: {mode: 'index', intersect: false}, + plugins: {legend: {position: 'bottom'}}, + scales: { + x: {stacked: true, ticks: {maxTicksLimit: 15}}, + y: {stacked: true, title: {display: true, text: '60-kg bags'}, beginAtZero: true} + } + } + }); +} + // -- Top Producers Horizontal Bar -- const topData = {{ top_producers | tojson }}; if (topData.length > 0) {