ICE aging + by-port: serving models, API endpoints, dashboard integration

- serving/ice_aging_stocks.sql: pass-through from foundation, parses age
  bucket string to start/end days ints for correct sort order
- serving/ice_warehouse_stocks_by_port.sql: monthly by-port since 1996,
  adds MoM change, MoM %, 12-month rolling average
- analytics.py: get_ice_aging_latest(), get_ice_aging_trend(),
  get_ice_stocks_by_port_trend(), get_ice_stocks_by_port_latest()
- api/routes.py: GET /commodities/<code>/stocks/aging and
  GET /commodities/<code>/stocks/by-port with auth + rate limiting
- dashboard/routes.py: add 3 new queries to asyncio.gather(), pass to template
- index.html: aging stacked bar chart (age buckets × port) with 4 metric
  cards; by-port stacked area chart (30-year history) with 4 metric cards

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-21 21:52:35 +01:00
parent ff7301d6a8
commit ff956b0138
6 changed files with 395 additions and 1 deletions

View File

@@ -0,0 +1,60 @@
-- Serving mart: ICE certified Coffee C stock aging report, analytics-ready.
--
-- Shows the age distribution of certified stocks across delivery ports.
-- Age buckets represent how long coffee has been in certified storage.
-- Older stock approaching certificate limits is a supply quality signal.
--
-- Source: ICE Certified Stock Aging Report (monthly)
-- Grain: one row per (report_date, age_bucket).
MODEL (
name serving.ice_aging_stocks,
kind INCREMENTAL_BY_TIME_RANGE (
time_column report_date
),
grain (report_date, age_bucket),
start '2020-01-01',
cron '@daily'
);
WITH base AS (
SELECT
f.report_date,
f.age_bucket,
-- Parse age range from "0000 to 0120" format for correct sort order
TRY_CAST(split_part(f.age_bucket, ' to ', 1) AS int) AS age_bucket_start_days,
TRY_CAST(split_part(f.age_bucket, ' to ', 2) AS int) AS age_bucket_end_days,
f.antwerp_bags,
f.hamburg_bremen_bags,
f.houston_bags,
f.miami_bags,
f.new_orleans_bags,
f.new_york_bags,
f.total_bags,
f.source_file
FROM foundation.fct_ice_aging_stocks f
WHERE f.report_date BETWEEN @start_ds AND @end_ds
)
SELECT
b.report_date,
d.commodity_name,
d.ice_stock_report_code,
b.age_bucket,
b.age_bucket_start_days,
b.age_bucket_end_days,
b.antwerp_bags,
b.hamburg_bremen_bags,
b.houston_bags,
b.miami_bags,
b.new_orleans_bags,
b.new_york_bags,
b.total_bags,
b.source_file
FROM base b
CROSS JOIN foundation.dim_commodity d
WHERE d.ice_stock_report_code = 'COFFEE-C'
ORDER BY b.report_date, b.age_bucket_start_days

View File

@@ -0,0 +1,78 @@
-- Serving mart: ICE certified Coffee C warehouse stocks by port, analytics-ready.
--
-- End-of-month certified stock levels broken down by delivery port.
-- Covers November 1996 to present (~30 years). Useful for understanding
-- geographic shifts in the certified supply base over time.
--
-- Source: ICE historical by-port XLS (EOM_KC_cert_stox_by_port_nov96-present.xls)
-- Grain: one row per report_date (end-of-month).
MODEL (
name serving.ice_warehouse_stocks_by_port,
kind INCREMENTAL_BY_TIME_RANGE (
time_column report_date
),
grain (report_date),
start '1996-11-01',
cron '@daily'
);
WITH base AS (
SELECT
f.report_date,
f.new_york_bags,
f.new_orleans_bags,
f.houston_bags,
f.miami_bags,
f.antwerp_bags,
f.hamburg_bremen_bags,
f.barcelona_bags,
f.virginia_bags,
f.total_bags,
-- Month-over-month change in total certified bags
f.total_bags
- LAG(f.total_bags, 1) OVER (ORDER BY f.report_date) AS mom_change_bags,
-- Month-over-month percent change
round(
(f.total_bags::double
- LAG(f.total_bags, 1) OVER (ORDER BY f.report_date)::double)
/ NULLIF(LAG(f.total_bags, 1) OVER (ORDER BY f.report_date)::double, 0) * 100,
2
) AS mom_change_pct,
-- 12-month rolling average
round(
AVG(f.total_bags::double) OVER (
ORDER BY f.report_date ROWS BETWEEN 11 PRECEDING AND CURRENT ROW
),
0
) AS avg_12m_bags,
f.source_file
FROM foundation.fct_ice_warehouse_stocks_by_port f
WHERE f.report_date BETWEEN @start_ds AND @end_ds
)
SELECT
b.report_date,
d.commodity_name,
d.ice_stock_report_code,
b.new_york_bags,
b.new_orleans_bags,
b.houston_bags,
b.miami_bags,
b.antwerp_bags,
b.hamburg_bremen_bags,
b.barcelona_bags,
b.virginia_bags,
b.total_bags,
b.mom_change_bags,
b.mom_change_pct,
b.avg_12m_bags,
b.source_file
FROM base b
CROSS JOIN foundation.dim_commodity d
WHERE d.ice_stock_report_code = 'COFFEE-C'
ORDER BY b.report_date

View File

@@ -389,6 +389,72 @@ async def get_ice_stocks_latest() -> dict | None:
return rows[0] if rows else None
async def get_ice_aging_latest() -> list[dict]:
"""All age buckets for the most recent ICE aging report, sorted youngest first."""
return await fetch_analytics(
"""
SELECT report_date, age_bucket, age_bucket_start_days, age_bucket_end_days,
antwerp_bags, hamburg_bremen_bags, houston_bags, miami_bags,
new_orleans_bags, new_york_bags, total_bags
FROM serving.ice_aging_stocks
WHERE report_date = (SELECT MAX(report_date) FROM serving.ice_aging_stocks)
ORDER BY age_bucket_start_days
"""
)
async def get_ice_aging_trend(reports: int = 6) -> list[dict]:
"""Age bucket distribution for the last N aging reports, sorted oldest report first."""
assert 1 <= reports <= 12, "reports must be between 1 and 12"
return await fetch_analytics(
"""
SELECT report_date, age_bucket, age_bucket_start_days, age_bucket_end_days,
antwerp_bags, hamburg_bremen_bags, houston_bags, miami_bags,
new_orleans_bags, new_york_bags, total_bags
FROM serving.ice_aging_stocks
WHERE report_date IN (
SELECT DISTINCT report_date
FROM serving.ice_aging_stocks
ORDER BY report_date DESC
LIMIT ?
)
ORDER BY report_date, age_bucket_start_days
""",
[reports],
)
async def get_ice_stocks_by_port_trend(months: int = 120) -> list[dict]:
"""Monthly by-port warehouse stocks over the trailing N months."""
assert 1 <= months <= 360, "months must be between 1 and 360"
return await fetch_analytics(
"""
SELECT report_date, new_york_bags, new_orleans_bags, houston_bags, miami_bags,
antwerp_bags, hamburg_bremen_bags, barcelona_bags, virginia_bags,
total_bags, mom_change_bags, mom_change_pct, avg_12m_bags
FROM serving.ice_warehouse_stocks_by_port
ORDER BY report_date DESC
LIMIT ?
""",
[months],
)
async def get_ice_stocks_by_port_latest() -> dict | None:
"""Latest month's by-port warehouse stock breakdown."""
rows = await fetch_analytics(
"""
SELECT report_date, new_york_bags, new_orleans_bags, houston_bags, miami_bags,
antwerp_bags, hamburg_bremen_bags, barcelona_bags, virginia_bags,
total_bags, mom_change_bags, mom_change_pct, avg_12m_bags
FROM serving.ice_warehouse_stocks_by_port
ORDER BY report_date DESC
LIMIT 1
"""
)
return rows[0] if rows else None
async def get_country_comparison(
commodity_code: int,
country_codes: list[str],

View File

@@ -247,6 +247,57 @@ async def commodity_ice_stocks(code: str):
return jsonify({"commodity": code, "data": data})
@bp.route("/commodities/<code>/stocks/aging")
@api_key_required(scopes=["read"])
async def commodity_ice_aging(code: str):
"""ICE certified stock aging report — age distribution across delivery ports.
Query params:
reports — number of monthly snapshots to return (default 1, max 12)
start_date — ISO date filter (YYYY-MM-DD)
end_date — ISO date filter (YYYY-MM-DD)
"""
reports = min(int(request.args.get("reports", 1)), 12)
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
if reports == 1:
data = await analytics.get_ice_aging_latest()
else:
data = await analytics.get_ice_aging_trend(reports=reports)
if start_date:
data = [r for r in data if str(r["report_date"]) >= start_date]
if end_date:
data = [r for r in data if str(r["report_date"]) <= end_date]
return jsonify({"commodity": code, "data": data})
@bp.route("/commodities/<code>/stocks/by-port")
@api_key_required(scopes=["read"])
async def commodity_ice_stocks_by_port(code: str):
"""ICE certified warehouse stocks by delivery port — monthly, back to 1996.
Query params:
months — trailing months (default 120, max 360)
start_date — ISO date filter (YYYY-MM-DD)
end_date — ISO date filter (YYYY-MM-DD)
"""
months = min(int(request.args.get("months", 120)), 360)
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
data = await analytics.get_ice_stocks_by_port_trend(months=months)
if start_date:
data = [r for r in data if str(r["report_date"]) >= start_date]
if end_date:
data = [r for r in data if str(r["report_date"]) <= end_date]
return jsonify({"commodity": code, "data": data})
@bp.route("/commodities/<int:code>/metrics.csv")
@api_key_required(scopes=["read"])
async def commodity_metrics_csv(code: int):

View File

@@ -115,14 +115,18 @@ async def index():
analytics.get_price_latest(analytics.COFFEE_TICKER),
analytics.get_ice_stocks_trend(days=365),
analytics.get_ice_stocks_latest(),
analytics.get_ice_aging_latest(),
analytics.get_ice_stocks_by_port_trend(months=120),
analytics.get_ice_stocks_by_port_latest(),
return_exceptions=True,
)
defaults = [[], [], [], [], [], None, [], [], None, [], None]
defaults = [[], [], [], [], [], None, [], [], None, [], None, [], [], None]
(
time_series, top_producers, stu_trend, balance, yoy,
cot_latest, cot_trend,
price_series, price_latest,
ice_stocks_trend, ice_stocks_latest,
ice_aging_latest, ice_stocks_by_port, ice_stocks_by_port_latest,
) = [
r if not isinstance(r, Exception) else (
current_app.logger.warning("Analytics query %d failed: %s", i, r) or defaults[i]
@@ -134,6 +138,7 @@ async def index():
cot_latest, cot_trend = None, []
price_series, price_latest = [], None
ice_stocks_trend, ice_stocks_latest = [], None
ice_aging_latest, ice_stocks_by_port, ice_stocks_by_port_latest = [], [], None
# Latest global snapshot for key metric cards
latest = time_series[-1] if time_series else {}
@@ -163,6 +168,9 @@ async def index():
price_latest=price_latest,
ice_stocks_trend=ice_stocks_trend,
ice_stocks_latest=ice_stocks_latest,
ice_aging_latest=ice_aging_latest,
ice_stocks_by_port=ice_stocks_by_port,
ice_stocks_by_port_latest=ice_stocks_by_port_latest,
)

View File

@@ -221,6 +221,77 @@
</div>
{% endif %}
<!-- ICE Certified Stock Aging Report -->
{% if ice_aging_latest %}
{% set aging_total = ice_aging_latest | sum(attribute='total_bags') %}
{% set youngest = ice_aging_latest | selectattr('age_bucket_start_days', 'equalto', 0) | list %}
{% set oldest = ice_aging_latest | selectattr('age_bucket_start_days', 'ge', 720) | list %}
{% set youngest_pct = (youngest | sum(attribute='total_bags') / aging_total * 100) if aging_total > 0 else 0 %}
{% set oldest_pct = (oldest | sum(attribute='total_bags') / aging_total * 100) if aging_total > 0 else 0 %}
<div class="chart-container mb-8">
<h2 class="text-xl mb-1">Certified Stock Aging Report</h2>
<p class="text-muted mb-4">Age distribution of certified Arabica stocks by delivery port · as of {{ ice_aging_latest[0].report_date }}</p>
<div class="grid-4 mb-4">
<div class="metric-card">
<div class="metric-label">Total Certified</div>
<div class="metric-value">{{ "{:,.0f}".format(aging_total) }}</div>
<div class="metric-sub">60-kg bags</div>
</div>
<div class="metric-card">
<div class="metric-label">Young Stock (0120 days)</div>
<div class="metric-value text-green">{{ "{:.1f}%".format(youngest_pct) }}</div>
<div class="metric-sub">freshest certified supply</div>
</div>
<div class="metric-card">
<div class="metric-label">Old Stock (720+ days)</div>
<div class="metric-value {% if oldest_pct > 20 %}text-red{% endif %}">{{ "{:.1f}%".format(oldest_pct) }}</div>
<div class="metric-sub">at risk of cert expiry</div>
</div>
<div class="metric-card">
<div class="metric-label">Age Buckets</div>
<div class="metric-value">{{ ice_aging_latest | length }}</div>
<div class="metric-sub">tracking ranges</div>
</div>
</div>
<canvas id="iceAgingChart"></canvas>
</div>
{% endif %}
<!-- ICE Historical Warehouse Stocks by Port -->
{% if ice_stocks_by_port_latest %}
<div class="chart-container mb-8">
<h2 class="text-xl mb-1">Warehouse Stocks by Delivery Port</h2>
<p class="text-muted mb-4">End-of-month certified stocks at each ICE delivery port · Nov 1996 to present · as of {{ ice_stocks_by_port_latest.report_date }}</p>
<div class="grid-4 mb-4">
<div class="metric-card">
<div class="metric-label">Total Certified</div>
<div class="metric-value">{{ "{:,.0f}".format(ice_stocks_by_port_latest.total_bags) }}</div>
<div class="metric-sub">60-kg bags (latest month)</div>
</div>
<div class="metric-card">
<div class="metric-label">Month-over-Month</div>
<div class="metric-value {% if ice_stocks_by_port_latest.mom_change_bags and ice_stocks_by_port_latest.mom_change_bags > 0 %}text-green{% elif ice_stocks_by_port_latest.mom_change_bags and ice_stocks_by_port_latest.mom_change_bags < 0 %}text-red{% endif %}">
{% if ice_stocks_by_port_latest.mom_change_bags is not none %}
{{ "{:+,d}".format(ice_stocks_by_port_latest.mom_change_bags | int) }}
{% else %}--{% endif %}
</div>
<div class="metric-sub">bags vs prior month</div>
</div>
<div class="metric-card">
<div class="metric-label">12-Month Average</div>
<div class="metric-value">{{ "{:,.0f}".format(ice_stocks_by_port_latest.avg_12m_bags) }}</div>
<div class="metric-sub">60-kg bags</div>
</div>
<div class="metric-card">
<div class="metric-label">History</div>
<div class="metric-value">30 yrs</div>
<div class="metric-sub">since Nov 1996</div>
</div>
</div>
<canvas id="iceByPortChart"></canvas>
</div>
{% endif %}
<!-- Quick Actions -->
<div class="grid-3">
<a href="{{ url_for('dashboard.countries') }}" class="btn-outline text-center">Country Comparison</a>
@@ -449,6 +520,66 @@ if (iceRaw && iceRaw.length > 0) {
});
}
// -- ICE Aging Report Chart --
const agingRaw = {{ ice_aging_latest | tojson }};
if (agingRaw && agingRaw.length > 0) {
const ports = ['antwerp_bags', 'hamburg_bremen_bags', 'houston_bags', 'miami_bags', 'new_orleans_bags', 'new_york_bags'];
const portLabels = ['Antwerp', 'Hamburg/Bremen', 'Houston', 'Miami', 'New Orleans', 'New York'];
new Chart(document.getElementById('iceAgingChart'), {
type: 'bar',
data: {
labels: agingRaw.map(r => r.age_bucket),
datasets: ports.map((col, i) => ({
label: portLabels[i],
data: agingRaw.map(r => r[col] || 0),
backgroundColor: CHART_PALETTE[i],
borderWidth: 0,
}))
},
options: {
responsive: true,
interaction: {mode: 'index', intersect: false},
plugins: {legend: {position: 'bottom'}},
scales: {
x: {stacked: true, ticks: {maxRotation: 45}},
y: {stacked: true, title: {display: true, text: '60-kg bags'}, beginAtZero: true}
}
}
});
}
// -- ICE Historical Stocks by Port Chart --
const byPortRaw = {{ ice_stocks_by_port | tojson }};
if (byPortRaw && byPortRaw.length > 0) {
const byPortData = [...byPortRaw].reverse();
const portCols = ['antwerp_bags', 'hamburg_bremen_bags', 'new_york_bags', 'new_orleans_bags', 'houston_bags', 'miami_bags', 'barcelona_bags', 'virginia_bags'];
const portNames = ['Antwerp', 'Hamburg/Bremen', 'New York', 'New Orleans', 'Houston', 'Miami', 'Barcelona', 'Virginia'];
new Chart(document.getElementById('iceByPortChart'), {
type: 'line',
data: {
labels: byPortData.map(r => r.report_date),
datasets: portCols.map((col, i) => ({
label: portNames[i],
data: byPortData.map(r => r[col] || 0),
borderColor: CHART_PALETTE[i],
backgroundColor: CHART_PALETTE[i] + '22',
fill: true,
tension: 0.2,
pointRadius: 0,
}))
},
options: {
responsive: true,
interaction: {mode: 'index', intersect: false},
plugins: {legend: {position: 'bottom'}},
scales: {
x: {stacked: true, ticks: {maxTicksLimit: 15}},
y: {stacked: true, title: {display: true, text: '60-kg bags'}, beginAtZero: true}
}
}
});
}
// -- Top Producers Horizontal Bar --
const topData = {{ top_producers | tojson }};
if (topData.length > 0) {