From 67c048485b3c46b7805c5e06b25a12a1bbaa581d Mon Sep 17 00:00:00 2001 From: Deeman Date: Sat, 21 Feb 2026 11:41:43 +0100 Subject: [PATCH] Add Phase 1A-C + ICE warehouse stocks: prices, methodology, pipeline automation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1A — KC=F Coffee Futures Prices: - New extract/coffee_prices/ package (yfinance): downloads KC=F daily OHLCV, stores as gzip CSV with SHA256-based idempotency - SQLMesh models: raw/coffee_prices → foundation/fct_coffee_prices → serving/coffee_prices (with 20d/50d SMA, 52-week high/low, daily return %) - Dashboard: 4 metric cards + dual-line chart (close, 20d MA, 50d MA) - API: GET /commodities//prices Phase 1B — Data Methodology Page: - New /methodology route with full-page template (base.html) - 6 anchored sections: USDA PSD, CFTC COT, KC=F price, ICE warehouse stocks, data quality model, update schedule table - "Methodology" link added to marketing footer Phase 1C — Automated Pipeline: - supervisor.sh updated: runs extract_cot, extract_prices, extract_ice in sequence before transform - Webhook failure alerting via ALERT_WEBHOOK_URL env var (ntfy/Slack/Telegram) ICE Warehouse Stocks: - New extract/ice_stocks/ package (niquests): normalizes ICE Report Center CSV to canonical schema, hash-based idempotency, soft-fail on 404 with guidance - SQLMesh models: raw/ice_warehouse_stocks → foundation/fct_ice_warehouse_stocks → serving/ice_warehouse_stocks (30d avg, WoW change, 52w drawdown) - Dashboard: 4 metric cards + line chart (certified bags + 30d avg) - API: GET /commodities//stocks Foundation: - dim_commodity: added ticker (KC=F) and ice_stock_report_code (COFFEE-C) columns - macros/__init__.py: added prices_glob() and ice_stocks_glob() - pipelines.py: added extract_prices and extract_ice entries Co-Authored-By: Claude Sonnet 4.6 --- extract/coffee_prices/pyproject.toml | 18 ++ .../src/coffee_prices/__init__.py | 0 .../src/coffee_prices/execute.py | 92 
+++++++ extract/ice_stocks/pyproject.toml | 18 ++ extract/ice_stocks/src/ice_stocks/__init__.py | 0 extract/ice_stocks/src/ice_stocks/execute.py | 173 +++++++++++++ infra/supervisor/supervisor.sh | 28 ++- pyproject.toml | 2 + src/materia/pipelines.py | 8 + transform/sqlmesh_materia/macros/__init__.py | 14 ++ .../models/foundation/dim_commodity.sql | 6 +- .../models/foundation/fct_coffee_prices.sql | 57 +++++ .../foundation/fct_ice_warehouse_stocks.sql | 47 ++++ .../models/raw/coffee_prices.sql | 46 ++++ .../models/raw/ice_warehouse_stocks.sql | 37 +++ .../models/serving/coffee_prices.sql | 77 ++++++ .../models/serving/ice_warehouse_stocks.sql | 78 ++++++ uv.lock | 114 +++++++++ web/src/beanflows/analytics.py | 73 ++++++ web/src/beanflows/api/routes.py | 49 ++++ web/src/beanflows/dashboard/routes.py | 17 +- .../beanflows/dashboard/templates/index.html | 165 +++++++++++++ web/src/beanflows/public/routes.py | 6 + .../public/templates/methodology.html | 230 ++++++++++++++++++ web/src/beanflows/templates/base.html | 1 + 25 files changed, 1350 insertions(+), 6 deletions(-) create mode 100644 extract/coffee_prices/pyproject.toml create mode 100644 extract/coffee_prices/src/coffee_prices/__init__.py create mode 100644 extract/coffee_prices/src/coffee_prices/execute.py create mode 100644 extract/ice_stocks/pyproject.toml create mode 100644 extract/ice_stocks/src/ice_stocks/__init__.py create mode 100644 extract/ice_stocks/src/ice_stocks/execute.py create mode 100644 transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql create mode 100644 transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql create mode 100644 transform/sqlmesh_materia/models/raw/coffee_prices.sql create mode 100644 transform/sqlmesh_materia/models/raw/ice_warehouse_stocks.sql create mode 100644 transform/sqlmesh_materia/models/serving/coffee_prices.sql create mode 100644 transform/sqlmesh_materia/models/serving/ice_warehouse_stocks.sql create mode 100644 
web/src/beanflows/public/templates/methodology.html diff --git a/extract/coffee_prices/pyproject.toml b/extract/coffee_prices/pyproject.toml new file mode 100644 index 0000000..34cd61e --- /dev/null +++ b/extract/coffee_prices/pyproject.toml @@ -0,0 +1,18 @@ +[project] +name = "coffee_prices" +version = "0.1.0" +description = "KC=F Coffee C futures price extractor" +requires-python = ">=3.13" +dependencies = [ + "yfinance>=0.2.55", +] + +[project.scripts] +extract_prices = "coffee_prices.execute:extract_coffee_prices" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/coffee_prices"] diff --git a/extract/coffee_prices/src/coffee_prices/__init__.py b/extract/coffee_prices/src/coffee_prices/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/extract/coffee_prices/src/coffee_prices/execute.py b/extract/coffee_prices/src/coffee_prices/execute.py new file mode 100644 index 0000000..1fa01d2 --- /dev/null +++ b/extract/coffee_prices/src/coffee_prices/execute.py @@ -0,0 +1,92 @@ +"""Coffee C (KC=F) futures price extraction. + +Downloads daily OHLCV data from Yahoo Finance via yfinance and stores as +gzip CSV in the landing directory. Uses SHA256 of CSV bytes as the +idempotency key — skips if a file with the same hash already exists. 
+ +Landing path: LANDING_DIR/prices/coffee_kc/{hash8}.csv.gzip +""" + +import gzip +import hashlib +import io +import logging +import os +import pathlib +import sys + +import yfinance as yf + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("Coffee Prices Extractor") + +LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) +TICKER = "KC=F" +DEST_SUBDIR = "prices/coffee_kc" + +# yfinance raises on network issues; give it enough time for the full history +DOWNLOAD_TIMEOUT_SECONDS = 120 + + +def extract_coffee_prices() -> None: + """Download KC=F daily OHLCV history and store as gzip CSV. + + Idempotent: computes SHA256 of CSV bytes, skips if already on disk. + On first run downloads full history (period='max'). On subsequent runs + the hash matches if no new trading days have closed since last run. + """ + logger.info(f"Downloading {TICKER} daily OHLCV from Yahoo Finance...") + + ticker = yf.Ticker(TICKER) + df = ticker.history(period="max", interval="1d", auto_adjust=False, timeout=DOWNLOAD_TIMEOUT_SECONDS) + + assert df is not None and len(df) > 0, f"yfinance returned empty DataFrame for {TICKER}" + + # Reset index so Date becomes a plain column + df = df.reset_index() + + # Keep standard OHLCV columns only; yfinance may return extra columns + keep_cols = [c for c in ["Date", "Open", "High", "Low", "Close", "Adj Close", "Volume"] if c in df.columns] + df = df[keep_cols] + + # Normalize Date to ISO string for CSV stability across timezones + df["Date"] = df["Date"].dt.strftime("%Y-%m-%d") + + # Serialize to CSV bytes + csv_buf = io.StringIO() + df.to_csv(csv_buf, index=False) + csv_bytes = csv_buf.getvalue().encode("utf-8") + + assert len(csv_bytes) > 0, "CSV serialization produced empty output" + + # Hash-based idempotency key (first 8 hex chars of SHA256) + sha256 = 
hashlib.sha256(csv_bytes).hexdigest() + etag = sha256[:8] + + dest_dir = LANDING_DIR / DEST_SUBDIR + local_file = dest_dir / f"{etag}.csv.gzip" + + if local_file.exists(): + logger.info(f"File {local_file.name} already exists — no new data, skipping") + return + + # Compress and write + dest_dir.mkdir(parents=True, exist_ok=True) + compressed = gzip.compress(csv_bytes) + local_file.write_bytes(compressed) + + assert local_file.exists(), f"File was not written: {local_file}" + assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}" + + logger.info( + f"Stored {local_file} ({local_file.stat().st_size:,} bytes, {len(df):,} rows)" + ) + + +if __name__ == "__main__": + extract_coffee_prices() diff --git a/extract/ice_stocks/pyproject.toml b/extract/ice_stocks/pyproject.toml new file mode 100644 index 0000000..01fedb3 --- /dev/null +++ b/extract/ice_stocks/pyproject.toml @@ -0,0 +1,18 @@ +[project] +name = "ice_stocks" +version = "0.1.0" +description = "ICE certified warehouse stocks extractor" +requires-python = ">=3.13" +dependencies = [ + "niquests>=3.14.1", +] + +[project.scripts] +extract_ice = "ice_stocks.execute:extract_ice_stocks" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/ice_stocks"] diff --git a/extract/ice_stocks/src/ice_stocks/__init__.py b/extract/ice_stocks/src/ice_stocks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/extract/ice_stocks/src/ice_stocks/execute.py b/extract/ice_stocks/src/ice_stocks/execute.py new file mode 100644 index 0000000..e4dccb4 --- /dev/null +++ b/extract/ice_stocks/src/ice_stocks/execute.py @@ -0,0 +1,173 @@ +"""ICE certified Coffee C warehouse stock extraction. + +Downloads daily certified stock reports from the ICE Report Center and stores +as gzip CSV in the landing directory. Uses SHA256 of content as the +idempotency key — skips if a file with the same hash already exists. 
+ +Landing path: LANDING_DIR/ice_stocks/{year}/{date}_{hash8}.csv.gzip + +CSV format produced (matching raw.ice_warehouse_stocks columns): + report_date,total_certified_bags,pending_grading_bags + +ICE Report Center URL discovery: + Visit https://www.theice.com/report-center and locate the + "Coffee C Warehouse Stocks" report. The download URL has the pattern: + https://www.theice.com/report-center/commodities/COFFEE/reports/... + Set ICE_STOCKS_URL environment variable to the discovered URL. +""" + +import csv +import gzip +import hashlib +import io +import logging +import os +import pathlib +import sys +from datetime import datetime + +import niquests + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("ICE Stocks Extractor") + +LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) +DEST_SUBDIR = "ice_stocks" + +# ICE Report Center URL for Coffee C certified warehouse stocks. +# Discover by visiting https://www.theice.com/report-center and locating +# the Coffee C warehouse stocks CSV export. Override via environment variable. +ICE_STOCKS_URL = os.getenv( + "ICE_STOCKS_URL", + "https://www.theice.com/publicdocs/futures_us/exchange_notices/coffee_certifiedstocks.csv", +) + +HTTP_TIMEOUT_SECONDS = 60 + +# Expected column names from ICE CSV (may vary — adapt to actual column names) +# The ICE report typically has: Date, Certified Stocks (bags), Pending Grading (bags) +# We normalize to our canonical names. 
+COLUMN_MAPPINGS = { + # Possible ICE column name → our canonical name + "date": "report_date", + "report date": "report_date", + "Date": "report_date", + "certified stocks": "total_certified_bags", + "Certified Stocks": "total_certified_bags", + "certified stocks (bags)": "total_certified_bags", + "total certified": "total_certified_bags", + "pending grading": "pending_grading_bags", + "Pending Grading": "pending_grading_bags", + "pending grading (bags)": "pending_grading_bags", +} + + +def _normalize_row(row: dict) -> dict | None: + """Map raw ICE CSV columns to canonical schema. Returns None if date missing.""" + normalized = {} + for raw_key, value in row.items(): + canonical = COLUMN_MAPPINGS.get(raw_key.strip()) or COLUMN_MAPPINGS.get(raw_key.strip().lower()) + if canonical: + # Strip commas from numeric strings (ICE uses "1,234,567" format) + normalized[canonical] = value.strip().replace(",", "") if value else "" + + if "report_date" not in normalized or not normalized["report_date"]: + return None + + # Fill missing optional columns with empty string + normalized.setdefault("total_certified_bags", "") + normalized.setdefault("pending_grading_bags", "") + + return normalized + + +def _build_canonical_csv(raw_content: bytes) -> bytes: + """Parse ICE CSV and emit canonical CSV with our column schema.""" + text = raw_content.decode("utf-8", errors="replace") + reader = csv.DictReader(io.StringIO(text)) + + rows = [] + for row in reader: + normalized = _normalize_row(row) + if normalized: + rows.append(normalized) + + if not rows: + return b"" + + out = io.StringIO() + writer = csv.DictWriter(out, fieldnames=["report_date", "total_certified_bags", "pending_grading_bags"]) + writer.writeheader() + writer.writerows(rows) + return out.getvalue().encode("utf-8") + + +def extract_ice_stocks() -> None: + """Download ICE certified Coffee C warehouse stocks and store as gzip CSV. + + Idempotent: computes SHA256 of canonical CSV bytes, skips if already on disk. 
+ The ICE report is a rolling file (same URL, updated daily) — we detect + changes via content hash. + """ + logger.info(f"Downloading ICE warehouse stocks from: {ICE_STOCKS_URL}") + + with niquests.Session() as session: + try: + response = session.get(ICE_STOCKS_URL, timeout=HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.error( + f"Failed to connect to ICE Report Center: {e}\n" + "If the URL has changed, set ICE_STOCKS_URL environment variable.\n" + "Visit https://www.theice.com/report-center to find the current URL." + ) + return + + if response.status_code == 404: + logger.warning( + "ICE stocks URL returned 404. The report URL may have changed.\n" + "Visit https://www.theice.com/report-center to find the current URL,\n" + "then set ICE_STOCKS_URL environment variable." + ) + return + + assert response.status_code == 200, ( + f"Unexpected status {response.status_code} from {ICE_STOCKS_URL}" + ) + assert len(response.content) > 0, "Downloaded empty file from ICE" + + canonical_csv = _build_canonical_csv(response.content) + if not canonical_csv: + logger.warning("ICE CSV parsed to 0 rows — column mapping may need updating") + return + + # Hash-based idempotency + sha256 = hashlib.sha256(canonical_csv).hexdigest() + etag = sha256[:8] + + today = datetime.now().strftime("%Y-%m-%d") + year = datetime.now().strftime("%Y") + + dest_dir = LANDING_DIR / DEST_SUBDIR / year + local_file = dest_dir / f"{today}_{etag}.csv.gzip" + + if local_file.exists(): + logger.info(f"File {local_file.name} already exists — content unchanged, skipping") + return + + dest_dir.mkdir(parents=True, exist_ok=True) + compressed = gzip.compress(canonical_csv) + local_file.write_bytes(compressed) + + assert local_file.exists(), f"File was not written: {local_file}" + assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}" + + logger.info(f"Stored {local_file} ({local_file.stat().st_size:,} bytes)") + + +if __name__ == "__main__": + extract_ice_stocks() diff --git 
a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh index 26481c3..701fab4 100644 --- a/infra/supervisor/supervisor.sh +++ b/infra/supervisor/supervisor.sh @@ -2,6 +2,11 @@ # Materia Supervisor - Continuous pipeline orchestration # Inspired by TigerBeetle's CFO supervisor: simple, resilient, easy to understand # https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh +# +# Environment variables (set in systemd EnvironmentFile): +# LANDING_DIR — local path for extracted landing data +# DUCKDB_PATH — path to DuckDB lakehouse file +# ALERT_WEBHOOK_URL — optional ntfy.sh / Slack / Telegram webhook for failure alerts set -eu @@ -24,14 +29,33 @@ do git switch --discard-changes --detach origin/master uv sync - # Run pipelines + # Extract all data sources LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ uv run materia pipeline run extract + LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ + DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ + uv run materia pipeline run extract_cot + + LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ + DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ + uv run materia pipeline run extract_prices + + LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ + DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ + uv run materia pipeline run extract_ice + + # Transform all data sources LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ uv run materia pipeline run transform - ) || sleep 600 # Sleep 10 min on failure to avoid busy-loop retries + ) || { + # Notify on failure if webhook is configured, then sleep to avoid busy-loop + if [ -n "${ALERT_WEBHOOK_URL:-}" ]; then + curl -s -d "Materia pipeline failed at $(date)" "$ALERT_WEBHOOK_URL" 2>/dev/null || true + fi + sleep 600 # Sleep 10 min on failure + } done diff --git 
a/pyproject.toml b/pyproject.toml index 858f4c4..e09e923 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,8 @@ dev = [ psdonline = {workspace = true } sqlmesh_materia = {workspace = true } cftc_cot = {workspace = true } +coffee_prices = {workspace = true } +ice_stocks = {workspace = true } [tool.uv.workspace] members = [ diff --git a/src/materia/pipelines.py b/src/materia/pipelines.py index 2750082..1cfe51a 100644 --- a/src/materia/pipelines.py +++ b/src/materia/pipelines.py @@ -20,6 +20,14 @@ PIPELINES = { "command": ["uv", "run", "--package", "cftc_cot", "extract_cot"], "timeout_seconds": 1800, }, + "extract_prices": { + "command": ["uv", "run", "--package", "coffee_prices", "extract_prices"], + "timeout_seconds": 300, + }, + "extract_ice": { + "command": ["uv", "run", "--package", "ice_stocks", "extract_ice"], + "timeout_seconds": 600, + }, "transform": { "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"], "timeout_seconds": 3600, diff --git a/transform/sqlmesh_materia/macros/__init__.py b/transform/sqlmesh_materia/macros/__init__.py index 7201019..7d6aa61 100644 --- a/transform/sqlmesh_materia/macros/__init__.py +++ b/transform/sqlmesh_materia/macros/__init__.py @@ -15,3 +15,17 @@ def cot_glob(evaluator) -> str: """Return a quoted glob path for all COT CSV gzip files under LANDING_DIR.""" landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") return f"'{landing_dir}/cot/**/*.csv.gzip'" + + +@macro() +def prices_glob(evaluator) -> str: + """Return a quoted glob path for all coffee price CSV gzip files under LANDING_DIR.""" + landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") + return f"'{landing_dir}/prices/coffee_kc/**/*.csv.gzip'" + + +@macro() +def ice_stocks_glob(evaluator) -> str: + """Return a quoted glob path for all ICE warehouse stock CSV gzip files under 
LANDING_DIR.""" + landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") + return f"'{landing_dir}/ice_stocks/**/*.csv.gzip'" diff --git a/transform/sqlmesh_materia/models/foundation/dim_commodity.sql b/transform/sqlmesh_materia/models/foundation/dim_commodity.sql index c6747fa..63233b0 100644 --- a/transform/sqlmesh_materia/models/foundation/dim_commodity.sql +++ b/transform/sqlmesh_materia/models/foundation/dim_commodity.sql @@ -17,7 +17,7 @@ MODEL ( kind FULL ); -SELECT usda_commodity_code, cftc_commodity_code, commodity_name, commodity_group +SELECT usda_commodity_code, cftc_commodity_code, ticker, ice_stock_report_code, commodity_name, commodity_group FROM (VALUES - ('0711100', '083', 'Coffee, Green', 'Softs') -) AS t(usda_commodity_code, cftc_commodity_code, commodity_name, commodity_group) + ('0711100', '083', 'KC=F', 'COFFEE-C', 'Coffee, Green', 'Softs') +) AS t(usda_commodity_code, cftc_commodity_code, ticker, ice_stock_report_code, commodity_name, commodity_group) diff --git a/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql b/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql new file mode 100644 index 0000000..f9d4158 --- /dev/null +++ b/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql @@ -0,0 +1,57 @@ +-- Foundation fact: daily KC=F Coffee C futures prices. +-- +-- Casts raw varchar columns to proper types and deduplicates via hash key. +-- Covers all available history from the landing directory. +-- +-- Grain: one row per trade_date. +-- Dedup: hash of (trade_date, close) — if Yahoo Finance corrects a price, +-- the new hash triggers a re-ingest on the next incremental run. 
+ +MODEL ( + name foundation.fct_coffee_prices, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column trade_date + ), + grain (trade_date), + start '1971-08-16', + cron '@daily' +); + +WITH cast_and_clean AS ( + SELECT + TRY_CAST(Date AS date) AS trade_date, + TRY_CAST(Open AS double) AS open, + TRY_CAST(High AS double) AS high, + TRY_CAST(Low AS double) AS low, + TRY_CAST(Close AS double) AS close, + TRY_CAST(Adj_Close AS double) AS adj_close, + TRY_CAST(Volume AS bigint) AS volume, + + -- Filename encodes the content hash — use as ingest identifier + filename AS source_file, + + -- Dedup key: trade date + close price + hash(Date, Close) AS hkey + FROM raw.coffee_prices + WHERE TRY_CAST(Date AS date) IS NOT NULL + AND TRY_CAST(Close AS double) IS NOT NULL +), + +deduplicated AS ( + SELECT + any_value(trade_date) AS trade_date, + any_value(open) AS open, + any_value(high) AS high, + any_value(low) AS low, + any_value(close) AS close, + any_value(adj_close) AS adj_close, + any_value(volume) AS volume, + any_value(source_file) AS source_file, + hkey + FROM cast_and_clean + GROUP BY hkey +) + +SELECT * +FROM deduplicated +WHERE trade_date BETWEEN @start_ds AND @end_ds diff --git a/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql new file mode 100644 index 0000000..6b9cef7 --- /dev/null +++ b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql @@ -0,0 +1,47 @@ +-- Foundation fact: ICE certified Coffee C (Arabica) warehouse stocks. +-- +-- Casts raw varchar columns to proper types and deduplicates via hash key. +-- "Certified" means Coffee C graded and stamped as delivery-eligible +-- against ICE futures contracts — a key physical supply indicator. +-- +-- Grain: one row per report_date. 
+ +MODEL ( + name foundation.fct_ice_warehouse_stocks, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column report_date + ), + grain (report_date), + start '2000-01-01', + cron '@daily' +); + +WITH cast_and_clean AS ( + SELECT + TRY_CAST(report_date AS date) AS report_date, + TRY_CAST(total_certified_bags AS bigint) AS total_certified_bags, + TRY_CAST(pending_grading_bags AS bigint) AS pending_grading_bags, + + filename AS source_file, + + -- Dedup key: report date + total bags + hash(report_date, total_certified_bags) AS hkey + FROM raw.ice_warehouse_stocks + WHERE TRY_CAST(report_date AS date) IS NOT NULL + AND TRY_CAST(total_certified_bags AS bigint) IS NOT NULL +), + +deduplicated AS ( + SELECT + any_value(report_date) AS report_date, + any_value(total_certified_bags) AS total_certified_bags, + any_value(pending_grading_bags) AS pending_grading_bags, + any_value(source_file) AS source_file, + hkey + FROM cast_and_clean + GROUP BY hkey +) + +SELECT * +FROM deduplicated +WHERE report_date BETWEEN @start_ds AND @end_ds diff --git a/transform/sqlmesh_materia/models/raw/coffee_prices.sql b/transform/sqlmesh_materia/models/raw/coffee_prices.sql new file mode 100644 index 0000000..044dd92 --- /dev/null +++ b/transform/sqlmesh_materia/models/raw/coffee_prices.sql @@ -0,0 +1,46 @@ +-- Raw KC=F Coffee C futures prices — technical ingestion layer. +-- +-- Reads daily OHLCV gzip CSVs from the landing directory. All values are +-- varchar; casting happens in foundation.fct_coffee_prices. 
+-- +-- Source: Yahoo Finance via yfinance (KC=F ticker) +-- Coverage: 1971-present (historical futures data) +-- Frequency: daily (trading days only) + +MODEL ( + name raw.coffee_prices, + kind FULL, + grain (Date), + cron '@daily', + columns ( + Date varchar, + Open varchar, + High varchar, + Low varchar, + Close varchar, + Adj_Close varchar, + Volume varchar, + filename varchar + ) +); + +SELECT + "Date" AS Date, + "Open" AS Open, + "High" AS High, + "Low" AS Low, + "Close" AS Close, + "Adj Close" AS Adj_Close, + "Volume" AS Volume, + filename +FROM read_csv( + @prices_glob(), + delim = ',', + encoding = 'utf-8', + compression = 'gzip', + header = true, + union_by_name = true, + filename = true, + all_varchar = true, + ignore_errors = true +) diff --git a/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks.sql b/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks.sql new file mode 100644 index 0000000..99d0882 --- /dev/null +++ b/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks.sql @@ -0,0 +1,37 @@ +-- Raw ICE certified warehouse stocks — technical ingestion layer. +-- +-- Reads daily stock report gzip CSVs from the landing directory. +-- All values are varchar; casting happens in foundation.fct_ice_warehouse_stocks. 
+-- +-- Source: ICE Report Center (Coffee C certified warehouse stocks) +-- Coverage: varies by download history +-- Frequency: daily (ICE updates after market close) + +MODEL ( + name raw.ice_warehouse_stocks, + kind FULL, + cron '@daily', + columns ( + report_date varchar, + total_certified_bags varchar, + pending_grading_bags varchar, + filename varchar + ) +); + +SELECT + report_date, + total_certified_bags, + pending_grading_bags, + filename +FROM read_csv( + @ice_stocks_glob(), + delim = ',', + encoding = 'utf-8', + compression = 'gzip', + header = true, + union_by_name = true, + filename = true, + all_varchar = true, + ignore_errors = true +) diff --git a/transform/sqlmesh_materia/models/serving/coffee_prices.sql b/transform/sqlmesh_materia/models/serving/coffee_prices.sql new file mode 100644 index 0000000..ef1a9cd --- /dev/null +++ b/transform/sqlmesh_materia/models/serving/coffee_prices.sql @@ -0,0 +1,77 @@ +-- Serving mart: KC=F Coffee C futures prices, analytics-ready. +-- +-- Adds moving averages (20-day, 50-day SMA) and 52-week high/low range. +-- Filtered to trading days only (NULL close rows excluded upstream). +-- +-- Grain: one row per trade_date. 
+ +MODEL ( + name serving.coffee_prices, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column trade_date + ), + grain (trade_date), + start '1971-08-16', + cron '@daily' +); + +WITH base AS ( + SELECT + f.trade_date, + f.open, + f.high, + f.low, + f.close, + f.adj_close, + f.volume, + + -- Daily return: (close - prev_close) / prev_close * 100 + round( + (f.close - LAG(f.close, 1) OVER (ORDER BY f.trade_date)) + / NULLIF(LAG(f.close, 1) OVER (ORDER BY f.trade_date), 0) * 100, + 4 + ) AS daily_return_pct, + + -- 20-day simple moving average (1 trading month) + round( + AVG(f.close) OVER (ORDER BY f.trade_date ROWS BETWEEN 19 PRECEDING AND CURRENT ROW), + 4 + ) AS sma_20d, + + -- 50-day simple moving average (2.5 trading months) + round( + AVG(f.close) OVER (ORDER BY f.trade_date ROWS BETWEEN 49 PRECEDING AND CURRENT ROW), + 4 + ) AS sma_50d, + + -- 52-week high (approximately 252 trading days) + MAX(f.high) OVER (ORDER BY f.trade_date ROWS BETWEEN 251 PRECEDING AND CURRENT ROW) + AS high_52w, + + -- 52-week low + MIN(f.low) OVER (ORDER BY f.trade_date ROWS BETWEEN 251 PRECEDING AND CURRENT ROW) + AS low_52w + + FROM foundation.fct_coffee_prices f + WHERE f.trade_date BETWEEN @start_ds AND @end_ds +) + +SELECT + b.trade_date, + d.commodity_name, + d.ticker, + b.open, + b.high, + b.low, + b.close, + b.adj_close, + b.volume, + b.daily_return_pct, + b.sma_20d, + b.sma_50d, + b.high_52w, + b.low_52w +FROM base b +CROSS JOIN foundation.dim_commodity d +WHERE d.ticker = 'KC=F' +ORDER BY b.trade_date diff --git a/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks.sql b/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks.sql new file mode 100644 index 0000000..4b5b552 --- /dev/null +++ b/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks.sql @@ -0,0 +1,78 @@ +-- Serving mart: ICE certified Coffee C warehouse stocks, analytics-ready. +-- +-- Adds 30-day rolling average, week-over-week change, and drawdown from +-- 52-week high. 
Physical supply indicator used alongside S/D and positioning. +-- +-- "Certified stocks" = coffee graded and stamped as eligible for delivery +-- against ICE Coffee C futures — traders watch this as a squeeze indicator. +-- +-- Grain: one row per report_date. + +MODEL ( + name serving.ice_warehouse_stocks, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column report_date + ), + grain (report_date), + start '2000-01-01', + cron '@daily' +); + +WITH base AS ( + SELECT + f.report_date, + f.total_certified_bags, + f.pending_grading_bags, + + -- Week-over-week change (compare to 7 calendar days ago via LAG over ordered rows) + -- Using LAG(1) since data is daily: compares to previous trading/reporting day + f.total_certified_bags + - LAG(f.total_certified_bags, 1) OVER (ORDER BY f.report_date) AS wow_change_bags, + + -- 30-day rolling average (smooths daily noise) + round( + AVG(f.total_certified_bags::double) OVER ( + ORDER BY f.report_date ROWS BETWEEN 29 PRECEDING AND CURRENT ROW + ), + 0 + ) AS avg_30d_bags, + + -- 52-week high (365 calendar days ≈ 252 trading days; use 365-row window as proxy) + MAX(f.total_certified_bags) OVER ( + ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW + ) AS high_52w_bags, + + -- Drawdown from 52-week high (pct below peak — squeeze indicator) + round( + (f.total_certified_bags::double + - MAX(f.total_certified_bags) OVER ( + ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW + )::double + ) + / NULLIF( + MAX(f.total_certified_bags) OVER ( + ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW + )::double, + 0 + ) * 100, + 2 + ) AS drawdown_from_52w_high_pct + + FROM foundation.fct_ice_warehouse_stocks f + WHERE f.report_date BETWEEN @start_ds AND @end_ds +) + +SELECT + b.report_date, + d.commodity_name, + d.ice_stock_report_code, + b.total_certified_bags, + b.pending_grading_bags, + b.wow_change_bags, + b.avg_30d_bags, + b.high_52w_bags, + b.drawdown_from_52w_high_pct +FROM base b +CROSS JOIN 
foundation.dim_commodity d +WHERE d.ice_stock_report_code = 'COFFEE-C' +ORDER BY b.report_date diff --git a/uv.lock b/uv.lock index 14c020d..b3751f0 100644 --- a/uv.lock +++ b/uv.lock @@ -10,6 +10,8 @@ resolution-markers = [ members = [ "beanflows", "cftc-cot", + "coffee-prices", + "ice-stocks", "materia", "psdonline", "sqlmesh-materia", @@ -251,6 +253,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/71/cc/18245721fa7747065ab478316c7fea7c74777d07f37ae60db2e84f8172e8/beartype-0.22.9-py3-none-any.whl", hash = "sha256:d16c9bbc61ea14637596c5f6fbff2ee99cbe3573e46a716401734ef50c3060c2", size = 1333658, upload-time = "2025-12-13T06:50:28.266Z" }, ] +[[package]] +name = "beautifulsoup4" +version = "4.14.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "soupsieve" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c3/b0/1c6a16426d389813b48d95e26898aff79abbde42ad353958ad95cc8c9b21/beautifulsoup4-4.14.3.tar.gz", hash = "sha256:6292b1c5186d356bba669ef9f7f051757099565ad9ada5dd630bd9de5fa7fb86", size = 627737, upload-time = "2025-11-30T15:08:26.084Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1a/39/47f9197bdd44df24d67ac8893641e16f386c984a0619ef2ee4c51fbbc019/beautifulsoup4-4.14.3-py3-none-any.whl", hash = "sha256:0918bfe44902e6ad8d57732ba310582e98da931428d231a5ecb9e7c703a735bb", size = 107721, upload-time = "2025-11-30T15:08:24.087Z" }, +] + [[package]] name = "blinker" version = "1.9.0" @@ -418,6 +433,17 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/88/39/799be3f2f0f38cc727ee3b4f1445fe6d5e4133064ec2e4115069418a5bb6/cloudpickle-3.1.2-py3-none-any.whl", hash = "sha256:9acb47f6afd73f60dc1df93bb801b472f05ff42fa6c84167d25cb206be1fbf4a", size = 22228, upload-time = "2025-11-03T09:25:25.534Z" }, ] +[[package]] +name = "coffee-prices" +version = "0.1.0" +source = { editable = "extract/coffee_prices" } +dependencies = [ + { name = "yfinance" }, +] + 
+[package.metadata] +requires-dist = [{ name = "yfinance", specifier = ">=0.2.55" }] + [[package]] name = "colorama" version = "0.4.6" @@ -580,6 +606,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/48/ef/0c2f4a8e31018a986949d34a01115dd057bf536905dca38897bacd21fac3/cryptography-46.0.5-cp38-abi3-win_amd64.whl", hash = "sha256:556e106ee01aa13484ce9b0239bca667be5004efb0aabbed28d353df86445595", size = 3467050, upload-time = "2026-02-10T19:18:18.899Z" }, ] +[[package]] +name = "curl-cffi" +version = "0.13.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "cffi" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4e/3d/f39ca1f8fdf14408888e7c25e15eed63eac5f47926e206fb93300d28378c/curl_cffi-0.13.0.tar.gz", hash = "sha256:62ecd90a382bd5023750e3606e0aa7cb1a3a8ba41c14270b8e5e149ebf72c5ca", size = 151303, upload-time = "2025-08-06T13:05:42.988Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/19/d1/acabfd460f1de26cad882e5ef344d9adde1507034528cb6f5698a2e6a2f1/curl_cffi-0.13.0-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:434cadbe8df2f08b2fc2c16dff2779fb40b984af99c06aa700af898e185bb9db", size = 5686337, upload-time = "2025-08-06T13:05:28.985Z" }, + { url = "https://files.pythonhosted.org/packages/2c/1c/cdb4fb2d16a0e9de068e0e5bc02094e105ce58a687ff30b4c6f88e25a057/curl_cffi-0.13.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:59afa877a9ae09efa04646a7d068eeea48915a95d9add0a29854e7781679fcd7", size = 2994613, upload-time = "2025-08-06T13:05:31.027Z" }, + { url = "https://files.pythonhosted.org/packages/04/3e/fdf617c1ec18c3038b77065d484d7517bb30f8fb8847224eb1f601a4e8bc/curl_cffi-0.13.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d06ed389e45a7ca97b17c275dbedd3d6524560270e675c720e93a2018a766076", size = 7931353, upload-time = "2025-08-06T13:05:32.273Z" }, + { url = 
"https://files.pythonhosted.org/packages/3d/10/6f30c05d251cf03ddc2b9fd19880f3cab8c193255e733444a2df03b18944/curl_cffi-0.13.0-cp39-abi3-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b4e0de45ab3b7a835c72bd53640c2347415111b43421b5c7a1a0b18deae2e541", size = 7486378, upload-time = "2025-08-06T13:05:33.672Z" }, + { url = "https://files.pythonhosted.org/packages/77/81/5bdb7dd0d669a817397b2e92193559bf66c3807f5848a48ad10cf02bf6c7/curl_cffi-0.13.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8eb4083371bbb94e9470d782de235fb5268bf43520de020c9e5e6be8f395443f", size = 8328585, upload-time = "2025-08-06T13:05:35.28Z" }, + { url = "https://files.pythonhosted.org/packages/ce/c1/df5c6b4cfad41c08442e0f727e449f4fb5a05f8aa564d1acac29062e9e8e/curl_cffi-0.13.0-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:28911b526e8cd4aa0e5e38401bfe6887e8093907272f1f67ca22e6beb2933a51", size = 8739831, upload-time = "2025-08-06T13:05:37.078Z" }, + { url = "https://files.pythonhosted.org/packages/1a/91/6dd1910a212f2e8eafe57877bcf97748eb24849e1511a266687546066b8a/curl_cffi-0.13.0-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:6d433ffcb455ab01dd0d7bde47109083aa38b59863aa183d29c668ae4c96bf8e", size = 8711908, upload-time = "2025-08-06T13:05:38.741Z" }, + { url = "https://files.pythonhosted.org/packages/6d/e4/15a253f9b4bf8d008c31e176c162d2704a7e0c5e24d35942f759df107b68/curl_cffi-0.13.0-cp39-abi3-win_amd64.whl", hash = "sha256:66a6b75ce971de9af64f1b6812e275f60b88880577bac47ef1fa19694fa21cd3", size = 1614510, upload-time = "2025-08-06T13:05:40.451Z" }, + { url = "https://files.pythonhosted.org/packages/f9/0f/9c5275f17ad6ff5be70edb8e0120fdc184a658c9577ca426d4230f654beb/curl_cffi-0.13.0-cp39-abi3-win_arm64.whl", hash = "sha256:d438a3b45244e874794bc4081dc1e356d2bb926dcc7021e5a8fef2e2105ef1d8", size = 1365753, upload-time = "2025-08-06T13:05:41.879Z" }, +] + [[package]] name = "dateparser" version = "1.2.1" @@ -752,6 +799,15 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/7f/9c/34f6962f9b9e9c71f6e5ed806e0d0ff03c9d1b0b2340088a0cf4bce09b18/flask-3.1.3-py3-none-any.whl", hash = "sha256:f4bcbefc124291925f1a26446da31a5178f9483862233b23c0c96a20701f670c", size = 103424, upload-time = "2026-02-19T05:00:56.027Z" }, ] +[[package]] +name = "frozendict" +version = "2.4.7" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/90/b2/2a3d1374b7780999d3184e171e25439a8358c47b481f68be883c14086b4c/frozendict-2.4.7.tar.gz", hash = "sha256:e478fb2a1391a56c8a6e10cc97c4a9002b410ecd1ac28c18d780661762e271bd", size = 317082, upload-time = "2025-11-11T22:40:14.251Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/38/74/f94141b38a51a553efef7f510fc213894161ae49b88bffd037f8d2a7cb2f/frozendict-2.4.7-py3-none-any.whl", hash = "sha256:972af65924ea25cf5b4d9326d549e69a9a4918d8a76a9d3a7cd174d98b237550", size = 16264, upload-time = "2025-11-11T22:40:12.836Z" }, +] + [[package]] name = "fsspec" version = "2026.2.0" @@ -998,6 +1054,17 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c4/f7/5cc291d701094754a1d327b44d80a44971e13962881d9a400235726171da/hypothesis-6.151.9-py3-none-any.whl", hash = "sha256:7b7220585c67759b1b1ef839b1e6e9e3d82ed468cfc1ece43c67184848d7edd9", size = 529307, upload-time = "2026-02-16T22:59:20.443Z" }, ] +[[package]] +name = "ice-stocks" +version = "0.1.0" +source = { editable = "extract/ice_stocks" } +dependencies = [ + { name = "niquests" }, +] + +[package.metadata] +requires-dist = [{ name = "niquests", specifier = ">=3.14.1" }] + [[package]] name = "identify" version = "2.6.16" @@ -1539,6 +1606,12 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, ] +[[package]] +name = 
"multitasking" +version = "0.0.12" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/17/0d/74f0293dfd7dcc3837746d0138cbedd60b31701ecc75caec7d3f281feba0/multitasking-0.0.12.tar.gz", hash = "sha256:2fba2fa8ed8c4b85e227c5dd7dc41c7d658de3b6f247927316175a57349b84d1", size = 19984, upload-time = "2025-07-20T21:27:51.636Z" } + [[package]] name = "nest-asyncio" version = "1.6.0" @@ -1774,6 +1847,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ef/3c/2c197d226f9ea224a9ab8d197933f9da0ae0aac5b6e0f884e2b8d9c8e9f7/pathspec-1.0.4-py3-none-any.whl", hash = "sha256:fb6ae2fd4e7c921a165808a552060e722767cfa526f99ca5156ed2ce45a5c723", size = 55206, upload-time = "2026-01-27T03:59:45.137Z" }, ] +[[package]] +name = "peewee" +version = "4.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/37/e3/98ed8ab20f26d429f61b3d5d455c52ac88ba343444fbcf7154374111eb3e/peewee-4.0.0.tar.gz", hash = "sha256:bc2722abf32a8074362c346fc8a95f2d34a9587873e81025b6429676c32044b6", size = 686951, upload-time = "2026-02-20T15:38:50.312Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/39/40/89664cce41f4bedab105d705885cbb152d7bd85ce0facbf0ec02e90eb02a/peewee-4.0.0-py3-none-any.whl", hash = "sha256:6de14ff11ab50c3152dc1d4e12628c1b28c1e03ff4e4213e463429bfcd7340b6", size = 139317, upload-time = "2026-02-20T15:38:48.519Z" }, +] + [[package]] name = "pexpect" version = "4.9.0" @@ -2866,6 +2948,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/32/46/9cb0e58b2deb7f82b84065f37f3bffeb12413f947f9388e4cac22c4621ce/sortedcontainers-2.4.0-py2.py3-none-any.whl", hash = "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0", size = 29575, upload-time = "2021-05-16T22:03:41.177Z" }, ] +[[package]] +name = "soupsieve" +version = "2.8.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = 
"https://files.pythonhosted.org/packages/7b/ae/2d9c981590ed9999a0d91755b47fc74f74de286b0f5cee14c9269041e6c4/soupsieve-2.8.3.tar.gz", hash = "sha256:3267f1eeea4251fb42728b6dfb746edc9acaffc4a45b27e19450b676586e8349", size = 118627, upload-time = "2026-01-20T04:27:02.457Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/46/2c/1462b1d0a634697ae9e55b3cecdcb64788e8b7d63f54d923fcd0bb140aed/soupsieve-2.8.3-py3-none-any.whl", hash = "sha256:ed64f2ba4eebeab06cc4962affce381647455978ffc1e36bb79a545b91f45a95", size = 37016, upload-time = "2026-01-20T04:27:01.012Z" }, +] + [[package]] name = "sqlalchemy" version = "2.0.46" @@ -3466,6 +3557,29 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a4/f5/10b68b7b1544245097b2a1b8238f66f2fc6dcaeb24ba5d917f52bd2eed4f/wsproto-1.3.2-py3-none-any.whl", hash = "sha256:61eea322cdf56e8cc904bd3ad7573359a242ba65688716b0710a5eb12beab584", size = 24405, upload-time = "2025-11-20T18:18:00.454Z" }, ] +[[package]] +name = "yfinance" +version = "1.2.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "beautifulsoup4" }, + { name = "curl-cffi" }, + { name = "frozendict" }, + { name = "multitasking" }, + { name = "numpy" }, + { name = "pandas" }, + { name = "peewee" }, + { name = "platformdirs" }, + { name = "protobuf" }, + { name = "pytz" }, + { name = "requests" }, + { name = "websockets" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c9/1b/431d0ebd6a1e9deaffc8627cc4d26fd869841f31a1429cab7443eced0766/yfinance-1.2.0.tar.gz", hash = "sha256:80cec643eb983330ca63debab1b5492334fa1e6338d82cb17dd4e7b95079cfab", size = 140501, upload-time = "2026-02-16T19:52:34.368Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1e/60/462859de757ac56830824da7e8cf314b8b0321af5853df867c84cd6c2128/yfinance-1.2.0-py2.py3-none-any.whl", hash = "sha256:1c27d1ebfc6275f476721cc6dba035a49d0cf9a806d6aa1785c9e10cf8a610d8", size = 130247, upload-time = "2026-02-16T19:52:33.109Z" }, +] + 
[[package]] name = "zipp" version = "3.23.0" diff --git a/web/src/beanflows/analytics.py b/web/src/beanflows/analytics.py index be11b3e..35d322b 100644 --- a/web/src/beanflows/analytics.py +++ b/web/src/beanflows/analytics.py @@ -316,6 +316,79 @@ async def get_cot_index_trend( ) +# ============================================================================= +# Coffee Prices Queries +# ============================================================================= + +# KC=F Yahoo Finance ticker +COFFEE_TICKER = "KC=F" + + +async def get_price_time_series(ticker: str, limit: int = 504) -> list[dict]: + """Daily OHLCV + moving averages from serving.coffee_prices. Default ~2 years.""" + assert 1 <= limit <= 5000, "limit must be between 1 and 5000" + return await fetch_analytics( + """ + SELECT trade_date, open, high, low, close, volume, + daily_return_pct, sma_20d, sma_50d, high_52w, low_52w + FROM serving.coffee_prices + WHERE ticker = ? + ORDER BY trade_date DESC + LIMIT ? + """, + [ticker, limit], + ) + + +async def get_price_latest(ticker: str) -> dict | None: + """Latest trading day's close price, daily return, and 52-week range.""" + rows = await fetch_analytics( + """ + SELECT trade_date, close, daily_return_pct, high_52w, low_52w, sma_20d, sma_50d + FROM serving.coffee_prices + WHERE ticker = ? 
+ ORDER BY trade_date DESC + LIMIT 1 + """, + [ticker], + ) + return rows[0] if rows else None + + +# ============================================================================= +# ICE Warehouse Stocks Queries +# ============================================================================= + + +async def get_ice_stocks_trend(days: int = 365) -> list[dict]: + """Daily ICE certified stocks over the trailing N days.""" + assert 1 <= days <= 3650, "days must be between 1 and 3650" + return await fetch_analytics( + """ + SELECT report_date, total_certified_bags, pending_grading_bags, + wow_change_bags, avg_30d_bags, high_52w_bags, drawdown_from_52w_high_pct + FROM serving.ice_warehouse_stocks + ORDER BY report_date DESC + LIMIT ? + """, + [days], + ) + + +async def get_ice_stocks_latest() -> dict | None: + """Latest ICE certified warehouse stock report.""" + rows = await fetch_analytics( + """ + SELECT report_date, total_certified_bags, pending_grading_bags, + wow_change_bags, avg_30d_bags, drawdown_from_52w_high_pct + FROM serving.ice_warehouse_stocks + ORDER BY report_date DESC + LIMIT 1 + """ + ) + return rows[0] if rows else None + + async def get_country_comparison( commodity_code: int, country_codes: list[str], diff --git a/web/src/beanflows/api/routes.py b/web/src/beanflows/api/routes.py index 1c62e04..24c4f39 100644 --- a/web/src/beanflows/api/routes.py +++ b/web/src/beanflows/api/routes.py @@ -198,6 +198,55 @@ async def commodity_positioning_latest(code: str): return jsonify({"cftc_commodity_code": code, "data": data}) +@bp.route("/commodities//prices") +@api_key_required(scopes=["read"]) +async def commodity_prices(code: str): + """Daily OHLCV price time series for a commodity ticker (e.g. KC=F). 
+ + Query params: + start_date — ISO date filter (YYYY-MM-DD) + end_date — ISO date filter (YYYY-MM-DD) + limit — max rows returned (default 504 ≈ 2 years, max 5000) + """ + limit = min(int(request.args.get("limit", 504)), 5000) + start_date = request.args.get("start_date") + end_date = request.args.get("end_date") + + data = await analytics.get_price_time_series(code, limit=limit) + + # Apply date filters in Python — simpler than adding optional params to the query + if start_date: + data = [r for r in data if str(r["trade_date"]) >= start_date] + if end_date: + data = [r for r in data if str(r["trade_date"]) <= end_date] + + return jsonify({"ticker": code, "data": data}) + + +@bp.route("/commodities//stocks") +@api_key_required(scopes=["read"]) +async def commodity_ice_stocks(code: str): + """ICE certified warehouse stock time series. + + Query params: + start_date — ISO date filter (YYYY-MM-DD) + end_date — ISO date filter (YYYY-MM-DD) + days — trailing days (default 365, max 3650) + """ + days = min(int(request.args.get("days", 365)), 3650) + start_date = request.args.get("start_date") + end_date = request.args.get("end_date") + + data = await analytics.get_ice_stocks_trend(days=days) + + if start_date: + data = [r for r in data if str(r["report_date"]) >= start_date] + if end_date: + data = [r for r in data if str(r["report_date"]) <= end_date] + + return jsonify({"commodity": code, "data": data}) + + @bp.route("/commodities//metrics.csv") @api_key_required(scopes=["read"]) async def commodity_metrics_csv(code: int): diff --git a/web/src/beanflows/dashboard/routes.py b/web/src/beanflows/dashboard/routes.py index c82fca7..785a1ef 100644 --- a/web/src/beanflows/dashboard/routes.py +++ b/web/src/beanflows/dashboard/routes.py @@ -100,7 +100,12 @@ async def index(): # Fetch all analytics data in parallel (empty lists/None if DB not available) if analytics._conn is not None: - time_series, top_producers, stu_trend, balance, yoy, cot_latest, cot_trend = await 
asyncio.gather( + ( + time_series, top_producers, stu_trend, balance, yoy, + cot_latest, cot_trend, + price_series, price_latest, + ice_stocks_trend, ice_stocks_latest, + ) = await asyncio.gather( analytics.get_global_time_series( analytics.COFFEE_COMMODITY_CODE, ["production", "exports", "imports", "ending_stocks", "total_distribution"], @@ -111,10 +116,16 @@ async def index(): analytics.get_production_yoy_by_country(analytics.COFFEE_COMMODITY_CODE, limit=15), analytics.get_cot_positioning_latest(analytics.COFFEE_CFTC_CODE), analytics.get_cot_index_trend(analytics.COFFEE_CFTC_CODE, weeks=104), + analytics.get_price_time_series(analytics.COFFEE_TICKER, limit=504), + analytics.get_price_latest(analytics.COFFEE_TICKER), + analytics.get_ice_stocks_trend(days=365), + analytics.get_ice_stocks_latest(), ) else: time_series, top_producers, stu_trend, balance, yoy = [], [], [], [], [] cot_latest, cot_trend = None, [] + price_series, price_latest = [], None + ice_stocks_trend, ice_stocks_latest = [], None # Latest global snapshot for key metric cards latest = time_series[-1] if time_series else {} @@ -140,6 +151,10 @@ async def index(): yoy=yoy, cot_latest=cot_latest, cot_trend=cot_trend, + price_series=price_series, + price_latest=price_latest, + ice_stocks_trend=ice_stocks_trend, + ice_stocks_latest=ice_stocks_latest, ) diff --git a/web/src/beanflows/dashboard/templates/index.html b/web/src/beanflows/dashboard/templates/index.html index d180348..7eb5cc4 100644 --- a/web/src/beanflows/dashboard/templates/index.html +++ b/web/src/beanflows/dashboard/templates/index.html @@ -148,6 +148,80 @@ {% endif %} + + {% if price_latest %} +
+

Coffee C Futures Price — KC=F

+

ICE Coffee C Arabica · Daily close price · Source: Yahoo Finance

+
+
+
Latest Close
+
{{ "{:.2f}".format(price_latest.close) }}
+
¢/lb · as of {{ price_latest.trade_date }}
+
+
+
Daily Change
+
+ {% if price_latest.daily_return_pct is not none %} + {{ "{:+.2f}%".format(price_latest.daily_return_pct) }} + {% else %}--{% endif %} +
+
vs previous close
+
+
+
52-Week High
+
{{ "{:.2f}".format(price_latest.high_52w) }}
+
¢/lb
+
+
+
52-Week Low
+
{{ "{:.2f}".format(price_latest.low_52w) }}
+
¢/lb
+
+
+ +
+ {% endif %} + + + {% if ice_stocks_latest %} +
+

ICE Certified Warehouse Stocks

+

Physical Arabica certified for delivery against ICE Coffee C futures · as of {{ ice_stocks_latest.report_date }}

+
+
+
Certified Stocks
+
{{ "{:,.0f}".format(ice_stocks_latest.total_certified_bags) }}
+
60-kg bags
+
+
+
Day-over-Day
+
+ {% if ice_stocks_latest.wow_change_bags is not none %} + {{ "{:+,d}".format(ice_stocks_latest.wow_change_bags | int) }} + {% else %}--{% endif %} +
+
bags vs previous day
+
+
+
30-Day Average
+
{{ "{:,.0f}".format(ice_stocks_latest.avg_30d_bags) }}
+
60-kg bags
+
+
+
Drawdown from 52w High
+
+ {% if ice_stocks_latest.drawdown_from_52w_high_pct is not none %} + {{ "{:.1f}%".format(ice_stocks_latest.drawdown_from_52w_high_pct) }} + {% else %}--{% endif %} +
+
below 52-week peak
+
+
+ +
+ {% endif %} +
Country Comparison @@ -286,6 +360,97 @@ if (cotRaw && cotRaw.length > 0) { }); } +// -- Coffee Prices Chart (close + 20d MA + 50d MA) -- +const priceRaw = {{ price_series | tojson }}; +if (priceRaw && priceRaw.length > 0) { + const priceData = [...priceRaw].reverse(); // query returns DESC, chart needs ASC + new Chart(document.getElementById('priceChart'), { + type: 'line', + data: { + labels: priceData.map(r => r.trade_date), + datasets: [ + { + label: 'Close (¢/lb)', + data: priceData.map(r => r.close), + borderColor: CHART_COLORS.copper, + backgroundColor: CHART_COLORS.copper + '18', + fill: true, + tension: 0.2, + pointRadius: 0, + yAxisID: 'y' + }, + { + label: '20d MA', + data: priceData.map(r => r.sma_20d), + borderColor: CHART_COLORS.beanGreen, + borderDash: [4, 3], + tension: 0.2, + pointRadius: 0, + yAxisID: 'y' + }, + { + label: '50d MA', + data: priceData.map(r => r.sma_50d), + borderColor: CHART_COLORS.roast, + borderDash: [8, 4], + tension: 0.2, + pointRadius: 0, + yAxisID: 'y' + } + ] + }, + options: { + responsive: true, + interaction: {mode: 'index', intersect: false}, + plugins: {legend: {position: 'bottom'}}, + scales: { + x: {ticks: {maxTicksLimit: 12}}, + y: {title: {display: true, text: '¢/lb'}, beginAtZero: false} + } + } + }); +} + +// -- ICE Warehouse Stocks Chart -- +const iceRaw = {{ ice_stocks_trend | tojson }}; +if (iceRaw && iceRaw.length > 0) { + const iceData = [...iceRaw].reverse(); // query returns DESC, chart needs ASC + new Chart(document.getElementById('iceStocksChart'), { + type: 'line', + data: { + labels: iceData.map(r => r.report_date), + datasets: [ + { + label: 'Certified Stocks (bags)', + data: iceData.map(r => r.total_certified_bags), + borderColor: CHART_COLORS.roast, + backgroundColor: CHART_COLORS.roast + '18', + fill: true, + tension: 0.2, + pointRadius: 0 + }, + { + label: '30d Average', + data: iceData.map(r => r.avg_30d_bags), + borderColor: CHART_COLORS.stone, + borderDash: [5, 4], + tension: 0.2, + pointRadius: 
0 + } + ] + }, + options: { + responsive: true, + interaction: {mode: 'index', intersect: false}, + plugins: {legend: {position: 'bottom'}}, + scales: { + x: {ticks: {maxTicksLimit: 12}}, + y: {title: {display: true, text: '60-kg bags'}, beginAtZero: false} + } + } + }); +} + // -- Top Producers Horizontal Bar -- const topData = {{ top_producers | tojson }}; if (topData.length > 0) { diff --git a/web/src/beanflows/public/routes.py b/web/src/beanflows/public/routes.py index e353eb2..4748745 100644 --- a/web/src/beanflows/public/routes.py +++ b/web/src/beanflows/public/routes.py @@ -46,6 +46,12 @@ async def about(): return await render_template("about.html") +@bp.route("/methodology") +async def methodology(): + """Data methodology page — explains all data sources.""" + return await render_template("methodology.html") + + @bp.route("/feedback", methods=["POST"]) @csrf_protect async def feedback(): diff --git a/web/src/beanflows/public/templates/methodology.html b/web/src/beanflows/public/templates/methodology.html new file mode 100644 index 0000000..21c1a70 --- /dev/null +++ b/web/src/beanflows/public/templates/methodology.html @@ -0,0 +1,230 @@ +{% extends "base.html" %} + +{% block title %}Data Methodology — {{ config.APP_NAME }}{% endblock %} + +{% block content %} +
+ +
+
+

Data Methodology

+

Every number on BeanFlows has a source, a frequency, and a known limitation. Here's exactly where the data comes from and how we process it.

+
+
+ + +
+ + + +
+

USDA Production, Supply & Distribution

+

The USDA's Production, Supply and Distribution (PSD) Online database is the definitive public source for agricultural commodity supply and demand balances. It is maintained by the USDA Foreign Agricultural Service and covers 160+ countries and 50+ commodities going back to the 1960s for some crops.

+ +

What we use

+
    +
  • Commodity: Coffee, Green — USDA commodity code 0711100
  • +
  • Coverage: 2006–present (monthly updates)
  • +
  • Geography: Country-level + world aggregate
  • +
  • Source URL: apps.fas.usda.gov/psdonlineapi
  • +
+ +

Metrics

+
+ + + + + + + + + + + + +
MetricDefinitionUnit
ProductionHarvested green coffee output1,000 × 60-kg bags
ImportsPhysical coffee imported into country1,000 × 60-kg bags
ExportsPhysical coffee exported from country1,000 × 60-kg bags
Domestic ConsumptionCoffee consumed within country1,000 × 60-kg bags
Ending StocksCarry-over stocks at marketing year end1,000 × 60-kg bags
Stock-to-Use RatioEnding stocks ÷ consumption × 100%
+
+ +

Release schedule

+

USDA publishes PSD updates monthly, typically in the second week of the month as part of the World Agricultural Supply and Demand Estimates (WASDE) report. Our pipeline checks for updates daily and downloads new data when the file hash changes.

+ +
+ Note on marketing years: Coffee marketing years vary by origin country. Brazil's marketing year runs April–March; Colombia's runs October–September. USDA normalizes all data to a common market year basis for the global aggregate. +
+
+ + +
+

CFTC Commitments of Traders

+

The Commitments of Traders (COT) report is published weekly by the U.S. Commodity Futures Trading Commission (CFTC). It shows the net positions of large traders in regulated futures markets. It is the primary public indicator of speculative positioning in agricultural commodities.

+ +

What we use

+
    +
  • Report type: Disaggregated Futures-Only
  • +
  • Commodity: Coffee C — CFTC code 083
  • +
  • Snapshot date: Every Tuesday close-of-business
  • +
  • Release date: The following Friday at 3:30 PM ET
  • +
  • Coverage: June 2006–present
  • +
  • Source: cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip
  • +
+ +

Trader categories

+
+ + + + + + + + + + + +
CategoryWho they areWhat to watch
Managed MoneyHedge funds, CTAs, algorithmic tradersPrimary speculative signal — net long = bullish
Producer / MerchantCoffee exporters, processors, roastersCommercial hedgers — usually net short
Swap DealersBanks providing OTC commodity exposureIndex fund replication — less directional signal
Other ReportablesLarge traders not fitting other categoriesMixed motivations
Non-ReportableSmall speculators below CFTC thresholdRetail sentiment proxy
+
+ +

COT Index

+

The COT Index normalizes the managed money net position to a 0–100 scale over a trailing window (we publish both 26-week and 52-week). It is calculated as:

+
+ COT Index = (current net − min over window) ÷ (max over window − min over window) × 100 +
+

A reading near 0 indicates managed money is at its most bearish extreme over the window. A reading near 100 indicates maximum bullish positioning. Think of it as an RSI for speculative positioning.

+
+ + +
+

Coffee Futures Price (KC=F)

+

The Coffee C contract (ticker: KC=F) is the global benchmark price for Arabica coffee, traded on ICE Futures U.S. (formerly New York Board of Trade). Each contract covers 37,500 lbs of green coffee. Price is quoted in US cents per pound (¢/lb).

+ +

What we use

+
    +
  • Ticker: KC=F (front-month continuous contract)
  • +
  • Data: Daily OHLCV (Open, High, Low, Close, Adjusted Close, Volume)
  • +
  • Source: Yahoo Finance via yfinance
  • +
  • Coverage: 1971–present
  • +
  • Delay: ~15-minute delayed (Yahoo Finance standard)
  • +
+ +

Derived metrics

+
    +
  • Daily Return %: (close − prev close) ÷ prev close × 100
  • +
  • 20-day SMA: Simple moving average of the last 20 trading days
  • +
  • 50-day SMA: Simple moving average of the last 50 trading days
  • +
  • 52-week High/Low: Rolling high/low over the trailing ~252 trading days
  • +
+ +
+ Front-month continuity: KC=F is the continuous front-month contract. At roll dates, there is a price gap between expiring and next-month contracts. Adjusted Close accounts for roll adjustments. We use raw Close for current price display and Adjusted Close for historical return calculations. +
+
+ + +
+

ICE Certified Warehouse Stocks

+

ICE Futures U.S. publishes daily reports of certified warehouse stocks for Coffee C. These are physical bags of Arabica coffee that have been graded and stamped as meeting ICE delivery specifications — making them eligible for delivery against a futures contract at expiration.

+ +

Why certified stocks matter

+

Certified stocks are the physical backing of the futures market. When certified stocks fall sharply while open interest is high, shorts cannot easily deliver physical coffee — this creates a squeeze dynamic that can drive explosive price rallies. Tracking certified stocks alongside positioning data is essential for understanding delivery risk.

+ +

What we track

+
    +
  • Total Certified Bags: All ICE-approved warehouse receipts (60-kg bags)
  • +
  • Pending Grading: Coffee being evaluated for certification (may join or exit certified stock)
  • +
  • Source: ICE Report Center (daily publication)
  • +
  • Update frequency: Daily, after market close
  • +
+ +

Derived metrics

+
    +
  • Daily Change: Day-over-day change in certified bags (exposed as wow_change_bags in the API)
  • +
  • 30-Day Average: Smoothed trend removing daily noise
  • +
  • 52-Week High: Rolling maximum over the trailing ~365 daily reports (row-based window, so approximately — not exactly — one calendar year)
  • +
  • Drawdown from 52w High: % decline from peak — measures how far stocks have been drawn down
  • +
+
+ + +
+

Data Quality

+ +

Immutable raw layer

+

All source files are stored as immutable gzip-compressed CSVs in a content-addressed landing directory. Files are never modified in place — a new download creates a new file only if the content hash differs from what is already stored. This means the full history of source corrections is preserved.

+ +

Incremental models with deduplication

+

Foundation models are incremental and deduplicate via a hash key computed from business grain columns and key metrics. If a source issues a correction (CFTC re-states a COT figure, USDA revises a production estimate), the corrected row produces a different hash and is ingested on the next pipeline run. Serving models select the most recent revision per grain.

+ +

Known limitations

+
    +
  • USDA PSD revisions can extend back multiple years — always treat historical figures as estimates subject to revision.
  • +
  • Yahoo Finance prices carry a ~15-minute delay and may have minor adjustments at roll dates.
  • +
  • COT data reflects Tuesday close positions; the market may move significantly before Friday's release.
  • +
  • ICE warehouse stocks do not distinguish between origins — certified stock drawdowns at specific ports are not visible here.
  • +
+
+ + +
+

Update Schedule

+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
SourceFrequencyTypical freshnessNotes
USDA PSDMonthly~2nd week of monthWASDE release day; daily pipeline detects hash change
CFTC COTWeekly (Friday)Friday 3:30 PM ETReflects prior Tuesday positions
KC=F PriceDailyNext morningYahoo Finance ~15 min delayed; previous day close available next morning
ICE Warehouse StocksDailyAfter market closeICE publishes report center data daily after the close
+
+ +

Our pipeline runs continuously. Data is re-checked daily and new data is loaded within hours of publication. The dashboard shows the freshness date on each data section.

+
+ + +
+

Questions about the data?

+

If you spot an inconsistency or want to understand how a specific metric is calculated, use the feedback button on any page or reach out directly.

+ Try BeanFlows free +
+
+
+{% endblock %} diff --git a/web/src/beanflows/templates/base.html b/web/src/beanflows/templates/base.html index 4fa2fb5..f60fdb4 100644 --- a/web/src/beanflows/templates/base.html +++ b/web/src/beanflows/templates/base.html @@ -77,6 +77,7 @@
  • Features
  • Pricing
  • About
  • +
  • Methodology