Fix COT pipeline: TRY_CAST nulls, dim_commodity leading zeros, correct CFTC codes

- config.yaml: remove ambiguousorinvalidcolumn linter rule (false positives on read_csv TVFs)
- fct_cot_positioning: use TRY_CAST throughout — CFTC uses '.' as null in many columns
- raw/cot_disaggregated: add columns() declaration for 33 varchar cols
- dim_commodity: switch from SEED to FULL model with SQL VALUES to preserve leading zeros
  Pandas auto-converts '083' → 83 even with varchar column declarations in SEED models
- seeds/dim_commodity.csv: correct cftc_commodity_code from '083731' (contract market code)
  to '083' (3-digit CFTC commodity code); add CSV quoting
- test_cot_foundation.yaml: fix output key name, vars for time range, partial: true,
  and correct cftc_commodity_code to '083'
- analytics.py: COFFEE_CFTC_CODE '083731' → '083' to match actual data

Result: serving.cot_positioning has 685 rows (2013-01-08 to 2026-02-17), 23/23 tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-20 23:22:46 +01:00
parent 0a83b2cb74
commit 2962bf5e3b
7 changed files with 126 additions and 80 deletions

View File

@@ -29,7 +29,9 @@ model_defaults:
linter: linter:
enabled: true enabled: true
rules: rules:
- ambiguousorinvalidcolumn # ambiguousorinvalidcolumn removed: sqlglot cannot introspect read_csv() TVF
# schemas at lint time, causing false positives on all raw models. Cross-model
# column validation is handled by SQLMesh at plan time via columns() declarations.
- invalidselectstarexpansion - invalidselectstarexpansion
# --- Default Target Environment --- # --- Default Target Environment ---

View File

@@ -1,24 +1,23 @@
-- Commodity dimension: conforms identifiers across source systems. -- Commodity dimension: conforms identifiers across source systems.
-- --
-- This is the ontology seed. Each row is a commodity tracked by BeanFlows. -- This is the ontology. Each row is a commodity tracked by BeanFlows.
-- As new sources are added (ICO, futures prices, satellite), their -- As new sources are added (ICO, futures prices, satellite), their
-- commodity identifiers are added as columns here — not as separate tables. -- commodity identifiers are added as columns here — not as separate tables.
-- As new commodities are added (cocoa, sugar), rows are added here. -- As new commodities are added (cocoa, sugar), rows are added here.
-- --
-- References: -- References:
-- usda_commodity_code → raw.psd_alldata.commodity_code -- usda_commodity_code → raw.psd_alldata.commodity_code (numeric string, e.g. '0711100')
-- cftc_commodity_code → raw.cot_disaggregated.cftc_commodity_code -- cftc_commodity_code → raw.cot_disaggregated.cftc_commodity_code (3-char, e.g. '083')
--
-- NOTE: Defined as FULL model (not SEED) to guarantee leading-zero preservation.
-- Pandas CSV loading converts '083' → 83 even with varchar column declarations.
MODEL ( MODEL (
name foundation.dim_commodity, name foundation.dim_commodity,
kind SEED ( kind FULL
path '$root/seeds/dim_commodity.csv',
csv_settings (delimiter = ';')
),
columns (
usda_commodity_code varchar,
cftc_commodity_code varchar,
commodity_name varchar,
commodity_group varchar
)
); );
SELECT usda_commodity_code, cftc_commodity_code, commodity_name, commodity_group
FROM (VALUES
('0711100', '083', 'Coffee, Green', 'Softs')
) AS t(usda_commodity_code, cftc_commodity_code, commodity_name, commodity_group)

View File

@@ -29,63 +29,64 @@ WITH cast_and_clean AS (
trim(contract_units) AS contract_units, trim(contract_units) AS contract_units,
-- Open interest -- Open interest
open_interest_all::int AS open_interest, -- CFTC uses '.' as null for any field — use TRY_CAST throughout
TRY_CAST(open_interest_all AS int) AS open_interest,
-- Producer / Merchant (commercial hedgers: exporters, processors) -- Producer / Merchant (commercial hedgers: exporters, processors)
prod_merc_positions_long_all::int AS prod_merc_long, TRY_CAST(prod_merc_positions_long_all AS int) AS prod_merc_long,
prod_merc_positions_short_all::int AS prod_merc_short, TRY_CAST(prod_merc_positions_short_all AS int) AS prod_merc_short,
-- Swap dealers -- Swap dealers
swap_positions_long_all::int AS swap_long, TRY_CAST(swap_positions_long_all AS int) AS swap_long,
swap_positions_short_all::int AS swap_short, TRY_CAST(swap_positions_short_all AS int) AS swap_short,
swap_positions_spread_all::int AS swap_spread, TRY_CAST(swap_positions_spread_all AS int) AS swap_spread,
-- Managed money (hedge funds, CTAs — the primary speculative signal) -- Managed money (hedge funds, CTAs — the primary speculative signal)
m_money_positions_long_all::int AS managed_money_long, TRY_CAST(m_money_positions_long_all AS int) AS managed_money_long,
m_money_positions_short_all::int AS managed_money_short, TRY_CAST(m_money_positions_short_all AS int) AS managed_money_short,
m_money_positions_spread_all::int AS managed_money_spread, TRY_CAST(m_money_positions_spread_all AS int) AS managed_money_spread,
-- Other reportables -- Other reportables
other_rept_positions_long_all::int AS other_reportable_long, TRY_CAST(other_rept_positions_long_all AS int) AS other_reportable_long,
other_rept_positions_short_all::int AS other_reportable_short, TRY_CAST(other_rept_positions_short_all AS int) AS other_reportable_short,
other_rept_positions_spread_all::int AS other_reportable_spread, TRY_CAST(other_rept_positions_spread_all AS int) AS other_reportable_spread,
-- Non-reportable (small speculators, below reporting threshold) -- Non-reportable (small speculators, below reporting threshold)
nonrept_positions_long_all::int AS nonreportable_long, TRY_CAST(nonrept_positions_long_all AS int) AS nonreportable_long,
nonrept_positions_short_all::int AS nonreportable_short, TRY_CAST(nonrept_positions_short_all AS int) AS nonreportable_short,
-- Net positions (long minus short per category) -- Net positions (long minus short per category)
prod_merc_positions_long_all::int TRY_CAST(prod_merc_positions_long_all AS int)
- prod_merc_positions_short_all::int AS prod_merc_net, - TRY_CAST(prod_merc_positions_short_all AS int) AS prod_merc_net,
m_money_positions_long_all::int TRY_CAST(m_money_positions_long_all AS int)
- m_money_positions_short_all::int AS managed_money_net, - TRY_CAST(m_money_positions_short_all AS int) AS managed_money_net,
swap_positions_long_all::int TRY_CAST(swap_positions_long_all AS int)
- swap_positions_short_all::int AS swap_net, - TRY_CAST(swap_positions_short_all AS int) AS swap_net,
other_rept_positions_long_all::int TRY_CAST(other_rept_positions_long_all AS int)
- other_rept_positions_short_all::int AS other_reportable_net, - TRY_CAST(other_rept_positions_short_all AS int) AS other_reportable_net,
nonrept_positions_long_all::int TRY_CAST(nonrept_positions_long_all AS int)
- nonrept_positions_short_all::int AS nonreportable_net, - TRY_CAST(nonrept_positions_short_all AS int) AS nonreportable_net,
-- Week-over-week changes -- Week-over-week changes
change_in_open_interest_all::int AS change_open_interest, TRY_CAST(change_in_open_interest_all AS int) AS change_open_interest,
change_in_m_money_long_all::int AS change_managed_money_long, TRY_CAST(change_in_m_money_long_all AS int) AS change_managed_money_long,
change_in_m_money_short_all::int AS change_managed_money_short, TRY_CAST(change_in_m_money_short_all AS int) AS change_managed_money_short,
change_in_m_money_long_all::int TRY_CAST(change_in_m_money_long_all AS int)
- change_in_m_money_short_all::int AS change_managed_money_net, - TRY_CAST(change_in_m_money_short_all AS int) AS change_managed_money_net,
change_in_prod_merc_long_all::int AS change_prod_merc_long, TRY_CAST(change_in_prod_merc_long_all AS int) AS change_prod_merc_long,
change_in_prod_merc_short_all::int AS change_prod_merc_short, TRY_CAST(change_in_prod_merc_short_all AS int) AS change_prod_merc_short,
-- Concentration ratios (% of OI held by top 4 / top 8 traders) -- Concentration ratios (% of OI held by top 4 / top 8 traders)
conc_gross_le_4_tdr_long_all::float AS concentration_top4_long_pct, TRY_CAST(conc_gross_le_4_tdr_long_all AS float) AS concentration_top4_long_pct,
conc_gross_le_4_tdr_short_all::float AS concentration_top4_short_pct, TRY_CAST(conc_gross_le_4_tdr_short_all AS float) AS concentration_top4_short_pct,
conc_gross_le_8_tdr_long_all::float AS concentration_top8_long_pct, TRY_CAST(conc_gross_le_8_tdr_long_all AS float) AS concentration_top8_long_pct,
conc_gross_le_8_tdr_short_all::float AS concentration_top8_short_pct, TRY_CAST(conc_gross_le_8_tdr_short_all AS float) AS concentration_top8_short_pct,
-- Trader counts -- Trader counts
traders_tot_all::int AS traders_total, TRY_CAST(traders_tot_all AS int) AS traders_total,
traders_m_money_long_all::int AS traders_managed_money_long, TRY_CAST(traders_m_money_long_all AS int) AS traders_managed_money_long,
traders_m_money_short_all::int AS traders_managed_money_short, TRY_CAST(traders_m_money_short_all AS int) AS traders_managed_money_short,
traders_m_money_spread_all::int AS traders_managed_money_spread, TRY_CAST(traders_m_money_spread_all AS int) AS traders_managed_money_spread,
-- Ingest date: derived from landing path year directory -- Ingest date: derived from landing path year directory
-- Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2] -- Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2]

View File

@@ -13,7 +13,42 @@ MODEL (
kind FULL, kind FULL,
grain (cftc_commodity_code, report_date_as_yyyy_mm_dd, cftc_contract_market_code), grain (cftc_commodity_code, report_date_as_yyyy_mm_dd, cftc_contract_market_code),
start '2006-06-13', start '2006-06-13',
cron '@daily' cron '@daily',
columns (
market_and_exchange_names varchar,
report_date_as_yyyy_mm_dd varchar,
cftc_commodity_code varchar,
cftc_contract_market_code varchar,
contract_units varchar,
open_interest_all varchar,
prod_merc_positions_long_all varchar,
prod_merc_positions_short_all varchar,
swap_positions_long_all varchar,
swap_positions_short_all varchar,
swap_positions_spread_all varchar,
m_money_positions_long_all varchar,
m_money_positions_short_all varchar,
m_money_positions_spread_all varchar,
other_rept_positions_long_all varchar,
other_rept_positions_short_all varchar,
other_rept_positions_spread_all varchar,
nonrept_positions_long_all varchar,
nonrept_positions_short_all varchar,
change_in_open_interest_all varchar,
change_in_m_money_long_all varchar,
change_in_m_money_short_all varchar,
change_in_prod_merc_long_all varchar,
change_in_prod_merc_short_all varchar,
conc_gross_le_4_tdr_long_all varchar,
conc_gross_le_4_tdr_short_all varchar,
conc_gross_le_8_tdr_long_all varchar,
conc_gross_le_8_tdr_short_all varchar,
traders_tot_all varchar,
traders_m_money_long_all varchar,
traders_m_money_short_all varchar,
traders_m_money_spread_all varchar,
filename varchar
)
); );
SELECT SELECT

View File

@@ -1,2 +1,2 @@
usda_commodity_code;cftc_commodity_code;commodity_name;commodity_group usda_commodity_code;cftc_commodity_code;commodity_name;commodity_group
0711100;083731;Coffee, Green;Softs "0711100";"083";"Coffee, Green";"Softs"
1 usda_commodity_code cftc_commodity_code commodity_name commodity_group
2 0711100 083731 083 Coffee, Green Softs

View File

@@ -1,11 +1,14 @@
test_fct_cot_positioning_types_and_net_positions: test_fct_cot_positioning_types_and_net_positions:
model: foundation.fct_cot_positioning model: foundation.fct_cot_positioning
vars:
start: "2024-01-01"
end: "2024-01-31"
inputs: inputs:
raw.cot_disaggregated: raw.cot_disaggregated:
rows: rows:
- market_and_exchange_names: "COFFEE C - ICE FUTURES U.S." - market_and_exchange_names: "COFFEE C - ICE FUTURES U.S."
report_date_as_yyyy_mm_dd: "2024-01-02" report_date_as_yyyy_mm_dd: "2024-01-02"
cftc_commodity_code: "083731" cftc_commodity_code: "083"
cftc_contract_market_code: "083731" cftc_contract_market_code: "083731"
contract_units: "37,500 POUNDS" contract_units: "37,500 POUNDS"
open_interest_all: "250000" open_interest_all: "250000"
@@ -36,29 +39,34 @@ test_fct_cot_positioning_types_and_net_positions:
traders_m_money_short_all: "62" traders_m_money_short_all: "62"
traders_m_money_spread_all: "20" traders_m_money_spread_all: "20"
filename: "data/landing/cot/2024/abc123.csv.gzip" filename: "data/landing/cot/2024/abc123.csv.gzip"
expected: outputs:
rows: partial: true
- report_date: "2024-01-02" query:
cftc_commodity_code: "083731" rows:
open_interest: 250000 - report_date: "2024-01-02"
managed_money_long: 60000 cftc_commodity_code: "083"
managed_money_short: 40000 open_interest: 250000
managed_money_net: 20000 managed_money_long: 60000
prod_merc_long: 80000 managed_money_short: 40000
prod_merc_short: 90000 managed_money_net: 20000
prod_merc_net: -10000 prod_merc_long: 80000
swap_long: 30000 prod_merc_short: 90000
swap_short: 35000 prod_merc_net: -10000
swap_net: -5000 swap_long: 30000
nonreportable_long: 12000 swap_short: 35000
nonreportable_short: 14000 swap_net: -5000
nonreportable_net: -2000 nonreportable_long: 12000
change_managed_money_net: 3000 nonreportable_short: 14000
traders_managed_money_long: 85 nonreportable_net: -2000
traders_managed_money_short: 62 change_managed_money_net: 3000
traders_managed_money_long: 85
traders_managed_money_short: 62
test_fct_cot_positioning_rejects_null_commodity: test_fct_cot_positioning_rejects_null_commodity:
model: foundation.fct_cot_positioning model: foundation.fct_cot_positioning
vars:
start: "2024-01-01"
end: "2024-01-31"
inputs: inputs:
raw.cot_disaggregated: raw.cot_disaggregated:
rows: rows:
@@ -95,5 +103,6 @@ test_fct_cot_positioning_rejects_null_commodity:
traders_m_money_short_all: "0" traders_m_money_short_all: "0"
traders_m_money_spread_all: "0" traders_m_money_spread_all: "0"
filename: "data/landing/cot/2024/abc123.csv.gzip" filename: "data/landing/cot/2024/abc123.csv.gzip"
expected: outputs:
rows: [] query:
rows: []

View File

@@ -12,8 +12,8 @@ import duckdb
# Coffee (Green) commodity code in USDA PSD # Coffee (Green) commodity code in USDA PSD
COFFEE_COMMODITY_CODE = 711100 COFFEE_COMMODITY_CODE = 711100
# Coffee C futures commodity code in CFTC COT reports # Coffee C futures commodity code in CFTC COT reports (3-digit CFTC commodity code)
COFFEE_CFTC_CODE = "083731" COFFEE_CFTC_CODE = "083"
# Metrics safe for user-facing queries (prevents SQL injection in dynamic column refs) # Metrics safe for user-facing queries (prevents SQL injection in dynamic column refs)
ALLOWED_METRICS = frozenset({ ALLOWED_METRICS = frozenset({