"""Tests for CFTC COT extraction package."""

import gzip
import zipfile
from io import BytesIO
from unittest.mock import MagicMock

from cftc_cot.normalize import find_csv_inner_filename, normalize_zipped_csv


# =============================================================================
# normalize.py
# =============================================================================


def _make_zip(inner_name: str, content: bytes) -> BytesIO:
    """Helper: create a ZIP buffer containing a single named file.

    The returned buffer is rewound to position 0 so it can be read directly.
    """
    buf = BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr(inner_name, content)
    buf.seek(0)
    return buf


def test_find_csv_inner_filename_returns_txt_file():
    """A single .txt member is reported by name."""
    buf = _make_zip("f_year.txt", b"col1,col2\nv1,v2\n")
    assert find_csv_inner_filename(buf) == "f_year.txt"


def test_find_csv_inner_filename_case_insensitive():
    """An upper-case .TXT member is found and its name returned unchanged."""
    buf = _make_zip("FUT_DISAGG_2015.TXT", b"data")
    assert find_csv_inner_filename(buf) == "FUT_DISAGG_2015.TXT"


def test_find_csv_inner_filename_asserts_on_zero_txt_files():
    """An archive without any .txt member triggers an AssertionError."""
    buf = _make_zip("readme.md", b"not a txt file")
    try:
        find_csv_inner_filename(buf)
    except AssertionError as e:
        assert "Expected exactly 1" in str(e)
    else:
        # Raised outside the ``try`` body so the handler above cannot
        # swallow it and mask a missing exception.
        raise AssertionError("Should have raised AssertionError")


def test_find_csv_inner_filename_asserts_on_multiple_txt_files():
    """An archive with more than one .txt member triggers an AssertionError."""
    buf = BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("a.txt", b"data a")
        zf.writestr("b.txt", b"data b")
    buf.seek(0)
    try:
        find_csv_inner_filename(buf)
    except AssertionError:
        pass
    else:
        # Previously the sentinel ``assert False`` lived inside the ``try``
        # and was caught by ``except AssertionError: pass``, so this test
        # could never fail. The ``else`` clause is not guarded by the handler.
        raise AssertionError("Should have raised AssertionError")


def test_normalize_zipped_csv_produces_valid_gzip():
    """The normalized output is gzip data that decompresses to the inner CSV."""
    csv_content = b"Market_and_Exchange_Names,CFTC_Commodity_Code\nCOFFEE C,083731\n"
    buf = _make_zip("f_year.txt", csv_content)
    result = normalize_zipped_csv(buf, "f_year.txt")
    # Decompress and verify content round-trips
    with gzip.open(result, "rb") as gz:
        decompressed = gz.read()
    assert decompressed == csv_content


def test_normalize_zipped_csv_resets_seek_position():
    """The returned buffer is positioned at offset 0, ready for reading."""
    buf = _make_zip("f_year.txt", b"data")
    result = normalize_zipped_csv(buf, "f_year.txt")
    assert result.tell() == 0, "Result BytesIO should be seeked to position 0"


def test_normalize_zipped_csv_asserts_on_wrong_inner_name():
    """Requesting a member name that is not in the archive raises AssertionError."""
    buf = _make_zip("actual.txt", b"data")
    try:
        normalize_zipped_csv(buf, "expected.txt")
    except AssertionError as e:
        assert "expected.txt" in str(e)
    else:
        # Kept outside the ``try`` so a missing exception is reported as such.
        raise AssertionError("Should have raised AssertionError")


# =============================================================================
# execute.py — pipeline registry integration
# =============================================================================


def test_extract_cot_pipeline_registered():
    """extract_cot must appear in the materia pipeline registry."""
    from materia.pipelines import PIPELINES

    assert "extract_cot" in PIPELINES
    entry = PIPELINES["extract_cot"]
    assert entry["command"] == ["uv", "run", "--package", "cftc_cot", "extract_cot"]
    assert entry["timeout_seconds"] == 1800


def test_extract_cot_year_skips_existing_file(tmp_path, monkeypatch):
    """extract_cot_year returns 0 and skips download when file already exists.

    Note: ``False == 0`` in Python, so the assertion also accepts ``False``.
    """
    monkeypatch.setenv("LANDING_DIR", str(tmp_path))

    # Pre-create the etag file to simulate existing data
    dest = tmp_path / "cot" / "2024"
    dest.mkdir(parents=True)
    etag = "abc123"
    (dest / f"{etag}.csv.gzip").write_bytes(b"existing")

    from cftc_cot import execute as cot_execute

    # Point the already-imported module at the temp dir. ``monkeypatch.setattr``
    # restores the previous value at teardown, so the override does not leak
    # into other tests (a plain assignment would). ``raising=False`` keeps the
    # old behaviour of setting the attribute even if it was not defined.
    monkeypatch.setattr(cot_execute, "LANDING_DIR", tmp_path, raising=False)

    mock_session = MagicMock()
    mock_head = MagicMock()
    mock_head.status_code = 200
    mock_head.headers = {"etag": f'"{etag}"'}
    mock_session.head.return_value = mock_head

    result = cot_execute.extract_cot_year(2024, mock_session)

    assert result == 0
    mock_session.get.assert_not_called()  # No download should occur


def test_extract_cot_year_returns_false_on_404(tmp_path, monkeypatch):
    """extract_cot_year returns 0 and does not download when HEAD yields 404.

    Note: ``False == 0`` in Python, so the assertion also accepts ``False``.
    """
    monkeypatch.setenv("LANDING_DIR", str(tmp_path))

    from cftc_cot import execute as cot_execute

    # Scoped override of the module global; undone automatically at teardown.
    monkeypatch.setattr(cot_execute, "LANDING_DIR", tmp_path, raising=False)

    mock_session = MagicMock()
    mock_head = MagicMock()
    mock_head.status_code = 404
    mock_session.head.return_value = mock_head

    result = cot_execute.extract_cot_year(2006, mock_session)

    assert result == 0
    mock_session.get.assert_not_called()