From 0a83b2cb74ddc8a398847c7101f514c0bab3b5ff Mon Sep 17 00:00:00 2001 From: Deeman Date: Fri, 20 Feb 2026 21:57:04 +0100 Subject: [PATCH] Add CFTC COT data integration with foundation data model layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New extraction package (cftc_cot): downloads yearly Disaggregated Futures ZIPs from CFTC, etag-based dedup, dynamic inner filename discovery, gzip normalization - SQLMesh 3-layer architecture: raw (technical) → foundation (business model) → serving (mart) - dim_commodity seed: conformed dimension mapping USDA ↔ CFTC codes — the commodity ontology - fct_cot_positioning: typed, deduplicated weekly positioning facts for all commodities - obt_cot_positioning: Coffee C mart with COT Index (26w/52w), WoW delta, OI ratios - Analytics functions + REST API endpoints: /commodities//positioning[/latest] - Dashboard widget: Managed Money net, COT Index card, dual-axis Chart.js chart - 23 passing tests (10 unit + 2 SQLMesh model + existing regression suite) Co-Authored-By: Claude Sonnet 4.6 --- extract/cftc_cot/pyproject.toml | 18 ++ extract/cftc_cot/src/cftc_cot/__init__.py | 0 extract/cftc_cot/src/cftc_cot/execute.py | 129 ++++++++++++++ extract/cftc_cot/src/cftc_cot/normalize.py | 43 +++++ pyproject.toml | 1 + src/materia/pipelines.py | 4 + tests/test_cot_extraction.py | 147 ++++++++++++++++ transform/sqlmesh_materia/macros/__init__.py | 7 + .../models/foundation/dim_commodity.sql | 24 +++ .../models/foundation/fct_cot_positioning.sql | 160 ++++++++++++++++++ .../models/raw/cot_disaggregated.sql | 85 ++++++++++ .../models/serving/obt_cot_positioning.sql | 140 +++++++++++++++ .../sqlmesh_materia/seeds/dim_commodity.csv | 2 + .../tests/test_cot_foundation.yaml | 99 +++++++++++ uv.lock | 12 ++ web/src/beanflows/analytics.py | 113 +++++++++++++ web/src/beanflows/api/routes.py | 36 ++++ web/src/beanflows/dashboard/routes.py | 10 +- .../beanflows/dashboard/templates/index.html | 84 +++++++++ 
19 files changed, 1111 insertions(+), 3 deletions(-) create mode 100644 extract/cftc_cot/pyproject.toml create mode 100644 extract/cftc_cot/src/cftc_cot/__init__.py create mode 100644 extract/cftc_cot/src/cftc_cot/execute.py create mode 100644 extract/cftc_cot/src/cftc_cot/normalize.py create mode 100644 tests/test_cot_extraction.py create mode 100644 transform/sqlmesh_materia/models/foundation/dim_commodity.sql create mode 100644 transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql create mode 100644 transform/sqlmesh_materia/models/raw/cot_disaggregated.sql create mode 100644 transform/sqlmesh_materia/models/serving/obt_cot_positioning.sql create mode 100644 transform/sqlmesh_materia/seeds/dim_commodity.csv create mode 100644 transform/sqlmesh_materia/tests/test_cot_foundation.yaml diff --git a/extract/cftc_cot/pyproject.toml b/extract/cftc_cot/pyproject.toml new file mode 100644 index 0000000..743057c --- /dev/null +++ b/extract/cftc_cot/pyproject.toml @@ -0,0 +1,18 @@ +[project] +name = "cftc_cot" +version = "0.1.0" +description = "CFTC Commitment of Traders data extractor" +requires-python = ">=3.13" +dependencies = [ + "niquests>=3.14.1", +] + +[project.scripts] +extract_cot = "cftc_cot.execute:extract_cot_dataset" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/cftc_cot"] diff --git a/extract/cftc_cot/src/cftc_cot/__init__.py b/extract/cftc_cot/src/cftc_cot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/extract/cftc_cot/src/cftc_cot/execute.py b/extract/cftc_cot/src/cftc_cot/execute.py new file mode 100644 index 0000000..aaebecc --- /dev/null +++ b/extract/cftc_cot/src/cftc_cot/execute.py @@ -0,0 +1,129 @@ +"""CFTC COT Disaggregated Futures data extraction. + +Downloads yearly ZIP files from CFTC and stores as gzip CSV in the landing +directory. CFTC publishes one file per year that updates every Friday at +3:30 PM ET. 
def _synthetic_etag(year: int, headers: dict) -> str:
    """Build a dedup key when CFTC omits the etag header.

    The key has the form ``{year}_{content-length}_{crc32 of last-modified}``
    with the checksum rendered as 8 lowercase hex digits.

    A CRC-32 checksum is used instead of the built-in ``hash()``: string hashes
    are salted per interpreter process (see ``PYTHONHASHSEED``), so a key built
    from ``hash()`` differs on every run and the caller's "file already on
    disk" check could never match for years served without an etag.

    This is not as strong as a real etag (a server clock change would trigger a
    re-download), but it is safe because the staging layer deduplicates via
    hash key.

    Args:
        year: Report year the headers belong to.
        headers: Headers of the HEAD response. Only ``last-modified`` and
            ``content-length`` are read; missing values fall back to ``""``
            and ``"0"`` respectively.

    Returns:
        The synthetic etag, stable across processes for identical headers.
    """
    import zlib  # function-scope import keeps this fix self-contained

    last_modified = headers.get("last-modified", "")
    content_length = headers.get("content-length", "0")
    # Mask to 32 bits so the value always renders as exactly 8 hex digits.
    checksum = zlib.crc32(last_modified.encode("utf-8")) & 0xFFFFFFFF
    etag = f"{year}_{content_length}_{checksum:08x}"
    logger.info(f"No etag header for {year}, using synthetic etag: {etag}")
    return etag
def extract_cot_dataset():
    """Extract all available CFTC COT disaggregated futures data.

    Visits years newest-first, from the current year back to FIRST_YEAR, so
    the current-year file is checked before the historical backfill. At most
    MAX_YEARS years are visited per run. A failure in one year is logged and
    does not abort the remaining years; failed years are summarized in a
    warning at the end of the run.
    """
    LANDING_DIR.mkdir(parents=True, exist_ok=True)
    current_year = datetime.now().year
    years = list(range(current_year, FIRST_YEAR - 1, -1))
    if len(years) > MAX_YEARS:
        # This used to be an assert. Once the calendar moves MAX_YEARS past
        # FIRST_YEAR the assert would abort every run before any download
        # (and under ``python -O`` the bound would silently disappear).
        # Clamp to the newest years instead; the dropped years are the oldest
        # ones, presumably already landed by earlier runs.
        logger.warning(
            f"Year range {len(years)} exceeds MAX_YEARS={MAX_YEARS}; "
            f"skipping years before {years[MAX_YEARS - 1]}"
        )
        years = years[:MAX_YEARS]

    new_count = 0
    failed_years = []
    with niquests.Session() as session:
        for year in years:
            try:
                if extract_cot_year(year, session):
                    new_count += 1
            except Exception:
                # Deliberate best-effort: one bad year must not stop the rest.
                logger.exception(f"Failed to extract COT data for {year}, continuing")
                failed_years.append(year)

    if failed_years:
        # Surface failures in one place so they are not lost among per-year logs.
        logger.warning(
            f"COT extraction failed for {len(failed_years)} year(s): {failed_years}"
        )
    logger.info(f"COT extraction complete: {new_count} new file(s) downloaded")
+ """ + buffer.seek(0) + out = BytesIO() + with zipfile.ZipFile(buffer, mode="r") as zf: + assert inner_filename in zf.namelist(), ( + f"Expected '{inner_filename}' in ZIP, found: {zf.namelist()}" + ) + with zf.open(inner_filename, mode="r") as csv_file: + with gzip.open(out, "wb") as gz: + gz.write(csv_file.read()) + out.seek(0) + return out diff --git a/pyproject.toml b/pyproject.toml index 6afc279..858f4c4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ dev = [ [tool.uv.sources] psdonline = {workspace = true } sqlmesh_materia = {workspace = true } +cftc_cot = {workspace = true } [tool.uv.workspace] members = [ diff --git a/src/materia/pipelines.py b/src/materia/pipelines.py index 1754ecf..2750082 100644 --- a/src/materia/pipelines.py +++ b/src/materia/pipelines.py @@ -16,6 +16,10 @@ PIPELINES = { "command": ["uv", "run", "--package", "psdonline", "extract_psd"], "timeout_seconds": 1800, }, + "extract_cot": { + "command": ["uv", "run", "--package", "cftc_cot", "extract_cot"], + "timeout_seconds": 1800, + }, "transform": { "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"], "timeout_seconds": 3600, diff --git a/tests/test_cot_extraction.py b/tests/test_cot_extraction.py new file mode 100644 index 0000000..8c596e9 --- /dev/null +++ b/tests/test_cot_extraction.py @@ -0,0 +1,147 @@ +"""Tests for CFTC COT extraction package.""" + +import gzip +import zipfile +from io import BytesIO +from unittest.mock import MagicMock + +from cftc_cot.normalize import find_csv_inner_filename, normalize_zipped_csv + + +# ============================================================================= +# normalize.py +# ============================================================================= + + +def _make_zip(inner_name: str, content: bytes) -> BytesIO: + """Helper: create a ZIP buffer containing a single named file.""" + buf = BytesIO() + with zipfile.ZipFile(buf, 
def test_find_csv_inner_filename_asserts_on_multiple_txt_files():
    """find_csv_inner_filename must reject a ZIP holding more than one .txt member."""
    buf = BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("a.txt", b"data a")
        zf.writestr("b.txt", b"data b")
    buf.seek(0)
    try:
        find_csv_inner_filename(buf)
    except AssertionError as e:
        # Check the message so we know the error came from the function under
        # test and not from this test's own failure path.
        assert "Expected exactly 1" in str(e)
    else:
        # The "did not raise" failure lives in ``else`` on purpose: raised
        # inside the ``try`` it would be caught by the ``except`` above and
        # the test could never fail.
        raise AssertionError("Should have raised AssertionError")
def test_extract_cot_year_skips_existing_file(tmp_path, monkeypatch):
    """extract_cot_year returns False and skips download when file already exists."""
    from cftc_cot import execute as cot_execute

    # LANDING_DIR is read from the environment once, at import time, so the
    # module attribute is what must be patched. monkeypatch.setattr restores
    # the original value after the test; a plain assignment would leak the
    # temporary directory into every test that runs afterwards.
    monkeypatch.setattr(cot_execute, "LANDING_DIR", tmp_path)

    # Pre-create the etag file to simulate existing data
    etag = "abc123"
    dest = tmp_path / "cot" / "2024"
    dest.mkdir(parents=True)
    (dest / f"{etag}.csv.gzip").write_bytes(b"existing")

    mock_session = MagicMock()
    mock_head = MagicMock()
    mock_head.status_code = 200
    mock_head.headers = {"etag": f'"{etag}"'}  # quoted, as HTTP etags are sent
    mock_session.head.return_value = mock_head

    result = cot_execute.extract_cot_year(2024, mock_session)

    assert result is False
    mock_session.head.assert_called_once()  # the freshness check did happen
    mock_session.get.assert_not_called()  # No download should occur
False + mock_session.get.assert_not_called() diff --git a/transform/sqlmesh_materia/macros/__init__.py b/transform/sqlmesh_materia/macros/__init__.py index 48e55a2..7201019 100644 --- a/transform/sqlmesh_materia/macros/__init__.py +++ b/transform/sqlmesh_materia/macros/__init__.py @@ -8,3 +8,10 @@ def psd_glob(evaluator) -> str: """Return a quoted glob path for all PSD CSV gzip files under LANDING_DIR.""" landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") return f"'{landing_dir}/psd/**/*.csv.gzip'" + + +@macro() +def cot_glob(evaluator) -> str: + """Return a quoted glob path for all COT CSV gzip files under LANDING_DIR.""" + landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") + return f"'{landing_dir}/cot/**/*.csv.gzip'" diff --git a/transform/sqlmesh_materia/models/foundation/dim_commodity.sql b/transform/sqlmesh_materia/models/foundation/dim_commodity.sql new file mode 100644 index 0000000..0bfcd78 --- /dev/null +++ b/transform/sqlmesh_materia/models/foundation/dim_commodity.sql @@ -0,0 +1,24 @@ +-- Commodity dimension: conforms identifiers across source systems. +-- +-- This is the ontology seed. Each row is a commodity tracked by BeanFlows. +-- As new sources are added (ICO, futures prices, satellite), their +-- commodity identifiers are added as columns here — not as separate tables. +-- As new commodities are added (cocoa, sugar), rows are added here. 
-- Foundation fact: CFTC COT positioning, weekly grain, all commodities.
--
-- Casts raw varchar columns to proper types, cleans column names,
-- computes net positions (long - short) per trader category, and
-- deduplicates via hash key. Covers all commodities — filtering to
-- a specific commodity happens in the serving layer.
--
-- Grain: one row per (cftc_commodity_code, report_date, cftc_contract_market_code)
-- History: a revised row hashes to a different hkey and is kept next to the
-- row it supersedes.
-- NOTE(review): ingest_date is January 1 of the landing directory year
-- (.../cot/{year}/...), so every row landed under the same year directory
-- carries the same ingest_date. It therefore cannot order revisions within
-- a year — confirm whether the serving layer's "latest ingest_date" rule
-- needs a finer-grained key.

MODEL (
  name foundation.fct_cot_positioning,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (cftc_commodity_code, report_date, cftc_contract_market_code, ingest_date),
  start '2006-06-13',
  cron '@daily'
);

WITH cast_and_clean AS (
  SELECT
    -- Identifiers
    trim(market_and_exchange_names) AS market_and_exchange_name,
    -- TRY_CAST: a malformed date becomes NULL (and is filtered out below)
    -- instead of raising a conversion error that fails the whole run.
    TRY_CAST(report_date_as_yyyy_mm_dd AS date) AS report_date,
    trim(cftc_commodity_code) AS cftc_commodity_code,
    trim(cftc_contract_market_code) AS cftc_contract_market_code,
    trim(contract_units) AS contract_units,

    -- Open interest
    open_interest_all::int AS open_interest,

    -- Producer / Merchant (commercial hedgers: exporters, processors)
    prod_merc_positions_long_all::int AS prod_merc_long,
    prod_merc_positions_short_all::int AS prod_merc_short,

    -- Swap dealers
    swap_positions_long_all::int AS swap_long,
    swap_positions_short_all::int AS swap_short,
    swap_positions_spread_all::int AS swap_spread,

    -- Managed money (hedge funds, CTAs — the primary speculative signal)
    m_money_positions_long_all::int AS managed_money_long,
    m_money_positions_short_all::int AS managed_money_short,
    m_money_positions_spread_all::int AS managed_money_spread,

    -- Other reportables
    other_rept_positions_long_all::int AS other_reportable_long,
    other_rept_positions_short_all::int AS other_reportable_short,
    other_rept_positions_spread_all::int AS other_reportable_spread,

    -- Non-reportable (small speculators, below reporting threshold)
    nonrept_positions_long_all::int AS nonreportable_long,
    nonrept_positions_short_all::int AS nonreportable_short,

    -- Net positions (long minus short per category)
    prod_merc_positions_long_all::int
      - prod_merc_positions_short_all::int AS prod_merc_net,
    m_money_positions_long_all::int
      - m_money_positions_short_all::int AS managed_money_net,
    swap_positions_long_all::int
      - swap_positions_short_all::int AS swap_net,
    other_rept_positions_long_all::int
      - other_rept_positions_short_all::int AS other_reportable_net,
    nonrept_positions_long_all::int
      - nonrept_positions_short_all::int AS nonreportable_net,

    -- Week-over-week changes
    change_in_open_interest_all::int AS change_open_interest,
    change_in_m_money_long_all::int AS change_managed_money_long,
    change_in_m_money_short_all::int AS change_managed_money_short,
    change_in_m_money_long_all::int
      - change_in_m_money_short_all::int AS change_managed_money_net,
    change_in_prod_merc_long_all::int AS change_prod_merc_long,
    change_in_prod_merc_short_all::int AS change_prod_merc_short,

    -- Concentration ratios (% of OI held by top 4 / top 8 traders)
    conc_gross_le_4_tdr_long_all::float AS concentration_top4_long_pct,
    conc_gross_le_4_tdr_short_all::float AS concentration_top4_short_pct,
    conc_gross_le_8_tdr_long_all::float AS concentration_top8_long_pct,
    conc_gross_le_8_tdr_short_all::float AS concentration_top8_short_pct,

    -- Trader counts
    traders_tot_all::int AS traders_total,
    traders_m_money_long_all::int AS traders_managed_money_long,
    traders_m_money_short_all::int AS traders_managed_money_short,
    traders_m_money_spread_all::int AS traders_managed_money_spread,

    -- Ingest date: derived from landing path year directory
    -- Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2]
    make_date(split(filename, '/')[-2]::int, 1, 1) AS ingest_date,

    -- Dedup key: hash of business grain + key metrics
    hash(
      cftc_commodity_code,
      report_date_as_yyyy_mm_dd,
      cftc_contract_market_code,
      open_interest_all,
      m_money_positions_long_all,
      m_money_positions_short_all,
      prod_merc_positions_long_all,
      prod_merc_positions_short_all
    ) AS hkey
  FROM raw.cot_disaggregated
  -- Reject rows with null/blank commodity code or malformed date.
  -- The interval filter is applied here, before the GROUP BY, so each run
  -- deduplicates only the rows of its own interval. A NULL from TRY_CAST
  -- fails the BETWEEN, which also drops malformed dates.
  WHERE trim(cftc_commodity_code) IS NOT NULL
    AND len(trim(cftc_commodity_code)) > 0
    AND TRY_CAST(report_date_as_yyyy_mm_dd AS date) BETWEEN @start_ds AND @end_ds
),

deduplicated AS (
  -- Collapse rows that share a dedup key (e.g. the same yearly file landed
  -- under several etags).
  -- NOTE(review): any_value is evaluated per column, and hkey covers only
  -- some columns; if two rows share an hkey but differ in an un-hashed
  -- column, the output can combine values from both — confirm acceptable.
  SELECT
    any_value(market_and_exchange_name) AS market_and_exchange_name,
    any_value(report_date) AS report_date,
    any_value(cftc_commodity_code) AS cftc_commodity_code,
    any_value(cftc_contract_market_code) AS cftc_contract_market_code,
    any_value(contract_units) AS contract_units,
    any_value(open_interest) AS open_interest,
    any_value(prod_merc_long) AS prod_merc_long,
    any_value(prod_merc_short) AS prod_merc_short,
    any_value(prod_merc_net) AS prod_merc_net,
    any_value(swap_long) AS swap_long,
    any_value(swap_short) AS swap_short,
    any_value(swap_spread) AS swap_spread,
    any_value(swap_net) AS swap_net,
    any_value(managed_money_long) AS managed_money_long,
    any_value(managed_money_short) AS managed_money_short,
    any_value(managed_money_spread) AS managed_money_spread,
    any_value(managed_money_net) AS managed_money_net,
    any_value(other_reportable_long) AS other_reportable_long,
    any_value(other_reportable_short) AS other_reportable_short,
    any_value(other_reportable_spread) AS other_reportable_spread,
    any_value(other_reportable_net) AS other_reportable_net,
    any_value(nonreportable_long) AS nonreportable_long,
    any_value(nonreportable_short) AS nonreportable_short,
    any_value(nonreportable_net) AS nonreportable_net,
    any_value(change_open_interest) AS change_open_interest,
    any_value(change_managed_money_long) AS change_managed_money_long,
    any_value(change_managed_money_short) AS change_managed_money_short,
    any_value(change_managed_money_net) AS change_managed_money_net,
    any_value(change_prod_merc_long) AS change_prod_merc_long,
    any_value(change_prod_merc_short) AS change_prod_merc_short,
    any_value(concentration_top4_long_pct) AS concentration_top4_long_pct,
    any_value(concentration_top4_short_pct) AS concentration_top4_short_pct,
    any_value(concentration_top8_long_pct) AS concentration_top8_long_pct,
    any_value(concentration_top8_short_pct) AS concentration_top8_short_pct,
    any_value(traders_total) AS traders_total,
    any_value(traders_managed_money_long) AS traders_managed_money_long,
    any_value(traders_managed_money_short) AS traders_managed_money_short,
    any_value(traders_managed_money_spread) AS traders_managed_money_spread,
    any_value(ingest_date) AS ingest_date,
    hkey
  FROM cast_and_clean
  GROUP BY hkey
)

SELECT *
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds
"Swap__Positions_Spread_All" AS swap_positions_spread_all, + + -- Managed money (hedge funds, CTAs — key speculative signal) + "M_Money_Positions_Long_All" AS m_money_positions_long_all, + "M_Money_Positions_Short_All" AS m_money_positions_short_all, + "M_Money_Positions_Spread_All" AS m_money_positions_spread_all, + + -- Other reportables + "Other_Rept_Positions_Long_All" AS other_rept_positions_long_all, + "Other_Rept_Positions_Short_All" AS other_rept_positions_short_all, + "Other_Rept_Positions_Spread_All" AS other_rept_positions_spread_all, + + -- Non-reportable (small speculators) + "NonRept_Positions_Long_All" AS nonrept_positions_long_all, + "NonRept_Positions_Short_All" AS nonrept_positions_short_all, + + -- Week-over-week changes + "Change_in_Open_Interest_All" AS change_in_open_interest_all, + "Change_in_M_Money_Long_All" AS change_in_m_money_long_all, + "Change_in_M_Money_Short_All" AS change_in_m_money_short_all, + "Change_in_Prod_Merc_Long_All" AS change_in_prod_merc_long_all, + "Change_in_Prod_Merc_Short_All" AS change_in_prod_merc_short_all, + + -- Concentration (% of OI held by top 4 and top 8 traders) + "Conc_Gross_LE_4_TDR_Long_All" AS conc_gross_le_4_tdr_long_all, + "Conc_Gross_LE_4_TDR_Short_All" AS conc_gross_le_4_tdr_short_all, + "Conc_Gross_LE_8_TDR_Long_All" AS conc_gross_le_8_tdr_long_all, + "Conc_Gross_LE_8_TDR_Short_All" AS conc_gross_le_8_tdr_short_all, + + -- Trader counts + "Traders_Tot_All" AS traders_tot_all, + "Traders_M_Money_Long_All" AS traders_m_money_long_all, + "Traders_M_Money_Short_All" AS traders_m_money_short_all, + "Traders_M_Money_Spread_All" AS traders_m_money_spread_all, + + -- Lineage + filename +FROM read_csv( + @cot_glob(), + delim = ',', + encoding = 'utf-8', + compression = 'gzip', + header = true, + union_by_name = true, + filename = true, + all_varchar = true, + max_line_size = 10000000, + ignore_errors = true +) diff --git a/transform/sqlmesh_materia/models/serving/obt_cot_positioning.sql 
-- Serving mart: COT positioning for Coffee C futures, analytics-ready.
--
-- Joins foundation.fct_cot_positioning with foundation.dim_commodity so
-- the coffee filter is driven by the dimension (not a hardcoded CFTC code).
-- Adds derived analytics used by the dashboard and API:
--   - Normalized positioning (% of open interest)
--   - Long/short ratio
--   - Week-over-week momentum
--   - COT Index over 26-week and 52-week trailing windows (0=bearish, 100=bullish)
--
-- Grain: one row per report_date for Coffee C futures.
-- Latest revision per date: MAX(ingest_date) used to deduplicate CFTC corrections.
--
-- Incremental note: LAG and the trailing windows need rows from BEFORE the
-- interval being processed. The first CTE therefore reads a lookback margin
-- ahead of @start_ds, and the final SELECT trims the output back to
-- @start_ds..@end_ds. Without the margin, an incremental run would see only
-- its own interval: the week-over-week value of the first row would be NULL
-- and the COT indices would be computed over a truncated window.

MODEL (
  name serving.cot_positioning,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (report_date),
  start '2006-06-13',
  cron '@daily'
);

WITH latest_revision AS (
  -- Pick the most recently ingested row when CFTC issues corrections
  SELECT f.*
  FROM foundation.fct_cot_positioning f
  INNER JOIN foundation.dim_commodity d
    ON f.cftc_commodity_code = d.cftc_commodity_code
  WHERE d.commodity_name = 'Coffee, Green'
    -- 371 days = 53 weeks: covers the 51 preceding weekly rows of the
    -- 52-week window, plus slack for report dates that shift within a week.
    AND f.report_date BETWEEN CAST(@start_ds AS DATE) - INTERVAL '371' DAY AND @end_ds
  QUALIFY ROW_NUMBER() OVER (
    PARTITION BY f.report_date, f.cftc_contract_market_code
    ORDER BY f.ingest_date DESC
  ) = 1
),

with_derived AS (
  SELECT
    report_date,
    market_and_exchange_name,
    cftc_commodity_code,
    cftc_contract_market_code,
    contract_units,
    ingest_date,

    -- Absolute positions (contracts)
    open_interest,
    managed_money_long,
    managed_money_short,
    managed_money_spread,
    managed_money_net,
    prod_merc_long,
    prod_merc_short,
    prod_merc_net,
    swap_long,
    swap_short,
    swap_spread,
    swap_net,
    other_reportable_long,
    other_reportable_short,
    other_reportable_spread,
    other_reportable_net,
    nonreportable_long,
    nonreportable_short,
    nonreportable_net,

    -- Normalized: managed money net as % of open interest
    -- Removes size effects and makes cross-period comparison meaningful
    round(
      managed_money_net::float / NULLIF(open_interest, 0) * 100,
      2
    ) AS managed_money_net_pct_of_oi,

    -- Long/short ratio: >1 = more bulls than bears in managed money
    round(
      managed_money_long::float / NULLIF(managed_money_short, 0),
      3
    ) AS managed_money_long_short_ratio,

    -- Weekly changes
    change_open_interest,
    change_managed_money_long,
    change_managed_money_short,
    change_managed_money_net,
    change_prod_merc_long,
    change_prod_merc_short,

    -- Week-over-week momentum in managed money net (via LAG)
    managed_money_net - LAG(managed_money_net, 1) OVER w_prev AS managed_money_net_wow,

    -- Concentration
    concentration_top4_long_pct,
    concentration_top4_short_pct,
    concentration_top8_long_pct,
    concentration_top8_short_pct,

    -- Trader counts
    traders_total,
    traders_managed_money_long,
    traders_managed_money_short,
    traders_managed_money_spread,

    -- COT Index (26-week): where is current net vs. trailing 26 weeks?
    -- 0 = most bearish extreme, 100 = most bullish extreme
    -- A flat window (max = min) is reported as the neutral 50.0.
    CASE
      WHEN MAX(managed_money_net) OVER w26 = MIN(managed_money_net) OVER w26
      THEN 50.0
      ELSE round(
        (managed_money_net - MIN(managed_money_net) OVER w26)::float
          / (MAX(managed_money_net) OVER w26 - MIN(managed_money_net) OVER w26)
          * 100,
        1
      )
    END AS cot_index_26w,

    -- COT Index (52-week): longer-term positioning context
    CASE
      WHEN MAX(managed_money_net) OVER w52 = MIN(managed_money_net) OVER w52
      THEN 50.0
      ELSE round(
        (managed_money_net - MIN(managed_money_net) OVER w52)::float
          / (MAX(managed_money_net) OVER w52 - MIN(managed_money_net) OVER w52)
          * 100,
        1
      )
    END AS cot_index_52w

  FROM latest_revision
  -- Windows are partitioned by contract market to match the partitioning of
  -- the revision pick above, so series from different contract markets are
  -- never mixed. With a single contract market this equals an unpartitioned
  -- window.
  WINDOW
    w_prev AS (PARTITION BY cftc_contract_market_code ORDER BY report_date),
    w26 AS (
      PARTITION BY cftc_contract_market_code
      ORDER BY report_date ROWS BETWEEN 25 PRECEDING AND CURRENT ROW
    ),
    w52 AS (
      PARTITION BY cftc_contract_market_code
      ORDER BY report_date ROWS BETWEEN 51 PRECEDING AND CURRENT ROW
    )
)

SELECT *
FROM with_derived
-- Drop the lookback rows: they exist only to feed the windows above.
WHERE report_date BETWEEN @start_ds AND @end_ds
ORDER BY report_date
+ report_date_as_yyyy_mm_dd: "2024-01-02" + cftc_commodity_code: "083731" + cftc_contract_market_code: "083731" + contract_units: "37,500 POUNDS" + open_interest_all: "250000" + prod_merc_positions_long_all: "80000" + prod_merc_positions_short_all: "90000" + swap_positions_long_all: "30000" + swap_positions_short_all: "35000" + swap_positions_spread_all: "10000" + m_money_positions_long_all: "60000" + m_money_positions_short_all: "40000" + m_money_positions_spread_all: "15000" + other_rept_positions_long_all: "20000" + other_rept_positions_short_all: "18000" + other_rept_positions_spread_all: "5000" + nonrept_positions_long_all: "12000" + nonrept_positions_short_all: "14000" + change_in_open_interest_all: "5000" + change_in_m_money_long_all: "2000" + change_in_m_money_short_all: "-1000" + change_in_prod_merc_long_all: "1000" + change_in_prod_merc_short_all: "500" + conc_gross_le_4_tdr_long_all: "35.5" + conc_gross_le_4_tdr_short_all: "28.3" + conc_gross_le_8_tdr_long_all: "52.1" + conc_gross_le_8_tdr_short_all: "44.7" + traders_tot_all: "450" + traders_m_money_long_all: "85" + traders_m_money_short_all: "62" + traders_m_money_spread_all: "20" + filename: "data/landing/cot/2024/abc123.csv.gzip" + expected: + rows: + - report_date: "2024-01-02" + cftc_commodity_code: "083731" + open_interest: 250000 + managed_money_long: 60000 + managed_money_short: 40000 + managed_money_net: 20000 + prod_merc_long: 80000 + prod_merc_short: 90000 + prod_merc_net: -10000 + swap_long: 30000 + swap_short: 35000 + swap_net: -5000 + nonreportable_long: 12000 + nonreportable_short: 14000 + nonreportable_net: -2000 + change_managed_money_net: 3000 + traders_managed_money_long: 85 + traders_managed_money_short: 62 + +test_fct_cot_positioning_rejects_null_commodity: + model: foundation.fct_cot_positioning + inputs: + raw.cot_disaggregated: + rows: + - market_and_exchange_names: "SOME OTHER CONTRACT" + report_date_as_yyyy_mm_dd: "2024-01-02" + cftc_commodity_code: "" + 
cftc_contract_market_code: "999999" + contract_units: "N/A" + open_interest_all: "1000" + prod_merc_positions_long_all: "500" + prod_merc_positions_short_all: "500" + swap_positions_long_all: "0" + swap_positions_short_all: "0" + swap_positions_spread_all: "0" + m_money_positions_long_all: "0" + m_money_positions_short_all: "0" + m_money_positions_spread_all: "0" + other_rept_positions_long_all: "0" + other_rept_positions_short_all: "0" + other_rept_positions_spread_all: "0" + nonrept_positions_long_all: "0" + nonrept_positions_short_all: "0" + change_in_open_interest_all: "0" + change_in_m_money_long_all: "0" + change_in_m_money_short_all: "0" + change_in_prod_merc_long_all: "0" + change_in_prod_merc_short_all: "0" + conc_gross_le_4_tdr_long_all: "0" + conc_gross_le_4_tdr_short_all: "0" + conc_gross_le_8_tdr_long_all: "0" + conc_gross_le_8_tdr_short_all: "0" + traders_tot_all: "10" + traders_m_money_long_all: "0" + traders_m_money_short_all: "0" + traders_m_money_spread_all: "0" + filename: "data/landing/cot/2024/abc123.csv.gzip" + expected: + rows: [] diff --git a/uv.lock b/uv.lock index b03c8b9..14c020d 100644 --- a/uv.lock +++ b/uv.lock @@ -9,6 +9,7 @@ resolution-markers = [ [manifest] members = [ "beanflows", + "cftc-cot", "materia", "psdonline", "sqlmesh-materia", @@ -344,6 +345,17 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/db/3c/33bac158f8ab7f89b2e59426d5fe2e4f63f7ed25df84c036890172b412b5/cfgv-3.5.0-py2.py3-none-any.whl", hash = "sha256:a8dc6b26ad22ff227d2634a65cb388215ce6cc96bbcc5cfde7641ae87e8dacc0", size = 7445, upload-time = "2025-11-19T20:55:50.744Z" }, ] +[[package]] +name = "cftc-cot" +version = "0.1.0" +source = { editable = "extract/cftc_cot" } +dependencies = [ + { name = "niquests" }, +] + +[package.metadata] +requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }] + [[package]] name = "charset-normalizer" version = "3.4.4" diff --git a/web/src/beanflows/analytics.py b/web/src/beanflows/analytics.py index 
72980a1..17d6104 100644 --- a/web/src/beanflows/analytics.py +++ b/web/src/beanflows/analytics.py @@ -12,6 +12,9 @@ import duckdb # Coffee (Green) commodity code in USDA PSD COFFEE_COMMODITY_CODE = 711100 +# Coffee C futures commodity code in CFTC COT reports +COFFEE_CFTC_CODE = "083731" + # Metrics safe for user-facing queries (prevents SQL injection in dynamic column refs) ALLOWED_METRICS = frozenset({ "production", @@ -203,6 +206,116 @@ async def get_production_yoy_by_country( ) +# ============================================================================= +# COT Positioning Queries +# ============================================================================= + +# Columns safe for user-facing COT queries +ALLOWED_COT_METRICS = frozenset({ + "open_interest", + "managed_money_long", + "managed_money_short", + "managed_money_net", + "managed_money_spread", + "managed_money_net_pct_of_oi", + "managed_money_long_short_ratio", + "managed_money_net_wow", + "prod_merc_long", + "prod_merc_short", + "prod_merc_net", + "swap_long", + "swap_short", + "swap_net", + "other_reportable_net", + "nonreportable_net", + "change_open_interest", + "change_managed_money_net", + "cot_index_26w", + "cot_index_52w", + "concentration_top4_long_pct", + "concentration_top8_long_pct", + "traders_total", + "traders_managed_money_long", + "traders_managed_money_short", +}) + + +def _validate_cot_metrics(metrics: list[str]) -> list[str]: + valid = [m for m in metrics if m in ALLOWED_COT_METRICS] + assert valid, f"No valid COT metrics in {metrics}. Allowed: {sorted(ALLOWED_COT_METRICS)}" + return valid + + +async def get_cot_positioning_time_series( + cftc_commodity_code: str, + metrics: list[str], + start_date: str | None = None, + end_date: str | None = None, + limit: int = 520, +) -> list[dict]: + """Weekly COT positioning time series. 
limit defaults to ~10 years of weekly data.""" + assert 1 <= limit <= 2000, "limit must be between 1 and 2000" + metrics = _validate_cot_metrics(metrics) + cols = ", ".join(metrics) + + where_parts = ["cftc_commodity_code = ?"] + params: list = [cftc_commodity_code] + + if start_date is not None: + where_parts.append("report_date >= ?") + params.append(start_date) + if end_date is not None: + where_parts.append("report_date <= ?") + params.append(end_date) + + where_clause = " AND ".join(where_parts) + + return await fetch_analytics( + f""" + SELECT report_date, {cols} + FROM serving.cot_positioning + WHERE {where_clause} + ORDER BY report_date ASC + LIMIT ? + """, + [*params, limit], + ) + + +async def get_cot_positioning_latest(cftc_commodity_code: str) -> dict | None: + """Latest week's full COT positioning snapshot.""" + rows = await fetch_analytics( + """ + SELECT * + FROM serving.cot_positioning + WHERE cftc_commodity_code = ? + ORDER BY report_date DESC + LIMIT 1 + """, + [cftc_commodity_code], + ) + return rows[0] if rows else None + + +async def get_cot_index_trend( + cftc_commodity_code: str, + weeks: int = 104, +) -> list[dict]: + """COT Index time series (26w and 52w) for the trailing N weeks.""" + assert 1 <= weeks <= 1040, "weeks must be between 1 and 1040" + return await fetch_analytics( + """ + SELECT report_date, cot_index_26w, cot_index_52w, + managed_money_net, managed_money_net_pct_of_oi + FROM serving.cot_positioning + WHERE cftc_commodity_code = ? + ORDER BY report_date DESC + LIMIT ? 
+ """, + [cftc_commodity_code, weeks], + ) + + async def get_country_comparison( commodity_code: int, country_codes: list[str], diff --git a/web/src/beanflows/api/routes.py b/web/src/beanflows/api/routes.py index f32532c..1c62e04 100644 --- a/web/src/beanflows/api/routes.py +++ b/web/src/beanflows/api/routes.py @@ -162,6 +162,42 @@ async def commodity_countries(code: int): return jsonify({"commodity_code": code, "metric": metric, "data": data}) +@bp.route("/commodities//positioning") +@api_key_required(scopes=["read"]) +async def commodity_positioning(code: str): + """COT trader positioning time series for a commodity. + + Query params: + metrics — repeated param, e.g. ?metrics=managed_money_net&metrics=cot_index_26w + start_date — ISO date filter (YYYY-MM-DD) + end_date — ISO date filter (YYYY-MM-DD) + limit — max rows returned (default 260, max 2000) + """ + raw_metrics = request.args.getlist("metrics") or [ + "managed_money_net", "prod_merc_net", "open_interest", "cot_index_26w" + ] + metrics = [m for m in raw_metrics if m in analytics.ALLOWED_COT_METRICS] + if not metrics: + return jsonify({"error": f"No valid metrics. 
Allowed: {sorted(analytics.ALLOWED_COT_METRICS)}"}), 400 + + start_date = request.args.get("start_date") + end_date = request.args.get("end_date") + limit = min(int(request.args.get("limit", 260)), 2000) + + data = await analytics.get_cot_positioning_time_series(code, metrics, start_date, end_date, limit) + return jsonify({"cftc_commodity_code": code, "metrics": metrics, "data": data}) + + +@bp.route("/commodities//positioning/latest") +@api_key_required(scopes=["read"]) +async def commodity_positioning_latest(code: str): + """Latest week's full COT positioning snapshot for a commodity.""" + data = await analytics.get_cot_positioning_latest(code) + if not data: + return jsonify({"error": "No positioning data found for this commodity"}), 404 + return jsonify({"cftc_commodity_code": code, "data": data}) + + @bp.route("/commodities//metrics.csv") @api_key_required(scopes=["read"]) async def commodity_metrics_csv(code: int): diff --git a/web/src/beanflows/dashboard/routes.py b/web/src/beanflows/dashboard/routes.py index bd6b200..c82fca7 100644 --- a/web/src/beanflows/dashboard/routes.py +++ b/web/src/beanflows/dashboard/routes.py @@ -11,7 +11,6 @@ from quart import Blueprint, flash, g, jsonify, redirect, render_template, reque from .. 
import analytics from ..auth.routes import login_required, update_user - from ..core import csrf_protect, execute, fetch_all, fetch_one, soft_delete # Blueprint with its own template folder @@ -99,9 +98,9 @@ async def index(): stats = await get_user_stats(g.user["id"]) plan = (g.get("subscription") or {}).get("plan", "free") - # Fetch all analytics data in parallel (empty lists if DB not available) + # Fetch all analytics data in parallel (empty lists/None if DB not available) if analytics._conn is not None: - time_series, top_producers, stu_trend, balance, yoy = await asyncio.gather( + time_series, top_producers, stu_trend, balance, yoy, cot_latest, cot_trend = await asyncio.gather( analytics.get_global_time_series( analytics.COFFEE_COMMODITY_CODE, ["production", "exports", "imports", "ending_stocks", "total_distribution"], @@ -110,9 +109,12 @@ async def index(): analytics.get_stock_to_use_trend(analytics.COFFEE_COMMODITY_CODE), analytics.get_supply_demand_balance(analytics.COFFEE_COMMODITY_CODE), analytics.get_production_yoy_by_country(analytics.COFFEE_COMMODITY_CODE, limit=15), + analytics.get_cot_positioning_latest(analytics.COFFEE_CFTC_CODE), + analytics.get_cot_index_trend(analytics.COFFEE_CFTC_CODE, weeks=104), ) else: time_series, top_producers, stu_trend, balance, yoy = [], [], [], [], [] + cot_latest, cot_trend = None, [] # Latest global snapshot for key metric cards latest = time_series[-1] if time_series else {} @@ -136,6 +138,8 @@ async def index(): stu_trend=stu_trend, balance=balance, yoy=yoy, + cot_latest=cot_latest, + cot_trend=cot_trend, ) diff --git a/web/src/beanflows/dashboard/templates/index.html b/web/src/beanflows/dashboard/templates/index.html index 9c08dc8..d180348 100644 --- a/web/src/beanflows/dashboard/templates/index.html +++ b/web/src/beanflows/dashboard/templates/index.html @@ -115,6 +115,39 @@
CSV export available on Trader and Analyst plans. Upgrade
{% endif %} + + {% if cot_latest %} +
+

Speculative Positioning — Coffee C Futures

+

CFTC Commitments of Traders · Managed Money net position (hedge funds & CTAs) · as of {{ cot_latest.report_date }}

+
+
+
Managed Money Net
+
+ {{ "{:+,d}".format(cot_latest.managed_money_net | int) }} +
+
contracts (long − short)
+
+
+
COT Index (26w)
+
{{ "{:.0f}".format(cot_latest.cot_index_26w) }}
+
0 = most bearish · 100 = most bullish
+
+
+
Net % of Open Interest
+
{{ "{:+.1f}".format(cot_latest.managed_money_net_pct_of_oi) }}%
+
managed money positioning
+
+
+
Open Interest
+
{{ "{:,d}".format(cot_latest.open_interest | int) }}
+
total contracts outstanding
+
+
+ +
+ {% endif %} +
Country Comparison @@ -202,6 +235,57 @@ if (stuData.length > 0) { }); } +// -- COT Positioning Chart -- +const cotRaw = {{ cot_trend | tojson }}; +if (cotRaw && cotRaw.length > 0) { + const cotData = [...cotRaw].reverse(); // query returns DESC, chart needs ASC + new Chart(document.getElementById('cotPositioningChart'), { + type: 'line', + data: { + labels: cotData.map(r => r.report_date), + datasets: [ + { + label: 'Managed Money Net (contracts)', + data: cotData.map(r => r.managed_money_net), + borderColor: CHART_PALETTE[0], + backgroundColor: CHART_PALETTE[0] + '22', + fill: true, + tension: 0.3, + yAxisID: 'y' + }, + { + label: 'COT Index 26w (0–100)', + data: cotData.map(r => r.cot_index_26w), + borderColor: CHART_PALETTE[2], + borderDash: [5, 4], + tension: 0.3, + pointRadius: 0, + yAxisID: 'y1' + } + ] + }, + options: { + responsive: true, + interaction: {mode: 'index', intersect: false}, + plugins: {legend: {position: 'bottom'}}, + scales: { + x: {ticks: {maxTicksLimit: 12}}, + y: { + title: {display: true, text: 'Net Contracts'}, + position: 'left' + }, + y1: { + title: {display: true, text: 'COT Index'}, + position: 'right', + min: 0, + max: 100, + grid: {drawOnChartArea: false} + } + } + } + }); +} + // -- Top Producers Horizontal Bar -- const topData = {{ top_producers | tojson }}; if (topData.length > 0) {