From c92e5a8e070e1f218c9e9564a8b55c58fb971a1c Mon Sep 17 00:00:00 2001
From: Deeman
Date: Sun, 22 Feb 2026 01:35:57 +0100
Subject: [PATCH] ice_stocks: add backfill extractor for historical daily stocks
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The ICE API at /marketdata/api/reports/293/results lists all historical
daily XLS reports in date-descending order, but the extractor previously
fetched only the latest one. The new extract_ice_backfill entry point
pages through the API and downloads every matching 'Daily Warehouse
Stocks' report.

- ice_api.py: add find_all_reports() alongside find_latest_report()
- execute.py: add extract_ice_stocks_backfill(max_pages=3); the default
  covers ~6 months, and max_pages=20 fetches ~3 years of history
- pyproject.toml: register the extract_ice_backfill entry point

Ran backfill: 131 files, 2025-08-15 → 2026-02-20

Co-Authored-By: Claude Sonnet 4.6
---
 extract/ice_stocks/pyproject.toml            |  1 +
 extract/ice_stocks/src/ice_stocks/execute.py | 60 +++++++++++++++++++-
 extract/ice_stocks/src/ice_stocks/ice_api.py | 20 +++++++
 3 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/extract/ice_stocks/pyproject.toml b/extract/ice_stocks/pyproject.toml
index c894cee..b31be93 100644
--- a/extract/ice_stocks/pyproject.toml
+++ b/extract/ice_stocks/pyproject.toml
@@ -13,6 +13,7 @@ extract_ice = "ice_stocks.execute:extract_ice_stocks"
 extract_ice_aging = "ice_stocks.execute:extract_ice_aging"
 extract_ice_historical = "ice_stocks.execute:extract_ice_historical"
 extract_ice_all = "ice_stocks.execute:extract_ice_all"
+extract_ice_backfill = "ice_stocks.execute:extract_ice_stocks_backfill"
 
 [build-system]
 requires = ["hatchling"]
diff --git a/extract/ice_stocks/src/ice_stocks/execute.py b/extract/ice_stocks/src/ice_stocks/execute.py
index 3eddc97..6645a6b 100644
--- a/extract/ice_stocks/src/ice_stocks/execute.py
+++ b/extract/ice_stocks/src/ice_stocks/execute.py
@@ -31,7 +31,7 @@ from datetime import datetime
 
 import niquests
 import xlrd
-from ice_stocks.ice_api import find_latest_report
+from ice_stocks.ice_api import find_all_reports, find_latest_report
 from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows
 
 logging.basicConfig(
@@ -446,6 +446,64 @@ def extract_ice_historical() -> None:
     _write_landing_file(canonical_csv, HISTORICAL_DEST_SUBDIR, today)
 
 
+def extract_ice_stocks_backfill(max_pages: int = 3) -> None:
+    """Download all available historical ICE daily stocks reports.
+
+    Pages through the ICE API (date-descending) and downloads every
+    'Daily Warehouse Stocks' XLS found. Each report is one trading day.
+    Idempotent: already-downloaded files are skipped via content hash.
+
+    max_pages=3 gives roughly 6 months of history (~130 trading days).
+    Use max_pages=20 to fetch all available history (~3 years).
+ """ + assert max_pages > 0, f"max_pages must be positive, got {max_pages}" + + with niquests.Session() as session: + logger.info(f"Fetching all available Daily Warehouse Stocks reports (max {max_pages} pages)...") + reports = find_all_reports(session, ICE_STOCKS_LABEL, max_pages=max_pages) + + if not reports: + logger.error("ICE API: no 'Daily Warehouse Stocks' reports found") + return + + logger.info(f"Found {len(reports)} reports: {reports[-1]['publish_date']} → {reports[0]['publish_date']}") + downloaded = 0 + skipped = 0 + + for report in reports: + publish_date = report["publish_date"] + try: + response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.warning(f"Failed to download {publish_date}: {e}") + continue + + if response.status_code != 200: + logger.warning(f"HTTP {response.status_code} for {publish_date} — skipping") + continue + + fmt = detect_file_format(response.content) + if fmt == "xls": + canonical_csv = _build_canonical_csv_from_xls(response.content) + else: + canonical_csv = _build_canonical_csv_from_csv(response.content) + + if not canonical_csv: + logger.warning(f"Parsed 0 rows for {publish_date} — skipping") + continue + + # Use the report's publish date as the file date label + file_count_before = sum(1 for _ in (LANDING_DIR / DEST_SUBDIR).rglob("*.csv.gzip")) + _write_landing_file(canonical_csv, DEST_SUBDIR, publish_date) + file_count_after = sum(1 for _ in (LANDING_DIR / DEST_SUBDIR).rglob("*.csv.gzip")) + if file_count_after > file_count_before: + downloaded += 1 + else: + skipped += 1 + + logger.info(f"Backfill complete: {downloaded} new files downloaded, {skipped} already existed") + + def extract_ice_all() -> None: """Run all three ICE extractors: daily stocks, aging report, historical by port.""" extract_ice_stocks() diff --git a/extract/ice_stocks/src/ice_stocks/ice_api.py b/extract/ice_stocks/src/ice_stocks/ice_api.py index 9df44d7..6000642 100644 --- a/extract/ice_stocks/src/ice_stocks/ice_api.py +++ b/extract/ice_stocks/src/ice_stocks/ice_api.py @@ -56,6 +56,26 @@ def fetch_report_listings(session, product_id, max_results=50, page_number=1) -> return result +def find_all_reports(session, label_substring, max_pages=MAX_API_PAGES, product_id=PRODUCT_ID_COFFEE) -> list[dict]: + """Return all reports whose download_label contains label_substring. + + Paginates up to max_pages. Results are date-descending. + """ + assert label_substring, "label_substring must not be empty" + assert max_pages > 0, f"max_pages must be positive, got {max_pages}" + + found = [] + for page in range(1, max_pages + 1): + rows = fetch_report_listings(session, product_id, page_number=page) + if not rows: + break + for row in rows: + if label_substring.lower() in row["download_label"].lower(): + found.append(row) + + return found + + def find_latest_report(session, label_substring, product_id=PRODUCT_ID_COFFEE) -> dict | None: """Return first report whose download_label contains label_substring.