Add CFTC COT data integration with foundation data model layer

- New extraction package (cftc_cot): downloads yearly Disaggregated Futures ZIPs
  from CFTC, etag-based dedup, dynamic inner filename discovery, gzip normalization
- SQLMesh 3-layer architecture: raw (technical) → foundation (business model) → serving (mart)
- dim_commodity seed: conformed dimension mapping USDA ↔ CFTC codes — the commodity ontology
- fct_cot_positioning: typed, deduplicated weekly positioning facts for all commodities
- obt_cot_positioning: Coffee C mart with COT Index (26w/52w), WoW delta, OI ratios
- Analytics functions + REST API endpoints: /commodities/<code>/positioning[/latest]
- Dashboard widget: Managed Money net, COT Index card, dual-axis Chart.js chart
- 23 passing tests (10 unit + 2 SQLMesh model + existing regression suite)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-20 21:57:04 +01:00
parent d09ba91023
commit 0a83b2cb74
19 changed files with 1111 additions and 3 deletions

View File

@@ -0,0 +1,18 @@
[project]
name = "cftc_cot"
version = "0.1.0"
description = "CFTC Commitment of Traders data extractor"
requires-python = ">=3.13"
dependencies = [
"niquests>=3.14.1",
]
[project.scripts]
extract_cot = "cftc_cot.execute:extract_cot_dataset"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/cftc_cot"]

View File

@@ -0,0 +1,129 @@
"""CFTC COT Disaggregated Futures data extraction.
Downloads yearly ZIP files from CFTC and stores as gzip CSV in the landing
directory. CFTC publishes one file per year that updates every Friday at
3:30 PM ET. On first run this backfills all years from 2006. On subsequent
runs it skips files whose etag matches what is already on disk.
Landing path: LANDING_DIR/cot/{year}/{etag}.csv.gzip
"""
import logging
import os
import pathlib
import sys
from datetime import datetime
from io import BytesIO
import niquests
from .normalize import find_csv_inner_filename, normalize_zipped_csv
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("CFTC COT Extractor")
LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing"))
# CFTC publishes yearly ZIPs for the disaggregated futures-only report.
# The file for the current year is updated each Friday at 3:30 PM ET.
COT_URL_TEMPLATE = "https://www.cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip"
FIRST_YEAR = 2006 # Disaggregated report starts June 2006
HTTP_TIMEOUT_SECONDS = 120 # COT ZIPs are up to ~30 MB
MAX_YEARS = 25 # Safety bound on backfill range
def _synthetic_etag(year: int, headers: dict) -> str:
    """Build a dedup key when CFTC omits the etag header.

    The key is ``{year}_{content-length}_{crc32(last-modified)}``. It is not
    as strong as a real etag (any change to the reported last-modified value
    triggers a re-download), but it is stable for unchanged headers.

    Args:
        year: Report year the headers belong to.
        headers: Response headers of the HEAD request (lower-case lookups).

    Returns:
        A filename-safe string that is identical across interpreter runs for
        identical ``content-length`` / ``last-modified`` values.
    """
    import zlib  # local import keeps this fix self-contained

    last_modified = headers.get("last-modified", "")
    content_length = headers.get("content-length", "0")
    # The builtin hash() of a str is salted per process (PYTHONHASHSEED), so
    # it produced a different key on every run and the "file already exists"
    # check could never match. CRC32 is deterministic across processes.
    digest = zlib.crc32(last_modified.encode("utf-8")) & 0xFFFFFFFF
    etag = f"{year}_{content_length}_{digest:08x}"
    logger.info(f"No etag header for {year}, using synthetic etag: {etag}")
    return etag
def extract_cot_year(year: int, http_session: niquests.Session) -> bool:
    """Download and store COT data for a single year.

    A HEAD request supplies the dedup key (the etag, or a synthetic key when
    the header is missing). The ZIP is only downloaded when no file with that
    key exists under ``LANDING_DIR/cot/{year}/``.

    Args:
        year: Report year to fetch.
        http_session: Session used for the HEAD and GET requests.

    Returns:
        True if a new file was written, False if skipped or unavailable (404).

    Raises:
        AssertionError: On an unexpected HTTP status, an empty download, or a
            ZIP that does not contain exactly one ``.txt`` member.
    """
    url = COT_URL_TEMPLATE.format(year=year)
    logger.info(f"Checking COT data for {year}: {url}")

    head = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS)
    if head.status_code == 404:
        logger.info(f"Year {year} not available (404) — skipping")
        return False
    assert head.status_code == 200, (
        f"Unexpected HEAD status {head.status_code} for {url}"
    )

    raw_etag = head.headers.get("etag", "")
    etag = raw_etag.replace('"', "")
    # Weak validators look like W/"abc". A path separator in the key would be
    # interpreted as a sub-directory that does not exist, so the write fails.
    for unsafe in (":", "/", "\\"):
        etag = etag.replace(unsafe, "_")
    if not etag:
        etag = _synthetic_etag(year, head.headers)

    dest_dir = LANDING_DIR / "cot" / str(year)
    local_file = dest_dir / f"{etag}.csv.gzip"
    if local_file.exists():
        logger.info(f"Year {year}: {etag}.csv.gzip already exists, skipping")
        return False

    logger.info(f"Downloading COT data for {year}...")
    response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS)
    assert response.status_code == 200, (
        f"GET failed with status {response.status_code} for {url}"
    )
    assert len(response.content) > 0, f"Downloaded empty file from {url}"

    # One buffer serves both helpers: ZipFile does not close a file object it
    # was handed, and normalize_zipped_csv rewinds before reading.
    zip_buffer = BytesIO(response.content)
    inner_filename = find_csv_inner_filename(zip_buffer)
    normalized = normalize_zipped_csv(zip_buffer, inner_filename)

    dest_dir.mkdir(parents=True, exist_ok=True)
    # Write under a temporary name and rename afterwards: an interrupted write
    # must not leave a truncated file under the final name, because the
    # existence check above would then skip this etag on every later run.
    tmp_file = dest_dir / f"{etag}.csv.gzip.tmp"
    try:
        tmp_file.write_bytes(normalized.read())
        tmp_file.replace(local_file)
    except BaseException:
        tmp_file.unlink(missing_ok=True)
        raise

    assert local_file.exists(), f"File was not written: {local_file}"
    assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}"
    logger.info(f"Stored {local_file} ({local_file.stat().st_size:,} bytes)")
    return True
def extract_cot_dataset():
    """Fetch every available year of CFTC COT disaggregated futures data.

    Years are visited newest first, from the current calendar year down to
    FIRST_YEAR, so the weekly-updated current file is checked before the
    historical backfill. The year range is asserted to stay within MAX_YEARS.
    An exception raised for one year is logged and the loop moves on to the
    next year.
    """
    LANDING_DIR.mkdir(parents=True, exist_ok=True)

    newest_year = datetime.now().year
    backfill_years = [*range(newest_year, FIRST_YEAR - 1, -1)]
    year_count = len(backfill_years)
    assert year_count <= MAX_YEARS, (
        f"Year range {year_count} exceeds MAX_YEARS={MAX_YEARS}"
    )

    written = 0
    with niquests.Session() as http:
        for yr in backfill_years:
            try:
                wrote_file = extract_cot_year(yr, http)
            except Exception:
                logger.exception(f"Failed to extract COT data for {yr}, continuing")
                continue
            if wrote_file:
                written += 1

    logger.info(f"COT extraction complete: {written} new file(s) downloaded")
if __name__ == "__main__":
extract_cot_dataset()

View File

@@ -0,0 +1,43 @@
"""Normalize CFTC ZIP archives to gzip CSV."""
import gzip
import zipfile
from io import BytesIO
def find_csv_inner_filename(buffer: BytesIO) -> str:
    """Return the name of the only ``.txt`` member of a CFTC ZIP archive.

    CFTC ships its comma-delimited data with a ``.txt`` extension and the
    member name differs between years, so it is discovered rather than
    hard-coded. The extension match is case-insensitive.

    Raises:
        AssertionError: If the archive holds zero or more than one ``.txt``
            member; the message lists every member name.
    """
    with zipfile.ZipFile(buffer, mode="r") as archive:
        members = archive.namelist()

    candidates = [name for name in members if name.lower().endswith(".txt")]
    assert len(candidates) == 1, (
        f"Expected exactly 1 .txt file in CFTC ZIP, found: {members}"
    )
    (only_candidate,) = candidates
    return only_candidate
def normalize_zipped_csv(buffer: BytesIO, inner_filename: str) -> BytesIO:
    """Extract a single CSV from a ZIP and recompress as gzip.

    The member is streamed from the ZIP into the gzip writer in chunks, so the
    fully decompressed CSV is never held in memory at once. The gzip header
    timestamp is fixed to 0, which makes the output bytes depend only on the
    input content.

    Args:
        buffer: ZIP file content as BytesIO (will be read from position 0).
        inner_filename: Name of the file inside the ZIP archive.

    Returns:
        BytesIO with gzip-compressed CSV content, seeked to position 0.

    Raises:
        AssertionError: If ``inner_filename`` is not a member of the archive.
    """
    import shutil  # local import keeps this change self-contained

    buffer.seek(0)
    out = BytesIO()
    with zipfile.ZipFile(buffer, mode="r") as zf:
        assert inner_filename in zf.namelist(), (
            f"Expected '{inner_filename}' in ZIP, found: {zf.namelist()}"
        )
        with zf.open(inner_filename, mode="r") as csv_file:
            # mtime=0 keeps wall-clock time out of the gzip header; GzipFile
            # leaves the caller-supplied `out` buffer open when it closes.
            with gzip.GzipFile(fileobj=out, mode="wb", mtime=0) as gz:
                shutil.copyfileobj(csv_file, gz)
    out.seek(0)
    return out

View File

@@ -40,6 +40,7 @@ dev = [
[tool.uv.sources]
psdonline = {workspace = true }
sqlmesh_materia = {workspace = true }
cftc_cot = {workspace = true }
[tool.uv.workspace]
members = [

View File

@@ -16,6 +16,10 @@ PIPELINES = {
"command": ["uv", "run", "--package", "psdonline", "extract_psd"],
"timeout_seconds": 1800,
},
"extract_cot": {
"command": ["uv", "run", "--package", "cftc_cot", "extract_cot"],
"timeout_seconds": 1800,
},
"transform": {
"command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"],
"timeout_seconds": 3600,

View File

@@ -0,0 +1,147 @@
"""Tests for CFTC COT extraction package."""
import gzip
import zipfile
from io import BytesIO
from unittest.mock import MagicMock
from cftc_cot.normalize import find_csv_inner_filename, normalize_zipped_csv
# =============================================================================
# normalize.py
# =============================================================================
def _make_zip(inner_name: str, content: bytes) -> BytesIO:
"""Helper: create a ZIP buffer containing a single named file."""
buf = BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr(inner_name, content)
buf.seek(0)
return buf
def test_find_csv_inner_filename_returns_txt_file():
    """A lone lower-case .txt member is returned by name."""
    archive = _make_zip("f_year.txt", b"col1,col2\nv1,v2\n")
    discovered = find_csv_inner_filename(archive)
    assert discovered == "f_year.txt"
def test_find_csv_inner_filename_case_insensitive():
    """An upper-case .TXT extension is matched and the name is returned unchanged."""
    archive = _make_zip("FUT_DISAGG_2015.TXT", b"data")
    discovered = find_csv_inner_filename(archive)
    assert discovered == "FUT_DISAGG_2015.TXT"
def test_find_csv_inner_filename_asserts_on_zero_txt_files():
    """An archive without any .txt member is rejected with an AssertionError."""
    buf = _make_zip("readme.md", b"not a txt file")
    try:
        find_csv_inner_filename(buf)
    except AssertionError as e:
        assert "Expected exactly 1" in str(e)
    else:
        # Raised outside the try body so the "did not raise" failure is
        # reported as such instead of as a message mismatch.
        raise AssertionError("Should have raised AssertionError")
def test_find_csv_inner_filename_asserts_on_multiple_txt_files():
    """An archive with two .txt members is rejected with an AssertionError."""
    buf = BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("a.txt", b"data a")
        zf.writestr("b.txt", b"data b")
    buf.seek(0)
    try:
        find_csv_inner_filename(buf)
    except AssertionError as e:
        assert "Expected exactly 1" in str(e)
    else:
        # The sentinel must live outside the try body: when it was inside, the
        # bare `except AssertionError: pass` swallowed it and this test could
        # never fail.
        raise AssertionError("Should have raised AssertionError")
def test_normalize_zipped_csv_produces_valid_gzip():
    """The gzip output decompresses back to the exact CSV bytes that were zipped."""
    original_csv = b"Market_and_Exchange_Names,CFTC_Commodity_Code\nCOFFEE C,083731\n"
    archive = _make_zip("f_year.txt", original_csv)

    recompressed = normalize_zipped_csv(archive, "f_year.txt")

    # Round-trip through gzip and compare byte-for-byte
    reader = gzip.open(recompressed, "rb")
    try:
        round_tripped = reader.read()
    finally:
        reader.close()
    assert round_tripped == original_csv
def test_normalize_zipped_csv_resets_seek_position():
    """The returned buffer is rewound so callers can read it immediately."""
    archive = _make_zip("f_year.txt", b"data")
    position = normalize_zipped_csv(archive, "f_year.txt").tell()
    assert position == 0, "Result BytesIO should be seeked to position 0"
def test_normalize_zipped_csv_asserts_on_wrong_inner_name():
    """Asking for a member that is not in the archive raises an AssertionError."""
    buf = _make_zip("actual.txt", b"data")
    try:
        normalize_zipped_csv(buf, "expected.txt")
    except AssertionError as e:
        assert "expected.txt" in str(e)
    else:
        # Raised outside the try body so the "did not raise" failure is
        # reported as such instead of as a message mismatch.
        raise AssertionError("Should have raised AssertionError")
# =============================================================================
# execute.py — pipeline registry integration
# =============================================================================
def test_extract_cot_pipeline_registered():
    """The pipeline registry exposes extract_cot with the expected command and timeout."""
    from materia.pipelines import PIPELINES

    assert "extract_cot" in PIPELINES

    registered = PIPELINES["extract_cot"]
    expected_command = ["uv", "run", "--package", "cftc_cot", "extract_cot"]
    assert registered["command"] == expected_command
    assert registered["timeout_seconds"] == 1800
def test_extract_cot_year_skips_existing_file(tmp_path, monkeypatch):
    """extract_cot_year returns False and skips download when file already exists."""
    from cftc_cot import execute as cot_execute

    # LANDING_DIR is computed when the module is imported, so the module
    # attribute is what has to change. monkeypatch.setattr restores the
    # previous value after the test; a plain assignment leaked this test's
    # tmp_path into every test that ran afterwards.
    monkeypatch.setattr(cot_execute, "LANDING_DIR", tmp_path)

    # Pre-create the etag file to simulate existing data
    dest = tmp_path / "cot" / "2024"
    dest.mkdir(parents=True)
    etag = "abc123"
    (dest / f"{etag}.csv.gzip").write_bytes(b"existing")

    mock_session = MagicMock()
    mock_head = MagicMock()
    mock_head.status_code = 200
    mock_head.headers = {"etag": f'"{etag}"'}
    mock_session.head.return_value = mock_head

    result = cot_execute.extract_cot_year(2024, mock_session)

    assert result is False
    mock_session.get.assert_not_called()  # No download should occur
def test_extract_cot_year_returns_false_on_404(tmp_path, monkeypatch):
    """extract_cot_year returns False when CFTC returns 404 for a year."""
    from cftc_cot import execute as cot_execute

    # Patch the module attribute (not the env var, which is only read at
    # import time) and let monkeypatch restore it so no state leaks into
    # other tests.
    monkeypatch.setattr(cot_execute, "LANDING_DIR", tmp_path)

    mock_session = MagicMock()
    mock_head = MagicMock()
    mock_head.status_code = 404
    mock_session.head.return_value = mock_head

    result = cot_execute.extract_cot_year(2006, mock_session)

    assert result is False
    mock_session.get.assert_not_called()

View File

@@ -8,3 +8,10 @@ def psd_glob(evaluator) -> str:
"""Return a quoted glob path for all PSD CSV gzip files under LANDING_DIR."""
landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing")
return f"'{landing_dir}/psd/**/*.csv.gzip'"
@macro()
def cot_glob(evaluator) -> str:
    """Build the single-quoted glob matching every COT gzip CSV below LANDING_DIR.

    The SQLMesh variable LANDING_DIR wins; when it is unset or empty the
    LANDING_DIR environment variable is used, defaulting to ``data/landing``.
    """
    base_dir = evaluator.var("LANDING_DIR")
    if not base_dir:
        base_dir = os.environ.get("LANDING_DIR", "data/landing")
    pattern = f"{base_dir}/cot/**/*.csv.gzip"
    return f"'{pattern}'"

View File

@@ -0,0 +1,24 @@
-- Commodity dimension: conforms identifiers across source systems.
--
-- This is the ontology seed. Each row is a commodity tracked by BeanFlows.
-- As new sources are added (ICO, futures prices, satellite), their
-- commodity identifiers are added as columns here — not as separate tables.
-- As new commodities are added (cocoa, sugar), rows are added here.
--
-- References:
-- usda_commodity_code → raw.psd_alldata.commodity_code
-- cftc_commodity_code → raw.cot_disaggregated.cftc_commodity_code
MODEL (
name foundation.dim_commodity,
kind SEED (
path '$root/seeds/dim_commodity.csv',
csv_settings (delimiter = ';')
),
columns (
usda_commodity_code varchar,
cftc_commodity_code varchar,
commodity_name varchar,
commodity_group varchar
)
);

View File

@@ -0,0 +1,160 @@
-- Foundation fact: CFTC COT positioning, weekly grain, all commodities.
--
-- Casts raw varchar columns to proper types, cleans column names,
-- computes net positions (long - short) per trader category, and
-- deduplicates via hash key. Covers all commodities — filtering to
-- a specific commodity happens in the serving layer.
--
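-- Grain: one row per (cftc_commodity_code, report_date, cftc_contract_market_code, ingest_date)
-- History: rows are deduplicated on hkey, so a revision with changed metrics
-- appears as an additional row. Note that ingest_date is derived from the
-- landing path's year directory (January 1 of that year), so revisions that
-- land in the same year directory share one ingest_date.
-- Serving layer picks max(ingest_date) per grain for latest view.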
MODEL (
name foundation.fct_cot_positioning,
kind INCREMENTAL_BY_TIME_RANGE (
time_column report_date
),
grain (cftc_commodity_code, report_date, cftc_contract_market_code, ingest_date),
start '2006-06-13',
cron '@daily'
);
-- Step 1 (cast_and_clean): type the raw varchar columns, rename them, derive
--   net positions, ingest_date and the dedup hash key.
-- Step 2 (deduplicated): collapse identical rows that arrive from several
--   landing files into one row per hkey.
-- Step 3: keep only the rows of the interval being processed.
WITH cast_and_clean AS (
    SELECT
        -- Identifiers
        trim(market_and_exchange_names) AS market_and_exchange_name,
        -- TRY_CAST returns NULL for a malformed date instead of raising, so
        -- the WHERE clause below can drop the row without failing the model.
        TRY_CAST(report_date_as_yyyy_mm_dd AS date) AS report_date,
        trim(cftc_commodity_code) AS cftc_commodity_code,
        trim(cftc_contract_market_code) AS cftc_contract_market_code,
        trim(contract_units) AS contract_units,
        -- Open interest
        open_interest_all::int AS open_interest,
        -- Producer / Merchant (commercial hedgers: exporters, processors)
        prod_merc_positions_long_all::int AS prod_merc_long,
        prod_merc_positions_short_all::int AS prod_merc_short,
        -- Swap dealers
        swap_positions_long_all::int AS swap_long,
        swap_positions_short_all::int AS swap_short,
        swap_positions_spread_all::int AS swap_spread,
        -- Managed money (hedge funds, CTAs — the primary speculative signal)
        m_money_positions_long_all::int AS managed_money_long,
        m_money_positions_short_all::int AS managed_money_short,
        m_money_positions_spread_all::int AS managed_money_spread,
        -- Other reportables
        other_rept_positions_long_all::int AS other_reportable_long,
        other_rept_positions_short_all::int AS other_reportable_short,
        other_rept_positions_spread_all::int AS other_reportable_spread,
        -- Non-reportable (small speculators, below reporting threshold)
        nonrept_positions_long_all::int AS nonreportable_long,
        nonrept_positions_short_all::int AS nonreportable_short,
        -- Net positions (long minus short per category)
        prod_merc_positions_long_all::int
            - prod_merc_positions_short_all::int AS prod_merc_net,
        m_money_positions_long_all::int
            - m_money_positions_short_all::int AS managed_money_net,
        swap_positions_long_all::int
            - swap_positions_short_all::int AS swap_net,
        other_rept_positions_long_all::int
            - other_rept_positions_short_all::int AS other_reportable_net,
        nonrept_positions_long_all::int
            - nonrept_positions_short_all::int AS nonreportable_net,
        -- Week-over-week changes
        change_in_open_interest_all::int AS change_open_interest,
        change_in_m_money_long_all::int AS change_managed_money_long,
        change_in_m_money_short_all::int AS change_managed_money_short,
        change_in_m_money_long_all::int
            - change_in_m_money_short_all::int AS change_managed_money_net,
        change_in_prod_merc_long_all::int AS change_prod_merc_long,
        change_in_prod_merc_short_all::int AS change_prod_merc_short,
        -- Concentration ratios (% of OI held by top 4 / top 8 traders)
        conc_gross_le_4_tdr_long_all::float AS concentration_top4_long_pct,
        conc_gross_le_4_tdr_short_all::float AS concentration_top4_short_pct,
        conc_gross_le_8_tdr_long_all::float AS concentration_top8_long_pct,
        conc_gross_le_8_tdr_short_all::float AS concentration_top8_short_pct,
        -- Trader counts
        traders_tot_all::int AS traders_total,
        traders_m_money_long_all::int AS traders_managed_money_long,
        traders_m_money_short_all::int AS traders_managed_money_short,
        traders_m_money_spread_all::int AS traders_managed_money_spread,
        -- Ingest date: derived from landing path year directory
        -- Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2]
        -- (always January 1 of that year, not the download date)
        make_date(split(filename, '/')[-2]::int, 1, 1) AS ingest_date,
        -- Dedup key: hash of business grain + key metrics
        hash(
            cftc_commodity_code,
            report_date_as_yyyy_mm_dd,
            cftc_contract_market_code,
            open_interest_all,
            m_money_positions_long_all,
            m_money_positions_short_all,
            prod_merc_positions_long_all,
            prod_merc_positions_short_all
        ) AS hkey
    FROM raw.cot_disaggregated
    -- Reject rows with null/blank commodity code or a missing/malformed date
    WHERE trim(cftc_commodity_code) IS NOT NULL
        AND len(trim(cftc_commodity_code)) > 0
        AND TRY_CAST(report_date_as_yyyy_mm_dd AS date) IS NOT NULL
),
deduplicated AS (
    -- Rows sharing an hkey are treated as copies of the same report line, so
    -- any_value() picks one representative value per column.
    SELECT
        any_value(market_and_exchange_name) AS market_and_exchange_name,
        any_value(report_date) AS report_date,
        any_value(cftc_commodity_code) AS cftc_commodity_code,
        any_value(cftc_contract_market_code) AS cftc_contract_market_code,
        any_value(contract_units) AS contract_units,
        any_value(open_interest) AS open_interest,
        any_value(prod_merc_long) AS prod_merc_long,
        any_value(prod_merc_short) AS prod_merc_short,
        any_value(prod_merc_net) AS prod_merc_net,
        any_value(swap_long) AS swap_long,
        any_value(swap_short) AS swap_short,
        any_value(swap_spread) AS swap_spread,
        any_value(swap_net) AS swap_net,
        any_value(managed_money_long) AS managed_money_long,
        any_value(managed_money_short) AS managed_money_short,
        any_value(managed_money_spread) AS managed_money_spread,
        any_value(managed_money_net) AS managed_money_net,
        any_value(other_reportable_long) AS other_reportable_long,
        any_value(other_reportable_short) AS other_reportable_short,
        any_value(other_reportable_spread) AS other_reportable_spread,
        any_value(other_reportable_net) AS other_reportable_net,
        any_value(nonreportable_long) AS nonreportable_long,
        any_value(nonreportable_short) AS nonreportable_short,
        any_value(nonreportable_net) AS nonreportable_net,
        any_value(change_open_interest) AS change_open_interest,
        any_value(change_managed_money_long) AS change_managed_money_long,
        any_value(change_managed_money_short) AS change_managed_money_short,
        any_value(change_managed_money_net) AS change_managed_money_net,
        any_value(change_prod_merc_long) AS change_prod_merc_long,
        any_value(change_prod_merc_short) AS change_prod_merc_short,
        any_value(concentration_top4_long_pct) AS concentration_top4_long_pct,
        any_value(concentration_top4_short_pct) AS concentration_top4_short_pct,
        any_value(concentration_top8_long_pct) AS concentration_top8_long_pct,
        any_value(concentration_top8_short_pct) AS concentration_top8_short_pct,
        any_value(traders_total) AS traders_total,
        any_value(traders_managed_money_long) AS traders_managed_money_long,
        any_value(traders_managed_money_short) AS traders_managed_money_short,
        any_value(traders_managed_money_spread) AS traders_managed_money_spread,
        any_value(ingest_date) AS ingest_date,
        hkey
    FROM cast_and_clean
    GROUP BY hkey
)
SELECT *
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds

View File

@@ -0,0 +1,85 @@
-- Raw CFTC Commitment of Traders — Disaggregated Futures Only.
--
-- Technical ingestion layer only: reads gzip CSVs from the landing directory
-- and surfaces the columns needed by downstream foundation models.
-- All values are varchar; casting happens in foundation.
--
-- Source: CFTC yearly ZIPs at
-- https://www.cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip
-- Coverage: June 2006 – present (the current year's file is updated every Friday at 3:30 PM ET)
MODEL (
name raw.cot_disaggregated,
kind FULL,
grain (cftc_commodity_code, report_date_as_yyyy_mm_dd, cftc_contract_market_code),
start '2006-06-13',
cron '@daily'
);
SELECT
-- Identifiers
"Market_and_Exchange_Names" AS market_and_exchange_names,
"Report_Date_as_YYYY-MM-DD" AS report_date_as_yyyy_mm_dd,
"CFTC_Commodity_Code" AS cftc_commodity_code,
"CFTC_Contract_Market_Code" AS cftc_contract_market_code,
"Contract_Units" AS contract_units,
-- Open interest
"Open_Interest_All" AS open_interest_all,
-- Producer / Merchant / Processor / User (commercial hedgers)
"Prod_Merc_Positions_Long_All" AS prod_merc_positions_long_all,
"Prod_Merc_Positions_Short_All" AS prod_merc_positions_short_all,
-- Swap dealers
"Swap_Positions_Long_All" AS swap_positions_long_all,
"Swap__Positions_Short_All" AS swap_positions_short_all,
"Swap__Positions_Spread_All" AS swap_positions_spread_all,
-- Managed money (hedge funds, CTAs — key speculative signal)
"M_Money_Positions_Long_All" AS m_money_positions_long_all,
"M_Money_Positions_Short_All" AS m_money_positions_short_all,
"M_Money_Positions_Spread_All" AS m_money_positions_spread_all,
-- Other reportables
"Other_Rept_Positions_Long_All" AS other_rept_positions_long_all,
"Other_Rept_Positions_Short_All" AS other_rept_positions_short_all,
"Other_Rept_Positions_Spread_All" AS other_rept_positions_spread_all,
-- Non-reportable (small speculators)
"NonRept_Positions_Long_All" AS nonrept_positions_long_all,
"NonRept_Positions_Short_All" AS nonrept_positions_short_all,
-- Week-over-week changes
"Change_in_Open_Interest_All" AS change_in_open_interest_all,
"Change_in_M_Money_Long_All" AS change_in_m_money_long_all,
"Change_in_M_Money_Short_All" AS change_in_m_money_short_all,
"Change_in_Prod_Merc_Long_All" AS change_in_prod_merc_long_all,
"Change_in_Prod_Merc_Short_All" AS change_in_prod_merc_short_all,
-- Concentration (% of OI held by top 4 and top 8 traders)
"Conc_Gross_LE_4_TDR_Long_All" AS conc_gross_le_4_tdr_long_all,
"Conc_Gross_LE_4_TDR_Short_All" AS conc_gross_le_4_tdr_short_all,
"Conc_Gross_LE_8_TDR_Long_All" AS conc_gross_le_8_tdr_long_all,
"Conc_Gross_LE_8_TDR_Short_All" AS conc_gross_le_8_tdr_short_all,
-- Trader counts
"Traders_Tot_All" AS traders_tot_all,
"Traders_M_Money_Long_All" AS traders_m_money_long_all,
"Traders_M_Money_Short_All" AS traders_m_money_short_all,
"Traders_M_Money_Spread_All" AS traders_m_money_spread_all,
-- Lineage
filename
FROM read_csv(
@cot_glob(),
delim = ',',
encoding = 'utf-8',
compression = 'gzip',
header = true,
union_by_name = true,
filename = true,
all_varchar = true,
max_line_size = 10000000,
ignore_errors = true
)

View File

@@ -0,0 +1,140 @@
-- Serving mart: COT positioning for Coffee C futures, analytics-ready.
--
-- Joins foundation.fct_cot_positioning with foundation.dim_commodity so
-- the coffee filter is driven by the dimension (not a hardcoded CFTC code).
-- Adds derived analytics used by the dashboard and API:
-- - Normalized positioning (% of open interest)
-- - Long/short ratio
-- - Week-over-week momentum
-- - COT Index over 26-week and 52-week trailing windows (0=bearish, 100=bullish)
--
-- Grain: one row per report_date for Coffee C futures.
-- Latest revision per date: MAX(ingest_date) used to deduplicate CFTC corrections.
MODEL (
name serving.cot_positioning,
kind INCREMENTAL_BY_TIME_RANGE (
time_column report_date
),
grain (report_date),
start '2006-06-13',
cron '@daily'
);
-- The window functions below (LAG, 26-week and 52-week COT index) need rows
-- from before the interval being processed. The source scan therefore starts
-- 392 days (56 weeks) before @start_ds, and the final SELECT trims the output
-- back to the interval. Without the lookback an incremental run only saw its
-- own interval, which left managed_money_net_wow NULL and pinned both COT
-- indexes to 50.
WITH latest_revision AS (
    -- Pick the most recently ingested row when CFTC issues corrections
    SELECT f.*
    FROM foundation.fct_cot_positioning f
    INNER JOIN foundation.dim_commodity d
        ON f.cftc_commodity_code = d.cftc_commodity_code
    WHERE d.commodity_name = 'Coffee, Green'
        AND f.report_date >= CAST(CAST(@start_ds AS DATE) - INTERVAL 392 DAY AS DATE)
        AND f.report_date <= @end_ds
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY f.report_date, f.cftc_contract_market_code
        ORDER BY f.ingest_date DESC
    ) = 1
),
with_derived AS (
    SELECT
        report_date,
        market_and_exchange_name,
        cftc_commodity_code,
        cftc_contract_market_code,
        contract_units,
        ingest_date,
        -- Absolute positions (contracts)
        open_interest,
        managed_money_long,
        managed_money_short,
        managed_money_spread,
        managed_money_net,
        prod_merc_long,
        prod_merc_short,
        prod_merc_net,
        swap_long,
        swap_short,
        swap_spread,
        swap_net,
        other_reportable_long,
        other_reportable_short,
        other_reportable_spread,
        other_reportable_net,
        nonreportable_long,
        nonreportable_short,
        nonreportable_net,
        -- Normalized: managed money net as % of open interest
        -- Removes size effects and makes cross-period comparison meaningful
        -- (NULLIF turns a zero denominator into NULL instead of an error)
        round(
            managed_money_net::float / NULLIF(open_interest, 0) * 100,
            2
        ) AS managed_money_net_pct_of_oi,
        -- Long/short ratio: >1 = more bulls than bears in managed money
        round(
            managed_money_long::float / NULLIF(managed_money_short, 0),
            3
        ) AS managed_money_long_short_ratio,
        -- Weekly changes
        change_open_interest,
        change_managed_money_long,
        change_managed_money_short,
        change_managed_money_net,
        change_prod_merc_long,
        change_prod_merc_short,
        -- Week-over-week momentum in managed money net (via LAG)
        managed_money_net - LAG(managed_money_net, 1) OVER (
            ORDER BY report_date
        ) AS managed_money_net_wow,
        -- Concentration
        concentration_top4_long_pct,
        concentration_top4_short_pct,
        concentration_top8_long_pct,
        concentration_top8_short_pct,
        -- Trader counts
        traders_total,
        traders_managed_money_long,
        traders_managed_money_short,
        traders_managed_money_spread,
        -- COT Index (26-week): where is current net vs. trailing 26 weeks?
        -- 0 = most bearish extreme, 100 = most bullish extreme
        -- A flat window (max = min) is reported as the neutral value 50.
        CASE
            WHEN MAX(managed_money_net) OVER w26 = MIN(managed_money_net) OVER w26
                THEN 50.0
            ELSE round(
                (managed_money_net - MIN(managed_money_net) OVER w26)::float
                / (MAX(managed_money_net) OVER w26 - MIN(managed_money_net) OVER w26)
                * 100,
                1
            )
        END AS cot_index_26w,
        -- COT Index (52-week): longer-term positioning context
        CASE
            WHEN MAX(managed_money_net) OVER w52 = MIN(managed_money_net) OVER w52
                THEN 50.0
            ELSE round(
                (managed_money_net - MIN(managed_money_net) OVER w52)::float
                / (MAX(managed_money_net) OVER w52 - MIN(managed_money_net) OVER w52)
                * 100,
                1
            )
        END AS cot_index_52w
    FROM latest_revision
    WINDOW
        w26 AS (ORDER BY report_date ROWS BETWEEN 25 PRECEDING AND CURRENT ROW),
        w52 AS (ORDER BY report_date ROWS BETWEEN 51 PRECEDING AND CURRENT ROW)
)
SELECT *
FROM with_derived
-- Drop the lookback rows: they exist only to feed the window functions.
WHERE report_date BETWEEN @start_ds AND @end_ds
ORDER BY report_date

View File

@@ -0,0 +1,2 @@
usda_commodity_code;cftc_commodity_code;commodity_name;commodity_group
0711100;083731;Coffee, Green;Softs
1 usda_commodity_code cftc_commodity_code commodity_name commodity_group
2 0711100 083731 Coffee, Green Softs

View File

@@ -0,0 +1,99 @@
test_fct_cot_positioning_types_and_net_positions:
model: foundation.fct_cot_positioning
inputs:
raw.cot_disaggregated:
rows:
- market_and_exchange_names: "COFFEE C - ICE FUTURES U.S."
report_date_as_yyyy_mm_dd: "2024-01-02"
cftc_commodity_code: "083731"
cftc_contract_market_code: "083731"
contract_units: "37,500 POUNDS"
open_interest_all: "250000"
prod_merc_positions_long_all: "80000"
prod_merc_positions_short_all: "90000"
swap_positions_long_all: "30000"
swap_positions_short_all: "35000"
swap_positions_spread_all: "10000"
m_money_positions_long_all: "60000"
m_money_positions_short_all: "40000"
m_money_positions_spread_all: "15000"
other_rept_positions_long_all: "20000"
other_rept_positions_short_all: "18000"
other_rept_positions_spread_all: "5000"
nonrept_positions_long_all: "12000"
nonrept_positions_short_all: "14000"
change_in_open_interest_all: "5000"
change_in_m_money_long_all: "2000"
change_in_m_money_short_all: "-1000"
change_in_prod_merc_long_all: "1000"
change_in_prod_merc_short_all: "500"
conc_gross_le_4_tdr_long_all: "35.5"
conc_gross_le_4_tdr_short_all: "28.3"
conc_gross_le_8_tdr_long_all: "52.1"
conc_gross_le_8_tdr_short_all: "44.7"
traders_tot_all: "450"
traders_m_money_long_all: "85"
traders_m_money_short_all: "62"
traders_m_money_spread_all: "20"
filename: "data/landing/cot/2024/abc123.csv.gzip"
expected:
rows:
- report_date: "2024-01-02"
cftc_commodity_code: "083731"
open_interest: 250000
managed_money_long: 60000
managed_money_short: 40000
managed_money_net: 20000
prod_merc_long: 80000
prod_merc_short: 90000
prod_merc_net: -10000
swap_long: 30000
swap_short: 35000
swap_net: -5000
nonreportable_long: 12000
nonreportable_short: 14000
nonreportable_net: -2000
change_managed_money_net: 3000
traders_managed_money_long: 85
traders_managed_money_short: 62
test_fct_cot_positioning_rejects_null_commodity:
model: foundation.fct_cot_positioning
inputs:
raw.cot_disaggregated:
rows:
- market_and_exchange_names: "SOME OTHER CONTRACT"
report_date_as_yyyy_mm_dd: "2024-01-02"
cftc_commodity_code: ""
cftc_contract_market_code: "999999"
contract_units: "N/A"
open_interest_all: "1000"
prod_merc_positions_long_all: "500"
prod_merc_positions_short_all: "500"
swap_positions_long_all: "0"
swap_positions_short_all: "0"
swap_positions_spread_all: "0"
m_money_positions_long_all: "0"
m_money_positions_short_all: "0"
m_money_positions_spread_all: "0"
other_rept_positions_long_all: "0"
other_rept_positions_short_all: "0"
other_rept_positions_spread_all: "0"
nonrept_positions_long_all: "0"
nonrept_positions_short_all: "0"
change_in_open_interest_all: "0"
change_in_m_money_long_all: "0"
change_in_m_money_short_all: "0"
change_in_prod_merc_long_all: "0"
change_in_prod_merc_short_all: "0"
conc_gross_le_4_tdr_long_all: "0"
conc_gross_le_4_tdr_short_all: "0"
conc_gross_le_8_tdr_long_all: "0"
conc_gross_le_8_tdr_short_all: "0"
traders_tot_all: "10"
traders_m_money_long_all: "0"
traders_m_money_short_all: "0"
traders_m_money_spread_all: "0"
filename: "data/landing/cot/2024/abc123.csv.gzip"
expected:
rows: []

12
uv.lock generated
View File

@@ -9,6 +9,7 @@ resolution-markers = [
[manifest]
members = [
"beanflows",
"cftc-cot",
"materia",
"psdonline",
"sqlmesh-materia",
@@ -344,6 +345,17 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/db/3c/33bac158f8ab7f89b2e59426d5fe2e4f63f7ed25df84c036890172b412b5/cfgv-3.5.0-py2.py3-none-any.whl", hash = "sha256:a8dc6b26ad22ff227d2634a65cb388215ce6cc96bbcc5cfde7641ae87e8dacc0", size = 7445, upload-time = "2025-11-19T20:55:50.744Z" },
]
[[package]]
name = "cftc-cot"
version = "0.1.0"
source = { editable = "extract/cftc_cot" }
dependencies = [
{ name = "niquests" },
]
[package.metadata]
requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }]
[[package]]
name = "charset-normalizer"
version = "3.4.4"

View File

@@ -12,6 +12,9 @@ import duckdb
# Coffee (Green) commodity code in USDA PSD
COFFEE_COMMODITY_CODE = 711100
# Coffee C futures commodity code in CFTC COT reports
COFFEE_CFTC_CODE = "083731"
# Metrics safe for user-facing queries (prevents SQL injection in dynamic column refs)
ALLOWED_METRICS = frozenset({
"production",
@@ -203,6 +206,116 @@ async def get_production_yoy_by_country(
)
# =============================================================================
# COT Positioning Queries
# =============================================================================
# Columns safe for user-facing COT queries
ALLOWED_COT_METRICS = frozenset({
"open_interest",
"managed_money_long",
"managed_money_short",
"managed_money_net",
"managed_money_spread",
"managed_money_net_pct_of_oi",
"managed_money_long_short_ratio",
"managed_money_net_wow",
"prod_merc_long",
"prod_merc_short",
"prod_merc_net",
"swap_long",
"swap_short",
"swap_net",
"other_reportable_net",
"nonreportable_net",
"change_open_interest",
"change_managed_money_net",
"cot_index_26w",
"cot_index_52w",
"concentration_top4_long_pct",
"concentration_top8_long_pct",
"traders_total",
"traders_managed_money_long",
"traders_managed_money_short",
})
def _validate_cot_metrics(metrics: list[str]) -> list[str]:
    """Return the entries of *metrics* that are allow-listed COT columns.

    Unknown names are dropped silently; input order and duplicates are kept.

    Raises:
        AssertionError: if none of the requested names is in
            ALLOWED_COT_METRICS.
    """
    accepted = list(filter(ALLOWED_COT_METRICS.__contains__, metrics))
    assert accepted, f"No valid COT metrics in {metrics}. Allowed: {sorted(ALLOWED_COT_METRICS)}"
    return accepted
async def get_cot_positioning_time_series(
    cftc_commodity_code: str,
    metrics: list[str],
    start_date: str | None = None,
    end_date: str | None = None,
    limit: int = 520,
) -> list[dict]:
    """Weekly COT positioning time series from serving.cot_positioning.

    Args:
        cftc_commodity_code: commodity to filter on (bound as a parameter).
        metrics: requested column names; names outside ALLOWED_COT_METRICS
            are dropped before the SELECT list is built.
        start_date: optional inclusive lower bound on report_date.
        end_date: optional inclusive upper bound on report_date.
        limit: maximum number of rows, 1..2000. The default of 520 is
            roughly 10 years of weekly data.

    Returns:
        Rows ordered by report_date ascending, each holding report_date plus
        the accepted metrics.

    Raises:
        AssertionError: if limit is out of range or no metric is allow-listed.
    """
    assert 1 <= limit <= 2000, "limit must be between 1 and 2000"
    # Only allow-listed identifiers are interpolated; every value is bound.
    select_list = ", ".join(_validate_cot_metrics(metrics))

    # Optional date bounds, kept in (predicate, value) pairs so the SQL
    # fragments and their bind values cannot drift out of order.
    date_bounds = (
        ("report_date >= ?", start_date),
        ("report_date <= ?", end_date),
    )
    active_bounds = [(pred, value) for pred, value in date_bounds if value is not None]
    predicates = " AND ".join(
        ["cftc_commodity_code = ?", *(pred for pred, _ in active_bounds)]
    )
    bind_values = [cftc_commodity_code, *(value for _, value in active_bounds), limit]

    # NOTE(review): ascending order combined with LIMIT keeps the *earliest*
    # matching rows when the cap is hit — confirm this is the intended window.
    sql = f"""
        SELECT report_date, {select_list}
        FROM serving.cot_positioning
        WHERE {predicates}
        ORDER BY report_date ASC
        LIMIT ?
        """
    return await fetch_analytics(sql, bind_values)
async def get_cot_positioning_latest(cftc_commodity_code: str) -> dict | None:
    """Return the most recent serving.cot_positioning row for a commodity.

    All columns of the row are returned. The result is None when the query
    yields no rows for *cftc_commodity_code*.
    """
    latest_sql = """
        SELECT *
        FROM serving.cot_positioning
        WHERE cftc_commodity_code = ?
        ORDER BY report_date DESC
        LIMIT 1
        """
    snapshot = await fetch_analytics(latest_sql, [cftc_commodity_code])
    if not snapshot:
        return None
    return snapshot[0]
async def get_cot_index_trend(
    cftc_commodity_code: str,
    weeks: int = 104,
) -> list[dict]:
    """COT Index series (26w and 52w) for the most recent *weeks* rows.

    Each row carries report_date, cot_index_26w, cot_index_52w,
    managed_money_net and managed_money_net_pct_of_oi.

    Rows come back newest first (report_date descending); callers that plot
    chronologically must reverse them.

    Raises:
        AssertionError: if weeks is outside 1..1040.
    """
    assert 1 <= weeks <= 1040, "weeks must be between 1 and 1040"
    trend_sql = """
        SELECT report_date, cot_index_26w, cot_index_52w,
        managed_money_net, managed_money_net_pct_of_oi
        FROM serving.cot_positioning
        WHERE cftc_commodity_code = ?
        ORDER BY report_date DESC
        LIMIT ?
        """
    bind_values = [cftc_commodity_code, weeks]
    return await fetch_analytics(trend_sql, bind_values)
async def get_country_comparison(
commodity_code: int,
country_codes: list[str],

View File

@@ -162,6 +162,42 @@ async def commodity_countries(code: int):
return jsonify({"commodity_code": code, "metric": metric, "data": data})
@bp.route("/commodities/<code>/positioning")
@api_key_required(scopes=["read"])
async def commodity_positioning(code: str):
    """COT trader positioning time series for a commodity.

    Query params:
        metrics — repeated param, e.g. ?metrics=managed_money_net&metrics=cot_index_26w
        start_date — ISO date filter (YYYY-MM-DD)
        end_date — ISO date filter (YYYY-MM-DD)
        limit — max rows returned (default 260, max 2000)

    Responds with 400 and a JSON error when no requested metric is
    allow-listed, when limit is not a positive integer, or when a date
    filter is not a valid ISO date. Values of limit above 2000 are clamped
    rather than rejected.
    """
    # Local import keeps this handler self-contained.
    from datetime import date

    raw_metrics = request.args.getlist("metrics") or [
        "managed_money_net", "prod_merc_net", "open_interest", "cot_index_26w"
    ]
    metrics = [m for m in raw_metrics if m in analytics.ALLOWED_COT_METRICS]
    if not metrics:
        return jsonify({"error": f"No valid metrics. Allowed: {sorted(analytics.ALLOWED_COT_METRICS)}"}), 400

    # A malformed or non-positive limit is a client error; without this check
    # int() or the range assert in the analytics layer would raise and the
    # request would end as a 500.
    try:
        limit = int(request.args.get("limit", 260))
    except ValueError:
        return jsonify({"error": "limit must be an integer"}), 400
    if limit < 1:
        return jsonify({"error": "limit must be at least 1"}), 400
    limit = min(limit, 2000)

    # Reject malformed dates here instead of handing arbitrary strings to the
    # database; accepted values are passed on in canonical YYYY-MM-DD form.
    date_bounds: dict[str, str | None] = {}
    for param in ("start_date", "end_date"):
        raw_value = request.args.get(param)
        if raw_value is None:
            date_bounds[param] = None
            continue
        try:
            date_bounds[param] = date.fromisoformat(raw_value).isoformat()
        except ValueError:
            return jsonify({"error": f"{param} must be an ISO date (YYYY-MM-DD)"}), 400

    data = await analytics.get_cot_positioning_time_series(
        code, metrics, date_bounds["start_date"], date_bounds["end_date"], limit
    )
    return jsonify({"cftc_commodity_code": code, "metrics": metrics, "data": data})
@bp.route("/commodities/<code>/positioning/latest")
@api_key_required(scopes=["read"])
async def commodity_positioning_latest(code: str):
    """Latest week's full COT positioning snapshot for a commodity.

    Responds with 404 and a JSON error when the analytics layer returns no
    row for *code*.
    """
    snapshot = await analytics.get_cot_positioning_latest(code)
    if snapshot:
        return jsonify({"cftc_commodity_code": code, "data": snapshot})
    return jsonify({"error": "No positioning data found for this commodity"}), 404
@bp.route("/commodities/<int:code>/metrics.csv")
@api_key_required(scopes=["read"])
async def commodity_metrics_csv(code: int):

View File

@@ -11,7 +11,6 @@ from quart import Blueprint, flash, g, jsonify, redirect, render_template, reque
from .. import analytics
from ..auth.routes import login_required, update_user
from ..core import csrf_protect, execute, fetch_all, fetch_one, soft_delete
# Blueprint with its own template folder
@@ -99,9 +98,9 @@ async def index():
stats = await get_user_stats(g.user["id"])
plan = (g.get("subscription") or {}).get("plan", "free")
# Fetch all analytics data in parallel (empty lists if DB not available)
# Fetch all analytics data in parallel (empty lists/None if DB not available)
if analytics._conn is not None:
time_series, top_producers, stu_trend, balance, yoy = await asyncio.gather(
time_series, top_producers, stu_trend, balance, yoy, cot_latest, cot_trend = await asyncio.gather(
analytics.get_global_time_series(
analytics.COFFEE_COMMODITY_CODE,
["production", "exports", "imports", "ending_stocks", "total_distribution"],
@@ -110,9 +109,12 @@ async def index():
analytics.get_stock_to_use_trend(analytics.COFFEE_COMMODITY_CODE),
analytics.get_supply_demand_balance(analytics.COFFEE_COMMODITY_CODE),
analytics.get_production_yoy_by_country(analytics.COFFEE_COMMODITY_CODE, limit=15),
analytics.get_cot_positioning_latest(analytics.COFFEE_CFTC_CODE),
analytics.get_cot_index_trend(analytics.COFFEE_CFTC_CODE, weeks=104),
)
else:
time_series, top_producers, stu_trend, balance, yoy = [], [], [], [], []
cot_latest, cot_trend = None, []
# Latest global snapshot for key metric cards
latest = time_series[-1] if time_series else {}
@@ -136,6 +138,8 @@ async def index():
stu_trend=stu_trend,
balance=balance,
yoy=yoy,
cot_latest=cot_latest,
cot_trend=cot_trend,
)

View File

@@ -115,6 +115,39 @@
<div class="plan-gate mb-8">CSV export available on Trader and Analyst plans. <a href="{{ url_for('billing.pricing') }}">Upgrade</a></div>
{% endif %}
<!-- Speculative Positioning (CFTC COT) -->
{# Every card checks `is number` before formatting: str.format() and the `> 0`
   comparison raise on a NULL or missing value, which would turn one sparse
   latest row into a failed dashboard render. Missing values show as an em dash. #}
{% if cot_latest %}
<div class="chart-container mb-8">
    <h2 class="text-xl mb-1">Speculative Positioning — Coffee C Futures</h2>
    <p class="text-muted mb-4">CFTC Commitment of Traders · Managed Money net position (hedge funds &amp; CTAs) · as of {{ cot_latest.report_date }}</p>
    <div class="grid-4 mb-4">
        <div class="metric-card">
            <div class="metric-label">Managed Money Net</div>
            {% if cot_latest.managed_money_net is number %}
            <div class="metric-value {% if cot_latest.managed_money_net > 0 %}text-green{% else %}text-red{% endif %}">
                {{ "{:+,d}".format(cot_latest.managed_money_net | int) }}
            </div>
            {% else %}
            <div class="metric-value">—</div>
            {% endif %}
            <div class="metric-sub">contracts (long − short)</div>
        </div>
        <div class="metric-card">
            <div class="metric-label">COT Index (26w)</div>
            <div class="metric-value">{% if cot_latest.cot_index_26w is number %}{{ "{:.0f}".format(cot_latest.cot_index_26w) }}{% else %}—{% endif %}</div>
            <div class="metric-sub">0 = most bearish · 100 = most bullish</div>
        </div>
        <div class="metric-card">
            <div class="metric-label">Net % of Open Interest</div>
            <div class="metric-value">{% if cot_latest.managed_money_net_pct_of_oi is number %}{{ "{:+.1f}".format(cot_latest.managed_money_net_pct_of_oi) }}%{% else %}—{% endif %}</div>
            <div class="metric-sub">managed money positioning</div>
        </div>
        <div class="metric-card">
            <div class="metric-label">Open Interest</div>
            <div class="metric-value">{% if cot_latest.open_interest is number %}{{ "{:,d}".format(cot_latest.open_interest | int) }}{% else %}—{% endif %}</div>
            <div class="metric-sub">total contracts outstanding</div>
        </div>
    </div>
    <canvas id="cotPositioningChart"></canvas>
</div>
{% endif %}
<!-- Quick Actions -->
<div class="grid-3">
<a href="{{ url_for('dashboard.countries') }}" class="btn-outline text-center">Country Comparison</a>
@@ -202,6 +235,57 @@ if (stuData.length > 0) {
});
}
// -- COT Positioning Chart --
const cotRaw = {{ cot_trend | tojson }};
if (cotRaw && cotRaw.length > 0) {
const cotData = [...cotRaw].reverse(); // query returns DESC, chart needs ASC
new Chart(document.getElementById('cotPositioningChart'), {
type: 'line',
data: {
labels: cotData.map(r => r.report_date),
datasets: [
{
label: 'Managed Money Net (contracts)',
data: cotData.map(r => r.managed_money_net),
borderColor: CHART_PALETTE[0],
backgroundColor: CHART_PALETTE[0] + '22',
fill: true,
tension: 0.3,
yAxisID: 'y'
},
{
label: 'COT Index 26w (0100)',
data: cotData.map(r => r.cot_index_26w),
borderColor: CHART_PALETTE[2],
borderDash: [5, 4],
tension: 0.3,
pointRadius: 0,
yAxisID: 'y1'
}
]
},
options: {
responsive: true,
interaction: {mode: 'index', intersect: false},
plugins: {legend: {position: 'bottom'}},
scales: {
x: {ticks: {maxTicksLimit: 12}},
y: {
title: {display: true, text: 'Net Contracts'},
position: 'left'
},
y1: {
title: {display: true, text: 'COT Index'},
position: 'right',
min: 0,
max: 100,
grid: {drawOnChartArea: false}
}
}
}
});
}
// -- Top Producers Horizontal Bar --
const topData = {{ top_producers | tojson }};
if (topData.length > 0) {