feat(extract): add OpenWeatherMap daily weather extractor

Adds extract/openweathermap package with daily weather extraction for 8
coffee-growing regions across 7 countries (two in Brazil, plus Vietnam,
Colombia, Ethiopia, Honduras, Guatemala, Indonesia). Feeds the crop stress
signal for the commodity sentiment score.

Extractor:
- OWM One Call API 3.0 / Day Summary — one JSON.gz per (location, date)
- extract_weather: daily, fetches yesterday + today (16 calls max)
- extract_weather_backfill: fills 2020-01-01 to yesterday, capped at 500
  calls/run with resume cursor '{location_id}:{date}' for crash safety
- Full idempotency via file existence check; state tracking via extract_core

SQLMesh:
- seeds.weather_locations (8 regions with lat/lon/variety)
- foundation.fct_weather_daily: INCREMENTAL_BY_TIME_RANGE, grain
  (location_id, observation_date), dedup via hash key, crop stress flags:
  is_frost (<2°C), is_heat_stress (>35°C), is_drought (<1mm), in_growing_season

Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz
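
The path and cursor conventions above can be sketched in isolation. The helper
names below are hypothetical (the real logic lives inside
`openweathermap/execute.py`); the layout and format strings are the ones this
commit documents:

```python
from datetime import date
from pathlib import Path


def weather_landing_file(landing_dir: Path, location_id: str, d: date) -> Path:
    """LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz"""
    return landing_dir / "weather" / location_id / str(d.year) / f"{d.isoformat()}.json.gz"


def encode_cursor(location_id: str, d: date) -> str:
    """'{location_id}:{date}' — both dimensions, so a mid-run crash resumes exactly."""
    return f"{location_id}:{d.isoformat()}"


def decode_cursor(cursor: str) -> tuple[str, str]:
    # Split on the first ':' only; location ids are filesystem-safe and colon-free.
    location_id, date_str = cursor.split(":", 1)
    return location_id, date_str
```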

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Deeman
2026-02-25 22:40:27 +01:00
parent c3c8333407
commit 08e74665bb
31 changed files with 1377 additions and 915 deletions


@@ -44,23 +44,24 @@ uv run materia secrets get
 **Workspace packages** (`pyproject.toml` → `tool.uv.workspace`):
 - `extract/psdonline/` — Downloads USDA PSD Online data, normalizes ZIP→gzip CSV, writes to local landing directory
+- `extract/openweathermap/` — Daily weather for 8 coffee-growing regions (OWM One Call API 3.0)
-- `transform/sqlmesh_materia/` — 4-layer SQL transformation pipeline (local DuckDB)
+- `transform/sqlmesh_materia/` — 3-layer SQL transformation pipeline (local DuckDB)
 - `src/materia/` — CLI (Typer) for pipeline execution, worker management, secrets
 - `web/` — Future web frontend
 **Data flow:**
 ```
 USDA API → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip
+OWM API → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz
 → rclone cron syncs landing/ to R2
-→ SQLMesh raw → staging → cleaned → serving → /data/materia/lakehouse.duckdb
+→ SQLMesh staging → foundation → serving → /data/materia/lakehouse.duckdb
 → Web app reads lakehouse.duckdb (read-only)
 ```
-**SQLMesh 4-layer model structure** (`transform/sqlmesh_materia/models/`):
+**SQLMesh 3-layer model structure** (`transform/sqlmesh_materia/models/`):
-1. `raw/` — Immutable source reads (read_csv from landing directory)
-2. `staging/` — Type casting, lookup joins, basic cleansing
-3. `cleaned/` — Business logic, pivoting, integration
-4. `serving/` — Analytics-ready facts, dimensions, aggregates
+1. `staging/` — Type casting, lookup joins, basic cleansing (reads landing directly)
+2. `foundation/` — Business logic, pivoting, dimensions, facts (also reads landing directly)
+3. `serving/` — Analytics-ready aggregates for the web app
 **CLI modules** (`src/materia/`):
 - `cli.py` — Typer app with subcommands: worker, pipeline, secrets, version
@@ -100,3 +101,4 @@ Read `coding_philosophy.md` for the full guide. Key points:
 |----------|---------|-------------|
 | `LANDING_DIR` | `data/landing` | Root directory for extracted landing data |
 | `DUCKDB_PATH` | `local.duckdb` | Path to the DuckDB lakehouse database |
+| `OPENWEATHERMAP_API_KEY` | — | OWM One Call API 3.0 key (required for weather extraction) |


@@ -0,0 +1,20 @@
[project]
name = "openweathermap"
version = "0.1.0"
description = "OpenWeatherMap daily weather extractor for coffee-growing regions"
requires-python = ">=3.13"
dependencies = [
    "extract_core",
    "niquests>=3.14.1",
]

[project.scripts]
extract_weather = "openweathermap.execute:extract_weather"
extract_weather_backfill = "openweathermap.execute:extract_weather_backfill"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/openweathermap"]


@@ -0,0 +1,76 @@
"""Thin client for the OpenWeatherMap One Call API 3.0 — Day Summary endpoint.
Endpoint: GET https://api.openweathermap.org/data/3.0/onecall/day_summary
Docs: https://openweathermap.org/api/one-call-3#history_daily_aggregation
Returns one JSON object per (lat, lon, date) with daily aggregates:
temperature.{min,max,morning,afternoon,evening,night}
precipitation.total
humidity.afternoon
cloud_cover.afternoon
wind.max.{speed,direction}
pressure.afternoon
This module contains only the HTTP call and basic response validation.
All business logic (file storage, rate limiting, cursor tracking) lives in execute.py.
"""
import niquests
OWM_BASE_URL = "https://api.openweathermap.org/data/3.0/onecall/day_summary"
HTTP_TIMEOUT_SECONDS = 30
MAX_RESPONSE_BYTES = 10_000 # Day summary is ~500 bytes; 10 KB is a generous bound
class RateLimitError(Exception):
"""Raised when OWM returns HTTP 429 (rate limit exceeded)."""
def fetch_day_summary(
session: niquests.Session,
lat: float,
lon: float,
date_str: str,
api_key: str,
) -> dict:
"""Fetch the OWM One Call 3.0 day summary for a single (lat, lon, date).
date_str must be YYYY-MM-DD format.
Returns the parsed JSON dict on success.
Raises RateLimitError on HTTP 429 — caller is responsible for sleeping and retrying.
Raises AssertionError on any other non-200 status.
"""
assert api_key, "api_key must not be empty"
assert date_str and len(date_str) == 10, f"date_str must be YYYY-MM-DD, got {date_str!r}"
assert -90.0 <= lat <= 90.0, f"lat out of range: {lat}"
assert -180.0 <= lon <= 180.0, f"lon out of range: {lon}"
response = session.get(
OWM_BASE_URL,
params={
"lat": lat,
"lon": lon,
"date": date_str,
"appid": api_key,
"units": "metric",
},
timeout=HTTP_TIMEOUT_SECONDS,
)
if response.status_code == 429:
raise RateLimitError(f"OWM rate limit hit for lat={lat} lon={lon} date={date_str}")
assert response.status_code == 200, (
f"OWM API returned HTTP {response.status_code} for "
f"lat={lat} lon={lon} date={date_str}: {response.text[:200]}"
)
assert len(response.content) <= MAX_RESPONSE_BYTES, (
f"OWM response unexpectedly large ({len(response.content)} bytes) for {date_str}"
)
data = response.json()
assert isinstance(data, dict), f"Expected dict response, got {type(data)}"
assert "date" in data, f"OWM response missing 'date' field: {list(data.keys())}"
return data

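The aggregates listed in the module docstring can be consumed like this — a
minimal sketch against an invented payload (values are made up, not real API
output). The stress thresholds are the ones named in the commit message; in the
pipeline they are applied in SQL, not in Python:

```python
# Illustrative day-summary payload shaped like the fields documented above.
sample = {
    "date": "2021-07-20",
    "temperature": {"min": -1.2, "max": 18.4, "morning": 2.1,
                    "afternoon": 16.0, "evening": 11.3, "night": 0.5},
    "precipitation": {"total": 0.0},
    "humidity": {"afternoon": 48},
    "cloud_cover": {"afternoon": 10},
    "wind": {"max": {"speed": 6.2, "direction": 230}},
    "pressure": {"afternoon": 1018},
}

# Crop-stress style checks mirroring the commit message's thresholds:
# is_frost (<2°C), is_heat_stress (>35°C), is_drought (<1mm).
is_frost = sample["temperature"]["min"] < 2.0
is_heat_stress = sample["temperature"]["max"] > 35.0
is_drought = sample["precipitation"]["total"] < 1.0
```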

@@ -0,0 +1,330 @@
"""OpenWeatherMap daily weather extraction for coffee-growing regions.
Two entry points:
extract_weather()
Daily run: fetches yesterday + today for all 8 locations (16 calls max).
Yesterday is included to cover the midnight edge case — if the daily job
fires just after midnight UTC, today's OWM data may still be partial.
Idempotent: skips if the landing file already exists.
extract_weather_backfill()
Historical fill: iterates (date, location) pairs from 2020-01-01 to
yesterday. Bounded to MAX_CALLS_PER_BACKFILL_RUN per run; re-run daily
to advance. Resumes from cursor on restart.
Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz
Idempotency: file existence check. Past weather is immutable — (location_id, date)
uniquely identifies a file that never changes once written.
Backfill cursor format: '{location_id}:{date}' (e.g. 'brazil_parana:2022-07-15').
Encodes both dimensions so a mid-run crash resumes at the exact (location, date) pair.
"""
import gzip
import json
import logging
import os
import sys
import time
from datetime import date, timedelta
from pathlib import Path
import niquests
from extract_core import end_run, get_last_cursor, landing_path, open_state_db, start_run, write_bytes_atomic
from openweathermap.api import RateLimitError, fetch_day_summary
from openweathermap.locations import LOCATIONS
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("OWM Weather Extractor")
LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
LANDING_SUBDIR = "weather"
EXTRACTOR_DAILY = "owm_weather_daily"
EXTRACTOR_BACKFILL = "owm_weather_backfill"
# Rate limiting: OWM free tier = 1000 calls/day (~0.7/s).
# 1.5s between calls stays comfortably below the limit for the daily run.
# 2.0s for backfill (more conservative, many sequential calls).
SLEEP_BETWEEN_CALLS_SECONDS = 1.5
SLEEP_BETWEEN_BACKFILL_CALLS_SECONDS = 2.0
# On 429: wait 60s, then one retry. If still 429, abort the run.
SLEEP_ON_RATE_LIMIT_SECONDS = 60
MAX_RATE_LIMIT_RETRIES = 1
# Cap backfill at 500 calls per run (~17 min at 2s/call).
# 5-year backfill = 14,600 calls → ~30 runs. Re-run daily until complete.
MAX_CALLS_PER_BACKFILL_RUN = 500
# ── helpers ──────────────────────────────────────────────────────────────────
def _write_weather_file(location_id: str, date_str: str, payload: dict) -> int:
"""Gzip-compress payload JSON and write atomically to the landing zone.
Returns bytes_written, or 0 if the file already exists (idempotent skip).
Path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz
"""
assert location_id, "location_id must not be empty"
assert date_str and len(date_str) == 10, f"date_str must be YYYY-MM-DD, got {date_str!r}"
assert isinstance(payload, dict) and payload, "payload must be a non-empty dict"
year = date_str[:4]
dest_dir = landing_path(LANDING_DIR, LANDING_SUBDIR, location_id, year)
local_file = dest_dir / f"{date_str}.json.gz"
if local_file.exists():
logger.debug(f"Already exists, skipping: {local_file}")
return 0
compressed = gzip.compress(json.dumps(payload, separators=(",", ":")).encode("utf-8"))
bytes_written = write_bytes_atomic(local_file, compressed)
logger.info(f"Stored {local_file} ({bytes_written:,} bytes)")
return bytes_written
def _fetch_with_retry(session: niquests.Session, loc: dict, date_str: str, api_key: str) -> dict | None:
"""Fetch OWM day summary with one 429-retry.
Returns the JSON dict on success, or None if rate limit persists after retry.
"""
for attempt in range(MAX_RATE_LIMIT_RETRIES + 1):
try:
return fetch_day_summary(session, loc["lat"], loc["lon"], date_str, api_key)
except RateLimitError:
if attempt < MAX_RATE_LIMIT_RETRIES:
logger.warning(
f"Rate limit hit for {loc['id']} {date_str}"
f"sleeping {SLEEP_ON_RATE_LIMIT_SECONDS}s before retry"
)
time.sleep(SLEEP_ON_RATE_LIMIT_SECONDS)
else:
logger.error(f"Rate limit persisted after retry for {loc['id']} {date_str}")
return None
return None # unreachable; satisfies type checker
def _file_exists(location_id: str, date_str: str) -> bool:
year = date_str[:4]
return (LANDING_DIR / LANDING_SUBDIR / location_id / year / f"{date_str}.json.gz").exists()
# ── daily extractor ───────────────────────────────────────────────────────────
def extract_weather() -> None:
"""Fetch yesterday + today weather for all 8 coffee-growing locations.
Up to 16 API calls. Both days are skipped if files already exist,
so re-running costs zero API calls (fully idempotent).
"""
api_key = os.environ.get("OPENWEATHERMAP_API_KEY", "")
assert api_key, "OPENWEATHERMAP_API_KEY environment variable must be set"
today = date.today()
yesterday = today - timedelta(days=1)
dates_to_fetch = [yesterday.isoformat(), today.isoformat()]
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, EXTRACTOR_DAILY)
files_written = 0
files_skipped = 0
bytes_written_total = 0
try:
with niquests.Session() as session:
for date_str in dates_to_fetch:
for loc in LOCATIONS:
if _file_exists(loc["id"], date_str):
logger.info(f"Already exists: {loc['id']} {date_str}")
files_skipped += 1
continue
data = _fetch_with_retry(session, loc, date_str, api_key)
if data is None:
logger.error(f"Skipping {loc['id']} {date_str} after persistent rate limit")
continue
bw = _write_weather_file(loc["id"], date_str, data)
if bw > 0:
files_written += 1
bytes_written_total += bw
else:
files_skipped += 1
time.sleep(SLEEP_BETWEEN_CALLS_SECONDS)
end_run(
conn, run_id,
status="success",
files_written=files_written,
files_skipped=files_skipped,
bytes_written=bytes_written_total,
cursor_value=today.isoformat(),
)
logger.info(f"Daily weather complete: {files_written} new, {files_skipped} skipped")
except Exception as e:
end_run(conn, run_id, status="failed", error_message=str(e))
raise
finally:
conn.close()
# ── backfill extractor ────────────────────────────────────────────────────────
def extract_weather_backfill() -> None:
"""Fill historical weather data from 2020-01-01 to yesterday.
Iterates (date, location) pairs in date-ascending, LOCATIONS-list order.
Bounded to MAX_CALLS_PER_BACKFILL_RUN per run — re-run daily to advance.
Cursor format: '{location_id}:{date}' (e.g. 'brazil_parana:2022-07-15').
Encodes both dimensions: on resume, all pairs at or before the cursor are
skipped (via cursor comparison first, then file-existence check).
5-year backfill (20202025) = 14,600 calls. At 500/run = ~30 runs.
429 handling: sleep 60s, one retry. If still 429, save cursor and exit
with status='failed' so the cursor does not advance beyond the last
successfully written pair. Safe to re-run the next day.
"""
api_key = os.environ.get("OPENWEATHERMAP_API_KEY", "")
assert api_key, "OPENWEATHERMAP_API_KEY environment variable must be set"
start = date(2020, 1, 1)
end = date.today() - timedelta(days=1) # never fetch today in backfill
conn = open_state_db(LANDING_DIR)
run_id = start_run(conn, EXTRACTOR_BACKFILL)
files_written = 0
files_skipped = 0
bytes_written_total = 0
calls_made = 0
last_cursor: str | None = None
# Load resume cursor from last successful run
resume_cursor = get_last_cursor(conn, EXTRACTOR_BACKFILL)
if resume_cursor:
logger.info(f"Resuming backfill from cursor: {resume_cursor}")
else:
logger.info(f"Starting fresh backfill from {start.isoformat()}")
# Parse cursor into (location_id, date_str) for skip comparison
resume_location_id: str | None = None
resume_date_str: str | None = None
if resume_cursor and ":" in resume_cursor:
resume_location_id, resume_date_str = resume_cursor.split(":", 1)
location_ids = [loc["id"] for loc in LOCATIONS]
resume_loc_idx = -1
if resume_location_id and resume_location_id in location_ids:
resume_loc_idx = location_ids.index(resume_location_id)
try:
with niquests.Session() as session:
current = start
while current <= end:
date_str = current.isoformat()
for loc in LOCATIONS:
loc_idx = location_ids.index(loc["id"])
# Cursor-based skip: (date, loc_idx) <= (resume_date, resume_loc_idx)
# This skips everything already processed in previous runs.
if resume_date_str:
if date_str < resume_date_str:
files_skipped += 1
continue
if date_str == resume_date_str and loc_idx <= resume_loc_idx:
files_skipped += 1
continue
# File-existence check: idempotency guard for files already on disk
# (e.g. written by the daily extractor, or a previous partial run)
if _file_exists(loc["id"], date_str):
files_skipped += 1
last_cursor = f"{loc['id']}:{date_str}"
continue
# Per-run call cap
if calls_made >= MAX_CALLS_PER_BACKFILL_RUN:
logger.info(
f"Reached cap of {MAX_CALLS_PER_BACKFILL_RUN} calls. "
f"Re-run to continue from {last_cursor or resume_cursor}"
)
end_run(
conn, run_id,
status="success",
files_written=files_written,
files_skipped=files_skipped,
bytes_written=bytes_written_total,
cursor_value=last_cursor or resume_cursor,
)
return
data = _fetch_with_retry(session, loc, date_str, api_key)
calls_made += 1
if data is None:
logger.warning(f"Persistent rate limit at {loc['id']} {date_str} — stopping run")
end_run(
conn, run_id,
status="failed",
files_written=files_written,
files_skipped=files_skipped,
bytes_written=bytes_written_total,
cursor_value=last_cursor or resume_cursor,
error_message="Persistent rate limit — resume from cursor",
)
return
bw = _write_weather_file(loc["id"], date_str, data)
if bw > 0:
files_written += 1
bytes_written_total += bw
else:
files_skipped += 1
last_cursor = f"{loc['id']}:{date_str}"
time.sleep(SLEEP_BETWEEN_BACKFILL_CALLS_SECONDS)
current += timedelta(days=1)
final_cursor = last_cursor or resume_cursor or end.isoformat()
logger.info(
f"Backfill complete: {files_written} written, "
f"{files_skipped} skipped, {calls_made} API calls"
)
end_run(
conn, run_id,
status="success",
files_written=files_written,
files_skipped=files_skipped,
bytes_written=bytes_written_total,
cursor_value=final_cursor,
)
except Exception as e:
end_run(
conn, run_id,
status="failed",
files_written=files_written,
files_skipped=files_skipped,
bytes_written=bytes_written_total,
cursor_value=last_cursor or resume_cursor,
error_message=str(e),
)
raise
finally:
conn.close()
if __name__ == "__main__":
extract_weather()


@@ -0,0 +1,35 @@
"""Coffee-growing region coordinates for OpenWeatherMap extraction.
Each entry is a dict with:
id — filesystem-safe unique identifier (used as landing subdirectory name)
lat/lon — WGS84 coordinates
name — human-readable region name
country — ISO 3166-1 alpha-2 country code
variety — 'Arabica' or 'Robusta' (drives growing season logic in SQL)
Locations were chosen to represent the primary growing zones for the world's
major coffee-producing countries, weighted toward Arabica regions since KC=F
futures track Arabica.
"""
LOCATIONS: list[dict] = [
# Brazil — largest Arabica producer; frost risk in highlands (JunAug)
{"id": "brazil_minas_gerais", "lat": -19.9167, "lon": -43.9345, "name": "Minas Gerais", "country": "BR", "variety": "Arabica"},
{"id": "brazil_parana", "lat": -23.4205, "lon": -51.9330, "name": "Paraná", "country": "BR", "variety": "Arabica"},
# Vietnam — largest Robusta producer; Central Highlands plateau
{"id": "vietnam_highlands", "lat": 12.6667, "lon": 108.0500, "name": "Central Highlands", "country": "VN", "variety": "Robusta"},
# Colombia — premium washed Arabica; Huila department
{"id": "colombia_huila", "lat": 2.5359, "lon": -75.5277, "name": "Huila", "country": "CO", "variety": "Arabica"},
# Ethiopia — birthplace of Arabica; Sidama zone (Yirgacheffe region)
{"id": "ethiopia_sidama", "lat": 6.7612, "lon": 38.4721, "name": "Sidama", "country": "ET", "variety": "Arabica"},
# Honduras — largest Central American producer; Copán department
{"id": "honduras_copan", "lat": 14.8333, "lon": -89.1500, "name": "Copán", "country": "HN", "variety": "Arabica"},
# Guatemala — benchmark Central American; Antigua valley
{"id": "guatemala_antigua", "lat": 14.5586, "lon": -90.7295, "name": "Antigua", "country": "GT", "variety": "Arabica"},
# Indonesia — Sumatra (Mandheling); significant Robusta production
{"id": "indonesia_sumatra", "lat": 3.5952, "lon": 98.6722, "name": "Sumatra", "country": "ID", "variety": "Robusta"},
]
assert len(LOCATIONS) == 8, f"Expected 8 locations, got {len(LOCATIONS)}"
assert all("id" in loc and "lat" in loc and "lon" in loc for loc in LOCATIONS), \
"Each location must have id, lat, lon"

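A quick sanity check of the "14,600 calls → ~30 runs" budget quoted in
execute.py, assuming a 2020-01-01 through 2024-12-31 window (the exact end date
moves daily, so the real total drifts slightly):

```python
from datetime import date

# Inclusive day count for the assumed window; 2020 and 2024 are leap years.
days = (date(2024, 12, 31) - date(2020, 1, 1)).days + 1

# One API call per (location, date) pair, 8 locations.
calls = days * 8

# Ceiling division at the 500-calls-per-run cap.
runs_needed = -(-calls // 500)
```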

@@ -46,6 +46,7 @@ sqlmesh_materia = {workspace = true }
 cftc_cot = {workspace = true }
 coffee_prices = {workspace = true }
 ice_stocks = {workspace = true }
+openweathermap = {workspace = true }
 [tool.uv.workspace]
 members = [
     "extract/*",


@@ -40,9 +40,17 @@ PIPELINES = {
         "command": ["uv", "run", "--package", "ice_stocks", "extract_ice_all"],
         "timeout_seconds": 1800,
     },
+    "extract_weather": {
+        "command": ["uv", "run", "--package", "openweathermap", "extract_weather"],
+        "timeout_seconds": 300,
+    },
+    "extract_weather_backfill": {
+        "command": ["uv", "run", "--package", "openweathermap", "extract_weather_backfill"],
+        "timeout_seconds": 1200,
+    },
     "extract_all": {
-        "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all"],
+        "command": ["meta", "extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"],
-        "timeout_seconds": 6300,
+        "timeout_seconds": 6600,
     },
     "transform": {
         "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"],
@@ -60,7 +68,7 @@ PIPELINES = {
 META_PIPELINES: dict[str, list[str]] = {
-    "extract_all": ["extract", "extract_cot", "extract_prices", "extract_ice_all"],
+    "extract_all": ["extract", "extract_cot", "extract_prices", "extract_ice_all", "extract_weather"],
 }


@@ -1,9 +1,9 @@
 AUDIT (
-  name assert_positive_order_ids,
+  name assert_positive_order_ids
 );
-SELECT *
+SELECT
+  *
 FROM @this_model
 WHERE
   item_id < 0


@@ -43,3 +43,14 @@ def ice_stocks_by_port_glob(evaluator) -> str:
"""Return a quoted glob path for all ICE historical by-port CSV gzip files under LANDING_DIR.""" """Return a quoted glob path for all ICE historical by-port CSV gzip files under LANDING_DIR."""
landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing")
return f"'{landing_dir}/ice_stocks_by_port/**/*.csv.gzip'" return f"'{landing_dir}/ice_stocks_by_port/**/*.csv.gzip'"
@macro()
def weather_glob(evaluator) -> str:
"""Return a quoted glob path for all OWM weather JSON gzip files under LANDING_DIR.
Pattern: weather/{location_id}/{year}/{date}.json.gz
The double-star catches all location_id subdirectories.
"""
landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing")
return f"'{landing_dir}/weather/**/*.json.gz'"


@@ -8,11 +8,11 @@ MODEL (
 );
 SELECT
-  max(hkey) as hkey,
+  MAX(hkey) AS hkey,
   commodity_code,
-  max(commodity_name) as commodity_name,
+  MAX(commodity_name) AS commodity_name,
   country_code,
-  max(country_name) as country_name,
+  MAX(country_name) AS country_name,
   market_year,
   ingest_date,
   COALESCE(SUM(CASE WHEN attribute_name = 'Production' THEN value END), 0) AS Production,
@@ -30,7 +30,8 @@ SELECT
   COALESCE(SUM(CASE WHEN attribute_name = 'Waste' THEN value END), 0) AS Waste,
   COALESCE(SUM(CASE WHEN attribute_name = 'Feed Use' THEN value END), 0) AS Feed_Use
 FROM staging.psdalldata__commodity
-WHERE attribute_name IN (
+WHERE
+  attribute_name IN (
   'Production',
   'Imports',
   'Exports',


@@ -1,23 +1,15 @@
--- Commodity dimension: conforms identifiers across source systems.
---
--- This is the ontology. Each row is a commodity tracked by BeanFlows.
--- As new sources are added (ICO, futures prices, satellite), their
--- commodity identifiers are added as columns here — not as separate tables.
--- As new commodities are added (cocoa, sugar), rows are added here.
---
--- References:
---   usda_commodity_code → staging.psdalldata__commodity.commodity_code (numeric string, e.g. '0711100')
---   cftc_commodity_code → foundation.fct_cot_positioning.cftc_commodity_code (3-char, e.g. '083')
---
--- NOTE: Defined as FULL model (not SEED) to guarantee leading-zero preservation.
--- Pandas CSV loading converts '083' → 83 even with varchar column declarations.
+/* Commodity dimension: conforms identifiers across source systems. */
+/* This is the ontology. Each row is a commodity tracked by BeanFlows. */
+/* As new sources are added (ICO, futures prices, satellite), their */
+/* commodity identifiers are added as columns here — not as separate tables. */
+/* As new commodities are added (cocoa, sugar), rows are added here. */
+/* References: */
+/* usda_commodity_code → staging.psdalldata__commodity.commodity_code (numeric string, e.g. '0711100') */
+/* cftc_commodity_code → foundation.fct_cot_positioning.cftc_commodity_code (3-char, e.g. '083') */
+/* NOTE: Defined as FULL model (not SEED) to guarantee leading-zero preservation. */
+/* Pandas CSV loading converts '083' → 83 even with varchar column declarations. */
 MODEL (
   name foundation.dim_commodity,
   kind FULL
 );
-SELECT usda_commodity_code, cftc_commodity_code, ticker, ice_stock_report_code, commodity_name, commodity_group
+SELECT
+  usda_commodity_code,
+  cftc_commodity_code,
+  ticker,
+  ice_stock_report_code,
+  commodity_name,
+  commodity_group
 FROM (VALUES
-  ('0711100', '083', 'KC=F', 'COFFEE-C', 'Coffee, Green', 'Softs')
-) AS t(usda_commodity_code, cftc_commodity_code, ticker, ice_stock_report_code, commodity_name, commodity_group)
+  ('0711100', '083', 'KC=F', 'COFFEE-C', 'Coffee, Green', 'Softs')) AS t(usda_commodity_code, cftc_commodity_code, ticker, ice_stock_report_code, commodity_name, commodity_group)


@@ -1,69 +1,58 @@
--- Foundation fact: daily KC=F Coffee C futures prices.
---
--- Reads directly from the landing zone, casts varchar columns to proper types,
--- and deduplicates via hash key.
--- Covers all available history from the landing directory.
---
--- Grain: one row per trade_date.
--- Dedup: hash of (trade_date, close) — if Yahoo Finance corrects a price,
--- the new hash triggers a re-ingest on the next incremental run.
+/* Foundation fact: daily KC=F Coffee C futures prices. */
+/* Reads directly from the landing zone, casts varchar columns to proper types, */
+/* and deduplicates via hash key. */
+/* Covers all available history from the landing directory. */
+/* Grain: one row per trade_date. */
+/* Dedup: hash of (trade_date, close) — if Yahoo Finance corrects a price, */
+/* the new hash triggers a re-ingest on the next incremental run. */
 MODEL (
   name foundation.fct_coffee_prices,
   kind INCREMENTAL_BY_TIME_RANGE (
     time_column trade_date
   ),
-  grain (trade_date),
+  grain (
+    trade_date
+  ),
   start '1971-08-16',
   cron '@daily'
 );
 WITH src AS (
-  SELECT * FROM read_csv(
+  SELECT
+    *
+  FROM READ_CSV(
     @prices_glob(),
     compression = 'gzip',
-    header = true,
+    header = TRUE,
-    union_by_name = true,
+    union_by_name = TRUE,
-    filename = true,
+    filename = TRUE,
-    all_varchar = true
+    all_varchar = TRUE
   )
-),
-cast_and_clean AS (
+), cast_and_clean AS (
   SELECT
-    TRY_CAST(Date AS date) AS trade_date,
+    TRY_CAST(Date AS DATE) AS trade_date,
-    TRY_CAST(Open AS double) AS open,
+    TRY_CAST(Open AS DOUBLE) AS open,
-    TRY_CAST(High AS double) AS high,
+    TRY_CAST(High AS DOUBLE) AS high,
-    TRY_CAST(Low AS double) AS low,
+    TRY_CAST(Low AS DOUBLE) AS low,
-    TRY_CAST(Close AS double) AS close,
+    TRY_CAST(Close AS DOUBLE) AS close,
-    TRY_CAST(Adj_Close AS double) AS adj_close,
+    TRY_CAST(Adj_Close AS DOUBLE) AS adj_close,
-    TRY_CAST(Volume AS bigint) AS volume,
+    TRY_CAST(Volume AS BIGINT) AS volume,
-    -- Filename encodes the content hash — use as ingest identifier
-    filename AS source_file,
+    filename AS source_file, /* Filename encodes the content hash — use as ingest identifier */
-    -- Dedup key: trade date + close price
-    hash(Date, Close) AS hkey
+    HASH(Date, Close) AS hkey /* Dedup key: trade date + close price */
   FROM src
-  WHERE TRY_CAST(Date AS date) IS NOT NULL
-    AND TRY_CAST(Close AS double) IS NOT NULL
+  WHERE
+    NOT TRY_CAST(Date AS DATE) IS NULL AND NOT TRY_CAST(Close AS DOUBLE) IS NULL
-),
-deduplicated AS (
+), deduplicated AS (
   SELECT
-    any_value(trade_date) AS trade_date,
+    ANY_VALUE(trade_date) AS trade_date,
-    any_value(open) AS open,
+    ANY_VALUE(open) AS open,
-    any_value(high) AS high,
+    ANY_VALUE(high) AS high,
-    any_value(low) AS low,
+    ANY_VALUE(low) AS low,
-    any_value(close) AS close,
+    ANY_VALUE(close) AS close,
-    any_value(adj_close) AS adj_close,
+    ANY_VALUE(adj_close) AS adj_close,
-    any_value(volume) AS volume,
+    ANY_VALUE(volume) AS volume,
-    any_value(source_file) AS source_file,
+    ANY_VALUE(source_file) AS source_file,
     hkey
   FROM cast_and_clean
-  GROUP BY hkey
+  GROUP BY
+    hkey
 )
-SELECT *
+SELECT
+  *
 FROM deduplicated
-WHERE trade_date BETWEEN @start_ds AND @end_ds
+WHERE
+  trade_date BETWEEN @start_ds AND @end_ds

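The model's dedup step has a simple Python analogue — illustrative only, since
DuckDB's `HASH()` and `ANY_VALUE()` do this in SQL; row values here are made up:

```python
# Group rows by a hash of (trade_date, close) and keep one arbitrary row per
# key: duplicate loads collapse, while a corrected close price produces a new
# key and survives as a distinct row.
def dedup_by_hkey(rows: list[dict]) -> list[dict]:
    seen: dict[int, dict] = {}
    for row in rows:
        hkey = hash((row["trade_date"], row["close"]))  # stands in for DuckDB's HASH()
        seen.setdefault(hkey, row)                      # like ANY_VALUE(): first row wins
    return list(seen.values())


rows = [
    {"trade_date": "2024-01-02", "close": 185.2, "source_file": "a.csv.gzip"},
    {"trade_date": "2024-01-02", "close": 185.2, "source_file": "b.csv.gzip"},  # exact duplicate
    {"trade_date": "2024-01-02", "close": 186.0, "source_file": "c.csv.gzip"},  # price correction
]
deduped = dedup_by_hkey(rows)
```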

@@ -1,14 +1,4 @@
--- Foundation fact: CFTC COT positioning, weekly grain, all commodities.
---
--- Reads directly from the landing zone, casts varchar columns to proper types,
--- cleans column names, computes net positions (long - short) per trader category,
--- and deduplicates via hash key. Covers all commodities — filtering to
--- a specific commodity happens in the serving layer.
---
--- Grain: one row per (cftc_commodity_code, report_date, cftc_contract_market_code)
--- History: revisions appear as new rows with a later ingest_date.
--- Serving layer picks max(ingest_date) per grain for latest view.
+/* Foundation fact: CFTC COT positioning, weekly grain, all commodities. */
+/* Reads directly from the landing zone, casts varchar columns to proper types, */
+/* cleans column names, computes net positions (long - short) per trader category, */
+/* and deduplicates via hash key. Covers all commodities — filtering to */
+/* a specific commodity happens in the serving layer. */
+/* Grain: one row per (cftc_commodity_code, report_date, cftc_contract_market_code) */
+/* History: revisions appear as new rows with a later ingest_date. */
+/* Serving layer picks max(ingest_date) per grain for latest view. */
 MODEL (
   name foundation.fct_cot_positioning,
   kind INCREMENTAL_BY_TIME_RANGE (
@@ -20,92 +10,59 @@ MODEL (
 );
 WITH src AS (
-  SELECT * FROM read_csv(
+  SELECT
+    *
+  FROM READ_CSV(
     @cot_glob(),
     compression = 'gzip',
-    header = true,
+    header = TRUE,
-    union_by_name = true,
+    union_by_name = TRUE,
-    filename = true,
+    filename = TRUE,
-    all_varchar = true,
+    all_varchar = TRUE,
     max_line_size = 10000000
   )
-),
-cast_and_clean AS (
+), cast_and_clean AS (
   SELECT
-    -- Identifiers
-    trim(market_and_exchange_names) AS market_and_exchange_name,
+    TRIM(market_and_exchange_names) AS market_and_exchange_name, /* Identifiers */
-    report_date_as_yyyy_mm_dd::date AS report_date,
+    report_date_as_yyyy_mm_dd::DATE AS report_date,
-    trim(cftc_commodity_code) AS cftc_commodity_code,
+    TRIM(cftc_commodity_code) AS cftc_commodity_code,
-    trim(cftc_contract_market_code) AS cftc_contract_market_code,
+    TRIM(cftc_contract_market_code) AS cftc_contract_market_code,
-    trim(contract_units) AS contract_units,
+    TRIM(contract_units) AS contract_units,
-    -- Open interest
-    -- CFTC uses '.' as null for any field — use TRY_CAST throughout
-    TRY_CAST(open_interest_all AS int) AS open_interest,
+    TRY_CAST(open_interest_all AS INT) AS open_interest, /* Open interest */ /* CFTC uses '.' as null for any field — use TRY_CAST throughout */
-    -- Producer / Merchant (commercial hedgers: exporters, processors)
-    TRY_CAST(prod_merc_positions_long_all AS int) AS prod_merc_long,
+    TRY_CAST(prod_merc_positions_long_all AS INT) AS prod_merc_long, /* Producer / Merchant (commercial hedgers: exporters, processors) */
-    TRY_CAST(prod_merc_positions_short_all AS int) AS prod_merc_short,
+    TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_short,
-    -- Swap dealers
-    TRY_CAST(swap_positions_long_all AS int) AS swap_long,
+    TRY_CAST(swap_positions_long_all AS INT) AS swap_long, /* Swap dealers */
-    TRY_CAST(swap_positions_short_all AS int) AS swap_short,
+    TRY_CAST(swap_positions_short_all AS INT) AS swap_short,
-    TRY_CAST(swap_positions_spread_all AS int) AS swap_spread,
+    TRY_CAST(swap_positions_spread_all AS INT) AS swap_spread,
-    -- Managed money (hedge funds, CTAs — the primary speculative signal)
-    TRY_CAST(m_money_positions_long_all AS int) AS managed_money_long,
+    TRY_CAST(m_money_positions_long_all AS INT) AS managed_money_long, /* Managed money (hedge funds, CTAs — the primary speculative signal) */
-    TRY_CAST(m_money_positions_short_all AS int) AS managed_money_short,
+    TRY_CAST(m_money_positions_short_all AS INT) AS managed_money_short,
-    TRY_CAST(m_money_positions_spread_all AS int) AS managed_money_spread,
+    TRY_CAST(m_money_positions_spread_all AS INT) AS managed_money_spread,
+    TRY_CAST(other_rept_positions_long_all AS INT) AS other_reportable_long, /* Other reportables */
+    TRY_CAST(other_rept_positions_short_all AS INT) AS other_reportable_short,
+    TRY_CAST(other_rept_positions_spread_all AS INT) AS other_reportable_spread,
+    TRY_CAST(nonrept_positions_long_all AS INT) AS nonreportable_long, /* Non-reportable (small speculators, below reporting threshold) */
+    TRY_CAST(nonrept_positions_short_all AS INT) AS nonreportable_short,
+    /* Net positions (long minus short per category) */
+    TRY_CAST(prod_merc_positions_long_all AS INT) - TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_net,
+    TRY_CAST(m_money_positions_long_all AS INT) - TRY_CAST(m_money_positions_short_all AS INT) AS managed_money_net,
+    TRY_CAST(swap_positions_long_all AS INT) - TRY_CAST(swap_positions_short_all AS INT) AS swap_net,
+    TRY_CAST(other_rept_positions_long_all AS INT) - TRY_CAST(other_rept_positions_short_all AS INT) AS other_reportable_net,
+    TRY_CAST(nonrept_positions_long_all AS INT) - TRY_CAST(nonrept_positions_short_all AS INT) AS nonreportable_net,
TRY_CAST(change_in_open_interest_all AS INT) AS change_open_interest, /* Week-over-week changes */
-- Other reportables TRY_CAST(change_in_m_money_long_all AS INT) AS change_managed_money_long,
TRY_CAST(other_rept_positions_long_all AS int) AS other_reportable_long, TRY_CAST(change_in_m_money_short_all AS INT) AS change_managed_money_short,
TRY_CAST(other_rept_positions_short_all AS int) AS other_reportable_short, TRY_CAST(change_in_m_money_long_all AS INT) - TRY_CAST(change_in_m_money_short_all AS INT) AS change_managed_money_net,
TRY_CAST(other_rept_positions_spread_all AS int) AS other_reportable_spread, TRY_CAST(change_in_prod_merc_long_all AS INT) AS change_prod_merc_long,
TRY_CAST(change_in_prod_merc_short_all AS INT) AS change_prod_merc_short,
-- Non-reportable (small speculators, below reporting threshold) TRY_CAST(conc_gross_le_4_tdr_long_all AS REAL) AS concentration_top4_long_pct, /* Concentration ratios (% of OI held by top 4 / top 8 traders) */
TRY_CAST(nonrept_positions_long_all AS int) AS nonreportable_long, TRY_CAST(conc_gross_le_4_tdr_short_all AS REAL) AS concentration_top4_short_pct,
TRY_CAST(nonrept_positions_short_all AS int) AS nonreportable_short, TRY_CAST(conc_gross_le_8_tdr_long_all AS REAL) AS concentration_top8_long_pct,
TRY_CAST(conc_gross_le_8_tdr_short_all AS REAL) AS concentration_top8_short_pct,
-- Net positions (long minus short per category) TRY_CAST(traders_tot_all AS INT) AS traders_total, /* Trader counts */
TRY_CAST(prod_merc_positions_long_all AS int) TRY_CAST(traders_m_money_long_all AS INT) AS traders_managed_money_long,
- TRY_CAST(prod_merc_positions_short_all AS int) AS prod_merc_net, TRY_CAST(traders_m_money_short_all AS INT) AS traders_managed_money_short,
TRY_CAST(m_money_positions_long_all AS int) TRY_CAST(traders_m_money_spread_all AS INT) AS traders_managed_money_spread,
- TRY_CAST(m_money_positions_short_all AS int) AS managed_money_net, MAKE_DATE(STR_SPLIT(filename, '/')[-2]::INT, 1, 1) AS ingest_date, /* Ingest date: derived from landing path year directory */ /* Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2] */
TRY_CAST(swap_positions_long_all AS int) HASH(
- TRY_CAST(swap_positions_short_all AS int) AS swap_net,
TRY_CAST(other_rept_positions_long_all AS int)
- TRY_CAST(other_rept_positions_short_all AS int) AS other_reportable_net,
TRY_CAST(nonrept_positions_long_all AS int)
- TRY_CAST(nonrept_positions_short_all AS int) AS nonreportable_net,
-- Week-over-week changes
TRY_CAST(change_in_open_interest_all AS int) AS change_open_interest,
TRY_CAST(change_in_m_money_long_all AS int) AS change_managed_money_long,
TRY_CAST(change_in_m_money_short_all AS int) AS change_managed_money_short,
TRY_CAST(change_in_m_money_long_all AS int)
- TRY_CAST(change_in_m_money_short_all AS int) AS change_managed_money_net,
TRY_CAST(change_in_prod_merc_long_all AS int) AS change_prod_merc_long,
TRY_CAST(change_in_prod_merc_short_all AS int) AS change_prod_merc_short,
-- Concentration ratios (% of OI held by top 4 / top 8 traders)
TRY_CAST(conc_gross_le_4_tdr_long_all AS float) AS concentration_top4_long_pct,
TRY_CAST(conc_gross_le_4_tdr_short_all AS float) AS concentration_top4_short_pct,
TRY_CAST(conc_gross_le_8_tdr_long_all AS float) AS concentration_top8_long_pct,
TRY_CAST(conc_gross_le_8_tdr_short_all AS float) AS concentration_top8_short_pct,
-- Trader counts
TRY_CAST(traders_tot_all AS int) AS traders_total,
TRY_CAST(traders_m_money_long_all AS int) AS traders_managed_money_long,
TRY_CAST(traders_m_money_short_all AS int) AS traders_managed_money_short,
TRY_CAST(traders_m_money_spread_all AS int) AS traders_managed_money_spread,
-- Ingest date: derived from landing path year directory
-- Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2]
make_date(split(filename, '/')[-2]::int, 1, 1) AS ingest_date,
-- Dedup key: hash of business grain + key metrics
hash(
cftc_commodity_code, cftc_commodity_code,
report_date_as_yyyy_mm_dd, report_date_as_yyyy_mm_dd,
cftc_contract_market_code, cftc_contract_market_code,
@@ -114,60 +71,61 @@ cast_and_clean AS (
m_money_positions_short_all, m_money_positions_short_all,
prod_merc_positions_long_all, prod_merc_positions_long_all,
prod_merc_positions_short_all prod_merc_positions_short_all
) AS hkey ) AS hkey /* Dedup key: hash of business grain + key metrics */
FROM src FROM src
-- Reject rows with null commodity code or malformed date /* Reject rows with null commodity code or malformed date */
WHERE trim(cftc_commodity_code) IS NOT NULL WHERE
AND len(trim(cftc_commodity_code)) > 0 NOT TRIM(cftc_commodity_code) IS NULL
AND report_date_as_yyyy_mm_dd::date IS NOT NULL AND LENGTH(TRIM(cftc_commodity_code)) > 0
), AND NOT report_date_as_yyyy_mm_dd::DATE IS NULL
), deduplicated AS (
deduplicated AS (
SELECT SELECT
any_value(market_and_exchange_name) AS market_and_exchange_name, ANY_VALUE(market_and_exchange_name) AS market_and_exchange_name,
any_value(report_date) AS report_date, ANY_VALUE(report_date) AS report_date,
any_value(cftc_commodity_code) AS cftc_commodity_code, ANY_VALUE(cftc_commodity_code) AS cftc_commodity_code,
any_value(cftc_contract_market_code) AS cftc_contract_market_code, ANY_VALUE(cftc_contract_market_code) AS cftc_contract_market_code,
any_value(contract_units) AS contract_units, ANY_VALUE(contract_units) AS contract_units,
any_value(open_interest) AS open_interest, ANY_VALUE(open_interest) AS open_interest,
any_value(prod_merc_long) AS prod_merc_long, ANY_VALUE(prod_merc_long) AS prod_merc_long,
any_value(prod_merc_short) AS prod_merc_short, ANY_VALUE(prod_merc_short) AS prod_merc_short,
any_value(prod_merc_net) AS prod_merc_net, ANY_VALUE(prod_merc_net) AS prod_merc_net,
any_value(swap_long) AS swap_long, ANY_VALUE(swap_long) AS swap_long,
any_value(swap_short) AS swap_short, ANY_VALUE(swap_short) AS swap_short,
any_value(swap_spread) AS swap_spread, ANY_VALUE(swap_spread) AS swap_spread,
any_value(swap_net) AS swap_net, ANY_VALUE(swap_net) AS swap_net,
any_value(managed_money_long) AS managed_money_long, ANY_VALUE(managed_money_long) AS managed_money_long,
any_value(managed_money_short) AS managed_money_short, ANY_VALUE(managed_money_short) AS managed_money_short,
any_value(managed_money_spread) AS managed_money_spread, ANY_VALUE(managed_money_spread) AS managed_money_spread,
any_value(managed_money_net) AS managed_money_net, ANY_VALUE(managed_money_net) AS managed_money_net,
any_value(other_reportable_long) AS other_reportable_long, ANY_VALUE(other_reportable_long) AS other_reportable_long,
any_value(other_reportable_short) AS other_reportable_short, ANY_VALUE(other_reportable_short) AS other_reportable_short,
any_value(other_reportable_spread) AS other_reportable_spread, ANY_VALUE(other_reportable_spread) AS other_reportable_spread,
any_value(other_reportable_net) AS other_reportable_net, ANY_VALUE(other_reportable_net) AS other_reportable_net,
any_value(nonreportable_long) AS nonreportable_long, ANY_VALUE(nonreportable_long) AS nonreportable_long,
any_value(nonreportable_short) AS nonreportable_short, ANY_VALUE(nonreportable_short) AS nonreportable_short,
any_value(nonreportable_net) AS nonreportable_net, ANY_VALUE(nonreportable_net) AS nonreportable_net,
any_value(change_open_interest) AS change_open_interest, ANY_VALUE(change_open_interest) AS change_open_interest,
any_value(change_managed_money_long) AS change_managed_money_long, ANY_VALUE(change_managed_money_long) AS change_managed_money_long,
any_value(change_managed_money_short) AS change_managed_money_short, ANY_VALUE(change_managed_money_short) AS change_managed_money_short,
any_value(change_managed_money_net) AS change_managed_money_net, ANY_VALUE(change_managed_money_net) AS change_managed_money_net,
any_value(change_prod_merc_long) AS change_prod_merc_long, ANY_VALUE(change_prod_merc_long) AS change_prod_merc_long,
any_value(change_prod_merc_short) AS change_prod_merc_short, ANY_VALUE(change_prod_merc_short) AS change_prod_merc_short,
any_value(concentration_top4_long_pct) AS concentration_top4_long_pct, ANY_VALUE(concentration_top4_long_pct) AS concentration_top4_long_pct,
any_value(concentration_top4_short_pct) AS concentration_top4_short_pct, ANY_VALUE(concentration_top4_short_pct) AS concentration_top4_short_pct,
any_value(concentration_top8_long_pct) AS concentration_top8_long_pct, ANY_VALUE(concentration_top8_long_pct) AS concentration_top8_long_pct,
any_value(concentration_top8_short_pct) AS concentration_top8_short_pct, ANY_VALUE(concentration_top8_short_pct) AS concentration_top8_short_pct,
any_value(traders_total) AS traders_total, ANY_VALUE(traders_total) AS traders_total,
any_value(traders_managed_money_long) AS traders_managed_money_long, ANY_VALUE(traders_managed_money_long) AS traders_managed_money_long,
any_value(traders_managed_money_short) AS traders_managed_money_short, ANY_VALUE(traders_managed_money_short) AS traders_managed_money_short,
any_value(traders_managed_money_spread) AS traders_managed_money_spread, ANY_VALUE(traders_managed_money_spread) AS traders_managed_money_spread,
any_value(ingest_date) AS ingest_date, ANY_VALUE(ingest_date) AS ingest_date,
hkey hkey
FROM cast_and_clean FROM cast_and_clean
GROUP BY hkey GROUP BY
hkey
) )
SELECT
SELECT * *
FROM deduplicated FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds WHERE
report_date BETWEEN @start_ds AND @end_ds
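The `TRY_CAST` convention above is the load-bearing detail: CFTC publishes '.' where a value is absent, so every numeric cast must tolerate garbage and the net-position arithmetic must propagate nulls. A minimal Python sketch of the same semantics (helper names are hypothetical, not part of the pipeline):

```python
def try_int(raw):
    """Mimic DuckDB TRY_CAST(x AS INT): return None instead of raising."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None

def net(long_raw, short_raw):
    """Net position = long - short; None if either side failed to cast."""
    long_v, short_v = try_int(long_raw), try_int(short_raw)
    if long_v is None or short_v is None:
        return None
    return long_v - short_v

# CFTC writes '.' where a value is absent — it must become NULL, not an error.
assert try_int("42017") == 42017
assert try_int(".") is None
assert net("55210", "12844") == 42366
assert net("55210", ".") is None
```

In SQL the null propagation comes for free (`NULL - x` is `NULL`), which is why the model can subtract two `TRY_CAST` results directly.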
/* Foundation fact: ICE certified Coffee C (Arabica) aging report. */
/* Reads directly from the landing zone, casts varchar columns to proper types, */
/* and deduplicates via hash key. */
/* Grain: one row per (report_date, age_bucket). */
/* Age buckets represent how long coffee has been in certified storage. */
/* Port columns are in bags (60kg). */
MODEL (
  name foundation.fct_ice_aging_stocks,
  kind INCREMENTAL_BY_TIME_RANGE (
@@ -17,54 +10,53 @@ MODEL (
);

WITH src AS (
  SELECT *
  FROM READ_CSV(
    @ice_aging_glob(),
    compression = 'gzip',
    header = TRUE,
    union_by_name = TRUE,
    filename = TRUE,
    all_varchar = TRUE
  )
), cast_and_clean AS (
  SELECT
    TRY_CAST(report_date AS DATE) AS report_date,
    age_bucket,
    TRY_CAST(antwerp_bags AS BIGINT) AS antwerp_bags,
    TRY_CAST(hamburg_bremen_bags AS BIGINT) AS hamburg_bremen_bags,
    TRY_CAST(houston_bags AS BIGINT) AS houston_bags,
    TRY_CAST(miami_bags AS BIGINT) AS miami_bags,
    TRY_CAST(new_orleans_bags AS BIGINT) AS new_orleans_bags,
    TRY_CAST(new_york_bags AS BIGINT) AS new_york_bags,
    TRY_CAST(total_bags AS BIGINT) AS total_bags,
    filename AS source_file,
    HASH(report_date, age_bucket, total_bags) AS hkey
  FROM src
  WHERE
    NOT TRY_CAST(report_date AS DATE) IS NULL
    AND NOT age_bucket IS NULL
    AND age_bucket <> ''
), deduplicated AS (
  SELECT
    ANY_VALUE(report_date) AS report_date,
    ANY_VALUE(age_bucket) AS age_bucket,
    ANY_VALUE(antwerp_bags) AS antwerp_bags,
    ANY_VALUE(hamburg_bremen_bags) AS hamburg_bremen_bags,
    ANY_VALUE(houston_bags) AS houston_bags,
    ANY_VALUE(miami_bags) AS miami_bags,
    ANY_VALUE(new_orleans_bags) AS new_orleans_bags,
    ANY_VALUE(new_york_bags) AS new_york_bags,
    ANY_VALUE(total_bags) AS total_bags,
    ANY_VALUE(source_file) AS source_file,
    hkey
  FROM cast_and_clean
  GROUP BY hkey
)
SELECT *
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds
/* Foundation fact: ICE certified Coffee C (Arabica) warehouse stocks. */
/* Reads directly from the landing zone, casts varchar columns to proper types, */
/* and deduplicates via hash key. */
/* "Certified" means Coffee C graded and stamped as delivery-eligible */
/* against ICE futures contracts — a key physical supply indicator. */
/* Grain: one row per report_date. */
MODEL (
  name foundation.fct_ice_warehouse_stocks,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (report_date),
  start '2000-01-01',
  cron '@daily'
);

WITH src AS (
  SELECT *
  FROM READ_CSV(
    @ice_stocks_glob(),
    compression = 'gzip',
    header = TRUE,
    union_by_name = TRUE,
    filename = TRUE,
    all_varchar = TRUE
  )
), cast_and_clean AS (
  SELECT
    TRY_CAST(report_date AS DATE) AS report_date,
    TRY_CAST(total_certified_bags AS BIGINT) AS total_certified_bags,
    TRY_CAST(pending_grading_bags AS BIGINT) AS pending_grading_bags,
    filename AS source_file,
    /* Dedup key: report date + total bags */
    HASH(report_date, total_certified_bags) AS hkey
  FROM src
  WHERE
    NOT TRY_CAST(report_date AS DATE) IS NULL
    AND NOT TRY_CAST(total_certified_bags AS BIGINT) IS NULL
), deduplicated AS (
  SELECT
    ANY_VALUE(report_date) AS report_date,
    ANY_VALUE(total_certified_bags) AS total_certified_bags,
    ANY_VALUE(pending_grading_bags) AS pending_grading_bags,
    ANY_VALUE(source_file) AS source_file,
    hkey
  FROM cast_and_clean
  GROUP BY hkey
)
SELECT *
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds
/* Foundation fact: ICE historical end-of-month Coffee C certified warehouse stocks by port. */
/* Reads directly from the landing zone, casts varchar columns to proper types, */
/* and deduplicates via hash key. */
/* Covers November 1996 to present (30-year history). */
/* Grain: one row per report_date (end-of-month). */
/* Port columns are in bags (60kg). */
MODEL (
  name foundation.fct_ice_warehouse_stocks_by_port,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (report_date),
  start '1996-11-01',
  cron '@daily'
);

WITH src AS (
  SELECT *
  FROM READ_CSV(
    @ice_stocks_by_port_glob(),
    compression = 'gzip',
    header = TRUE,
    union_by_name = TRUE,
    filename = TRUE,
    all_varchar = TRUE
  )
), cast_and_clean AS (
  SELECT
    TRY_CAST(report_date AS DATE) AS report_date,
    TRY_CAST(new_york_bags AS BIGINT) AS new_york_bags,
    TRY_CAST(new_orleans_bags AS BIGINT) AS new_orleans_bags,
    TRY_CAST(houston_bags AS BIGINT) AS houston_bags,
    TRY_CAST(miami_bags AS BIGINT) AS miami_bags,
    TRY_CAST(antwerp_bags AS BIGINT) AS antwerp_bags,
    TRY_CAST(hamburg_bremen_bags AS BIGINT) AS hamburg_bremen_bags,
    TRY_CAST(barcelona_bags AS BIGINT) AS barcelona_bags,
    TRY_CAST(virginia_bags AS BIGINT) AS virginia_bags,
    TRY_CAST(total_bags AS BIGINT) AS total_bags,
    filename AS source_file,
    HASH(report_date, total_bags) AS hkey
  FROM src
  WHERE
    NOT TRY_CAST(report_date AS DATE) IS NULL
    AND NOT TRY_CAST(total_bags AS BIGINT) IS NULL
), deduplicated AS (
  SELECT
    ANY_VALUE(report_date) AS report_date,
    ANY_VALUE(new_york_bags) AS new_york_bags,
    ANY_VALUE(new_orleans_bags) AS new_orleans_bags,
    ANY_VALUE(houston_bags) AS houston_bags,
    ANY_VALUE(miami_bags) AS miami_bags,
    ANY_VALUE(antwerp_bags) AS antwerp_bags,
    ANY_VALUE(hamburg_bremen_bags) AS hamburg_bremen_bags,
    ANY_VALUE(barcelona_bags) AS barcelona_bags,
    ANY_VALUE(virginia_bags) AS virginia_bags,
    ANY_VALUE(total_bags) AS total_bags,
    ANY_VALUE(source_file) AS source_file,
    hkey
  FROM cast_and_clean
  GROUP BY hkey
)
SELECT *
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds
@@ -0,0 +1,93 @@
/* Foundation fact: daily weather observations for 8 coffee-growing regions. */
/* Source: OpenWeatherMap One Call API 3.0 / Day Summary */
/* Landing: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz */
/* One file per (location_id, date). Content: raw OWM day summary JSON. */
/* Each file is a single JSON object (not newline-delimited), so format='auto'. */
/* Grain: (location_id, observation_date) — one row per location per day. */
/* Dedup key: hash(location_id, date) — past weather is immutable. */
/* location_id is parsed from the filename path: split(filename, '/')[-3] */
/* Path structure: .../weather/{location_id}/{year}/{date}.json.gz */
/* Crop stress flags (agronomic thresholds for Arabica coffee): */
/*   is_frost          — temp_min_c < 2.0°C (ICO frost damage threshold) */
/*   is_heat_stress    — temp_max_c > 35.0°C (photosynthesis impairment) */
/*   is_drought        — precipitation_mm < 1.0 (dry day; OWM omits field when 0) */
/*   in_growing_season — simplified month-range flag by variety */
MODEL (
  name foundation.fct_weather_daily,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column observation_date
  ),
  grain (location_id, observation_date),
  start '2020-01-01',
  cron '@daily'
);

WITH src AS (
  /* Each file is a single JSON object with nested fields: */
  /*   temperature.{min,max,afternoon,morning,evening,night} */
  /*   precipitation.total (absent when 0 — COALESCE to 0 downstream) */
  /*   humidity.afternoon */
  /*   cloud_cover.afternoon */
  /*   wind.max.{speed,direction} */
  /*   pressure.afternoon */
  /* DuckDB read_json(format='auto') creates STRUCT columns for nested objects; */
  /* fields are accessed with dot notation (temperature.min, wind.max.speed). */
  SELECT *
  FROM READ_JSON(@weather_glob(), format = 'auto', compression = 'gzip', filename = TRUE)
), located AS (
  SELECT
    src.*,
    /* location_id is the 3rd-from-last path segment: */
    /* e.g. .../weather/brazil_minas_gerais/2024/2024-01-15.json.gz → 'brazil_minas_gerais' */
    STR_SPLIT(filename, '/')[-3] AS location_id,
    TRY_CAST(src."date" AS DATE) AS observation_date
  FROM src
), cast_and_clean AS (
  SELECT
    location_id,
    observation_date,
    /* Temperature (°C, metric units) */
    TRY_CAST(located.temperature.min AS DOUBLE) AS temp_min_c,
    TRY_CAST(located.temperature.max AS DOUBLE) AS temp_max_c,
    TRY_CAST(located.temperature.afternoon AS DOUBLE) AS temp_afternoon_c,
    /* Precipitation (mm total for the day; OWM omits field when 0) */
    COALESCE(TRY_CAST(located.precipitation.total AS DOUBLE), 0.0) AS precipitation_mm,
    /* Humidity (% afternoon reading) */
    TRY_CAST(located.humidity.afternoon AS DOUBLE) AS humidity_afternoon_pct,
    /* Cloud cover (% afternoon) */
    TRY_CAST(located.cloud_cover.afternoon AS DOUBLE) AS cloud_cover_afternoon_pct,
    /* Wind (m/s max speed, degrees direction) */
    TRY_CAST(located.wind.max.speed AS DOUBLE) AS wind_max_speed_ms,
    /* Pressure (hPa afternoon) */
    TRY_CAST(located.pressure.afternoon AS DOUBLE) AS pressure_afternoon_hpa,
    /* Crop stress flags */
    TRY_CAST(located.temperature.min AS DOUBLE) < 2.0 AS is_frost,
    TRY_CAST(located.temperature.max AS DOUBLE) > 35.0 AS is_heat_stress,
    COALESCE(TRY_CAST(located.precipitation.total AS DOUBLE), 0.0) < 1.0 AS is_drought,
    HASH(location_id, src."date") AS hkey,
    filename
  FROM located
  WHERE
    NOT observation_date IS NULL
    AND NOT location_id IS NULL
    AND location_id <> ''
), deduplicated AS (
  SELECT
    ANY_VALUE(location_id) AS location_id,
    ANY_VALUE(observation_date) AS observation_date,
    ANY_VALUE(temp_min_c) AS temp_min_c,
    ANY_VALUE(temp_max_c) AS temp_max_c,
    ANY_VALUE(temp_afternoon_c) AS temp_afternoon_c,
    ANY_VALUE(precipitation_mm) AS precipitation_mm,
    ANY_VALUE(humidity_afternoon_pct) AS humidity_afternoon_pct,
    ANY_VALUE(cloud_cover_afternoon_pct) AS cloud_cover_afternoon_pct,
    ANY_VALUE(wind_max_speed_ms) AS wind_max_speed_ms,
    ANY_VALUE(pressure_afternoon_hpa) AS pressure_afternoon_hpa,
    ANY_VALUE(is_frost) AS is_frost,
    ANY_VALUE(is_heat_stress) AS is_heat_stress,
    ANY_VALUE(is_drought) AS is_drought,
    hkey
  FROM cast_and_clean
  GROUP BY hkey
)
SELECT
  d.observation_date,
  d.location_id,
  loc.name AS location_name,
  loc.country,
  loc.lat,
  loc.lon,
  loc.variety,
  d.temp_min_c,
  d.temp_max_c,
  d.temp_afternoon_c,
  d.precipitation_mm,
  d.humidity_afternoon_pct,
  d.cloud_cover_afternoon_pct,
  d.wind_max_speed_ms,
  d.pressure_afternoon_hpa,
  d.is_frost,
  d.is_heat_stress,
  d.is_drought,
  /* Growing season: simplified month-range flag by variety. */
  /* Arabica: Apr-Oct (covers northern + southern hemisphere risk windows). */
  /* Robusta: Apr-Nov (Vietnam/Indonesia main cycle). */
  CASE loc.variety
    WHEN 'Arabica' THEN EXTRACT(MONTH FROM d.observation_date) BETWEEN 4 AND 10
    WHEN 'Robusta' THEN EXTRACT(MONTH FROM d.observation_date) BETWEEN 4 AND 11
    ELSE FALSE
  END AS in_growing_season
FROM deduplicated AS d
LEFT JOIN seeds.weather_locations AS loc
  ON d.location_id = loc.location_id
WHERE d.observation_date BETWEEN @start_ds AND @end_ds
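The path parsing and stress flags in this model can be sketched in Python; `weather_row` is a hypothetical helper, and the input dict mimics the OWM day-summary shape described in the comments. Note how a missing `precipitation` key coalesces to 0, matching the SQL `COALESCE`:

```python
def weather_row(filename, day):
    """Map one landing file (path + OWM-style day-summary dict) to a fact row.
    Hypothetical helper, mirroring the SQL model's logic."""
    parts = filename.split("/")
    location_id = parts[-3]  # .../weather/{location_id}/{year}/{date}.json.gz
    temp_min = day["temperature"]["min"]
    temp_max = day["temperature"]["max"]
    precip = day.get("precipitation", {}).get("total", 0.0)  # OWM omits the field on dry days
    return {
        "location_id": location_id,
        "observation_date": day["date"],
        "is_frost": temp_min < 2.0,        # ICO frost damage threshold
        "is_heat_stress": temp_max > 35.0,  # photosynthesis impairment
        "is_drought": precip < 1.0,         # dry-day threshold
    }

row = weather_row(
    "/landing/weather/brazil_minas_gerais/2024/2024-07-20.json.gz",
    {"date": "2024-07-20", "temperature": {"min": 1.4, "max": 21.0}},
)
assert row["location_id"] == "brazil_minas_gerais"
assert row["is_frost"] and row["is_drought"] and not row["is_heat_stress"]
```

The example date and temperatures are illustrative; a July frost in Minas Gerais is exactly the kind of row the downstream crop stress signal is built to catch.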
MODEL (
  name seeds.psd_attribute_codes,
  kind SEED (
    path '$root/seeds/psd_attribute_codes.csv',
    csv_settings (delimiter = ';')
  )
)
MODEL (
  name seeds.psd_commodity_codes,
  kind SEED (
    path '$root/seeds/psd_commodity_codes.csv',
    csv_settings (delimiter = ';')
  )
)
MODEL (
  name seeds.psd_unit_of_measure_codes,
  kind SEED (
    path '$root/seeds/psd_unit_of_measure_codes.csv',
    csv_settings (delimiter = ';')
  )
)
@@ -0,0 +1,7 @@
MODEL (
  name seeds.weather_locations,
  kind SEED (
    path '$root/seeds/weather_locations.csv',
    csv_settings (delimiter = ';')
  )
)
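The seed is a plain ';'-delimited CSV whose columns match what `fct_weather_daily` joins on (`location_id`, `name`, `country`, `lat`, `lon`, `variety`). A quick check of the format, with illustrative coordinate values (the real seed's numbers may differ):

```python
import csv
import io

# One hypothetical seed row; the ';' delimiter matches csv_settings (delimiter = ';').
sample = (
    "location_id;name;country;lat;lon;variety\n"
    "brazil_minas_gerais;Minas Gerais;Brazil;-19.9;-44.0;Arabica\n"
)

rows = list(csv.DictReader(io.StringIO(sample), delimiter=";"))
assert rows[0]["location_id"] == "brazil_minas_gerais"
assert rows[0]["variety"] == "Arabica"
assert float(rows[0]["lat"]) == -19.9  # coordinates here are illustrative
```

The ';' delimiter keeps commas available inside location names without quoting, consistent with the other seeds in this commit.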
/* Serving mart: KC=F Coffee C futures prices, analytics-ready. */
/* Adds moving averages (20-day, 50-day SMA) and 52-week high/low range. */
/* Filtered to trading days only (NULL close rows excluded upstream). */
/* Grain: one row per trade_date. */
MODEL (
  name serving.coffee_prices,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column trade_date
  ),
  grain (trade_date),
  start '1971-08-16',
  cron '@daily'
);
@@ -24,38 +20,26 @@ WITH base AS (
    f.close,
    f.adj_close,
    f.volume,
    /* Daily return: (close - prev_close) / prev_close * 100 */
    ROUND(
      (f.close - LAG(f.close, 1) OVER (ORDER BY f.trade_date))
        / NULLIF(LAG(f.close, 1) OVER (ORDER BY f.trade_date), 0) * 100,
      4
    ) AS daily_return_pct,
    /* 20-day simple moving average (1 trading month) */
    ROUND(
      AVG(f.close) OVER (ORDER BY f.trade_date ROWS BETWEEN 19 PRECEDING AND CURRENT ROW),
      4
    ) AS sma_20d,
    /* 50-day simple moving average (2.5 trading months) */
    ROUND(
      AVG(f.close) OVER (ORDER BY f.trade_date ROWS BETWEEN 49 PRECEDING AND CURRENT ROW),
      4
    ) AS sma_50d,
    /* 52-week high / low (approximately 252 trading days) */
    MAX(f.high) OVER (ORDER BY f.trade_date ROWS BETWEEN 251 PRECEDING AND CURRENT ROW) AS high_52w,
    MIN(f.low) OVER (ORDER BY f.trade_date ROWS BETWEEN 251 PRECEDING AND CURRENT ROW) AS low_52w
  FROM foundation.fct_coffee_prices AS f
  WHERE f.trade_date BETWEEN @start_ds AND @end_ds
)
SELECT
  b.trade_date,
  d.commodity_name,
@@ -71,7 +55,9 @@ SELECT
  b.sma_50d,
  b.high_52w,
  b.low_52w
FROM base AS b
CROSS JOIN foundation.dim_commodity AS d
WHERE d.ticker = 'KC=F'
ORDER BY b.trade_date
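The window functions above reduce to simple trailing-window arithmetic. A Python sketch of the `sma_20d`-style average and the `LAG`-based daily return (window size shortened for the example):

```python
def sma(values, window):
    """Trailing simple moving average, mirroring
    AVG(close) OVER (ORDER BY trade_date ROWS BETWEEN window-1 PRECEDING AND CURRENT ROW).
    Early rows average over however many points exist so far, as the SQL frame does."""
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1):i + 1]
        out.append(round(sum(chunk) / len(chunk), 4))
    return out

def daily_return_pct(values):
    """(close - prev_close) / prev_close * 100; None for the first row
    and for a zero previous close, mirroring LAG + NULLIF."""
    out = [None]
    for prev, cur in zip(values, values[1:]):
        out.append(round((cur - prev) / prev * 100, 4) if prev != 0 else None)
    return out

closes = [100.0, 102.0, 101.0, 104.0]
assert sma(closes, 2) == [100.0, 101.0, 101.5, 102.5]
assert daily_return_pct(closes)[1] == 2.0
```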
/* Serving mart: ICE certified Coffee C stock aging report, analytics-ready. */
/* Shows the age distribution of certified stocks across delivery ports. */
/* Age buckets represent how long coffee has been in certified storage. */
/* Older stock approaching certificate limits is a supply quality signal. */
/* Source: ICE Certified Stock Aging Report (monthly) */
/* Grain: one row per (report_date, age_bucket). */
MODEL (
  name serving.ice_aging_stocks,
  kind INCREMENTAL_BY_TIME_RANGE (
@@ -21,11 +13,8 @@ WITH base AS (
  SELECT
    f.report_date,
    f.age_bucket,
    /* Parse age range from "0000 to 0120" format for correct sort order */
    TRY_CAST(SPLIT_PART(f.age_bucket, ' to ', 1) AS INT) AS age_bucket_start_days,
    TRY_CAST(SPLIT_PART(f.age_bucket, ' to ', 2) AS INT) AS age_bucket_end_days,
    f.antwerp_bags,
    f.hamburg_bremen_bags,
    f.houston_bags,
@@ -33,12 +22,11 @@ WITH base AS (
    f.new_orleans_bags,
    f.new_york_bags,
    f.total_bags,
    f.source_file
  FROM foundation.fct_ice_aging_stocks AS f
  WHERE f.report_date BETWEEN @start_ds AND @end_ds
)
SELECT
  b.report_date,
  d.commodity_name,
@@ -54,7 +42,10 @@ SELECT
  b.new_york_bags,
  b.total_bags,
  b.source_file
FROM base AS b
CROSS JOIN foundation.dim_commodity AS d
WHERE d.ice_stock_report_code = 'COFFEE-C'
ORDER BY b.report_date, b.age_bucket_start_days
@@ -1,19 +1,12 @@
/* Serving mart: ICE certified Coffee C warehouse stocks, analytics-ready. */
/* Adds 30-day rolling average, week-over-week change, and drawdown from */
/* 52-week high. Physical supply indicator used alongside S/D and positioning. */
/* "Certified stocks" = coffee graded and stamped as eligible for delivery */
/* against ICE Coffee C futures — traders watch this as a squeeze indicator. */
/* Grain: one row per report_date. */
MODEL (
  name serving.ice_warehouse_stocks,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (
    report_date
  ),
  start '2000-01-01',
  cron '@daily'
);
@@ -23,45 +16,25 @@ WITH base AS (
    f.report_date,
    f.total_certified_bags,
    f.pending_grading_bags,
    f.total_certified_bags /* Week-over-week change (compare to 7 calendar days ago via LAG over ordered rows) */ /* Using LAG(1) since data is daily: compares to previous trading/reporting day */ - LAG(f.total_certified_bags, 1) OVER (ORDER BY f.report_date) AS wow_change_bags,
    ROUND(
      AVG(f.total_certified_bags::DOUBLE) OVER (ORDER BY f.report_date ROWS BETWEEN 29 PRECEDING AND CURRENT ROW),
      0
    ) AS avg_30d_bags, /* 30-day rolling average (smooths daily noise) */
    MAX(f.total_certified_bags) OVER (ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW) AS high_52w_bags, /* 52-week high (365 calendar days ≈ 252 trading days; use 365-row window as proxy) */
    ROUND(
      (
        f.total_certified_bags::DOUBLE - MAX(f.total_certified_bags) OVER (ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW)::DOUBLE
      ) / NULLIF(
        MAX(f.total_certified_bags) OVER (ORDER BY f.report_date ROWS BETWEEN 364 PRECEDING AND CURRENT ROW)::DOUBLE,
        0
      ) * 100,
      2
    ) AS drawdown_from_52w_high_pct /* Drawdown from 52-week high (pct below peak — squeeze indicator) */
  FROM foundation.fct_ice_warehouse_stocks AS f
  WHERE
    f.report_date BETWEEN @start_ds AND @end_ds
)
SELECT
  b.report_date,
  d.commodity_name,
@@ -72,7 +45,9 @@ SELECT
  b.avg_30d_bags,
  b.high_52w_bags,
  b.drawdown_from_52w_high_pct
FROM base AS b
CROSS JOIN foundation.dim_commodity AS d
WHERE
  d.ice_stock_report_code = 'COFFEE-C'
ORDER BY
  b.report_date
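Editorial sketch (not in the repo): `drawdown_from_52w_high_pct` is percent below the trailing-window peak, with the `NULLIF` guard against a zero peak. The same arithmetic in plain Python:

```python
def drawdown_from_high_pct(bags, window=365):
    """Percent below the trailing-window max; None when the peak is 0
    (mirrors NULLIF(..., 0) producing NULL)."""
    out = []
    for i, v in enumerate(bags):
        peak = max(bags[max(0, i - window + 1): i + 1])
        out.append(round((v - peak) / peak * 100, 2) if peak else None)
    return out

drawdown_from_high_pct([1000, 800, 900])
# [0.0, -20.0, -10.0]
```

The result is always <= 0: the current row is part of its own window, so it can never exceed the window max.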
@@ -1,18 +1,12 @@
/* Serving mart: ICE certified Coffee C warehouse stocks by port, analytics-ready. */
/* End-of-month certified stock levels broken down by delivery port. */
/* Covers November 1996 to present (~30 years). Useful for understanding */
/* geographic shifts in the certified supply base over time. */
/* Source: ICE historical by-port XLS (EOM_KC_cert_stox_by_port_nov96-present.xls) */
/* Grain: one row per report_date (end-of-month). */
MODEL (
  name serving.ice_warehouse_stocks_by_port,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (
    report_date
  ),
  start '1996-11-01',
  cron '@daily'
);
@@ -29,32 +23,22 @@ WITH base AS (
    f.barcelona_bags,
    f.virginia_bags,
    f.total_bags,
    f.total_bags /* Month-over-month change in total certified bags */ - LAG(f.total_bags, 1) OVER (ORDER BY f.report_date) AS mom_change_bags,
    ROUND(
      (
        f.total_bags::DOUBLE - LAG(f.total_bags, 1) OVER (ORDER BY f.report_date)::DOUBLE
      ) / NULLIF(LAG(f.total_bags, 1) OVER (ORDER BY f.report_date)::DOUBLE, 0) * 100,
      2
    ) AS mom_change_pct, /* Month-over-month percent change */
    ROUND(
      AVG(f.total_bags::DOUBLE) OVER (ORDER BY f.report_date ROWS BETWEEN 11 PRECEDING AND CURRENT ROW),
      0
    ) AS avg_12m_bags, /* 12-month rolling average */
    f.source_file
  FROM foundation.fct_ice_warehouse_stocks_by_port AS f
  WHERE
    f.report_date BETWEEN @start_ds AND @end_ds
)
SELECT
  b.report_date,
  d.commodity_name,
@@ -72,7 +56,9 @@ SELECT
  b.mom_change_pct,
  b.avg_12m_bags,
  b.source_file
FROM base AS b
CROSS JOIN foundation.dim_commodity AS d
WHERE
  d.ice_stock_report_code = 'COFFEE-C'
ORDER BY
  b.report_date
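Editorial sketch (not in the repo): `mom_change_pct` divides by the lagged value guarded with `NULLIF`, and `LAG` yields NULL for the first row. In plain Python:

```python
def mom_change_pct(totals):
    """Month-over-month percent change; None for the first row (LAG -> NULL)
    and when the prior month is 0 (NULLIF guard)."""
    out = [None]
    for prev, curr in zip(totals, totals[1:]):
        out.append(None if prev == 0 else round((curr - prev) / prev * 100, 2))
    return out

mom_change_pct([200, 250, 0, 100])
# [None, 25.0, -100.0, None]
```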
@@ -7,7 +7,7 @@ MODEL (
  cron '@daily'
);
/* CTE to calculate country-level derived metrics */
WITH country_metrics AS (
  SELECT
    commodity_code,
@@ -21,21 +21,30 @@ WITH country_metrics AS (
    Exports,
    Total_Distribution,
    Ending_Stocks,
    (
      Production + Imports - Exports
    ) AS Net_Supply, /* Derived metrics per country, mirroring Python script */
    (
      Exports - Imports
    ) AS Trade_Balance,
    (
      Production + Imports - Exports
    ) - Total_Distribution AS Supply_Demand_Balance,
    (
      Ending_Stocks / NULLIF(Total_Distribution, 0)
    ) /* Handle division by zero for Stock-to-Use Ratio */ * 100 AS Stock_to_Use_Ratio_pct,
    (
      Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY market_year, ingest_date)
    ) /* Calculate Production YoY percentage change using a window function */ / NULLIF(
      LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY market_year, ingest_date),
      0
    ) * 100 AS Production_YoY_pct
  FROM cleaned.psdalldata__commodity_pivoted
), global_aggregates AS (
  SELECT
    commodity_code,
    commodity_name,
    NULL::TEXT AS country_code, /* Use NULL for global aggregates */
    'Global' AS country_name,
    market_year,
    ingest_date,
@@ -50,9 +59,7 @@ global_aggregates AS (
    commodity_name,
    market_year,
    ingest_date
), global_metrics /* CTE to calculate derived metrics for global aggregates */ AS (
  SELECT
    commodity_code,
    commodity_name,
@@ -65,14 +72,27 @@ global_metrics AS (
    Exports,
    Total_Distribution,
    Ending_Stocks,
    (
      Production + Imports - Exports
    ) AS Net_Supply,
    (
      Exports - Imports
    ) AS Trade_Balance,
    (
      Production + Imports - Exports
    ) - Total_Distribution AS Supply_Demand_Balance,
    (
      Ending_Stocks / NULLIF(Total_Distribution, 0)
    ) * 100 AS Stock_to_Use_Ratio_pct,
    (
      Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY market_year, ingest_date)
    ) / NULLIF(
      LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY market_year, ingest_date),
      0
    ) * 100 AS Production_YoY_pct
  FROM global_aggregates
)
/* Combine country-level and global-level data into a single output */
SELECT
  commodity_code,
  commodity_name,
@@ -103,4 +123,4 @@ ORDER BY
  commodity_name,
  country_name,
  market_year,
  ingest_date
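Editorial sketch (not in the repo): the two zero-guarded ratios in this model, reduced to scalar Python. `None` stands in for SQL NULL from the `NULLIF(..., 0)` guards:

```python
def stock_to_use_pct(ending_stocks, total_distribution):
    """Ending stocks as % of total distribution; None when distribution is 0."""
    if total_distribution == 0:
        return None  # mirrors NULLIF(Total_Distribution, 0)
    return ending_stocks / total_distribution * 100

def production_yoy_pct(curr_production, prev_production):
    """Year-over-year % change vs. the lagged value; None when the prior year is 0."""
    if prev_production == 0:
        return None  # mirrors NULLIF(LAG(Production, 1, 0) ..., 0)
    return (curr_production - prev_production) / prev_production * 100
```

Note the SQL's `LAG(Production, 1, 0)` defaults the first year to 0, so the first row's YoY is NULL rather than a division error — the guard above reproduces that.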
@@ -1,41 +1,32 @@
/* Serving mart: COT positioning for Coffee C futures, analytics-ready. */
/* Joins foundation.fct_cot_positioning with foundation.dim_commodity so */
/* the coffee filter is driven by the dimension (not a hardcoded CFTC code). */
/* Adds derived analytics used by the dashboard and API: */
/* - Normalized positioning (% of open interest) */
/* - Long/short ratio */
/* - Week-over-week momentum */
/* - COT Index over 26-week and 52-week trailing windows (0=bearish, 100=bullish) */
/* Grain: one row per report_date for Coffee C futures. */
/* Latest revision per date: MAX(ingest_date) used to deduplicate CFTC corrections. */
MODEL (
  name serving.cot_positioning,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (
    report_date
  ),
  start '2006-06-13',
  cron '@daily'
);
WITH latest_revision AS (
  /* Pick the most recently ingested row when CFTC issues corrections */
  SELECT
    f.*
  FROM foundation.fct_cot_positioning AS f
  INNER JOIN foundation.dim_commodity AS d
    ON f.cftc_commodity_code = d.cftc_commodity_code
  WHERE
    d.commodity_name = 'Coffee, Green'
    AND f.report_date BETWEEN @start_ds AND @end_ds
  QUALIFY
    ROW_NUMBER() OVER (
      PARTITION BY f.report_date, f.cftc_contract_market_code
      ORDER BY f.ingest_date DESC
    ) = 1
), with_derived AS (
  SELECT
    report_date,
    market_and_exchange_name,
@@ -43,9 +34,7 @@ with_derived AS (
    cftc_contract_market_code,
    contract_units,
    ingest_date,
    open_interest, /* Absolute positions (contracts) */
    managed_money_long,
    managed_money_short,
    managed_money_spread,
@@ -64,77 +53,52 @@ with_derived AS (
    nonreportable_long,
    nonreportable_short,
    nonreportable_net,
    ROUND(managed_money_net::REAL / NULLIF(open_interest, 0) * 100, 2) AS managed_money_net_pct_of_oi, /* Normalized: managed money net as % of open interest */ /* Removes size effects and makes cross-period comparison meaningful */
    ROUND(managed_money_long::REAL / NULLIF(managed_money_short, 0), 3) AS managed_money_long_short_ratio, /* Long/short ratio: >1 = more bulls than bears in managed money */
    change_open_interest, /* Weekly changes */
    change_managed_money_long,
    change_managed_money_short,
    change_managed_money_net,
    change_prod_merc_long,
    change_prod_merc_short,
    managed_money_net /* Week-over-week momentum in managed money net (via LAG) */ - LAG(managed_money_net, 1) OVER (ORDER BY report_date) AS managed_money_net_wow,
    concentration_top4_long_pct, /* Concentration */
    concentration_top4_short_pct,
    concentration_top8_long_pct,
    concentration_top8_short_pct,
    traders_total, /* Trader counts */
    traders_managed_money_long,
    traders_managed_money_short,
    traders_managed_money_spread,
    CASE
      WHEN MAX(managed_money_net) OVER w26 = MIN(managed_money_net) OVER w26
      THEN 50.0
      ELSE ROUND(
        (
          managed_money_net - MIN(managed_money_net) OVER w26
        )::REAL / (
          MAX(managed_money_net) OVER w26 - MIN(managed_money_net) OVER w26
        ) * 100,
        1
      )
    END AS cot_index_26w, /* COT Index (26-week): where is current net vs. trailing 26 weeks? */ /* 0 = most bearish extreme, 100 = most bullish extreme */ /* Industry-standard sentiment gauge (equivalent to RSI for positioning) */
    CASE
      WHEN MAX(managed_money_net) OVER w52 = MIN(managed_money_net) OVER w52
      THEN 50.0
      ELSE ROUND(
        (
          managed_money_net - MIN(managed_money_net) OVER w52
        )::REAL / (
          MAX(managed_money_net) OVER w52 - MIN(managed_money_net) OVER w52
        ) * 100,
        1
      )
    END AS cot_index_52w /* COT Index (52-week): longer-term positioning context */
  FROM latest_revision
  WINDOW
    w26 AS (ORDER BY report_date ROWS BETWEEN 25 PRECEDING AND CURRENT ROW),
    w52 AS (ORDER BY report_date ROWS BETWEEN 51 PRECEDING AND CURRENT ROW)
)
SELECT
  *
FROM with_derived
ORDER BY
  report_date
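Editorial sketch (not in the repo): the COT Index is a min-max normalization of the latest net position within a trailing window, with a 50.0 fallback when the window is flat (max equals min). In plain Python:

```python
def cot_index(nets, window=26):
    """Min-max position of each net value within its trailing window, scaled 0-100.
    Flat window (max == min) falls back to 50.0, as in the SQL CASE branch."""
    out = []
    for i, v in enumerate(nets):
        win = nets[max(0, i - window + 1): i + 1]
        lo, hi = min(win), max(win)
        out.append(50.0 if hi == lo else round((v - lo) / (hi - lo) * 100, 1))
    return out

cot_index([10, 30, 20, 40], window=3)
# [50.0, 100.0, 50.0, 100.0]
```

The current row sits inside its own window, so a fresh extreme always scores exactly 0 or 100 — which is why the model describes it as an RSI-like sentiment gauge.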
@@ -6,55 +6,77 @@ MODEL (
  start '2006-08-01',
  cron '@daily'
);
WITH cast_dtypes AS (
  SELECT
    src.commodity_code::INT AS commodity_code,
    COALESCE(commodity_name, commodity_description) AS commodity_name,
    country_code::TEXT AS country_code,
    country_name,
    market_year::INT AS market_year,
    calendar_year::INT AS calendar_year,
    month::INT AS month,
    src.attribute_id::INT AS attribute_id,
    COALESCE(attribute_name, attribute_description) AS attribute_name,
    src.unit_id::INT AS unit_id,
    COALESCE(unit_name, unit_description) AS unit_name,
    value::REAL AS value,
    filename
  FROM READ_CSV(
    @psd_glob(),
    compression = 'gzip',
    header = TRUE,
    union_by_name = TRUE,
    filename = TRUE,
    all_varchar = TRUE,
    max_line_size = 10000000
  ) AS src
  LEFT JOIN seeds.psd_commodity_codes
    ON seeds.psd_commodity_codes.commodity_code = src.commodity_code::INT
  LEFT JOIN seeds.psd_unit_of_measure_codes
    ON seeds.psd_unit_of_measure_codes.unit_id = src.unit_id::INT
  LEFT JOIN seeds.psd_attribute_codes
    ON seeds.psd_attribute_codes.attribute_id = src.attribute_id::INT
), metadata_and_deduplication AS (
  SELECT
    ANY_VALUE(commodity_code) AS commodity_code,
    ANY_VALUE(commodity_name) AS commodity_name,
    ANY_VALUE(country_code) AS country_code,
    ANY_VALUE(country_name) AS country_name,
    ANY_VALUE(market_year) AS market_year,
    ANY_VALUE(calendar_year) AS calendar_year,
    ANY_VALUE(month) AS month,
    ANY_VALUE(attribute_id) AS attribute_id,
    ANY_VALUE(attribute_name) AS attribute_name,
    ANY_VALUE(unit_id) AS unit_id,
    ANY_VALUE(unit_name) AS unit_name,
    ANY_VALUE(value) AS value,
    HASH(
      commodity_code,
      commodity_name,
      country_code,
      country_name,
      market_year,
      calendar_year,
      month,
      attribute_id,
      attribute_name,
      unit_id,
      unit_name,
      value
    ) AS hkey,
    ANY_VALUE(
      MAKE_DATE(STR_SPLIT(filename, '/')[-3]::INT, STR_SPLIT(filename, '/')[-2]::INT, 1)
    ) AS ingest_date,
    ANY_VALUE(
      CASE WHEN month <> 0 THEN LAST_DAY(MAKE_DATE(market_year, month, 1)) ELSE NULL END
    ) AS market_date_month_end
  FROM cast_dtypes
  GROUP BY
    hkey
)
SELECT
  hkey,
  commodity_code,
  commodity_name,
  country_code,
@@ -67,6 +89,7 @@ select hkey,
  unit_id,
  unit_name,
  value,
  ingest_date
FROM metadata_and_deduplication
WHERE
  ingest_date BETWEEN @start_ds AND @end_ds
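Editorial sketch (not in the repo): the `metadata_and_deduplication` CTE hashes the full attribute tuple into `hkey`, groups by it, and keeps one representative row per key via `ANY_VALUE`. The same idea in plain Python, with a tuple key standing in for DuckDB's `HASH(...)`:

```python
def dedupe_by_key(rows, key_fields):
    """Keep one representative row per key tuple (ANY_VALUE semantics:
    any row from the group is acceptable; here, the first seen)."""
    seen = {}
    for row in rows:
        k = tuple(row[f] for f in key_fields)  # stand-in for HASH(col1, col2, ...)
        seen.setdefault(k, row)
    return list(seen.values())

rows = [
    {"code": 1, "value": 9.0},
    {"code": 1, "value": 9.0},  # exact duplicate, collapses into the first
    {"code": 2, "value": 3.0},
]
len(dedupe_by_key(rows, ["code", "value"]))  # 2
```

A tuple key avoids the (tiny) collision risk a 64-bit hash carries; the SQL uses the hash because it doubles as a stable surrogate key (`hkey`) downstream.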
@@ -0,0 +1,9 @@
location_id;name;country;lat;lon;variety
brazil_minas_gerais;Minas Gerais;BR;-19.9167;-43.9345;Arabica
brazil_parana;Paraná;BR;-23.4205;-51.9330;Arabica
vietnam_highlands;Central Highlands;VN;12.6667;108.0500;Robusta
colombia_huila;Huila;CO;2.5359;-75.5277;Arabica
ethiopia_sidama;Sidama;ET;6.7612;38.4721;Arabica
honduras_copan;Copán;HN;14.8333;-89.1500;Arabica
guatemala_antigua;Antigua;GT;14.5586;-90.7295;Arabica
indonesia_sumatra;Sumatra;ID;3.5952;98.6722;Robusta
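Editorial sketch (not in the repo): per the commit message, `foundation.fct_weather_daily` derives crop stress flags from these thresholds: `is_frost` (<2°C), `is_heat_stress` (>35°C), `is_drought` (<1mm). The field names below are illustrative, not the model's actual column names:

```python
def crop_stress_flags(temp_min_c, temp_max_c, precip_mm):
    """Daily crop stress flags using the thresholds stated in the commit message.
    Parameter names are hypothetical stand-ins for the model's columns."""
    return {
        "is_frost": temp_min_c < 2.0,        # frost risk for Arabica at altitude
        "is_heat_stress": temp_max_c > 35.0, # sustained heat damages cherry set
        "is_drought": precip_mm < 1.0,       # effectively a dry day
    }

crop_stress_flags(-1.0, 24.0, 0.2)
# {'is_frost': True, 'is_heat_stress': False, 'is_drought': True}
```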
uv.lock generated
@@ -14,6 +14,7 @@ members = [
    "extract-core",
    "ice-stocks",
    "materia",
    "openweathermap",
    "psdonline",
    "sqlmesh-materia",
]
@@ -1778,6 +1779,21 @@ wheels = [
    { url = "https://files.pythonhosted.org/packages/cf/df/d3f1ddf4bb4cb50ed9b1139cc7b1c54c34a1e7ce8fd1b9a37c0d1551a6bd/opentelemetry_api-1.39.1-py3-none-any.whl", hash = "sha256:2edd8463432a7f8443edce90972169b195e7d6a05500cd29e6d13898187c9950", size = 66356, upload-time = "2025-12-11T13:32:17.304Z" },
]

[[package]]
name = "openweathermap"
version = "0.1.0"
source = { editable = "extract/openweathermap" }
dependencies = [
    { name = "extract-core" },
    { name = "niquests" },
]

[package.metadata]
requires-dist = [
    { name = "extract-core", editable = "extract/extract_core" },
    { name = "niquests", specifier = ">=3.14.1" },
]

[[package]]
name = "orjson"
version = "3.11.7"