Files
beanflows/tests/test_ice_extraction.py
Deeman ff7301d6a8 ICE extraction overhaul: API discovery + aging report + historical backfill
- Replace brittle ICE_STOCKS_URL env var with API-based URL discovery via
  the private ICE Report Center JSON API (no auth required)
- Add rolling CSV → XLS fallback in extract_ice_stocks() using
  find_latest_report() from ice_api.py
- Add ice_api.py: fetch_report_listings(), find_latest_report() with
  pagination up to MAX_API_PAGES
- Add xls_parse.py: detect_file_format() (magic bytes), xls_to_rows()
  using xlrd for OLE2/BIFF XLS files
- Add extract_ice_aging(): monthly certified stock aging report by
  age bucket × port → ice_aging/ landing dir
- Add extract_ice_historical(): 30-year EOM by-port stocks from static
  ICE URL → ice_stocks_by_port/ landing dir
- Add xlrd>=2.0.1 (parse XLS), xlwt>=1.3.0 (dev, test fixtures)
- Add SQLMesh raw + foundation models for both new datasets
- Add ice_aging_glob(), ice_stocks_by_port_glob() macros
- Add extract_ice_aging + extract_ice_historical pipeline entries
- Add 12 unit tests (format detection, XLS roundtrip, API mock, CSV output)

Seed files (data/landing/ice_aging/seed/ and ice_stocks_by_port/seed/)
must be created locally — data/ is gitignored.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 21:13:18 +01:00

185 lines
7.2 KiB
Python

"""Tests for ICE extraction: format detection, XLS parsing, API client."""
import csv
import gzip
import io
import struct
from unittest.mock import MagicMock, patch
import pytest
import xlwt # noqa: F401 — needed to create XLS fixtures; skip tests if missing
from ice_stocks.ice_api import fetch_report_listings, find_latest_report
from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows
# ── helpers ──────────────────────────────────────────────────────────────────
def _make_xls_bytes(rows: list[list]) -> bytes:
    """Build a one-sheet XLS workbook in memory and return its raw bytes."""
    workbook = xlwt.Workbook()
    worksheet = workbook.add_sheet("Sheet1")
    for row_idx, row_values in enumerate(rows):
        for col_idx, cell_value in enumerate(row_values):
            worksheet.write(row_idx, col_idx, cell_value)
    stream = io.BytesIO()
    workbook.save(stream)
    return stream.getvalue()
def _api_response(rows: list[dict]) -> dict:
"""Build a mock ICE API response payload."""
return {"datasets": {"results": {"rows": rows}}}
def _make_api_row(label: str, url: str = "/download/test.xls", publish_date: str = "2026-02-01") -> dict:
return {
"publishDate": publish_date,
"productName": "Coffee C",
"download": {"url": url, "label": label},
}
# ── detect_file_format ───────────────────────────────────────────────────────
def test_detect_file_format_xls():
    """OLE2 magic bytes are classified as legacy XLS."""
    payload = OLE2_MAGIC + bytes(100)
    assert detect_file_format(payload) == "xls"
def test_detect_file_format_xlsx():
    """A ZIP local-file header (PK\\x03\\x04) is classified as XLSX."""
    payload = b"PK\x03\x04" + bytes(100)
    assert detect_file_format(payload) == "xlsx"
def test_detect_file_format_html():
    """An HTML document body is classified as html (e.g. an error page)."""
    assert detect_file_format(b"<html><body>foo</body></html>") == "html"
def test_detect_file_format_csv():
    """Plain comma-separated text falls through to the csv classification."""
    payload = b"report_date,total_certified_bags\n2026-01-01,100000\n"
    assert detect_file_format(payload) == "csv"
# ── xls_to_rows ──────────────────────────────────────────────────────────────
def test_xls_to_rows_roundtrip():
    """Rows written via xlwt come back unchanged through xls_to_rows()."""
    pytest.importorskip("xlwt")
    original = [
        ["header1", "header2", "header3"],
        ["value1", 42.0, "value3"],
        ["", 0.0, ""],
    ]
    blob = _make_xls_bytes(original)
    # Sanity check: xlwt really produced an OLE2 container.
    assert blob[:4] == OLE2_MAGIC
    parsed = xls_to_rows(blob)
    assert len(parsed) == 3
    assert parsed[0][0] == "header1"
    assert parsed[1][1] == 42.0
    # Empty cells round-trip as empty strings.
    assert parsed[2][0] == ""
def test_xls_to_rows_rejects_non_xls():
    """Feeding a ZIP-magic payload must trip the OLE2 guard assertion."""
    zip_like_payload = b"PK\x03\x04" + bytes(100)
    with pytest.raises(AssertionError, match="Not an OLE2"):
        xls_to_rows(zip_like_payload)
# ── fetch_report_listings ────────────────────────────────────────────────────
def test_fetch_report_listings_parses_response():
    """Listing rows are flattened: label and URL pulled out of the nested payload."""
    session = MagicMock()
    session.post.return_value.status_code = 200
    payload = _api_response([
        _make_api_row("Daily Warehouse Stocks", "/dl/stocks.xls"),
        _make_api_row("Certified Stock Aging Report", "/dl/aging.xls"),
    ])
    session.post.return_value.json.return_value = payload
    from ice_stocks.ice_api import ICE_BASE_URL, fetch_report_listings
    listings = fetch_report_listings(session, product_id=2)
    assert len(listings) == 2
    assert listings[0]["download_label"] == "Daily Warehouse Stocks"
    # Relative download paths get the ICE base URL prepended.
    assert listings[0]["download_url"] == ICE_BASE_URL + "/dl/stocks.xls"
    assert listings[1]["download_label"] == "Certified Stock Aging Report"
def test_fetch_report_listings_keeps_absolute_url():
    """An already-absolute (http/https) URL is passed through untouched.

    Renamed from ``test_fetch_report_listings_prepends_base_url_for_absolute``:
    the old name claimed the opposite of what the test (and its docstring)
    actually verify — absolute URLs must NOT be prefixed with the base URL.
    """
    mock_session = MagicMock()
    mock_session.post.return_value.status_code = 200
    mock_session.post.return_value.json.return_value = _api_response([
        _make_api_row("Test", "https://other.example.com/file.xls"),
    ])
    from ice_stocks.ice_api import fetch_report_listings
    rows = fetch_report_listings(mock_session, product_id=2)
    assert rows[0]["download_url"] == "https://other.example.com/file.xls"
# ── find_latest_report ───────────────────────────────────────────────────────
def test_find_latest_report_label_match():
    """find_latest_report picks the row whose label contains the search text."""
    session = MagicMock()
    session.post.return_value.status_code = 200
    session.post.return_value.json.return_value = _api_response([
        _make_api_row("Daily Warehouse Stocks"),
        _make_api_row("Certified Stock Aging Report"),
        _make_api_row("Historical Stocks"),
    ])
    match = find_latest_report(session, "Aging Report")
    assert match is not None
    assert match["download_label"] == "Certified Stock Aging Report"
def test_find_latest_report_no_match_returns_none():
    """With no matching label on any page, find_latest_report yields None."""
    session = MagicMock()
    session.post.return_value.status_code = 200
    # Every page comes back empty, so pagination exhausts without a hit.
    session.post.return_value.json.return_value = _api_response([])
    assert find_latest_report(session, "Nonexistent Label XYZ") is None
# ── canonical CSV output ──────────────────────────────────────────────────────
def test_build_canonical_csv_from_xls_rows():
    """Verify execute._build_canonical_csv_from_xls produces correct schema."""
    pytest.importorskip("xlwt")
    # Mimic the layout of an ICE daily stocks workbook: title row, blank row,
    # "As of" date header, 20 filler rows, then the grand-total row (row 23).
    sheet_rows = [
        ["Coffee C Warehouse Stocks"] + [""] * 9,
        [""] * 10,
        ["As of: 2/14/2026"] + [""] * 9,
    ]
    sheet_rows.extend([""] * 10 for _ in range(20))
    sheet_rows.append(
        ["Total in Bags", 10000.0, 5000.0, 2000.0, 1000.0, 500.0, 0.0, 0.0, 0.0, 18500.0]
    )
    blob = _make_xls_bytes(sheet_rows)
    from ice_stocks.execute import _build_canonical_csv_from_xls
    csv_bytes = _build_canonical_csv_from_xls(blob)
    assert csv_bytes, "Expected non-empty CSV output"
    parsed = list(csv.DictReader(io.StringIO(csv_bytes.decode("utf-8"))))
    assert len(parsed) == 1
    assert parsed[0]["report_date"] == "2026-02-14"
    assert parsed[0]["total_certified_bags"] == "18500"
def test_build_canonical_csv_from_xls_missing_date_returns_empty():
    """If header row has no parseable date, return empty bytes."""
    pytest.importorskip("xlwt")
    header = ["Not a valid date header"] + [""] * 9
    sheet_rows = [header] + [[""] * 10 for _ in range(30)]
    blob = _make_xls_bytes(sheet_rows)
    from ice_stocks.execute import _build_canonical_csv_from_xls
    assert _build_canonical_csv_from_xls(blob) == b""