-- Foundation fact: CFTC COT positioning, weekly grain, all commodities.
--
-- Casts raw varchar columns to proper types, cleans column names,
-- computes net positions (long - short) per trader category, and
-- deduplicates via hash key. Covers all commodities — filtering to
-- a specific commodity happens in the serving layer.
--
-- Grain: one row per
--        (cftc_commodity_code, report_date, cftc_contract_market_code, ingest_date)
-- History: revisions appear as new rows with a later ingest_date.
--          Serving layer picks max(ingest_date) per grain for latest view.
-- NOTE(review): ingest_date is derived from the landing-path year only, so two
--   revisions landed in the same year share an ingest_date — confirm that this
--   resolution is sufficient for the serving layer's "latest" logic.

MODEL (
  name foundation.fct_cot_positioning,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date
  ),
  grain (cftc_commodity_code, report_date, cftc_contract_market_code, ingest_date),
  start '2006-06-13',
  cron '@daily'
);

-- Step 1: type-cast and rename the raw varchar columns, restricted to the
-- interval being processed so each incremental run only touches its own slice.
WITH cast_and_clean AS (
  SELECT
    -- Identifiers
    trim(market_and_exchange_names)                    AS market_and_exchange_name,
    TRY_CAST(report_date_as_yyyy_mm_dd AS date)        AS report_date,
    trim(cftc_commodity_code)                          AS cftc_commodity_code,
    trim(cftc_contract_market_code)                    AS cftc_contract_market_code,
    trim(contract_units)                               AS contract_units,

    -- Open interest
    -- CFTC uses '.' as null for any field — use TRY_CAST throughout
    TRY_CAST(open_interest_all AS int)                 AS open_interest,

    -- Producer / Merchant (commercial hedgers: exporters, processors)
    TRY_CAST(prod_merc_positions_long_all AS int)      AS prod_merc_long,
    TRY_CAST(prod_merc_positions_short_all AS int)     AS prod_merc_short,

    -- Swap dealers
    TRY_CAST(swap_positions_long_all AS int)           AS swap_long,
    TRY_CAST(swap_positions_short_all AS int)          AS swap_short,
    TRY_CAST(swap_positions_spread_all AS int)         AS swap_spread,

    -- Managed money (hedge funds, CTAs — the primary speculative signal)
    TRY_CAST(m_money_positions_long_all AS int)        AS managed_money_long,
    TRY_CAST(m_money_positions_short_all AS int)       AS managed_money_short,
    TRY_CAST(m_money_positions_spread_all AS int)      AS managed_money_spread,

    -- Other reportables
    TRY_CAST(other_rept_positions_long_all AS int)     AS other_reportable_long,
    TRY_CAST(other_rept_positions_short_all AS int)    AS other_reportable_short,
    TRY_CAST(other_rept_positions_spread_all AS int)   AS other_reportable_spread,

    -- Non-reportable (small speculators, below reporting threshold)
    TRY_CAST(nonrept_positions_long_all AS int)        AS nonreportable_long,
    TRY_CAST(nonrept_positions_short_all AS int)       AS nonreportable_short,

    -- Net positions (long minus short per category).
    -- NULL when either leg fails to cast.
    TRY_CAST(prod_merc_positions_long_all AS int)
      - TRY_CAST(prod_merc_positions_short_all AS int)   AS prod_merc_net,
    TRY_CAST(m_money_positions_long_all AS int)
      - TRY_CAST(m_money_positions_short_all AS int)     AS managed_money_net,
    TRY_CAST(swap_positions_long_all AS int)
      - TRY_CAST(swap_positions_short_all AS int)        AS swap_net,
    TRY_CAST(other_rept_positions_long_all AS int)
      - TRY_CAST(other_rept_positions_short_all AS int)  AS other_reportable_net,
    TRY_CAST(nonrept_positions_long_all AS int)
      - TRY_CAST(nonrept_positions_short_all AS int)     AS nonreportable_net,

    -- Week-over-week changes
    TRY_CAST(change_in_open_interest_all AS int)       AS change_open_interest,
    TRY_CAST(change_in_m_money_long_all AS int)        AS change_managed_money_long,
    TRY_CAST(change_in_m_money_short_all AS int)       AS change_managed_money_short,
    TRY_CAST(change_in_m_money_long_all AS int)
      - TRY_CAST(change_in_m_money_short_all AS int)     AS change_managed_money_net,
    TRY_CAST(change_in_prod_merc_long_all AS int)      AS change_prod_merc_long,
    TRY_CAST(change_in_prod_merc_short_all AS int)     AS change_prod_merc_short,

    -- Concentration ratios (% of OI held by top 4 / top 8 traders)
    TRY_CAST(conc_gross_le_4_tdr_long_all AS float)    AS concentration_top4_long_pct,
    TRY_CAST(conc_gross_le_4_tdr_short_all AS float)   AS concentration_top4_short_pct,
    TRY_CAST(conc_gross_le_8_tdr_long_all AS float)    AS concentration_top8_long_pct,
    TRY_CAST(conc_gross_le_8_tdr_short_all AS float)   AS concentration_top8_short_pct,

    -- Trader counts
    TRY_CAST(traders_tot_all AS int)                   AS traders_total,
    TRY_CAST(traders_m_money_long_all AS int)          AS traders_managed_money_long,
    TRY_CAST(traders_m_money_short_all AS int)         AS traders_managed_money_short,
    TRY_CAST(traders_m_money_spread_all AS int)        AS traders_managed_money_spread,

    -- Ingest date: derived from landing path year directory
    -- Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2]
    -- The cast is left strict: a path that does not match this layout fails the
    -- run instead of producing a NULL ingest_date inside the model grain.
    make_date(split(filename, '/')[-2]::int, 1, 1)     AS ingest_date,

    -- Dedup key: hash of business grain + key metrics (raw, un-cast values).
    -- Columns outside this list do not influence deduplication.
    hash(
      cftc_commodity_code,
      report_date_as_yyyy_mm_dd,
      cftc_contract_market_code,
      open_interest_all,
      m_money_positions_long_all,
      m_money_positions_short_all,
      prod_merc_positions_long_all,
      prod_merc_positions_short_all
    )                                                  AS hkey
  FROM raw.cot_disaggregated
  WHERE
    -- Reject rows with null / empty commodity code
    trim(cftc_commodity_code) IS NOT NULL
    AND len(trim(cftc_commodity_code)) > 0
    -- Reject rows with a malformed date: TRY_CAST yields NULL (a plain cast
    -- would abort the run), and NULL never satisfies BETWEEN.
    -- The same predicate restricts the input scan to the processed interval;
    -- hkey includes the raw report date, so dedup groups never span intervals.
    AND TRY_CAST(report_date_as_yyyy_mm_dd AS date) BETWEEN @start_ds AND @end_ds
)

-- Step 2: collapse rows sharing the same hkey into a single output row.
SELECT
  any_value(market_and_exchange_name)       AS market_and_exchange_name,
  any_value(report_date)                    AS report_date,
  any_value(cftc_commodity_code)            AS cftc_commodity_code,
  any_value(cftc_contract_market_code)      AS cftc_contract_market_code,
  any_value(contract_units)                 AS contract_units,
  any_value(open_interest)                  AS open_interest,
  any_value(prod_merc_long)                 AS prod_merc_long,
  any_value(prod_merc_short)                AS prod_merc_short,
  any_value(prod_merc_net)                  AS prod_merc_net,
  any_value(swap_long)                      AS swap_long,
  any_value(swap_short)                     AS swap_short,
  any_value(swap_spread)                    AS swap_spread,
  any_value(swap_net)                       AS swap_net,
  any_value(managed_money_long)             AS managed_money_long,
  any_value(managed_money_short)            AS managed_money_short,
  any_value(managed_money_spread)           AS managed_money_spread,
  any_value(managed_money_net)              AS managed_money_net,
  any_value(other_reportable_long)          AS other_reportable_long,
  any_value(other_reportable_short)         AS other_reportable_short,
  any_value(other_reportable_spread)        AS other_reportable_spread,
  any_value(other_reportable_net)           AS other_reportable_net,
  any_value(nonreportable_long)             AS nonreportable_long,
  any_value(nonreportable_short)            AS nonreportable_short,
  any_value(nonreportable_net)              AS nonreportable_net,
  any_value(change_open_interest)           AS change_open_interest,
  any_value(change_managed_money_long)      AS change_managed_money_long,
  any_value(change_managed_money_short)     AS change_managed_money_short,
  any_value(change_managed_money_net)       AS change_managed_money_net,
  any_value(change_prod_merc_long)          AS change_prod_merc_long,
  any_value(change_prod_merc_short)         AS change_prod_merc_short,
  any_value(concentration_top4_long_pct)    AS concentration_top4_long_pct,
  any_value(concentration_top4_short_pct)   AS concentration_top4_short_pct,
  any_value(concentration_top8_long_pct)    AS concentration_top8_long_pct,
  any_value(concentration_top8_short_pct)   AS concentration_top8_short_pct,
  any_value(traders_total)                  AS traders_total,
  any_value(traders_managed_money_long)     AS traders_managed_money_long,
  any_value(traders_managed_money_short)    AS traders_managed_money_short,
  any_value(traders_managed_money_spread)   AS traders_managed_money_spread,
  -- Earliest ingest of this exact row version. min() keeps re-runs
  -- deterministic when the same row is landed under several year directories.
  min(ingest_date)                          AS ingest_date,
  hkey
FROM cast_and_clean
GROUP BY hkey