Merge worktree-ice-extraction-overhaul: ICE extraction overhaul
API discovery + aging report + historical by-port backfill. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -5,10 +5,13 @@ description = "ICE certified warehouse stocks extractor"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
"niquests>=3.14.1",
|
||||
"xlrd>=2.0.1",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
extract_ice = "ice_stocks.execute:extract_ice_stocks"
|
||||
extract_ice_aging = "ice_stocks.execute:extract_ice_aging"
|
||||
extract_ice_historical = "ice_stocks.execute:extract_ice_historical"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
|
||||
@@ -4,16 +4,18 @@ Downloads daily certified stock reports from the ICE Report Center and stores
|
||||
as gzip CSV in the landing directory. Uses SHA256 of content as the
|
||||
idempotency key — skips if a file with the same hash already exists.
|
||||
|
||||
Landing path: LANDING_DIR/ice_stocks/{year}/{date}_{hash8}.csv.gzip
|
||||
Landing paths:
|
||||
LANDING_DIR/ice_stocks/{year}/{date}_{hash8}.csv.gzip (daily rolling stocks)
|
||||
LANDING_DIR/ice_aging/{year}/{date}_{hash8}.csv.gzip (monthly aging report)
|
||||
LANDING_DIR/ice_stocks_by_port/{year}/{date}_{hash8}.csv.gzip (historical by port)
|
||||
|
||||
CSV format produced (matching raw.ice_warehouse_stocks columns):
|
||||
report_date,total_certified_bags,pending_grading_bags
|
||||
|
||||
ICE Report Center URL discovery:
|
||||
Visit https://www.theice.com/report-center and locate the
|
||||
"Coffee C Warehouse Stocks" report. The download URL has the pattern:
|
||||
https://www.theice.com/report-center/commodities/COFFEE/reports/...
|
||||
Set ICE_STOCKS_URL environment variable to the discovered URL.
|
||||
CSV schemas:
|
||||
ice_stocks: report_date,total_certified_bags,pending_grading_bags
|
||||
ice_aging: report_date,age_bucket,antwerp_bags,hamburg_bremen_bags,
|
||||
houston_bags,miami_bags,new_orleans_bags,new_york_bags,total_bags
|
||||
ice_stocks_by_port: report_date,new_york_bags,new_orleans_bags,houston_bags,
|
||||
miami_bags,antwerp_bags,hamburg_bremen_bags,barcelona_bags,
|
||||
virginia_bags,total_bags
|
||||
"""
|
||||
|
||||
import csv
|
||||
@@ -27,6 +29,10 @@ import sys
|
||||
from datetime import datetime
|
||||
|
||||
import niquests
|
||||
import xlrd
|
||||
|
||||
from ice_stocks.ice_api import find_latest_report
|
||||
from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@@ -37,23 +43,19 @@ logging.basicConfig(
|
||||
logger = logging.getLogger("ICE Stocks Extractor")
|
||||
|
||||
LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing"))
|
||||
|
||||
# ── ice_stocks (daily rolling) ──────────────────────────────────────────────
|
||||
DEST_SUBDIR = "ice_stocks"
|
||||
|
||||
# ICE Report Center URL for Coffee C certified warehouse stocks.
|
||||
# Discover by visiting https://www.theice.com/report-center and locating
|
||||
# the Coffee C warehouse stocks CSV export. Override via environment variable.
|
||||
ICE_STOCKS_URL = os.getenv(
|
||||
"ICE_STOCKS_URL",
|
||||
"https://www.theice.com/publicdocs/futures_us/exchange_notices/coffee_certifiedstocks.csv",
|
||||
# Static rolling CSV URL — try this first, fall back to API on 404.
|
||||
ICE_ROLLING_CSV_URL = (
|
||||
"https://www.theice.com/publicdocs/futures_us/exchange_notices/coffee_certifiedstocks.csv"
|
||||
)
|
||||
ICE_STOCKS_LABEL = "Daily Warehouse Stocks"
|
||||
|
||||
HTTP_TIMEOUT_SECONDS = 60
|
||||
|
||||
# Expected column names from ICE CSV (may vary — adapt to actual column names)
|
||||
# The ICE report typically has: Date, Certified Stocks (bags), Pending Grading (bags)
|
||||
# We normalize to our canonical names.
|
||||
COLUMN_MAPPINGS = {
|
||||
# Possible ICE column name → our canonical name
|
||||
"date": "report_date",
|
||||
"report date": "report_date",
|
||||
"Date": "report_date",
|
||||
@@ -66,94 +68,55 @@ COLUMN_MAPPINGS = {
|
||||
"pending grading (bags)": "pending_grading_bags",
|
||||
}
|
||||
|
||||
# ── ice_aging (monthly aging report) ────────────────────────────────────────
|
||||
ICE_AGING_LABEL = "Certified Stock Aging Report"
|
||||
AGING_DEST_SUBDIR = "ice_aging"
|
||||
|
||||
def _normalize_row(row: dict) -> dict | None:
|
||||
"""Map raw ICE CSV columns to canonical schema. Returns None if date missing."""
|
||||
normalized = {}
|
||||
for raw_key, value in row.items():
|
||||
canonical = COLUMN_MAPPINGS.get(raw_key.strip()) or COLUMN_MAPPINGS.get(raw_key.strip().lower())
|
||||
if canonical:
|
||||
# Strip commas from numeric strings (ICE uses "1,234,567" format)
|
||||
normalized[canonical] = value.strip().replace(",", "") if value else ""
|
||||
AGING_PORT_HEADERS = [
|
||||
"antwerp_bags",
|
||||
"hamburg_bremen_bags",
|
||||
"houston_bags",
|
||||
"miami_bags",
|
||||
"new_orleans_bags",
|
||||
"new_york_bags",
|
||||
"total_bags",
|
||||
]
|
||||
|
||||
if "report_date" not in normalized or not normalized["report_date"]:
|
||||
return None
|
||||
# ── ice_stocks_by_port (historical end-of-month) ─────────────────────────────
|
||||
ICE_HISTORICAL_URL = (
|
||||
"https://www.ice.com/publicdocs/futures_us_reports/coffee/"
|
||||
"EOM_KC_cert_stox_by_port_nov96-present.xls"
|
||||
)
|
||||
HISTORICAL_DEST_SUBDIR = "ice_stocks_by_port"
|
||||
HISTORICAL_HTTP_TIMEOUT_SECONDS = 120
|
||||
|
||||
# Fill missing optional columns with empty string
|
||||
normalized.setdefault("total_certified_bags", "")
|
||||
normalized.setdefault("pending_grading_bags", "")
|
||||
|
||||
return normalized
|
||||
HISTORICAL_PORT_COLS = [
|
||||
"new_york_bags",
|
||||
"new_orleans_bags",
|
||||
"houston_bags",
|
||||
"miami_bags",
|
||||
"antwerp_bags",
|
||||
"hamburg_bremen_bags",
|
||||
"barcelona_bags",
|
||||
"virginia_bags",
|
||||
"total_bags",
|
||||
]
|
||||
|
||||
|
||||
def _build_canonical_csv(raw_content: bytes) -> bytes:
|
||||
"""Parse ICE CSV and emit canonical CSV with our column schema."""
|
||||
text = raw_content.decode("utf-8", errors="replace")
|
||||
reader = csv.DictReader(io.StringIO(text))
|
||||
# ── shared helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
rows = []
|
||||
for row in reader:
|
||||
normalized = _normalize_row(row)
|
||||
if normalized:
|
||||
rows.append(normalized)
|
||||
def _write_landing_file(canonical_csv: bytes, dest_subdir: str, date_label: str) -> None:
|
||||
"""SHA256-hash canonical_csv, skip if exists, else gzip and write."""
|
||||
assert canonical_csv, "canonical_csv must not be empty"
|
||||
assert dest_subdir, "dest_subdir must not be empty"
|
||||
assert date_label, "date_label must not be empty"
|
||||
|
||||
if not rows:
|
||||
return b""
|
||||
|
||||
out = io.StringIO()
|
||||
writer = csv.DictWriter(out, fieldnames=["report_date", "total_certified_bags", "pending_grading_bags"])
|
||||
writer.writeheader()
|
||||
writer.writerows(rows)
|
||||
return out.getvalue().encode("utf-8")
|
||||
|
||||
|
||||
def extract_ice_stocks() -> None:
|
||||
"""Download ICE certified Coffee C warehouse stocks and store as gzip CSV.
|
||||
|
||||
Idempotent: computes SHA256 of canonical CSV bytes, skips if already on disk.
|
||||
The ICE report is a rolling file (same URL, updated daily) — we detect
|
||||
changes via content hash.
|
||||
"""
|
||||
logger.info(f"Downloading ICE warehouse stocks from: {ICE_STOCKS_URL}")
|
||||
|
||||
with niquests.Session() as session:
|
||||
try:
|
||||
response = session.get(ICE_STOCKS_URL, timeout=HTTP_TIMEOUT_SECONDS)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to connect to ICE Report Center: {e}\n"
|
||||
"If the URL has changed, set ICE_STOCKS_URL environment variable.\n"
|
||||
"Visit https://www.theice.com/report-center to find the current URL."
|
||||
)
|
||||
return
|
||||
|
||||
if response.status_code == 404:
|
||||
logger.warning(
|
||||
"ICE stocks URL returned 404. The report URL may have changed.\n"
|
||||
"Visit https://www.theice.com/report-center to find the current URL,\n"
|
||||
"then set ICE_STOCKS_URL environment variable."
|
||||
)
|
||||
return
|
||||
|
||||
assert response.status_code == 200, (
|
||||
f"Unexpected status {response.status_code} from {ICE_STOCKS_URL}"
|
||||
)
|
||||
assert len(response.content) > 0, "Downloaded empty file from ICE"
|
||||
|
||||
canonical_csv = _build_canonical_csv(response.content)
|
||||
if not canonical_csv:
|
||||
logger.warning("ICE CSV parsed to 0 rows — column mapping may need updating")
|
||||
return
|
||||
|
||||
# Hash-based idempotency
|
||||
sha256 = hashlib.sha256(canonical_csv).hexdigest()
|
||||
etag = sha256[:8]
|
||||
year = date_label[:4]
|
||||
|
||||
today = datetime.now().strftime("%Y-%m-%d")
|
||||
year = datetime.now().strftime("%Y")
|
||||
|
||||
dest_dir = LANDING_DIR / DEST_SUBDIR / year
|
||||
local_file = dest_dir / f"{today}_{etag}.csv.gzip"
|
||||
dest_dir = LANDING_DIR / dest_subdir / year
|
||||
local_file = dest_dir / f"{date_label}_{etag}.csv.gzip"
|
||||
|
||||
if local_file.exists():
|
||||
logger.info(f"File {local_file.name} already exists — content unchanged, skipping")
|
||||
@@ -169,5 +132,302 @@ def extract_ice_stocks() -> None:
|
||||
logger.info(f"Stored {local_file} ({local_file.stat().st_size:,} bytes)")
|
||||
|
||||
|
||||
def _build_csv_bytes(fieldnames: list[str], rows: list[dict]) -> bytes:
|
||||
"""Serialize list of dicts to CSV bytes."""
|
||||
out = io.StringIO()
|
||||
writer = csv.DictWriter(out, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
writer.writerows(rows)
|
||||
return out.getvalue().encode("utf-8")
|
||||
|
||||
|
||||
# ── ice_stocks (daily rolling) ───────────────────────────────────────────────
|
||||
|
||||
def _normalize_row(row: dict) -> dict | None:
    """Translate one raw ICE CSV row into the canonical schema.

    Returns None when the row carries no usable report_date.
    """
    result: dict = {}
    for original_key, raw_value in row.items():
        key = original_key.strip()
        target = COLUMN_MAPPINGS.get(key) or COLUMN_MAPPINGS.get(key.lower())
        if not target:
            continue
        # ICE formats numbers as "1,234,567" — drop the thousands separators.
        result[target] = raw_value.strip().replace(",", "") if raw_value else ""

    if not result.get("report_date"):
        return None

    # Optional columns default to empty string so every row has the full schema.
    for optional in ("total_certified_bags", "pending_grading_bags"):
        result.setdefault(optional, "")
    return result
|
||||
|
||||
|
||||
def _build_canonical_csv_from_csv(raw_content: bytes) -> bytes:
    """Parse ICE CSV bytes and emit canonical CSV; b"" when no rows survive."""
    decoded = raw_content.decode("utf-8", errors="replace")
    parsed = [
        candidate
        for candidate in map(_normalize_row, csv.DictReader(io.StringIO(decoded)))
        if candidate
    ]

    if not parsed:
        return b""

    return _build_csv_bytes(
        ["report_date", "total_certified_bags", "pending_grading_bags"], parsed
    )
|
||||
|
||||
|
||||
def _build_canonical_csv_from_xls(xls_bytes: bytes) -> bytes:
    """Extract total certified bags from ICE daily stocks XLS.

    Sheet structure:
    Row 2: header with report date in cell [0]
    Row 23: ['Total in Bags', ANT, BAR, HA/BR, HOU, MIAMI, NOLA, NY, VA, total]

    Returns canonical CSV bytes, or b"" when the report date cannot be parsed.
    """
    rows = xls_to_rows(xls_bytes)

    # Extract report date from row 2, cell 0 (e.g. "As of: 1/30/2026")
    header_cell = str(rows[2][0]) if len(rows) > 2 else ""
    report_date = ""
    if "as of" in header_cell.lower():
        date_part = header_cell.lower().replace("as of:", "").replace("as of", "").strip()
        try:
            # IndexError guards a header that is only "As of:" with nothing
            # after it (split() -> []); ValueError guards unparseable dates.
            dt = datetime.strptime(date_part.split()[0], "%m/%d/%Y")
            report_date = dt.strftime("%Y-%m-%d")
        except (IndexError, ValueError):
            pass

    if not report_date:
        logger.warning(f"Could not parse report date from XLS header: {header_cell!r}")
        return b""

    # Find the "Total in Bags" row by label — its exact position can drift
    # between reports, so scan rather than hard-code row 23.
    total_bags = ""
    for row in rows:
        if row and str(row[0]).strip().lower() == "total in bags":
            val = row[-1]  # last cell is the grand total across all ports
            if isinstance(val, float):
                total_bags = str(int(val))
            else:
                total_bags = str(val).replace(",", "").strip()
            break

    canonical_row = {
        "report_date": report_date,
        "total_certified_bags": total_bags,
        "pending_grading_bags": "",
    }
    return _build_csv_bytes(["report_date", "total_certified_bags", "pending_grading_bags"], [canonical_row])
|
||||
|
||||
|
||||
def extract_ice_stocks() -> None:
    """Download ICE certified Coffee C warehouse stocks and store as gzip CSV.

    Tries static rolling CSV URL first. On 404 or error, falls back to API
    discovery to find the latest 'Daily Warehouse Stocks' report.
    Idempotent: skips if content hash already on disk.
    """
    with niquests.Session() as session:
        logger.info(f"Trying ICE rolling CSV: {ICE_ROLLING_CSV_URL}")
        try:
            response = session.get(ICE_ROLLING_CSV_URL, timeout=HTTP_TIMEOUT_SECONDS)
        except Exception as e:
            # Connection-level failure (DNS, TLS, timeout) — fall through to
            # API discovery below instead of aborting.
            logger.warning(f"Rolling CSV fetch failed: {e} — trying API discovery")
            response = None

        # Use the API when the static URL is unreachable or gone (404).
        use_api = response is None or response.status_code == 404

        if use_api:
            logger.info("Falling back to ICE API discovery for Daily Warehouse Stocks")
            report = find_latest_report(session, ICE_STOCKS_LABEL)
            if not report:
                logger.error("ICE API: no 'Daily Warehouse Stocks' report found")
                return
            logger.info(f"Found report via API: {report['download_label']} ({report['publish_date']})")
            try:
                response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS)
            except Exception as e:
                logger.error(f"Failed to download report from API URL: {e}")
                return

        if response.status_code != 200:
            logger.error(f"Unexpected status {response.status_code}")
            return

        assert len(response.content) > 0, "Downloaded empty file from ICE"

        # The static rolling URL serves CSV; API-discovered downloads are XLS.
        # Detection is by magic bytes, so either path works.
        fmt = detect_file_format(response.content)
        if fmt == "xls":
            canonical_csv = _build_canonical_csv_from_xls(response.content)
        else:
            canonical_csv = _build_canonical_csv_from_csv(response.content)

        if not canonical_csv:
            logger.warning("ICE stocks parsed to 0 rows — check column mapping or XLS structure")
            return

        # Rolling report carries no stable date of its own — label the landing
        # file by fetch date; content-hash idempotency handles re-runs.
        today = datetime.now().strftime("%Y-%m-%d")
        _write_landing_file(canonical_csv, DEST_SUBDIR, today)
|
||||
|
||||
|
||||
# ── ice_aging (monthly aging report) ────────────────────────────────────────
|
||||
|
||||
def _parse_aging_date(cell_value: str) -> str:
|
||||
"""Parse 'As of Delivery 3/2/2026' or 'As of: 1/30/2026' → '2026-03-02'."""
|
||||
text = str(cell_value).strip()
|
||||
for prefix in ("as of delivery ", "as of:"):
|
||||
if text.lower().startswith(prefix):
|
||||
text = text[len(prefix):].strip()
|
||||
break
|
||||
date_part = text.split()[0]
|
||||
try:
|
||||
dt = datetime.strptime(date_part, "%m/%d/%Y")
|
||||
return dt.strftime("%Y-%m-%d")
|
||||
except ValueError:
|
||||
return ""
|
||||
|
||||
|
||||
def extract_ice_aging() -> None:
    """Download ICE Certified Stock Aging Report and store as gzip CSV.

    Monthly report: stock quantities by age bucket × port.
    Idempotent: skips if content hash already on disk.
    """
    with niquests.Session() as session:
        logger.info("Fetching latest ICE Aging Report via API")
        report = find_latest_report(session, ICE_AGING_LABEL)
        if not report:
            logger.error(f"ICE API: no report matching {ICE_AGING_LABEL!r}")
            return

        logger.info(f"Downloading: {report['download_label']} ({report['publish_date']})")
        try:
            response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS)
        except Exception as e:
            logger.error(f"Failed to download aging report: {e}")
            return

    assert response.status_code == 200, f"HTTP {response.status_code}"
    # Aging report is always distributed as an OLE2/BIFF .xls file.
    assert response.content[:4] == OLE2_MAGIC, "Aging report is not an XLS file"

    rows = xls_to_rows(response.content)

    # Row 0, cell 0 carries the "As of ..." header with the report date.
    report_date = _parse_aging_date(str(rows[0][0]) if rows else "")
    if not report_date:
        logger.error(f"Could not parse aging report date from row 0: {rows[0] if rows else '(empty)'!r}")
        return

    # Row 3+ are data rows; stop at row labelled "Total"
    fieldnames = ["report_date", "age_bucket"] + AGING_PORT_HEADERS
    data_rows = []

    for row in rows[3:]:
        if not row or not str(row[0]).strip():
            continue
        label = str(row[0]).strip()
        if label.lower() == "total":
            break

        # Normalize cell values: floats → int strings, "-"/blank → "0",
        # anything else loses its thousands separators.
        port_values = []
        for cell in row[1:]:
            if isinstance(cell, float):
                port_values.append(str(int(cell)))
            elif str(cell).strip() in ("-", ""):
                port_values.append("0")
            else:
                port_values.append(str(cell).replace(",", "").strip())

        # Pad/truncate so every record matches the port-column schema exactly.
        while len(port_values) < len(AGING_PORT_HEADERS):
            port_values.append("0")
        port_values = port_values[:len(AGING_PORT_HEADERS)]

        record = {"report_date": report_date, "age_bucket": label}
        for col, val in zip(AGING_PORT_HEADERS, port_values):
            record[col] = val
        data_rows.append(record)

    if not data_rows:
        logger.warning("Aging report parsed to 0 data rows")
        return

    canonical_csv = _build_csv_bytes(fieldnames, data_rows)
    # Landing file is keyed by the report's own date (monthly cadence).
    _write_landing_file(canonical_csv, AGING_DEST_SUBDIR, report_date)
|
||||
|
||||
|
||||
# ── ice_stocks_by_port (historical) ─────────────────────────────────────────
|
||||
|
||||
def _excel_serial_to_date(serial: float, datemode: int) -> str:
    """Convert an Excel date serial number to 'YYYY-MM-DD'; '' if invalid."""
    try:
        return xlrd.xldate_as_datetime(serial, datemode).strftime("%Y-%m-%d")
    except Exception:
        # Out-of-range or malformed serials simply yield no date.
        return ""
|
||||
|
||||
|
||||
def extract_ice_historical() -> None:
    """Download ICE historical end-of-month warehouse stocks by port.

    Static URL updated monthly. Covers Nov 1996 to present.
    Idempotent: skips if content hash already on disk.
    """
    logger.info(f"Downloading ICE historical by-port XLS: {ICE_HISTORICAL_URL}")

    with niquests.Session() as session:
        try:
            response = session.get(ICE_HISTORICAL_URL, timeout=HISTORICAL_HTTP_TIMEOUT_SECONDS)
        except Exception as e:
            logger.error(f"Failed to download historical XLS: {e}")
            return

    assert response.status_code == 200, f"HTTP {response.status_code}"
    assert response.content[:4] == OLE2_MAGIC, "Historical file is not an XLS"

    # Open the workbook once for its datemode (needed to decode Excel date
    # serials correctly), then reuse the raw bytes for row extraction.
    book = xlrd.open_workbook(file_contents=response.content)
    datemode = book.datemode
    rows = xls_to_rows(response.content)

    # Data starts at row 8 (0-indexed); rows 0-7 are headers
    fieldnames = ["report_date"] + HISTORICAL_PORT_COLS
    data_rows = []

    for row in rows[8:]:
        if not row or len(row) < 2:
            continue

        # Column 1 holds the report date as an Excel serial number; anything
        # else (labels, blanks, footer text) is skipped.
        serial_cell = row[1]
        if not isinstance(serial_cell, float) or serial_cell <= 0:
            continue

        report_date = _excel_serial_to_date(serial_cell, datemode)
        if not report_date:
            continue

        # Columns 2..N are the per-port bag counts, in HISTORICAL_PORT_COLS order.
        port_cells = row[2:2 + len(HISTORICAL_PORT_COLS)]
        port_values = []
        for cell in port_cells:
            if cell == "" or str(cell).strip() in ("-", ""):
                port_values.append("0")
            elif isinstance(cell, float):
                port_values.append(str(int(cell)))
            else:
                port_values.append(str(cell).replace(",", "").strip())

        # Pad short rows so every record carries the full column set.
        while len(port_values) < len(HISTORICAL_PORT_COLS):
            port_values.append("0")

        record = {"report_date": report_date}
        for col, val in zip(HISTORICAL_PORT_COLS, port_values):
            record[col] = val
        data_rows.append(record)

    if not data_rows:
        logger.warning("Historical XLS parsed to 0 data rows")
        return

    canonical_csv = _build_csv_bytes(fieldnames, data_rows)
    # Rolling historical file (same URL every month) — label the landing copy
    # by fetch date; content-hash idempotency suppresses duplicates.
    today = datetime.now().strftime("%Y-%m-%d")
    _write_landing_file(canonical_csv, HISTORICAL_DEST_SUBDIR, today)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Direct invocation runs only the daily rolling-stocks extraction;
    # aging and historical extractions have their own console-script entries.
    extract_ice_stocks()
|
||||
|
||||
75
extract/ice_stocks/src/ice_stocks/ice_api.py
Normal file
75
extract/ice_stocks/src/ice_stocks/ice_api.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""ICE Report Center API client.
|
||||
|
||||
Discovers report download URLs via the private JSON API at
|
||||
https://www.ice.com/marketdata/api/reports/293/results
|
||||
No authentication required. Results are date-descending.
|
||||
"""
|
||||
|
||||
ICE_API_URL = "https://www.ice.com/marketdata/api/reports/293/results"
|
||||
ICE_BASE_URL = "https://www.ice.com"
|
||||
PRODUCT_ID_COFFEE = 2
|
||||
API_TIMEOUT_SECONDS = 30
|
||||
MAX_API_PAGES = 10
|
||||
|
||||
|
||||
def fetch_report_listings(session, product_id, max_results=50, page_number=1) -> list[dict]:
|
||||
"""POST to ICE API and return normalized report rows.
|
||||
|
||||
Each row: {publish_date, product_name, download_url, download_label}
|
||||
"""
|
||||
assert product_id > 0, f"product_id must be positive, got {product_id}"
|
||||
assert max_results > 0, f"max_results must be positive, got {max_results}"
|
||||
assert page_number > 0, f"page_number must be positive, got {page_number}"
|
||||
|
||||
payload = {
|
||||
"offset": (page_number - 1) * max_results,
|
||||
"pageNumber": page_number,
|
||||
"productId": product_id,
|
||||
"max": max_results,
|
||||
}
|
||||
response = session.post(
|
||||
ICE_API_URL,
|
||||
data=payload,
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
timeout=API_TIMEOUT_SECONDS,
|
||||
)
|
||||
assert response.status_code == 200, (
|
||||
f"ICE API returned {response.status_code}"
|
||||
)
|
||||
|
||||
data = response.json()
|
||||
rows = data["datasets"]["results"]["rows"]
|
||||
|
||||
result = []
|
||||
for row in rows:
|
||||
download = row.get("download", {}) or {}
|
||||
url = download.get("url", "") or ""
|
||||
if url and not url.startswith("http"):
|
||||
url = ICE_BASE_URL + url
|
||||
result.append({
|
||||
"publish_date": row.get("publishDate", ""),
|
||||
"product_name": row.get("productName", ""),
|
||||
"download_url": url,
|
||||
"download_label": download.get("label", "") or "",
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def find_latest_report(session, label_substring, product_id=PRODUCT_ID_COFFEE) -> dict | None:
|
||||
"""Return first report whose download_label contains label_substring.
|
||||
|
||||
Paginates up to MAX_API_PAGES. Results are date-descending so
|
||||
the first match is the most recent.
|
||||
"""
|
||||
assert label_substring, "label_substring must not be empty"
|
||||
|
||||
for page in range(1, MAX_API_PAGES + 1):
|
||||
rows = fetch_report_listings(session, product_id, page_number=page)
|
||||
if not rows:
|
||||
break
|
||||
for row in rows:
|
||||
if label_substring.lower() in row["download_label"].lower():
|
||||
return row
|
||||
|
||||
return None
|
||||
59
extract/ice_stocks/src/ice_stocks/xls_parse.py
Normal file
59
extract/ice_stocks/src/ice_stocks/xls_parse.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""XLS file format detection and row extraction.
|
||||
|
||||
Handles OLE2/BIFF .xls files (the format ICE uses for all reports).
|
||||
Format detection via magic bytes — no extension sniffing.
|
||||
"""
|
||||
|
||||
import xlrd
|
||||
|
||||
OLE2_MAGIC = b"\xd0\xcf\x11\xe0"
XLSX_MAGIC = b"PK\x03\x04"


def detect_file_format(content_bytes: bytes) -> str:
    """Classify content as 'xls', 'xlsx', 'html', or 'csv' via magic bytes."""
    assert content_bytes, "content_bytes must not be empty"

    magic = content_bytes[:4]
    if magic == OLE2_MAGIC:
        return "xls"
    if magic == XLSX_MAGIC:
        return "xlsx"

    # No binary signature: decode a prefix and sniff markup vs. plain text.
    head = content_bytes[:512].decode("utf-8", errors="replace").lstrip()
    return "html" if head.startswith("<") else "csv"
|
||||
|
||||
|
||||
def xls_to_rows(content_bytes: bytes, sheet_index: int = 0) -> list[list]:
    """Parse XLS bytes and return one sheet's cells as a list of row lists.

    Cell values keep xlrd's native Python types; empty cells become "".
    Date cells are left as raw Excel serial numbers — callers convert them
    with xlrd.xldate_as_datetime.
    """
    assert content_bytes, "content_bytes must not be empty"
    assert content_bytes[:4] == OLE2_MAGIC, (
        f"Not an OLE2/BIFF XLS file (magic: {content_bytes[:4].hex()})"
    )

    workbook = xlrd.open_workbook(file_contents=content_bytes)
    assert sheet_index < workbook.nsheets, (
        f"sheet_index {sheet_index} out of range (nsheets={workbook.nsheets})"
    )
    worksheet = workbook.sheets()[sheet_index]

    extracted = []
    for r in range(worksheet.nrows):
        current = []
        for c in range(worksheet.ncols):
            cell = worksheet.cell(r, c)
            # Empty → ""; everything else (including date serials) passes
            # through as xlrd's native value.
            current.append("" if cell.ctype == xlrd.XL_CELL_EMPTY else cell.value)
        extracted.append(current)

    return extracted
|
||||
@@ -36,6 +36,7 @@ dev = [
|
||||
"pytest-cov>=7.0.0",
|
||||
"pyyaml>=6.0.2",
|
||||
"ruff>=0.9.9",
|
||||
"xlwt>=1.3.0",
|
||||
]
|
||||
|
||||
[tool.uv.sources]
|
||||
|
||||
@@ -28,6 +28,14 @@ PIPELINES = {
|
||||
"command": ["uv", "run", "--package", "ice_stocks", "extract_ice"],
|
||||
"timeout_seconds": 600,
|
||||
},
|
||||
"extract_ice_aging": {
|
||||
"command": ["uv", "run", "--package", "ice_stocks", "extract_ice_aging"],
|
||||
"timeout_seconds": 600,
|
||||
},
|
||||
"extract_ice_historical": {
|
||||
"command": ["uv", "run", "--package", "ice_stocks", "extract_ice_historical"],
|
||||
"timeout_seconds": 600,
|
||||
},
|
||||
"transform": {
|
||||
"command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"],
|
||||
"timeout_seconds": 3600,
|
||||
|
||||
184
tests/test_ice_extraction.py
Normal file
184
tests/test_ice_extraction.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Tests for ICE extraction: format detection, XLS parsing, API client."""
|
||||
|
||||
import csv
|
||||
import gzip
|
||||
import io
|
||||
import struct
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import xlwt # noqa: F401 — needed to create XLS fixtures; skip tests if missing
|
||||
|
||||
from ice_stocks.ice_api import fetch_report_listings, find_latest_report
|
||||
from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows
|
||||
|
||||
# ── helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def _make_xls_bytes(rows: list[list]) -> bytes:
    """Build an in-memory single-sheet XLS fixture containing *rows*."""
    workbook = xlwt.Workbook()
    worksheet = workbook.add_sheet("Sheet1")
    for row_index, row in enumerate(rows):
        for col_index, value in enumerate(row):
            worksheet.write(row_index, col_index, value)
    out = io.BytesIO()
    workbook.save(out)
    return out.getvalue()
|
||||
|
||||
|
||||
def _api_response(rows: list[dict]) -> dict:
|
||||
"""Build a mock ICE API response payload."""
|
||||
return {"datasets": {"results": {"rows": rows}}}
|
||||
|
||||
|
||||
def _make_api_row(label: str, url: str = "/download/test.xls", publish_date: str = "2026-02-01") -> dict:
|
||||
return {
|
||||
"publishDate": publish_date,
|
||||
"productName": "Coffee C",
|
||||
"download": {"url": url, "label": label},
|
||||
}
|
||||
|
||||
|
||||
# ── detect_file_format ───────────────────────────────────────────────────────
|
||||
|
||||
def test_detect_file_format_xls():
    # OLE2 magic bytes identify legacy .xls files.
    assert detect_file_format(OLE2_MAGIC + b"\x00" * 100) == "xls"


def test_detect_file_format_xlsx():
    # ZIP magic bytes identify .xlsx containers.
    assert detect_file_format(b"PK\x03\x04" + b"\x00" * 100) == "xlsx"


def test_detect_file_format_html():
    # Markup payloads (error pages) are flagged as html.
    assert detect_file_format(b"<html><body>foo</body></html>") == "html"


def test_detect_file_format_csv():
    # Plain text with no binary signature falls back to csv.
    payload = b"report_date,total_certified_bags\n2026-01-01,100000\n"
    assert detect_file_format(payload) == "csv"
|
||||
|
||||
|
||||
# ── xls_to_rows ──────────────────────────────────────────────────────────────
|
||||
|
||||
def test_xls_to_rows_roundtrip():
    """Write rows with xlwt, read them back unchanged with xls_to_rows."""
    pytest.importorskip("xlwt")
    input_rows = [
        ["header1", "header2", "header3"],
        ["value1", 42.0, "value3"],
        ["", 0.0, ""],
    ]
    xls_bytes = _make_xls_bytes(input_rows)
    # Sanity check: xlwt produced a genuine OLE2 container.
    assert xls_bytes[:4] == OLE2_MAGIC

    result = xls_to_rows(xls_bytes)
    assert len(result) == 3
    assert result[0][0] == "header1"
    assert result[1][1] == 42.0
    # Empty cells come back as ""
    assert result[2][0] == ""


def test_xls_to_rows_rejects_non_xls():
    """Non-OLE2 bytes (here: a ZIP/xlsx signature) must be rejected."""
    with pytest.raises(AssertionError, match="Not an OLE2"):
        xls_to_rows(b"PK\x03\x04" + b"\x00" * 100)
|
||||
|
||||
|
||||
# ── fetch_report_listings ────────────────────────────────────────────────────
|
||||
|
||||
def test_fetch_report_listings_parses_response():
    """API rows are normalized and relative download URLs get the base prepended."""
    mock_session = MagicMock()
    mock_session.post.return_value.status_code = 200
    mock_session.post.return_value.json.return_value = _api_response([
        _make_api_row("Daily Warehouse Stocks", "/dl/stocks.xls"),
        _make_api_row("Certified Stock Aging Report", "/dl/aging.xls"),
    ])

    from ice_stocks.ice_api import ICE_BASE_URL, fetch_report_listings
    rows = fetch_report_listings(mock_session, product_id=2)

    assert len(rows) == 2
    assert rows[0]["download_label"] == "Daily Warehouse Stocks"
    # Relative URL "/dl/stocks.xls" must be made absolute.
    assert rows[0]["download_url"] == ICE_BASE_URL + "/dl/stocks.xls"
    assert rows[1]["download_label"] == "Certified Stock Aging Report"


def test_fetch_report_listings_prepends_base_url_for_absolute():
    """If URL already starts with http, don't prepend base."""
    mock_session = MagicMock()
    mock_session.post.return_value.status_code = 200
    mock_session.post.return_value.json.return_value = _api_response([
        _make_api_row("Test", "https://other.example.com/file.xls"),
    ])

    from ice_stocks.ice_api import fetch_report_listings
    rows = fetch_report_listings(mock_session, product_id=2)
    assert rows[0]["download_url"] == "https://other.example.com/file.xls"
||||
|
||||
|
||||
# ── find_latest_report ───────────────────────────────────────────────────────
|
||||
|
||||
def test_find_latest_report_label_match():
    """Substring match on download_label returns the first (most recent) hit."""
    mock_session = MagicMock()
    mock_session.post.return_value.status_code = 200
    mock_session.post.return_value.json.return_value = _api_response([
        _make_api_row("Daily Warehouse Stocks"),
        _make_api_row("Certified Stock Aging Report"),
        _make_api_row("Historical Stocks"),
    ])

    result = find_latest_report(mock_session, "Aging Report")
    assert result is not None
    assert result["download_label"] == "Certified Stock Aging Report"


def test_find_latest_report_no_match_returns_none():
    """Empty listings stop pagination and yield None."""
    mock_session = MagicMock()
    mock_session.post.return_value.status_code = 200
    # Return empty rows on all pages
    mock_session.post.return_value.json.return_value = _api_response([])

    result = find_latest_report(mock_session, "Nonexistent Label XYZ")
    assert result is None
|
||||
|
||||
|
||||
# ── canonical CSV output ──────────────────────────────────────────────────────
|
||||
|
||||
def test_build_canonical_csv_from_xls_rows():
    """Verify execute._build_canonical_csv_from_xls produces correct schema."""
    pytest.importorskip("xlwt")

    blank_row = [""] * 10
    # Simulate ICE daily stocks XLS structure: title row, blank row,
    # "As of" date row, 20 filler rows, then the totals row (row 23).
    sheet_rows = [
        ["Coffee C Warehouse Stocks"] + [""] * 9,  # row 0
        blank_row,                                 # row 1
        ["As of: 2/14/2026"] + [""] * 9,           # row 2 — report date
    ]
    sheet_rows += [blank_row] * 20                 # rows 3-22
    sheet_rows.append(
        ["Total in Bags", 10000.0, 5000.0, 2000.0, 1000.0,
         500.0, 0.0, 0.0, 0.0, 18500.0]            # row 23
    )

    workbook_bytes = _make_xls_bytes(sheet_rows)

    from ice_stocks.execute import _build_canonical_csv_from_xls

    csv_bytes = _build_canonical_csv_from_xls(workbook_bytes)
    assert csv_bytes, "Expected non-empty CSV output"

    parsed = list(csv.DictReader(io.StringIO(csv_bytes.decode("utf-8"))))
    assert len(parsed) == 1
    assert parsed[0]["report_date"] == "2026-02-14"
    assert parsed[0]["total_certified_bags"] == "18500"
|
||||
|
||||
|
||||
def test_build_canonical_csv_from_xls_missing_date_returns_empty():
    """If header row has no parseable date, return empty bytes."""
    pytest.importorskip("xlwt")

    header = ["Not a valid date header"] + [""] * 9
    workbook_bytes = _make_xls_bytes([header] + [[""] * 10] * 30)

    from ice_stocks.execute import _build_canonical_csv_from_xls

    assert _build_canonical_csv_from_xls(workbook_bytes) == b""
|
||||
@@ -29,3 +29,17 @@ def ice_stocks_glob(evaluator) -> str:
|
||||
"""Return a quoted glob path for all ICE warehouse stock CSV gzip files under LANDING_DIR."""
|
||||
landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing")
|
||||
return f"'{landing_dir}/ice_stocks/**/*.csv.gzip'"
|
||||
|
||||
|
||||
@macro()
def ice_aging_glob(evaluator) -> str:
    """Return a quoted glob path for all ICE aging report CSV gzip files under LANDING_DIR."""
    # Prefer the SQLMesh variable; fall back to the environment, then the default.
    base = evaluator.var("LANDING_DIR")
    if not base:
        base = os.environ.get("LANDING_DIR", "data/landing")
    return "'" + base + "/ice_aging/**/*.csv.gzip'"
|
||||
|
||||
|
||||
@macro()
def ice_stocks_by_port_glob(evaluator) -> str:
    """Return a quoted glob path for all ICE historical by-port CSV gzip files under LANDING_DIR."""
    # Prefer the SQLMesh variable; fall back to the environment, then the default.
    base = evaluator.var("LANDING_DIR")
    if not base:
        base = os.environ.get("LANDING_DIR", "data/landing")
    return "'" + base + "/ice_stocks_by_port/**/*.csv.gzip'"
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
-- Foundation fact: ICE certified Coffee C (Arabica) aging report.
--
-- Casts raw varchar columns to proper types and deduplicates via hash key.
-- Grain: one row per (report_date, age_bucket).
-- Age buckets represent how long coffee has been in certified storage.
-- Port columns are in bags (60kg).

MODEL (
    name foundation.fct_ice_aging_stocks,
    kind INCREMENTAL_BY_TIME_RANGE (
        time_column report_date
    ),
    grain (report_date, age_bucket),
    start '2020-01-01',
    cron '@daily'
);

WITH cast_and_clean AS (
    SELECT
        -- TRY_CAST yields NULL instead of erroring on unparseable raw values;
        -- rows with an unparseable date are dropped by the WHERE below.
        TRY_CAST(report_date AS date) AS report_date,
        age_bucket,
        TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags,
        TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags,
        TRY_CAST(houston_bags AS bigint) AS houston_bags,
        TRY_CAST(miami_bags AS bigint) AS miami_bags,
        TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags,
        TRY_CAST(new_york_bags AS bigint) AS new_york_bags,
        TRY_CAST(total_bags AS bigint) AS total_bags,

        -- Provenance: landing file the row was read from.
        filename AS source_file,

        -- Dedup key across repeated downloads of the same report.
        -- NOTE(review): hkey includes total_bags, so a restated report with a
        -- different total for the same (report_date, age_bucket) would survive
        -- dedup and duplicate the declared grain — confirm this is intended.
        hash(report_date, age_bucket, total_bags) AS hkey
    FROM raw.ice_aging_stocks
    WHERE TRY_CAST(report_date AS date) IS NOT NULL
      AND age_bucket IS NOT NULL
      AND age_bucket != ''
),

deduplicated AS (
    -- Collapse rows sharing a hash key; any_value picks an arbitrary
    -- representative, which is safe only for columns identical within a group
    -- (port columns are NOT part of hkey — presumably identical for true
    -- duplicates; verify against the extractor).
    SELECT
        any_value(report_date) AS report_date,
        any_value(age_bucket) AS age_bucket,
        any_value(antwerp_bags) AS antwerp_bags,
        any_value(hamburg_bremen_bags) AS hamburg_bremen_bags,
        any_value(houston_bags) AS houston_bags,
        any_value(miami_bags) AS miami_bags,
        any_value(new_orleans_bags) AS new_orleans_bags,
        any_value(new_york_bags) AS new_york_bags,
        any_value(total_bags) AS total_bags,
        any_value(source_file) AS source_file,
        hkey
    FROM cast_and_clean
    GROUP BY hkey
)

-- Restrict to the incremental window SQLMesh is currently processing.
SELECT *
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds
|
||||
@@ -0,0 +1,60 @@
|
||||
-- Foundation fact: ICE historical end-of-month Coffee C certified warehouse stocks by port.
--
-- Covers November 1996 to present (30-year history). Casts raw varchar columns
-- to proper types and deduplicates via hash key.
--
-- Grain: one row per report_date (end-of-month).
-- Port columns are in bags (60kg).

MODEL (
    name foundation.fct_ice_warehouse_stocks_by_port,
    kind INCREMENTAL_BY_TIME_RANGE (
        time_column report_date
    ),
    grain (report_date),
    start '1996-11-01',
    cron '@daily'
);

WITH cast_and_clean AS (
    SELECT
        -- TRY_CAST yields NULL instead of erroring on unparseable raw values;
        -- rows failing the date or total cast are dropped by the WHERE below.
        TRY_CAST(report_date AS date) AS report_date,
        TRY_CAST(new_york_bags AS bigint) AS new_york_bags,
        TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags,
        TRY_CAST(houston_bags AS bigint) AS houston_bags,
        TRY_CAST(miami_bags AS bigint) AS miami_bags,
        TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags,
        TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags,
        TRY_CAST(barcelona_bags AS bigint) AS barcelona_bags,
        TRY_CAST(virginia_bags AS bigint) AS virginia_bags,
        TRY_CAST(total_bags AS bigint) AS total_bags,

        -- Provenance: landing file the row was read from.
        filename AS source_file,

        -- Dedup key across repeated downloads of the static historical file.
        -- NOTE(review): hkey includes total_bags, so a restated month with a
        -- different total would survive dedup and duplicate the declared
        -- report_date grain — confirm this is intended.
        hash(report_date, total_bags) AS hkey
    FROM raw.ice_warehouse_stocks_by_port
    WHERE TRY_CAST(report_date AS date) IS NOT NULL
      AND TRY_CAST(total_bags AS bigint) IS NOT NULL
),

deduplicated AS (
    -- Collapse rows sharing a hash key; any_value picks an arbitrary
    -- representative, which is safe only for columns identical within a group
    -- (per-port columns are NOT part of hkey — presumably identical for true
    -- duplicates; verify against the extractor).
    SELECT
        any_value(report_date) AS report_date,
        any_value(new_york_bags) AS new_york_bags,
        any_value(new_orleans_bags) AS new_orleans_bags,
        any_value(houston_bags) AS houston_bags,
        any_value(miami_bags) AS miami_bags,
        any_value(antwerp_bags) AS antwerp_bags,
        any_value(hamburg_bremen_bags) AS hamburg_bremen_bags,
        any_value(barcelona_bags) AS barcelona_bags,
        any_value(virginia_bags) AS virginia_bags,
        any_value(total_bags) AS total_bags,
        any_value(source_file) AS source_file,
        hkey
    FROM cast_and_clean
    GROUP BY hkey
)

-- Restrict to the incremental window SQLMesh is currently processing.
SELECT *
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds
|
||||
49
transform/sqlmesh_materia/models/raw/ice_aging_stocks.sql
Normal file
49
transform/sqlmesh_materia/models/raw/ice_aging_stocks.sql
Normal file
@@ -0,0 +1,49 @@
|
||||
-- Raw ICE certified stock aging report — technical ingestion layer.
--
-- Reads monthly aging report gzip CSVs from the landing directory.
-- All values are varchar; casting happens in foundation.fct_ice_aging_stocks.
--
-- Source: ICE Report Center (Certified Stock Aging Report)
-- Coverage: varies by download history
-- Frequency: monthly (ICE updates after each delivery month)

MODEL (
    name raw.ice_aging_stocks,
    kind FULL,
    cron '@daily',
    -- Declared schema; all columns intentionally varchar at this layer.
    columns (
        report_date varchar,
        age_bucket varchar,
        antwerp_bags varchar,
        hamburg_bremen_bags varchar,
        houston_bags varchar,
        miami_bags varchar,
        new_orleans_bags varchar,
        new_york_bags varchar,
        total_bags varchar,
        filename varchar
    )
);

SELECT
    report_date,
    age_bucket,
    antwerp_bags,
    hamburg_bremen_bags,
    houston_bags,
    miami_bags,
    new_orleans_bags,
    new_york_bags,
    total_bags,
    -- Populated by read_csv(filename = true): source file path per row.
    filename
FROM read_csv(
    -- Macro expands to the quoted landing-dir glob for aging report files.
    @ice_aging_glob(),
    delim = ',',
    encoding = 'utf-8',
    compression = 'gzip',
    header = true,
    -- Align columns by header name across files with differing layouts.
    union_by_name = true,
    filename = true,
    -- Keep everything as text; typing is deferred to the foundation model.
    all_varchar = true,
    -- NOTE(review): ignore_errors silently drops malformed CSV lines —
    -- confirm that losing such rows without a signal is acceptable here.
    ignore_errors = true
)
|
||||
@@ -0,0 +1,51 @@
|
||||
-- Raw ICE historical end-of-month warehouse stocks by port — technical ingestion layer.
--
-- Reads historical by-port stock gzip CSVs from the landing directory.
-- All values are varchar; casting happens in foundation.fct_ice_warehouse_stocks_by_port.
--
-- Source: ICE (EOM_KC_cert_stox_by_port_nov96-present.xls)
-- Coverage: November 1996 to present
-- Frequency: monthly (ICE updates the static file monthly)

MODEL (
    name raw.ice_warehouse_stocks_by_port,
    kind FULL,
    cron '@daily',
    -- Declared schema; all columns intentionally varchar at this layer.
    columns (
        report_date varchar,
        new_york_bags varchar,
        new_orleans_bags varchar,
        houston_bags varchar,
        miami_bags varchar,
        antwerp_bags varchar,
        hamburg_bremen_bags varchar,
        barcelona_bags varchar,
        virginia_bags varchar,
        total_bags varchar,
        filename varchar
    )
);

SELECT
    report_date,
    new_york_bags,
    new_orleans_bags,
    houston_bags,
    miami_bags,
    antwerp_bags,
    hamburg_bremen_bags,
    barcelona_bags,
    virginia_bags,
    total_bags,
    -- Populated by read_csv(filename = true): source file path per row.
    filename
FROM read_csv(
    -- Macro expands to the quoted landing-dir glob for by-port files.
    @ice_stocks_by_port_glob(),
    delim = ',',
    encoding = 'utf-8',
    compression = 'gzip',
    header = true,
    -- Align columns by header name across files with differing layouts.
    union_by_name = true,
    filename = true,
    -- Keep everything as text; typing is deferred to the foundation model.
    all_varchar = true,
    -- NOTE(review): ignore_errors silently drops malformed CSV lines —
    -- confirm that losing such rows without a signal is acceptable here.
    ignore_errors = true
)
|
||||
26
uv.lock
generated
26
uv.lock
generated
@@ -1060,10 +1060,14 @@ version = "0.1.0"
|
||||
source = { editable = "extract/ice_stocks" }
|
||||
dependencies = [
|
||||
{ name = "niquests" },
|
||||
{ name = "xlrd" },
|
||||
]
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }]
|
||||
requires-dist = [
|
||||
{ name = "niquests", specifier = ">=3.14.1" },
|
||||
{ name = "xlrd", specifier = ">=2.0.1" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "identify"
|
||||
@@ -1558,6 +1562,7 @@ dev = [
|
||||
{ name = "pytest-cov" },
|
||||
{ name = "pyyaml" },
|
||||
{ name = "ruff" },
|
||||
{ name = "xlwt" },
|
||||
]
|
||||
exploration = [
|
||||
{ name = "ipykernel" },
|
||||
@@ -1584,6 +1589,7 @@ dev = [
|
||||
{ name = "pytest-cov", specifier = ">=7.0.0" },
|
||||
{ name = "pyyaml", specifier = ">=6.0.2" },
|
||||
{ name = "ruff", specifier = ">=0.9.9" },
|
||||
{ name = "xlwt", specifier = ">=1.3.0" },
|
||||
]
|
||||
exploration = [{ name = "ipykernel", specifier = ">=6.29.5" }]
|
||||
|
||||
@@ -3591,6 +3597,24 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/a4/f5/10b68b7b1544245097b2a1b8238f66f2fc6dcaeb24ba5d917f52bd2eed4f/wsproto-1.3.2-py3-none-any.whl", hash = "sha256:61eea322cdf56e8cc904bd3ad7573359a242ba65688716b0710a5eb12beab584", size = 24405, upload-time = "2025-11-20T18:18:00.454Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "xlrd"
|
||||
version = "2.0.2"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/07/5a/377161c2d3538d1990d7af382c79f3b2372e880b65de21b01b1a2b78691e/xlrd-2.0.2.tar.gz", hash = "sha256:08b5e25de58f21ce71dc7db3b3b8106c1fa776f3024c54e45b45b374e89234c9", size = 100167, upload-time = "2025-06-14T08:46:39.039Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/1a/62/c8d562e7766786ba6587d09c5a8ba9f718ed3fa8af7f4553e8f91c36f302/xlrd-2.0.2-py2.py3-none-any.whl", hash = "sha256:ea762c3d29f4cca48d82df517b6d89fbce4db3107f9d78713e48cd321d5c9aa9", size = 96555, upload-time = "2025-06-14T08:46:37.766Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "xlwt"
|
||||
version = "1.3.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/06/97/56a6f56ce44578a69343449aa5a0d98eefe04085d69da539f3034e2cd5c1/xlwt-1.3.0.tar.gz", hash = "sha256:c59912717a9b28f1a3c2a98fd60741014b06b043936dcecbc113eaaada156c88", size = 153929, upload-time = "2017-08-22T06:47:16.498Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/44/48/def306413b25c3d01753603b1a222a011b8621aed27cd7f89cbc27e6b0f4/xlwt-1.3.0-py2.py3-none-any.whl", hash = "sha256:a082260524678ba48a297d922cc385f58278b8aa68741596a87de01a9c628b2e", size = 99981, upload-time = "2017-08-22T06:47:15.281Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "yfinance"
|
||||
version = "1.2.0"
|
||||
|
||||
Reference in New Issue
Block a user