diff --git a/CLAUDE.md b/CLAUDE.md index ec184c3..050d387 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -44,23 +44,24 @@ uv run materia secrets get **Workspace packages** (`pyproject.toml` → `tool.uv.workspace`): - `extract/psdonline/` — Downloads USDA PSD Online data, normalizes ZIP→gzip CSV, writes to local landing directory -- `transform/sqlmesh_materia/` — 4-layer SQL transformation pipeline (local DuckDB) +- `extract/openweathermap/` — Daily weather for 8 coffee-growing regions (OWM One Call API 3.0) +- `transform/sqlmesh_materia/` — 3-layer SQL transformation pipeline (local DuckDB) - `src/materia/` — CLI (Typer) for pipeline execution, worker management, secrets - `web/` — Future web frontend **Data flow:** ``` USDA API → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip +OWM API → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz → rclone cron syncs landing/ to R2 - → SQLMesh raw → staging → cleaned → serving → /data/materia/lakehouse.duckdb + → SQLMesh staging → foundation → serving → /data/materia/lakehouse.duckdb → Web app reads lakehouse.duckdb (read-only) ``` -**SQLMesh 4-layer model structure** (`transform/sqlmesh_materia/models/`): -1. `raw/` — Immutable source reads (read_csv from landing directory) -2. `staging/` — Type casting, lookup joins, basic cleansing -3. `cleaned/` — Business logic, pivoting, integration -4. `serving/` — Analytics-ready facts, dimensions, aggregates +**SQLMesh 3-layer model structure** (`transform/sqlmesh_materia/models/`): +1. `staging/` — Type casting, lookup joins, basic cleansing (reads landing directly) +2. `foundation/` — Business logic, pivoting, dimensions, facts (also reads landing directly) +3. `serving/` — Analytics-ready aggregates for the web app **CLI modules** (`src/materia/`): - `cli.py` — Typer app with subcommands: worker, pipeline, secrets, version @@ -100,3 +101,4 @@ Read `coding_philosophy.md` for the full guide. Key points: |----------|---------|-------------| | `LANDING_DIR` | `data/landing` | Root directory for extracted landing data | | `DUCKDB_PATH` | `local.duckdb` | Path to the DuckDB lakehouse database | +| `OPENWEATHERMAP_API_KEY` | — | OWM One Call API 3.0 key (required for weather extraction) | diff --git a/extract/openweathermap/pyproject.toml b/extract/openweathermap/pyproject.toml new file mode 100644 index 0000000..b23e3a0 --- /dev/null +++ b/extract/openweathermap/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "openweathermap" +version = "0.1.0" +description = "OpenWeatherMap daily weather extractor for coffee-growing regions" +requires-python = ">=3.13" +dependencies = [ + "extract_core", + "niquests>=3.14.1", +] + +[project.scripts] +extract_weather = "openweathermap.execute:extract_weather" +extract_weather_backfill = "openweathermap.execute:extract_weather_backfill" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/openweathermap"] diff --git a/extract/openweathermap/src/openweathermap/__init__.py b/extract/openweathermap/src/openweathermap/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/extract/openweathermap/src/openweathermap/api.py b/extract/openweathermap/src/openweathermap/api.py new file mode 100644 index 0000000..8a48c34 --- /dev/null +++ b/extract/openweathermap/src/openweathermap/api.py @@ -0,0 +1,76 @@ +"""Thin client for the OpenWeatherMap One Call API 3.0 — Day Summary endpoint. 
+ +Endpoint: GET https://api.openweathermap.org/data/3.0/onecall/day_summary +Docs: https://openweathermap.org/api/one-call-3#history_daily_aggregation + +Returns one JSON object per (lat, lon, date) with daily aggregates: + temperature.{min,max,morning,afternoon,evening,night} + precipitation.total + humidity.afternoon + cloud_cover.afternoon + wind.max.{speed,direction} + pressure.afternoon + +This module contains only the HTTP call and basic response validation. +All business logic (file storage, rate limiting, cursor tracking) lives in execute.py. +""" + +import niquests + +OWM_BASE_URL = "https://api.openweathermap.org/data/3.0/onecall/day_summary" +HTTP_TIMEOUT_SECONDS = 30 +MAX_RESPONSE_BYTES = 10_000 # Day summary is ~500 bytes; 10 KB is a generous bound + + +class RateLimitError(Exception): + """Raised when OWM returns HTTP 429 (rate limit exceeded).""" + + +def fetch_day_summary( + session: niquests.Session, + lat: float, + lon: float, + date_str: str, + api_key: str, +) -> dict: + """Fetch the OWM One Call 3.0 day summary for a single (lat, lon, date). + + date_str must be YYYY-MM-DD format. + Returns the parsed JSON dict on success. + + Raises RateLimitError on HTTP 429 — caller is responsible for sleeping and retrying. + Raises AssertionError on any other non-200 status. + """ + assert api_key, "api_key must not be empty" + assert date_str and len(date_str) == 10, f"date_str must be YYYY-MM-DD, got {date_str!r}" + assert -90.0 <= lat <= 90.0, f"lat out of range: {lat}" + assert -180.0 <= lon <= 180.0, f"lon out of range: {lon}" + + response = session.get( + OWM_BASE_URL, + params={ + "lat": lat, + "lon": lon, + "date": date_str, + "appid": api_key, + "units": "metric", + }, + timeout=HTTP_TIMEOUT_SECONDS, + ) + + if response.status_code == 429: + raise RateLimitError(f"OWM rate limit hit for lat={lat} lon={lon} date={date_str}") + + assert response.status_code == 200, ( + f"OWM API returned HTTP {response.status_code} for " + f"lat={lat} lon={lon} date={date_str}: {response.text[:200]}" + ) + assert len(response.content) <= MAX_RESPONSE_BYTES, ( + f"OWM response unexpectedly large ({len(response.content)} bytes) for {date_str}" + ) + + data = response.json() + assert isinstance(data, dict), f"Expected dict response, got {type(data)}" + assert "date" in data, f"OWM response missing 'date' field: {list(data.keys())}" + + return data diff --git a/extract/openweathermap/src/openweathermap/execute.py b/extract/openweathermap/src/openweathermap/execute.py new file mode 100644 index 0000000..6466c1f --- /dev/null +++ b/extract/openweathermap/src/openweathermap/execute.py @@ -0,0 +1,330 @@ +"""OpenWeatherMap daily weather extraction for coffee-growing regions. + +Two entry points: + + extract_weather() + Daily run: fetches yesterday + today for all 8 locations (16 calls max). + Yesterday is included to cover the midnight edge case — if the daily job + fires just after midnight UTC, today's OWM data may still be partial. + Idempotent: skips if the landing file already exists. + + extract_weather_backfill() + Historical fill: iterates (date, location) pairs from 2020-01-01 to + yesterday. Bounded to MAX_CALLS_PER_BACKFILL_RUN per run; re-run daily + to advance. Resumes from cursor on restart. + +Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz + +Idempotency: file existence check. Past weather is immutable — (location_id, date) +uniquely identifies a file that never changes once written. + +Backfill cursor format: '{location_id}:{date}' (e.g. 
'brazil_parana:2022-07-15'). +Encodes both dimensions so a mid-run crash resumes at the exact (location, date) pair. +""" + +import gzip +import json +import logging +import os +import sys +import time +from datetime import date, timedelta +from pathlib import Path + +import niquests +from extract_core import end_run, get_last_cursor, landing_path, open_state_db, start_run, write_bytes_atomic + +from openweathermap.api import RateLimitError, fetch_day_summary +from openweathermap.locations import LOCATIONS + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("OWM Weather Extractor") + +LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) +LANDING_SUBDIR = "weather" + +EXTRACTOR_DAILY = "owm_weather_daily" +EXTRACTOR_BACKFILL = "owm_weather_backfill" + +# Rate limiting: OWM free tier = 1000 calls/day (~0.7/s). +# 1.5s between calls stays comfortably below the limit for the daily run. +# 2.0s for backfill (more conservative, many sequential calls). +SLEEP_BETWEEN_CALLS_SECONDS = 1.5 +SLEEP_BETWEEN_BACKFILL_CALLS_SECONDS = 2.0 + +# On 429: wait 60s, then one retry. If still 429, abort the run. +SLEEP_ON_RATE_LIMIT_SECONDS = 60 +MAX_RATE_LIMIT_RETRIES = 1 + +# Cap backfill at 500 calls per run (~17 min at 2s/call). +# 5-year backfill = 14,600 calls → ~30 runs. Re-run daily until complete. +MAX_CALLS_PER_BACKFILL_RUN = 500 + + +# ── helpers ────────────────────────────────────────────────────────────────── + +def _write_weather_file(location_id: str, date_str: str, payload: dict) -> int: + """Gzip-compress payload JSON and write atomically to the landing zone. + + Returns bytes_written, or 0 if the file already exists (idempotent skip). + Path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz + """ + assert location_id, "location_id must not be empty" + assert date_str and len(date_str) == 10, f"date_str must be YYYY-MM-DD, got {date_str!r}" + assert isinstance(payload, dict) and payload, "payload must be a non-empty dict" + + year = date_str[:4] + dest_dir = landing_path(LANDING_DIR, LANDING_SUBDIR, location_id, year) + local_file = dest_dir / f"{date_str}.json.gz" + + if local_file.exists(): + logger.debug(f"Already exists, skipping: {local_file}") + return 0 + + compressed = gzip.compress(json.dumps(payload, separators=(",", ":")).encode("utf-8")) + bytes_written = write_bytes_atomic(local_file, compressed) + logger.info(f"Stored {local_file} ({bytes_written:,} bytes)") + return bytes_written + + +def _fetch_with_retry(session: niquests.Session, loc: dict, date_str: str, api_key: str) -> dict | None: + """Fetch OWM day summary with one 429-retry. + + Returns the JSON dict on success, or None if rate limit persists after retry. 
+ """ + for attempt in range(MAX_RATE_LIMIT_RETRIES + 1): + try: + return fetch_day_summary(session, loc["lat"], loc["lon"], date_str, api_key) + except RateLimitError: + if attempt < MAX_RATE_LIMIT_RETRIES: + logger.warning( + f"Rate limit hit for {loc['id']} {date_str} — " + f"sleeping {SLEEP_ON_RATE_LIMIT_SECONDS}s before retry" + ) + time.sleep(SLEEP_ON_RATE_LIMIT_SECONDS) + else: + logger.error(f"Rate limit persisted after retry for {loc['id']} {date_str}") + return None + return None # unreachable; satisfies type checker + + +def _file_exists(location_id: str, date_str: str) -> bool: + year = date_str[:4] + return (LANDING_DIR / LANDING_SUBDIR / location_id / year / f"{date_str}.json.gz").exists() + + +# ── daily extractor ─────────────────────────────────────────────────────────── + +def extract_weather() -> None: + """Fetch yesterday + today weather for all 8 coffee-growing locations. + + Up to 16 API calls. Both days are skipped if files already exist, + so re-running costs zero API calls (fully idempotent). + """ + api_key = os.environ.get("OPENWEATHERMAP_API_KEY", "") + assert api_key, "OPENWEATHERMAP_API_KEY environment variable must be set" + + today = date.today() + yesterday = today - timedelta(days=1) + dates_to_fetch = [yesterday.isoformat(), today.isoformat()] + + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, EXTRACTOR_DAILY) + files_written = 0 + files_skipped = 0 + bytes_written_total = 0 + + try: + with niquests.Session() as session: + for date_str in dates_to_fetch: + for loc in LOCATIONS: + if _file_exists(loc["id"], date_str): + logger.info(f"Already exists: {loc['id']} {date_str}") + files_skipped += 1 + continue + + data = _fetch_with_retry(session, loc, date_str, api_key) + if data is None: + logger.error(f"Skipping {loc['id']} {date_str} after persistent rate limit") + continue + + bw = _write_weather_file(loc["id"], date_str, data) + if bw > 0: + files_written += 1 + bytes_written_total += bw + else: + files_skipped += 1 + + time.sleep(SLEEP_BETWEEN_CALLS_SECONDS) + + end_run( + conn, run_id, + status="success", + files_written=files_written, + files_skipped=files_skipped, + bytes_written=bytes_written_total, + cursor_value=today.isoformat(), + ) + logger.info(f"Daily weather complete: {files_written} new, {files_skipped} skipped") + except Exception as e: + end_run(conn, run_id, status="failed", error_message=str(e)) + raise + finally: + conn.close() + + +# ── backfill extractor ──────────────────────────────────────────────────────── + +def extract_weather_backfill() -> None: + """Fill historical weather data from 2020-01-01 to yesterday. + + Iterates (date, location) pairs in date-ascending, LOCATIONS-list order. + Bounded to MAX_CALLS_PER_BACKFILL_RUN per run — re-run daily to advance. + + Cursor format: '{location_id}:{date}' (e.g. 'brazil_parana:2022-07-15'). + Encodes both dimensions: on resume, all pairs at or before the cursor are + skipped (via cursor comparison first, then file-existence check). + + 5-year backfill (2020–2025) = 14,600 calls. At 500/run = ~30 runs. + + 429 handling: sleep 60s, one retry. If still 429, save cursor and exit + with status='failed' so the cursor does not advance beyond the last + successfully written pair. Safe to re-run the next day. 
+ """ + api_key = os.environ.get("OPENWEATHERMAP_API_KEY", "") + assert api_key, "OPENWEATHERMAP_API_KEY environment variable must be set" + + start = date(2020, 1, 1) + end = date.today() - timedelta(days=1) # never fetch today in backfill + + conn = open_state_db(LANDING_DIR) + run_id = start_run(conn, EXTRACTOR_BACKFILL) + files_written = 0 + files_skipped = 0 + bytes_written_total = 0 + calls_made = 0 + last_cursor: str | None = None + + # Load resume cursor from last successful run + resume_cursor = get_last_cursor(conn, EXTRACTOR_BACKFILL) + if resume_cursor: + logger.info(f"Resuming backfill from cursor: {resume_cursor}") + else: + logger.info(f"Starting fresh backfill from {start.isoformat()}") + + # Parse cursor into (location_id, date_str) for skip comparison + resume_location_id: str | None = None + resume_date_str: str | None = None + if resume_cursor and ":" in resume_cursor: + resume_location_id, resume_date_str = resume_cursor.split(":", 1) + + location_ids = [loc["id"] for loc in LOCATIONS] + resume_loc_idx = -1 + if resume_location_id and resume_location_id in location_ids: + resume_loc_idx = location_ids.index(resume_location_id) + + try: + with niquests.Session() as session: + current = start + while current <= end: + date_str = current.isoformat() + + for loc in LOCATIONS: + loc_idx = location_ids.index(loc["id"]) + + # Cursor-based skip: (date, loc_idx) <= (resume_date, resume_loc_idx) + # This skips everything already processed in previous runs. + if resume_date_str: + if date_str < resume_date_str: + files_skipped += 1 + continue + if date_str == resume_date_str and loc_idx <= resume_loc_idx: + files_skipped += 1 + continue + + # File-existence check: idempotency guard for files already on disk + # (e.g. written by the daily extractor, or a previous partial run) + if _file_exists(loc["id"], date_str): + files_skipped += 1 + last_cursor = f"{loc['id']}:{date_str}" + continue + + # Per-run call cap + if calls_made >= MAX_CALLS_PER_BACKFILL_RUN: + logger.info( + f"Reached cap of {MAX_CALLS_PER_BACKFILL_RUN} calls. 
" + f"Re-run to continue from {last_cursor or resume_cursor}" + ) + end_run( + conn, run_id, + status="success", + files_written=files_written, + files_skipped=files_skipped, + bytes_written=bytes_written_total, + cursor_value=last_cursor or resume_cursor, + ) + return + + data = _fetch_with_retry(session, loc, date_str, api_key) + calls_made += 1 + + if data is None: + logger.warning(f"Persistent rate limit at {loc['id']} {date_str} — stopping run") + end_run( + conn, run_id, + status="failed", + files_written=files_written, + files_skipped=files_skipped, + bytes_written=bytes_written_total, + cursor_value=last_cursor or resume_cursor, + error_message="Persistent rate limit — resume from cursor", + ) + return + + bw = _write_weather_file(loc["id"], date_str, data) + if bw > 0: + files_written += 1 + bytes_written_total += bw + else: + files_skipped += 1 + + last_cursor = f"{loc['id']}:{date_str}" + time.sleep(SLEEP_BETWEEN_BACKFILL_CALLS_SECONDS) + + current += timedelta(days=1) + + final_cursor = last_cursor or resume_cursor or end.isoformat() + logger.info( + f"Backfill complete: {files_written} written, " + f"{files_skipped} skipped, {calls_made} API calls" + ) + end_run( + conn, run_id, + status="success", + files_written=files_written, + files_skipped=files_skipped, + bytes_written=bytes_written_total, + cursor_value=final_cursor, + ) + except Exception as e: + end_run( + conn, run_id, + status="failed", + files_written=files_written, + files_skipped=files_skipped, + bytes_written=bytes_written_total, + cursor_value=last_cursor or resume_cursor, + error_message=str(e), + ) + raise + finally: + conn.close() + + +if __name__ == "__main__": + extract_weather() diff --git a/extract/openweathermap/src/openweathermap/locations.py b/extract/openweathermap/src/openweathermap/locations.py new file mode 100644 index 0000000..6d0aa4b --- /dev/null +++ b/extract/openweathermap/src/openweathermap/locations.py @@ -0,0 +1,35 @@ +"""Coffee-growing region coordinates for OpenWeatherMap extraction. + +Each entry is a dict with: + id — filesystem-safe unique identifier (used as landing subdirectory name) + lat/lon — WGS84 coordinates + name — human-readable region name + country — ISO 3166-1 alpha-2 country code + variety — 'Arabica' or 'Robusta' (drives growing season logic in SQL) + +Locations were chosen to represent the primary growing zones for the world's +major coffee-producing countries, weighted toward Arabica regions since KC=F +futures track Arabica. 
+""" + +LOCATIONS: list[dict] = [ + # Brazil — largest Arabica producer; frost risk in highlands (Jun–Aug) + {"id": "brazil_minas_gerais", "lat": -19.9167, "lon": -43.9345, "name": "Minas Gerais", "country": "BR", "variety": "Arabica"}, + {"id": "brazil_parana", "lat": -23.4205, "lon": -51.9330, "name": "Paraná", "country": "BR", "variety": "Arabica"}, + # Vietnam — largest Robusta producer; Central Highlands plateau + {"id": "vietnam_highlands", "lat": 12.6667, "lon": 108.0500, "name": "Central Highlands", "country": "VN", "variety": "Robusta"}, + # Colombia — premium washed Arabica; Huila department + {"id": "colombia_huila", "lat": 2.5359, "lon": -75.5277, "name": "Huila", "country": "CO", "variety": "Arabica"}, + # Ethiopia — birthplace of Arabica; Sidama zone (Yirgacheffe region) + {"id": "ethiopia_sidama", "lat": 6.7612, "lon": 38.4721, "name": "Sidama", "country": "ET", "variety": "Arabica"}, + # Honduras — largest Central American producer; Copán department + {"id": "honduras_copan", "lat": 14.8333, "lon": -89.1500, "name": "Copán", "country": "HN", "variety": "Arabica"}, + # Guatemala — benchmark Central American; Antigua valley + {"id": "guatemala_antigua", "lat": 14.5586, "lon": -90.7295, "name": "Antigua", "country": "GT", "variety": "Arabica"}, + # Indonesia — Sumatra (Mandheling); significant Robusta production + {"id": "indonesia_sumatra", "lat": 3.5952, "lon": 98.6722, "name": "Sumatra", "country": "ID", "variety": "Robusta"}, +] + +assert len(LOCATIONS) == 8, f"Expected 8 locations, got {len(LOCATIONS)}" +assert all("id" in loc and "lat" in loc and "lon" in loc for loc in LOCATIONS), \ + "Each location must have id, lat, lon" diff --git a/pyproject.toml b/pyproject.toml index dc265a5..9904cab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ sqlmesh_materia = {workspace = true } cftc_cot = {workspace = true } coffee_prices = {workspace = true } ice_stocks = {workspace = true } +openweathermap = {workspace = true } [tool.uv.workspace] members = [ "extract/*", diff --git a/src/materia/pipelines.py b/src/materia/pipelines.py index f94be5a..0f3d2f6 100644 --- a/src/materia/pipelines.py +++ b/src/materia/pipelines.py @@ -40,9 +40,17 @@ PIPELINES = { "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_all"], "timeout_seconds": 1800, }, + "extract_weather": { + "command": ["uv", "run", "--package", "openweathermap", "extract_weather"], + "timeout_seconds": 300, + }, + "extract_weather_backfill": { + "command": ["uv", "run", "--package", "openweathermap", "extract_weather_backfill"], + "timeout_seconds": 1200, + }, "extract_all": { - "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all"], - "timeout_seconds": 6300, + "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"], + "timeout_seconds": 6600, }, "transform": { "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"], @@ -60,7 +68,7 @@ PIPELINES = { META_PIPELINES: dict[str, list[str]] = { - "extract_all": ["extract", "extract_cot", "extract_prices", "extract_ice_all"], + "extract_all": ["extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"], } diff --git a/transform/sqlmesh_materia/audits/assert_positive_order_ids.sql b/transform/sqlmesh_materia/audits/assert_positive_order_ids.sql index 4b66f40..99f6ae1 100644 --- a/transform/sqlmesh_materia/audits/assert_positive_order_ids.sql +++ 
b/transform/sqlmesh_materia/audits/assert_positive_order_ids.sql @@ -1,9 +1,9 @@ AUDIT ( - name assert_positive_order_ids, + name assert_positive_order_ids ); -SELECT * +SELECT + * FROM @this_model WHERE - item_id < 0 - \ No newline at end of file + item_id < 0 \ No newline at end of file diff --git a/transform/sqlmesh_materia/macros/__init__.py b/transform/sqlmesh_materia/macros/__init__.py index 9d42e6d..0bf2bf1 100644 --- a/transform/sqlmesh_materia/macros/__init__.py +++ b/transform/sqlmesh_materia/macros/__init__.py @@ -43,3 +43,14 @@ def ice_stocks_by_port_glob(evaluator) -> str: """Return a quoted glob path for all ICE historical by-port CSV gzip files under LANDING_DIR.""" landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") return f"'{landing_dir}/ice_stocks_by_port/**/*.csv.gzip'" + + +@macro() +def weather_glob(evaluator) -> str: + """Return a quoted glob path for all OWM weather JSON gzip files under LANDING_DIR. + + Pattern: weather/{location_id}/{year}/{date}.json.gz + The double-star catches all location_id subdirectories. + """ + landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") + return f"'{landing_dir}/weather/**/*.json.gz'" diff --git a/transform/sqlmesh_materia/models/cleaned/cln_psdalldata__commodity_pivoted.sql b/transform/sqlmesh_materia/models/cleaned/cln_psdalldata__commodity_pivoted.sql index 029e459..4a596d5 100644 --- a/transform/sqlmesh_materia/models/cleaned/cln_psdalldata__commodity_pivoted.sql +++ b/transform/sqlmesh_materia/models/cleaned/cln_psdalldata__commodity_pivoted.sql @@ -1,58 +1,59 @@ -MODEL ( - name cleaned.psdalldata__commodity_pivoted, - kind INCREMENTAL_BY_TIME_RANGE ( - time_column ingest_date - ), - start '2006-08-01', - cron '@daily' -); - -SELECT - max(hkey) as hkey, - commodity_code, - max(commodity_name) as commodity_name, - country_code, - max(country_name) as country_name, - market_year, - ingest_date, - COALESCE(SUM(CASE WHEN attribute_name = 'Production' THEN value END), 0) AS Production, - COALESCE(SUM(CASE WHEN attribute_name = 'Imports' THEN value END), 0) AS Imports, - COALESCE(SUM(CASE WHEN attribute_name = 'Exports' THEN value END), 0) AS Exports, - COALESCE(SUM(CASE WHEN attribute_name = 'Total Distribution' THEN value END), 0) AS Total_Distribution, - COALESCE(SUM(CASE WHEN attribute_name = 'Ending Stocks' THEN value END), 0) AS Ending_Stocks, - COALESCE(SUM(CASE WHEN attribute_name = 'Beginning Stocks' THEN value END), 0) AS Beginning_Stocks, - COALESCE(SUM(CASE WHEN attribute_name = 'Total Supply' THEN value END), 0) AS Total_Supply, - COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Consumption' THEN value END), 0) AS Domestic_Consumption, - COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Demand' THEN value END), 0) AS Domestic_Demand, - COALESCE(SUM(CASE WHEN attribute_name = 'Food Use' THEN value END), 0) AS Food_Use, - COALESCE(SUM(CASE WHEN attribute_name = 'Industrial Use' THEN value END), 0) AS Industrial_Use, - COALESCE(SUM(CASE WHEN attribute_name = 'Seed Use' THEN value END), 0) AS Seed_Use, - COALESCE(SUM(CASE WHEN attribute_name = 'Waste' THEN value END), 0) AS Waste, - COALESCE(SUM(CASE WHEN attribute_name = 'Feed Use' THEN value END), 0) AS Feed_Use -FROM staging.psdalldata__commodity -WHERE attribute_name IN ( - 'Production', - 'Imports', - 'Exports', - 'Total Distribution', - 'Ending Stocks', - 'Beginning Stocks', - 'Total Supply', - 'Domestic Consumption', - 'Domestic Demand', - 'Food Use', - 'Industrial Use', - 'Seed Use', - 
'Waste', - 'Feed Use' - ) -GROUP BY - commodity_code, - country_code, - market_year, - ingest_date -ORDER BY - commodity_code, - country_code, - market_year, - ingest_date +MODEL ( + name cleaned.psdalldata__commodity_pivoted, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ingest_date + ), + start '2006-08-01', + cron '@daily' +); + +SELECT + MAX(hkey) AS hkey, + commodity_code, + MAX(commodity_name) AS commodity_name, + country_code, + MAX(country_name) AS country_name, + market_year, + ingest_date, + COALESCE(SUM(CASE WHEN attribute_name = 'Production' THEN value END), 0) AS Production, + COALESCE(SUM(CASE WHEN attribute_name = 'Imports' THEN value END), 0) AS Imports, + COALESCE(SUM(CASE WHEN attribute_name = 'Exports' THEN value END), 0) AS Exports, + COALESCE(SUM(CASE WHEN attribute_name = 'Total Distribution' THEN value END), 0) AS Total_Distribution, + COALESCE(SUM(CASE WHEN attribute_name = 'Ending Stocks' THEN value END), 0) AS Ending_Stocks, + COALESCE(SUM(CASE WHEN attribute_name = 'Beginning Stocks' THEN value END), 0) AS Beginning_Stocks, + COALESCE(SUM(CASE WHEN attribute_name = 'Total Supply' THEN value END), 0) AS Total_Supply, + COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Consumption' THEN value END), 0) AS Domestic_Consumption, + COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Demand' THEN value END), 0) AS Domestic_Demand, + COALESCE(SUM(CASE WHEN attribute_name = 'Food Use' THEN value END), 0) AS Food_Use, + COALESCE(SUM(CASE WHEN attribute_name = 'Industrial Use' THEN value END), 0) AS Industrial_Use, + COALESCE(SUM(CASE WHEN attribute_name = 'Seed Use' THEN value END), 0) AS Seed_Use, + COALESCE(SUM(CASE WHEN attribute_name = 'Waste' THEN value END), 0) AS Waste, + COALESCE(SUM(CASE WHEN attribute_name = 'Feed Use' THEN value END), 0) AS Feed_Use +FROM staging.psdalldata__commodity +WHERE + attribute_name IN ( + 'Production', + 'Imports', + 'Exports', + 'Total Distribution', + 'Ending Stocks', + 'Beginning Stocks', + 'Total Supply', + 'Domestic Consumption', + 'Domestic Demand', + 'Food Use', + 'Industrial Use', + 'Seed Use', + 'Waste', + 'Feed Use' + ) +GROUP BY + commodity_code, + country_code, + market_year, + ingest_date +ORDER BY + commodity_code, + country_code, + market_year, + ingest_date \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/foundation/dim_commodity.sql b/transform/sqlmesh_materia/models/foundation/dim_commodity.sql index e806664..e5e5762 100644 --- a/transform/sqlmesh_materia/models/foundation/dim_commodity.sql +++ b/transform/sqlmesh_materia/models/foundation/dim_commodity.sql @@ -1,23 +1,15 @@ --- Commodity dimension: conforms identifiers across source systems. --- --- This is the ontology. Each row is a commodity tracked by BeanFlows. --- As new sources are added (ICO, futures prices, satellite), their --- commodity identifiers are added as columns here — not as separate tables. --- As new commodities are added (cocoa, sugar), rows are added here. --- --- References: --- usda_commodity_code → staging.psdalldata__commodity.commodity_code (numeric string, e.g. '0711100') --- cftc_commodity_code → foundation.fct_cot_positioning.cftc_commodity_code (3-char, e.g. '083') --- --- NOTE: Defined as FULL model (not SEED) to guarantee leading-zero preservation. --- Pandas CSV loading converts '083' → 83 even with varchar column declarations. - +/* Commodity dimension: conforms identifiers across source systems. */ /* This is the ontology. Each row is a commodity tracked by BeanFlows. 
*/ /* As new sources are added (ICO, futures prices, satellite), their */ /* commodity identifiers are added as columns here — not as separate tables. */ /* As new commodities are added (cocoa, sugar), rows are added here. */ /* References: */ /* usda_commodity_code → staging.psdalldata__commodity.commodity_code (numeric string, e.g. '0711100') */ /* cftc_commodity_code → foundation.fct_cot_positioning.cftc_commodity_code (3-char, e.g. '083') */ /* NOTE: Defined as FULL model (not SEED) to guarantee leading-zero preservation. */ /* Pandas CSV loading converts '083' → 83 even with varchar column declarations. */ MODEL ( name foundation.dim_commodity, kind FULL ); -SELECT usda_commodity_code, cftc_commodity_code, ticker, ice_stock_report_code, commodity_name, commodity_group +SELECT + usda_commodity_code, + cftc_commodity_code, + ticker, + ice_stock_report_code, + commodity_name, + commodity_group FROM (VALUES - ('0711100', '083', 'KC=F', 'COFFEE-C', 'Coffee, Green', 'Softs') -) AS t(usda_commodity_code, cftc_commodity_code, ticker, ice_stock_report_code, commodity_name, commodity_group) + ('0711100', '083', 'KC=F', 'COFFEE-C', 'Coffee, Green', 'Softs')) AS t(usda_commodity_code, cftc_commodity_code, ticker, ice_stock_report_code, commodity_name, commodity_group) \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql b/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql index 78aff7b..c90adb9 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql @@ -1,69 +1,58 @@ --- Foundation fact: daily KC=F Coffee C futures prices. --- --- Reads directly from the landing zone, casts varchar columns to proper types, --- and deduplicates via hash key. --- Covers all available history from the landing directory. --- --- Grain: one row per trade_date. --- Dedup: hash of (trade_date, close) — if Yahoo Finance corrects a price, --- the new hash triggers a re-ingest on the next incremental run. - +/* Foundation fact: daily KC=F Coffee C futures prices. */ /* Reads directly from the landing zone, casts varchar columns to proper types, */ /* and deduplicates via hash key. */ /* Covers all available history from the landing directory. */ /* Grain: one row per trade_date. */ /* Dedup: hash of (trade_date, close) — if Yahoo Finance corrects a price, */ /* the new hash triggers a re-ingest on the next incremental run. 
*/ MODEL ( name foundation.fct_coffee_prices, kind INCREMENTAL_BY_TIME_RANGE ( time_column trade_date ), - grain (trade_date), + grain ( + trade_date + ), start '1971-08-16', cron '@daily' ); WITH src AS ( - SELECT * FROM read_csv( + SELECT + * + FROM READ_CSV( @prices_glob(), - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true + compression = 'gzip', + header = TRUE, + union_by_name = TRUE, + filename = TRUE, + all_varchar = TRUE ) -), - -cast_and_clean AS ( +), cast_and_clean AS ( SELECT - TRY_CAST(Date AS date) AS trade_date, - TRY_CAST(Open AS double) AS open, - TRY_CAST(High AS double) AS high, - TRY_CAST(Low AS double) AS low, - TRY_CAST(Close AS double) AS close, - TRY_CAST(Adj_Close AS double) AS adj_close, - TRY_CAST(Volume AS bigint) AS volume, - - -- Filename encodes the content hash — use as ingest identifier - filename AS source_file, - - -- Dedup key: trade date + close price - hash(Date, Close) AS hkey + TRY_CAST(Date AS DATE) AS trade_date, + TRY_CAST(Open AS DOUBLE) AS open, + TRY_CAST(High AS DOUBLE) AS high, + TRY_CAST(Low AS DOUBLE) AS low, + TRY_CAST(Close AS DOUBLE) AS close, + TRY_CAST(Adj_Close AS DOUBLE) AS adj_close, + TRY_CAST(Volume AS BIGINT) AS volume, + filename AS source_file, /* Filename encodes the content hash — use as ingest identifier */ + HASH(Date, Close) AS hkey /* Dedup key: trade date + close price */ FROM src - WHERE TRY_CAST(Date AS date) IS NOT NULL - AND TRY_CAST(Close AS double) IS NOT NULL -), - -deduplicated AS ( + WHERE + NOT TRY_CAST(Date AS DATE) IS NULL AND NOT TRY_CAST(Close AS DOUBLE) IS NULL +), deduplicated AS ( SELECT - any_value(trade_date) AS trade_date, - any_value(open) AS open, - any_value(high) AS high, - any_value(low) AS low, - any_value(close) AS close, - any_value(adj_close) AS adj_close, - any_value(volume) AS volume, - any_value(source_file) AS source_file, + ANY_VALUE(trade_date) AS trade_date, + ANY_VALUE(open) AS open, + ANY_VALUE(high) AS high, + ANY_VALUE(low) AS low, + ANY_VALUE(close) AS close, + ANY_VALUE(adj_close) AS adj_close, + ANY_VALUE(volume) AS volume, + ANY_VALUE(source_file) AS source_file, hkey FROM cast_and_clean - GROUP BY hkey + GROUP BY + hkey ) - -SELECT * +SELECT + * FROM deduplicated -WHERE trade_date BETWEEN @start_ds AND @end_ds +WHERE + trade_date BETWEEN @start_ds AND @end_ds \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql b/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql index 404b2e1..2a2e9cd 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql @@ -1,14 +1,4 @@ --- Foundation fact: CFTC COT positioning, weekly grain, all commodities. --- --- Reads directly from the landing zone, casts varchar columns to proper types, --- cleans column names, computes net positions (long - short) per trader category, --- and deduplicates via hash key. Covers all commodities — filtering to --- a specific commodity happens in the serving layer. --- --- Grain: one row per (cftc_commodity_code, report_date, cftc_contract_market_code) --- History: revisions appear as new rows with a later ingest_date. --- Serving layer picks max(ingest_date) per grain for latest view. - +/* Foundation fact: CFTC COT positioning, weekly grain, all commodities. 
*/ /* Reads directly from the landing zone, casts varchar columns to proper types, */ /* cleans column names, computes net positions (long - short) per trader category, */ /* and deduplicates via hash key. Covers all commodities — filtering to */ /* a specific commodity happens in the serving layer. */ /* Grain: one row per (cftc_commodity_code, report_date, cftc_contract_market_code) */ /* History: revisions appear as new rows with a later ingest_date. */ /* Serving layer picks max(ingest_date) per grain for latest view. */ MODEL ( name foundation.fct_cot_positioning, kind INCREMENTAL_BY_TIME_RANGE ( @@ -20,92 +10,59 @@ MODEL ( ); WITH src AS ( - SELECT * FROM read_csv( + SELECT + * + FROM READ_CSV( @cot_glob(), - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true, + compression = 'gzip', + header = TRUE, + union_by_name = TRUE, + filename = TRUE, + all_varchar = TRUE, max_line_size = 10000000 ) -), - -cast_and_clean AS ( +), cast_and_clean AS ( SELECT - -- Identifiers - trim(market_and_exchange_names) AS market_and_exchange_name, - report_date_as_yyyy_mm_dd::date AS report_date, - trim(cftc_commodity_code) AS cftc_commodity_code, - trim(cftc_contract_market_code) AS cftc_contract_market_code, - trim(contract_units) AS contract_units, - - -- Open interest - -- CFTC uses '.' as null for any field — use TRY_CAST throughout - TRY_CAST(open_interest_all AS int) AS open_interest, - - -- Producer / Merchant (commercial hedgers: exporters, processors) - TRY_CAST(prod_merc_positions_long_all AS int) AS prod_merc_long, - TRY_CAST(prod_merc_positions_short_all AS int) AS prod_merc_short, - - -- Swap dealers - TRY_CAST(swap_positions_long_all AS int) AS swap_long, - TRY_CAST(swap_positions_short_all AS int) AS swap_short, - TRY_CAST(swap_positions_spread_all AS int) AS swap_spread, - - -- Managed money (hedge funds, CTAs — the primary speculative signal) - TRY_CAST(m_money_positions_long_all AS int) AS managed_money_long, - TRY_CAST(m_money_positions_short_all AS int) AS managed_money_short, - TRY_CAST(m_money_positions_spread_all AS int) AS managed_money_spread, - - -- Other reportables - TRY_CAST(other_rept_positions_long_all AS int) AS other_reportable_long, - TRY_CAST(other_rept_positions_short_all AS int) AS other_reportable_short, - TRY_CAST(other_rept_positions_spread_all AS int) AS other_reportable_spread, - - -- Non-reportable (small speculators, below reporting threshold) - TRY_CAST(nonrept_positions_long_all AS int) AS nonreportable_long, - TRY_CAST(nonrept_positions_short_all AS int) AS nonreportable_short, - - -- Net positions (long minus short per category) - TRY_CAST(prod_merc_positions_long_all AS int) - - TRY_CAST(prod_merc_positions_short_all AS int) AS prod_merc_net, - TRY_CAST(m_money_positions_long_all AS int) - - TRY_CAST(m_money_positions_short_all AS int) AS managed_money_net, - TRY_CAST(swap_positions_long_all AS int) - - TRY_CAST(swap_positions_short_all AS int) AS swap_net, - TRY_CAST(other_rept_positions_long_all AS int) - - TRY_CAST(other_rept_positions_short_all AS int) AS other_reportable_net, - TRY_CAST(nonrept_positions_long_all AS int) - - TRY_CAST(nonrept_positions_short_all AS int) AS nonreportable_net, - - -- Week-over-week changes - TRY_CAST(change_in_open_interest_all AS int) AS change_open_interest, - TRY_CAST(change_in_m_money_long_all AS int) AS change_managed_money_long, - TRY_CAST(change_in_m_money_short_all AS int) AS change_managed_money_short, - TRY_CAST(change_in_m_money_long_all AS int) - - 
TRY_CAST(change_in_m_money_short_all AS int) AS change_managed_money_net, - TRY_CAST(change_in_prod_merc_long_all AS int) AS change_prod_merc_long, - TRY_CAST(change_in_prod_merc_short_all AS int) AS change_prod_merc_short, - - -- Concentration ratios (% of OI held by top 4 / top 8 traders) - TRY_CAST(conc_gross_le_4_tdr_long_all AS float) AS concentration_top4_long_pct, - TRY_CAST(conc_gross_le_4_tdr_short_all AS float) AS concentration_top4_short_pct, - TRY_CAST(conc_gross_le_8_tdr_long_all AS float) AS concentration_top8_long_pct, - TRY_CAST(conc_gross_le_8_tdr_short_all AS float) AS concentration_top8_short_pct, - - -- Trader counts - TRY_CAST(traders_tot_all AS int) AS traders_total, - TRY_CAST(traders_m_money_long_all AS int) AS traders_managed_money_long, - TRY_CAST(traders_m_money_short_all AS int) AS traders_managed_money_short, - TRY_CAST(traders_m_money_spread_all AS int) AS traders_managed_money_spread, - - -- Ingest date: derived from landing path year directory - -- Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2] - make_date(split(filename, '/')[-2]::int, 1, 1) AS ingest_date, - - -- Dedup key: hash of business grain + key metrics - hash( + TRIM(market_and_exchange_names) AS market_and_exchange_name, /* Identifiers */ + report_date_as_yyyy_mm_dd::DATE AS report_date, + TRIM(cftc_commodity_code) AS cftc_commodity_code, + TRIM(cftc_contract_market_code) AS cftc_contract_market_code, + TRIM(contract_units) AS contract_units, + TRY_CAST(open_interest_all AS INT) AS open_interest, /* Open interest */ /* CFTC uses '.' as null for any field — use TRY_CAST throughout */ + TRY_CAST(prod_merc_positions_long_all AS INT) AS prod_merc_long, /* Producer / Merchant (commercial hedgers: exporters, processors) */ + TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_short, + TRY_CAST(swap_positions_long_all AS INT) AS swap_long, /* Swap dealers */ + TRY_CAST(swap_positions_short_all AS INT) AS swap_short, + TRY_CAST(swap_positions_spread_all AS INT) AS swap_spread, + TRY_CAST(m_money_positions_long_all AS INT) AS managed_money_long, /* Managed money (hedge funds, CTAs — the primary speculative signal) */ + TRY_CAST(m_money_positions_short_all AS INT) AS managed_money_short, + TRY_CAST(m_money_positions_spread_all AS INT) AS managed_money_spread, + TRY_CAST(other_rept_positions_long_all AS INT) AS other_reportable_long, /* Other reportables */ + TRY_CAST(other_rept_positions_short_all AS INT) AS other_reportable_short, + TRY_CAST(other_rept_positions_spread_all AS INT) AS other_reportable_spread, + TRY_CAST(nonrept_positions_long_all AS INT) AS nonreportable_long, /* Non-reportable (small speculators, below reporting threshold) */ + TRY_CAST(nonrept_positions_short_all AS INT) AS nonreportable_short, + TRY_CAST(prod_merc_positions_long_all AS INT) /* Net positions (long minus short per category) */ - TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_net, + TRY_CAST(m_money_positions_long_all AS INT) - TRY_CAST(m_money_positions_short_all AS INT) AS managed_money_net, + TRY_CAST(swap_positions_long_all AS INT) - TRY_CAST(swap_positions_short_all AS INT) AS swap_net, + TRY_CAST(other_rept_positions_long_all AS INT) - TRY_CAST(other_rept_positions_short_all AS INT) AS other_reportable_net, + TRY_CAST(nonrept_positions_long_all AS INT) - TRY_CAST(nonrept_positions_short_all AS INT) AS nonreportable_net, + TRY_CAST(change_in_open_interest_all AS INT) AS change_open_interest, /* Week-over-week changes */ + TRY_CAST(change_in_m_money_long_all AS INT) AS 
change_managed_money_long, + TRY_CAST(change_in_m_money_short_all AS INT) AS change_managed_money_short, + TRY_CAST(change_in_m_money_long_all AS INT) - TRY_CAST(change_in_m_money_short_all AS INT) AS change_managed_money_net, + TRY_CAST(change_in_prod_merc_long_all AS INT) AS change_prod_merc_long, + TRY_CAST(change_in_prod_merc_short_all AS INT) AS change_prod_merc_short, + TRY_CAST(conc_gross_le_4_tdr_long_all AS REAL) AS concentration_top4_long_pct, /* Concentration ratios (% of OI held by top 4 / top 8 traders) */ + TRY_CAST(conc_gross_le_4_tdr_short_all AS REAL) AS concentration_top4_short_pct, + TRY_CAST(conc_gross_le_8_tdr_long_all AS REAL) AS concentration_top8_long_pct, + TRY_CAST(conc_gross_le_8_tdr_short_all AS REAL) AS concentration_top8_short_pct, + TRY_CAST(traders_tot_all AS INT) AS traders_total, /* Trader counts */ + TRY_CAST(traders_m_money_long_all AS INT) AS traders_managed_money_long, + TRY_CAST(traders_m_money_short_all AS INT) AS traders_managed_money_short, + TRY_CAST(traders_m_money_spread_all AS INT) AS traders_managed_money_spread, + MAKE_DATE(STR_SPLIT(filename, '/')[-2]::INT, 1, 1) AS ingest_date, /* Ingest date: derived from landing path year directory */ /* Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2] */ + HASH( cftc_commodity_code, report_date_as_yyyy_mm_dd, cftc_contract_market_code, @@ -114,60 +71,61 @@ cast_and_clean AS ( m_money_positions_short_all, prod_merc_positions_long_all, prod_merc_positions_short_all - ) AS hkey + ) AS hkey /* Dedup key: hash of business grain + key metrics */ FROM src - -- Reject rows with null commodity code or malformed date - WHERE trim(cftc_commodity_code) IS NOT NULL - AND len(trim(cftc_commodity_code)) > 0 - AND report_date_as_yyyy_mm_dd::date IS NOT NULL -), - -deduplicated AS ( + /* Reject rows with null commodity code or malformed date */ + WHERE + NOT TRIM(cftc_commodity_code) IS NULL + AND LENGTH(TRIM(cftc_commodity_code)) > 0 + AND NOT report_date_as_yyyy_mm_dd::DATE IS NULL +), deduplicated AS ( SELECT - any_value(market_and_exchange_name) AS market_and_exchange_name, - any_value(report_date) AS report_date, - any_value(cftc_commodity_code) AS cftc_commodity_code, - any_value(cftc_contract_market_code) AS cftc_contract_market_code, - any_value(contract_units) AS contract_units, - any_value(open_interest) AS open_interest, - any_value(prod_merc_long) AS prod_merc_long, - any_value(prod_merc_short) AS prod_merc_short, - any_value(prod_merc_net) AS prod_merc_net, - any_value(swap_long) AS swap_long, - any_value(swap_short) AS swap_short, - any_value(swap_spread) AS swap_spread, - any_value(swap_net) AS swap_net, - any_value(managed_money_long) AS managed_money_long, - any_value(managed_money_short) AS managed_money_short, - any_value(managed_money_spread) AS managed_money_spread, - any_value(managed_money_net) AS managed_money_net, - any_value(other_reportable_long) AS other_reportable_long, - any_value(other_reportable_short) AS other_reportable_short, - any_value(other_reportable_spread) AS other_reportable_spread, - any_value(other_reportable_net) AS other_reportable_net, - any_value(nonreportable_long) AS nonreportable_long, - any_value(nonreportable_short) AS nonreportable_short, - any_value(nonreportable_net) AS nonreportable_net, - any_value(change_open_interest) AS change_open_interest, - any_value(change_managed_money_long) AS change_managed_money_long, - any_value(change_managed_money_short) AS change_managed_money_short, - any_value(change_managed_money_net) AS change_managed_money_net, - 
any_value(change_prod_merc_long) AS change_prod_merc_long, - any_value(change_prod_merc_short) AS change_prod_merc_short, - any_value(concentration_top4_long_pct) AS concentration_top4_long_pct, - any_value(concentration_top4_short_pct) AS concentration_top4_short_pct, - any_value(concentration_top8_long_pct) AS concentration_top8_long_pct, - any_value(concentration_top8_short_pct) AS concentration_top8_short_pct, - any_value(traders_total) AS traders_total, - any_value(traders_managed_money_long) AS traders_managed_money_long, - any_value(traders_managed_money_short) AS traders_managed_money_short, - any_value(traders_managed_money_spread) AS traders_managed_money_spread, - any_value(ingest_date) AS ingest_date, + ANY_VALUE(market_and_exchange_name) AS market_and_exchange_name, + ANY_VALUE(report_date) AS report_date, + ANY_VALUE(cftc_commodity_code) AS cftc_commodity_code, + ANY_VALUE(cftc_contract_market_code) AS cftc_contract_market_code, + ANY_VALUE(contract_units) AS contract_units, + ANY_VALUE(open_interest) AS open_interest, + ANY_VALUE(prod_merc_long) AS prod_merc_long, + ANY_VALUE(prod_merc_short) AS prod_merc_short, + ANY_VALUE(prod_merc_net) AS prod_merc_net, + ANY_VALUE(swap_long) AS swap_long, + ANY_VALUE(swap_short) AS swap_short, + ANY_VALUE(swap_spread) AS swap_spread, + ANY_VALUE(swap_net) AS swap_net, + ANY_VALUE(managed_money_long) AS managed_money_long, + ANY_VALUE(managed_money_short) AS managed_money_short, + ANY_VALUE(managed_money_spread) AS managed_money_spread, + ANY_VALUE(managed_money_net) AS managed_money_net, + ANY_VALUE(other_reportable_long) AS other_reportable_long, + ANY_VALUE(other_reportable_short) AS other_reportable_short, + ANY_VALUE(other_reportable_spread) AS other_reportable_spread, + ANY_VALUE(other_reportable_net) AS other_reportable_net, + ANY_VALUE(nonreportable_long) AS nonreportable_long, + ANY_VALUE(nonreportable_short) AS nonreportable_short, + ANY_VALUE(nonreportable_net) AS nonreportable_net, + ANY_VALUE(change_open_interest) AS change_open_interest, + ANY_VALUE(change_managed_money_long) AS change_managed_money_long, + ANY_VALUE(change_managed_money_short) AS change_managed_money_short, + ANY_VALUE(change_managed_money_net) AS change_managed_money_net, + ANY_VALUE(change_prod_merc_long) AS change_prod_merc_long, + ANY_VALUE(change_prod_merc_short) AS change_prod_merc_short, + ANY_VALUE(concentration_top4_long_pct) AS concentration_top4_long_pct, + ANY_VALUE(concentration_top4_short_pct) AS concentration_top4_short_pct, + ANY_VALUE(concentration_top8_long_pct) AS concentration_top8_long_pct, + ANY_VALUE(concentration_top8_short_pct) AS concentration_top8_short_pct, + ANY_VALUE(traders_total) AS traders_total, + ANY_VALUE(traders_managed_money_long) AS traders_managed_money_long, + ANY_VALUE(traders_managed_money_short) AS traders_managed_money_short, + ANY_VALUE(traders_managed_money_spread) AS traders_managed_money_spread, + ANY_VALUE(ingest_date) AS ingest_date, hkey FROM cast_and_clean - GROUP BY hkey + GROUP BY + hkey ) - -SELECT * +SELECT + * FROM deduplicated -WHERE report_date BETWEEN @start_ds AND @end_ds +WHERE + report_date BETWEEN @start_ds AND @end_ds \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql b/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql index 6643e72..e96f934 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql @@ -1,11 +1,4 @@ --- 
Foundation fact: ICE certified Coffee C (Arabica) aging report. --- --- Reads directly from the landing zone, casts varchar columns to proper types, --- and deduplicates via hash key. --- Grain: one row per (report_date, age_bucket). --- Age buckets represent how long coffee has been in certified storage. --- Port columns are in bags (60kg). - +/* Foundation fact: ICE certified Coffee C (Arabica) aging report. */ /* Reads directly from the landing zone, casts varchar columns to proper types, */ /* and deduplicates via hash key. */ /* Grain: one row per (report_date, age_bucket). */ /* Age buckets represent how long coffee has been in certified storage. */ /* Port columns are in bags (60kg). */ MODEL ( name foundation.fct_ice_aging_stocks, kind INCREMENTAL_BY_TIME_RANGE ( @@ -17,54 +10,53 @@ MODEL ( ); WITH src AS ( - SELECT * FROM read_csv( + SELECT + * + FROM READ_CSV( @ice_aging_glob(), - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true + compression = 'gzip', + header = TRUE, + union_by_name = TRUE, + filename = TRUE, + all_varchar = TRUE ) -), - -cast_and_clean AS ( +), cast_and_clean AS ( SELECT - TRY_CAST(report_date AS date) AS report_date, + TRY_CAST(report_date AS DATE) AS report_date, age_bucket, - TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags, - TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags, - TRY_CAST(houston_bags AS bigint) AS houston_bags, - TRY_CAST(miami_bags AS bigint) AS miami_bags, - TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags, - TRY_CAST(new_york_bags AS bigint) AS new_york_bags, - TRY_CAST(total_bags AS bigint) AS total_bags, - - filename AS source_file, - - hash(report_date, age_bucket, total_bags) AS hkey + TRY_CAST(antwerp_bags AS BIGINT) AS antwerp_bags, + TRY_CAST(hamburg_bremen_bags AS BIGINT) AS hamburg_bremen_bags, + TRY_CAST(houston_bags AS BIGINT) AS houston_bags, + TRY_CAST(miami_bags AS BIGINT) AS miami_bags, + TRY_CAST(new_orleans_bags AS BIGINT) AS new_orleans_bags, + TRY_CAST(new_york_bags AS BIGINT) AS new_york_bags, + TRY_CAST(total_bags AS BIGINT) AS total_bags, + filename AS source_file, + HASH(report_date, age_bucket, total_bags) AS hkey FROM src - WHERE TRY_CAST(report_date AS date) IS NOT NULL - AND age_bucket IS NOT NULL - AND age_bucket != '' -), - -deduplicated AS ( + WHERE + NOT TRY_CAST(report_date AS DATE) IS NULL + AND NOT age_bucket IS NULL + AND age_bucket <> '' +), deduplicated AS ( SELECT - any_value(report_date) AS report_date, - any_value(age_bucket) AS age_bucket, - any_value(antwerp_bags) AS antwerp_bags, - any_value(hamburg_bremen_bags) AS hamburg_bremen_bags, - any_value(houston_bags) AS houston_bags, - any_value(miami_bags) AS miami_bags, - any_value(new_orleans_bags) AS new_orleans_bags, - any_value(new_york_bags) AS new_york_bags, - any_value(total_bags) AS total_bags, - any_value(source_file) AS source_file, + ANY_VALUE(report_date) AS report_date, + ANY_VALUE(age_bucket) AS age_bucket, + ANY_VALUE(antwerp_bags) AS antwerp_bags, + ANY_VALUE(hamburg_bremen_bags) AS hamburg_bremen_bags, + ANY_VALUE(houston_bags) AS houston_bags, + ANY_VALUE(miami_bags) AS miami_bags, + ANY_VALUE(new_orleans_bags) AS new_orleans_bags, + ANY_VALUE(new_york_bags) AS new_york_bags, + ANY_VALUE(total_bags) AS total_bags, + ANY_VALUE(source_file) AS source_file, hkey FROM cast_and_clean - GROUP BY hkey + GROUP BY + hkey ) - -SELECT * +SELECT + * FROM deduplicated -WHERE report_date BETWEEN @start_ds AND @end_ds +WHERE + report_date BETWEEN @start_ds AND @end_ds \ No 
newline at end of file diff --git a/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql index 92973fb..32d5d08 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql @@ -1,59 +1,51 @@ --- Foundation fact: ICE certified Coffee C (Arabica) warehouse stocks. --- --- Reads directly from the landing zone, casts varchar columns to proper types, --- and deduplicates via hash key. --- "Certified" means Coffee C graded and stamped as delivery-eligible --- against ICE futures contracts — a key physical supply indicator. --- --- Grain: one row per report_date. - +/* Foundation fact: ICE certified Coffee C (Arabica) warehouse stocks. */ /* Reads directly from the landing zone, casts varchar columns to proper types, */ /* and deduplicates via hash key. */ /* "Certified" means Coffee C graded and stamped as delivery-eligible */ /* against ICE futures contracts — a key physical supply indicator. */ /* Grain: one row per report_date. */ MODEL ( name foundation.fct_ice_warehouse_stocks, kind INCREMENTAL_BY_TIME_RANGE ( time_column report_date ), - grain (report_date), + grain ( + report_date + ), start '2000-01-01', cron '@daily' ); WITH src AS ( - SELECT * FROM read_csv( + SELECT + * + FROM READ_CSV( @ice_stocks_glob(), - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true + compression = 'gzip', + header = TRUE, + union_by_name = TRUE, + filename = TRUE, + all_varchar = TRUE ) -), - -cast_and_clean AS ( +), cast_and_clean AS ( SELECT - TRY_CAST(report_date AS date) AS report_date, - TRY_CAST(total_certified_bags AS bigint) AS total_certified_bags, - TRY_CAST(pending_grading_bags AS bigint) AS pending_grading_bags, - - filename AS source_file, - - -- Dedup key: report date + total bags - hash(report_date, total_certified_bags) AS hkey + TRY_CAST(report_date AS DATE) AS report_date, + TRY_CAST(total_certified_bags AS BIGINT) AS total_certified_bags, + TRY_CAST(pending_grading_bags AS BIGINT) AS pending_grading_bags, + filename AS source_file, + HASH(report_date, total_certified_bags) AS hkey /* Dedup key: report date + total bags */ FROM src - WHERE TRY_CAST(report_date AS date) IS NOT NULL - AND TRY_CAST(total_certified_bags AS bigint) IS NOT NULL -), - -deduplicated AS ( + WHERE + NOT TRY_CAST(report_date AS DATE) IS NULL + AND NOT TRY_CAST(total_certified_bags AS BIGINT) IS NULL +), deduplicated AS ( SELECT - any_value(report_date) AS report_date, - any_value(total_certified_bags) AS total_certified_bags, - any_value(pending_grading_bags) AS pending_grading_bags, - any_value(source_file) AS source_file, + ANY_VALUE(report_date) AS report_date, + ANY_VALUE(total_certified_bags) AS total_certified_bags, + ANY_VALUE(pending_grading_bags) AS pending_grading_bags, + ANY_VALUE(source_file) AS source_file, hkey FROM cast_and_clean - GROUP BY hkey + GROUP BY + hkey ) - -SELECT * +SELECT + * FROM deduplicated -WHERE report_date BETWEEN @start_ds AND @end_ds +WHERE + report_date BETWEEN @start_ds AND @end_ds \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql index d417f87..e612534 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql +++ 
b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql @@ -1,72 +1,65 @@ --- Foundation fact: ICE historical end-of-month Coffee C certified warehouse stocks by port. --- --- Reads directly from the landing zone, casts varchar columns to proper types, --- and deduplicates via hash key. --- Covers November 1996 to present (30-year history). --- --- Grain: one row per report_date (end-of-month). --- Port columns are in bags (60kg). - +/* Foundation fact: ICE historical end-of-month Coffee C certified warehouse stocks by port. */ /* Reads directly from the landing zone, casts varchar columns to proper types, */ /* and deduplicates via hash key. */ /* Covers November 1996 to present (30-year history). */ /* Grain: one row per report_date (end-of-month). */ /* Port columns are in bags (60kg). */ MODEL ( name foundation.fct_ice_warehouse_stocks_by_port, kind INCREMENTAL_BY_TIME_RANGE ( time_column report_date ), - grain (report_date), + grain ( + report_date + ), start '1996-11-01', cron '@daily' ); WITH src AS ( - SELECT * FROM read_csv( + SELECT + * + FROM READ_CSV( @ice_stocks_by_port_glob(), - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true + compression = 'gzip', + header = TRUE, + union_by_name = TRUE, + filename = TRUE, + all_varchar = TRUE ) -), - -cast_and_clean AS ( +), cast_and_clean AS ( SELECT - TRY_CAST(report_date AS date) AS report_date, - TRY_CAST(new_york_bags AS bigint) AS new_york_bags, - TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags, - TRY_CAST(houston_bags AS bigint) AS houston_bags, - TRY_CAST(miami_bags AS bigint) AS miami_bags, - TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags, - TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags, - TRY_CAST(barcelona_bags AS bigint) AS barcelona_bags, - TRY_CAST(virginia_bags AS bigint) AS virginia_bags, - TRY_CAST(total_bags AS bigint) AS total_bags, - - filename AS source_file, - - hash(report_date, total_bags) AS hkey + TRY_CAST(report_date AS DATE) AS report_date, + TRY_CAST(new_york_bags AS BIGINT) AS new_york_bags, + TRY_CAST(new_orleans_bags AS BIGINT) AS new_orleans_bags, + TRY_CAST(houston_bags AS BIGINT) AS houston_bags, + TRY_CAST(miami_bags AS BIGINT) AS miami_bags, + TRY_CAST(antwerp_bags AS BIGINT) AS antwerp_bags, + TRY_CAST(hamburg_bremen_bags AS BIGINT) AS hamburg_bremen_bags, + TRY_CAST(barcelona_bags AS BIGINT) AS barcelona_bags, + TRY_CAST(virginia_bags AS BIGINT) AS virginia_bags, + TRY_CAST(total_bags AS BIGINT) AS total_bags, + filename AS source_file, + HASH(report_date, total_bags) AS hkey FROM src - WHERE TRY_CAST(report_date AS date) IS NOT NULL - AND TRY_CAST(total_bags AS bigint) IS NOT NULL -), - -deduplicated AS ( + WHERE + NOT TRY_CAST(report_date AS DATE) IS NULL + AND NOT TRY_CAST(total_bags AS BIGINT) IS NULL +), deduplicated AS ( SELECT - any_value(report_date) AS report_date, - any_value(new_york_bags) AS new_york_bags, - any_value(new_orleans_bags) AS new_orleans_bags, - any_value(houston_bags) AS houston_bags, - any_value(miami_bags) AS miami_bags, - any_value(antwerp_bags) AS antwerp_bags, - any_value(hamburg_bremen_bags) AS hamburg_bremen_bags, - any_value(barcelona_bags) AS barcelona_bags, - any_value(virginia_bags) AS virginia_bags, - any_value(total_bags) AS total_bags, - any_value(source_file) AS source_file, + ANY_VALUE(report_date) AS report_date, + ANY_VALUE(new_york_bags) AS new_york_bags, + ANY_VALUE(new_orleans_bags) AS new_orleans_bags, + ANY_VALUE(houston_bags) AS houston_bags, + 
ANY_VALUE(miami_bags) AS miami_bags, + ANY_VALUE(antwerp_bags) AS antwerp_bags, + ANY_VALUE(hamburg_bremen_bags) AS hamburg_bremen_bags, + ANY_VALUE(barcelona_bags) AS barcelona_bags, + ANY_VALUE(virginia_bags) AS virginia_bags, + ANY_VALUE(total_bags) AS total_bags, + ANY_VALUE(source_file) AS source_file, hkey FROM cast_and_clean - GROUP BY hkey + GROUP BY + hkey ) - -SELECT * +SELECT + * FROM deduplicated -WHERE report_date BETWEEN @start_ds AND @end_ds +WHERE + report_date BETWEEN @start_ds AND @end_ds \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/foundation/fct_weather_daily.sql b/transform/sqlmesh_materia/models/foundation/fct_weather_daily.sql new file mode 100644 index 0000000..020504e --- /dev/null +++ b/transform/sqlmesh_materia/models/foundation/fct_weather_daily.sql @@ -0,0 +1,93 @@ +/* Foundation fact: daily weather observations for 8 coffee-growing regions. */ /* Source: OpenWeatherMap One Call API 3.0 / Day Summary */ /* Landing: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz */ /* One file per (location_id, date). Content: raw OWM day summary JSON. */ /* Each file is a single JSON object (not newline-delimited), so format='auto'. */ /* Grain: (location_id, observation_date) — one row per location per day. */ /* Dedup key: hash(location_id, date) — past weather is immutable. */ /* location_id is parsed from the filename path: split(filename, '/')[-3] */ /* Path structure: .../weather/{location_id}/{year}/{date}.json.gz */ /* Crop stress flags (agronomic thresholds for Arabica coffee): */ /* is_frost — temp_min_c < 2.0°C (ICO frost damage threshold) */ /* is_heat_stress — temp_max_c > 35.0°C (photosynthesis impairment) */ /* is_drought — precipitation_mm < 1.0 (dry day; OWM omits field when 0) */ /* in_growing_season — simplified month-range flag by variety */ +MODEL ( + name foundation.fct_weather_daily, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column observation_date + ), + grain (location_id, observation_date), + start '2020-01-01', + cron '@daily' +); + +WITH src AS ( + /* Each file is a single JSON object with nested fields: */ /* temperature.{min,max,afternoon,morning,evening,night} */ /* precipitation.total (absent when 0 — COALESCE to 0 downstream) */ /* humidity.afternoon */ /* cloud_cover.afternoon */ /* wind.max.{speed,direction} */ /* pressure.afternoon */ /* DuckDB read_json(format='auto') creates STRUCT columns for nested objects; */ /* fields are accessed with dot notation (temperature.min, wind.max.speed). */ + SELECT + * + FROM READ_JSON(@weather_glob(), format = 'auto', compression = 'gzip', filename = TRUE) +), located AS ( + SELECT + src.*, + STR_SPLIT(filename, '/')[-3] AS location_id, /* location_id is the 3rd-from-last path segment: */ /* e.g. 
.../weather/brazil_minas_gerais/2024/2024-01-15.json.gz → 'brazil_minas_gerais' */ + TRY_CAST(src."date" AS DATE) AS observation_date + FROM src +), cast_and_clean AS ( + SELECT + location_id, + observation_date, + TRY_CAST(located.temperature.min AS DOUBLE) AS temp_min_c, /* Temperature (°C, metric units) */ + TRY_CAST(located.temperature.max AS DOUBLE) AS temp_max_c, + TRY_CAST(located.temperature.afternoon AS DOUBLE) AS temp_afternoon_c, + COALESCE(TRY_CAST(located.precipitation.total AS DOUBLE), 0.0) AS precipitation_mm, /* Precipitation (mm total for the day; OWM omits field when 0) */ + TRY_CAST(located.humidity.afternoon AS DOUBLE) AS humidity_afternoon_pct, /* Humidity (% afternoon reading) */ + TRY_CAST(located.cloud_cover.afternoon AS DOUBLE) AS cloud_cover_afternoon_pct, /* Cloud cover (% afternoon) */ + TRY_CAST(located.wind.max.speed AS DOUBLE) AS wind_max_speed_ms, /* Wind (m/s max speed, degrees direction) */ + TRY_CAST(located.pressure.afternoon AS DOUBLE) AS pressure_afternoon_hpa, /* Pressure (hPa afternoon) */ + TRY_CAST(located.temperature.min AS DOUBLE) /* Crop stress flags */ < 2.0 AS is_frost, + TRY_CAST(located.temperature.max AS DOUBLE) > 35.0 AS is_heat_stress, + COALESCE(TRY_CAST(located.precipitation.total AS DOUBLE), 0.0) < 1.0 AS is_drought, + HASH(location_id, src."date") AS hkey, + filename + FROM located + WHERE + NOT observation_date IS NULL AND NOT location_id IS NULL AND location_id <> '' +), deduplicated AS ( + SELECT + ANY_VALUE(location_id) AS location_id, + ANY_VALUE(observation_date) AS observation_date, + ANY_VALUE(temp_min_c) AS temp_min_c, + ANY_VALUE(temp_max_c) AS temp_max_c, + ANY_VALUE(temp_afternoon_c) AS temp_afternoon_c, + ANY_VALUE(precipitation_mm) AS precipitation_mm, + ANY_VALUE(humidity_afternoon_pct) AS humidity_afternoon_pct, + ANY_VALUE(cloud_cover_afternoon_pct) AS cloud_cover_afternoon_pct, + ANY_VALUE(wind_max_speed_ms) AS wind_max_speed_ms, + ANY_VALUE(pressure_afternoon_hpa) AS pressure_afternoon_hpa, + ANY_VALUE(is_frost) AS is_frost, + ANY_VALUE(is_heat_stress) AS is_heat_stress, + ANY_VALUE(is_drought) AS is_drought, + hkey + FROM cast_and_clean + GROUP BY + hkey +) +SELECT + d.observation_date, + d.location_id, + loc.name AS location_name, + loc.country, + loc.lat, + loc.lon, + loc.variety, + d.temp_min_c, + d.temp_max_c, + d.temp_afternoon_c, + d.precipitation_mm, + d.humidity_afternoon_pct, + d.cloud_cover_afternoon_pct, + d.wind_max_speed_ms, + d.pressure_afternoon_hpa, + d.is_frost, + d.is_heat_stress, + d.is_drought, + CASE loc.variety + WHEN 'Arabica' + THEN EXTRACT(MONTH FROM d.observation_date) BETWEEN 4 AND 10 + WHEN 'Robusta' + THEN EXTRACT(MONTH FROM d.observation_date) BETWEEN 4 AND 11 + ELSE FALSE + END AS in_growing_season /* Growing season: simplified month-range flag by variety. */ /* Arabica: Apr–Oct (covers northern + southern hemisphere risk windows). */ /* Robusta: Apr–Nov (Vietnam/Indonesia main cycle). 
*/ +FROM deduplicated AS d +LEFT JOIN seeds.weather_locations AS loc + ON d.location_id = loc.location_id +WHERE + d.observation_date BETWEEN @start_ds AND @end_ds \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/seeds/psd_attribute_codes.sql b/transform/sqlmesh_materia/models/seeds/psd_attribute_codes.sql index 9c63bec..7957d7e 100644 --- a/transform/sqlmesh_materia/models/seeds/psd_attribute_codes.sql +++ b/transform/sqlmesh_materia/models/seeds/psd_attribute_codes.sql @@ -2,8 +2,6 @@ MODEL ( name seeds.psd_attribute_codes, kind SEED ( path '$root/seeds/psd_attribute_codes.csv', - csv_settings ( - delimiter = ';' -) - ) -); + csv_settings (delimiter = ';') + ) +) \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/seeds/psd_commodity_codes.sql b/transform/sqlmesh_materia/models/seeds/psd_commodity_codes.sql index aa599a0..23b4ef8 100644 --- a/transform/sqlmesh_materia/models/seeds/psd_commodity_codes.sql +++ b/transform/sqlmesh_materia/models/seeds/psd_commodity_codes.sql @@ -2,9 +2,6 @@ MODEL ( name seeds.psd_commodity_codes, kind SEED ( path '$root/seeds/psd_commodity_codes.csv', - csv_settings ( - delimiter = ';' -) - ) -); - + csv_settings (delimiter = ';') + ) +) \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/seeds/psd_unit_of_measure_codes.sql b/transform/sqlmesh_materia/models/seeds/psd_unit_of_measure_codes.sql index 855e9d4..f9c955e 100644 --- a/transform/sqlmesh_materia/models/seeds/psd_unit_of_measure_codes.sql +++ b/transform/sqlmesh_materia/models/seeds/psd_unit_of_measure_codes.sql @@ -2,9 +2,6 @@ MODEL ( name seeds.psd_unit_of_measure_codes, kind SEED ( path '$root/seeds/psd_unit_of_measure_codes.csv', - csv_settings ( - delimiter = ';' -) + csv_settings (delimiter = ';') ) -); - +) \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/seeds/weather_locations.sql b/transform/sqlmesh_materia/models/seeds/weather_locations.sql new file mode 100644 index 0000000..f1096bf --- /dev/null +++ b/transform/sqlmesh_materia/models/seeds/weather_locations.sql @@ -0,0 +1,7 @@ +MODEL ( + name seeds.weather_locations, + kind SEED ( + path '$root/seeds/weather_locations.csv', + csv_settings (delimiter = ';') + ) +) \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/serving/coffee_prices.sql b/transform/sqlmesh_materia/models/serving/coffee_prices.sql index ef1a9cd..e1ddf08 100644 --- a/transform/sqlmesh_materia/models/serving/coffee_prices.sql +++ b/transform/sqlmesh_materia/models/serving/coffee_prices.sql @@ -1,16 +1,12 @@ --- Serving mart: KC=F Coffee C futures prices, analytics-ready. --- --- Adds moving averages (20-day, 50-day SMA) and 52-week high/low range. --- Filtered to trading days only (NULL close rows excluded upstream). --- --- Grain: one row per trade_date. - +/* Serving mart: KC=F Coffee C futures prices, analytics-ready. */ /* Adds moving averages (20-day, 50-day SMA) and 52-week high/low range. */ /* Filtered to trading days only (NULL close rows excluded upstream). */ /* Grain: one row per trade_date. 
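A minimal DuckDB sketch of the technique `fct_weather_daily` (above) relies on: deriving `location_id` from the landing path with `str_split(...)[-3]` and reading nested day-summary fields with struct dot notation. The filename and readings below are invented, and precipitation is flattened to a single nullable column instead of the nested `precipitation.total` struct the model actually reads.

```sql
WITH src AS (
    SELECT
        '/data/materia/landing/weather/brazil_minas_gerais/2024/2024-01-15.json.gz' AS filename,  -- toy path
        {'min': 11.2, 'max': 27.8, 'afternoon': 25.1} AS temperature,                             -- toy readings
        CAST(NULL AS DOUBLE) AS precipitation_total   -- OWM omits precipitation on dry days
)
SELECT
    str_split(src.filename, '/')[-3]              AS location_id,   -- 'brazil_minas_gerais'
    src.temperature.min                           AS temp_min_c,
    src.temperature.min < 2.0                     AS is_frost,
    COALESCE(src.precipitation_total, 0.0) < 1.0  AS is_drought
FROM src;
```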
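And a toy illustration of the rolling-window pattern behind the SMA and 52-week range columns described in the `serving.coffee_prices` header just above. Prices are invented, and the frames are shortened to 3 and 5 rows so the effect is visible on a handful of rows.

```sql
WITH prices(trade_date, close) AS (
    VALUES
        (DATE '2024-01-02', 185.0),
        (DATE '2024-01-03', 187.5),
        (DATE '2024-01-04', 186.0),
        (DATE '2024-01-05', 190.0),
        (DATE '2024-01-08', 195.5),
        (DATE '2024-01-09', 193.0)
)
SELECT
    trade_date,
    close,
    ROUND((close - LAG(close) OVER (ORDER BY trade_date))
        / NULLIF(LAG(close) OVER (ORDER BY trade_date), 0) * 100, 4)                         AS daily_return_pct,
    ROUND(AVG(close) OVER (ORDER BY trade_date ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), 4) AS sma_3d,   -- stands in for the 19/49-row frames
    MAX(close) OVER (ORDER BY trade_date ROWS BETWEEN 4 PRECEDING AND CURRENT ROW)           AS high_5d   -- stands in for the 251-row frame
FROM prices
ORDER BY trade_date;
```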
*/ MODEL ( name serving.coffee_prices, kind INCREMENTAL_BY_TIME_RANGE ( time_column trade_date ), - grain (trade_date), + grain ( + trade_date + ), start '1971-08-16', cron '@daily' ); @@ -24,38 +20,26 @@ WITH base AS ( f.close, f.adj_close, f.volume, - - -- Daily return: (close - prev_close) / prev_close * 100 - round( - (f.close - LAG(f.close, 1) OVER (ORDER BY f.trade_date)) - / NULLIF(LAG(f.close, 1) OVER (ORDER BY f.trade_date), 0) * 100, + ROUND( + ( + f.close - LAG(f.close, 1) OVER (ORDER BY f.trade_date) + ) / NULLIF(LAG(f.close, 1) OVER (ORDER BY f.trade_date), 0) * 100, 4 - ) AS daily_return_pct, - - -- 20-day simple moving average (1 trading month) - round( + ) AS daily_return_pct, /* Daily return: (close - prev_close) / prev_close * 100 */ + ROUND( AVG(f.close) OVER (ORDER BY f.trade_date ROWS BETWEEN 19 PRECEDING AND CURRENT ROW), 4 - ) AS sma_20d, - - -- 50-day simple moving average (2.5 trading months) - round( + ) AS sma_20d, /* 20-day simple moving average (1 trading month) */ + ROUND( AVG(f.close) OVER (ORDER BY f.trade_date ROWS BETWEEN 49 PRECEDING AND CURRENT ROW), 4 - ) AS sma_50d, - - -- 52-week high (approximately 252 trading days) - MAX(f.high) OVER (ORDER BY f.trade_date ROWS BETWEEN 251 PRECEDING AND CURRENT ROW) - AS high_52w, - - -- 52-week low - MIN(f.low) OVER (ORDER BY f.trade_date ROWS BETWEEN 251 PRECEDING AND CURRENT ROW) - AS low_52w - - FROM foundation.fct_coffee_prices f - WHERE f.trade_date BETWEEN @start_ds AND @end_ds + ) AS sma_50d, /* 50-day simple moving average (2.5 trading months) */ + MAX(f.high) OVER (ORDER BY f.trade_date ROWS BETWEEN 251 PRECEDING AND CURRENT ROW) AS high_52w, /* 52-week high (approximately 252 trading days) */ + MIN(f.low) OVER (ORDER BY f.trade_date ROWS BETWEEN 251 PRECEDING AND CURRENT ROW) AS low_52w /* 52-week low */ + FROM foundation.fct_coffee_prices AS f + WHERE + f.trade_date BETWEEN @start_ds AND @end_ds ) - SELECT b.trade_date, d.commodity_name, @@ -71,7 +55,9 @@ SELECT b.sma_50d, b.high_52w, b.low_52w -FROM base b -CROSS JOIN foundation.dim_commodity d -WHERE d.ticker = 'KC=F' -ORDER BY b.trade_date +FROM base AS b +CROSS JOIN foundation.dim_commodity AS d +WHERE + d.ticker = 'KC=F' +ORDER BY + b.trade_date \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/serving/ice_aging_stocks.sql b/transform/sqlmesh_materia/models/serving/ice_aging_stocks.sql index 6f3e4d2..3788912 100644 --- a/transform/sqlmesh_materia/models/serving/ice_aging_stocks.sql +++ b/transform/sqlmesh_materia/models/serving/ice_aging_stocks.sql @@ -1,60 +1,51 @@ --- Serving mart: ICE certified Coffee C stock aging report, analytics-ready. --- --- Shows the age distribution of certified stocks across delivery ports. --- Age buckets represent how long coffee has been in certified storage. --- Older stock approaching certificate limits is a supply quality signal. --- --- Source: ICE Certified Stock Aging Report (monthly) --- Grain: one row per (report_date, age_bucket). 
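The age-bucket parsing that `serving.ice_aging_stocks` performs below can be tried in isolation; the bucket labels here are assumed to follow the "0000 to 0120" pattern quoted in the model.

```sql
WITH buckets(age_bucket) AS (
    VALUES ('0000 to 0120'), ('0121 to 0240'), ('0241 to 0360')   -- assumed labels in the documented format
)
SELECT
    age_bucket,
    TRY_CAST(split_part(age_bucket, ' to ', 1) AS INT) AS age_bucket_start_days,
    TRY_CAST(split_part(age_bucket, ' to ', 2) AS INT) AS age_bucket_end_days
FROM buckets
ORDER BY age_bucket_start_days;
```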
- -MODEL ( - name serving.ice_aging_stocks, - kind INCREMENTAL_BY_TIME_RANGE ( - time_column report_date - ), - grain (report_date, age_bucket), - start '2020-01-01', - cron '@daily' -); - -WITH base AS ( - SELECT - f.report_date, - f.age_bucket, - - -- Parse age range from "0000 to 0120" format for correct sort order - TRY_CAST(split_part(f.age_bucket, ' to ', 1) AS int) AS age_bucket_start_days, - TRY_CAST(split_part(f.age_bucket, ' to ', 2) AS int) AS age_bucket_end_days, - - f.antwerp_bags, - f.hamburg_bremen_bags, - f.houston_bags, - f.miami_bags, - f.new_orleans_bags, - f.new_york_bags, - f.total_bags, - - f.source_file - FROM foundation.fct_ice_aging_stocks f - WHERE f.report_date BETWEEN @start_ds AND @end_ds -) - -SELECT - b.report_date, - d.commodity_name, - d.ice_stock_report_code, - b.age_bucket, - b.age_bucket_start_days, - b.age_bucket_end_days, - b.antwerp_bags, - b.hamburg_bremen_bags, - b.houston_bags, - b.miami_bags, - b.new_orleans_bags, - b.new_york_bags, - b.total_bags, - b.source_file -FROM base b -CROSS JOIN foundation.dim_commodity d -WHERE d.ice_stock_report_code = 'COFFEE-C' -ORDER BY b.report_date, b.age_bucket_start_days +/* Serving mart: ICE certified Coffee C stock aging report, analytics-ready. */ /* Shows the age distribution of certified stocks across delivery ports. */ /* Age buckets represent how long coffee has been in certified storage. */ /* Older stock approaching certificate limits is a supply quality signal. */ /* Source: ICE Certified Stock Aging Report (monthly) */ /* Grain: one row per (report_date, age_bucket). */ +MODEL ( + name serving.ice_aging_stocks, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column report_date + ), + grain (report_date, age_bucket), + start '2020-01-01', + cron '@daily' +); + +WITH base AS ( + SELECT + f.report_date, + f.age_bucket, + TRY_CAST(SPLIT_PART(f.age_bucket, ' to ', 1) AS INT) AS age_bucket_start_days, /* Parse age range from "0000 to 0120" format for correct sort order */ + TRY_CAST(SPLIT_PART(f.age_bucket, ' to ', 2) AS INT) AS age_bucket_end_days, + f.antwerp_bags, + f.hamburg_bremen_bags, + f.houston_bags, + f.miami_bags, + f.new_orleans_bags, + f.new_york_bags, + f.total_bags, + f.source_file + FROM foundation.fct_ice_aging_stocks AS f + WHERE + f.report_date BETWEEN @start_ds AND @end_ds +) +SELECT + b.report_date, + d.commodity_name, + d.ice_stock_report_code, + b.age_bucket, + b.age_bucket_start_days, + b.age_bucket_end_days, + b.antwerp_bags, + b.hamburg_bremen_bags, + b.houston_bags, + b.miami_bags, + b.new_orleans_bags, + b.new_york_bags, + b.total_bags, + b.source_file +FROM base AS b +CROSS JOIN foundation.dim_commodity AS d +WHERE + d.ice_stock_report_code = 'COFFEE-C' +ORDER BY + b.report_date, + b.age_bucket_start_days \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks.sql b/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks.sql index 4b5b552..d011f11 100644 --- a/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks.sql +++ b/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks.sql @@ -1,19 +1,12 @@ --- Serving mart: ICE certified Coffee C warehouse stocks, analytics-ready. --- --- Adds 30-day rolling average, week-over-week change, and drawdown from --- 52-week high. Physical supply indicator used alongside S/D and positioning. --- --- "Certified stocks" = coffee graded and stamped as eligible for delivery --- against ICE Coffee C futures — traders watch this as a squeeze indicator. 
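The week-over-week change and drawdown-from-52-week-high measures this model adds reduce to the window pattern sketched below. Stock levels are invented and the 365-row trailing window is shrunk to 3 rows.

```sql
WITH stocks(report_date, total_certified_bags) AS (
    VALUES
        (DATE '2024-01-01', 800000.0),
        (DATE '2024-01-02', 820000.0),
        (DATE '2024-01-03', 780000.0),
        (DATE '2024-01-04', 700000.0)
)
SELECT
    report_date,
    total_certified_bags,
    total_certified_bags
        - LAG(total_certified_bags, 1) OVER (ORDER BY report_date)  AS wow_change_bags,
    MAX(total_certified_bags) OVER w                                 AS rolling_high_bags,
    ROUND((total_certified_bags - MAX(total_certified_bags) OVER w)
        / NULLIF(MAX(total_certified_bags) OVER w, 0) * 100, 2)      AS drawdown_from_high_pct   -- 0 at the peak, negative below it
FROM stocks
WINDOW w AS (ORDER BY report_date ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
ORDER BY report_date;
```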
--- --- Grain: one row per report_date. - +/* Serving mart: ICE certified Coffee C warehouse stocks, analytics-ready. */ /* Adds 30-day rolling average, week-over-week change, and drawdown from */ /* 52-week high. Physical supply indicator used alongside S/D and positioning. */ /* "Certified stocks" = coffee graded and stamped as eligible for delivery */ /* against ICE Coffee C futures — traders watch this as a squeeze indicator. */ /* Grain: one row per report_date. */ MODEL ( name serving.ice_warehouse_stocks, kind INCREMENTAL_BY_TIME_RANGE ( time_column report_date ), - grain (report_date), + grain ( + report_date + ), start '2000-01-01', cron '@daily' ); @@ -23,45 +16,25 @@ WITH base AS ( f.report_date, f.total_certified_bags, f.pending_grading_bags, - - -- Week-over-week change (compare to 7 calendar days ago via LAG over ordered rows) - -- Using LAG(1) since data is daily: compares to previous trading/reporting day - f.total_certified_bags - - LAG(f.total_certified_bags, 1) OVER (ORDER BY f.report_date) AS wow_change_bags, - - -- 30-day rolling average (smooths daily noise) - round( - AVG(f.total_certified_bags::double) OVER ( - ORDER BY f.report_date ROWS BETWEEN 29 PRECEDING AND CURRENT ROW - ), + f.total_certified_bags /* Week-over-week change (compare to 7 calendar days ago via LAG over ordered rows) */ /* Using LAG(1) since data is daily: compares to previous trading/reporting day */ - LAG(f.total_certified_bags, 1) OVER (ORDER BY f.report_date) AS wow_change_bags, + ROUND( + AVG(f.total_certified_bags::DOUBLE) OVER (ORDER BY f.report_date ROWS BETWEEN 29 PRECEDING AND CURRENT ROW), 0 - ) AS avg_30d_bags, - - -- 52-week high (365 calendar days ≈ 252 trading days; use 365-row window as proxy) - MAX(f.total_certified_bags) OVER ( - ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW - ) AS high_52w_bags, - - -- Drawdown from 52-week high (pct below peak — squeeze indicator) - round( - (f.total_certified_bags::double - - MAX(f.total_certified_bags) OVER ( - ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW - )::double - ) - / NULLIF( - MAX(f.total_certified_bags) OVER ( - ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW - )::double, - 0 - ) * 100, + ) AS avg_30d_bags, /* 30-day rolling average (smooths daily noise) */ + MAX(f.total_certified_bags) OVER (ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW) AS high_52w_bags, /* 52-week high (365 calendar days ≈ 252 trading days; use 365-row window as proxy) */ + ROUND( + ( + f.total_certified_bags::DOUBLE - MAX(f.total_certified_bags) OVER (ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW)::DOUBLE + ) / NULLIF( + MAX(f.total_certified_bags) OVER (ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW)::DOUBLE, + 0 + ) * 100, 2 - ) AS drawdown_from_52w_high_pct - - FROM foundation.fct_ice_warehouse_stocks f - WHERE f.report_date BETWEEN @start_ds AND @end_ds + ) AS drawdown_from_52w_high_pct /* Drawdown from 52-week high (pct below peak — squeeze indicator) */ + FROM foundation.fct_ice_warehouse_stocks AS f + WHERE + f.report_date BETWEEN @start_ds AND @end_ds ) - SELECT b.report_date, d.commodity_name, @@ -72,7 +45,9 @@ SELECT b.avg_30d_bags, b.high_52w_bags, b.drawdown_from_52w_high_pct -FROM base b -CROSS JOIN foundation.dim_commodity d -WHERE d.ice_stock_report_code = 'COFFEE-C' -ORDER BY b.report_date +FROM base AS b +CROSS JOIN foundation.dim_commodity AS d +WHERE + d.ice_stock_report_code = 'COFFEE-C' +ORDER BY + b.report_date \ No newline 
at end of file diff --git a/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks_by_port.sql b/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks_by_port.sql index 1573a5f..ac96180 100644 --- a/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks_by_port.sql +++ b/transform/sqlmesh_materia/models/serving/ice_warehouse_stocks_by_port.sql @@ -1,78 +1,64 @@ --- Serving mart: ICE certified Coffee C warehouse stocks by port, analytics-ready. --- --- End-of-month certified stock levels broken down by delivery port. --- Covers November 1996 to present (~30 years). Useful for understanding --- geographic shifts in the certified supply base over time. --- --- Source: ICE historical by-port XLS (EOM_KC_cert_stox_by_port_nov96-present.xls) --- Grain: one row per report_date (end-of-month). - -MODEL ( - name serving.ice_warehouse_stocks_by_port, - kind INCREMENTAL_BY_TIME_RANGE ( - time_column report_date - ), - grain (report_date), - start '1996-11-01', - cron '@daily' -); - -WITH base AS ( - SELECT - f.report_date, - f.new_york_bags, - f.new_orleans_bags, - f.houston_bags, - f.miami_bags, - f.antwerp_bags, - f.hamburg_bremen_bags, - f.barcelona_bags, - f.virginia_bags, - f.total_bags, - - -- Month-over-month change in total certified bags - f.total_bags - - LAG(f.total_bags, 1) OVER (ORDER BY f.report_date) AS mom_change_bags, - - -- Month-over-month percent change - round( - (f.total_bags::double - - LAG(f.total_bags, 1) OVER (ORDER BY f.report_date)::double) - / NULLIF(LAG(f.total_bags, 1) OVER (ORDER BY f.report_date)::double, 0) * 100, - 2 - ) AS mom_change_pct, - - -- 12-month rolling average - round( - AVG(f.total_bags::double) OVER ( - ORDER BY f.report_date ROWS BETWEEN 11 PRECEDING AND CURRENT ROW - ), - 0 - ) AS avg_12m_bags, - - f.source_file - FROM foundation.fct_ice_warehouse_stocks_by_port f - WHERE f.report_date BETWEEN @start_ds AND @end_ds -) - -SELECT - b.report_date, - d.commodity_name, - d.ice_stock_report_code, - b.new_york_bags, - b.new_orleans_bags, - b.houston_bags, - b.miami_bags, - b.antwerp_bags, - b.hamburg_bremen_bags, - b.barcelona_bags, - b.virginia_bags, - b.total_bags, - b.mom_change_bags, - b.mom_change_pct, - b.avg_12m_bags, - b.source_file -FROM base b -CROSS JOIN foundation.dim_commodity d -WHERE d.ice_stock_report_code = 'COFFEE-C' -ORDER BY b.report_date +/* Serving mart: ICE certified Coffee C warehouse stocks by port, analytics-ready. */ /* End-of-month certified stock levels broken down by delivery port. */ /* Covers November 1996 to present (~30 years). Useful for understanding */ /* geographic shifts in the certified supply base over time. */ /* Source: ICE historical by-port XLS (EOM_KC_cert_stox_by_port_nov96-present.xls) */ /* Grain: one row per report_date (end-of-month). 
*/ +MODEL ( + name serving.ice_warehouse_stocks_by_port, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column report_date + ), + grain ( + report_date + ), + start '1996-11-01', + cron '@daily' +); + +WITH base AS ( + SELECT + f.report_date, + f.new_york_bags, + f.new_orleans_bags, + f.houston_bags, + f.miami_bags, + f.antwerp_bags, + f.hamburg_bremen_bags, + f.barcelona_bags, + f.virginia_bags, + f.total_bags, + f.total_bags /* Month-over-month change in total certified bags */ - LAG(f.total_bags, 1) OVER (ORDER BY f.report_date) AS mom_change_bags, + ROUND( + ( + f.total_bags::DOUBLE - LAG(f.total_bags, 1) OVER (ORDER BY f.report_date)::DOUBLE + ) / NULLIF(LAG(f.total_bags, 1) OVER (ORDER BY f.report_date)::DOUBLE, 0) * 100, + 2 + ) AS mom_change_pct, /* Month-over-month percent change */ + ROUND( + AVG(f.total_bags::DOUBLE) OVER (ORDER BY f.report_date ROWS BETWEEN 11 PRECEDING AND CURRENT ROW), + 0 + ) AS avg_12m_bags, /* 12-month rolling average */ + f.source_file + FROM foundation.fct_ice_warehouse_stocks_by_port AS f + WHERE + f.report_date BETWEEN @start_ds AND @end_ds +) +SELECT + b.report_date, + d.commodity_name, + d.ice_stock_report_code, + b.new_york_bags, + b.new_orleans_bags, + b.houston_bags, + b.miami_bags, + b.antwerp_bags, + b.hamburg_bremen_bags, + b.barcelona_bags, + b.virginia_bags, + b.total_bags, + b.mom_change_bags, + b.mom_change_pct, + b.avg_12m_bags, + b.source_file +FROM base AS b +CROSS JOIN foundation.dim_commodity AS d +WHERE + d.ice_stock_report_code = 'COFFEE-C' +ORDER BY + b.report_date \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/serving/obt_commodity_metrics.sql b/transform/sqlmesh_materia/models/serving/obt_commodity_metrics.sql index 59b3093..998784d 100644 --- a/transform/sqlmesh_materia/models/serving/obt_commodity_metrics.sql +++ b/transform/sqlmesh_materia/models/serving/obt_commodity_metrics.sql @@ -1,106 +1,126 @@ -MODEL ( - name serving.commodity_metrics, - kind INCREMENTAL_BY_TIME_RANGE ( - time_column ingest_date - ), - start '2006-08-01', - cron '@daily' -); - --- CTE to calculate country-level derived metrics -WITH country_metrics AS ( - SELECT - commodity_code, - commodity_name, - country_code, - country_name, - market_year, - ingest_date, - Production, - Imports, - Exports, - Total_Distribution, - Ending_Stocks, - -- Derived metrics per country, mirroring Python script - (Production + Imports - Exports) AS Net_Supply, - (Exports - Imports) AS Trade_Balance, - (Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance, - -- Handle division by zero for Stock-to-Use Ratio - (Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct, - -- Calculate Production YoY percentage change using a window function - (Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY market_year, ingest_date)) / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY market_year, ingest_date), 0) * 100 AS Production_YoY_pct - FROM cleaned.psdalldata__commodity_pivoted -), -global_aggregates AS ( - SELECT - commodity_code, - commodity_name, - NULL::TEXT AS country_code, -- Use NULL for global aggregates - 'Global' AS country_name, - market_year, - ingest_date, - SUM(Production) AS Production, - SUM(Imports) AS Imports, - SUM(Exports) AS Exports, - SUM(Total_Distribution) AS Total_Distribution, - SUM(Ending_Stocks) AS Ending_Stocks - FROM cleaned.psdalldata__commodity_pivoted - GROUP BY - commodity_code, - commodity_name, - market_year, 
- ingest_date -), --- CTE to calculate derived metrics for global aggregates -global_metrics AS ( - SELECT - commodity_code, - commodity_name, - country_code, - country_name, - market_year, - ingest_date, - Production, - Imports, - Exports, - Total_Distribution, - Ending_Stocks, - (Production + Imports - Exports) AS Net_Supply, - (Exports - Imports) AS Trade_Balance, - (Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance, - (Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct, - (Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY market_year, ingest_date)) / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY market_year, ingest_date), 0) * 100 AS Production_YoY_pct - FROM global_aggregates -) --- Combine country-level and global-level data into a single output -SELECT - commodity_code, - commodity_name, - country_code, - country_name, - market_year, - ingest_date, - Production, - Imports, - Exports, - Total_Distribution, - Ending_Stocks, - Net_Supply, - Trade_Balance, - Supply_Demand_Balance, - Stock_to_Use_Ratio_pct, - Production_YoY_pct -FROM ( - SELECT - * - FROM country_metrics - UNION ALL - SELECT - * - FROM global_metrics -) AS combined_data -ORDER BY - commodity_name, - country_name, - market_year, - ingest_date; +MODEL ( + name serving.commodity_metrics, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ingest_date + ), + start '2006-08-01', + cron '@daily' +); + +/* CTE to calculate country-level derived metrics */ +WITH country_metrics AS ( + SELECT + commodity_code, + commodity_name, + country_code, + country_name, + market_year, + ingest_date, + Production, + Imports, + Exports, + Total_Distribution, + Ending_Stocks, + ( + Production + Imports - Exports + ) AS Net_Supply, /* Derived metrics per country, mirroring Python script */ + ( + Exports - Imports + ) AS Trade_Balance, + ( + Production + Imports - Exports + ) - Total_Distribution AS Supply_Demand_Balance, + ( + Ending_Stocks / NULLIF(Total_Distribution, 0) + ) /* Handle division by zero for Stock-to-Use Ratio */ * 100 AS Stock_to_Use_Ratio_pct, + ( + Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY market_year, ingest_date) + ) /* Calculate Production YoY percentage change using a window function */ / NULLIF( + LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY market_year, ingest_date), + 0 + ) * 100 AS Production_YoY_pct + FROM cleaned.psdalldata__commodity_pivoted +), global_aggregates AS ( + SELECT + commodity_code, + commodity_name, + NULL::TEXT AS country_code, /* Use NULL for global aggregates */ + 'Global' AS country_name, + market_year, + ingest_date, + SUM(Production) AS Production, + SUM(Imports) AS Imports, + SUM(Exports) AS Exports, + SUM(Total_Distribution) AS Total_Distribution, + SUM(Ending_Stocks) AS Ending_Stocks + FROM cleaned.psdalldata__commodity_pivoted + GROUP BY + commodity_code, + commodity_name, + market_year, + ingest_date +), global_metrics /* CTE to calculate derived metrics for global aggregates */ AS ( + SELECT + commodity_code, + commodity_name, + country_code, + country_name, + market_year, + ingest_date, + Production, + Imports, + Exports, + Total_Distribution, + Ending_Stocks, + ( + Production + Imports - Exports + ) AS Net_Supply, + ( + Exports - Imports + ) AS Trade_Balance, + ( + Production + Imports - Exports + ) - Total_Distribution AS Supply_Demand_Balance, + ( + Ending_Stocks / NULLIF(Total_Distribution, 0) + ) * 
100 AS Stock_to_Use_Ratio_pct, + ( + Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY market_year, ingest_date) + ) / NULLIF( + LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY market_year, ingest_date), + 0 + ) * 100 AS Production_YoY_pct + FROM global_aggregates +) +/* Combine country-level and global-level data into a single output */ +SELECT + commodity_code, + commodity_name, + country_code, + country_name, + market_year, + ingest_date, + Production, + Imports, + Exports, + Total_Distribution, + Ending_Stocks, + Net_Supply, + Trade_Balance, + Supply_Demand_Balance, + Stock_to_Use_Ratio_pct, + Production_YoY_pct +FROM ( + SELECT + * + FROM country_metrics + UNION ALL + SELECT + * + FROM global_metrics +) AS combined_data +ORDER BY + commodity_name, + country_name, + market_year, + ingest_date \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/serving/obt_cot_positioning.sql b/transform/sqlmesh_materia/models/serving/obt_cot_positioning.sql index 8044341..9a94d62 100644 --- a/transform/sqlmesh_materia/models/serving/obt_cot_positioning.sql +++ b/transform/sqlmesh_materia/models/serving/obt_cot_positioning.sql @@ -1,41 +1,32 @@ --- Serving mart: COT positioning for Coffee C futures, analytics-ready. --- --- Joins foundation.fct_cot_positioning with foundation.dim_commodity so --- the coffee filter is driven by the dimension (not a hardcoded CFTC code). --- Adds derived analytics used by the dashboard and API: --- - Normalized positioning (% of open interest) --- - Long/short ratio --- - Week-over-week momentum --- - COT Index over 26-week and 52-week trailing windows (0=bearish, 100=bullish) --- --- Grain: one row per report_date for Coffee C futures. --- Latest revision per date: MAX(ingest_date) used to deduplicate CFTC corrections. - +/* Serving mart: COT positioning for Coffee C futures, analytics-ready. */ /* Joins foundation.fct_cot_positioning with foundation.dim_commodity so */ /* the coffee filter is driven by the dimension (not a hardcoded CFTC code). */ /* Adds derived analytics used by the dashboard and API: */ /* - Normalized positioning (% of open interest) */ /* - Long/short ratio */ /* - Week-over-week momentum */ /* - COT Index over 26-week and 52-week trailing windows (0=bearish, 100=bullish) */ /* Grain: one row per report_date for Coffee C futures. */ /* Latest revision per date: MAX(ingest_date) used to deduplicate CFTC corrections. 
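The COT Index mentioned in this header is a min-max rescaling of managed-money net positioning over a trailing window, with a guard for flat windows. A toy version with invented weekly net positions and a 4-row window in place of 26 weeks:

```sql
WITH cot(report_date, managed_money_net) AS (
    VALUES
        (DATE '2024-01-02',  5000),
        (DATE '2024-01-09', 12000),
        (DATE '2024-01-16',  3000),
        (DATE '2024-01-23',  9000),
        (DATE '2024-01-30', 15000)
)
SELECT
    report_date,
    managed_money_net,
    CASE
        WHEN MAX(managed_money_net) OVER w = MIN(managed_money_net) OVER w
        THEN 50.0   -- flat window: park at neutral rather than divide by zero
        ELSE ROUND(
            (managed_money_net - MIN(managed_money_net) OVER w)::DOUBLE
            / (MAX(managed_money_net) OVER w - MIN(managed_money_net) OVER w) * 100, 1)
    END AS cot_index   -- 0 = most bearish in window, 100 = most bullish
FROM cot
WINDOW w AS (ORDER BY report_date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)
ORDER BY report_date;
```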
*/ MODEL ( name serving.cot_positioning, kind INCREMENTAL_BY_TIME_RANGE ( time_column report_date ), - grain (report_date), + grain ( + report_date + ), start '2006-06-13', cron '@daily' ); WITH latest_revision AS ( - -- Pick the most recently ingested row when CFTC issues corrections - SELECT f.* - FROM foundation.fct_cot_positioning f - INNER JOIN foundation.dim_commodity d + /* Pick the most recently ingested row when CFTC issues corrections */ + SELECT + f.* + FROM foundation.fct_cot_positioning AS f + INNER JOIN foundation.dim_commodity AS d ON f.cftc_commodity_code = d.cftc_commodity_code - WHERE d.commodity_name = 'Coffee, Green' + WHERE + d.commodity_name = 'Coffee, Green' AND f.report_date BETWEEN @start_ds AND @end_ds - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY f.report_date, f.cftc_contract_market_code - ORDER BY f.ingest_date DESC - ) = 1 -), - -with_derived AS ( + QUALIFY + ROW_NUMBER() OVER ( + PARTITION BY f.report_date, f.cftc_contract_market_code + ORDER BY f.ingest_date DESC + ) = 1 +), with_derived AS ( SELECT report_date, market_and_exchange_name, @@ -43,9 +34,7 @@ with_derived AS ( cftc_contract_market_code, contract_units, ingest_date, - - -- Absolute positions (contracts) - open_interest, + open_interest, /* Absolute positions (contracts) */ managed_money_long, managed_money_short, managed_money_spread, @@ -64,77 +53,52 @@ with_derived AS ( nonreportable_long, nonreportable_short, nonreportable_net, - - -- Normalized: managed money net as % of open interest - -- Removes size effects and makes cross-period comparison meaningful - round( - managed_money_net::float / NULLIF(open_interest, 0) * 100, - 2 - ) AS managed_money_net_pct_of_oi, - - -- Long/short ratio: >1 = more bulls than bears in managed money - round( - managed_money_long::float / NULLIF(managed_money_short, 0), - 3 - ) AS managed_money_long_short_ratio, - - -- Weekly changes - change_open_interest, + ROUND(managed_money_net::REAL / NULLIF(open_interest, 0) * 100, 2) AS managed_money_net_pct_of_oi, /* Normalized: managed money net as % of open interest */ /* Removes size effects and makes cross-period comparison meaningful */ + ROUND(managed_money_long::REAL / NULLIF(managed_money_short, 0), 3) AS managed_money_long_short_ratio, /* Long/short ratio: >1 = more bulls than bears in managed money */ + change_open_interest, /* Weekly changes */ change_managed_money_long, change_managed_money_short, change_managed_money_net, change_prod_merc_long, change_prod_merc_short, - - -- Week-over-week momentum in managed money net (via LAG) - managed_money_net - LAG(managed_money_net, 1) OVER ( - ORDER BY report_date - ) AS managed_money_net_wow, - - -- Concentration - concentration_top4_long_pct, + managed_money_net /* Week-over-week momentum in managed money net (via LAG) */ - LAG(managed_money_net, 1) OVER (ORDER BY report_date) AS managed_money_net_wow, + concentration_top4_long_pct, /* Concentration */ concentration_top4_short_pct, concentration_top8_long_pct, concentration_top8_short_pct, - - -- Trader counts - traders_total, + traders_total, /* Trader counts */ traders_managed_money_long, traders_managed_money_short, traders_managed_money_spread, - - -- COT Index (26-week): where is current net vs. trailing 26 weeks? 
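The latest_revision rule above (keep the most recently ingested row when CFTC issues corrections) can be seen in isolation with invented rows:

```sql
WITH cot_rows(report_date, ingest_date, managed_money_net) AS (
    VALUES
        (DATE '2024-01-09', DATE '2024-01-12', 11500),
        (DATE '2024-01-09', DATE '2024-01-19', 11800),   -- later correction: this row survives
        (DATE '2024-01-16', DATE '2024-01-19', 12400)
)
SELECT *
FROM cot_rows
QUALIFY ROW_NUMBER() OVER (PARTITION BY report_date ORDER BY ingest_date DESC) = 1;
```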
- -- 0 = most bearish extreme, 100 = most bullish extreme - -- Industry-standard sentiment gauge (equivalent to RSI for positioning) CASE WHEN MAX(managed_money_net) OVER w26 = MIN(managed_money_net) OVER w26 - THEN 50.0 - ELSE round( - (managed_money_net - MIN(managed_money_net) OVER w26)::float - / (MAX(managed_money_net) OVER w26 - MIN(managed_money_net) OVER w26) - * 100, + THEN 50.0 + ELSE ROUND( + ( + managed_money_net - MIN(managed_money_net) OVER w26 + )::REAL / ( + MAX(managed_money_net) OVER w26 - MIN(managed_money_net) OVER w26 + ) * 100, 1 ) - END AS cot_index_26w, - - -- COT Index (52-week): longer-term positioning context + END AS cot_index_26w, /* COT Index (26-week): where is current net vs. trailing 26 weeks? */ /* 0 = most bearish extreme, 100 = most bullish extreme */ /* Industry-standard sentiment gauge (equivalent to RSI for positioning) */ CASE WHEN MAX(managed_money_net) OVER w52 = MIN(managed_money_net) OVER w52 - THEN 50.0 - ELSE round( - (managed_money_net - MIN(managed_money_net) OVER w52)::float - / (MAX(managed_money_net) OVER w52 - MIN(managed_money_net) OVER w52) - * 100, + THEN 50.0 + ELSE ROUND( + ( + managed_money_net - MIN(managed_money_net) OVER w52 + )::REAL / ( + MAX(managed_money_net) OVER w52 - MIN(managed_money_net) OVER w52 + ) * 100, 1 ) - END AS cot_index_52w - + END AS cot_index_52w /* COT Index (52-week): longer-term positioning context */ FROM latest_revision - WINDOW - w26 AS (ORDER BY report_date ROWS BETWEEN 25 PRECEDING AND CURRENT ROW), - w52 AS (ORDER BY report_date ROWS BETWEEN 51 PRECEDING AND CURRENT ROW) + WINDOW w26 AS (ORDER BY report_date ROWS BETWEEN 25 PRECEDING AND CURRENT ROW), w52 AS (ORDER BY report_date ROWS BETWEEN 51 PRECEDING AND CURRENT ROW) ) - -SELECT * +SELECT + * FROM with_derived -ORDER BY report_date +ORDER BY + report_date \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql b/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql index 03dc8c4..e9e79eb 100644 --- a/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql +++ b/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql @@ -6,67 +6,90 @@ MODEL ( start '2006-08-01', cron '@daily' ); -with cast_dtypes as ( + +WITH cast_dtypes AS ( SELECT - src.commodity_code::int as commodity_code, - coalesce(commodity_name, commodity_description) as commodity_name, - country_code::varchar(3) as country_code, - country_name, - market_year::int as market_year, - calendar_year::int as calendar_year, - month::int as month, - src.attribute_id::int as attribute_id, - coalesce(attribute_name, attribute_description) as attribute_name, - src.unit_id::int as unit_id, - coalesce(unit_name, unit_description) as unit_name, - value::float as value, - filename - FROM read_csv( + src.commodity_code::INT AS commodity_code, + COALESCE(commodity_name, commodity_description) AS commodity_name, + country_code::TEXT AS country_code, + country_name, + market_year::INT AS market_year, + calendar_year::INT AS calendar_year, + month::INT AS month, + src.attribute_id::INT AS attribute_id, + COALESCE(attribute_name, attribute_description) AS attribute_name, + src.unit_id::INT AS unit_id, + COALESCE(unit_name, unit_description) AS unit_name, + value::REAL AS value, + filename + FROM READ_CSV( @psd_glob(), - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true, - max_line_size = 10000000 + compression = 'gzip', + header = TRUE, + union_by_name = 
TRUE, + filename = TRUE, + all_varchar = TRUE, + max_line_size = 10000000 ) AS src - left join seeds.psd_commodity_codes on seeds.psd_commodity_codes.commodity_code = src.commodity_code::int - left join seeds.psd_unit_of_measure_codes on seeds.psd_unit_of_measure_codes.unit_id = src.unit_id::int - left join seeds.psd_attribute_codes on seeds.psd_attribute_codes.attribute_id = src.attribute_id::int -), -metadata_and_deduplication as ( -select - any_value(commodity_code) as commodity_code, - any_value(commodity_name) as commodity_name, - any_value(country_code) as country_code, - any_value(country_name) as country_name, - any_value(market_year) as market_year, - any_value(calendar_year) as calendar_year, - any_value(month) as month, - any_value(attribute_id) as attribute_id, - any_value(attribute_name) as attribute_name, - any_value(unit_id) as unit_id, - any_value(unit_name) as unit_name, - any_value(value) as value, - hash(commodity_code, commodity_name, country_code, country_name, market_year, calendar_year, month, attribute_id, attribute_name, unit_id, unit_name, value) as hkey, - any_value(make_date(split(filename, '/')[-3]::int, split(filename, '/')[-2]::int, 1)) as ingest_date, - any_value(if(month!=0,last_day(make_date(market_year, month, 1)),null)) as market_date_month_end, - from cast_dtypes - group by hkey + LEFT JOIN seeds.psd_commodity_codes + ON seeds.psd_commodity_codes.commodity_code = src.commodity_code::INT + LEFT JOIN seeds.psd_unit_of_measure_codes + ON seeds.psd_unit_of_measure_codes.unit_id = src.unit_id::INT + LEFT JOIN seeds.psd_attribute_codes + ON seeds.psd_attribute_codes.attribute_id = src.attribute_id::INT +), metadata_and_deduplication AS ( + SELECT + ANY_VALUE(commodity_code) AS commodity_code, + ANY_VALUE(commodity_name) AS commodity_name, + ANY_VALUE(country_code) AS country_code, + ANY_VALUE(country_name) AS country_name, + ANY_VALUE(market_year) AS market_year, + ANY_VALUE(calendar_year) AS calendar_year, + ANY_VALUE(month) AS month, + ANY_VALUE(attribute_id) AS attribute_id, + ANY_VALUE(attribute_name) AS attribute_name, + ANY_VALUE(unit_id) AS unit_id, + ANY_VALUE(unit_name) AS unit_name, + ANY_VALUE(value) AS value, + HASH( + commodity_code, + commodity_name, + country_code, + country_name, + market_year, + calendar_year, + month, + attribute_id, + attribute_name, + unit_id, + unit_name, + value + ) AS hkey, + ANY_VALUE( + MAKE_DATE(STR_SPLIT(filename, '/')[-3]::INT, STR_SPLIT(filename, '/')[-2]::INT, 1) + ) AS ingest_date, + ANY_VALUE( + CASE WHEN month <> 0 THEN LAST_DAY(MAKE_DATE(market_year, month, 1)) ELSE NULL END + ) AS market_date_month_end + FROM cast_dtypes + GROUP BY + hkey ) -select hkey, - commodity_code, - commodity_name, - country_code, - country_name, - market_year, - calendar_year, - month, - attribute_id, - attribute_name, - unit_id, - unit_name, - value, - ingest_date, -from metadata_and_deduplication -where ingest_date between @start_ds and @end_ds; +SELECT + hkey, + commodity_code, + commodity_name, + country_code, + country_name, + market_year, + calendar_year, + month, + attribute_id, + attribute_name, + unit_id, + unit_name, + value, + ingest_date +FROM metadata_and_deduplication +WHERE + ingest_date BETWEEN @start_ds AND @end_ds \ No newline at end of file diff --git a/transform/sqlmesh_materia/seeds/weather_locations.csv b/transform/sqlmesh_materia/seeds/weather_locations.csv new file mode 100644 index 0000000..6c6ad7a --- /dev/null +++ b/transform/sqlmesh_materia/seeds/weather_locations.csv @@ -0,0 +1,9 @@ 
+location_id;name;country;lat;lon;variety +brazil_minas_gerais;Minas Gerais;BR;-19.9167;-43.9345;Arabica +brazil_parana;Paraná;BR;-23.4205;-51.9330;Arabica +vietnam_highlands;Central Highlands;VN;12.6667;108.0500;Robusta +colombia_huila;Huila;CO;2.5359;-75.5277;Arabica +ethiopia_sidama;Sidama;ET;6.7612;38.4721;Arabica +honduras_copan;Copán;HN;14.8333;-89.1500;Arabica +guatemala_antigua;Antigua;GT;14.5586;-90.7295;Arabica +indonesia_sumatra;Sumatra;ID;3.5952;98.6722;Robusta diff --git a/uv.lock b/uv.lock index 1e8cbff..6baaff9 100644 --- a/uv.lock +++ b/uv.lock @@ -14,6 +14,7 @@ members = [ "extract-core", "ice-stocks", "materia", + "openweathermap", "psdonline", "sqlmesh-materia", ] @@ -1778,6 +1779,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cf/df/d3f1ddf4bb4cb50ed9b1139cc7b1c54c34a1e7ce8fd1b9a37c0d1551a6bd/opentelemetry_api-1.39.1-py3-none-any.whl", hash = "sha256:2edd8463432a7f8443edce90972169b195e7d6a05500cd29e6d13898187c9950", size = 66356, upload-time = "2025-12-11T13:32:17.304Z" }, ] +[[package]] +name = "openweathermap" +version = "0.1.0" +source = { editable = "extract/openweathermap" } +dependencies = [ + { name = "extract-core" }, + { name = "niquests" }, +] + +[package.metadata] +requires-dist = [ + { name = "extract-core", editable = "extract/extract_core" }, + { name = "niquests", specifier = ">=3.14.1" }, +] + [[package]] name = "orjson" version = "3.11.7"
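Two patterns recur across the SQLMesh models in this change and are worth illustrating in isolation; both sketches use invented rows. First, the hash-key deduplication (`HASH(...) AS hkey`, `GROUP BY hkey`, `ANY_VALUE(...)`): because the landing data for a given key is immutable, every row in a hash group carries identical values, so `ANY_VALUE` picks the same row content regardless of which duplicate it reads.

```sql
WITH raw_rows(report_date, total_bags) AS (
    VALUES
        (DATE '2024-01-31', 902815),
        (DATE '2024-01-31', 902815),   -- same report landed twice, e.g. after a re-download
        (DATE '2024-02-29', 915402)
)
SELECT
    ANY_VALUE(report_date)        AS report_date,
    ANY_VALUE(total_bags)         AS total_bags,
    hash(report_date, total_bags) AS hkey
FROM raw_rows
GROUP BY hkey;   -- 3 input rows collapse to 2
```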
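Second, the path-derived `ingest_date` and month-end date that `stg_psdalldata__commodity` builds with `make_date`, `str_split` and `last_day`. The filename below is a placeholder following the documented `landing/psd/{year}/{month}/{etag}.csv.gzip` layout.

```sql
SELECT
    make_date(
        str_split(filename, '/')[-3]::INT,   -- {year} segment
        str_split(filename, '/')[-2]::INT,   -- {month} segment
        1
    )                               AS ingest_date,            -- 2024-05-01
    last_day(make_date(2024, 2, 1)) AS market_date_month_end   -- 2024-02-29
FROM (SELECT '/data/materia/landing/psd/2024/05/etag-placeholder.csv.gzip' AS filename);
```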