diff --git a/transform/sqlmesh_materia/config.yaml b/transform/sqlmesh_materia/config.yaml index e59aacd..ca9bb83 100644 --- a/transform/sqlmesh_materia/config.yaml +++ b/transform/sqlmesh_materia/config.yaml @@ -29,7 +29,9 @@ model_defaults: linter: enabled: true rules: - - ambiguousorinvalidcolumn + # ambiguousorinvalidcolumn removed: sqlglot cannot introspect read_csv() TVF + # schemas at lint time, causing false positives on all raw models. Cross-model + # column validation is handled by SQLMesh at plan time via columns() declarations. - invalidselectstarexpansion # --- Default Target Environment --- diff --git a/transform/sqlmesh_materia/models/foundation/dim_commodity.sql b/transform/sqlmesh_materia/models/foundation/dim_commodity.sql index 0bfcd78..c6747fa 100644 --- a/transform/sqlmesh_materia/models/foundation/dim_commodity.sql +++ b/transform/sqlmesh_materia/models/foundation/dim_commodity.sql @@ -1,24 +1,23 @@ -- Commodity dimension: conforms identifiers across source systems. -- --- This is the ontology seed. Each row is a commodity tracked by BeanFlows. +-- This is the ontology. Each row is a commodity tracked by BeanFlows. -- As new sources are added (ICO, futures prices, satellite), their -- commodity identifiers are added as columns here — not as separate tables. -- As new commodities are added (cocoa, sugar), rows are added here. -- -- References: --- usda_commodity_code → raw.psd_alldata.commodity_code --- cftc_commodity_code → raw.cot_disaggregated.cftc_commodity_code +-- usda_commodity_code → raw.psd_alldata.commodity_code (numeric string, e.g. '0711100') +-- cftc_commodity_code → raw.cot_disaggregated.cftc_commodity_code (3-char, e.g. '083') +-- +-- NOTE: Defined as FULL model (not SEED) to guarantee leading-zero preservation. +-- Pandas CSV loading converts '083' → 83 even with varchar column declarations. MODEL ( name foundation.dim_commodity, - kind SEED ( - path '$root/seeds/dim_commodity.csv', - csv_settings (delimiter = ';') - ), - columns ( - usda_commodity_code varchar, - cftc_commodity_code varchar, - commodity_name varchar, - commodity_group varchar - ) + kind FULL ); + +SELECT usda_commodity_code, cftc_commodity_code, commodity_name, commodity_group +FROM (VALUES + ('0711100', '083', 'Coffee, Green', 'Softs') +) AS t(usda_commodity_code, cftc_commodity_code, commodity_name, commodity_group) diff --git a/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql b/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql index 412b94a..7c140d0 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql @@ -29,63 +29,64 @@ WITH cast_and_clean AS ( trim(contract_units) AS contract_units, -- Open interest - open_interest_all::int AS open_interest, + -- CFTC uses '.' as null for any field — use TRY_CAST throughout + TRY_CAST(open_interest_all AS int) AS open_interest, -- Producer / Merchant (commercial hedgers: exporters, processors) - prod_merc_positions_long_all::int AS prod_merc_long, - prod_merc_positions_short_all::int AS prod_merc_short, + TRY_CAST(prod_merc_positions_long_all AS int) AS prod_merc_long, + TRY_CAST(prod_merc_positions_short_all AS int) AS prod_merc_short, -- Swap dealers - swap_positions_long_all::int AS swap_long, - swap_positions_short_all::int AS swap_short, - swap_positions_spread_all::int AS swap_spread, + TRY_CAST(swap_positions_long_all AS int) AS swap_long, + TRY_CAST(swap_positions_short_all AS int) AS swap_short, + TRY_CAST(swap_positions_spread_all AS int) AS swap_spread, -- Managed money (hedge funds, CTAs — the primary speculative signal) - m_money_positions_long_all::int AS managed_money_long, - m_money_positions_short_all::int AS managed_money_short, - m_money_positions_spread_all::int AS managed_money_spread, + TRY_CAST(m_money_positions_long_all AS int) AS managed_money_long, + TRY_CAST(m_money_positions_short_all AS int) AS managed_money_short, + TRY_CAST(m_money_positions_spread_all AS int) AS managed_money_spread, -- Other reportables - other_rept_positions_long_all::int AS other_reportable_long, - other_rept_positions_short_all::int AS other_reportable_short, - other_rept_positions_spread_all::int AS other_reportable_spread, + TRY_CAST(other_rept_positions_long_all AS int) AS other_reportable_long, + TRY_CAST(other_rept_positions_short_all AS int) AS other_reportable_short, + TRY_CAST(other_rept_positions_spread_all AS int) AS other_reportable_spread, -- Non-reportable (small speculators, below reporting threshold) - nonrept_positions_long_all::int AS nonreportable_long, - nonrept_positions_short_all::int AS nonreportable_short, + TRY_CAST(nonrept_positions_long_all AS int) AS nonreportable_long, + TRY_CAST(nonrept_positions_short_all AS int) AS nonreportable_short, -- Net positions (long minus short per category) - prod_merc_positions_long_all::int - - prod_merc_positions_short_all::int AS prod_merc_net, - m_money_positions_long_all::int - - m_money_positions_short_all::int AS managed_money_net, - swap_positions_long_all::int - - swap_positions_short_all::int AS swap_net, - other_rept_positions_long_all::int - - other_rept_positions_short_all::int AS other_reportable_net, - nonrept_positions_long_all::int - - nonrept_positions_short_all::int AS nonreportable_net, + TRY_CAST(prod_merc_positions_long_all AS int) + - TRY_CAST(prod_merc_positions_short_all AS int) AS prod_merc_net, + TRY_CAST(m_money_positions_long_all AS int) + - TRY_CAST(m_money_positions_short_all AS int) AS managed_money_net, + TRY_CAST(swap_positions_long_all AS int) + - TRY_CAST(swap_positions_short_all AS int) AS swap_net, + TRY_CAST(other_rept_positions_long_all AS int) + - TRY_CAST(other_rept_positions_short_all AS int) AS other_reportable_net, + TRY_CAST(nonrept_positions_long_all AS int) + - TRY_CAST(nonrept_positions_short_all AS int) AS nonreportable_net, -- Week-over-week changes - change_in_open_interest_all::int AS change_open_interest, - change_in_m_money_long_all::int AS change_managed_money_long, - change_in_m_money_short_all::int AS change_managed_money_short, - change_in_m_money_long_all::int - - change_in_m_money_short_all::int AS change_managed_money_net, - change_in_prod_merc_long_all::int AS change_prod_merc_long, - change_in_prod_merc_short_all::int AS change_prod_merc_short, + TRY_CAST(change_in_open_interest_all AS int) AS change_open_interest, + TRY_CAST(change_in_m_money_long_all AS int) AS change_managed_money_long, + TRY_CAST(change_in_m_money_short_all AS int) AS change_managed_money_short, + TRY_CAST(change_in_m_money_long_all AS int) + - TRY_CAST(change_in_m_money_short_all AS int) AS change_managed_money_net, + TRY_CAST(change_in_prod_merc_long_all AS int) AS change_prod_merc_long, + TRY_CAST(change_in_prod_merc_short_all AS int) AS change_prod_merc_short, -- Concentration ratios (% of OI held by top 4 / top 8 traders) - conc_gross_le_4_tdr_long_all::float AS concentration_top4_long_pct, - conc_gross_le_4_tdr_short_all::float AS concentration_top4_short_pct, - conc_gross_le_8_tdr_long_all::float AS concentration_top8_long_pct, - conc_gross_le_8_tdr_short_all::float AS concentration_top8_short_pct, + TRY_CAST(conc_gross_le_4_tdr_long_all AS float) AS concentration_top4_long_pct, + TRY_CAST(conc_gross_le_4_tdr_short_all AS float) AS concentration_top4_short_pct, + TRY_CAST(conc_gross_le_8_tdr_long_all AS float) AS concentration_top8_long_pct, + TRY_CAST(conc_gross_le_8_tdr_short_all AS float) AS concentration_top8_short_pct, -- Trader counts - traders_tot_all::int AS traders_total, - traders_m_money_long_all::int AS traders_managed_money_long, - traders_m_money_short_all::int AS traders_managed_money_short, - traders_m_money_spread_all::int AS traders_managed_money_spread, + TRY_CAST(traders_tot_all AS int) AS traders_total, + TRY_CAST(traders_m_money_long_all AS int) AS traders_managed_money_long, + TRY_CAST(traders_m_money_short_all AS int) AS traders_managed_money_short, + TRY_CAST(traders_m_money_spread_all AS int) AS traders_managed_money_spread, -- Ingest date: derived from landing path year directory -- Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2] diff --git a/transform/sqlmesh_materia/models/raw/cot_disaggregated.sql b/transform/sqlmesh_materia/models/raw/cot_disaggregated.sql index 9191eda..809b228 100644 --- a/transform/sqlmesh_materia/models/raw/cot_disaggregated.sql +++ b/transform/sqlmesh_materia/models/raw/cot_disaggregated.sql @@ -13,7 +13,42 @@ MODEL ( kind FULL, grain (cftc_commodity_code, report_date_as_yyyy_mm_dd, cftc_contract_market_code), start '2006-06-13', - cron '@daily' + cron '@daily', + columns ( + market_and_exchange_names varchar, + report_date_as_yyyy_mm_dd varchar, + cftc_commodity_code varchar, + cftc_contract_market_code varchar, + contract_units varchar, + open_interest_all varchar, + prod_merc_positions_long_all varchar, + prod_merc_positions_short_all varchar, + swap_positions_long_all varchar, + swap_positions_short_all varchar, + swap_positions_spread_all varchar, + m_money_positions_long_all varchar, + m_money_positions_short_all varchar, + m_money_positions_spread_all varchar, + other_rept_positions_long_all varchar, + other_rept_positions_short_all varchar, + other_rept_positions_spread_all varchar, + nonrept_positions_long_all varchar, + nonrept_positions_short_all varchar, + change_in_open_interest_all varchar, + change_in_m_money_long_all varchar, + change_in_m_money_short_all varchar, + change_in_prod_merc_long_all varchar, + change_in_prod_merc_short_all varchar, + conc_gross_le_4_tdr_long_all varchar, + conc_gross_le_4_tdr_short_all varchar, + conc_gross_le_8_tdr_long_all varchar, + conc_gross_le_8_tdr_short_all varchar, + traders_tot_all varchar, + traders_m_money_long_all varchar, + traders_m_money_short_all varchar, + traders_m_money_spread_all varchar, + filename varchar + ) ); SELECT diff --git a/transform/sqlmesh_materia/seeds/dim_commodity.csv b/transform/sqlmesh_materia/seeds/dim_commodity.csv index 4189793..4eb57b0 100644 --- a/transform/sqlmesh_materia/seeds/dim_commodity.csv +++ b/transform/sqlmesh_materia/seeds/dim_commodity.csv @@ -1,2 +1,2 @@ usda_commodity_code;cftc_commodity_code;commodity_name;commodity_group -0711100;083731;Coffee, Green;Softs +"0711100";"083";"Coffee, Green";"Softs" diff --git a/transform/sqlmesh_materia/tests/test_cot_foundation.yaml b/transform/sqlmesh_materia/tests/test_cot_foundation.yaml index 0fd6cf9..654533b 100644 --- a/transform/sqlmesh_materia/tests/test_cot_foundation.yaml +++ b/transform/sqlmesh_materia/tests/test_cot_foundation.yaml @@ -1,11 +1,14 @@ test_fct_cot_positioning_types_and_net_positions: model: foundation.fct_cot_positioning + vars: + start: "2024-01-01" + end: "2024-01-31" inputs: raw.cot_disaggregated: rows: - market_and_exchange_names: "COFFEE C - ICE FUTURES U.S." report_date_as_yyyy_mm_dd: "2024-01-02" - cftc_commodity_code: "083731" + cftc_commodity_code: "083" cftc_contract_market_code: "083731" contract_units: "37,500 POUNDS" open_interest_all: "250000" @@ -36,29 +39,34 @@ test_fct_cot_positioning_types_and_net_positions: traders_m_money_short_all: "62" traders_m_money_spread_all: "20" filename: "data/landing/cot/2024/abc123.csv.gzip" - expected: - rows: - - report_date: "2024-01-02" - cftc_commodity_code: "083731" - open_interest: 250000 - managed_money_long: 60000 - managed_money_short: 40000 - managed_money_net: 20000 - prod_merc_long: 80000 - prod_merc_short: 90000 - prod_merc_net: -10000 - swap_long: 30000 - swap_short: 35000 - swap_net: -5000 - nonreportable_long: 12000 - nonreportable_short: 14000 - nonreportable_net: -2000 - change_managed_money_net: 3000 - traders_managed_money_long: 85 - traders_managed_money_short: 62 + outputs: + partial: true + query: + rows: + - report_date: "2024-01-02" + cftc_commodity_code: "083" + open_interest: 250000 + managed_money_long: 60000 + managed_money_short: 40000 + managed_money_net: 20000 + prod_merc_long: 80000 + prod_merc_short: 90000 + prod_merc_net: -10000 + swap_long: 30000 + swap_short: 35000 + swap_net: -5000 + nonreportable_long: 12000 + nonreportable_short: 14000 + nonreportable_net: -2000 + change_managed_money_net: 3000 + traders_managed_money_long: 85 + traders_managed_money_short: 62 test_fct_cot_positioning_rejects_null_commodity: model: foundation.fct_cot_positioning + vars: + start: "2024-01-01" + end: "2024-01-31" inputs: raw.cot_disaggregated: rows: @@ -95,5 +103,6 @@ test_fct_cot_positioning_rejects_null_commodity: traders_m_money_short_all: "0" traders_m_money_spread_all: "0" filename: "data/landing/cot/2024/abc123.csv.gzip" - expected: - rows: [] + outputs: + query: + rows: [] diff --git a/web/src/beanflows/analytics.py b/web/src/beanflows/analytics.py index 17d6104..be11b3e 100644 --- a/web/src/beanflows/analytics.py +++ b/web/src/beanflows/analytics.py @@ -12,8 +12,8 @@ import duckdb # Coffee (Green) commodity code in USDA PSD COFFEE_COMMODITY_CODE = 711100 -# Coffee C futures commodity code in CFTC COT reports -COFFEE_CFTC_CODE = "083731" +# Coffee C futures commodity code in CFTC COT reports (3-digit CFTC commodity code) +COFFEE_CFTC_CODE = "083" # Metrics safe for user-facing queries (prevents SQL injection in dynamic column refs) ALLOWED_METRICS = frozenset({