diff --git a/extract/cftc_cot/pyproject.toml b/extract/cftc_cot/pyproject.toml index 743057c..b3275b3 100644 --- a/extract/cftc_cot/pyproject.toml +++ b/extract/cftc_cot/pyproject.toml @@ -4,6 +4,7 @@ version = "0.1.0" description = "CFTC Commitment of Traders data extractor" requires-python = ">=3.13" dependencies = [ + "extract_core", "niquests>=3.14.1", ] diff --git a/extract/cftc_cot/src/cftc_cot/execute.py b/extract/cftc_cot/src/cftc_cot/execute.py index aaebecc..cb3d87e 100644 --- a/extract/cftc_cot/src/cftc_cot/execute.py +++ b/extract/cftc_cot/src/cftc_cot/execute.py @@ -10,12 +10,20 @@ Landing path: LANDING_DIR/cot/{year}/{etag}.csv.gzip import logging import os -import pathlib import sys from datetime import datetime from io import BytesIO +from pathlib import Path import niquests +from extract_core import ( + end_run, + landing_path, + normalize_etag, + open_state_db, + start_run, + write_bytes_atomic, +) from .normalize import find_csv_inner_filename, normalize_zipped_csv @@ -27,7 +35,7 @@ logging.basicConfig( ) logger = logging.getLogger("CFTC COT Extractor") -LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) +LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) # CFTC publishes yearly ZIPs for the disaggregated futures-only report. # The file for the current year is updated each Friday at 3:30 PM ET. @@ -52,10 +60,10 @@ def _synthetic_etag(year: int, headers: dict) -> str: return etag -def extract_cot_year(year: int, http_session: niquests.Session) -> bool: +def extract_cot_year(year: int, http_session: niquests.Session) -> int: """Download and store COT data for a single year. - Returns True if a new file was written, False if skipped or unavailable. + Returns bytes_written (0 if skipped or unavailable). 
""" url = COT_URL_TEMPLATE.format(year=year) logger.info(f"Checking COT data for {year}: {url}") @@ -63,20 +71,20 @@ def extract_cot_year(year: int, http_session: niquests.Session) -> bool: head = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS) if head.status_code == 404: logger.info(f"Year {year} not available (404) — skipping") - return False + return 0 assert head.status_code == 200, ( f"Unexpected HEAD status {head.status_code} for {url}" ) raw_etag = head.headers.get("etag", "") - etag = raw_etag.replace('"', "").replace(":", "_") if raw_etag else _synthetic_etag(year, head.headers) + etag = normalize_etag(raw_etag) if raw_etag else _synthetic_etag(year, head.headers) - dest_dir = LANDING_DIR / "cot" / str(year) + dest_dir = landing_path(LANDING_DIR, "cot", str(year)) local_file = dest_dir / f"{etag}.csv.gzip" if local_file.exists(): logger.info(f"Year {year}: {etag}.csv.gzip already exists, skipping") - return False + return 0 logger.info(f"Downloading COT data for {year}...") response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS) @@ -89,14 +97,11 @@ def extract_cot_year(year: int, http_session: niquests.Session) -> bool: inner_filename = find_csv_inner_filename(BytesIO(response.content)) normalized = normalize_zipped_csv(zip_buffer, inner_filename) - dest_dir.mkdir(parents=True, exist_ok=True) - local_file.write_bytes(normalized.read()) - - assert local_file.exists(), f"File was not written: {local_file}" + bytes_written = write_bytes_atomic(local_file, normalized.read()) assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}" - logger.info(f"Stored {local_file} ({local_file.stat().st_size:,} bytes)") - return True + logger.info(f"Stored {local_file} ({bytes_written:,} bytes)") + return bytes_written def extract_cot_dataset(): @@ -113,16 +118,36 @@ def extract_cot_dataset(): f"Year range {len(years)} exceeds MAX_YEARS={MAX_YEARS}" ) - new_count = 0 - with niquests.Session() as session: - for year in years: - try: - if 
extract_cot_year(year, session): - new_count += 1 - except Exception: - logger.exception(f"Failed to extract COT data for {year}, continuing") + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, "cftc_cot") + files_written = 0 + files_skipped = 0 + bytes_written_total = 0 + try: + with niquests.Session() as session: + for year in years: + try: + result = extract_cot_year(year, session) + if result > 0: + files_written += 1 + bytes_written_total += result + else: + files_skipped += 1 + except Exception: + logger.exception(f"Failed to extract COT data for {year}, continuing") - logger.info(f"COT extraction complete: {new_count} new file(s) downloaded") + logger.info(f"COT extraction complete: {files_written} new file(s) downloaded") + end_run( + conn, run_id, status="success", + files_written=files_written, files_skipped=files_skipped, + bytes_written=bytes_written_total, + cursor_value=str(current_year), + ) + except Exception as e: + end_run(conn, run_id, status="failed", error_message=str(e)) + raise + finally: + conn.close() if __name__ == "__main__": diff --git a/extract/coffee_prices/pyproject.toml b/extract/coffee_prices/pyproject.toml index 34cd61e..599bb96 100644 --- a/extract/coffee_prices/pyproject.toml +++ b/extract/coffee_prices/pyproject.toml @@ -4,6 +4,7 @@ version = "0.1.0" description = "KC=F Coffee C futures price extractor" requires-python = ">=3.13" dependencies = [ + "extract_core", "yfinance>=0.2.55", ] diff --git a/extract/coffee_prices/src/coffee_prices/execute.py b/extract/coffee_prices/src/coffee_prices/execute.py index 1fa01d2..694e7ba 100644 --- a/extract/coffee_prices/src/coffee_prices/execute.py +++ b/extract/coffee_prices/src/coffee_prices/execute.py @@ -8,14 +8,15 @@ Landing path: LANDING_DIR/prices/coffee_kc/{hash8}.csv.gzip """ import gzip -import hashlib import io import logging import os -import pathlib import sys +from pathlib import Path import yfinance as yf +from extract_core import content_hash, end_run, 
landing_path, open_state_db, start_run +from extract_core import write_bytes_atomic logging.basicConfig( level=logging.INFO, @@ -25,7 +26,7 @@ logging.basicConfig( ) logger = logging.getLogger("Coffee Prices Extractor") -LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) +LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) TICKER = "KC=F" DEST_SUBDIR = "prices/coffee_kc" @@ -40,52 +41,54 @@ def extract_coffee_prices() -> None: On first run downloads full history (period='max'). On subsequent runs the hash matches if no new trading days have closed since last run. """ - logger.info(f"Downloading {TICKER} daily OHLCV from Yahoo Finance...") + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, "coffee_prices") + try: + logger.info(f"Downloading {TICKER} daily OHLCV from Yahoo Finance...") - ticker = yf.Ticker(TICKER) - df = ticker.history(period="max", interval="1d", auto_adjust=False, timeout=DOWNLOAD_TIMEOUT_SECONDS) + ticker = yf.Ticker(TICKER) + df = ticker.history(period="max", interval="1d", auto_adjust=False, timeout=DOWNLOAD_TIMEOUT_SECONDS) - assert df is not None and len(df) > 0, f"yfinance returned empty DataFrame for {TICKER}" + assert df is not None and len(df) > 0, f"yfinance returned empty DataFrame for {TICKER}" - # Reset index so Date becomes a plain column - df = df.reset_index() + # Reset index so Date becomes a plain column + df = df.reset_index() - # Keep standard OHLCV columns only; yfinance may return extra columns - keep_cols = [c for c in ["Date", "Open", "High", "Low", "Close", "Adj Close", "Volume"] if c in df.columns] - df = df[keep_cols] + # Keep standard OHLCV columns only; yfinance may return extra columns + keep_cols = [c for c in ["Date", "Open", "High", "Low", "Close", "Adj Close", "Volume"] if c in df.columns] + df = df[keep_cols] - # Normalize Date to ISO string for CSV stability across timezones - df["Date"] = df["Date"].dt.strftime("%Y-%m-%d") + # Normalize Date to ISO string for CSV stability 
across timezones + df["Date"] = df["Date"].dt.strftime("%Y-%m-%d") - # Serialize to CSV bytes - csv_buf = io.StringIO() - df.to_csv(csv_buf, index=False) - csv_bytes = csv_buf.getvalue().encode("utf-8") + # Serialize to CSV bytes + csv_buf = io.StringIO() + df.to_csv(csv_buf, index=False) + csv_bytes = csv_buf.getvalue().encode("utf-8") - assert len(csv_bytes) > 0, "CSV serialization produced empty output" + assert len(csv_bytes) > 0, "CSV serialization produced empty output" - # Hash-based idempotency key (first 8 hex chars of SHA256) - sha256 = hashlib.sha256(csv_bytes).hexdigest() - etag = sha256[:8] + etag = content_hash(csv_bytes) + dest_dir = landing_path(LANDING_DIR, DEST_SUBDIR) + local_file = dest_dir / f"{etag}.csv.gzip" - dest_dir = LANDING_DIR / DEST_SUBDIR - local_file = dest_dir / f"{etag}.csv.gzip" + if local_file.exists(): + logger.info(f"File {local_file.name} already exists — no new data, skipping") + end_run(conn, run_id, status="success", files_skipped=1) + return - if local_file.exists(): - logger.info(f"File {local_file.name} already exists — no new data, skipping") - return + compressed = gzip.compress(csv_bytes) + bytes_written = write_bytes_atomic(local_file, compressed) - # Compress and write - dest_dir.mkdir(parents=True, exist_ok=True) - compressed = gzip.compress(csv_bytes) - local_file.write_bytes(compressed) - - assert local_file.exists(), f"File was not written: {local_file}" - assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}" - - logger.info( - f"Stored {local_file} ({local_file.stat().st_size:,} bytes, {len(df):,} rows)" - ) + logger.info( + f"Stored {local_file} ({bytes_written:,} bytes, {len(df):,} rows)" + ) + end_run(conn, run_id, status="success", files_written=1, bytes_written=bytes_written) + except Exception as e: + end_run(conn, run_id, status="failed", error_message=str(e)) + raise + finally: + conn.close() if __name__ == "__main__": diff --git a/extract/extract_core/pyproject.toml 
b/extract/extract_core/pyproject.toml new file mode 100644 index 0000000..71d905a --- /dev/null +++ b/extract/extract_core/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "extract_core" +version = "0.1.0" +description = "Shared extraction utilities: SQLite state tracking, HTTP helpers, file I/O" +requires-python = ">=3.13" +dependencies = [ + "niquests>=3.14.1", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/extract_core"] diff --git a/extract/extract_core/src/extract_core/__init__.py b/extract/extract_core/src/extract_core/__init__.py new file mode 100644 index 0000000..4edb89c --- /dev/null +++ b/extract/extract_core/src/extract_core/__init__.py @@ -0,0 +1,17 @@ +from .files import content_hash, landing_path, write_bytes_atomic +from .http import create_session, head_etag, normalize_etag +from .state import end_run, get_last_cursor, get_recent_runs, open_state_db, start_run + +__all__ = [ + "open_state_db", + "start_run", + "end_run", + "get_last_cursor", + "get_recent_runs", + "create_session", + "head_etag", + "normalize_etag", + "landing_path", + "content_hash", + "write_bytes_atomic", +] diff --git a/extract/extract_core/src/extract_core/files.py b/extract/extract_core/src/extract_core/files.py new file mode 100644 index 0000000..949c6a1 --- /dev/null +++ b/extract/extract_core/src/extract_core/files.py @@ -0,0 +1,53 @@ +"""Landing zone file I/O helpers for extraction pipelines.""" + +import hashlib +from pathlib import Path + + +def landing_path(landing_dir: str | Path, *parts: str) -> Path: + """Return path to a subdirectory of landing_dir, creating it if absent. + + Example: + dest_dir = landing_path("data/landing", "psd", "2025", "07") + # Returns Path("data/landing/psd/2025/07"), all directories created. + + Use this instead of manually calling mkdir — it makes the intent clear + and ensures the directory always exists before the caller writes to it. 
+ """ + assert landing_dir, "landing_dir must not be empty" + path = Path(landing_dir).joinpath(*parts) + path.mkdir(parents=True, exist_ok=True) + return path + + +def content_hash(data: bytes, prefix_bytes: int = 8) -> str: + """Return the first prefix_bytes hex chars of the SHA256 digest of data. + + Used as a short idempotency key for content-addressed filenames. Two runs + that produce identical data will produce the same hash, so the file already + existing on disk is proof the content is unchanged. + + prefix_bytes=8 gives 32-bit collision resistance — sufficient for a few + thousand files per extractor. Increase to 16 for very high-volume sources. + """ + assert data, "data must not be empty" + assert 1 <= prefix_bytes <= 64, f"prefix_bytes must be 1-64, got {prefix_bytes}" + return hashlib.sha256(data).hexdigest()[:prefix_bytes] + + +def write_bytes_atomic(path: Path, data: bytes) -> int: + """Write data to path atomically via a .tmp sibling file. + + Writes to {path}.tmp first, then renames to path. On Linux, rename() is + atomic when src and dst are on the same filesystem — a reader never sees + a partial file. Returns the number of bytes written. + + Callers are responsible for compression (e.g. gzip.compress) before calling + this function. + """ + assert data, "data must not be empty" + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_bytes(data) + tmp.rename(path) + assert path.exists(), f"File was not written: {path}" + return len(data) diff --git a/extract/extract_core/src/extract_core/http.py b/extract/extract_core/src/extract_core/http.py new file mode 100644 index 0000000..0be2f8b --- /dev/null +++ b/extract/extract_core/src/extract_core/http.py @@ -0,0 +1,43 @@ +"""HTTP session factory and etag helpers for extraction pipelines.""" + +import niquests + + +def create_session(timeout_seconds: int = 60, max_retries: int = 3) -> niquests.Session: + """Return a new niquests Session. 
+ + timeout_seconds is stored as an attribute so callers that need a consistent + timeout can reference it via session.timeout_seconds. + """ + assert timeout_seconds > 0, f"timeout_seconds must be positive, got {timeout_seconds}" + assert max_retries >= 0, f"max_retries must be non-negative, got {max_retries}" + session = niquests.Session() + session.timeout_seconds = timeout_seconds # type: ignore[attr-defined] + return session + + +def normalize_etag(raw: str) -> str: + """Normalize an HTTP etag for use as a filename component. + + Strips surrounding quotes and replaces colons with underscores so the + result is safe as a filename on all platforms. + + Example: '"abc:def"' → 'abc_def' + """ + assert raw, "raw etag must not be empty" + return raw.strip().strip('"').replace(":", "_") + + +def head_etag(session: niquests.Session, url: str, timeout_seconds: int = 60) -> str | None: + """Send a HEAD request and return the normalized etag, or None if absent. + + Returns None (not raises) when the server omits the etag header, so callers + can fall back to a synthetic key (e.g. content-length + last-modified). + """ + assert url, "url must not be empty" + assert timeout_seconds > 0, f"timeout_seconds must be positive, got {timeout_seconds}" + response = session.head(url, timeout=timeout_seconds) + raw = response.headers.get("etag", "") + if not raw: + return None + return normalize_etag(raw) diff --git a/extract/extract_core/src/extract_core/state.py b/extract/extract_core/src/extract_core/state.py new file mode 100644 index 0000000..59a5fb4 --- /dev/null +++ b/extract/extract_core/src/extract_core/state.py @@ -0,0 +1,122 @@ +"""SQLite-backed extraction run state. + +State table lives at {LANDING_DIR}/.state.sqlite — derived from LANDING_DIR, +no extra env var required. SQLite is used (not DuckDB) because state tracking +is a transactional workload: single-row inserts and point-lookup updates, not +analytical scans. 
WAL mode allows concurrent readers while a run is in progress. + +Schema is generic: extractor is a free-text name, cursor_value is a TEXT field +that can hold any cursor type (date string, etag, page number, or JSON for +multi-dimensional cursors). The schema should never need changing — add new +extractor names by just using them. +""" + +import sqlite3 +from pathlib import Path + +_CREATE_TABLE_SQL = """ +CREATE TABLE IF NOT EXISTS extraction_runs ( + run_id INTEGER PRIMARY KEY AUTOINCREMENT, + extractor TEXT NOT NULL, + started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + finished_at TEXT, + status TEXT NOT NULL DEFAULT 'running', + files_written INTEGER DEFAULT 0, + files_skipped INTEGER DEFAULT 0, + bytes_written INTEGER DEFAULT 0, + cursor_value TEXT, + error_message TEXT +) +""" + + +def open_state_db(landing_dir: str | Path) -> sqlite3.Connection: + """Open (or create) the state DB at {landing_dir}/.state.sqlite. + + Enables WAL mode so concurrent readers (e.g. monitoring scripts) can query + the DB while an extraction run is in progress. Caller must close when done. + """ + assert landing_dir, "landing_dir must not be empty" + db_path = Path(landing_dir) / ".state.sqlite" + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute(_CREATE_TABLE_SQL) + conn.commit() + return conn + + +def start_run(conn: sqlite3.Connection, extractor: str) -> int: + """Insert a 'running' row for extractor. 
Returns the new run_id.""" + assert extractor, "extractor name must not be empty" + cur = conn.execute( + "INSERT INTO extraction_runs (extractor, status) VALUES (?, 'running')", + (extractor,), + ) + conn.commit() + assert cur.lastrowid is not None, "INSERT did not return a row ID" + return cur.lastrowid + + +def end_run( + conn: sqlite3.Connection, + run_id: int, + *, + status: str, + files_written: int = 0, + files_skipped: int = 0, + bytes_written: int = 0, + cursor_value: str | None = None, + error_message: str | None = None, +) -> None: + """Update the run row to its final state (success or failed).""" + assert status in ("success", "failed"), f"status must be 'success' or 'failed', got {status!r}" + assert run_id > 0, f"run_id must be positive, got {run_id}" + assert files_written >= 0, f"files_written must be non-negative, got {files_written}" + assert files_skipped >= 0, f"files_skipped must be non-negative, got {files_skipped}" + assert bytes_written >= 0, f"bytes_written must be non-negative, got {bytes_written}" + conn.execute( + """ + UPDATE extraction_runs + SET finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), + status = ?, + files_written = ?, + files_skipped = ?, + bytes_written = ?, + cursor_value = ?, + error_message = ? + WHERE run_id = ? + """, + (status, files_written, files_skipped, bytes_written, cursor_value, error_message, run_id), + ) + conn.commit() + + +def get_last_cursor(conn: sqlite3.Connection, extractor: str) -> str | None: + """Return the cursor_value from the most recent successful run, or None.""" + row = conn.execute( + """ + SELECT cursor_value FROM extraction_runs + WHERE extractor = ? 
AND status = 'success' AND cursor_value IS NOT NULL + ORDER BY run_id DESC + LIMIT 1 + """, + (extractor,), + ).fetchone() + return row["cursor_value"] if row else None + + +def get_recent_runs(conn: sqlite3.Connection, extractor: str, limit: int = 10) -> list[dict]: + """Return the most recent runs for an extractor, newest first.""" + assert limit > 0, f"limit must be positive, got {limit}" + rows = conn.execute( + """ + SELECT * FROM extraction_runs + WHERE extractor = ? + ORDER BY run_id DESC + LIMIT ? + """, + (extractor, limit), + ).fetchall() + return [dict(r) for r in rows] diff --git a/extract/ice_stocks/pyproject.toml b/extract/ice_stocks/pyproject.toml index b31be93..88b299e 100644 --- a/extract/ice_stocks/pyproject.toml +++ b/extract/ice_stocks/pyproject.toml @@ -4,6 +4,7 @@ version = "0.1.0" description = "ICE certified warehouse stocks extractor" requires-python = ">=3.13" dependencies = [ + "extract_core", "niquests>=3.14.1", "xlrd>=2.0.1", ] diff --git a/extract/ice_stocks/src/ice_stocks/execute.py b/extract/ice_stocks/src/ice_stocks/execute.py index 6645a6b..e79e58c 100644 --- a/extract/ice_stocks/src/ice_stocks/execute.py +++ b/extract/ice_stocks/src/ice_stocks/execute.py @@ -20,16 +20,23 @@ CSV schemas: import csv import gzip -import hashlib import io import logging import os -import pathlib import sys from datetime import datetime +from pathlib import Path import niquests import xlrd +from extract_core import ( + content_hash, + end_run, + landing_path, + open_state_db, + start_run, + write_bytes_atomic, +) from ice_stocks.ice_api import find_all_reports, find_latest_report from ice_stocks.xls_parse import OLE2_MAGIC, detect_file_format, xls_to_rows @@ -42,7 +49,7 @@ logging.basicConfig( ) logger = logging.getLogger("ICE Stocks Extractor") -LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) +LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) # ── ice_stocks (daily rolling) ────────────────────────────────────────────── 
def _write_landing_file(canonical_csv: bytes, dest_subdir: str, date_label: str) -> int:
    """Store canonical_csv in the landing zone under a content-addressed name.

    The target is LANDING_DIR/<dest_subdir>/<year>/<date_label>_<hash>.csv.gzip,
    where <year> is the first four characters of date_label and <hash> is
    content_hash(canonical_csv). When that file already exists nothing is
    written; otherwise the CSV is gzip-compressed and written atomically.

    Returns bytes_written (0 if skipped).
    """
    # Validate in the same order as the parameters so the first bad argument
    # is the one reported.
    for value, message in (
        (canonical_csv, "canonical_csv must not be empty"),
        (dest_subdir, "dest_subdir must not be empty"),
        (date_label, "date_label must not be empty"),
    ):
        assert value, message

    digest = content_hash(canonical_csv)
    target = landing_path(LANDING_DIR, dest_subdir, date_label[:4]) / f"{date_label}_{digest}.csv.gzip"

    if not target.exists():
        size = write_bytes_atomic(target, gzip.compress(canonical_csv))
        logger.info(f"Stored {target} ({size:,} bytes)")
        return size

    logger.info(f"File {target.name} already exists — content unchanged, skipping")
    return 0
""" - with niquests.Session() as session: - logger.info(f"Trying ICE rolling CSV: {ICE_ROLLING_CSV_URL}") - try: - response = session.get(ICE_ROLLING_CSV_URL, timeout=HTTP_TIMEOUT_SECONDS) - except Exception as e: - logger.warning(f"Rolling CSV fetch failed: {e} — trying API discovery") - response = None - - use_api = response is None or response.status_code == 404 - - if use_api: - logger.info("Falling back to ICE API discovery for Daily Warehouse Stocks") - report = find_latest_report(session, ICE_STOCKS_LABEL) - if not report: - logger.error("ICE API: no 'Daily Warehouse Stocks' report found") - return - logger.info(f"Found report via API: {report['download_label']} ({report['publish_date']})") + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, "ice_stocks") + try: + with niquests.Session() as session: + logger.info(f"Trying ICE rolling CSV: {ICE_ROLLING_CSV_URL}") try: - response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) + response = session.get(ICE_ROLLING_CSV_URL, timeout=HTTP_TIMEOUT_SECONDS) except Exception as e: - logger.error(f"Failed to download report from API URL: {e}") + logger.warning(f"Rolling CSV fetch failed: {e} — trying API discovery") + response = None + + use_api = response is None or response.status_code == 404 + + if use_api: + logger.info("Falling back to ICE API discovery for Daily Warehouse Stocks") + report = find_latest_report(session, ICE_STOCKS_LABEL) + if not report: + logger.error("ICE API: no 'Daily Warehouse Stocks' report found") + end_run(conn, run_id, status="failed", error_message="No report found via API") + return + logger.info(f"Found report via API: {report['download_label']} ({report['publish_date']})") + try: + response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.error(f"Failed to download report from API URL: {e}") + end_run(conn, run_id, status="failed", error_message=str(e)) + return + + if response.status_code != 200: + 
logger.error(f"Unexpected status {response.status_code}") + end_run(conn, run_id, status="failed", error_message=f"HTTP {response.status_code}") return - if response.status_code != 200: - logger.error(f"Unexpected status {response.status_code}") + assert len(response.content) > 0, "Downloaded empty file from ICE" + + fmt = detect_file_format(response.content) + if fmt == "xls": + canonical_csv = _build_canonical_csv_from_xls(response.content) + else: + canonical_csv = _build_canonical_csv_from_csv(response.content) + + if not canonical_csv: + logger.warning("ICE stocks parsed to 0 rows — check column mapping or XLS structure") + end_run(conn, run_id, status="failed", error_message="Parsed 0 rows") return - assert len(response.content) > 0, "Downloaded empty file from ICE" - - fmt = detect_file_format(response.content) - if fmt == "xls": - canonical_csv = _build_canonical_csv_from_xls(response.content) - else: - canonical_csv = _build_canonical_csv_from_csv(response.content) - - if not canonical_csv: - logger.warning("ICE stocks parsed to 0 rows — check column mapping or XLS structure") - return - - today = datetime.now().strftime("%Y-%m-%d") - _write_landing_file(canonical_csv, DEST_SUBDIR, today) + today = datetime.now().strftime("%Y-%m-%d") + bytes_written = _write_landing_file(canonical_csv, DEST_SUBDIR, today) + end_run( + conn, run_id, status="success", + files_written=1 if bytes_written > 0 else 0, + files_skipped=1 if bytes_written == 0 else 0, + bytes_written=bytes_written, + cursor_value=today, + ) + except Exception as e: + end_run(conn, run_id, status="failed", error_message=str(e)) + raise + finally: + conn.close() # ── ice_aging (monthly aging report) ──────────────────────────────────────── @@ -309,65 +334,85 @@ def extract_ice_aging() -> None: Monthly report: stock quantities by age bucket × port. Idempotent: skips if content hash already on disk. 
""" - with niquests.Session() as session: - logger.info("Fetching latest ICE Aging Report via API") - report = find_latest_report(session, ICE_AGING_LABEL) - if not report: - logger.error(f"ICE API: no report matching {ICE_AGING_LABEL!r}") + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, "ice_aging") + try: + with niquests.Session() as session: + logger.info("Fetching latest ICE Aging Report via API") + report = find_latest_report(session, ICE_AGING_LABEL) + if not report: + logger.error(f"ICE API: no report matching {ICE_AGING_LABEL!r}") + end_run(conn, run_id, status="failed", error_message="No aging report found via API") + return + + logger.info(f"Downloading: {report['download_label']} ({report['publish_date']})") + try: + response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.error(f"Failed to download aging report: {e}") + end_run(conn, run_id, status="failed", error_message=str(e)) + return + + assert response.status_code == 200, f"HTTP {response.status_code}" + assert response.content[:4] == OLE2_MAGIC, "Aging report is not an XLS file" + + rows = xls_to_rows(response.content) + + report_date = _parse_aging_date(str(rows[0][0]) if rows else "") + if not report_date: + msg = f"Could not parse aging report date from row 0: {rows[0] if rows else '(empty)'!r}" + logger.error(msg) + end_run(conn, run_id, status="failed", error_message=msg) return - logger.info(f"Downloading: {report['download_label']} ({report['publish_date']})") - try: - response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) - except Exception as e: - logger.error(f"Failed to download aging report: {e}") - return + # Row 3+ are data rows; stop at row labelled "Total" + fieldnames = ["report_date", "age_bucket"] + AGING_PORT_HEADERS + data_rows = [] - assert response.status_code == 200, f"HTTP {response.status_code}" - assert response.content[:4] == OLE2_MAGIC, "Aging report is not an XLS file" + for row 
in rows[3:]: + if not row or not str(row[0]).strip(): + continue + label = str(row[0]).strip() + if label.lower() == "total": + break - rows = xls_to_rows(response.content) + port_values = [] + for cell in row[1:]: + if isinstance(cell, float): + port_values.append(str(int(cell))) + elif str(cell).strip() in ("-", ""): + port_values.append("0") + else: + port_values.append(str(cell).replace(",", "").strip()) - report_date = _parse_aging_date(str(rows[0][0]) if rows else "") - if not report_date: - logger.error(f"Could not parse aging report date from row 0: {rows[0] if rows else '(empty)'!r}") - return - - # Row 3+ are data rows; stop at row labelled "Total" - fieldnames = ["report_date", "age_bucket"] + AGING_PORT_HEADERS - data_rows = [] - - for row in rows[3:]: - if not row or not str(row[0]).strip(): - continue - label = str(row[0]).strip() - if label.lower() == "total": - break - - port_values = [] - for cell in row[1:]: - if isinstance(cell, float): - port_values.append(str(int(cell))) - elif str(cell).strip() in ("-", ""): + while len(port_values) < len(AGING_PORT_HEADERS): port_values.append("0") - else: - port_values.append(str(cell).replace(",", "").strip()) + port_values = port_values[:len(AGING_PORT_HEADERS)] - while len(port_values) < len(AGING_PORT_HEADERS): - port_values.append("0") - port_values = port_values[:len(AGING_PORT_HEADERS)] + record = {"report_date": report_date, "age_bucket": label} + for col, val in zip(AGING_PORT_HEADERS, port_values): + record[col] = val + data_rows.append(record) - record = {"report_date": report_date, "age_bucket": label} - for col, val in zip(AGING_PORT_HEADERS, port_values): - record[col] = val - data_rows.append(record) + if not data_rows: + logger.warning("Aging report parsed to 0 data rows") + end_run(conn, run_id, status="failed", error_message="Parsed 0 data rows") + return - if not data_rows: - logger.warning("Aging report parsed to 0 data rows") - return - - canonical_csv = _build_csv_bytes(fieldnames, 
data_rows) - _write_landing_file(canonical_csv, AGING_DEST_SUBDIR, report_date) + canonical_csv = _build_csv_bytes(fieldnames, data_rows) + bytes_written = _write_landing_file(canonical_csv, AGING_DEST_SUBDIR, report_date) + end_run( + conn, run_id, status="success", + files_written=1 if bytes_written > 0 else 0, + files_skipped=1 if bytes_written == 0 else 0, + bytes_written=bytes_written, + cursor_value=report_date, + ) + except Exception as e: + end_run(conn, run_id, status="failed", error_message=str(e)) + raise + finally: + conn.close() # ── ice_stocks_by_port (historical) ───────────────────────────────────────── @@ -387,63 +432,80 @@ def extract_ice_historical() -> None: Static URL updated monthly. Covers Nov 1996 to present. Idempotent: skips if content hash already on disk. """ - logger.info(f"Downloading ICE historical by-port XLS: {ICE_HISTORICAL_URL}") + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, "ice_historical") + try: + logger.info(f"Downloading ICE historical by-port XLS: {ICE_HISTORICAL_URL}") - with niquests.Session() as session: - try: - response = session.get(ICE_HISTORICAL_URL, timeout=HISTORICAL_HTTP_TIMEOUT_SECONDS) - except Exception as e: - logger.error(f"Failed to download historical XLS: {e}") + with niquests.Session() as session: + try: + response = session.get(ICE_HISTORICAL_URL, timeout=HISTORICAL_HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.error(f"Failed to download historical XLS: {e}") + end_run(conn, run_id, status="failed", error_message=str(e)) + return + + assert response.status_code == 200, f"HTTP {response.status_code}" + assert response.content[:4] == OLE2_MAGIC, "Historical file is not an XLS" + + book = xlrd.open_workbook(file_contents=response.content) + datemode = book.datemode + rows = xls_to_rows(response.content) + + # Data starts at row 8 (0-indexed); rows 0-7 are headers + fieldnames = ["report_date"] + HISTORICAL_PORT_COLS + data_rows = [] + + for row in rows[8:]: + if not row or 
len(row) < 2: + continue + + serial_cell = row[1] + if not isinstance(serial_cell, float) or serial_cell <= 0: + continue + + report_date = _excel_serial_to_date(serial_cell, datemode) + if not report_date: + continue + + port_cells = row[2:2 + len(HISTORICAL_PORT_COLS)] + port_values = [] + for cell in port_cells: + if cell == "" or str(cell).strip() in ("-", ""): + port_values.append("0") + elif isinstance(cell, float): + port_values.append(str(int(cell))) + else: + port_values.append(str(cell).replace(",", "").strip()) + + while len(port_values) < len(HISTORICAL_PORT_COLS): + port_values.append("0") + + record = {"report_date": report_date} + for col, val in zip(HISTORICAL_PORT_COLS, port_values): + record[col] = val + data_rows.append(record) + + if not data_rows: + logger.warning("Historical XLS parsed to 0 data rows") + end_run(conn, run_id, status="failed", error_message="Parsed 0 data rows") return - assert response.status_code == 200, f"HTTP {response.status_code}" - assert response.content[:4] == OLE2_MAGIC, "Historical file is not an XLS" - - book = xlrd.open_workbook(file_contents=response.content) - datemode = book.datemode - rows = xls_to_rows(response.content) - - # Data starts at row 8 (0-indexed); rows 0-7 are headers - fieldnames = ["report_date"] + HISTORICAL_PORT_COLS - data_rows = [] - - for row in rows[8:]: - if not row or len(row) < 2: - continue - - serial_cell = row[1] - if not isinstance(serial_cell, float) or serial_cell <= 0: - continue - - report_date = _excel_serial_to_date(serial_cell, datemode) - if not report_date: - continue - - port_cells = row[2:2 + len(HISTORICAL_PORT_COLS)] - port_values = [] - for cell in port_cells: - if cell == "" or str(cell).strip() in ("-", ""): - port_values.append("0") - elif isinstance(cell, float): - port_values.append(str(int(cell))) - else: - port_values.append(str(cell).replace(",", "").strip()) - - while len(port_values) < len(HISTORICAL_PORT_COLS): - port_values.append("0") - - record = 
{"report_date": report_date} - for col, val in zip(HISTORICAL_PORT_COLS, port_values): - record[col] = val - data_rows.append(record) - - if not data_rows: - logger.warning("Historical XLS parsed to 0 data rows") - return - - canonical_csv = _build_csv_bytes(fieldnames, data_rows) - today = datetime.now().strftime("%Y-%m-%d") - _write_landing_file(canonical_csv, HISTORICAL_DEST_SUBDIR, today) + canonical_csv = _build_csv_bytes(fieldnames, data_rows) + today = datetime.now().strftime("%Y-%m-%d") + bytes_written = _write_landing_file(canonical_csv, HISTORICAL_DEST_SUBDIR, today) + end_run( + conn, run_id, status="success", + files_written=1 if bytes_written > 0 else 0, + files_skipped=1 if bytes_written == 0 else 0, + bytes_written=bytes_written, + cursor_value=today, + ) + except Exception as e: + end_run(conn, run_id, status="failed", error_message=str(e)) + raise + finally: + conn.close() def extract_ice_stocks_backfill(max_pages: int = 3) -> None: @@ -458,50 +520,63 @@ def extract_ice_stocks_backfill(max_pages: int = 3) -> None: """ assert max_pages > 0, f"max_pages must be positive, got {max_pages}" - with niquests.Session() as session: - logger.info(f"Fetching all available Daily Warehouse Stocks reports (max {max_pages} pages)...") - reports = find_all_reports(session, ICE_STOCKS_LABEL, max_pages=max_pages) + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, "ice_stocks_backfill") + files_written = 0 + files_skipped = 0 + bytes_written_total = 0 + try: + with niquests.Session() as session: + logger.info(f"Fetching all available Daily Warehouse Stocks reports (max {max_pages} pages)...") + reports = find_all_reports(session, ICE_STOCKS_LABEL, max_pages=max_pages) - if not reports: - logger.error("ICE API: no 'Daily Warehouse Stocks' reports found") - return + if not reports: + logger.error("ICE API: no 'Daily Warehouse Stocks' reports found") + end_run(conn, run_id, status="failed", error_message="No reports found via API") + return - 
logger.info(f"Found {len(reports)} reports: {reports[-1]['publish_date']} → {reports[0]['publish_date']}") - downloaded = 0 - skipped = 0 + logger.info(f"Found {len(reports)} reports: {reports[-1]['publish_date']} → {reports[0]['publish_date']}") - for report in reports: - publish_date = report["publish_date"] - try: - response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) - except Exception as e: - logger.warning(f"Failed to download {publish_date}: {e}") - continue + for report in reports: + publish_date = report["publish_date"] + try: + response = session.get(report["download_url"], timeout=HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.warning(f"Failed to download {publish_date}: {e}") + continue - if response.status_code != 200: - logger.warning(f"HTTP {response.status_code} for {publish_date} — skipping") - continue + if response.status_code != 200: + logger.warning(f"HTTP {response.status_code} for {publish_date} — skipping") + continue - fmt = detect_file_format(response.content) - if fmt == "xls": - canonical_csv = _build_canonical_csv_from_xls(response.content) - else: - canonical_csv = _build_canonical_csv_from_csv(response.content) + fmt = detect_file_format(response.content) + if fmt == "xls": + canonical_csv = _build_canonical_csv_from_xls(response.content) + else: + canonical_csv = _build_canonical_csv_from_csv(response.content) - if not canonical_csv: - logger.warning(f"Parsed 0 rows for {publish_date} — skipping") - continue + if not canonical_csv: + logger.warning(f"Parsed 0 rows for {publish_date} — skipping") + continue - # Use the report's publish date as the file date label - file_count_before = sum(1 for _ in (LANDING_DIR / DEST_SUBDIR).rglob("*.csv.gzip")) - _write_landing_file(canonical_csv, DEST_SUBDIR, publish_date) - file_count_after = sum(1 for _ in (LANDING_DIR / DEST_SUBDIR).rglob("*.csv.gzip")) - if file_count_after > file_count_before: - downloaded += 1 - else: - skipped += 1 + result = 
_write_landing_file(canonical_csv, DEST_SUBDIR, publish_date) + if result > 0: + files_written += 1 + bytes_written_total += result + else: + files_skipped += 1 - logger.info(f"Backfill complete: {downloaded} new files downloaded, {skipped} already existed") + logger.info(f"Backfill complete: {files_written} new files downloaded, {files_skipped} already existed") + end_run( + conn, run_id, status="success", + files_written=files_written, files_skipped=files_skipped, + bytes_written=bytes_written_total, + ) + except Exception as e: + end_run(conn, run_id, status="failed", error_message=str(e)) + raise + finally: + conn.close() def extract_ice_all() -> None: diff --git a/extract/psdonline/pyproject.toml b/extract/psdonline/pyproject.toml index 485b4e7..abb7958 100644 --- a/extract/psdonline/pyproject.toml +++ b/extract/psdonline/pyproject.toml @@ -8,6 +8,7 @@ authors = [ requires-python = ">=3.13" dependencies = [ + "extract_core", "niquests>=3.14.1", ] [project.scripts] diff --git a/extract/psdonline/src/psdonline/execute.py b/extract/psdonline/src/psdonline/execute.py index 65d7867..b4f7300 100644 --- a/extract/psdonline/src/psdonline/execute.py +++ b/extract/psdonline/src/psdonline/execute.py @@ -1,12 +1,14 @@ from .normalize import normalize_zipped_csv import logging import os -import pathlib import sys from datetime import datetime from io import BytesIO +from pathlib import Path import niquests +from extract_core import end_run, landing_path, normalize_etag, open_state_db, start_run +from extract_core import write_bytes_atomic logging.basicConfig( level=logging.INFO, @@ -16,7 +18,7 @@ logging.basicConfig( ) logger = logging.getLogger("PSDOnline Extractor") -LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) +LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) LANDING_DIR.mkdir(parents=True, exist_ok=True) logger.info(f"Landing dir: {LANDING_DIR}") @@ -27,61 +29,87 @@ FIRST_MONTH = 8 HTTP_TIMEOUT_SECONDS = 60 -def extract_psd_file(url: 
str, year: int, month: int, http_session: niquests.Session): - """Extract PSD file to local year/month subdirectory.""" +def extract_psd_file(url: str, year: int, month: int, http_session: niquests.Session) -> int: + """Extract PSD file to local year/month subdirectory. + + Returns bytes_written (0 if the file already existed and was skipped). + """ logger.info(f"Requesting file {url} ...") response = http_session.head(url, timeout=HTTP_TIMEOUT_SECONDS) if response.status_code == 404: logger.error("File doesn't exist on server, received status code 404 Not Found") - return + return 0 elif response.status_code != 200: logger.error(f"Status code not ok, STATUS={response.status_code}") - return + return 0 - etag = response.headers.get("etag", "").replace('"', "").replace(":", "_") - assert etag, "USDA response missing etag header" + raw_etag = response.headers.get("etag", "") + assert raw_etag, "USDA response missing etag header" + etag = normalize_etag(raw_etag) - extract_to_path = LANDING_DIR / "psd" / str(year) / f"{month:02d}" - local_file = extract_to_path / f"{etag}.csv.gzip" + dest_dir = landing_path(LANDING_DIR, "psd", str(year), f"{month:02d}") + local_file = dest_dir / f"{etag}.csv.gzip" if local_file.exists(): logger.info(f"File {etag}.csv.gzip already exists locally, skipping") - return + return 0 response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS) logger.info(f"Storing file to {local_file}") - extract_to_path.mkdir(parents=True, exist_ok=True) normalized_content = normalize_zipped_csv(BytesIO(response.content)) - local_file.write_bytes(normalized_content.read()) - assert local_file.exists(), f"File was not written: {local_file}" + bytes_written = write_bytes_atomic(local_file, normalized_content.read()) logger.info("Download complete") + return bytes_written def extract_psd_dataset(): - today = datetime.now() + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, "psdonline") + files_written = 0 + files_skipped = 0 + bytes_written = 0 + 
cursor_value = None + try: + today = datetime.now() + with niquests.Session() as session: + for months_back in range(4): + year = today.year + month = today.month - months_back + while month < 1: + month += 12 + year -= 1 - with niquests.Session() as session: - for months_back in range(4): - year = today.year - month = today.month - months_back - while month < 1: - month += 12 - year -= 1 + url = PSD_HISTORICAL_URL.format(year=year, month=month) + logger.info(f"Trying {year}-{month:02d}...") - url = PSD_HISTORICAL_URL.format(year=year, month=month) - logger.info(f"Trying {year}-{month:02d}...") - - response = session.head(url, timeout=HTTP_TIMEOUT_SECONDS) - if response.status_code == 200: - logger.info(f"Found latest data at {year}-{month:02d}") - extract_psd_file(url=url, year=year, month=month, http_session=session) - return - elif response.status_code == 404: - logger.info(f"Month {year}-{month:02d} not found, trying earlier...") + response = session.head(url, timeout=HTTP_TIMEOUT_SECONDS) + if response.status_code == 200: + logger.info(f"Found latest data at {year}-{month:02d}") + result = extract_psd_file(url=url, year=year, month=month, http_session=session) + if result > 0: + files_written = 1 + bytes_written = result + else: + files_skipped = 1 + cursor_value = f"{year}-{month:02d}" + break + elif response.status_code == 404: + logger.info(f"Month {year}-{month:02d} not found, trying earlier...") + else: + logger.warning(f"Unexpected status code {response.status_code} for {year}-{month:02d}") else: - logger.warning(f"Unexpected status code {response.status_code} for {year}-{month:02d}") + logger.error("Could not find any available data in the last 4 months") - logger.error("Could not find any available data in the last 4 months") + end_run( + conn, run_id, status="success", + files_written=files_written, files_skipped=files_skipped, + bytes_written=bytes_written, cursor_value=cursor_value, + ) + except Exception as e: + end_run(conn, run_id, 
status="failed", error_message=str(e)) + raise + finally: + conn.close() if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index 5732b2f..dc265a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ dev = [ ] [tool.uv.sources] +extract_core = {workspace = true } psdonline = {workspace = true } sqlmesh_materia = {workspace = true } cftc_cot = {workspace = true } diff --git a/tests/test_cot_extraction.py b/tests/test_cot_extraction.py index 8c596e9..41896f4 100644 --- a/tests/test_cot_extraction.py +++ b/tests/test_cot_extraction.py @@ -123,7 +123,7 @@ def test_extract_cot_year_skips_existing_file(tmp_path, monkeypatch): result = cot_execute.extract_cot_year(2024, mock_session) - assert result is False + assert result == 0 mock_session.get.assert_not_called() # No download should occur @@ -143,5 +143,5 @@ def test_extract_cot_year_returns_false_on_404(tmp_path, monkeypatch): result = cot_execute.extract_cot_year(2006, mock_session) - assert result is False + assert result == 0 mock_session.get.assert_not_called() diff --git a/uv.lock b/uv.lock index e3cc4a0..1e8cbff 100644 --- a/uv.lock +++ b/uv.lock @@ -11,6 +11,7 @@ members = [ "beanflows", "cftc-cot", "coffee-prices", + "extract-core", "ice-stocks", "materia", "psdonline", @@ -365,11 +366,15 @@ name = "cftc-cot" version = "0.1.0" source = { editable = "extract/cftc_cot" } dependencies = [ + { name = "extract-core" }, { name = "niquests" }, ] [package.metadata] -requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }] +requires-dist = [ + { name = "extract-core", editable = "extract/extract_core" }, + { name = "niquests", specifier = ">=3.14.1" }, +] [[package]] name = "charset-normalizer" @@ -438,11 +443,15 @@ name = "coffee-prices" version = "0.1.0" source = { editable = "extract/coffee_prices" } dependencies = [ + { name = "extract-core" }, { name = "yfinance" }, ] [package.metadata] -requires-dist = [{ name = "yfinance", specifier = ">=0.2.55" }] +requires-dist = [ + { 
name = "extract-core", editable = "extract/extract_core" }, + { name = "yfinance", specifier = ">=0.2.55" }, +] [[package]] name = "colorama" @@ -740,6 +749,17 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c1/ea/53f2148663b321f21b5a606bd5f191517cf40b7072c0497d3c92c4a13b1e/executing-2.2.1-py2.py3-none-any.whl", hash = "sha256:760643d3452b4d777d295bb167ccc74c64a81df23fb5e08eff250c425a4b2017", size = 28317, upload-time = "2025-09-01T09:48:08.5Z" }, ] +[[package]] +name = "extract-core" +version = "0.1.0" +source = { editable = "extract/extract_core" } +dependencies = [ + { name = "niquests" }, +] + +[package.metadata] +requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }] + [[package]] name = "fakeredis" version = "2.34.0" @@ -1059,12 +1079,14 @@ name = "ice-stocks" version = "0.1.0" source = { editable = "extract/ice_stocks" } dependencies = [ + { name = "extract-core" }, { name = "niquests" }, { name = "xlrd" }, ] [package.metadata] requires-dist = [ + { name = "extract-core", editable = "extract/extract_core" }, { name = "niquests", specifier = ">=3.14.1" }, { name = "xlrd", specifier = ">=2.0.1" }, ] @@ -2067,11 +2089,15 @@ name = "psdonline" version = "0.1.0" source = { editable = "extract/psdonline" } dependencies = [ + { name = "extract-core" }, { name = "niquests" }, ] [package.metadata] -requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }] +requires-dist = [ + { name = "extract-core", editable = "extract/extract_core" }, + { name = "niquests", specifier = ">=3.14.1" }, +] [[package]] name = "psutil"