feat: extraction framework overhaul — extract_core shared package + SQLite state tracking

- Add extract/extract_core/ workspace package with three modules:
  - state.py: SQLite run tracking (open_state_db, start_run, end_run, get_last_cursor)
  - http.py: niquests session factory + etag normalization helpers
  - files.py: landing_path, content_hash, write_bytes_atomic (atomic gzip writes)
- State lives at {LANDING_DIR}/.state.sqlite — no extra env var needed
- SQLite chosen over DuckDB: state tracking is OLTP (row inserts/updates), not analytical
- Refactor all 4 extractors (psdonline, cftc_cot, coffee_prices, ice_stocks):
  - Replace inline boilerplate with extract_core helpers
  - Add start_run/end_run tracking to every extraction entry point
  - extract_cot_year returns int (bytes_written) instead of bool
- Update tests: assert result == 0 (not `is False`) for the return type change

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-22 14:37:50 +01:00
parent fc4121183c
commit 80c1163a7f
16 changed files with 702 additions and 290 deletions

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
description = "CFTC Commitment of Traders data extractor" description = "CFTC Commitment of Traders data extractor"
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = [
"extract_core",
"niquests>=3.14.1", "niquests>=3.14.1",
] ]

View File

@@ -10,12 +10,20 @@ Landing path: LANDING_DIR/cot/{year}/{etag}.csv.gzip
import logging import logging
import os import os
import pathlib
import sys import sys
from datetime import datetime from datetime import datetime
from io import BytesIO from io import BytesIO
from pathlib import Path
import niquests import niquests
from extract_core import (
end_run,
landing_path,
normalize_etag,
open_state_db,
start_run,
write_bytes_atomic,
)
from .normalize import find_csv_inner_filename, normalize_zipped_csv from .normalize import find_csv_inner_filename, normalize_zipped_csv
@@ -27,7 +35,7 @@ logging.basicConfig(
) )
logger = logging.getLogger("CFTC COT Extractor") logger = logging.getLogger("CFTC COT Extractor")
LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
# CFTC publishes yearly ZIPs for the disaggregated futures-only report. # CFTC publishes yearly ZIPs for the disaggregated futures-only report.
# The file for the current year is updated each Friday at 3:30 PM ET. # The file for the current year is updated each Friday at 3:30 PM ET.
@@ -52,10 +60,10 @@ def _synthetic_etag(year: int, headers: dict) -> str:
return etag return etag
def extract_cot_year(year: int, http_session: niquests.Session) -> bool: def extract_cot_year(year: int, http_session: niquests.Session) -> int:
"""Download and store COT data for a single year. """Download and store COT data for a single year.
Returns True if a new file was written, False if skipped or unavailable. Returns bytes_written (0 if skipped or unavailable).
""" """
url = COT_URL_TEMPLATE.format(year=year) url = COT_URL_TEMPLATE.format(year=year)
logger.info(f"Checking COT data for {year}: {url}") logger.info(f"Checking COT data for {year}: {url}")
@@ -63,20 +71,20 @@ def extract_cot_year(year: int, http_session: niquests.Session) -> bool:
head = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS) head = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS)
if head.status_code == 404: if head.status_code == 404:
logger.info(f"Year {year} not available (404) — skipping") logger.info(f"Year {year} not available (404) — skipping")
return False return 0
assert head.status_code == 200, ( assert head.status_code == 200, (
f"Unexpected HEAD status {head.status_code} for {url}" f"Unexpected HEAD status {head.status_code} for {url}"
) )
raw_etag = head.headers.get("etag", "") raw_etag = head.headers.get("etag", "")
etag = raw_etag.replace('"', "").replace(":", "_") if raw_etag else _synthetic_etag(year, head.headers) etag = normalize_etag(raw_etag) if raw_etag else _synthetic_etag(year, head.headers)
dest_dir = LANDING_DIR / "cot" / str(year) dest_dir = landing_path(LANDING_DIR, "cot", str(year))
local_file = dest_dir / f"{etag}.csv.gzip" local_file = dest_dir / f"{etag}.csv.gzip"
if local_file.exists(): if local_file.exists():
logger.info(f"Year {year}: {etag}.csv.gzip already exists, skipping") logger.info(f"Year {year}: {etag}.csv.gzip already exists, skipping")
return False return 0
logger.info(f"Downloading COT data for {year}...") logger.info(f"Downloading COT data for {year}...")
response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS) response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS)
@@ -89,14 +97,11 @@ def extract_cot_year(year: int, http_session: niquests.Session) -> bool:
inner_filename = find_csv_inner_filename(BytesIO(response.content)) inner_filename = find_csv_inner_filename(BytesIO(response.content))
normalized = normalize_zipped_csv(zip_buffer, inner_filename) normalized = normalize_zipped_csv(zip_buffer, inner_filename)
dest_dir.mkdir(parents=True, exist_ok=True) bytes_written = write_bytes_atomic(local_file, normalized.read())
local_file.write_bytes(normalized.read())
assert local_file.exists(), f"File was not written: {local_file}"
assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}" assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}"
logger.info(f"Stored {local_file} ({local_file.stat().st_size:,} bytes)") logger.info(f"Stored {local_file} ({bytes_written:,} bytes)")
return True return bytes_written
def extract_cot_dataset(): def extract_cot_dataset():
@@ -113,16 +118,36 @@ def extract_cot_dataset():
f"Year range {len(years)} exceeds MAX_YEARS={MAX_YEARS}" f"Year range {len(years)} exceeds MAX_YEARS={MAX_YEARS}"
) )
new_count = 0 conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, "cftc_cot")
files_written = 0
files_skipped = 0
bytes_written_total = 0
try:
with niquests.Session() as session: with niquests.Session() as session:
for year in years: for year in years:
try: try:
if extract_cot_year(year, session): result = extract_cot_year(year, session)
new_count += 1 if result > 0:
files_written += 1
bytes_written_total += result
else:
files_skipped += 1
except Exception: except Exception:
logger.exception(f"Failed to extract COT data for {year}, continuing") logger.exception(f"Failed to extract COT data for {year}, continuing")
logger.info(f"COT extraction complete: {new_count} new file(s) downloaded") logger.info(f"COT extraction complete: {files_written} new file(s) downloaded")
end_run(
conn, run_id, status="success",
files_written=files_written, files_skipped=files_skipped,
bytes_written=bytes_written_total,
cursor_value=str(current_year),
)
except Exception as e:
end_run(conn, run_id, status="failed", error_message=str(e))
raise
finally:
conn.close()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
description = "KC=F Coffee C futures price extractor" description = "KC=F Coffee C futures price extractor"
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = [
"extract_core",
"yfinance>=0.2.55", "yfinance>=0.2.55",
] ]

View File

@@ -8,14 +8,15 @@ Landing path: LANDING_DIR/prices/coffee_kc/{hash8}.csv.gzip
""" """
import gzip import gzip
import hashlib
import io import io
import logging import logging
import os import os
import pathlib
import sys import sys
from pathlib import Path
import yfinance as yf import yfinance as yf
from extract_core import content_hash, end_run, landing_path, open_state_db, start_run
from extract_core import write_bytes_atomic
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -25,7 +26,7 @@ logging.basicConfig(
) )
logger = logging.getLogger("Coffee Prices Extractor") logger = logging.getLogger("Coffee Prices Extractor")
LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
TICKER = "KC=F" TICKER = "KC=F"
DEST_SUBDIR = "prices/coffee_kc" DEST_SUBDIR = "prices/coffee_kc"
@@ -40,6 +41,9 @@ def extract_coffee_prices() -> None:
On first run downloads full history (period='max'). On subsequent runs On first run downloads full history (period='max'). On subsequent runs
the hash matches if no new trading days have closed since last run. the hash matches if no new trading days have closed since last run.
""" """
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, "coffee_prices")
try:
logger.info(f"Downloading {TICKER} daily OHLCV from Yahoo Finance...") logger.info(f"Downloading {TICKER} daily OHLCV from Yahoo Finance...")
ticker = yf.Ticker(TICKER) ticker = yf.Ticker(TICKER)
@@ -64,28 +68,27 @@ def extract_coffee_prices() -> None:
assert len(csv_bytes) > 0, "CSV serialization produced empty output" assert len(csv_bytes) > 0, "CSV serialization produced empty output"
# Hash-based idempotency key (first 8 hex chars of SHA256) etag = content_hash(csv_bytes)
sha256 = hashlib.sha256(csv_bytes).hexdigest() dest_dir = landing_path(LANDING_DIR, DEST_SUBDIR)
etag = sha256[:8]
dest_dir = LANDING_DIR / DEST_SUBDIR
local_file = dest_dir / f"{etag}.csv.gzip" local_file = dest_dir / f"{etag}.csv.gzip"
if local_file.exists(): if local_file.exists():
logger.info(f"File {local_file.name} already exists — no new data, skipping") logger.info(f"File {local_file.name} already exists — no new data, skipping")
end_run(conn, run_id, status="success", files_skipped=1)
return return
# Compress and write
dest_dir.mkdir(parents=True, exist_ok=True)
compressed = gzip.compress(csv_bytes) compressed = gzip.compress(csv_bytes)
local_file.write_bytes(compressed) bytes_written = write_bytes_atomic(local_file, compressed)
assert local_file.exists(), f"File was not written: {local_file}"
assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}"
logger.info( logger.info(
f"Stored {local_file} ({local_file.stat().st_size:,} bytes, {len(df):,} rows)" f"Stored {local_file} ({bytes_written:,} bytes, {len(df):,} rows)"
) )
end_run(conn, run_id, status="success", files_written=1, bytes_written=bytes_written)
except Exception as e:
end_run(conn, run_id, status="failed", error_message=str(e))
raise
finally:
conn.close()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1,15 @@
[project]
name = "extract_core"
version = "0.1.0"
description = "Shared extraction utilities: SQLite state tracking, HTTP helpers, file I/O"
requires-python = ">=3.13"
dependencies = [
"niquests>=3.14.1",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/extract_core"]

View File

@@ -0,0 +1,17 @@
from .files import content_hash, landing_path, write_bytes_atomic
from .http import create_session, head_etag, normalize_etag
from .state import end_run, get_last_cursor, get_recent_runs, open_state_db, start_run
# Explicit export list: these names (re-exported from .state, .http and
# .files above) are the package's public API for `from extract_core import *`.
__all__ = [
    "open_state_db",
    "start_run",
    "end_run",
    "get_last_cursor",
    "get_recent_runs",
    "create_session",
    "head_etag",
    "normalize_etag",
    "landing_path",
    "content_hash",
    "write_bytes_atomic",
]

View File

@@ -0,0 +1,53 @@
"""Landing zone file I/O helpers for extraction pipelines."""
import hashlib
from pathlib import Path
def landing_path(landing_dir: str | Path, *parts: str) -> Path:
"""Return path to a subdirectory of landing_dir, creating it if absent.
Example:
dest_dir = landing_path("data/landing", "psd", "2025", "07")
# Returns Path("data/landing/psd/2025/07"), all directories created.
Use this instead of manually calling mkdir — it makes the intent clear
and ensures the directory always exists before the caller writes to it.
"""
assert landing_dir, "landing_dir must not be empty"
path = Path(landing_dir).joinpath(*parts)
path.mkdir(parents=True, exist_ok=True)
return path
def content_hash(data: bytes, prefix_bytes: int = 8) -> str:
    """Return the first prefix_bytes hex chars of data's SHA256 digest.

    Short idempotency key for content-addressed filenames: identical content
    always yields the same hash, so an existing file on disk is proof the
    content is unchanged.

    NOTE: despite its name, prefix_bytes counts hex *characters* — the
    default of 8 chars is 32 bits of digest, enough for a few thousand files
    per extractor. Raise to 16 for very high-volume sources.
    """
    assert data, "data must not be empty"
    assert 1 <= prefix_bytes <= 64, f"prefix_bytes must be 1-64, got {prefix_bytes}"
    digest = hashlib.sha256(data).hexdigest()
    return digest[:prefix_bytes]
def write_bytes_atomic(path: Path, data: bytes) -> int:
    """Write data to path atomically via a .tmp sibling file.

    Writes to {path}.tmp first, then atomically replaces path. A reader
    never observes a partial file. Returns the number of bytes written.

    Callers are responsible for compression (e.g. gzip.compress) before
    calling this function.

    Args:
        path: final destination; its parent directory must already exist.
        data: non-empty payload to persist.
    """
    assert data, "data must not be empty"
    # with_name (not with_suffix) handles multi-dot names like "x.csv.gzip"
    # and suffix-less names uniformly: always a plain ".tmp" appended.
    tmp = path.with_name(path.name + ".tmp")
    try:
        tmp.write_bytes(data)
        # Path.replace -> os.replace: atomic on POSIX and, unlike rename(),
        # also overwrites an existing destination on Windows.
        tmp.replace(path)
    except BaseException:
        # Don't leak a stray .tmp sibling on a failed write.
        tmp.unlink(missing_ok=True)
        raise
    assert path.exists(), f"File was not written: {path}"
    return len(data)

View File

@@ -0,0 +1,43 @@
"""HTTP session factory and etag helpers for extraction pipelines."""
import niquests
def create_session(timeout_seconds: int = 60, max_retries: int = 3) -> niquests.Session:
    """Return a new niquests Session.

    Both knobs are stored as attributes so callers can reference a consistent
    timeout via session.timeout_seconds and the retry budget via
    session.max_retries.

    NOTE(review): max_retries is validated and stored but not yet wired into
    the transport — niquests supports a ``retries=`` Session argument; confirm
    the installed version and pass it through before relying on retries.
    """
    assert timeout_seconds > 0, f"timeout_seconds must be positive, got {timeout_seconds}"
    assert max_retries >= 0, f"max_retries must be non-negative, got {max_retries}"
    session = niquests.Session()
    session.timeout_seconds = timeout_seconds  # type: ignore[attr-defined]
    session.max_retries = max_retries  # type: ignore[attr-defined]
    return session
def normalize_etag(raw: str) -> str:
    """Normalize an HTTP etag for use as a filename component.

    Drops a weak-validator prefix ("W/"), strips surrounding quotes, and
    replaces colons and slashes with underscores so the result is safe as a
    filename on all platforms.

    Examples:
        '"abc:def"'  -> 'abc_def'
        'W/"xyz"'    -> 'xyz'
    """
    assert raw, "raw etag must not be empty"
    etag = raw.strip()
    # RFC 7232: weak etags look like W/"abc". Without this step the leading
    # W/ survives strip('"') and leaves a path separator in the filename.
    if etag[:2].upper() == "W/":
        etag = etag[2:]
    # RFC 7232 allows any visible char except '"' inside an etag, so '/'
    # can legitimately appear in strong etags too — replace it as well.
    etag = etag.strip('"').replace(":", "_").replace("/", "_")
    assert etag, "etag normalized to empty string"
    return etag
def head_etag(session: niquests.Session, url: str, timeout_seconds: int = 60) -> str | None:
    """Send a HEAD request and return the normalized etag, or None if absent.

    Returning None instead of raising lets callers fall back to a synthetic
    idempotency key (e.g. content-length + last-modified) when the server
    omits the etag header.
    """
    assert url, "url must not be empty"
    assert timeout_seconds > 0, f"timeout_seconds must be positive, got {timeout_seconds}"
    raw = session.head(url, timeout=timeout_seconds).headers.get("etag", "")
    return normalize_etag(raw) if raw else None

View File

@@ -0,0 +1,122 @@
"""SQLite-backed extraction run state.
State table lives at {LANDING_DIR}/.state.sqlite — derived from LANDING_DIR,
no extra env var required. SQLite is used (not DuckDB) because state tracking
is a transactional workload: single-row inserts and point-lookup updates, not
analytical scans. WAL mode allows concurrent readers while a run is in progress.
Schema is generic: extractor is a free-text name, cursor_value is a TEXT field
that can hold any cursor type (date string, etag, page number, or JSON for
multi-dimensional cursors). The schema should never need changing — add new
extractor names by just using them.
"""
import sqlite3
from pathlib import Path
_CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS extraction_runs (
run_id INTEGER PRIMARY KEY AUTOINCREMENT,
extractor TEXT NOT NULL,
started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
finished_at TEXT,
status TEXT NOT NULL DEFAULT 'running',
files_written INTEGER DEFAULT 0,
files_skipped INTEGER DEFAULT 0,
bytes_written INTEGER DEFAULT 0,
cursor_value TEXT,
error_message TEXT
)
"""
def open_state_db(landing_dir: str | Path) -> sqlite3.Connection:
"""Open (or create) the state DB at {landing_dir}/.state.sqlite.
Enables WAL mode so concurrent readers (e.g. monitoring scripts) can query
the DB while an extraction run is in progress. Caller must close when done.
"""
assert landing_dir, "landing_dir must not be empty"
db_path = Path(landing_dir) / ".state.sqlite"
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(_CREATE_TABLE_SQL)
conn.commit()
return conn
def start_run(conn: sqlite3.Connection, extractor: str) -> int:
    """Insert a 'running' row for extractor. Returns the new run_id."""
    assert extractor, "extractor name must not be empty"
    cursor = conn.execute(
        "INSERT INTO extraction_runs (extractor, status) VALUES (?, 'running')",
        (extractor,),
    )
    conn.commit()
    new_run_id = cursor.lastrowid
    assert new_run_id is not None, "INSERT did not return a row ID"
    return new_run_id
def end_run(
conn: sqlite3.Connection,
run_id: int,
*,
status: str,
files_written: int = 0,
files_skipped: int = 0,
bytes_written: int = 0,
cursor_value: str | None = None,
error_message: str | None = None,
) -> None:
"""Update the run row to its final state (success or failed)."""
assert status in ("success", "failed"), f"status must be 'success' or 'failed', got {status!r}"
assert run_id > 0, f"run_id must be positive, got {run_id}"
assert files_written >= 0, f"files_written must be non-negative, got {files_written}"
assert files_skipped >= 0, f"files_skipped must be non-negative, got {files_skipped}"
assert bytes_written >= 0, f"bytes_written must be non-negative, got {bytes_written}"
conn.execute(
"""
UPDATE extraction_runs
SET finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
status = ?,
files_written = ?,
files_skipped = ?,
bytes_written = ?,
cursor_value = ?,
error_message = ?
WHERE run_id = ?
""",
(status, files_written, files_skipped, bytes_written, cursor_value, error_message, run_id),
)
conn.commit()
def get_last_cursor(conn: sqlite3.Connection, extractor: str) -> str | None:
"""Return the cursor_value from the most recent successful run, or None."""
row = conn.execute(
"""
SELECT cursor_value FROM extraction_runs
WHERE extractor = ? AND status = 'success' AND cursor_value IS NOT NULL
ORDER BY run_id DESC
LIMIT 1
""",
(extractor,),
).fetchone()
return row["cursor_value"] if row else None
def get_recent_runs(conn: sqlite3.Connection, extractor: str, limit: int = 10) -> list[dict]:
    """Return the most recent runs for an extractor, newest first.

    Each run is returned as a plain dict (one key per table column), so the
    result is decoupled from the sqlite3.Row connection it came from.
    """
    assert limit > 0, f"limit must be positive, got {limit}"
    query = """
        SELECT * FROM extraction_runs
        WHERE extractor = ?
        ORDER BY run_id DESC
        LIMIT ?
        """
    return [dict(row) for row in conn.execute(query, (extractor, limit))]

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
description = "ICE certified warehouse stocks extractor" description = "ICE certified warehouse stocks extractor"
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = [
"extract_core",
"niquests>=3.14.1", "niquests>=3.14.1",
"xlrd>=2.0.1", "xlrd>=2.0.1",
] ]

View File

@@ -20,16 +20,23 @@ CSV schemas:
import csv import csv
import gzip import gzip
import hashlib
import io import io
import logging import logging
import os import os
import pathlib
import sys import sys
from datetime import datetime from datetime import datetime
from pathlib import Path
import niquests import niquests
import xlrd import xlrd
from extract_core import (
content_hash,
end_run,
landing_path,
open_state_db,
start_run,
write_bytes_atomic,
)
from ice_stocks.ice_api import find_all_reports, find_latest_report from ice_stocks.ice_api import find_all_reports, find_latest_report
from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows
@@ -42,7 +49,7 @@ logging.basicConfig(
) )
logger = logging.getLogger("ICE Stocks Extractor") logger = logging.getLogger("ICE Stocks Extractor")
LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
# ── ice_stocks (daily rolling) ────────────────────────────────────────────── # ── ice_stocks (daily rolling) ──────────────────────────────────────────────
DEST_SUBDIR = "ice_stocks" DEST_SUBDIR = "ice_stocks"
@@ -105,31 +112,30 @@ HISTORICAL_PORT_COLS = [
# ── shared helpers ─────────────────────────────────────────────────────────── # ── shared helpers ───────────────────────────────────────────────────────────
def _write_landing_file(canonical_csv: bytes, dest_subdir: str, date_label: str) -> None: def _write_landing_file(canonical_csv: bytes, dest_subdir: str, date_label: str) -> int:
"""SHA256-hash canonical_csv, skip if exists, else gzip and write.""" """SHA256-hash canonical_csv, skip if exists, else gzip and write atomically.
Returns bytes_written (0 if skipped).
"""
assert canonical_csv, "canonical_csv must not be empty" assert canonical_csv, "canonical_csv must not be empty"
assert dest_subdir, "dest_subdir must not be empty" assert dest_subdir, "dest_subdir must not be empty"
assert date_label, "date_label must not be empty" assert date_label, "date_label must not be empty"
sha256 = hashlib.sha256(canonical_csv).hexdigest() etag = content_hash(canonical_csv)
etag = sha256[:8]
year = date_label[:4] year = date_label[:4]
dest_dir = LANDING_DIR / dest_subdir / year dest_dir = landing_path(LANDING_DIR, dest_subdir, year)
local_file = dest_dir / f"{date_label}_{etag}.csv.gzip" local_file = dest_dir / f"{date_label}_{etag}.csv.gzip"
if local_file.exists(): if local_file.exists():
logger.info(f"File {local_file.name} already exists — content unchanged, skipping") logger.info(f"File {local_file.name} already exists — content unchanged, skipping")
return return 0
dest_dir.mkdir(parents=True, exist_ok=True)
compressed = gzip.compress(canonical_csv) compressed = gzip.compress(canonical_csv)
local_file.write_bytes(compressed) bytes_written = write_bytes_atomic(local_file, compressed)
assert local_file.exists(), f"File was not written: {local_file}" logger.info(f"Stored {local_file} ({bytes_written:,} bytes)")
assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}" return bytes_written
logger.info(f"Stored {local_file} ({local_file.stat().st_size:,} bytes)")
def _build_csv_bytes(fieldnames: list[str], rows: list[dict]) -> bytes: def _build_csv_bytes(fieldnames: list[str], rows: list[dict]) -> bytes:
@@ -243,6 +249,9 @@ def extract_ice_stocks() -> None:
discovery to find the latest 'Daily Warehouse Stocks' report. discovery to find the latest 'Daily Warehouse Stocks' report.
Idempotent: skips if content hash already on disk. Idempotent: skips if content hash already on disk.
""" """
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, "ice_stocks")
try:
with niquests.Session() as session: with niquests.Session() as session:
logger.info(f"Trying ICE rolling CSV: {ICE_ROLLING_CSV_URL}") logger.info(f"Trying ICE rolling CSV: {ICE_ROLLING_CSV_URL}")
try: try:
@@ -258,16 +267,19 @@ def extract_ice_stocks() -> None:
report = find_latest_report(session, ICE_STOCKS_LABEL) report = find_latest_report(session, ICE_STOCKS_LABEL)
if not report: if not report:
logger.error("ICE API: no 'Daily Warehouse Stocks' report found") logger.error("ICE API: no 'Daily Warehouse Stocks' report found")
end_run(conn, run_id, status="failed", error_message="No report found via API")
return return
logger.info(f"Found report via API: {report['download_label']} ({report['publish_date']})") logger.info(f"Found report via API: {report['download_label']} ({report['publish_date']})")
try: try:
response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS)
except Exception as e: except Exception as e:
logger.error(f"Failed to download report from API URL: {e}") logger.error(f"Failed to download report from API URL: {e}")
end_run(conn, run_id, status="failed", error_message=str(e))
return return
if response.status_code != 200: if response.status_code != 200:
logger.error(f"Unexpected status {response.status_code}") logger.error(f"Unexpected status {response.status_code}")
end_run(conn, run_id, status="failed", error_message=f"HTTP {response.status_code}")
return return
assert len(response.content) > 0, "Downloaded empty file from ICE" assert len(response.content) > 0, "Downloaded empty file from ICE"
@@ -280,10 +292,23 @@ def extract_ice_stocks() -> None:
if not canonical_csv: if not canonical_csv:
logger.warning("ICE stocks parsed to 0 rows — check column mapping or XLS structure") logger.warning("ICE stocks parsed to 0 rows — check column mapping or XLS structure")
end_run(conn, run_id, status="failed", error_message="Parsed 0 rows")
return return
today = datetime.now().strftime("%Y-%m-%d") today = datetime.now().strftime("%Y-%m-%d")
_write_landing_file(canonical_csv, DEST_SUBDIR, today) bytes_written = _write_landing_file(canonical_csv, DEST_SUBDIR, today)
end_run(
conn, run_id, status="success",
files_written=1 if bytes_written > 0 else 0,
files_skipped=1 if bytes_written == 0 else 0,
bytes_written=bytes_written,
cursor_value=today,
)
except Exception as e:
end_run(conn, run_id, status="failed", error_message=str(e))
raise
finally:
conn.close()
# ── ice_aging (monthly aging report) ──────────────────────────────────────── # ── ice_aging (monthly aging report) ────────────────────────────────────────
@@ -309,11 +334,15 @@ def extract_ice_aging() -> None:
Monthly report: stock quantities by age bucket × port. Monthly report: stock quantities by age bucket × port.
Idempotent: skips if content hash already on disk. Idempotent: skips if content hash already on disk.
""" """
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, "ice_aging")
try:
with niquests.Session() as session: with niquests.Session() as session:
logger.info("Fetching latest ICE Aging Report via API") logger.info("Fetching latest ICE Aging Report via API")
report = find_latest_report(session, ICE_AGING_LABEL) report = find_latest_report(session, ICE_AGING_LABEL)
if not report: if not report:
logger.error(f"ICE API: no report matching {ICE_AGING_LABEL!r}") logger.error(f"ICE API: no report matching {ICE_AGING_LABEL!r}")
end_run(conn, run_id, status="failed", error_message="No aging report found via API")
return return
logger.info(f"Downloading: {report['download_label']} ({report['publish_date']})") logger.info(f"Downloading: {report['download_label']} ({report['publish_date']})")
@@ -321,6 +350,7 @@ def extract_ice_aging() -> None:
response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS)
except Exception as e: except Exception as e:
logger.error(f"Failed to download aging report: {e}") logger.error(f"Failed to download aging report: {e}")
end_run(conn, run_id, status="failed", error_message=str(e))
return return
assert response.status_code == 200, f"HTTP {response.status_code}" assert response.status_code == 200, f"HTTP {response.status_code}"
@@ -330,7 +360,9 @@ def extract_ice_aging() -> None:
report_date = _parse_aging_date(str(rows[0][0]) if rows else "") report_date = _parse_aging_date(str(rows[0][0]) if rows else "")
if not report_date: if not report_date:
logger.error(f"Could not parse aging report date from row 0: {rows[0] if rows else '(empty)'!r}") msg = f"Could not parse aging report date from row 0: {rows[0] if rows else '(empty)'!r}"
logger.error(msg)
end_run(conn, run_id, status="failed", error_message=msg)
return return
# Row 3+ are data rows; stop at row labelled "Total" # Row 3+ are data rows; stop at row labelled "Total"
@@ -364,10 +396,23 @@ def extract_ice_aging() -> None:
if not data_rows: if not data_rows:
logger.warning("Aging report parsed to 0 data rows") logger.warning("Aging report parsed to 0 data rows")
end_run(conn, run_id, status="failed", error_message="Parsed 0 data rows")
return return
canonical_csv = _build_csv_bytes(fieldnames, data_rows) canonical_csv = _build_csv_bytes(fieldnames, data_rows)
_write_landing_file(canonical_csv, AGING_DEST_SUBDIR, report_date) bytes_written = _write_landing_file(canonical_csv, AGING_DEST_SUBDIR, report_date)
end_run(
conn, run_id, status="success",
files_written=1 if bytes_written > 0 else 0,
files_skipped=1 if bytes_written == 0 else 0,
bytes_written=bytes_written,
cursor_value=report_date,
)
except Exception as e:
end_run(conn, run_id, status="failed", error_message=str(e))
raise
finally:
conn.close()
# ── ice_stocks_by_port (historical) ───────────────────────────────────────── # ── ice_stocks_by_port (historical) ─────────────────────────────────────────
@@ -387,6 +432,9 @@ def extract_ice_historical() -> None:
Static URL updated monthly. Covers Nov 1996 to present. Static URL updated monthly. Covers Nov 1996 to present.
Idempotent: skips if content hash already on disk. Idempotent: skips if content hash already on disk.
""" """
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, "ice_historical")
try:
logger.info(f"Downloading ICE historical by-port XLS: {ICE_HISTORICAL_URL}") logger.info(f"Downloading ICE historical by-port XLS: {ICE_HISTORICAL_URL}")
with niquests.Session() as session: with niquests.Session() as session:
@@ -394,6 +442,7 @@ def extract_ice_historical() -> None:
response = session.get(ICE_HISTORICAL_URL, timeout=HISTORICAL_HTTP_TIMEOUT_SECONDS) response = session.get(ICE_HISTORICAL_URL, timeout=HISTORICAL_HTTP_TIMEOUT_SECONDS)
except Exception as e: except Exception as e:
logger.error(f"Failed to download historical XLS: {e}") logger.error(f"Failed to download historical XLS: {e}")
end_run(conn, run_id, status="failed", error_message=str(e))
return return
assert response.status_code == 200, f"HTTP {response.status_code}" assert response.status_code == 200, f"HTTP {response.status_code}"
@@ -439,11 +488,24 @@ def extract_ice_historical() -> None:
if not data_rows: if not data_rows:
logger.warning("Historical XLS parsed to 0 data rows") logger.warning("Historical XLS parsed to 0 data rows")
end_run(conn, run_id, status="failed", error_message="Parsed 0 data rows")
return return
canonical_csv = _build_csv_bytes(fieldnames, data_rows) canonical_csv = _build_csv_bytes(fieldnames, data_rows)
today = datetime.now().strftime("%Y-%m-%d") today = datetime.now().strftime("%Y-%m-%d")
_write_landing_file(canonical_csv, HISTORICAL_DEST_SUBDIR, today) bytes_written = _write_landing_file(canonical_csv, HISTORICAL_DEST_SUBDIR, today)
end_run(
conn, run_id, status="success",
files_written=1 if bytes_written > 0 else 0,
files_skipped=1 if bytes_written == 0 else 0,
bytes_written=bytes_written,
cursor_value=today,
)
except Exception as e:
end_run(conn, run_id, status="failed", error_message=str(e))
raise
finally:
conn.close()
def extract_ice_stocks_backfill(max_pages: int = 3) -> None: def extract_ice_stocks_backfill(max_pages: int = 3) -> None:
@@ -458,17 +520,22 @@ def extract_ice_stocks_backfill(max_pages: int = 3) -> None:
""" """
assert max_pages > 0, f"max_pages must be positive, got {max_pages}" assert max_pages > 0, f"max_pages must be positive, got {max_pages}"
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, "ice_stocks_backfill")
files_written = 0
files_skipped = 0
bytes_written_total = 0
try:
with niquests.Session() as session: with niquests.Session() as session:
logger.info(f"Fetching all available Daily Warehouse Stocks reports (max {max_pages} pages)...") logger.info(f"Fetching all available Daily Warehouse Stocks reports (max {max_pages} pages)...")
reports = find_all_reports(session, ICE_STOCKS_LABEL, max_pages=max_pages) reports = find_all_reports(session, ICE_STOCKS_LABEL, max_pages=max_pages)
if not reports: if not reports:
logger.error("ICE API: no 'Daily Warehouse Stocks' reports found") logger.error("ICE API: no 'Daily Warehouse Stocks' reports found")
end_run(conn, run_id, status="failed", error_message="No reports found via API")
return return
logger.info(f"Found {len(reports)} reports: {reports[-1]['publish_date']}{reports[0]['publish_date']}") logger.info(f"Found {len(reports)} reports: {reports[-1]['publish_date']}{reports[0]['publish_date']}")
downloaded = 0
skipped = 0
for report in reports: for report in reports:
publish_date = report["publish_date"] publish_date = report["publish_date"]
@@ -492,16 +559,24 @@ def extract_ice_stocks_backfill(max_pages: int = 3) -> None:
logger.warning(f"Parsed 0 rows for {publish_date} — skipping") logger.warning(f"Parsed 0 rows for {publish_date} — skipping")
continue continue
# Use the report's publish date as the file date label result = _write_landing_file(canonical_csv, DEST_SUBDIR, publish_date)
file_count_before = sum(1 for _ in (LANDING_DIR / DEST_SUBDIR).rglob("*.csv.gzip")) if result > 0:
_write_landing_file(canonical_csv, DEST_SUBDIR, publish_date) files_written += 1
file_count_after = sum(1 for _ in (LANDING_DIR / DEST_SUBDIR).rglob("*.csv.gzip")) bytes_written_total += result
if file_count_after > file_count_before:
downloaded += 1
else: else:
skipped += 1 files_skipped += 1
logger.info(f"Backfill complete: {downloaded} new files downloaded, {skipped} already existed") logger.info(f"Backfill complete: {files_written} new files downloaded, {files_skipped} already existed")
end_run(
conn, run_id, status="success",
files_written=files_written, files_skipped=files_skipped,
bytes_written=bytes_written_total,
)
except Exception as e:
end_run(conn, run_id, status="failed", error_message=str(e))
raise
finally:
conn.close()
def extract_ice_all() -> None: def extract_ice_all() -> None:

View File

@@ -8,6 +8,7 @@ authors = [
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = [
"extract_core",
"niquests>=3.14.1", "niquests>=3.14.1",
] ]
[project.scripts] [project.scripts]

View File

@@ -1,12 +1,14 @@
from .normalize import normalize_zipped_csv from .normalize import normalize_zipped_csv
import logging import logging
import os import os
import pathlib
import sys import sys
from datetime import datetime from datetime import datetime
from io import BytesIO from io import BytesIO
from pathlib import Path
import niquests import niquests
from extract_core import end_run, landing_path, normalize_etag, open_state_db, start_run
from extract_core import write_bytes_atomic
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -16,7 +18,7 @@ logging.basicConfig(
) )
logger = logging.getLogger("PSDOnline Extractor") logger = logging.getLogger("PSDOnline Extractor")
LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
LANDING_DIR.mkdir(parents=True, exist_ok=True) LANDING_DIR.mkdir(parents=True, exist_ok=True)
logger.info(f"Landing dir: {LANDING_DIR}") logger.info(f"Landing dir: {LANDING_DIR}")
@@ -27,39 +29,48 @@ FIRST_MONTH = 8
HTTP_TIMEOUT_SECONDS = 60 HTTP_TIMEOUT_SECONDS = 60
def extract_psd_file(url: str, year: int, month: int, http_session: niquests.Session): def extract_psd_file(url: str, year: int, month: int, http_session: niquests.Session) -> int:
"""Extract PSD file to local year/month subdirectory.""" """Extract PSD file to local year/month subdirectory.
Returns bytes_written (0 if the file already existed and was skipped).
"""
logger.info(f"Requesting file {url} ...") logger.info(f"Requesting file {url} ...")
response = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS) response = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS)
if response.status_code == 404: if response.status_code == 404:
logger.error("File doesn't exist on server, received status code 404 Not Found") logger.error("File doesn't exist on server, received status code 404 Not Found")
return return 0
elif response.status_code != 200: elif response.status_code != 200:
logger.error(f"Status code not ok, STATUS={response.status_code}") logger.error(f"Status code not ok, STATUS={response.status_code}")
return return 0
etag = response.headers.get("etag", "").replace('"', "").replace(":", "_") raw_etag = response.headers.get("etag", "")
assert etag, "USDA response missing etag header" assert raw_etag, "USDA response missing etag header"
etag = normalize_etag(raw_etag)
extract_to_path = LANDING_DIR / "psd" / str(year) / f"{month:02d}" dest_dir = landing_path(LANDING_DIR, "psd", str(year), f"{month:02d}")
local_file = extract_to_path / f"{etag}.csv.gzip" local_file = dest_dir / f"{etag}.csv.gzip"
if local_file.exists(): if local_file.exists():
logger.info(f"File {etag}.csv.gzip already exists locally, skipping") logger.info(f"File {etag}.csv.gzip already exists locally, skipping")
return return 0
response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS) response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS)
logger.info(f"Storing file to {local_file}") logger.info(f"Storing file to {local_file}")
extract_to_path.mkdir(parents=True, exist_ok=True)
normalized_content = normalize_zipped_csv(BytesIO(response.content)) normalized_content = normalize_zipped_csv(BytesIO(response.content))
local_file.write_bytes(normalized_content.read()) bytes_written = write_bytes_atomic(local_file, normalized_content.read())
assert local_file.exists(), f"File was not written: {local_file}"
logger.info("Download complete") logger.info("Download complete")
return bytes_written
def extract_psd_dataset(): def extract_psd_dataset():
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, "psdonline")
files_written = 0
files_skipped = 0
bytes_written = 0
cursor_value = None
try:
today = datetime.now() today = datetime.now()
with niquests.Session() as session: with niquests.Session() as session:
for months_back in range(4): for months_back in range(4):
year = today.year year = today.year
@@ -74,15 +85,32 @@ def extract_psd_dataset():
response = session.head(url, timeout=HTTP_TIMEOUT_SECONDS) response = session.head(url, timeout=HTTP_TIMEOUT_SECONDS)
if response.status_code == 200: if response.status_code == 200:
logger.info(f"Found latest data at {year}-{month:02d}") logger.info(f"Found latest data at {year}-{month:02d}")
extract_psd_file(url=url, year=year, month=month, http_session=session) result = extract_psd_file(url=url, year=year, month=month, http_session=session)
return if result > 0:
files_written = 1
bytes_written = result
else:
files_skipped = 1
cursor_value = f"{year}-{month:02d}"
break
elif response.status_code == 404: elif response.status_code == 404:
logger.info(f"Month {year}-{month:02d} not found, trying earlier...") logger.info(f"Month {year}-{month:02d} not found, trying earlier...")
else: else:
logger.warning(f"Unexpected status code {response.status_code} for {year}-{month:02d}") logger.warning(f"Unexpected status code {response.status_code} for {year}-{month:02d}")
else:
logger.error("Could not find any available data in the last 4 months") logger.error("Could not find any available data in the last 4 months")
end_run(
conn, run_id, status="success",
files_written=files_written, files_skipped=files_skipped,
bytes_written=bytes_written, cursor_value=cursor_value,
)
except Exception as e:
end_run(conn, run_id, status="failed", error_message=str(e))
raise
finally:
conn.close()
if __name__ == "__main__": if __name__ == "__main__":
extract_psd_dataset() extract_psd_dataset()

View File

@@ -40,6 +40,7 @@ dev = [
] ]
[tool.uv.sources] [tool.uv.sources]
extract_core = {workspace = true }
psdonline = {workspace = true } psdonline = {workspace = true }
sqlmesh_materia = {workspace = true } sqlmesh_materia = {workspace = true }
cftc_cot = {workspace = true } cftc_cot = {workspace = true }

View File

@@ -123,7 +123,7 @@ def test_extract_cot_year_skips_existing_file(tmp_path, monkeypatch):
result = cot_execute.extract_cot_year(2024, mock_session) result = cot_execute.extract_cot_year(2024, mock_session)
assert result is False assert result == 0
mock_session.get.assert_not_called() # No download should occur mock_session.get.assert_not_called() # No download should occur
@@ -143,5 +143,5 @@ def test_extract_cot_year_returns_false_on_404(tmp_path, monkeypatch):
result = cot_execute.extract_cot_year(2006, mock_session) result = cot_execute.extract_cot_year(2006, mock_session)
assert result is False assert result == 0
mock_session.get.assert_not_called() mock_session.get.assert_not_called()

32
uv.lock generated
View File

@@ -11,6 +11,7 @@ members = [
"beanflows", "beanflows",
"cftc-cot", "cftc-cot",
"coffee-prices", "coffee-prices",
"extract-core",
"ice-stocks", "ice-stocks",
"materia", "materia",
"psdonline", "psdonline",
@@ -365,11 +366,15 @@ name = "cftc-cot"
version = "0.1.0" version = "0.1.0"
source = { editable = "extract/cftc_cot" } source = { editable = "extract/cftc_cot" }
dependencies = [ dependencies = [
{ name = "extract-core" },
{ name = "niquests" }, { name = "niquests" },
] ]
[package.metadata] [package.metadata]
requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }] requires-dist = [
{ name = "extract-core", editable = "extract/extract_core" },
{ name = "niquests", specifier = ">=3.14.1" },
]
[[package]] [[package]]
name = "charset-normalizer" name = "charset-normalizer"
@@ -438,11 +443,15 @@ name = "coffee-prices"
version = "0.1.0" version = "0.1.0"
source = { editable = "extract/coffee_prices" } source = { editable = "extract/coffee_prices" }
dependencies = [ dependencies = [
{ name = "extract-core" },
{ name = "yfinance" }, { name = "yfinance" },
] ]
[package.metadata] [package.metadata]
requires-dist = [{ name = "yfinance", specifier = ">=0.2.55" }] requires-dist = [
{ name = "extract-core", editable = "extract/extract_core" },
{ name = "yfinance", specifier = ">=0.2.55" },
]
[[package]] [[package]]
name = "colorama" name = "colorama"
@@ -740,6 +749,17 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c1/ea/53f2148663b321f21b5a606bd5f191517cf40b7072c0497d3c92c4a13b1e/executing-2.2.1-py2.py3-none-any.whl", hash = "sha256:760643d3452b4d777d295bb167ccc74c64a81df23fb5e08eff250c425a4b2017", size = 28317, upload-time = "2025-09-01T09:48:08.5Z" }, { url = "https://files.pythonhosted.org/packages/c1/ea/53f2148663b321f21b5a606bd5f191517cf40b7072c0497d3c92c4a13b1e/executing-2.2.1-py2.py3-none-any.whl", hash = "sha256:760643d3452b4d777d295bb167ccc74c64a81df23fb5e08eff250c425a4b2017", size = 28317, upload-time = "2025-09-01T09:48:08.5Z" },
] ]
[[package]]
name = "extract-core"
version = "0.1.0"
source = { editable = "extract/extract_core" }
dependencies = [
{ name = "niquests" },
]
[package.metadata]
requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }]
[[package]] [[package]]
name = "fakeredis" name = "fakeredis"
version = "2.34.0" version = "2.34.0"
@@ -1059,12 +1079,14 @@ name = "ice-stocks"
version = "0.1.0" version = "0.1.0"
source = { editable = "extract/ice_stocks" } source = { editable = "extract/ice_stocks" }
dependencies = [ dependencies = [
{ name = "extract-core" },
{ name = "niquests" }, { name = "niquests" },
{ name = "xlrd" }, { name = "xlrd" },
] ]
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "extract-core", editable = "extract/extract_core" },
{ name = "niquests", specifier = ">=3.14.1" }, { name = "niquests", specifier = ">=3.14.1" },
{ name = "xlrd", specifier = ">=2.0.1" }, { name = "xlrd", specifier = ">=2.0.1" },
] ]
@@ -2067,11 +2089,15 @@ name = "psdonline"
version = "0.1.0" version = "0.1.0"
source = { editable = "extract/psdonline" } source = { editable = "extract/psdonline" }
dependencies = [ dependencies = [
{ name = "extract-core" },
{ name = "niquests" }, { name = "niquests" },
] ]
[package.metadata] [package.metadata]
requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }] requires-dist = [
{ name = "extract-core", editable = "extract/extract_core" },
{ name = "niquests", specifier = ">=3.14.1" },
]
[[package]] [[package]]
name = "psutil" name = "psutil"