diff --git a/extract/ice_stocks/pyproject.toml b/extract/ice_stocks/pyproject.toml
index 01fedb3..564116e 100644
--- a/extract/ice_stocks/pyproject.toml
+++ b/extract/ice_stocks/pyproject.toml
@@ -5,10 +5,13 @@ description = "ICE certified warehouse stocks extractor"
 requires-python = ">=3.13"
 dependencies = [
     "niquests>=3.14.1",
+    "xlrd>=2.0.1",
 ]
 
 [project.scripts]
 extract_ice = "ice_stocks.execute:extract_ice_stocks"
+extract_ice_aging = "ice_stocks.execute:extract_ice_aging"
+extract_ice_historical = "ice_stocks.execute:extract_ice_historical"
 
 [build-system]
 requires = ["hatchling"]
diff --git a/extract/ice_stocks/src/ice_stocks/execute.py b/extract/ice_stocks/src/ice_stocks/execute.py
index e4dccb4..63c3764 100644
--- a/extract/ice_stocks/src/ice_stocks/execute.py
+++ b/extract/ice_stocks/src/ice_stocks/execute.py
@@ -4,16 +4,18 @@ Downloads daily certified stock reports from the ICE Report Center and
 stores as gzip CSV in the landing directory. Uses SHA256 of content as
 the idempotency key — skips if a file with the same hash already exists.
 
-Landing path: LANDING_DIR/ice_stocks/{year}/{date}_{hash8}.csv.gzip
+Landing paths:
+    LANDING_DIR/ice_stocks/{year}/{date}_{hash8}.csv.gzip          (daily rolling stocks)
+    LANDING_DIR/ice_aging/{year}/{date}_{hash8}.csv.gzip           (monthly aging report)
+    LANDING_DIR/ice_stocks_by_port/{year}/{date}_{hash8}.csv.gzip  (historical by port)
 
-CSV format produced (matching raw.ice_warehouse_stocks columns):
-    report_date,total_certified_bags,pending_grading_bags
-
-ICE Report Center URL discovery:
-    Visit https://www.theice.com/report-center and locate the
-    "Coffee C Warehouse Stocks" report. The download URL has the pattern:
-    https://www.theice.com/report-center/commodities/COFFEE/reports/...
-    Set ICE_STOCKS_URL environment variable to the discovered URL.
+CSV schemas:
+    ice_stocks:         report_date,total_certified_bags,pending_grading_bags
+    ice_aging:          report_date,age_bucket,antwerp_bags,hamburg_bremen_bags,
+                        houston_bags,miami_bags,new_orleans_bags,new_york_bags,total_bags
+    ice_stocks_by_port: report_date,new_york_bags,new_orleans_bags,houston_bags,
+                        miami_bags,antwerp_bags,hamburg_bremen_bags,barcelona_bags,
+                        virginia_bags,total_bags
 """
 
 import csv
@@ -27,6 +29,10 @@ import sys
 from datetime import datetime
 
 import niquests
+import xlrd
+
+from ice_stocks.ice_api import find_latest_report
+from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows
 
 logging.basicConfig(
     level=logging.INFO,
@@ -37,23 +43,19 @@ logging.basicConfig(
 logger = logging.getLogger("ICE Stocks Extractor")
 
 LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing"))
+
+# ── ice_stocks (daily rolling) ──────────────────────────────────────────────
 DEST_SUBDIR = "ice_stocks"
 
-# ICE Report Center URL for Coffee C certified warehouse stocks.
-# Discover by visiting https://www.theice.com/report-center and locating
-# the Coffee C warehouse stocks CSV export. Override via environment variable.
-ICE_STOCKS_URL = os.getenv(
-    "ICE_STOCKS_URL",
-    "https://www.theice.com/publicdocs/futures_us/exchange_notices/coffee_certifiedstocks.csv",
+# Static rolling CSV URL — try this first, fall back to API on 404.
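+# The rolling file is overwritten in place by ICE each trading day; the
+# SHA256 taken in _write_landing_file is what turns "same URL, new content"
+# into a new landed file.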
+ICE_ROLLING_CSV_URL = (
+    "https://www.theice.com/publicdocs/futures_us/exchange_notices/coffee_certifiedstocks.csv"
 )
+ICE_STOCKS_LABEL = "Daily Warehouse Stocks"
 
 HTTP_TIMEOUT_SECONDS = 60
 
-# Expected column names from ICE CSV (may vary — adapt to actual column names)
-# The ICE report typically has: Date, Certified Stocks (bags), Pending Grading (bags)
-# We normalize to our canonical names.
 COLUMN_MAPPINGS = {
-    # Possible ICE column name → our canonical name
     "date": "report_date",
     "report date": "report_date",
     "Date": "report_date",
@@ -66,94 +68,55 @@ COLUMN_MAPPINGS = {
     "pending grading (bags)": "pending_grading_bags",
 }
 
+# ── ice_aging (monthly aging report) ────────────────────────────────────────
+ICE_AGING_LABEL = "Certified Stock Aging Report"
+AGING_DEST_SUBDIR = "ice_aging"
 
-def _normalize_row(row: dict) -> dict | None:
-    """Map raw ICE CSV columns to canonical schema. Returns None if date missing."""
-    normalized = {}
-    for raw_key, value in row.items():
-        canonical = COLUMN_MAPPINGS.get(raw_key.strip()) or COLUMN_MAPPINGS.get(raw_key.strip().lower())
-        if canonical:
-            # Strip commas from numeric strings (ICE uses "1,234,567" format)
-            normalized[canonical] = value.strip().replace(",", "") if value else ""
+AGING_PORT_HEADERS = [
+    "antwerp_bags",
+    "hamburg_bremen_bags",
+    "houston_bags",
+    "miami_bags",
+    "new_orleans_bags",
+    "new_york_bags",
+    "total_bags",
+]
 
-    if "report_date" not in normalized or not normalized["report_date"]:
-        return None
+# ── ice_stocks_by_port (historical end-of-month) ─────────────────────────────
+ICE_HISTORICAL_URL = (
+    "https://www.ice.com/publicdocs/futures_us_reports/coffee/"
+    "EOM_KC_cert_stox_by_port_nov96-present.xls"
+)
+HISTORICAL_DEST_SUBDIR = "ice_stocks_by_port"
+HISTORICAL_HTTP_TIMEOUT_SECONDS = 120
 
-    # Fill missing optional columns with empty string
-    normalized.setdefault("total_certified_bags", "")
-    normalized.setdefault("pending_grading_bags", "")
-
-    return normalized
+HISTORICAL_PORT_COLS = [
+    "new_york_bags",
+    "new_orleans_bags",
+    "houston_bags",
+    "miami_bags",
+    "antwerp_bags",
+    "hamburg_bremen_bags",
+    "barcelona_bags",
+    "virginia_bags",
+    "total_bags",
+]
 
-def _build_canonical_csv(raw_content: bytes) -> bytes:
-    """Parse ICE CSV and emit canonical CSV with our column schema."""
-    text = raw_content.decode("utf-8", errors="replace")
-    reader = csv.DictReader(io.StringIO(text))
+# ── shared helpers ───────────────────────────────────────────────────────────
 
-    rows = []
-    for row in reader:
-        normalized = _normalize_row(row)
-        if normalized:
-            rows.append(normalized)
+def _write_landing_file(canonical_csv: bytes, dest_subdir: str, date_label: str) -> None:
+    """SHA256-hash canonical_csv, skip if exists, else gzip and write."""
+    assert canonical_csv, "canonical_csv must not be empty"
+    assert dest_subdir, "dest_subdir must not be empty"
+    assert date_label, "date_label must not be empty"
 
-    if not rows:
-        return b""
-
-    out = io.StringIO()
-    writer = csv.DictWriter(out, fieldnames=["report_date", "total_certified_bags", "pending_grading_bags"])
-    writer.writeheader()
-    writer.writerows(rows)
-    return out.getvalue().encode("utf-8")
-
-
-def extract_ice_stocks() -> None:
-    """Download ICE certified Coffee C warehouse stocks and store as gzip CSV.
-
-    Idempotent: computes SHA256 of canonical CSV bytes, skips if already on disk.
-    The ICE report is a rolling file (same URL, updated daily) — we detect
-    changes via content hash.
- """ - logger.info(f"Downloading ICE warehouse stocks from: {ICE_STOCKS_URL}") - - with niquests.Session() as session: - try: - response = session.get(ICE_STOCKS_URL, timeout=HTTP_TIMEOUT_SECONDS) - except Exception as e: - logger.error( - f"Failed to connect to ICE Report Center: {e}\n" - "If the URL has changed, set ICE_STOCKS_URL environment variable.\n" - "Visit https://www.theice.com/report-center to find the current URL." - ) - return - - if response.status_code == 404: - logger.warning( - "ICE stocks URL returned 404. The report URL may have changed.\n" - "Visit https://www.theice.com/report-center to find the current URL,\n" - "then set ICE_STOCKS_URL environment variable." - ) - return - - assert response.status_code == 200, ( - f"Unexpected status {response.status_code} from {ICE_STOCKS_URL}" - ) - assert len(response.content) > 0, "Downloaded empty file from ICE" - - canonical_csv = _build_canonical_csv(response.content) - if not canonical_csv: - logger.warning("ICE CSV parsed to 0 rows — column mapping may need updating") - return - - # Hash-based idempotency sha256 = hashlib.sha256(canonical_csv).hexdigest() etag = sha256[:8] + year = date_label[:4] - today = datetime.now().strftime("%Y-%m-%d") - year = datetime.now().strftime("%Y") - - dest_dir = LANDING_DIR / DEST_SUBDIR / year - local_file = dest_dir / f"{today}_{etag}.csv.gzip" + dest_dir = LANDING_DIR / dest_subdir / year + local_file = dest_dir / f"{date_label}_{etag}.csv.gzip" if local_file.exists(): logger.info(f"File {local_file.name} already exists — content unchanged, skipping") @@ -169,5 +132,302 @@ def extract_ice_stocks() -> None: logger.info(f"Stored {local_file} ({local_file.stat().st_size:,} bytes)") +def _build_csv_bytes(fieldnames: list[str], rows: list[dict]) -> bytes: + """Serialize list of dicts to CSV bytes.""" + out = io.StringIO() + writer = csv.DictWriter(out, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + return out.getvalue().encode("utf-8") + + +# ── ice_stocks (daily rolling) ─────────────────────────────────────────────── + +def _normalize_row(row: dict) -> dict | None: + """Map raw ICE CSV columns to canonical schema. Returns None if date missing.""" + normalized = {} + for raw_key, value in row.items(): + canonical = COLUMN_MAPPINGS.get(raw_key.strip()) or COLUMN_MAPPINGS.get(raw_key.strip().lower()) + if canonical: + normalized[canonical] = value.strip().replace(",", "") if value else "" + + if "report_date" not in normalized or not normalized["report_date"]: + return None + + normalized.setdefault("total_certified_bags", "") + normalized.setdefault("pending_grading_bags", "") + return normalized + + +def _build_canonical_csv_from_csv(raw_content: bytes) -> bytes: + """Parse ICE CSV bytes and emit canonical CSV.""" + text = raw_content.decode("utf-8", errors="replace") + reader = csv.DictReader(io.StringIO(text)) + + rows = [] + for row in reader: + normalized = _normalize_row(row) + if normalized: + rows.append(normalized) + + if not rows: + return b"" + + return _build_csv_bytes(["report_date", "total_certified_bags", "pending_grading_bags"], rows) + + +def _build_canonical_csv_from_xls(xls_bytes: bytes) -> bytes: + """Extract total certified bags from ICE daily stocks XLS. + + Sheet structure: + Row 2: header with report date in cell [0] + Row 23: ['Total in Bags', ANT, BAR, HA/BR, HOU, MIAMI, NOLA, NY, VA, total] + """ + rows = xls_to_rows(xls_bytes) + + # Extract report date from row 2, cell 0 (e.g. 
"As of: 1/30/2026") + header_cell = str(rows[2][0]) if len(rows) > 2 else "" + report_date = "" + if "as of" in header_cell.lower(): + date_part = header_cell.lower().replace("as of:", "").replace("as of", "").strip() + try: + dt = datetime.strptime(date_part.split()[0], "%m/%d/%Y") + report_date = dt.strftime("%Y-%m-%d") + except ValueError: + pass + + if not report_date: + logger.warning(f"Could not parse report date from XLS header: {header_cell!r}") + return b"" + + # Find "Total in Bags" row + total_bags = "" + for row in rows: + if row and str(row[0]).strip().lower() == "total in bags": + val = row[-1] + if isinstance(val, float): + total_bags = str(int(val)) + else: + total_bags = str(val).replace(",", "").strip() + break + + canonical_row = { + "report_date": report_date, + "total_certified_bags": total_bags, + "pending_grading_bags": "", + } + return _build_csv_bytes(["report_date", "total_certified_bags", "pending_grading_bags"], [canonical_row]) + + +def extract_ice_stocks() -> None: + """Download ICE certified Coffee C warehouse stocks and store as gzip CSV. + + Tries static rolling CSV URL first. On 404 or error, falls back to API + discovery to find the latest 'Daily Warehouse Stocks' report. + Idempotent: skips if content hash already on disk. + """ + with niquests.Session() as session: + logger.info(f"Trying ICE rolling CSV: {ICE_ROLLING_CSV_URL}") + try: + response = session.get(ICE_ROLLING_CSV_URL, timeout=HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.warning(f"Rolling CSV fetch failed: {e} — trying API discovery") + response = None + + use_api = response is None or response.status_code == 404 + + if use_api: + logger.info("Falling back to ICE API discovery for Daily Warehouse Stocks") + report = find_latest_report(session, ICE_STOCKS_LABEL) + if not report: + logger.error("ICE API: no 'Daily Warehouse Stocks' report found") + return + logger.info(f"Found report via API: {report['download_label']} ({report['publish_date']})") + try: + response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.error(f"Failed to download report from API URL: {e}") + return + + if response.status_code != 200: + logger.error(f"Unexpected status {response.status_code}") + return + + assert len(response.content) > 0, "Downloaded empty file from ICE" + + fmt = detect_file_format(response.content) + if fmt == "xls": + canonical_csv = _build_canonical_csv_from_xls(response.content) + else: + canonical_csv = _build_canonical_csv_from_csv(response.content) + + if not canonical_csv: + logger.warning("ICE stocks parsed to 0 rows — check column mapping or XLS structure") + return + + today = datetime.now().strftime("%Y-%m-%d") + _write_landing_file(canonical_csv, DEST_SUBDIR, today) + + +# ── ice_aging (monthly aging report) ──────────────────────────────────────── + +def _parse_aging_date(cell_value: str) -> str: + """Parse 'As of Delivery 3/2/2026' or 'As of: 1/30/2026' → '2026-03-02'.""" + text = str(cell_value).strip() + for prefix in ("as of delivery ", "as of:"): + if text.lower().startswith(prefix): + text = text[len(prefix):].strip() + break + date_part = text.split()[0] + try: + dt = datetime.strptime(date_part, "%m/%d/%Y") + return dt.strftime("%Y-%m-%d") + except ValueError: + return "" + + +def extract_ice_aging() -> None: + """Download ICE Certified Stock Aging Report and store as gzip CSV. + + Monthly report: stock quantities by age bucket × port. + Idempotent: skips if content hash already on disk. 
+ """ + with niquests.Session() as session: + logger.info("Fetching latest ICE Aging Report via API") + report = find_latest_report(session, ICE_AGING_LABEL) + if not report: + logger.error(f"ICE API: no report matching {ICE_AGING_LABEL!r}") + return + + logger.info(f"Downloading: {report['download_label']} ({report['publish_date']})") + try: + response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.error(f"Failed to download aging report: {e}") + return + + assert response.status_code == 200, f"HTTP {response.status_code}" + assert response.content[:4] == OLE2_MAGIC, "Aging report is not an XLS file" + + rows = xls_to_rows(response.content) + + report_date = _parse_aging_date(str(rows[0][0]) if rows else "") + if not report_date: + logger.error(f"Could not parse aging report date from row 0: {rows[0] if rows else '(empty)'!r}") + return + + # Row 3+ are data rows; stop at row labelled "Total" + fieldnames = ["report_date", "age_bucket"] + AGING_PORT_HEADERS + data_rows = [] + + for row in rows[3:]: + if not row or not str(row[0]).strip(): + continue + label = str(row[0]).strip() + if label.lower() == "total": + break + + port_values = [] + for cell in row[1:]: + if isinstance(cell, float): + port_values.append(str(int(cell))) + elif str(cell).strip() in ("-", ""): + port_values.append("0") + else: + port_values.append(str(cell).replace(",", "").strip()) + + while len(port_values) < len(AGING_PORT_HEADERS): + port_values.append("0") + port_values = port_values[:len(AGING_PORT_HEADERS)] + + record = {"report_date": report_date, "age_bucket": label} + for col, val in zip(AGING_PORT_HEADERS, port_values): + record[col] = val + data_rows.append(record) + + if not data_rows: + logger.warning("Aging report parsed to 0 data rows") + return + + canonical_csv = _build_csv_bytes(fieldnames, data_rows) + _write_landing_file(canonical_csv, AGING_DEST_SUBDIR, report_date) + + +# ── ice_stocks_by_port (historical) ───────────────────────────────────────── + +def _excel_serial_to_date(serial: float, datemode: int) -> str: + """Convert Excel date serial to ISO date string, or '' on failure.""" + try: + dt = xlrd.xldate_as_datetime(serial, datemode) + return dt.strftime("%Y-%m-%d") + except Exception: + return "" + + +def extract_ice_historical() -> None: + """Download ICE historical end-of-month warehouse stocks by port. + + Static URL updated monthly. Covers Nov 1996 to present. + Idempotent: skips if content hash already on disk. 
+ """ + logger.info(f"Downloading ICE historical by-port XLS: {ICE_HISTORICAL_URL}") + + with niquests.Session() as session: + try: + response = session.get(ICE_HISTORICAL_URL, timeout=HISTORICAL_HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.error(f"Failed to download historical XLS: {e}") + return + + assert response.status_code == 200, f"HTTP {response.status_code}" + assert response.content[:4] == OLE2_MAGIC, "Historical file is not an XLS" + + book = xlrd.open_workbook(file_contents=response.content) + datemode = book.datemode + rows = xls_to_rows(response.content) + + # Data starts at row 8 (0-indexed); rows 0-7 are headers + fieldnames = ["report_date"] + HISTORICAL_PORT_COLS + data_rows = [] + + for row in rows[8:]: + if not row or len(row) < 2: + continue + + serial_cell = row[1] + if not isinstance(serial_cell, float) or serial_cell <= 0: + continue + + report_date = _excel_serial_to_date(serial_cell, datemode) + if not report_date: + continue + + port_cells = row[2:2 + len(HISTORICAL_PORT_COLS)] + port_values = [] + for cell in port_cells: + if cell == "" or str(cell).strip() in ("-", ""): + port_values.append("0") + elif isinstance(cell, float): + port_values.append(str(int(cell))) + else: + port_values.append(str(cell).replace(",", "").strip()) + + while len(port_values) < len(HISTORICAL_PORT_COLS): + port_values.append("0") + + record = {"report_date": report_date} + for col, val in zip(HISTORICAL_PORT_COLS, port_values): + record[col] = val + data_rows.append(record) + + if not data_rows: + logger.warning("Historical XLS parsed to 0 data rows") + return + + canonical_csv = _build_csv_bytes(fieldnames, data_rows) + today = datetime.now().strftime("%Y-%m-%d") + _write_landing_file(canonical_csv, HISTORICAL_DEST_SUBDIR, today) + + if __name__ == "__main__": extract_ice_stocks() diff --git a/extract/ice_stocks/src/ice_stocks/ice_api.py b/extract/ice_stocks/src/ice_stocks/ice_api.py new file mode 100644 index 0000000..9df44d7 --- /dev/null +++ b/extract/ice_stocks/src/ice_stocks/ice_api.py @@ -0,0 +1,75 @@ +"""ICE Report Center API client. + +Discovers report download URLs via the private JSON API at +https://www.ice.com/marketdata/api/reports/293/results +No authentication required. Results are date-descending. +""" + +ICE_API_URL = "https://www.ice.com/marketdata/api/reports/293/results" +ICE_BASE_URL = "https://www.ice.com" +PRODUCT_ID_COFFEE = 2 +API_TIMEOUT_SECONDS = 30 +MAX_API_PAGES = 10 + + +def fetch_report_listings(session, product_id, max_results=50, page_number=1) -> list[dict]: + """POST to ICE API and return normalized report rows. 
+    """
+    assert product_id > 0, f"product_id must be positive, got {product_id}"
+    assert max_results > 0, f"max_results must be positive, got {max_results}"
+    assert page_number > 0, f"page_number must be positive, got {page_number}"
+
+    payload = {
+        "offset": (page_number - 1) * max_results,
+        "pageNumber": page_number,
+        "productId": product_id,
+        "max": max_results,
+    }
+    response = session.post(
+        ICE_API_URL,
+        data=payload,
+        headers={"Content-Type": "application/x-www-form-urlencoded"},
+        timeout=API_TIMEOUT_SECONDS,
+    )
+    assert response.status_code == 200, (
+        f"ICE API returned {response.status_code}"
+    )
+
+    data = response.json()
+    rows = data["datasets"]["results"]["rows"]
+
+    result = []
+    for row in rows:
+        download = row.get("download", {}) or {}
+        url = download.get("url", "") or ""
+        if url and not url.startswith("http"):
+            url = ICE_BASE_URL + url
+        result.append({
+            "publish_date": row.get("publishDate", ""),
+            "product_name": row.get("productName", ""),
+            "download_url": url,
+            "download_label": download.get("label", "") or "",
+        })
+
+    return result
+
+
+def find_latest_report(session, label_substring, product_id=PRODUCT_ID_COFFEE) -> dict | None:
+    """Return first report whose download_label contains label_substring.
+
+    Paginates up to MAX_API_PAGES. Results are date-descending so
+    the first match is the most recent.
+    """
+    assert label_substring, "label_substring must not be empty"
+
+    for page in range(1, MAX_API_PAGES + 1):
+        rows = fetch_report_listings(session, product_id, page_number=page)
+        if not rows:
+            break
+        for row in rows:
+            if label_substring.lower() in row["download_label"].lower():
+                return row
+
+    return None
diff --git a/extract/ice_stocks/src/ice_stocks/xls_parse.py b/extract/ice_stocks/src/ice_stocks/xls_parse.py
new file mode 100644
index 0000000..fc56600
--- /dev/null
+++ b/extract/ice_stocks/src/ice_stocks/xls_parse.py
@@ -0,0 +1,59 @@
+"""XLS file format detection and row extraction.
+
+Handles OLE2/BIFF .xls files (the format ICE uses for all reports).
+Format detection via magic bytes — no extension sniffing.
+"""
+
+import xlrd
+
+OLE2_MAGIC = b"\xd0\xcf\x11\xe0"
+XLSX_MAGIC = b"PK\x03\x04"
+
+
+def detect_file_format(content_bytes: bytes) -> str:
+    """Return 'xls', 'xlsx', 'csv', or 'html' based on magic bytes/content."""
+    assert content_bytes, "content_bytes must not be empty"
+
+    if content_bytes[:4] == OLE2_MAGIC:
+        return "xls"
+    if content_bytes[:4] == XLSX_MAGIC:
+        return "xlsx"
+    # Sniff text-based formats
+    sample = content_bytes[:512].decode("utf-8", errors="replace").lstrip()
+    if sample.startswith("<"):
+        return "html"
+    return "csv"
+
+
+def xls_to_rows(content_bytes: bytes, sheet_index: int = 0) -> list[list]:
+    """Parse XLS bytes and return sheet rows as list of lists.
+
+    Text cells come back as str and numeric cells as float; date cells are
+    kept as raw Excel serial floats for callers to convert.
+    Empty cells become empty string "".
+ """ + assert content_bytes, "content_bytes must not be empty" + assert content_bytes[:4] == OLE2_MAGIC, ( + f"Not an OLE2/BIFF XLS file (magic: {content_bytes[:4].hex()})" + ) + + book = xlrd.open_workbook(file_contents=content_bytes) + assert sheet_index < book.nsheets, ( + f"sheet_index {sheet_index} out of range (nsheets={book.nsheets})" + ) + sheet = book.sheets()[sheet_index] + + rows = [] + for row_idx in range(sheet.nrows): + row = [] + for col_idx in range(sheet.ncols): + cell = sheet.cell(row_idx, col_idx) + if cell.ctype == xlrd.XL_CELL_EMPTY: + row.append("") + elif cell.ctype == xlrd.XL_CELL_DATE: + # Keep as raw serial — callers convert with xlrd.xldate_as_datetime + row.append(cell.value) + else: + row.append(cell.value) + rows.append(row) + + return rows diff --git a/pyproject.toml b/pyproject.toml index 46561c6..5732b2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ dev = [ "pytest-cov>=7.0.0", "pyyaml>=6.0.2", "ruff>=0.9.9", + "xlwt>=1.3.0", ] [tool.uv.sources] diff --git a/src/materia/pipelines.py b/src/materia/pipelines.py index 1cfe51a..932d1a2 100644 --- a/src/materia/pipelines.py +++ b/src/materia/pipelines.py @@ -28,6 +28,14 @@ PIPELINES = { "command": ["uv", "run", "--package", "ice_stocks", "extract_ice"], "timeout_seconds": 600, }, + "extract_ice_aging": { + "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_aging"], + "timeout_seconds": 600, + }, + "extract_ice_historical": { + "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_historical"], + "timeout_seconds": 600, + }, "transform": { "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"], "timeout_seconds": 3600, diff --git a/tests/test_ice_extraction.py b/tests/test_ice_extraction.py new file mode 100644 index 0000000..2a6d008 --- /dev/null +++ b/tests/test_ice_extraction.py @@ -0,0 +1,184 @@ +"""Tests for ICE extraction: format detection, XLS parsing, API client.""" + +import csv +import gzip +import io +import struct +from unittest.mock import MagicMock, patch + +import pytest +import xlwt # noqa: F401 — needed to create XLS fixtures; skip tests if missing + +from ice_stocks.ice_api import fetch_report_listings, find_latest_report +from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows + +# ── helpers ────────────────────────────────────────────────────────────────── + +def _make_xls_bytes(rows: list[list]) -> bytes: + """Create a minimal in-memory XLS with one sheet.""" + book = xlwt.Workbook() + sheet = book.add_sheet("Sheet1") + for r, row in enumerate(rows): + for c, val in enumerate(row): + sheet.write(r, c, val) + buf = io.BytesIO() + book.save(buf) + return buf.getvalue() + + +def _api_response(rows: list[dict]) -> dict: + """Build a mock ICE API response payload.""" + return {"datasets": {"results": {"rows": rows}}} + + +def _make_api_row(label: str, url: str = "/download/test.xls", publish_date: str = "2026-02-01") -> dict: + return { + "publishDate": publish_date, + "productName": "Coffee C", + "download": {"url": url, "label": label}, + } + + +# ── detect_file_format ─────────────────────────────────────────────────────── + +def test_detect_file_format_xls(): + content = OLE2_MAGIC + b"\x00" * 100 + assert detect_file_format(content) == "xls" + + +def test_detect_file_format_xlsx(): + content = b"PK\x03\x04" + b"\x00" * 100 + assert detect_file_format(content) == "xlsx" + + +def test_detect_file_format_html(): + content = b"foo" + 
+    assert detect_file_format(content) == "html"
+
+
+def test_detect_file_format_csv():
+    content = b"report_date,total_certified_bags\n2026-01-01,100000\n"
+    assert detect_file_format(content) == "csv"
+
+
+# ── xls_to_rows ──────────────────────────────────────────────────────────────
+
+def test_xls_to_rows_roundtrip():
+    input_rows = [
+        ["header1", "header2", "header3"],
+        ["value1", 42.0, "value3"],
+        ["", 0.0, ""],
+    ]
+    xls_bytes = _make_xls_bytes(input_rows)
+    assert xls_bytes[:4] == OLE2_MAGIC
+
+    result = xls_to_rows(xls_bytes)
+    assert len(result) == 3
+    assert result[0][0] == "header1"
+    assert result[1][1] == 42.0
+    # Empty cells come back as ""
+    assert result[2][0] == ""
+
+
+def test_xls_to_rows_rejects_non_xls():
+    with pytest.raises(AssertionError, match="Not an OLE2"):
+        xls_to_rows(b"PK\x03\x04" + b"\x00" * 100)
+
+
+# ── fetch_report_listings ────────────────────────────────────────────────────
+
+def test_fetch_report_listings_parses_response():
+    mock_session = MagicMock()
+    mock_session.post.return_value.status_code = 200
+    mock_session.post.return_value.json.return_value = _api_response([
+        _make_api_row("Daily Warehouse Stocks", "/dl/stocks.xls"),
+        _make_api_row("Certified Stock Aging Report", "/dl/aging.xls"),
+    ])
+
+    rows = fetch_report_listings(mock_session, product_id=2)
+
+    assert len(rows) == 2
+    assert rows[0]["download_label"] == "Daily Warehouse Stocks"
+    assert rows[0]["download_url"] == ICE_BASE_URL + "/dl/stocks.xls"
+    assert rows[1]["download_label"] == "Certified Stock Aging Report"
+
+
+def test_fetch_report_listings_keeps_absolute_url():
+    """If URL already starts with http, don't prepend base."""
+    mock_session = MagicMock()
+    mock_session.post.return_value.status_code = 200
+    mock_session.post.return_value.json.return_value = _api_response([
+        _make_api_row("Test", "https://other.example.com/file.xls"),
+    ])
+
+    rows = fetch_report_listings(mock_session, product_id=2)
+    assert rows[0]["download_url"] == "https://other.example.com/file.xls"
+
+
+# ── find_latest_report ───────────────────────────────────────────────────────
+
+def test_find_latest_report_label_match():
+    mock_session = MagicMock()
+    mock_session.post.return_value.status_code = 200
+    mock_session.post.return_value.json.return_value = _api_response([
+        _make_api_row("Daily Warehouse Stocks"),
+        _make_api_row("Certified Stock Aging Report"),
+        _make_api_row("Historical Stocks"),
+    ])
+
+    result = find_latest_report(mock_session, "Aging Report")
+    assert result is not None
+    assert result["download_label"] == "Certified Stock Aging Report"
+
+
+def test_find_latest_report_no_match_returns_none():
+    mock_session = MagicMock()
+    mock_session.post.return_value.status_code = 200
+    # Return empty rows on all pages
+    mock_session.post.return_value.json.return_value = _api_response([])
+
+    result = find_latest_report(mock_session, "Nonexistent Label XYZ")
+    assert result is None
+
+
+# ── canonical CSV output ─────────────────────────────────────────────────────
+
+def test_build_canonical_csv_from_xls_rows():
+    """Verify execute._build_canonical_csv_from_xls produces correct schema."""
+    # Simulate ICE daily stocks XLS structure
+    sheet_rows = [
+        ["Coffee C Warehouse Stocks"] + [""] * 9,  # row 0
+        [""] * 10,                                 # row 1
+        ["As of: 2/14/2026"] + [""] * 9,           # row 2 — report date
+    ] + [[""] * 10] * 20 + [                       # rows 3-22
+        ["Total in Bags", 10000.0, 5000.0, 2000.0, 1000.0, 500.0, 0.0, 0.0, 0.0, 18500.0],  # row 23
+    ]
+
+    xls_bytes = _make_xls_bytes(sheet_rows)
+
+    from ice_stocks.execute import _build_canonical_csv_from_xls
+    result = _build_canonical_csv_from_xls(xls_bytes)
+
+    assert result, "Expected non-empty CSV output"
+    reader = csv.DictReader(io.StringIO(result.decode("utf-8")))
+    rows = list(reader)
+    assert len(rows) == 1
+    assert rows[0]["report_date"] == "2026-02-14"
+    assert rows[0]["total_certified_bags"] == "18500"
+
+
+def test_build_canonical_csv_from_xls_missing_date_returns_empty():
+    """If header row has no parseable date, return empty bytes."""
+    sheet_rows = [["Not a valid date header"] + [""] * 9] + [[""] * 10] * 30
+    xls_bytes = _make_xls_bytes(sheet_rows)
+
+    from ice_stocks.execute import _build_canonical_csv_from_xls
+    result = _build_canonical_csv_from_xls(xls_bytes)
+    assert result == b""
diff --git a/transform/sqlmesh_materia/macros/__init__.py b/transform/sqlmesh_materia/macros/__init__.py
index 7d6aa61..9d42e6d 100644
--- a/transform/sqlmesh_materia/macros/__init__.py
+++ b/transform/sqlmesh_materia/macros/__init__.py
@@ -29,3 +29,17 @@ def ice_stocks_glob(evaluator) -> str:
     """Return a quoted glob path for all ICE warehouse stock CSV gzip files under LANDING_DIR."""
     landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing")
     return f"'{landing_dir}/ice_stocks/**/*.csv.gzip'"
+
+
+@macro()
+def ice_aging_glob(evaluator) -> str:
+    """Return a quoted glob path for all ICE aging report CSV gzip files under LANDING_DIR."""
+    landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing")
+    return f"'{landing_dir}/ice_aging/**/*.csv.gzip'"
+
+
+@macro()
+def ice_stocks_by_port_glob(evaluator) -> str:
+    """Return a quoted glob path for all ICE historical by-port CSV gzip files under LANDING_DIR."""
+    landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing")
+    return f"'{landing_dir}/ice_stocks_by_port/**/*.csv.gzip'"
diff --git a/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql b/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql
new file mode 100644
index 0000000..6f9030f
--- /dev/null
+++ b/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql
@@ -0,0 +1,58 @@
+-- Foundation fact: ICE certified Coffee C (Arabica) aging report.
+--
+-- Casts raw varchar columns to proper types and deduplicates via hash key.
+-- Grain: one row per (report_date, age_bucket).
+-- Age buckets represent how long coffee has been in certified storage.
+-- Port columns are in bags (60kg).
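+-- Note: the dedup key below hashes (report_date, age_bucket, total_bags), so a
+-- restated bucket with a different total lands as an extra row rather than
+-- replacing the old one.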
+
+MODEL (
+    name foundation.fct_ice_aging_stocks,
+    kind INCREMENTAL_BY_TIME_RANGE (
+        time_column report_date
+    ),
+    grain (report_date, age_bucket),
+    start '2020-01-01',
+    cron '@daily'
+);
+
+WITH cast_and_clean AS (
+    SELECT
+        TRY_CAST(report_date AS date) AS report_date,
+        age_bucket,
+        TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags,
+        TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags,
+        TRY_CAST(houston_bags AS bigint) AS houston_bags,
+        TRY_CAST(miami_bags AS bigint) AS miami_bags,
+        TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags,
+        TRY_CAST(new_york_bags AS bigint) AS new_york_bags,
+        TRY_CAST(total_bags AS bigint) AS total_bags,
+
+        filename AS source_file,
+
+        hash(report_date, age_bucket, total_bags) AS hkey
+    FROM raw.ice_aging_stocks
+    WHERE TRY_CAST(report_date AS date) IS NOT NULL
+        AND age_bucket IS NOT NULL
+        AND age_bucket != ''
+),
+
+deduplicated AS (
+    SELECT
+        any_value(report_date) AS report_date,
+        any_value(age_bucket) AS age_bucket,
+        any_value(antwerp_bags) AS antwerp_bags,
+        any_value(hamburg_bremen_bags) AS hamburg_bremen_bags,
+        any_value(houston_bags) AS houston_bags,
+        any_value(miami_bags) AS miami_bags,
+        any_value(new_orleans_bags) AS new_orleans_bags,
+        any_value(new_york_bags) AS new_york_bags,
+        any_value(total_bags) AS total_bags,
+        any_value(source_file) AS source_file,
+        hkey
+    FROM cast_and_clean
+    GROUP BY hkey
+)
+
+SELECT *
+FROM deduplicated
+WHERE report_date BETWEEN @start_ds AND @end_ds
diff --git a/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql
new file mode 100644
index 0000000..392070b
--- /dev/null
+++ b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql
@@ -0,0 +1,60 @@
+-- Foundation fact: ICE historical end-of-month Coffee C certified warehouse stocks by port.
+--
+-- Covers November 1996 to present (30-year history). Casts raw varchar columns
+-- to proper types and deduplicates via hash key.
+--
+-- Grain: one row per report_date (end-of-month).
+-- Port columns are in bags (60kg).
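+-- Every extraction re-lands the full history, so the same month appears in
+-- several landed files; hashing (report_date, total_bags) collapses those
+-- repeats while still letting a corrected month through as a new row.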
+
+MODEL (
+    name foundation.fct_ice_warehouse_stocks_by_port,
+    kind INCREMENTAL_BY_TIME_RANGE (
+        time_column report_date
+    ),
+    grain (report_date),
+    start '1996-11-01',
+    cron '@daily'
+);
+
+WITH cast_and_clean AS (
+    SELECT
+        TRY_CAST(report_date AS date) AS report_date,
+        TRY_CAST(new_york_bags AS bigint) AS new_york_bags,
+        TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags,
+        TRY_CAST(houston_bags AS bigint) AS houston_bags,
+        TRY_CAST(miami_bags AS bigint) AS miami_bags,
+        TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags,
+        TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags,
+        TRY_CAST(barcelona_bags AS bigint) AS barcelona_bags,
+        TRY_CAST(virginia_bags AS bigint) AS virginia_bags,
+        TRY_CAST(total_bags AS bigint) AS total_bags,
+
+        filename AS source_file,
+
+        hash(report_date, total_bags) AS hkey
+    FROM raw.ice_warehouse_stocks_by_port
+    WHERE TRY_CAST(report_date AS date) IS NOT NULL
+        AND TRY_CAST(total_bags AS bigint) IS NOT NULL
+),
+
+deduplicated AS (
+    SELECT
+        any_value(report_date) AS report_date,
+        any_value(new_york_bags) AS new_york_bags,
+        any_value(new_orleans_bags) AS new_orleans_bags,
+        any_value(houston_bags) AS houston_bags,
+        any_value(miami_bags) AS miami_bags,
+        any_value(antwerp_bags) AS antwerp_bags,
+        any_value(hamburg_bremen_bags) AS hamburg_bremen_bags,
+        any_value(barcelona_bags) AS barcelona_bags,
+        any_value(virginia_bags) AS virginia_bags,
+        any_value(total_bags) AS total_bags,
+        any_value(source_file) AS source_file,
+        hkey
+    FROM cast_and_clean
+    GROUP BY hkey
+)
+
+SELECT *
+FROM deduplicated
+WHERE report_date BETWEEN @start_ds AND @end_ds
diff --git a/transform/sqlmesh_materia/models/raw/ice_aging_stocks.sql b/transform/sqlmesh_materia/models/raw/ice_aging_stocks.sql
new file mode 100644
index 0000000..777deae
--- /dev/null
+++ b/transform/sqlmesh_materia/models/raw/ice_aging_stocks.sql
@@ -0,0 +1,49 @@
+-- Raw ICE certified stock aging report — technical ingestion layer.
+--
+-- Reads monthly aging report gzip CSVs from the landing directory.
+-- All values are varchar; casting happens in foundation.fct_ice_aging_stocks.
+--
+-- Source: ICE Report Center (Certified Stock Aging Report)
+-- Coverage: varies by download history
+-- Frequency: monthly (ICE updates after each delivery month)
+
+MODEL (
+    name raw.ice_aging_stocks,
+    kind FULL,
+    cron '@daily',
+    columns (
+        report_date varchar,
+        age_bucket varchar,
+        antwerp_bags varchar,
+        hamburg_bremen_bags varchar,
+        houston_bags varchar,
+        miami_bags varchar,
+        new_orleans_bags varchar,
+        new_york_bags varchar,
+        total_bags varchar,
+        filename varchar
+    )
+);
+
+SELECT
+    report_date,
+    age_bucket,
+    antwerp_bags,
+    hamburg_bremen_bags,
+    houston_bags,
+    miami_bags,
+    new_orleans_bags,
+    new_york_bags,
+    total_bags,
+    filename
+FROM read_csv(
+    @ice_aging_glob(),
+    delim = ',',
+    encoding = 'utf-8',
+    compression = 'gzip',
+    header = true,
+    union_by_name = true,
+    filename = true,
+    all_varchar = true,
+    ignore_errors = true
+)
diff --git a/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks_by_port.sql b/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks_by_port.sql
new file mode 100644
index 0000000..ce98184
--- /dev/null
+++ b/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks_by_port.sql
@@ -0,0 +1,51 @@
+-- Raw ICE historical end-of-month warehouse stocks by port — technical ingestion layer.
+--
+-- Reads historical by-port stock gzip CSVs from the landing directory.
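+-- Each landed file carries the full Nov 1996-to-present history, so months
+-- repeat across files; duplicates are collapsed downstream in the foundation
+-- model.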
+-- All values are varchar; casting happens in foundation.fct_ice_warehouse_stocks_by_port.
+--
+-- Source: ICE (EOM_KC_cert_stox_by_port_nov96-present.xls)
+-- Coverage: November 1996 to present
+-- Frequency: monthly (ICE updates the static file monthly)
+
+MODEL (
+    name raw.ice_warehouse_stocks_by_port,
+    kind FULL,
+    cron '@daily',
+    columns (
+        report_date varchar,
+        new_york_bags varchar,
+        new_orleans_bags varchar,
+        houston_bags varchar,
+        miami_bags varchar,
+        antwerp_bags varchar,
+        hamburg_bremen_bags varchar,
+        barcelona_bags varchar,
+        virginia_bags varchar,
+        total_bags varchar,
+        filename varchar
+    )
+);
+
+SELECT
+    report_date,
+    new_york_bags,
+    new_orleans_bags,
+    houston_bags,
+    miami_bags,
+    antwerp_bags,
+    hamburg_bremen_bags,
+    barcelona_bags,
+    virginia_bags,
+    total_bags,
+    filename
+FROM read_csv(
+    @ice_stocks_by_port_glob(),
+    delim = ',',
+    encoding = 'utf-8',
+    compression = 'gzip',
+    header = true,
+    union_by_name = true,
+    filename = true,
+    all_varchar = true,
+    ignore_errors = true
+)
diff --git a/uv.lock b/uv.lock
index 08c3e64..e3cc4a0 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1060,10 +1060,14 @@ version = "0.1.0"
 source = { editable = "extract/ice_stocks" }
 dependencies = [
     { name = "niquests" },
+    { name = "xlrd" },
 ]
 
 [package.metadata]
-requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }]
+requires-dist = [
+    { name = "niquests", specifier = ">=3.14.1" },
+    { name = "xlrd", specifier = ">=2.0.1" },
+]
 
 [[package]]
 name = "identify"
@@ -1558,6 +1562,7 @@ dev = [
     { name = "pytest-cov" },
     { name = "pyyaml" },
     { name = "ruff" },
+    { name = "xlwt" },
 ]
 exploration = [
     { name = "ipykernel" },
@@ -1584,6 +1589,7 @@ dev = [
     { name = "pytest-cov", specifier = ">=7.0.0" },
     { name = "pyyaml", specifier = ">=6.0.2" },
     { name = "ruff", specifier = ">=0.9.9" },
+    { name = "xlwt", specifier = ">=1.3.0" },
 ]
 exploration = [{ name = "ipykernel", specifier = ">=6.29.5" }]
 
@@ -3591,6 +3597,24 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/a4/f5/10b68b7b1544245097b2a1b8238f66f2fc6dcaeb24ba5d917f52bd2eed4f/wsproto-1.3.2-py3-none-any.whl", hash = "sha256:61eea322cdf56e8cc904bd3ad7573359a242ba65688716b0710a5eb12beab584", size = 24405, upload-time = "2025-11-20T18:18:00.454Z" },
 ]
 
+[[package]]
+name = "xlrd"
+version = "2.0.2"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/07/5a/377161c2d3538d1990d7af382c79f3b2372e880b65de21b01b1a2b78691e/xlrd-2.0.2.tar.gz", hash = "sha256:08b5e25de58f21ce71dc7db3b3b8106c1fa776f3024c54e45b45b374e89234c9", size = 100167, upload-time = "2025-06-14T08:46:39.039Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/1a/62/c8d562e7766786ba6587d09c5a8ba9f718ed3fa8af7f4553e8f91c36f302/xlrd-2.0.2-py2.py3-none-any.whl", hash = "sha256:ea762c3d29f4cca48d82df517b6d89fbce4db3107f9d78713e48cd321d5c9aa9", size = 96555, upload-time = "2025-06-14T08:46:37.766Z" },
+]
+
+[[package]]
+name = "xlwt"
+version = "1.3.0"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/06/97/56a6f56ce44578a69343449aa5a0d98eefe04085d69da539f3034e2cd5c1/xlwt-1.3.0.tar.gz", hash = "sha256:c59912717a9b28f1a3c2a98fd60741014b06b043936dcecbc113eaaada156c88", size = 153929, upload-time = "2017-08-22T06:47:16.498Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/44/48/def306413b25c3d01753603b1a222a011b8621aed27cd7f89cbc27e6b0f4/xlwt-1.3.0-py2.py3-none-any.whl", hash = "sha256:a082260524678ba48a297d922cc385f58278b8aa68741596a87de01a9c628b2e", size = 99981, upload-time = "2017-08-22T06:47:15.281Z" },
"sha256:a082260524678ba48a297d922cc385f58278b8aa68741596a87de01a9c628b2e", size = 99981, upload-time = "2017-08-22T06:47:15.281Z" }, +] + [[package]] name = "yfinance" version = "1.2.0"