From b884bc2b4aa86d5e52edc2f89930c3cd34ad00e9 Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 11:24:56 +0100 Subject: [PATCH 1/2] feat(cot): add combined (futures+options) COT extractor and transform models MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - extract/cftc_cot: refactor extract_cot_year() to accept url_template and landing_subdir params; add _extract_cot() shared loop; add extract_cot_combined() entry point using com_disagg_txt_{year}.zip → landing/cot_combined/ - pyproject.toml: add extract_cot_combined script entry point - macros/__init__.py: add @cot_combined_glob() for cot_combined/**/*.csv.gzip - fct_cot_positioning.sql: union cot_glob and cot_combined_glob in src CTE; add report_type column (FutOnly_or_Combined) to cast_and_clean + deduplicated; include FutOnly_or_Combined in hkey to avoid key collisions; add report_type to grain - obt_cot_positioning.sql: add report_type = 'FutOnly' filter to preserve existing serving behavior - obt_cot_positioning_combined.sql: new serving model filtered to report_type = 'Combined'; identical analytics (COT index, net %, windows) on combined data - pipelines.py: register extract_cot_combined; add to extract_all meta-pipeline Co-Authored-By: Claude Sonnet 4.6 --- extract/cftc_cot/pyproject.toml | 1 + extract/cftc_cot/src/cftc_cot/execute.py | 37 +++-- src/materia/pipelines.py | 8 +- transform/sqlmesh_materia/macros/__init__.py | 7 + .../models/foundation/fct_cot_positioning.sql | 19 ++- .../models/serving/obt_cot_positioning.sql | 1 + .../serving/obt_cot_positioning_combined.sql | 148 ++++++++++++++++++ 7 files changed, 205 insertions(+), 16 deletions(-) create mode 100644 transform/sqlmesh_materia/models/serving/obt_cot_positioning_combined.sql diff --git a/extract/cftc_cot/pyproject.toml b/extract/cftc_cot/pyproject.toml index b3275b3..5b3e483 100644 --- a/extract/cftc_cot/pyproject.toml +++ b/extract/cftc_cot/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [ [project.scripts] extract_cot = "cftc_cot.execute:extract_cot_dataset" +extract_cot_combined = "cftc_cot.execute:extract_cot_combined" [build-system] requires = ["hatchling"] diff --git a/extract/cftc_cot/src/cftc_cot/execute.py b/extract/cftc_cot/src/cftc_cot/execute.py index cb3d87e..e5d7341 100644 --- a/extract/cftc_cot/src/cftc_cot/execute.py +++ b/extract/cftc_cot/src/cftc_cot/execute.py @@ -1,11 +1,13 @@ -"""CFTC COT Disaggregated Futures data extraction. +"""CFTC COT Disaggregated data extraction. Downloads yearly ZIP files from CFTC and stores as gzip CSV in the landing directory. CFTC publishes one file per year that updates every Friday at 3:30 PM ET. On first run this backfills all years from 2006. On subsequent runs it skips files whose etag matches what is already on disk. -Landing path: LANDING_DIR/cot/{year}/{etag}.csv.gzip +Two report variants are supported: + - Futures-only: Landing path: LANDING_DIR/cot/{year}/{etag}.csv.gzip + - Combined (fut+options): Landing path: LANDING_DIR/cot_combined/{year}/{etag}.csv.gzip """ import logging @@ -37,9 +39,10 @@ logger = logging.getLogger("CFTC COT Extractor") LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) -# CFTC publishes yearly ZIPs for the disaggregated futures-only report. +# CFTC publishes yearly ZIPs for both variants of the disaggregated report. # The file for the current year is updated each Friday at 3:30 PM ET. -COT_URL_TEMPLATE = "https://www.cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip" +COT_URL_FUTURES_ONLY = "https://www.cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip" +COT_URL_COMBINED = "https://www.cftc.gov/files/dea/history/com_disagg_txt_{year}.zip" FIRST_YEAR = 2006 # Disaggregated report starts June 2006 HTTP_TIMEOUT_SECONDS = 120 # COT ZIPs are up to ~30 MB @@ -60,12 +63,12 @@ def _synthetic_etag(year: int, headers: dict) -> str: return etag -def extract_cot_year(year: int, http_session: niquests.Session) -> int: +def extract_cot_year(year: int, http_session: niquests.Session, url_template: str, landing_subdir: str) -> int: """Download and store COT data for a single year. Returns bytes_written (0 if skipped or unavailable). """ - url = COT_URL_TEMPLATE.format(year=year) + url = url_template.format(year=year) logger.info(f"Checking COT data for {year}: {url}") head = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS) @@ -79,7 +82,7 @@ def extract_cot_year(year: int, http_session: niquests.Session) -> int: raw_etag = head.headers.get("etag", "") etag = normalize_etag(raw_etag) if raw_etag else _synthetic_etag(year, head.headers) - dest_dir = landing_path(LANDING_DIR, "cot", str(year)) + dest_dir = landing_path(LANDING_DIR, landing_subdir, str(year)) local_file = dest_dir / f"{etag}.csv.gzip" if local_file.exists(): @@ -104,8 +107,8 @@ def extract_cot_year(year: int, http_session: niquests.Session) -> int: return bytes_written -def extract_cot_dataset(): - """Extract all available CFTC COT disaggregated futures data. +def _extract_cot(url_template: str, landing_subdir: str, extractor_name: str) -> None: + """Shared extraction loop for any COT report variant. Downloads current year first (always re-checks for weekly Friday updates), then backfills historical years. Bounded to MAX_YEARS. Continues on @@ -119,7 +122,7 @@ def extract_cot_dataset(): ) conn = open_state_db(LANDING_DIR) - run_id = start_run(conn, "cftc_cot") + run_id = start_run(conn, extractor_name) files_written = 0 files_skipped = 0 bytes_written_total = 0 @@ -127,7 +130,7 @@ def extract_cot_dataset(): with niquests.Session() as session: for year in years: try: - result = extract_cot_year(year, session) + result = extract_cot_year(year, session, url_template, landing_subdir) if result > 0: files_written += 1 bytes_written_total += result @@ -136,7 +139,7 @@ def extract_cot_dataset(): except Exception: logger.exception(f"Failed to extract COT data for {year}, continuing") - logger.info(f"COT extraction complete: {files_written} new file(s) downloaded") + logger.info(f"COT extraction complete ({extractor_name}): {files_written} new file(s) downloaded") end_run( conn, run_id, status="success", files_written=files_written, files_skipped=files_skipped, @@ -150,5 +153,15 @@ def extract_cot_dataset(): conn.close() +def extract_cot_dataset(): + """Extract CFTC COT disaggregated futures-only report.""" + _extract_cot(COT_URL_FUTURES_ONLY, "cot", "cftc_cot") + + +def extract_cot_combined(): + """Extract CFTC COT disaggregated combined (futures+options) report.""" + _extract_cot(COT_URL_COMBINED, "cot_combined", "cftc_cot_combined") + + if __name__ == "__main__": extract_cot_dataset() diff --git a/src/materia/pipelines.py b/src/materia/pipelines.py index c7b9cee..0fb4bbe 100644 --- a/src/materia/pipelines.py +++ b/src/materia/pipelines.py @@ -20,6 +20,10 @@ PIPELINES = { "command": ["uv", "run", "--package", "cftc_cot", "extract_cot"], "timeout_seconds": 1800, }, + "extract_cot_combined": { + "command": ["uv", "run", "--package", "cftc_cot", "extract_cot_combined"], + "timeout_seconds": 1800, + }, "extract_prices": { "command": ["uv", "run", "--package", "coffee_prices", "extract_prices"], "timeout_seconds": 300, @@ -49,7 +53,7 @@ PIPELINES = { "timeout_seconds": 120, }, "extract_all": { - "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"], + "command": ["meta", "extract", "extract_cot", "extract_cot_combined", "extract_prices", "extract_ice_all", "extract_weather"], "timeout_seconds": 6600, }, "transform": { @@ -68,7 +72,7 @@ PIPELINES = { META_PIPELINES: dict[str, list[str]] = { - "extract_all": ["extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"], + "extract_all": ["extract", "extract_cot", "extract_cot_combined", "extract_prices", "extract_ice_all", "extract_weather"], } diff --git a/transform/sqlmesh_materia/macros/__init__.py b/transform/sqlmesh_materia/macros/__init__.py index 0bf2bf1..45c2adf 100644 --- a/transform/sqlmesh_materia/macros/__init__.py +++ b/transform/sqlmesh_materia/macros/__init__.py @@ -17,6 +17,13 @@ def cot_glob(evaluator) -> str: return f"'{landing_dir}/cot/**/*.csv.gzip'" +@macro() +def cot_combined_glob(evaluator) -> str: + """Return a quoted glob path for all COT combined (futures+options) CSV gzip files under LANDING_DIR.""" + landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") + return f"'{landing_dir}/cot_combined/**/*.csv.gzip'" + + @macro() def prices_glob(evaluator) -> str: """Return a quoted glob path for all coffee price CSV gzip files under LANDING_DIR.""" diff --git a/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql b/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql index 2a2e9cd..c19c99d 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql @@ -4,7 +4,7 @@ MODEL ( kind INCREMENTAL_BY_TIME_RANGE ( time_column report_date ), - grain (cftc_commodity_code, report_date, cftc_contract_market_code, ingest_date), + grain (cftc_commodity_code, report_date, cftc_contract_market_code, ingest_date, report_type), start '2006-06-13', cron '@daily' ); @@ -21,6 +21,18 @@ WITH src AS ( all_varchar = TRUE, max_line_size = 10000000 ) + UNION ALL BY NAME + SELECT + * + FROM READ_CSV( + @cot_combined_glob(), + compression = 'gzip', + header = TRUE, + union_by_name = TRUE, + filename = TRUE, + all_varchar = TRUE, + max_line_size = 10000000 + ) ), cast_and_clean AS ( SELECT TRIM(market_and_exchange_names) AS market_and_exchange_name, /* Identifiers */ @@ -28,6 +40,7 @@ WITH src AS ( TRIM(cftc_commodity_code) AS cftc_commodity_code, TRIM(cftc_contract_market_code) AS cftc_contract_market_code, TRIM(contract_units) AS contract_units, + TRIM("FutOnly_or_Combined") AS report_type, /* 'FutOnly' or 'Combined' — discriminates the two CFTC report variants */ TRY_CAST(open_interest_all AS INT) AS open_interest, /* Open interest */ /* CFTC uses '.' as null for any field — use TRY_CAST throughout */ TRY_CAST(prod_merc_positions_long_all AS INT) AS prod_merc_long, /* Producer / Merchant (commercial hedgers: exporters, processors) */ TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_short, @@ -66,12 +79,13 @@ WITH src AS ( cftc_commodity_code, report_date_as_yyyy_mm_dd, cftc_contract_market_code, + "FutOnly_or_Combined", open_interest_all, m_money_positions_long_all, m_money_positions_short_all, prod_merc_positions_long_all, prod_merc_positions_short_all - ) AS hkey /* Dedup key: hash of business grain + key metrics */ + ) AS hkey /* Dedup key: hash of business grain + key metrics; includes report variant so fut-only and combined rows get distinct keys */ FROM src /* Reject rows with null commodity code or malformed date */ WHERE @@ -119,6 +133,7 @@ WITH src AS ( ANY_VALUE(traders_managed_money_short) AS traders_managed_money_short, ANY_VALUE(traders_managed_money_spread) AS traders_managed_money_spread, ANY_VALUE(ingest_date) AS ingest_date, + ANY_VALUE(report_type) AS report_type, hkey FROM cast_and_clean GROUP BY diff --git a/transform/sqlmesh_materia/models/serving/obt_cot_positioning.sql b/transform/sqlmesh_materia/models/serving/obt_cot_positioning.sql index 73e9663..161b35a 100644 --- a/transform/sqlmesh_materia/models/serving/obt_cot_positioning.sql +++ b/transform/sqlmesh_materia/models/serving/obt_cot_positioning.sql @@ -20,6 +20,7 @@ WITH latest_revision AS ( ON f.cftc_commodity_code = d.cftc_commodity_code WHERE d.commodity_name = 'Coffee, Green' + AND f.report_type = 'FutOnly' AND f.report_date BETWEEN @start_ds AND @end_ds QUALIFY ROW_NUMBER() OVER ( diff --git a/transform/sqlmesh_materia/models/serving/obt_cot_positioning_combined.sql b/transform/sqlmesh_materia/models/serving/obt_cot_positioning_combined.sql new file mode 100644 index 0000000..4b14940 --- /dev/null +++ b/transform/sqlmesh_materia/models/serving/obt_cot_positioning_combined.sql @@ -0,0 +1,148 @@ +/* Serving mart: COT positioning (combined futures+options) for Coffee C futures. */ /* Same analytics as serving.cot_positioning, but filtered to the combined */ /* report variant (FutOnly_or_Combined = 'Combined'). Positions include */ /* options delta-equivalent exposure, showing total directional market bet. */ /* Grain: one row per report_date for Coffee C futures. */ /* Latest revision per date: MAX(ingest_date) used to deduplicate CFTC corrections. */ +MODEL ( + name serving.cot_positioning_combined, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column report_date + ), + grain ( + report_date + ), + start '2006-06-13', + cron '@daily' +); + +WITH latest_revision AS ( + /* Pick the most recently ingested row when CFTC issues corrections */ + SELECT + f.* + FROM foundation.fct_cot_positioning AS f + INNER JOIN foundation.dim_commodity AS d + ON f.cftc_commodity_code = d.cftc_commodity_code + WHERE + d.commodity_name = 'Coffee, Green' + AND f.report_type = 'Combined' + AND f.report_date BETWEEN @start_ds AND @end_ds + QUALIFY + ROW_NUMBER() OVER ( + PARTITION BY f.report_date, f.cftc_contract_market_code + ORDER BY f.ingest_date DESC + ) = 1 +), with_derived AS ( + SELECT + report_date, + market_and_exchange_name, + cftc_commodity_code, + cftc_contract_market_code, + contract_units, + ingest_date, + open_interest, /* Absolute positions (contracts, delta-equivalent for options) */ + managed_money_long, + managed_money_short, + managed_money_spread, + managed_money_net, + prod_merc_long, + prod_merc_short, + prod_merc_net, + swap_long, + swap_short, + swap_spread, + swap_net, + other_reportable_long, + other_reportable_short, + other_reportable_spread, + other_reportable_net, + nonreportable_long, + nonreportable_short, + nonreportable_net, + ROUND(managed_money_net::REAL / NULLIF(open_interest, 0) * 100, 2) AS managed_money_net_pct_of_oi, /* Normalized: managed money net as % of open interest */ /* Removes size effects and makes cross-period comparison meaningful */ + ROUND(managed_money_long::REAL / NULLIF(managed_money_short, 0), 3) AS managed_money_long_short_ratio, /* Long/short ratio: >1 = more bulls than bears in managed money */ + change_open_interest, /* Weekly changes */ + change_managed_money_long, + change_managed_money_short, + change_managed_money_net, + change_prod_merc_long, + change_prod_merc_short, + managed_money_net /* Week-over-week momentum in managed money net (via LAG) */ - LAG(managed_money_net, 1) OVER (ORDER BY report_date) AS managed_money_net_wow, + concentration_top4_long_pct, /* Concentration */ + concentration_top4_short_pct, + concentration_top8_long_pct, + concentration_top8_short_pct, + traders_total, /* Trader counts */ + traders_managed_money_long, + traders_managed_money_short, + traders_managed_money_spread, + CASE + WHEN MAX(managed_money_net) OVER w26 = MIN(managed_money_net) OVER w26 + THEN 50.0 + ELSE ROUND( + ( + managed_money_net - MIN(managed_money_net) OVER w26 + )::REAL / ( + MAX(managed_money_net) OVER w26 - MIN(managed_money_net) OVER w26 + ) * 100, + 1 + ) + END AS cot_index_26w, /* COT Index (26-week): where is current net vs. trailing 26 weeks? */ /* 0 = most bearish extreme, 100 = most bullish extreme */ /* Includes options delta-equivalent exposure */ + CASE + WHEN MAX(managed_money_net) OVER w52 = MIN(managed_money_net) OVER w52 + THEN 50.0 + ELSE ROUND( + ( + managed_money_net - MIN(managed_money_net) OVER w52 + )::REAL / ( + MAX(managed_money_net) OVER w52 - MIN(managed_money_net) OVER w52 + ) * 100, + 1 + ) + END AS cot_index_52w /* COT Index (52-week): longer-term positioning context */ + FROM latest_revision + WINDOW w26 AS (ORDER BY report_date ROWS BETWEEN 25 PRECEDING AND CURRENT ROW), w52 AS (ORDER BY report_date ROWS BETWEEN 51 PRECEDING AND CURRENT ROW) +) +SELECT + report_date, + market_and_exchange_name, + cftc_commodity_code, + cftc_contract_market_code, + contract_units, + ingest_date, + open_interest, + managed_money_long, + managed_money_short, + managed_money_spread, + managed_money_net, + prod_merc_long, + prod_merc_short, + prod_merc_net, + swap_long, + swap_short, + swap_spread, + swap_net, + other_reportable_long, + other_reportable_short, + other_reportable_spread, + other_reportable_net, + nonreportable_long, + nonreportable_short, + nonreportable_net, + managed_money_net_pct_of_oi, + managed_money_long_short_ratio, + change_open_interest, + change_managed_money_long, + change_managed_money_short, + change_managed_money_net, + change_prod_merc_long, + change_prod_merc_short, + managed_money_net_wow, + concentration_top4_long_pct, + concentration_top4_short_pct, + concentration_top8_long_pct, + concentration_top8_short_pct, + traders_total, + traders_managed_money_long, + traders_managed_money_short, + traders_managed_money_spread, + cot_index_26w, + cot_index_52w +FROM with_derived +ORDER BY + report_date From 0326e5c83dbf6e2bfb9833efbe1df9932c0027c9 Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 11:25:05 +0100 Subject: [PATCH 2/2] feat(web): add F+O Combined toggle to positioning dashboard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - analytics.py: add _cot_table() helper; add combined=False param to get_cot_positioning_time_series(), get_cot_positioning_latest(), get_cot_index_trend(); add get_cot_options_delta() for MM net delta between combined and futures-only - dashboard/routes.py: read ?type=fut|combined param; pass combined flag to analytics calls; conditionally fetch options_delta when combined - api/routes.py: add ?type= param to /positioning and /positioning/latest endpoints; returned JSON includes type field - positioning.html: add report type pill group (Futures / F+O Combined) with setType() JS; setRange() and popstate now preserve the type param - positioning_canvas.html: sync type pills on HTMX swap; show Opt Δ badge on MM Net card when combined+options_delta available; conditional chart title and subtitle reflect which report variant is shown Co-Authored-By: Claude Sonnet 4.6 --- web/src/beanflows/analytics.py | 45 ++++++++++++++++--- web/src/beanflows/api/routes.py | 19 +++++--- web/src/beanflows/dashboard/routes.py | 29 +++++++++--- .../dashboard/templates/positioning.html | 31 ++++++++++++- .../templates/positioning_canvas.html | 19 ++++++-- 5 files changed, 120 insertions(+), 23 deletions(-) diff --git a/web/src/beanflows/analytics.py b/web/src/beanflows/analytics.py index 409b309..e0e529f 100644 --- a/web/src/beanflows/analytics.py +++ b/web/src/beanflows/analytics.py @@ -302,17 +302,23 @@ def _validate_cot_metrics(metrics: list[str]) -> list[str]: return valid +def _cot_table(combined: bool) -> str: + return "serving.cot_positioning_combined" if combined else "serving.cot_positioning" + + async def get_cot_positioning_time_series( cftc_commodity_code: str, metrics: list[str], start_date: str | None = None, end_date: str | None = None, limit: int = 520, + combined: bool = False, ) -> list[dict]: """Weekly COT positioning time series. limit defaults to ~10 years of weekly data.""" assert 1 <= limit <= 2000, "limit must be between 1 and 2000" metrics = _validate_cot_metrics(metrics) cols = ", ".join(metrics) + table = _cot_table(combined) where_parts = ["cftc_commodity_code = ?"] params: list = [cftc_commodity_code] @@ -329,7 +335,7 @@ async def get_cot_positioning_time_series( return await fetch_analytics( f""" SELECT report_date, {cols} - FROM serving.cot_positioning + FROM {table} WHERE {where_clause} ORDER BY report_date ASC LIMIT ? @@ -338,12 +344,13 @@ async def get_cot_positioning_time_series( ) -async def get_cot_positioning_latest(cftc_commodity_code: str) -> dict | None: +async def get_cot_positioning_latest(cftc_commodity_code: str, combined: bool = False) -> dict | None: """Latest week's full COT positioning snapshot.""" + table = _cot_table(combined) rows = await fetch_analytics( - """ + f""" SELECT * - FROM serving.cot_positioning + FROM {table} WHERE cftc_commodity_code = ? ORDER BY report_date DESC LIMIT 1 @@ -356,14 +363,16 @@ async def get_cot_positioning_latest(cftc_commodity_code: str) -> dict | None: async def get_cot_index_trend( cftc_commodity_code: str, weeks: int = 104, + combined: bool = False, ) -> list[dict]: """COT Index time series (26w and 52w) for the trailing N weeks.""" assert 1 <= weeks <= 1040, "weeks must be between 1 and 1040" + table = _cot_table(combined) return await fetch_analytics( - """ + f""" SELECT report_date, cot_index_26w, cot_index_52w, managed_money_net, managed_money_net_pct_of_oi - FROM serving.cot_positioning + FROM {table} WHERE cftc_commodity_code = ? ORDER BY report_date DESC LIMIT ? @@ -372,6 +381,30 @@ async def get_cot_index_trend( ) +async def get_cot_options_delta(cftc_commodity_code: str) -> dict | None: + """Latest managed_money_net difference between combined and futures-only reports. + + Shows whether the options book is reinforcing (same direction) or hedging + (opposite direction) the futures position. Returns None if either table + has no data. + """ + rows = await fetch_analytics( + """ + SELECT f.report_date, + f.managed_money_net AS fut_net, + c.managed_money_net AS combined_net, + c.managed_money_net - f.managed_money_net AS options_delta + FROM serving.cot_positioning f + JOIN serving.cot_positioning_combined c USING (report_date) + WHERE f.cftc_commodity_code = ? + ORDER BY f.report_date DESC + LIMIT 1 + """, + [cftc_commodity_code], + ) + return rows[0] if rows else None + + # ============================================================================= # Coffee Prices Queries # ============================================================================= diff --git a/web/src/beanflows/api/routes.py b/web/src/beanflows/api/routes.py index 6a6b0dd..07ce6a2 100644 --- a/web/src/beanflows/api/routes.py +++ b/web/src/beanflows/api/routes.py @@ -172,6 +172,7 @@ async def commodity_positioning(code: str): start_date — ISO date filter (YYYY-MM-DD) end_date — ISO date filter (YYYY-MM-DD) limit — max rows returned (default 260, max 2000) + type — report variant: "fut" (futures-only, default) or "combined" (futures+options) """ raw_metrics = request.args.getlist("metrics") or [ "managed_money_net", "prod_merc_net", "open_interest", "cot_index_26w" @@ -183,19 +184,27 @@ async def commodity_positioning(code: str): start_date = request.args.get("start_date") end_date = request.args.get("end_date") limit = min(int(request.args.get("limit", 260)), 2000) + cot_type = request.args.get("type", "fut") + combined = cot_type == "combined" - data = await analytics.get_cot_positioning_time_series(code, metrics, start_date, end_date, limit) - return jsonify({"cftc_commodity_code": code, "metrics": metrics, "data": data}) + data = await analytics.get_cot_positioning_time_series(code, metrics, start_date, end_date, limit, combined=combined) + return jsonify({"cftc_commodity_code": code, "type": cot_type, "metrics": metrics, "data": data}) @bp.route("/commodities//positioning/latest") @api_key_required(scopes=["read"]) async def commodity_positioning_latest(code: str): - """Latest week's full COT positioning snapshot for a commodity.""" - data = await analytics.get_cot_positioning_latest(code) + """Latest week's full COT positioning snapshot for a commodity. + + Query params: + type — report variant: "fut" (futures-only, default) or "combined" (futures+options) + """ + cot_type = request.args.get("type", "fut") + combined = cot_type == "combined" + data = await analytics.get_cot_positioning_latest(code, combined=combined) if not data: return jsonify({"error": "No positioning data found for this commodity"}), 404 - return jsonify({"cftc_commodity_code": code, "data": data}) + return jsonify({"cftc_commodity_code": code, "type": cot_type, "data": data}) @bp.route("/commodities//prices") diff --git a/web/src/beanflows/dashboard/routes.py b/web/src/beanflows/dashboard/routes.py index 8a26057..c99889c 100644 --- a/web/src/beanflows/dashboard/routes.py +++ b/web/src/beanflows/dashboard/routes.py @@ -257,30 +257,45 @@ async def positioning(): if range_key not in RANGE_MAP: range_key = "1y" + cot_type = request.args.get("type", "fut") + if cot_type not in ("fut", "combined"): + cot_type = "fut" + combined = cot_type == "combined" + rng = RANGE_MAP[range_key] price_limit = rng["days"] cot_weeks = rng["weeks"] + options_delta = None if analytics._db_path: - results = await asyncio.gather( + gather_coros = [ analytics.get_price_latest(analytics.COFFEE_TICKER), analytics.get_price_time_series(analytics.COFFEE_TICKER, limit=price_limit), - analytics.get_cot_positioning_latest(analytics.COFFEE_CFTC_CODE), - analytics.get_cot_index_trend(analytics.COFFEE_CFTC_CODE, weeks=cot_weeks), - return_exceptions=True, - ) - defaults = [None, [], None, []] - price_latest, price_series, cot_latest, cot_trend = _safe(results, defaults) + analytics.get_cot_positioning_latest(analytics.COFFEE_CFTC_CODE, combined=combined), + analytics.get_cot_index_trend(analytics.COFFEE_CFTC_CODE, weeks=cot_weeks, combined=combined), + ] + if combined: + gather_coros.append(analytics.get_cot_options_delta(analytics.COFFEE_CFTC_CODE)) + + results = await asyncio.gather(*gather_coros, return_exceptions=True) + defaults = [None, [], None, [], None] if combined else [None, [], None, []] + safe_results = _safe(results, defaults) + + price_latest, price_series, cot_latest, cot_trend = safe_results[:4] + if combined: + options_delta = safe_results[4] else: price_latest, price_series, cot_latest, cot_trend = None, [], None, [] ctx = dict( plan=plan, range_key=range_key, + cot_type=cot_type, price_latest=price_latest, price_series=price_series, cot_latest=cot_latest, cot_trend=cot_trend, + options_delta=options_delta, ) if request.headers.get("HX-Request"): diff --git a/web/src/beanflows/dashboard/templates/positioning.html b/web/src/beanflows/dashboard/templates/positioning.html index e312b20..18bfa0a 100644 --- a/web/src/beanflows/dashboard/templates/positioning.html +++ b/web/src/beanflows/dashboard/templates/positioning.html @@ -37,6 +37,15 @@ {% endfor %} + +
+ {% for val, label in [("fut", "Futures"), ("combined", "F+O Combined")] %} + + {% endfor %} +
+