Files
beanflows/tests/test_cot_extraction.py
Deeman d58fa67238
Some checks failed
CI / test-cli (push) Successful in 11s
CI / test-sqlmesh (push) Successful in 13s
CI / test-web (push) Failing after 11s
CI / tag (push) Has been skipped
fix(tests): update test assertions to match refactored function signatures
- Pass url_template and landing_subdir to extract_cot_year (signature changed to support both COT variants)
- Update secrets test assertion from 'ESC connection successful' to 'SOPS decryption successful'

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 01:56:13 +01:00

148 lines
4.7 KiB
Python

"""Tests for CFTC COT extraction package."""
import gzip
import zipfile
from io import BytesIO
from unittest.mock import MagicMock
from cftc_cot.normalize import find_csv_inner_filename, normalize_zipped_csv
# =============================================================================
# normalize.py
# =============================================================================
def _make_zip(inner_name: str, content: bytes) -> BytesIO:
    """Build an in-memory ZIP archive holding a single member.

    The member is stored under ``inner_name`` with ``content`` as its payload.
    The returned buffer is rewound to offset 0 so callers can read it at once.
    """
    archive_buf = BytesIO()
    with zipfile.ZipFile(archive_buf, mode="w") as archive:
        archive.writestr(inner_name, content)
    archive_buf.seek(0)
    return archive_buf
def test_find_csv_inner_filename_returns_txt_file():
    """A ZIP whose only member is a ``.txt`` file yields that member's name."""
    archive = _make_zip("f_year.txt", b"col1,col2\nv1,v2\n")
    detected_name = find_csv_inner_filename(archive)
    assert detected_name == "f_year.txt"
def test_find_csv_inner_filename_case_insensitive():
    """An upper-case ``.TXT`` member is found and returned with its name unchanged."""
    archive = _make_zip("FUT_DISAGG_2015.TXT", b"data")
    detected_name = find_csv_inner_filename(archive)
    assert detected_name == "FUT_DISAGG_2015.TXT"
def test_find_csv_inner_filename_asserts_on_zero_txt_files():
    """An archive with no ``.txt`` member makes the lookup raise AssertionError.

    The raised error's message must contain "Expected exactly 1".
    """
    buf = _make_zip("readme.md", b"not a txt file")
    try:
        find_csv_inner_filename(buf)
    except AssertionError as e:
        assert "Expected exactly 1" in str(e)
    else:
        # Kept outside the ``try`` body on purpose: a failure raised here is
        # not caught by the handler above, so a missing exception is reported
        # with this message instead of a confusing message-mismatch failure.
        raise AssertionError("Should have raised AssertionError")
def test_find_csv_inner_filename_asserts_on_multiple_txt_files():
    """An archive with two ``.txt`` members makes the lookup raise AssertionError."""
    buf = BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("a.txt", b"data a")
        zf.writestr("b.txt", b"data b")
    buf.seek(0)
    try:
        find_csv_inner_filename(buf)
    except AssertionError:
        pass  # expected: more than one candidate member
    else:
        # Must live in ``else``: placed inside the ``try`` body, this failure
        # would itself be swallowed by the ``except AssertionError`` handler
        # and the test could never fail.
        raise AssertionError("Should have raised AssertionError")
def test_normalize_zipped_csv_produces_valid_gzip():
    """The normalized output is gzip data that decompresses to the original CSV bytes."""
    original_bytes = b"Market_and_Exchange_Names,CFTC_Commodity_Code\nCOFFEE C,083731\n"
    zipped = _make_zip("f_year.txt", original_bytes)

    normalized = normalize_zipped_csv(zipped, "f_year.txt")

    # Round-trip: gunzip the result and compare against what went into the ZIP.
    with gzip.open(normalized, "rb") as gz_stream:
        round_tripped = gz_stream.read()
    assert round_tripped == original_bytes
def test_normalize_zipped_csv_resets_seek_position():
    """The buffer returned by normalize_zipped_csv is positioned at offset 0."""
    zipped = _make_zip("f_year.txt", b"data")
    normalized = normalize_zipped_csv(zipped, "f_year.txt")
    position = normalized.tell()
    assert position == 0, "Result BytesIO should be seeked to position 0"
def test_normalize_zipped_csv_asserts_on_wrong_inner_name():
    """Asking for a member name that is not in the archive raises AssertionError.

    The raised error's message must mention the requested name ("expected.txt").
    """
    buf = _make_zip("actual.txt", b"data")
    try:
        normalize_zipped_csv(buf, "expected.txt")
    except AssertionError as e:
        assert "expected.txt" in str(e)
    else:
        # Kept outside the ``try`` body on purpose: a failure raised here is
        # not caught by the handler above, so a missing exception is reported
        # with this message instead of a confusing message-mismatch failure.
        raise AssertionError("Should have raised AssertionError")
# =============================================================================
# execute.py — pipeline registry integration
# =============================================================================
def test_extract_cot_pipeline_registered():
    """The materia pipeline registry has an ``extract_cot`` entry with the expected settings."""
    from materia.pipelines import PIPELINES

    expected_command = ["uv", "run", "--package", "cftc_cot", "extract_cot"]
    expected_timeout = 1800

    assert "extract_cot" in PIPELINES
    registered = PIPELINES["extract_cot"]
    assert registered["command"] == expected_command
    assert registered["timeout_seconds"] == expected_timeout
def test_extract_cot_year_skips_existing_file(tmp_path, monkeypatch):
    """extract_cot_year skips the download when the ETag-named file already exists.

    A file named after the ETag reported by the mocked HEAD response is
    pre-created under ``<tmp_path>/cot/2024``. The call must then return a
    value equal to 0 and must not issue a GET on the session.
    """
    monkeypatch.setenv("LANDING_DIR", str(tmp_path))

    # Pre-create the etag file to simulate existing data
    dest = tmp_path / "cot" / "2024"
    dest.mkdir(parents=True)
    etag = "abc123"
    (dest / f"{etag}.csv.gzip").write_bytes(b"existing")

    from cftc_cot import execute as cot_execute

    # Point the module-level LANDING_DIR at the temp dir. Going through
    # monkeypatch (instead of plain assignment) restores the previous value
    # after the test, so the override cannot leak into other tests.
    monkeypatch.setattr(cot_execute, "LANDING_DIR", tmp_path, raising=False)

    mock_head = MagicMock()
    mock_head.status_code = 200
    mock_head.headers = {"etag": f'"{etag}"'}
    mock_session = MagicMock()
    mock_session.head.return_value = mock_head

    result = cot_execute.extract_cot_year(
        2024, mock_session, cot_execute.COT_URL_FUTURES_ONLY, "cot"
    )

    assert result == 0
    mock_session.get.assert_not_called()  # No download should occur
def test_extract_cot_year_returns_false_on_404(tmp_path, monkeypatch):
    """extract_cot_year returns a value equal to 0 when the HEAD request reports 404.

    No GET may be issued on the session in that case.
    """
    monkeypatch.setenv("LANDING_DIR", str(tmp_path))

    from cftc_cot import execute as cot_execute

    # Point the module-level LANDING_DIR at the temp dir. Going through
    # monkeypatch (instead of plain assignment) restores the previous value
    # after the test, so the override cannot leak into other tests.
    monkeypatch.setattr(cot_execute, "LANDING_DIR", tmp_path, raising=False)

    mock_head = MagicMock()
    mock_head.status_code = 404
    mock_session = MagicMock()
    mock_session.head.return_value = mock_head

    result = cot_execute.extract_cot_year(
        2006, mock_session, cot_execute.COT_URL_FUTURES_ONLY, "cot"
    )

    assert result == 0
    mock_session.get.assert_not_called()