merge: CFTC COT combined (futures+options) report — extractor, transform, web toggle
This commit is contained in:
@@ -10,6 +10,7 @@ dependencies = [
|
||||
|
||||
[project.scripts]
|
||||
extract_cot = "cftc_cot.execute:extract_cot_dataset"
|
||||
extract_cot_combined = "cftc_cot.execute:extract_cot_combined"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
"""CFTC COT Disaggregated Futures data extraction.
|
||||
"""CFTC COT Disaggregated data extraction.
|
||||
|
||||
Downloads yearly ZIP files from CFTC and stores as gzip CSV in the landing
|
||||
directory. CFTC publishes one file per year that updates every Friday at
|
||||
3:30 PM ET. On first run this backfills all years from 2006. On subsequent
|
||||
runs it skips files whose etag matches what is already on disk.
|
||||
|
||||
Landing path: LANDING_DIR/cot/{year}/{etag}.csv.gzip
|
||||
Two report variants are supported:
|
||||
- Futures-only: Landing path: LANDING_DIR/cot/{year}/{etag}.csv.gzip
|
||||
- Combined (fut+options): Landing path: LANDING_DIR/cot_combined/{year}/{etag}.csv.gzip
|
||||
"""
|
||||
|
||||
import logging
|
||||
@@ -37,9 +39,10 @@ logger = logging.getLogger("CFTC COT Extractor")
|
||||
|
||||
LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
|
||||
|
||||
# CFTC publishes yearly ZIPs for the disaggregated futures-only report.
|
||||
# CFTC publishes yearly ZIPs for both variants of the disaggregated report.
|
||||
# The file for the current year is updated each Friday at 3:30 PM ET.
|
||||
COT_URL_TEMPLATE = "https://www.cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip"
|
||||
COT_URL_FUTURES_ONLY = "https://www.cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip"
|
||||
COT_URL_COMBINED = "https://www.cftc.gov/files/dea/history/com_disagg_txt_{year}.zip"
|
||||
|
||||
FIRST_YEAR = 2006 # Disaggregated report starts June 2006
|
||||
HTTP_TIMEOUT_SECONDS = 120 # COT ZIPs are up to ~30 MB
|
||||
@@ -60,12 +63,12 @@ def _synthetic_etag(year: int, headers: dict) -> str:
|
||||
return etag
|
||||
|
||||
|
||||
def extract_cot_year(year: int, http_session: niquests.Session) -> int:
|
||||
def extract_cot_year(year: int, http_session: niquests.Session, url_template: str, landing_subdir: str) -> int:
|
||||
"""Download and store COT data for a single year.
|
||||
|
||||
Returns bytes_written (0 if skipped or unavailable).
|
||||
"""
|
||||
url = COT_URL_TEMPLATE.format(year=year)
|
||||
url = url_template.format(year=year)
|
||||
logger.info(f"Checking COT data for {year}: {url}")
|
||||
|
||||
head = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS)
|
||||
@@ -79,7 +82,7 @@ def extract_cot_year(year: int, http_session: niquests.Session) -> int:
|
||||
raw_etag = head.headers.get("etag", "")
|
||||
etag = normalize_etag(raw_etag) if raw_etag else _synthetic_etag(year, head.headers)
|
||||
|
||||
dest_dir = landing_path(LANDING_DIR, "cot", str(year))
|
||||
dest_dir = landing_path(LANDING_DIR, landing_subdir, str(year))
|
||||
local_file = dest_dir / f"{etag}.csv.gzip"
|
||||
|
||||
if local_file.exists():
|
||||
@@ -104,8 +107,8 @@ def extract_cot_year(year: int, http_session: niquests.Session) -> int:
|
||||
return bytes_written
|
||||
|
||||
|
||||
def extract_cot_dataset():
|
||||
"""Extract all available CFTC COT disaggregated futures data.
|
||||
def _extract_cot(url_template: str, landing_subdir: str, extractor_name: str) -> None:
|
||||
"""Shared extraction loop for any COT report variant.
|
||||
|
||||
Downloads current year first (always re-checks for weekly Friday updates),
|
||||
then backfills historical years. Bounded to MAX_YEARS. Continues on
|
||||
@@ -119,7 +122,7 @@ def extract_cot_dataset():
|
||||
)
|
||||
|
||||
conn = open_state_db(LANDING_DIR)
|
||||
run_id = start_run(conn, "cftc_cot")
|
||||
run_id = start_run(conn, extractor_name)
|
||||
files_written = 0
|
||||
files_skipped = 0
|
||||
bytes_written_total = 0
|
||||
@@ -127,7 +130,7 @@ def extract_cot_dataset():
|
||||
with niquests.Session() as session:
|
||||
for year in years:
|
||||
try:
|
||||
result = extract_cot_year(year, session)
|
||||
result = extract_cot_year(year, session, url_template, landing_subdir)
|
||||
if result > 0:
|
||||
files_written += 1
|
||||
bytes_written_total += result
|
||||
@@ -136,7 +139,7 @@ def extract_cot_dataset():
|
||||
except Exception:
|
||||
logger.exception(f"Failed to extract COT data for {year}, continuing")
|
||||
|
||||
logger.info(f"COT extraction complete: {files_written} new file(s) downloaded")
|
||||
logger.info(f"COT extraction complete ({extractor_name}): {files_written} new file(s) downloaded")
|
||||
end_run(
|
||||
conn, run_id, status="success",
|
||||
files_written=files_written, files_skipped=files_skipped,
|
||||
@@ -150,5 +153,15 @@ def extract_cot_dataset():
|
||||
conn.close()
|
||||
|
||||
|
||||
def extract_cot_dataset():
|
||||
"""Extract CFTC COT disaggregated futures-only report."""
|
||||
_extract_cot(COT_URL_FUTURES_ONLY, "cot", "cftc_cot")
|
||||
|
||||
|
||||
def extract_cot_combined():
|
||||
"""Extract CFTC COT disaggregated combined (futures+options) report."""
|
||||
_extract_cot(COT_URL_COMBINED, "cot_combined", "cftc_cot_combined")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
extract_cot_dataset()
|
||||
|
||||
Reference in New Issue
Block a user