ice_stocks: add backfill extractor for historical daily stocks

The ICE API at /marketdata/api/reports/293/results stores all historical
daily XLS reports date-descending. Previously the extractor only fetched
the latest. New extract_ice_backfill entry point pages through the API
and downloads all matching 'Daily Warehouse Stocks' reports.

- ice_api.py: add find_all_reports() alongside find_latest_report()
- execute.py: add extract_ice_stocks_backfill(max_pages=3) — default
  covers ~6 months; max_pages=20 fetches ~3 years of history
- pyproject.toml: register extract_ice_backfill entry point

Ran backfill: 131 files, 2025-08-15 → 2026-02-20

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-22 01:35:57 +01:00
parent 090fcb4fdb
commit c92e5a8e07
3 changed files with 80 additions and 1 deletions

View File

@@ -13,6 +13,7 @@ extract_ice = "ice_stocks.execute:extract_ice_stocks"
extract_ice_aging = "ice_stocks.execute:extract_ice_aging"
extract_ice_historical = "ice_stocks.execute:extract_ice_historical"
extract_ice_all = "ice_stocks.execute:extract_ice_all"
extract_ice_backfill = "ice_stocks.execute:extract_ice_stocks_backfill"
[build-system]
requires = ["hatchling"]

View File

@@ -31,7 +31,7 @@ from datetime import datetime
import niquests
import xlrd
from ice_stocks.ice_api import find_latest_report
from ice_stocks.ice_api import find_all_reports, find_latest_report
from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows
logging.basicConfig(
@@ -446,6 +446,64 @@ def extract_ice_historical() -> None:
_write_landing_file(canonical_csv, HISTORICAL_DEST_SUBDIR, today)
def extract_ice_stocks_backfill(max_pages: int = 3) -> None:
    """Download all available historical ICE daily stocks reports.

    Pages through the ICE API (date-descending) and downloads every
    'Daily Warehouse Stocks' XLS found. Each report is one trading day.
    Idempotent: already-downloaded files are skipped via content hash.

    max_pages=3 gives roughly 6 months of history (~120 trading days).
    Use max_pages=20 to fetch all available (~3 years).

    Args:
        max_pages: Number of API listing pages to walk (newest first).
            Must be positive.

    Raises:
        ValueError: If max_pages is not positive.
    """
    # Validate with an exception, not assert: assert is stripped under `python -O`.
    if max_pages <= 0:
        raise ValueError(f"max_pages must be positive, got {max_pages}")

    with niquests.Session() as session:
        logger.info(f"Fetching all available Daily Warehouse Stocks reports (max {max_pages} pages)...")
        reports = find_all_reports(session, ICE_STOCKS_LABEL, max_pages=max_pages)
        if not reports:
            logger.error("ICE API: no 'Daily Warehouse Stocks' reports found")
            return
        # Results are date-descending: [-1] is the oldest report, [0] the newest.
        logger.info(f"Found {len(reports)} reports: {reports[-1]['publish_date']} → {reports[0]['publish_date']}")

        downloaded = 0
        skipped = 0
        # Loop-invariant: the landing destination directory never changes.
        dest_dir = LANDING_DIR / DEST_SUBDIR
        for report in reports:
            publish_date = report["publish_date"]
            try:
                response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS)
            except Exception as e:
                # Best-effort backfill: one failed download must not abort the run.
                logger.warning(f"Failed to download {publish_date}: {e}")
                continue
            if response.status_code != 200:
                logger.warning(f"HTTP {response.status_code} for {publish_date} — skipping")
                continue
            fmt = detect_file_format(response.content)
            if fmt == "xls":
                canonical_csv = _build_canonical_csv_from_xls(response.content)
            else:
                canonical_csv = _build_canonical_csv_from_csv(response.content)
            if not canonical_csv:
                logger.warning(f"Parsed 0 rows for {publish_date} — skipping")
                continue
            # Use the report's publish date as the file date label.
            # _write_landing_file dedupes internally; detect whether it wrote a
            # new file by comparing the landing-dir file count around the call.
            file_count_before = sum(1 for _ in dest_dir.rglob("*.csv.gzip"))
            _write_landing_file(canonical_csv, DEST_SUBDIR, publish_date)
            file_count_after = sum(1 for _ in dest_dir.rglob("*.csv.gzip"))
            if file_count_after > file_count_before:
                downloaded += 1
            else:
                skipped += 1
        logger.info(f"Backfill complete: {downloaded} new files downloaded, {skipped} already existed")
def extract_ice_all() -> None:
"""Run all three ICE extractors: daily stocks, aging report, historical by port."""
extract_ice_stocks()

View File

@@ -56,6 +56,26 @@ def fetch_report_listings(session, product_id, max_results=50, page_number=1) ->
return result
def find_all_reports(session, label_substring, max_pages=MAX_API_PAGES, product_id=PRODUCT_ID_COFFEE) -> list[dict]:
    """Return all reports whose download_label contains label_substring.

    Paginates up to max_pages. Results are date-descending (the API lists
    newest reports first and the order is preserved here).

    Args:
        session: HTTP session used for the listing requests.
        label_substring: Case-insensitive substring matched against each
            report's ``download_label``. Must be non-empty.
        max_pages: Maximum number of listing pages to fetch. Must be positive.
        product_id: ICE product whose report listings are searched.

    Returns:
        Matching report rows, as returned by fetch_report_listings.

    Raises:
        ValueError: If label_substring is empty or max_pages is not positive.
    """
    # Validate with exceptions, not assert: assert is stripped under `python -O`.
    if not label_substring:
        raise ValueError("label_substring must not be empty")
    if max_pages <= 0:
        raise ValueError(f"max_pages must be positive, got {max_pages}")

    # Lowercase the needle once, outside the page loop.
    needle = label_substring.lower()
    found: list[dict] = []
    for page in range(1, max_pages + 1):
        rows = fetch_report_listings(session, product_id, page_number=page)
        if not rows:
            # An empty page means we have walked past the last report.
            break
        found.extend(row for row in rows if needle in row["download_label"].lower())
    return found
def find_latest_report(session, label_substring, product_id=PRODUCT_ID_COFFEE) -> dict | None:
"""Return first report whose download_label contains label_substring.