ICE extraction overhaul: API discovery + aging report + historical backfill

- Replace brittle ICE_STOCKS_URL env var with API-based URL discovery via
  the private ICE Report Center JSON API (no auth required)
- Add rolling CSV → XLS fallback in extract_ice_stocks() using
  find_latest_report() from ice_api.py
- Add ice_api.py: fetch_report_listings(), find_latest_report() with
  pagination up to MAX_API_PAGES
- Add xls_parse.py: detect_file_format() (magic bytes), xls_to_rows()
  using xlrd for OLE2/BIFF XLS files
- Add extract_ice_aging(): monthly certified stock aging report by
  age bucket × port → ice_aging/ landing dir
- Add extract_ice_historical(): 30-year EOM by-port stocks from static
  ICE URL → ice_stocks_by_port/ landing dir
- Add xlrd>=2.0.1 (parse XLS), xlwt>=1.3.0 (dev, test fixtures)
- Add SQLMesh raw + foundation models for both new datasets
- Add ice_aging_glob(), ice_stocks_by_port_glob() macros
- Add extract_ice_aging + extract_ice_historical pipeline entries
- Add 12 unit tests (format detection, XLS roundtrip, API mock, CSV output)

Seed files (data/landing/ice_aging/seed/ and ice_stocks_by_port/seed/)
must be created locally — data/ is gitignored.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-21 21:13:18 +01:00
parent ff39d65dc6
commit ff7301d6a8
13 changed files with 944 additions and 98 deletions

View File

@@ -5,10 +5,13 @@ description = "ICE certified warehouse stocks extractor"
requires-python = ">=3.13"
dependencies = [
"niquests>=3.14.1",
"xlrd>=2.0.1",
]
[project.scripts]
extract_ice = "ice_stocks.execute:extract_ice_stocks"
extract_ice_aging = "ice_stocks.execute:extract_ice_aging"
extract_ice_historical = "ice_stocks.execute:extract_ice_historical"
[build-system]
requires = ["hatchling"]

View File

@@ -4,16 +4,18 @@ Downloads daily certified stock reports from the ICE Report Center and stores
as gzip CSV in the landing directory. Uses SHA256 of content as the
idempotency key — skips if a file with the same hash already exists.
Landing path: LANDING_DIR/ice_stocks/{year}/{date}_{hash8}.csv.gzip
Landing paths:
LANDING_DIR/ice_stocks/{year}/{date}_{hash8}.csv.gzip (daily rolling stocks)
LANDING_DIR/ice_aging/{year}/{date}_{hash8}.csv.gzip (monthly aging report)
LANDING_DIR/ice_stocks_by_port/{year}/{date}_{hash8}.csv.gzip (historical by port)
CSV format produced (matching raw.ice_warehouse_stocks columns):
report_date,total_certified_bags,pending_grading_bags
ICE Report Center URL discovery:
Visit https://www.theice.com/report-center and locate the
"Coffee C Warehouse Stocks" report. The download URL has the pattern:
https://www.theice.com/report-center/commodities/COFFEE/reports/...
Set ICE_STOCKS_URL environment variable to the discovered URL.
CSV schemas:
ice_stocks: report_date,total_certified_bags,pending_grading_bags
ice_aging: report_date,age_bucket,antwerp_bags,hamburg_bremen_bags,
houston_bags,miami_bags,new_orleans_bags,new_york_bags,total_bags
ice_stocks_by_port: report_date,new_york_bags,new_orleans_bags,houston_bags,
miami_bags,antwerp_bags,hamburg_bremen_bags,barcelona_bags,
virginia_bags,total_bags
"""
import csv
@@ -27,6 +29,10 @@ import sys
from datetime import datetime
import niquests
import xlrd
from ice_stocks.ice_api import find_latest_report
from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows
logging.basicConfig(
level=logging.INFO,
@@ -37,23 +43,19 @@ logging.basicConfig(
logger = logging.getLogger("ICE Stocks Extractor")
LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing"))
# ── ice_stocks (daily rolling) ──────────────────────────────────────────────
DEST_SUBDIR = "ice_stocks"
# ICE Report Center URL for Coffee C certified warehouse stocks.
# Discover by visiting https://www.theice.com/report-center and locating
# the Coffee C warehouse stocks CSV export. Override via environment variable.
ICE_STOCKS_URL = os.getenv(
"ICE_STOCKS_URL",
"https://www.theice.com/publicdocs/futures_us/exchange_notices/coffee_certifiedstocks.csv",
# Static rolling CSV URL — try this first, fall back to API on 404.
ICE_ROLLING_CSV_URL = (
"https://www.theice.com/publicdocs/futures_us/exchange_notices/coffee_certifiedstocks.csv"
)
ICE_STOCKS_LABEL = "Daily Warehouse Stocks"
HTTP_TIMEOUT_SECONDS = 60
# Expected column names from ICE CSV (may vary — adapt to actual column names)
# The ICE report typically has: Date, Certified Stocks (bags), Pending Grading (bags)
# We normalize to our canonical names.
COLUMN_MAPPINGS = {
# Possible ICE column name → our canonical name
"date": "report_date",
"report date": "report_date",
"Date": "report_date",
@@ -66,94 +68,55 @@ COLUMN_MAPPINGS = {
"pending grading (bags)": "pending_grading_bags",
}
# ── ice_aging (monthly aging report) ────────────────────────────────────────
ICE_AGING_LABEL = "Certified Stock Aging Report"
AGING_DEST_SUBDIR = "ice_aging"
def _normalize_row(row: dict) -> dict | None:
"""Map raw ICE CSV columns to canonical schema. Returns None if date missing."""
normalized = {}
for raw_key, value in row.items():
canonical = COLUMN_MAPPINGS.get(raw_key.strip()) or COLUMN_MAPPINGS.get(raw_key.strip().lower())
if canonical:
# Strip commas from numeric strings (ICE uses "1,234,567" format)
normalized[canonical] = value.strip().replace(",", "") if value else ""
AGING_PORT_HEADERS = [
"antwerp_bags",
"hamburg_bremen_bags",
"houston_bags",
"miami_bags",
"new_orleans_bags",
"new_york_bags",
"total_bags",
]
if "report_date" not in normalized or not normalized["report_date"]:
return None
# ── ice_stocks_by_port (historical end-of-month) ─────────────────────────────
ICE_HISTORICAL_URL = (
"https://www.ice.com/publicdocs/futures_us_reports/coffee/"
"EOM_KC_cert_stox_by_port_nov96-present.xls"
)
HISTORICAL_DEST_SUBDIR = "ice_stocks_by_port"
HISTORICAL_HTTP_TIMEOUT_SECONDS = 120
# Fill missing optional columns with empty string
normalized.setdefault("total_certified_bags", "")
normalized.setdefault("pending_grading_bags", "")
return normalized
HISTORICAL_PORT_COLS = [
"new_york_bags",
"new_orleans_bags",
"houston_bags",
"miami_bags",
"antwerp_bags",
"hamburg_bremen_bags",
"barcelona_bags",
"virginia_bags",
"total_bags",
]
def _build_canonical_csv(raw_content: bytes) -> bytes:
"""Parse ICE CSV and emit canonical CSV with our column schema."""
text = raw_content.decode("utf-8", errors="replace")
reader = csv.DictReader(io.StringIO(text))
# ── shared helpers ───────────────────────────────────────────────────────────
rows = []
for row in reader:
normalized = _normalize_row(row)
if normalized:
rows.append(normalized)
def _write_landing_file(canonical_csv: bytes, dest_subdir: str, date_label: str) -> None:
"""SHA256-hash canonical_csv, skip if exists, else gzip and write."""
assert canonical_csv, "canonical_csv must not be empty"
assert dest_subdir, "dest_subdir must not be empty"
assert date_label, "date_label must not be empty"
if not rows:
return b""
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=["report_date", "total_certified_bags", "pending_grading_bags"])
writer.writeheader()
writer.writerows(rows)
return out.getvalue().encode("utf-8")
def extract_ice_stocks() -> None:
"""Download ICE certified Coffee C warehouse stocks and store as gzip CSV.
Idempotent: computes SHA256 of canonical CSV bytes, skips if already on disk.
The ICE report is a rolling file (same URL, updated daily) — we detect
changes via content hash.
"""
logger.info(f"Downloading ICE warehouse stocks from: {ICE_STOCKS_URL}")
with niquests.Session() as session:
try:
response = session.get(ICE_STOCKS_URL, timeout=HTTP_TIMEOUT_SECONDS)
except Exception as e:
logger.error(
f"Failed to connect to ICE Report Center: {e}\n"
"If the URL has changed, set ICE_STOCKS_URL environment variable.\n"
"Visit https://www.theice.com/report-center to find the current URL."
)
return
if response.status_code == 404:
logger.warning(
"ICE stocks URL returned 404. The report URL may have changed.\n"
"Visit https://www.theice.com/report-center to find the current URL,\n"
"then set ICE_STOCKS_URL environment variable."
)
return
assert response.status_code == 200, (
f"Unexpected status {response.status_code} from {ICE_STOCKS_URL}"
)
assert len(response.content) > 0, "Downloaded empty file from ICE"
canonical_csv = _build_canonical_csv(response.content)
if not canonical_csv:
logger.warning("ICE CSV parsed to 0 rows — column mapping may need updating")
return
# Hash-based idempotency
sha256 = hashlib.sha256(canonical_csv).hexdigest()
etag = sha256[:8]
year = date_label[:4]
today = datetime.now().strftime("%Y-%m-%d")
year = datetime.now().strftime("%Y")
dest_dir = LANDING_DIR / DEST_SUBDIR / year
local_file = dest_dir / f"{today}_{etag}.csv.gzip"
dest_dir = LANDING_DIR / dest_subdir / year
local_file = dest_dir / f"{date_label}_{etag}.csv.gzip"
if local_file.exists():
logger.info(f"File {local_file.name} already exists — content unchanged, skipping")
@@ -169,5 +132,302 @@ def extract_ice_stocks() -> None:
logger.info(f"Stored {local_file} ({local_file.stat().st_size:,} bytes)")
def _build_csv_bytes(fieldnames: list[str], rows: list[dict]) -> bytes:
"""Serialize list of dicts to CSV bytes."""
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
return out.getvalue().encode("utf-8")
# ── ice_stocks (daily rolling) ───────────────────────────────────────────────
def _normalize_row(row: dict) -> dict | None:
    """Translate one raw ICE CSV row to the canonical column schema.

    Returns None when the row carries no usable report_date. Numeric
    strings are de-comma'd ("1,234" -> "1234"); unmapped columns are dropped.
    """
    out: dict = {}
    for raw_key, raw_value in row.items():
        key = raw_key.strip()
        # Try the exact header first, then the lowercased variant.
        target = COLUMN_MAPPINGS.get(key) or COLUMN_MAPPINGS.get(key.lower())
        if not target:
            continue
        out[target] = raw_value.strip().replace(",", "") if raw_value else ""
    if not out.get("report_date"):
        return None
    # Optional measures default to "" so the output CSV stays rectangular.
    out.setdefault("total_certified_bags", "")
    out.setdefault("pending_grading_bags", "")
    return out
def _build_canonical_csv_from_csv(raw_content: bytes) -> bytes:
    """Parse raw ICE CSV bytes into the canonical 3-column CSV.

    Rows that fail normalization (no report_date) are dropped. An empty
    result (b"") signals the caller that the column mapping no longer
    matches the upstream file.
    """
    decoded = raw_content.decode("utf-8", errors="replace")
    parsed = csv.DictReader(io.StringIO(decoded))
    normalized_rows = [n for raw in parsed if (n := _normalize_row(raw))]
    if not normalized_rows:
        return b""
    return _build_csv_bytes(
        ["report_date", "total_certified_bags", "pending_grading_bags"],
        normalized_rows,
    )
def _build_canonical_csv_from_xls(xls_bytes: bytes) -> bytes:
    """Extract total certified bags from the ICE daily stocks XLS.

    Expected sheet structure:
        Row 2:   header with the report date in cell [0] (e.g. "As of: 1/30/2026")
        Row ~23: ['Total in Bags', ANT, BAR, HA/BR, HOU, MIAMI, NOLA, NY, VA, total]

    Returns the canonical 3-column CSV bytes, or b"" when the report date
    cannot be parsed (caller logs and aborts).
    """
    rows = xls_to_rows(xls_bytes)
    # Report date lives in row 2, cell 0; guard against short sheets.
    header_cell = str(rows[2][0]) if len(rows) > 2 else ""
    report_date = ""
    if "as of" in header_cell.lower():
        date_part = header_cell.lower().replace("as of:", "").replace("as of", "").strip()
        tokens = date_part.split()
        # Guard: a prefix-only header ("As of:") leaves no tokens — the old
        # code crashed here with IndexError on [0].
        if tokens:
            try:
                dt = datetime.strptime(tokens[0], "%m/%d/%Y")
                report_date = dt.strftime("%Y-%m-%d")
            except ValueError:
                pass
    if not report_date:
        logger.warning(f"Could not parse report date from XLS header: {header_cell!r}")
        return b""
    # Scan for the "Total in Bags" summary row; its last cell is the total.
    total_bags = ""
    for row in rows:
        if row and str(row[0]).strip().lower() == "total in bags":
            val = row[-1]
            if isinstance(val, float):
                # xlrd yields numeric cells as floats; bag counts are integral.
                total_bags = str(int(val))
            else:
                total_bags = str(val).replace(",", "").strip()
            break
    canonical_row = {
        "report_date": report_date,
        "total_certified_bags": total_bags,
        "pending_grading_bags": "",
    }
    return _build_csv_bytes(
        ["report_date", "total_certified_bags", "pending_grading_bags"],
        [canonical_row],
    )
def extract_ice_stocks() -> None:
    """Download ICE certified Coffee C warehouse stocks and store as gzip CSV.

    Tries the static rolling CSV URL first. On connection failure or 404,
    falls back to API discovery to find the latest 'Daily Warehouse Stocks'
    report. The payload format is detected from magic bytes: CSV and XLS
    are parsed; HTML (error/redirect page) and XLSX (unreadable by xlrd 2.x)
    now abort with an explicit error instead of silently parsing to 0 rows.
    Idempotent: _write_landing_file skips if the content hash is on disk.
    """
    with niquests.Session() as session:
        logger.info(f"Trying ICE rolling CSV: {ICE_ROLLING_CSV_URL}")
        try:
            response = session.get(ICE_ROLLING_CSV_URL, timeout=HTTP_TIMEOUT_SECONDS)
        except Exception as e:
            logger.warning(f"Rolling CSV fetch failed: {e} — trying API discovery")
            response = None
        use_api = response is None or response.status_code == 404
        if use_api:
            logger.info("Falling back to ICE API discovery for Daily Warehouse Stocks")
            report = find_latest_report(session, ICE_STOCKS_LABEL)
            if not report:
                logger.error("ICE API: no 'Daily Warehouse Stocks' report found")
                return
            logger.info(f"Found report via API: {report['download_label']} ({report['publish_date']})")
            try:
                response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS)
            except Exception as e:
                logger.error(f"Failed to download report from API URL: {e}")
                return
        if response.status_code != 200:
            logger.error(f"Unexpected status {response.status_code}")
            return
        assert len(response.content) > 0, "Downloaded empty file from ICE"
        fmt = detect_file_format(response.content)
        if fmt == "xls":
            canonical_csv = _build_canonical_csv_from_xls(response.content)
        elif fmt == "csv":
            canonical_csv = _build_canonical_csv_from_csv(response.content)
        else:
            # HTML usually means an error/redirect page; XLSX means ICE
            # changed formats (xlrd 2.x cannot read it). Bail out loudly.
            logger.error(f"Unsupported ICE payload format {fmt!r} — expected CSV or XLS")
            return
        if not canonical_csv:
            logger.warning("ICE stocks parsed to 0 rows — check column mapping or XLS structure")
            return
        today = datetime.now().strftime("%Y-%m-%d")
        _write_landing_file(canonical_csv, DEST_SUBDIR, today)
# ── ice_aging (monthly aging report) ────────────────────────────────────────
def _parse_aging_date(cell_value: str) -> str:
"""Parse 'As of Delivery 3/2/2026' or 'As of: 1/30/2026''2026-03-02'."""
text = str(cell_value).strip()
for prefix in ("as of delivery ", "as of:"):
if text.lower().startswith(prefix):
text = text[len(prefix):].strip()
break
date_part = text.split()[0]
try:
dt = datetime.strptime(date_part, "%m/%d/%Y")
return dt.strftime("%Y-%m-%d")
except ValueError:
return ""
def extract_ice_aging() -> None:
    """Download the ICE Certified Stock Aging Report and store it as gzip CSV.

    Monthly report: certified stock quantities by age bucket × port.
    Flow: discover the latest report via the ICE API, download the XLS,
    parse the age-bucket rows, then land the canonical CSV under
    LANDING_DIR/ice_aging/{year}/. Idempotent: _write_landing_file skips
    when the content hash already exists on disk.
    """
    with niquests.Session() as session:
        logger.info("Fetching latest ICE Aging Report via API")
        report = find_latest_report(session, ICE_AGING_LABEL)
        if not report:
            logger.error(f"ICE API: no report matching {ICE_AGING_LABEL!r}")
            return
        logger.info(f"Downloading: {report['download_label']} ({report['publish_date']})")
        try:
            response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS)
        except Exception as e:
            logger.error(f"Failed to download aging report: {e}")
            return
        # Module convention: asserts abort the run on unexpected payloads.
        assert response.status_code == 200, f"HTTP {response.status_code}"
        assert response.content[:4] == OLE2_MAGIC, "Aging report is not an XLS file"
        rows = xls_to_rows(response.content)
        # The report date sits in the top-left cell of the sheet.
        report_date = _parse_aging_date(str(rows[0][0]) if rows else "")
        if not report_date:
            logger.error(f"Could not parse aging report date from row 0: {rows[0] if rows else '(empty)'!r}")
            return
        # Rows 3+ are age-bucket data rows; the sheet ends with a "Total"
        # row which is excluded (derivable from the buckets).
        fieldnames = ["report_date", "age_bucket"] + AGING_PORT_HEADERS
        data_rows = []
        for row in rows[3:]:
            if not row or not str(row[0]).strip():
                continue
            label = str(row[0]).strip()
            if label.lower() == "total":
                break
            port_values = []
            for cell in row[1:]:
                if isinstance(cell, float):
                    # xlrd returns numeric cells as floats; counts are integral.
                    port_values.append(str(int(cell)))
                elif str(cell).strip() in ("-", ""):
                    # ICE uses "-" (or blank) for zero/absent quantities.
                    port_values.append("0")
                else:
                    port_values.append(str(cell).replace(",", "").strip())
            # Pad/trim so each record has exactly one value per port column.
            while len(port_values) < len(AGING_PORT_HEADERS):
                port_values.append("0")
            port_values = port_values[:len(AGING_PORT_HEADERS)]
            record = {"report_date": report_date, "age_bucket": label}
            for col, val in zip(AGING_PORT_HEADERS, port_values):
                record[col] = val
            data_rows.append(record)
        if not data_rows:
            logger.warning("Aging report parsed to 0 data rows")
            return
        canonical_csv = _build_csv_bytes(fieldnames, data_rows)
        _write_landing_file(canonical_csv, AGING_DEST_SUBDIR, report_date)
# ── ice_stocks_by_port (historical) ─────────────────────────────────────────
def _excel_serial_to_date(serial: float, datemode: int) -> str:
    """Convert an Excel date serial to an ISO date string ('' on failure).

    datemode comes from the source workbook (0 = 1900-based, 1 = 1904-based).
    """
    try:
        converted = xlrd.xldate_as_datetime(serial, datemode)
    except Exception:
        # Out-of-range serials or garbage cells are signalled as "".
        return ""
    return converted.strftime("%Y-%m-%d")
def extract_ice_historical() -> None:
    """Download ICE historical end-of-month warehouse stocks by port.

    Static URL, updated monthly; covers Nov 1996 to present. Lands the
    canonical CSV under LANDING_DIR/ice_stocks_by_port/{year}/.
    Idempotent: _write_landing_file skips when the content hash exists.
    """
    logger.info(f"Downloading ICE historical by-port XLS: {ICE_HISTORICAL_URL}")
    with niquests.Session() as session:
        try:
            response = session.get(ICE_HISTORICAL_URL, timeout=HISTORICAL_HTTP_TIMEOUT_SECONDS)
        except Exception as e:
            logger.error(f"Failed to download historical XLS: {e}")
            return
        # Module convention: asserts abort the run on unexpected payloads.
        assert response.status_code == 200, f"HTTP {response.status_code}"
        assert response.content[:4] == OLE2_MAGIC, "Historical file is not an XLS"
        # Open the workbook once for its datemode (1900- vs 1904-based
        # serials), then reuse the raw bytes for row extraction.
        book = xlrd.open_workbook(file_contents=response.content)
        datemode = book.datemode
        rows = xls_to_rows(response.content)
        # Data starts at row 8 (0-indexed); rows 0-7 are headers.
        fieldnames = ["report_date"] + HISTORICAL_PORT_COLS
        data_rows = []
        for row in rows[8:]:
            if not row or len(row) < 2:
                continue
            # Column 1 holds the end-of-month date as an Excel float serial;
            # non-float or non-positive values mark header/footer rows.
            serial_cell = row[1]
            if not isinstance(serial_cell, float) or serial_cell <= 0:
                continue
            report_date = _excel_serial_to_date(serial_cell, datemode)
            if not report_date:
                continue
            # Columns 2.. are per-port bag counts in HISTORICAL_PORT_COLS order.
            port_cells = row[2:2 + len(HISTORICAL_PORT_COLS)]
            port_values = []
            for cell in port_cells:
                if cell == "" or str(cell).strip() in ("-", ""):
                    # ICE uses "-" (or blank) for zero/absent quantities.
                    port_values.append("0")
                elif isinstance(cell, float):
                    # xlrd returns numeric cells as floats; counts are integral.
                    port_values.append(str(int(cell)))
                else:
                    port_values.append(str(cell).replace(",", "").strip())
            # Pad short rows so every record has one value per port column.
            while len(port_values) < len(HISTORICAL_PORT_COLS):
                port_values.append("0")
            record = {"report_date": report_date}
            for col, val in zip(HISTORICAL_PORT_COLS, port_values):
                record[col] = val
            data_rows.append(record)
        if not data_rows:
            logger.warning("Historical XLS parsed to 0 data rows")
            return
        canonical_csv = _build_csv_bytes(fieldnames, data_rows)
        # Dated by extraction day — the file itself is a rolling full history.
        today = datetime.now().strftime("%Y-%m-%d")
        _write_landing_file(canonical_csv, HISTORICAL_DEST_SUBDIR, today)
# Manual entry point: running the module directly performs only the daily
# rolling-stocks extraction (aging/historical have their own script entries).
if __name__ == "__main__":
    extract_ice_stocks()

View File

@@ -0,0 +1,75 @@
"""ICE Report Center API client.
Discovers report download URLs via the private JSON API at
https://www.ice.com/marketdata/api/reports/293/results
No authentication required. Results are date-descending.
"""
ICE_API_URL = "https://www.ice.com/marketdata/api/reports/293/results"
ICE_BASE_URL = "https://www.ice.com"
PRODUCT_ID_COFFEE = 2
API_TIMEOUT_SECONDS = 30
MAX_API_PAGES = 10
def fetch_report_listings(session, product_id, max_results=50, page_number=1) -> list[dict]:
"""POST to ICE API and return normalized report rows.
Each row: {publish_date, product_name, download_url, download_label}
"""
assert product_id > 0, f"product_id must be positive, got {product_id}"
assert max_results > 0, f"max_results must be positive, got {max_results}"
assert page_number > 0, f"page_number must be positive, got {page_number}"
payload = {
"offset": (page_number - 1) * max_results,
"pageNumber": page_number,
"productId": product_id,
"max": max_results,
}
response = session.post(
ICE_API_URL,
data=payload,
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=API_TIMEOUT_SECONDS,
)
assert response.status_code == 200, (
f"ICE API returned {response.status_code}"
)
data = response.json()
rows = data["datasets"]["results"]["rows"]
result = []
for row in rows:
download = row.get("download", {}) or {}
url = download.get("url", "") or ""
if url and not url.startswith("http"):
url = ICE_BASE_URL + url
result.append({
"publish_date": row.get("publishDate", ""),
"product_name": row.get("productName", ""),
"download_url": url,
"download_label": download.get("label", "") or "",
})
return result
def find_latest_report(session, label_substring, product_id=PRODUCT_ID_COFFEE) -> dict | None:
    """Return the newest report whose download_label contains label_substring.

    Scans up to MAX_API_PAGES pages. The API orders results date-descending,
    so the first match encountered is the most recent. Returns None when no
    listing matches.
    """
    assert label_substring, "label_substring must not be empty"
    needle = label_substring.lower()
    for page_number in range(1, MAX_API_PAGES + 1):
        listings = fetch_report_listings(session, product_id, page_number=page_number)
        if not listings:
            # Ran past the last page of results.
            break
        match = next(
            (item for item in listings if needle in item["download_label"].lower()),
            None,
        )
        if match is not None:
            return match
    return None

View File

@@ -0,0 +1,59 @@
"""XLS file format detection and row extraction.
Handles OLE2/BIFF .xls files (the format ICE uses for all reports).
Format detection via magic bytes — no extension sniffing.
"""
import xlrd
OLE2_MAGIC = b"\xd0\xcf\x11\xe0"
XLSX_MAGIC = b"PK\x03\x04"


def detect_file_format(content_bytes: bytes) -> str:
    """Classify raw bytes as 'xls', 'xlsx', 'csv', or 'html'.

    OLE2/BIFF and ZIP containers are identified by their magic bytes.
    Anything else is sniffed as text: a leading '<' means markup (e.g. an
    HTML error page from ICE); otherwise the payload is assumed to be CSV.
    """
    assert content_bytes, "content_bytes must not be empty"
    magic = content_bytes[:4]
    if magic == OLE2_MAGIC:
        return "xls"
    if magic == XLSX_MAGIC:
        return "xlsx"
    text_sample = content_bytes[:512].decode("utf-8", errors="replace").lstrip()
    return "html" if text_sample.startswith("<") else "csv"
def xls_to_rows(content_bytes: bytes, sheet_index: int = 0) -> list[list]:
    """Parse XLS bytes and return one sheet's cells as a list of row lists.

    Values keep xlrd's native Python types (str, float, bool, ...). Date
    cells stay as raw float serials — callers convert them with
    xlrd.xldate_as_datetime. Empty cells normalize to "".
    """
    assert content_bytes, "content_bytes must not be empty"
    assert content_bytes[:4] == OLE2_MAGIC, (
        f"Not an OLE2/BIFF XLS file (magic: {content_bytes[:4].hex()})"
    )
    book = xlrd.open_workbook(file_contents=content_bytes)
    assert sheet_index < book.nsheets, (
        f"sheet_index {sheet_index} out of range (nsheets={book.nsheets})"
    )
    sheet = book.sheets()[sheet_index]
    extracted = []
    for r in range(sheet.nrows):
        current = []
        for c in range(sheet.ncols):
            cell = sheet.cell(r, c)
            # Empty cells become ""; every other ctype (including
            # XL_CELL_DATE, kept as its float serial) passes through as-is.
            current.append("" if cell.ctype == xlrd.XL_CELL_EMPTY else cell.value)
        extracted.append(current)
    return extracted