diff --git a/web/src/beanflows/analytics.py b/web/src/beanflows/analytics.py index 409b309..e0e529f 100644 --- a/web/src/beanflows/analytics.py +++ b/web/src/beanflows/analytics.py @@ -302,17 +302,23 @@ def _validate_cot_metrics(metrics: list[str]) -> list[str]: return valid +def _cot_table(combined: bool) -> str: + return "serving.cot_positioning_combined" if combined else "serving.cot_positioning" + + async def get_cot_positioning_time_series( cftc_commodity_code: str, metrics: list[str], start_date: str | None = None, end_date: str | None = None, limit: int = 520, + combined: bool = False, ) -> list[dict]: """Weekly COT positioning time series. limit defaults to ~10 years of weekly data.""" assert 1 <= limit <= 2000, "limit must be between 1 and 2000" metrics = _validate_cot_metrics(metrics) cols = ", ".join(metrics) + table = _cot_table(combined) where_parts = ["cftc_commodity_code = ?"] params: list = [cftc_commodity_code] @@ -329,7 +335,7 @@ async def get_cot_positioning_time_series( return await fetch_analytics( f""" SELECT report_date, {cols} - FROM serving.cot_positioning + FROM {table} WHERE {where_clause} ORDER BY report_date ASC LIMIT ? @@ -338,12 +344,13 @@ async def get_cot_positioning_time_series( ) -async def get_cot_positioning_latest(cftc_commodity_code: str) -> dict | None: +async def get_cot_positioning_latest(cftc_commodity_code: str, combined: bool = False) -> dict | None: """Latest week's full COT positioning snapshot.""" + table = _cot_table(combined) rows = await fetch_analytics( - """ + f""" SELECT * - FROM serving.cot_positioning + FROM {table} WHERE cftc_commodity_code = ? ORDER BY report_date DESC LIMIT 1 @@ -356,14 +363,16 @@ async def get_cot_positioning_latest(cftc_commodity_code: str) -> dict | None: async def get_cot_index_trend( cftc_commodity_code: str, weeks: int = 104, + combined: bool = False, ) -> list[dict]: """COT Index time series (26w and 52w) for the trailing N weeks.""" assert 1 <= weeks <= 1040, "weeks must be between 1 and 1040" + table = _cot_table(combined) return await fetch_analytics( - """ + f""" SELECT report_date, cot_index_26w, cot_index_52w, managed_money_net, managed_money_net_pct_of_oi - FROM serving.cot_positioning + FROM {table} WHERE cftc_commodity_code = ? ORDER BY report_date DESC LIMIT ? @@ -372,6 +381,30 @@ async def get_cot_index_trend( ) +async def get_cot_options_delta(cftc_commodity_code: str) -> dict | None: + """Latest managed_money_net difference between combined and futures-only reports. + + Shows whether the options book is reinforcing (same direction) or hedging + (opposite direction) the futures position. Returns None if either table + has no data. + """ + rows = await fetch_analytics( + """ + SELECT f.report_date, + f.managed_money_net AS fut_net, + c.managed_money_net AS combined_net, + c.managed_money_net - f.managed_money_net AS options_delta + FROM serving.cot_positioning f + JOIN serving.cot_positioning_combined c USING (report_date) + WHERE f.cftc_commodity_code = ? + ORDER BY f.report_date DESC + LIMIT 1 + """, + [cftc_commodity_code], + ) + return rows[0] if rows else None + + # ============================================================================= # Coffee Prices Queries # ============================================================================= diff --git a/web/src/beanflows/api/routes.py b/web/src/beanflows/api/routes.py index 6a6b0dd..07ce6a2 100644 --- a/web/src/beanflows/api/routes.py +++ b/web/src/beanflows/api/routes.py @@ -172,6 +172,7 @@ async def commodity_positioning(code: str): start_date — ISO date filter (YYYY-MM-DD) end_date — ISO date filter (YYYY-MM-DD) limit — max rows returned (default 260, max 2000) + type — report variant: "fut" (futures-only, default) or "combined" (futures+options) """ raw_metrics = request.args.getlist("metrics") or [ "managed_money_net", "prod_merc_net", "open_interest", "cot_index_26w" @@ -183,19 +184,27 @@ async def commodity_positioning(code: str): start_date = request.args.get("start_date") end_date = request.args.get("end_date") limit = min(int(request.args.get("limit", 260)), 2000) + cot_type = request.args.get("type", "fut") + combined = cot_type == "combined" - data = await analytics.get_cot_positioning_time_series(code, metrics, start_date, end_date, limit) - return jsonify({"cftc_commodity_code": code, "metrics": metrics, "data": data}) + data = await analytics.get_cot_positioning_time_series(code, metrics, start_date, end_date, limit, combined=combined) + return jsonify({"cftc_commodity_code": code, "type": cot_type, "metrics": metrics, "data": data}) @bp.route("/commodities//positioning/latest") @api_key_required(scopes=["read"]) async def commodity_positioning_latest(code: str): - """Latest week's full COT positioning snapshot for a commodity.""" - data = await analytics.get_cot_positioning_latest(code) + """Latest week's full COT positioning snapshot for a commodity. + + Query params: + type — report variant: "fut" (futures-only, default) or "combined" (futures+options) + """ + cot_type = request.args.get("type", "fut") + combined = cot_type == "combined" + data = await analytics.get_cot_positioning_latest(code, combined=combined) if not data: return jsonify({"error": "No positioning data found for this commodity"}), 404 - return jsonify({"cftc_commodity_code": code, "data": data}) + return jsonify({"cftc_commodity_code": code, "type": cot_type, "data": data}) @bp.route("/commodities//prices") diff --git a/web/src/beanflows/dashboard/routes.py b/web/src/beanflows/dashboard/routes.py index 8a26057..c99889c 100644 --- a/web/src/beanflows/dashboard/routes.py +++ b/web/src/beanflows/dashboard/routes.py @@ -257,30 +257,45 @@ async def positioning(): if range_key not in RANGE_MAP: range_key = "1y" + cot_type = request.args.get("type", "fut") + if cot_type not in ("fut", "combined"): + cot_type = "fut" + combined = cot_type == "combined" + rng = RANGE_MAP[range_key] price_limit = rng["days"] cot_weeks = rng["weeks"] + options_delta = None if analytics._db_path: - results = await asyncio.gather( + gather_coros = [ analytics.get_price_latest(analytics.COFFEE_TICKER), analytics.get_price_time_series(analytics.COFFEE_TICKER, limit=price_limit), - analytics.get_cot_positioning_latest(analytics.COFFEE_CFTC_CODE), - analytics.get_cot_index_trend(analytics.COFFEE_CFTC_CODE, weeks=cot_weeks), - return_exceptions=True, - ) - defaults = [None, [], None, []] - price_latest, price_series, cot_latest, cot_trend = _safe(results, defaults) + analytics.get_cot_positioning_latest(analytics.COFFEE_CFTC_CODE, combined=combined), + analytics.get_cot_index_trend(analytics.COFFEE_CFTC_CODE, weeks=cot_weeks, combined=combined), + ] + if combined: + gather_coros.append(analytics.get_cot_options_delta(analytics.COFFEE_CFTC_CODE)) + + results = await asyncio.gather(*gather_coros, return_exceptions=True) + defaults = [None, [], None, [], None] if combined else [None, [], None, []] + safe_results = _safe(results, defaults) + + price_latest, price_series, cot_latest, cot_trend = safe_results[:4] + if combined: + options_delta = safe_results[4] else: price_latest, price_series, cot_latest, cot_trend = None, [], None, [] ctx = dict( plan=plan, range_key=range_key, + cot_type=cot_type, price_latest=price_latest, price_series=price_series, cot_latest=cot_latest, cot_trend=cot_trend, + options_delta=options_delta, ) if request.headers.get("HX-Request"): diff --git a/web/src/beanflows/dashboard/templates/positioning.html b/web/src/beanflows/dashboard/templates/positioning.html index e312b20..18bfa0a 100644 --- a/web/src/beanflows/dashboard/templates/positioning.html +++ b/web/src/beanflows/dashboard/templates/positioning.html @@ -37,6 +37,15 @@ {% endfor %} + +
+ {% for val, label in [("fut", "Futures"), ("combined", "F+O Combined")] %} + + {% endfor %} +
+