diff --git a/extract/ice_stocks/pyproject.toml b/extract/ice_stocks/pyproject.toml index c894cee..b31be93 100644 --- a/extract/ice_stocks/pyproject.toml +++ b/extract/ice_stocks/pyproject.toml @@ -13,6 +13,7 @@ extract_ice = "ice_stocks.execute:extract_ice_stocks" extract_ice_aging = "ice_stocks.execute:extract_ice_aging" extract_ice_historical = "ice_stocks.execute:extract_ice_historical" extract_ice_all = "ice_stocks.execute:extract_ice_all" +extract_ice_backfill = "ice_stocks.execute:extract_ice_stocks_backfill" [build-system] requires = ["hatchling"] diff --git a/extract/ice_stocks/src/ice_stocks/execute.py b/extract/ice_stocks/src/ice_stocks/execute.py index 3eddc97..6645a6b 100644 --- a/extract/ice_stocks/src/ice_stocks/execute.py +++ b/extract/ice_stocks/src/ice_stocks/execute.py @@ -31,7 +31,7 @@ from datetime import datetime import niquests import xlrd -from ice_stocks.ice_api import find_latest_report +from ice_stocks.ice_api import find_all_reports, find_latest_report from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows logging.basicConfig( @@ -446,6 +446,64 @@ def extract_ice_historical() -> None: _write_landing_file(canonical_csv, HISTORICAL_DEST_SUBDIR, today) +def extract_ice_stocks_backfill(max_pages: int = 3) -> None: + """Download all available historical ICE daily stocks reports. + + Pages through the ICE API (date-descending) and downloads every + 'Daily Warehouse Stocks' XLS found. Each report is one trading day. + Idempotent: already-downloaded files are skipped via content hash. + + max_pages=3 gives roughly 4 months of history (~120 trading days). + Use max_pages=20 to fetch all available (~3 years). + """ + assert max_pages > 0, f"max_pages must be positive, got {max_pages}" + + with niquests.Session() as session: + logger.info(f"Fetching all available Daily Warehouse Stocks reports (max {max_pages} pages)...") + reports = find_all_reports(session, ICE_STOCKS_LABEL, max_pages=max_pages) + + if not reports: + logger.error("ICE API: no 'Daily Warehouse Stocks' reports found") + return + + logger.info(f"Found {len(reports)} reports: {reports[-1]['publish_date']} → {reports[0]['publish_date']}") + downloaded = 0 + skipped = 0 + + for report in reports: + publish_date = report["publish_date"] + try: + response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.warning(f"Failed to download {publish_date}: {e}") + continue + + if response.status_code != 200: + logger.warning(f"HTTP {response.status_code} for {publish_date} — skipping") + continue + + fmt = detect_file_format(response.content) + if fmt == "xls": + canonical_csv = _build_canonical_csv_from_xls(response.content) + else: + canonical_csv = _build_canonical_csv_from_csv(response.content) + + if not canonical_csv: + logger.warning(f"Parsed 0 rows for {publish_date} — skipping") + continue + + # Use the report's publish date as the file date label + file_count_before = sum(1 for _ in (LANDING_DIR / DEST_SUBDIR).rglob("*.csv.gzip")) + _write_landing_file(canonical_csv, DEST_SUBDIR, publish_date) + file_count_after = sum(1 for _ in (LANDING_DIR / DEST_SUBDIR).rglob("*.csv.gzip")) + if file_count_after > file_count_before: + downloaded += 1 + else: + skipped += 1 + + logger.info(f"Backfill complete: {downloaded} new files downloaded, {skipped} already existed") + + def extract_ice_all() -> None: """Run all three ICE extractors: daily stocks, aging report, historical by port.""" extract_ice_stocks() diff --git a/extract/ice_stocks/src/ice_stocks/ice_api.py b/extract/ice_stocks/src/ice_stocks/ice_api.py index 9df44d7..6000642 100644 --- a/extract/ice_stocks/src/ice_stocks/ice_api.py +++ b/extract/ice_stocks/src/ice_stocks/ice_api.py @@ -56,6 +56,26 @@ def fetch_report_listings(session, product_id, max_results=50, page_number=1) -> return result +def find_all_reports(session, label_substring, max_pages=MAX_API_PAGES, product_id=PRODUCT_ID_COFFEE) -> list[dict]: + """Return all reports whose download_label contains label_substring. + + Paginates up to max_pages. Results are date-descending. + """ + assert label_substring, "label_substring must not be empty" + assert max_pages > 0, f"max_pages must be positive, got {max_pages}" + + found = [] + for page in range(1, max_pages + 1): + rows = fetch_report_listings(session, product_id, page_number=page) + if not rows: + break + for row in rows: + if label_substring.lower() in row["download_label"].lower(): + found.append(row) + + return found + + def find_latest_report(session, label_substring, product_id=PRODUCT_ID_COFFEE) -> dict | None: """Return first report whose download_label contains label_substring.