feat(cot): add combined (futures+options) COT extractor and transform models

- extract/cftc_cot: refactor extract_cot_year() to accept url_template and
  landing_subdir params; add _extract_cot() shared loop; add extract_cot_combined()
  entry point using com_disagg_txt_{year}.zip → landing/cot_combined/
- pyproject.toml: add extract_cot_combined script entry point
- macros/__init__.py: add @cot_combined_glob() for cot_combined/**/*.csv.gzip
- fct_cot_positioning.sql: union cot_glob and cot_combined_glob in src CTE;
  add report_type column (FutOnly_or_Combined) to cast_and_clean + deduplicated;
  include FutOnly_or_Combined in hkey to avoid key collisions; add report_type to grain
- obt_cot_positioning.sql: add report_type = 'FutOnly' filter to preserve
  existing serving behavior
- obt_cot_positioning_combined.sql: new serving model filtered to report_type =
  'Combined'; identical analytics (COT index, net %, windows) on combined data
- pipelines.py: register extract_cot_combined; add to extract_all meta-pipeline

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-26 11:24:56 +01:00
parent 8628496881
commit b884bc2b4a
7 changed files with 205 additions and 16 deletions

View File

@@ -10,6 +10,7 @@ dependencies = [
[project.scripts]
extract_cot = "cftc_cot.execute:extract_cot_dataset"
extract_cot_combined = "cftc_cot.execute:extract_cot_combined"
[build-system]
requires = ["hatchling"]

View File

@@ -1,11 +1,13 @@
"""CFTC COT Disaggregated Futures data extraction.
"""CFTC COT Disaggregated data extraction.
Downloads yearly ZIP files from CFTC and stores as gzip CSV in the landing
directory. CFTC publishes one file per year that updates every Friday at
3:30 PM ET. On first run this backfills all years from 2006. On subsequent
runs it skips files whose etag matches what is already on disk.
Landing path: LANDING_DIR/cot/{year}/{etag}.csv.gzip
Two report variants are supported:
- Futures-only: Landing path: LANDING_DIR/cot/{year}/{etag}.csv.gzip
- Combined (fut+options): Landing path: LANDING_DIR/cot_combined/{year}/{etag}.csv.gzip
"""
import logging
@@ -37,9 +39,10 @@ logger = logging.getLogger("CFTC COT Extractor")
LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
# CFTC publishes yearly ZIPs for the disaggregated futures-only report.
# CFTC publishes yearly ZIPs for both variants of the disaggregated report.
# The file for the current year is updated each Friday at 3:30 PM ET.
COT_URL_TEMPLATE = "https://www.cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip"
COT_URL_FUTURES_ONLY = "https://www.cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip"
COT_URL_COMBINED = "https://www.cftc.gov/files/dea/history/com_disagg_txt_{year}.zip"
FIRST_YEAR = 2006 # Disaggregated report starts June 2006
HTTP_TIMEOUT_SECONDS = 120 # COT ZIPs are up to ~30 MB
@@ -60,12 +63,12 @@ def _synthetic_etag(year: int, headers: dict) -> str:
return etag
def extract_cot_year(year: int, http_session: niquests.Session) -> int:
def extract_cot_year(year: int, http_session: niquests.Session, url_template: str, landing_subdir: str) -> int:
"""Download and store COT data for a single year.
Returns bytes_written (0 if skipped or unavailable).
"""
url = COT_URL_TEMPLATE.format(year=year)
url = url_template.format(year=year)
logger.info(f"Checking COT data for {year}: {url}")
head = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS)
@@ -79,7 +82,7 @@ def extract_cot_year(year: int, http_session: niquests.Session) -> int:
raw_etag = head.headers.get("etag", "")
etag = normalize_etag(raw_etag) if raw_etag else _synthetic_etag(year, head.headers)
dest_dir = landing_path(LANDING_DIR, "cot", str(year))
dest_dir = landing_path(LANDING_DIR, landing_subdir, str(year))
local_file = dest_dir / f"{etag}.csv.gzip"
if local_file.exists():
@@ -104,8 +107,8 @@ def extract_cot_year(year: int, http_session: niquests.Session) -> int:
return bytes_written
def extract_cot_dataset():
"""Extract all available CFTC COT disaggregated futures data.
def _extract_cot(url_template: str, landing_subdir: str, extractor_name: str) -> None:
"""Shared extraction loop for any COT report variant.
Downloads current year first (always re-checks for weekly Friday updates),
then backfills historical years. Bounded to MAX_YEARS. Continues on
@@ -119,7 +122,7 @@ def extract_cot_dataset():
)
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, "cftc_cot")
run_id = start_run(conn, extractor_name)
files_written = 0
files_skipped = 0
bytes_written_total = 0
@@ -127,7 +130,7 @@ def extract_cot_dataset():
with niquests.Session() as session:
for year in years:
try:
result = extract_cot_year(year, session)
result = extract_cot_year(year, session, url_template, landing_subdir)
if result > 0:
files_written += 1
bytes_written_total += result
@@ -136,7 +139,7 @@ def extract_cot_dataset():
except Exception:
logger.exception(f"Failed to extract COT data for {year}, continuing")
logger.info(f"COT extraction complete: {files_written} new file(s) downloaded")
logger.info(f"COT extraction complete ({extractor_name}): {files_written} new file(s) downloaded")
end_run(
conn, run_id, status="success",
files_written=files_written, files_skipped=files_skipped,
@@ -150,5 +153,15 @@ def extract_cot_dataset():
conn.close()
def extract_cot_dataset():
"""Extract CFTC COT disaggregated futures-only report."""
_extract_cot(COT_URL_FUTURES_ONLY, "cot", "cftc_cot")
def extract_cot_combined():
"""Extract CFTC COT disaggregated combined (futures+options) report."""
_extract_cot(COT_URL_COMBINED, "cot_combined", "cftc_cot_combined")
if __name__ == "__main__":
extract_cot_dataset()