- Replace brittle ICE_STOCKS_URL env var with API-based URL discovery via the private ICE Report Center JSON API (no auth required) - Add rolling CSV → XLS fallback in extract_ice_stocks() using find_latest_report() from ice_api.py - Add ice_api.py: fetch_report_listings(), find_latest_report() with pagination up to MAX_API_PAGES - Add xls_parse.py: detect_file_format() (magic bytes), xls_to_rows() using xlrd for OLE2/BIFF XLS files - Add extract_ice_aging(): monthly certified stock aging report by age bucket × port → ice_aging/ landing dir - Add extract_ice_historical(): 30-year EOM by-port stocks from static ICE URL → ice_stocks_by_port/ landing dir - Add xlrd>=2.0.1 (parse XLS), xlwt>=1.3.0 (dev, test fixtures) - Add SQLMesh raw + foundation models for both new datasets - Add ice_aging_glob(), ice_stocks_by_port_glob() macros - Add extract_ice_aging + extract_ice_historical pipeline entries - Add 12 unit tests (format detection, XLS roundtrip, API mock, CSV output) Seed files (data/landing/ice_aging/seed/ and ice_stocks_by_port/seed/) must be created locally — data/ is gitignored. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
185 lines
7.2 KiB
Python
185 lines
7.2 KiB
Python
"""Tests for ICE extraction: format detection, XLS parsing, API client."""
|
|
|
|
import csv
|
|
import gzip
|
|
import io
|
|
import struct
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
import xlwt # noqa: F401 — needed to create XLS fixtures; skip tests if missing
|
|
|
|
from ice_stocks.ice_api import fetch_report_listings, find_latest_report
|
|
from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows
|
|
|
|
# ── helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def _make_xls_bytes(rows: list[list]) -> bytes:
    """Serialize *rows* into a single-sheet XLS workbook and return its bytes."""
    workbook = xlwt.Workbook()
    worksheet = workbook.add_sheet("Sheet1")
    for row_idx, cells in enumerate(rows):
        for col_idx, cell in enumerate(cells):
            worksheet.write(row_idx, col_idx, cell)
    out = io.BytesIO()
    workbook.save(out)
    return out.getvalue()
|
|
|
|
|
|
def _api_response(rows: list[dict]) -> dict:
|
|
"""Build a mock ICE API response payload."""
|
|
return {"datasets": {"results": {"rows": rows}}}
|
|
|
|
|
|
def _make_api_row(label: str, url: str = "/download/test.xls", publish_date: str = "2026-02-01") -> dict:
|
|
return {
|
|
"publishDate": publish_date,
|
|
"productName": "Coffee C",
|
|
"download": {"url": url, "label": label},
|
|
}
|
|
|
|
|
|
# ── detect_file_format ───────────────────────────────────────────────────────
|
|
|
|
def test_detect_file_format_xls():
    # OLE2 magic followed by zero padding is classified as legacy XLS.
    payload = OLE2_MAGIC + bytes(100)
    assert detect_file_format(payload) == "xls"
|
|
|
|
|
|
def test_detect_file_format_xlsx():
    # A ZIP local-file header ("PK\x03\x04") marks the OOXML container.
    payload = b"PK\x03\x04" + bytes(100)
    assert detect_file_format(payload) == "xlsx"
|
|
|
|
|
|
def test_detect_file_format_html():
    # Markup bodies (e.g. an error page) should be flagged as HTML, not data.
    markup = b"<html><body>foo</body></html>"
    assert detect_file_format(markup) == "html"
|
|
|
|
|
|
def test_detect_file_format_csv():
    # Plain delimited text with no known magic bytes falls through to CSV.
    payload = b"report_date,total_certified_bags\n2026-01-01,100000\n"
    assert detect_file_format(payload) == "csv"
|
|
|
|
|
|
# ── xls_to_rows ──────────────────────────────────────────────────────────────
|
|
|
|
def test_xls_to_rows_roundtrip():
    """An XLS written via xlwt parses back to the same cell values."""
    pytest.importorskip("xlwt")
    input_rows = [
        ["header1", "header2", "header3"],
        ["value1", 42.0, "value3"],
        ["", 0.0, ""],
    ]
    xls_bytes = _make_xls_bytes(input_rows)
    # Slice by the magic's own length rather than a hard-coded 4: the
    # canonical OLE2/CFB signature is 8 bytes, so `[:4]` could never equal
    # an 8-byte OLE2_MAGIC constant and the assertion would always fail.
    assert xls_bytes[: len(OLE2_MAGIC)] == OLE2_MAGIC

    result = xls_to_rows(xls_bytes)
    assert len(result) == 3
    assert result[0][0] == "header1"
    assert result[1][1] == 42.0
    # Empty cells come back as ""
    assert result[2][0] == ""
|
|
|
|
|
|
def test_xls_to_rows_rejects_non_xls():
    # Feeding a ZIP (xlsx-style) payload must trip the OLE2 guard.
    zip_like = b"PK\x03\x04" + bytes(100)
    with pytest.raises(AssertionError, match="Not an OLE2"):
        xls_to_rows(zip_like)
|
|
|
|
|
|
# ── fetch_report_listings ────────────────────────────────────────────────────
|
|
|
|
def test_fetch_report_listings_parses_response():
    """API rows are flattened into dicts with label and absolute download URL."""
    mock_session = MagicMock()
    mock_session.post.return_value.status_code = 200
    mock_session.post.return_value.json.return_value = _api_response([
        _make_api_row("Daily Warehouse Stocks", "/dl/stocks.xls"),
        _make_api_row("Certified Stock Aging Report", "/dl/aging.xls"),
    ])

    # fetch_report_listings is already imported at module scope; only the
    # base-URL constant needs a local import here.
    from ice_stocks.ice_api import ICE_BASE_URL

    rows = fetch_report_listings(mock_session, product_id=2)

    assert len(rows) == 2
    assert rows[0]["download_label"] == "Daily Warehouse Stocks"
    # Relative URLs are resolved against the ICE base.
    assert rows[0]["download_url"] == ICE_BASE_URL + "/dl/stocks.xls"
    assert rows[1]["download_label"] == "Certified Stock Aging Report"
|
|
|
|
|
|
def test_fetch_report_listings_prepends_base_url_for_absolute():
    """If URL already starts with http, don't prepend base.

    NOTE(review): the function name says "prepends" but the assertion checks
    that the base is NOT prepended for absolute URLs — consider renaming to
    e.g. ``test_fetch_report_listings_keeps_absolute_url``.
    """
    mock_session = MagicMock()
    mock_session.post.return_value.status_code = 200
    mock_session.post.return_value.json.return_value = _api_response([
        _make_api_row("Test", "https://other.example.com/file.xls"),
    ])

    # fetch_report_listings is already imported at module scope; the local
    # re-import in the original was redundant and has been dropped.
    rows = fetch_report_listings(mock_session, product_id=2)
    assert rows[0]["download_url"] == "https://other.example.com/file.xls"
|
|
|
|
|
|
# ── find_latest_report ───────────────────────────────────────────────────────
|
|
|
|
def test_find_latest_report_label_match():
    # A substring of the label ("Aging Report") should select the aging row.
    session = MagicMock()
    session.post.return_value.status_code = 200
    listings = [
        _make_api_row("Daily Warehouse Stocks"),
        _make_api_row("Certified Stock Aging Report"),
        _make_api_row("Historical Stocks"),
    ]
    session.post.return_value.json.return_value = _api_response(listings)

    match = find_latest_report(session, "Aging Report")
    assert match is not None
    assert match["download_label"] == "Certified Stock Aging Report"
|
|
|
|
|
|
def test_find_latest_report_no_match_returns_none():
    session = MagicMock()
    session.post.return_value.status_code = 200
    # Return empty rows on all pages
    session.post.return_value.json.return_value = _api_response([])

    assert find_latest_report(session, "Nonexistent Label XYZ") is None
|
|
|
|
|
|
# ── canonical CSV output ──────────────────────────────────────────────────────
|
|
|
|
def test_build_canonical_csv_from_xls_rows():
    """Verify execute._build_canonical_csv_from_xls produces correct schema."""
    pytest.importorskip("xlwt")

    blank = [""] * 10
    # Simulate ICE daily stocks XLS structure
    sheet_rows = [
        ["Coffee C Warehouse Stocks"] + blank[1:],  # row 0 — title
        blank,                                      # row 1
        ["As of: 2/14/2026"] + blank[1:],           # row 2 — report date
    ]
    sheet_rows += [blank] * 20                      # rows 3-22 — filler
    sheet_rows.append(
        # row 23 — totals row the parser is expected to pick up
        ["Total in Bags", 10000.0, 5000.0, 2000.0, 1000.0, 500.0, 0.0, 0.0, 0.0, 18500.0]
    )

    from ice_stocks.execute import _build_canonical_csv_from_xls
    result = _build_canonical_csv_from_xls(_make_xls_bytes(sheet_rows))

    assert result, "Expected non-empty CSV output"
    parsed = list(csv.DictReader(io.StringIO(result.decode("utf-8"))))
    assert len(parsed) == 1
    assert parsed[0]["report_date"] == "2026-02-14"
    assert parsed[0]["total_certified_bags"] == "18500"
|
|
|
|
|
|
def test_build_canonical_csv_from_xls_missing_date_returns_empty():
    """If header row has no parseable date, return empty bytes."""
    pytest.importorskip("xlwt")

    header = ["Not a valid date header"] + [""] * 9
    sheet_rows = [header] + [[""] * 10] * 30

    from ice_stocks.execute import _build_canonical_csv_from_xls
    assert _build_canonical_csv_from_xls(_make_xls_bytes(sheet_rows)) == b""
|