diff --git a/transform/sqlmesh_materia/models/foundation/dim_commodity.sql b/transform/sqlmesh_materia/models/foundation/dim_commodity.sql index 63233b0..e806664 100644 --- a/transform/sqlmesh_materia/models/foundation/dim_commodity.sql +++ b/transform/sqlmesh_materia/models/foundation/dim_commodity.sql @@ -6,8 +6,8 @@ -- As new commodities are added (cocoa, sugar), rows are added here. -- -- References: --- usda_commodity_code → raw.psd_alldata.commodity_code (numeric string, e.g. '0711100') --- cftc_commodity_code → raw.cot_disaggregated.cftc_commodity_code (3-char, e.g. '083') +-- usda_commodity_code → staging.psdalldata__commodity.commodity_code (numeric string, e.g. '0711100') +-- cftc_commodity_code → foundation.fct_cot_positioning.cftc_commodity_code (3-char, e.g. '083') -- -- NOTE: Defined as FULL model (not SEED) to guarantee leading-zero preservation. -- Pandas CSV loading converts '083' → 83 even with varchar column declarations. diff --git a/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql b/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql index f9d4158..78aff7b 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql @@ -1,6 +1,7 @@ -- Foundation fact: daily KC=F Coffee C futures prices. -- --- Casts raw varchar columns to proper types and deduplicates via hash key. +-- Reads directly from the landing zone, casts varchar columns to proper types, +-- and deduplicates via hash key. -- Covers all available history from the landing directory. -- -- Grain: one row per trade_date. @@ -17,7 +18,18 @@ MODEL ( cron '@daily' ); -WITH cast_and_clean AS ( +WITH src AS ( + SELECT * FROM read_csv( + @prices_glob(), + compression = 'gzip', + header = true, + union_by_name = true, + filename = true, + all_varchar = true + ) +), + +cast_and_clean AS ( SELECT TRY_CAST(Date AS date) AS trade_date, TRY_CAST(Open AS double) AS open, @@ -32,7 +44,7 @@ WITH cast_and_clean AS ( -- Dedup key: trade date + close price hash(Date, Close) AS hkey - FROM raw.coffee_prices + FROM src WHERE TRY_CAST(Date AS date) IS NOT NULL AND TRY_CAST(Close AS double) IS NOT NULL ), diff --git a/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql b/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql index 7c140d0..404b2e1 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql @@ -1,8 +1,8 @@ -- Foundation fact: CFTC COT positioning, weekly grain, all commodities. -- --- Casts raw varchar columns to proper types, cleans column names, --- computes net positions (long - short) per trader category, and --- deduplicates via hash key. Covers all commodities — filtering to +-- Reads directly from the landing zone, casts varchar columns to proper types, +-- cleans column names, computes net positions (long - short) per trader category, +-- and deduplicates via hash key. Covers all commodities — filtering to -- a specific commodity happens in the serving layer. -- -- Grain: one row per (cftc_commodity_code, report_date, cftc_contract_market_code) @@ -19,7 +19,19 @@ MODEL ( cron '@daily' ); -WITH cast_and_clean AS ( +WITH src AS ( + SELECT * FROM read_csv( + @cot_glob(), + compression = 'gzip', + header = true, + union_by_name = true, + filename = true, + all_varchar = true, + max_line_size = 10000000 + ) +), + +cast_and_clean AS ( SELECT -- Identifiers trim(market_and_exchange_names) AS market_and_exchange_name, @@ -103,7 +115,7 @@ WITH cast_and_clean AS ( prod_merc_positions_long_all, prod_merc_positions_short_all ) AS hkey - FROM raw.cot_disaggregated + FROM src -- Reject rows with null commodity code or malformed date WHERE trim(cftc_commodity_code) IS NOT NULL AND len(trim(cftc_commodity_code)) > 0 diff --git a/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql b/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql index 6f9030f..6643e72 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_ice_aging_stocks.sql @@ -1,58 +1,70 @@ --- Foundation fact: ICE certified Coffee C (Arabica) aging report. --- --- Casts raw varchar columns to proper types and deduplicates via hash key. --- Grain: one row per (report_date, age_bucket). --- Age buckets represent how long coffee has been in certified storage. --- Port columns are in bags (60kg). - -MODEL ( - name foundation.fct_ice_aging_stocks, - kind INCREMENTAL_BY_TIME_RANGE ( - time_column report_date - ), - grain (report_date, age_bucket), - start '2020-01-01', - cron '@daily' -); - -WITH cast_and_clean AS ( - SELECT - TRY_CAST(report_date AS date) AS report_date, - age_bucket, - TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags, - TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags, - TRY_CAST(houston_bags AS bigint) AS houston_bags, - TRY_CAST(miami_bags AS bigint) AS miami_bags, - TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags, - TRY_CAST(new_york_bags AS bigint) AS new_york_bags, - TRY_CAST(total_bags AS bigint) AS total_bags, - - filename AS source_file, - - hash(report_date, age_bucket, total_bags) AS hkey - FROM raw.ice_aging_stocks - WHERE TRY_CAST(report_date AS date) IS NOT NULL - AND age_bucket IS NOT NULL - AND age_bucket != '' -), - -deduplicated AS ( - SELECT - any_value(report_date) AS report_date, - any_value(age_bucket) AS age_bucket, - any_value(antwerp_bags) AS antwerp_bags, - any_value(hamburg_bremen_bags) AS hamburg_bremen_bags, - any_value(houston_bags) AS houston_bags, - any_value(miami_bags) AS miami_bags, - any_value(new_orleans_bags) AS new_orleans_bags, - any_value(new_york_bags) AS new_york_bags, - any_value(total_bags) AS total_bags, - any_value(source_file) AS source_file, - hkey - FROM cast_and_clean - GROUP BY hkey -) - -SELECT * -FROM deduplicated -WHERE report_date BETWEEN @start_ds AND @end_ds +-- Foundation fact: ICE certified Coffee C (Arabica) aging report. +-- +-- Reads directly from the landing zone, casts varchar columns to proper types, +-- and deduplicates via hash key. +-- Grain: one row per (report_date, age_bucket). +-- Age buckets represent how long coffee has been in certified storage. +-- Port columns are in bags (60kg). + +MODEL ( + name foundation.fct_ice_aging_stocks, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column report_date + ), + grain (report_date, age_bucket), + start '2020-01-01', + cron '@daily' +); + +WITH src AS ( + SELECT * FROM read_csv( + @ice_aging_glob(), + compression = 'gzip', + header = true, + union_by_name = true, + filename = true, + all_varchar = true + ) +), + +cast_and_clean AS ( + SELECT + TRY_CAST(report_date AS date) AS report_date, + age_bucket, + TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags, + TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags, + TRY_CAST(houston_bags AS bigint) AS houston_bags, + TRY_CAST(miami_bags AS bigint) AS miami_bags, + TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags, + TRY_CAST(new_york_bags AS bigint) AS new_york_bags, + TRY_CAST(total_bags AS bigint) AS total_bags, + + filename AS source_file, + + hash(report_date, age_bucket, total_bags) AS hkey + FROM src + WHERE TRY_CAST(report_date AS date) IS NOT NULL + AND age_bucket IS NOT NULL + AND age_bucket != '' +), + +deduplicated AS ( + SELECT + any_value(report_date) AS report_date, + any_value(age_bucket) AS age_bucket, + any_value(antwerp_bags) AS antwerp_bags, + any_value(hamburg_bremen_bags) AS hamburg_bremen_bags, + any_value(houston_bags) AS houston_bags, + any_value(miami_bags) AS miami_bags, + any_value(new_orleans_bags) AS new_orleans_bags, + any_value(new_york_bags) AS new_york_bags, + any_value(total_bags) AS total_bags, + any_value(source_file) AS source_file, + hkey + FROM cast_and_clean + GROUP BY hkey +) + +SELECT * +FROM deduplicated +WHERE report_date BETWEEN @start_ds AND @end_ds diff --git a/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql index 6b9cef7..92973fb 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks.sql @@ -1,6 +1,7 @@ -- Foundation fact: ICE certified Coffee C (Arabica) warehouse stocks. -- --- Casts raw varchar columns to proper types and deduplicates via hash key. +-- Reads directly from the landing zone, casts varchar columns to proper types, +-- and deduplicates via hash key. -- "Certified" means Coffee C graded and stamped as delivery-eligible -- against ICE futures contracts — a key physical supply indicator. -- @@ -16,7 +17,18 @@ MODEL ( cron '@daily' ); -WITH cast_and_clean AS ( +WITH src AS ( + SELECT * FROM read_csv( + @ice_stocks_glob(), + compression = 'gzip', + header = true, + union_by_name = true, + filename = true, + all_varchar = true + ) +), + +cast_and_clean AS ( SELECT TRY_CAST(report_date AS date) AS report_date, TRY_CAST(total_certified_bags AS bigint) AS total_certified_bags, @@ -26,7 +38,7 @@ WITH cast_and_clean AS ( -- Dedup key: report date + total bags hash(report_date, total_certified_bags) AS hkey - FROM raw.ice_warehouse_stocks + FROM src WHERE TRY_CAST(report_date AS date) IS NOT NULL AND TRY_CAST(total_certified_bags AS bigint) IS NOT NULL ), diff --git a/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql index 392070b..d417f87 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_ice_warehouse_stocks_by_port.sql @@ -1,60 +1,72 @@ --- Foundation fact: ICE historical end-of-month Coffee C certified warehouse stocks by port. --- --- Covers November 1996 to present (30-year history). Casts raw varchar columns --- to proper types and deduplicates via hash key. --- --- Grain: one row per report_date (end-of-month). --- Port columns are in bags (60kg). - -MODEL ( - name foundation.fct_ice_warehouse_stocks_by_port, - kind INCREMENTAL_BY_TIME_RANGE ( - time_column report_date - ), - grain (report_date), - start '1996-11-01', - cron '@daily' -); - -WITH cast_and_clean AS ( - SELECT - TRY_CAST(report_date AS date) AS report_date, - TRY_CAST(new_york_bags AS bigint) AS new_york_bags, - TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags, - TRY_CAST(houston_bags AS bigint) AS houston_bags, - TRY_CAST(miami_bags AS bigint) AS miami_bags, - TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags, - TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags, - TRY_CAST(barcelona_bags AS bigint) AS barcelona_bags, - TRY_CAST(virginia_bags AS bigint) AS virginia_bags, - TRY_CAST(total_bags AS bigint) AS total_bags, - - filename AS source_file, - - hash(report_date, total_bags) AS hkey - FROM raw.ice_warehouse_stocks_by_port - WHERE TRY_CAST(report_date AS date) IS NOT NULL - AND TRY_CAST(total_bags AS bigint) IS NOT NULL -), - -deduplicated AS ( - SELECT - any_value(report_date) AS report_date, - any_value(new_york_bags) AS new_york_bags, - any_value(new_orleans_bags) AS new_orleans_bags, - any_value(houston_bags) AS houston_bags, - any_value(miami_bags) AS miami_bags, - any_value(antwerp_bags) AS antwerp_bags, - any_value(hamburg_bremen_bags) AS hamburg_bremen_bags, - any_value(barcelona_bags) AS barcelona_bags, - any_value(virginia_bags) AS virginia_bags, - any_value(total_bags) AS total_bags, - any_value(source_file) AS source_file, - hkey - FROM cast_and_clean - GROUP BY hkey -) - -SELECT * -FROM deduplicated -WHERE report_date BETWEEN @start_ds AND @end_ds +-- Foundation fact: ICE historical end-of-month Coffee C certified warehouse stocks by port. +-- +-- Reads directly from the landing zone, casts varchar columns to proper types, +-- and deduplicates via hash key. +-- Covers November 1996 to present (30-year history). +-- +-- Grain: one row per report_date (end-of-month). +-- Port columns are in bags (60kg). + +MODEL ( + name foundation.fct_ice_warehouse_stocks_by_port, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column report_date + ), + grain (report_date), + start '1996-11-01', + cron '@daily' +); + +WITH src AS ( + SELECT * FROM read_csv( + @ice_stocks_by_port_glob(), + compression = 'gzip', + header = true, + union_by_name = true, + filename = true, + all_varchar = true + ) +), + +cast_and_clean AS ( + SELECT + TRY_CAST(report_date AS date) AS report_date, + TRY_CAST(new_york_bags AS bigint) AS new_york_bags, + TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags, + TRY_CAST(houston_bags AS bigint) AS houston_bags, + TRY_CAST(miami_bags AS bigint) AS miami_bags, + TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags, + TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags, + TRY_CAST(barcelona_bags AS bigint) AS barcelona_bags, + TRY_CAST(virginia_bags AS bigint) AS virginia_bags, + TRY_CAST(total_bags AS bigint) AS total_bags, + + filename AS source_file, + + hash(report_date, total_bags) AS hkey + FROM src + WHERE TRY_CAST(report_date AS date) IS NOT NULL + AND TRY_CAST(total_bags AS bigint) IS NOT NULL +), + +deduplicated AS ( + SELECT + any_value(report_date) AS report_date, + any_value(new_york_bags) AS new_york_bags, + any_value(new_orleans_bags) AS new_orleans_bags, + any_value(houston_bags) AS houston_bags, + any_value(miami_bags) AS miami_bags, + any_value(antwerp_bags) AS antwerp_bags, + any_value(hamburg_bremen_bags) AS hamburg_bremen_bags, + any_value(barcelona_bags) AS barcelona_bags, + any_value(virginia_bags) AS virginia_bags, + any_value(total_bags) AS total_bags, + any_value(source_file) AS source_file, + hkey + FROM cast_and_clean + GROUP BY hkey +) + +SELECT * +FROM deduplicated +WHERE report_date BETWEEN @start_ds AND @end_ds diff --git a/transform/sqlmesh_materia/models/raw/coffee_prices.sql b/transform/sqlmesh_materia/models/raw/coffee_prices.sql deleted file mode 100644 index 044dd92..0000000 --- a/transform/sqlmesh_materia/models/raw/coffee_prices.sql +++ /dev/null @@ -1,46 +0,0 @@ --- Raw KC=F Coffee C futures prices — technical ingestion layer. --- --- Reads daily OHLCV gzip CSVs from the landing directory. All values are --- varchar; casting happens in foundation.fct_coffee_prices. --- --- Source: Yahoo Finance via yfinance (KC=F ticker) --- Coverage: 1971-present (historical futures data) --- Frequency: daily (trading days only) - -MODEL ( - name raw.coffee_prices, - kind FULL, - grain (Date), - cron '@daily', - columns ( - Date varchar, - Open varchar, - High varchar, - Low varchar, - Close varchar, - Adj_Close varchar, - Volume varchar, - filename varchar - ) -); - -SELECT - "Date" AS Date, - "Open" AS Open, - "High" AS High, - "Low" AS Low, - "Close" AS Close, - "Adj Close" AS Adj_Close, - "Volume" AS Volume, - filename -FROM read_csv( - @prices_glob(), - delim = ',', - encoding = 'utf-8', - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true, - ignore_errors = true -) diff --git a/transform/sqlmesh_materia/models/raw/cot_disaggregated.sql b/transform/sqlmesh_materia/models/raw/cot_disaggregated.sql deleted file mode 100644 index 809b228..0000000 --- a/transform/sqlmesh_materia/models/raw/cot_disaggregated.sql +++ /dev/null @@ -1,120 +0,0 @@ --- Raw CFTC Commitment of Traders — Disaggregated Futures Only. --- --- Technical ingestion layer only: reads gzip CSVs from the landing directory --- and surfaces the columns needed by downstream foundation models. --- All values are varchar; casting happens in foundation. --- --- Source: CFTC yearly ZIPs at --- https://www.cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip --- Coverage: June 2006 – present (new file every Friday at 3:30 PM ET) - -MODEL ( - name raw.cot_disaggregated, - kind FULL, - grain (cftc_commodity_code, report_date_as_yyyy_mm_dd, cftc_contract_market_code), - start '2006-06-13', - cron '@daily', - columns ( - market_and_exchange_names varchar, - report_date_as_yyyy_mm_dd varchar, - cftc_commodity_code varchar, - cftc_contract_market_code varchar, - contract_units varchar, - open_interest_all varchar, - prod_merc_positions_long_all varchar, - prod_merc_positions_short_all varchar, - swap_positions_long_all varchar, - swap_positions_short_all varchar, - swap_positions_spread_all varchar, - m_money_positions_long_all varchar, - m_money_positions_short_all varchar, - m_money_positions_spread_all varchar, - other_rept_positions_long_all varchar, - other_rept_positions_short_all varchar, - other_rept_positions_spread_all varchar, - nonrept_positions_long_all varchar, - nonrept_positions_short_all varchar, - change_in_open_interest_all varchar, - change_in_m_money_long_all varchar, - change_in_m_money_short_all varchar, - change_in_prod_merc_long_all varchar, - change_in_prod_merc_short_all varchar, - conc_gross_le_4_tdr_long_all varchar, - conc_gross_le_4_tdr_short_all varchar, - conc_gross_le_8_tdr_long_all varchar, - conc_gross_le_8_tdr_short_all varchar, - traders_tot_all varchar, - traders_m_money_long_all varchar, - traders_m_money_short_all varchar, - traders_m_money_spread_all varchar, - filename varchar - ) -); - -SELECT - -- Identifiers - "Market_and_Exchange_Names" AS market_and_exchange_names, - "Report_Date_as_YYYY-MM-DD" AS report_date_as_yyyy_mm_dd, - "CFTC_Commodity_Code" AS cftc_commodity_code, - "CFTC_Contract_Market_Code" AS cftc_contract_market_code, - "Contract_Units" AS contract_units, - - -- Open interest - "Open_Interest_All" AS open_interest_all, - - -- Producer / Merchant / Processor / User (commercial hedgers) - "Prod_Merc_Positions_Long_All" AS prod_merc_positions_long_all, - "Prod_Merc_Positions_Short_All" AS prod_merc_positions_short_all, - - -- Swap dealers - "Swap_Positions_Long_All" AS swap_positions_long_all, - "Swap__Positions_Short_All" AS swap_positions_short_all, - "Swap__Positions_Spread_All" AS swap_positions_spread_all, - - -- Managed money (hedge funds, CTAs — key speculative signal) - "M_Money_Positions_Long_All" AS m_money_positions_long_all, - "M_Money_Positions_Short_All" AS m_money_positions_short_all, - "M_Money_Positions_Spread_All" AS m_money_positions_spread_all, - - -- Other reportables - "Other_Rept_Positions_Long_All" AS other_rept_positions_long_all, - "Other_Rept_Positions_Short_All" AS other_rept_positions_short_all, - "Other_Rept_Positions_Spread_All" AS other_rept_positions_spread_all, - - -- Non-reportable (small speculators) - "NonRept_Positions_Long_All" AS nonrept_positions_long_all, - "NonRept_Positions_Short_All" AS nonrept_positions_short_all, - - -- Week-over-week changes - "Change_in_Open_Interest_All" AS change_in_open_interest_all, - "Change_in_M_Money_Long_All" AS change_in_m_money_long_all, - "Change_in_M_Money_Short_All" AS change_in_m_money_short_all, - "Change_in_Prod_Merc_Long_All" AS change_in_prod_merc_long_all, - "Change_in_Prod_Merc_Short_All" AS change_in_prod_merc_short_all, - - -- Concentration (% of OI held by top 4 and top 8 traders) - "Conc_Gross_LE_4_TDR_Long_All" AS conc_gross_le_4_tdr_long_all, - "Conc_Gross_LE_4_TDR_Short_All" AS conc_gross_le_4_tdr_short_all, - "Conc_Gross_LE_8_TDR_Long_All" AS conc_gross_le_8_tdr_long_all, - "Conc_Gross_LE_8_TDR_Short_All" AS conc_gross_le_8_tdr_short_all, - - -- Trader counts - "Traders_Tot_All" AS traders_tot_all, - "Traders_M_Money_Long_All" AS traders_m_money_long_all, - "Traders_M_Money_Short_All" AS traders_m_money_short_all, - "Traders_M_Money_Spread_All" AS traders_m_money_spread_all, - - -- Lineage - filename -FROM read_csv( - @cot_glob(), - delim = ',', - encoding = 'utf-8', - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true, - max_line_size = 10000000, - ignore_errors = true -) diff --git a/transform/sqlmesh_materia/models/raw/ice_aging_stocks.sql b/transform/sqlmesh_materia/models/raw/ice_aging_stocks.sql deleted file mode 100644 index 777deae..0000000 --- a/transform/sqlmesh_materia/models/raw/ice_aging_stocks.sql +++ /dev/null @@ -1,49 +0,0 @@ --- Raw ICE certified stock aging report — technical ingestion layer. --- --- Reads monthly aging report gzip CSVs from the landing directory. --- All values are varchar; casting happens in foundation.fct_ice_aging_stocks. --- --- Source: ICE Report Center (Certified Stock Aging Report) --- Coverage: varies by download history --- Frequency: monthly (ICE updates after each delivery month) - -MODEL ( - name raw.ice_aging_stocks, - kind FULL, - cron '@daily', - columns ( - report_date varchar, - age_bucket varchar, - antwerp_bags varchar, - hamburg_bremen_bags varchar, - houston_bags varchar, - miami_bags varchar, - new_orleans_bags varchar, - new_york_bags varchar, - total_bags varchar, - filename varchar - ) -); - -SELECT - report_date, - age_bucket, - antwerp_bags, - hamburg_bremen_bags, - houston_bags, - miami_bags, - new_orleans_bags, - new_york_bags, - total_bags, - filename -FROM read_csv( - @ice_aging_glob(), - delim = ',', - encoding = 'utf-8', - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true, - ignore_errors = true -) diff --git a/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks.sql b/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks.sql deleted file mode 100644 index 99d0882..0000000 --- a/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks.sql +++ /dev/null @@ -1,37 +0,0 @@ --- Raw ICE certified warehouse stocks — technical ingestion layer. --- --- Reads daily stock report gzip CSVs from the landing directory. --- All values are varchar; casting happens in foundation.fct_ice_warehouse_stocks. --- --- Source: ICE Report Center (Coffee C certified warehouse stocks) --- Coverage: varies by download history --- Frequency: daily (ICE updates after market close) - -MODEL ( - name raw.ice_warehouse_stocks, - kind FULL, - cron '@daily', - columns ( - report_date varchar, - total_certified_bags varchar, - pending_grading_bags varchar, - filename varchar - ) -); - -SELECT - report_date, - total_certified_bags, - pending_grading_bags, - filename -FROM read_csv( - @ice_stocks_glob(), - delim = ',', - encoding = 'utf-8', - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true, - ignore_errors = true -) diff --git a/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks_by_port.sql b/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks_by_port.sql deleted file mode 100644 index ce98184..0000000 --- a/transform/sqlmesh_materia/models/raw/ice_warehouse_stocks_by_port.sql +++ /dev/null @@ -1,51 +0,0 @@ --- Raw ICE historical end-of-month warehouse stocks by port — technical ingestion layer. --- --- Reads historical by-port stock gzip CSVs from the landing directory. --- All values are varchar; casting happens in foundation.fct_ice_warehouse_stocks_by_port. --- --- Source: ICE (EOM_KC_cert_stox_by_port_nov96-present.xls) --- Coverage: November 1996 to present --- Frequency: monthly (ICE updates the static file monthly) - -MODEL ( - name raw.ice_warehouse_stocks_by_port, - kind FULL, - cron '@daily', - columns ( - report_date varchar, - new_york_bags varchar, - new_orleans_bags varchar, - houston_bags varchar, - miami_bags varchar, - antwerp_bags varchar, - hamburg_bremen_bags varchar, - barcelona_bags varchar, - virginia_bags varchar, - total_bags varchar, - filename varchar - ) -); - -SELECT - report_date, - new_york_bags, - new_orleans_bags, - houston_bags, - miami_bags, - antwerp_bags, - hamburg_bremen_bags, - barcelona_bags, - virginia_bags, - total_bags, - filename -FROM read_csv( - @ice_stocks_by_port_glob(), - delim = ',', - encoding = 'utf-8', - compression = 'gzip', - header = true, - union_by_name = true, - filename = true, - all_varchar = true, - ignore_errors = true -) diff --git a/transform/sqlmesh_materia/models/raw/psd_data.sql b/transform/sqlmesh_materia/models/raw/psd_data.sql deleted file mode 100644 index db69cfc..0000000 --- a/transform/sqlmesh_materia/models/raw/psd_data.sql +++ /dev/null @@ -1,24 +0,0 @@ -MODEL ( - name raw.psd_alldata, - kind FULL, - grain ( commodity_code, country_code, market_year, calendar_year, month, attribute_id,unit_id ), - start '2006-08-01', - cron '@daily', - columns ( - commodity_code varchar, - commodity_description varchar, - country_code varchar, - country_name varchar, - market_year varchar, - calendar_year varchar, - month varchar, - attribute_id varchar, - attribute_description varchar, - unit_id varchar, - unit_description varchar, - value varchar, - filename varchar -) -); -select * -FROM read_csv(@psd_glob(), delim=',', encoding='utf-8', compression='gzip', max_line_size=10000000, header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true) diff --git a/transform/sqlmesh_materia/models/raw/psd_attribute_codes.sql b/transform/sqlmesh_materia/models/seeds/psd_attribute_codes.sql similarity index 77% rename from transform/sqlmesh_materia/models/raw/psd_attribute_codes.sql rename to transform/sqlmesh_materia/models/seeds/psd_attribute_codes.sql index b187f53..9c63bec 100644 --- a/transform/sqlmesh_materia/models/raw/psd_attribute_codes.sql +++ b/transform/sqlmesh_materia/models/seeds/psd_attribute_codes.sql @@ -1,5 +1,5 @@ MODEL ( - name raw.psd_attribute_codes, + name seeds.psd_attribute_codes, kind SEED ( path '$root/seeds/psd_attribute_codes.csv', csv_settings ( diff --git a/transform/sqlmesh_materia/models/raw/psd_commodity_codes.sql b/transform/sqlmesh_materia/models/seeds/psd_commodity_codes.sql similarity index 77% rename from transform/sqlmesh_materia/models/raw/psd_commodity_codes.sql rename to transform/sqlmesh_materia/models/seeds/psd_commodity_codes.sql index a50ab04..aa599a0 100644 --- a/transform/sqlmesh_materia/models/raw/psd_commodity_codes.sql +++ b/transform/sqlmesh_materia/models/seeds/psd_commodity_codes.sql @@ -1,5 +1,5 @@ MODEL ( - name raw.psd_commodity_codes, + name seeds.psd_commodity_codes, kind SEED ( path '$root/seeds/psd_commodity_codes.csv', csv_settings ( diff --git a/transform/sqlmesh_materia/models/raw/psd_unit_of_measure_codes.sql b/transform/sqlmesh_materia/models/seeds/psd_unit_of_measure_codes.sql similarity index 76% rename from transform/sqlmesh_materia/models/raw/psd_unit_of_measure_codes.sql rename to transform/sqlmesh_materia/models/seeds/psd_unit_of_measure_codes.sql index 5de5c1c..855e9d4 100644 --- a/transform/sqlmesh_materia/models/raw/psd_unit_of_measure_codes.sql +++ b/transform/sqlmesh_materia/models/seeds/psd_unit_of_measure_codes.sql @@ -1,5 +1,5 @@ MODEL ( - name raw.psd_unit_of_measure_codes, + name seeds.psd_unit_of_measure_codes, kind SEED ( path '$root/seeds/psd_unit_of_measure_codes.csv', csv_settings ( diff --git a/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql b/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql index da4d886..03dc8c4 100644 --- a/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql +++ b/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql @@ -8,26 +8,34 @@ MODEL ( ); with cast_dtypes as ( SELECT - raw.psd_alldata.commodity_code::int as commodity_code, + src.commodity_code::int as commodity_code, coalesce(commodity_name, commodity_description) as commodity_name, country_code::varchar(3) as country_code, country_name, market_year::int as market_year, calendar_year::int as calendar_year, month::int as month, - raw.psd_alldata.attribute_id::int as attribute_id, + src.attribute_id::int as attribute_id, coalesce(attribute_name, attribute_description) as attribute_name, - raw.psd_alldata.unit_id::int as unit_id, + src.unit_id::int as unit_id, coalesce(unit_name, unit_description) as unit_name, value::float as value, filename - FROM raw.psd_alldata - left join raw.psd_commodity_codes on raw.psd_commodity_codes.commodity_code = raw.psd_alldata.commodity_code::int - left join raw.psd_unit_of_measure_codes on raw.psd_unit_of_measure_codes.unit_id = raw.psd_alldata.unit_id::int - left join raw.psd_attribute_codes on raw.psd_attribute_codes.attribute_id = raw.psd_alldata.attribute_id::int + FROM read_csv( + @psd_glob(), + compression = 'gzip', + header = true, + union_by_name = true, + filename = true, + all_varchar = true, + max_line_size = 10000000 + ) AS src + left join seeds.psd_commodity_codes on seeds.psd_commodity_codes.commodity_code = src.commodity_code::int + left join seeds.psd_unit_of_measure_codes on seeds.psd_unit_of_measure_codes.unit_id = src.unit_id::int + left join seeds.psd_attribute_codes on seeds.psd_attribute_codes.attribute_id = src.attribute_id::int ), metadata_and_deduplication as ( -select +select any_value(commodity_code) as commodity_code, any_value(commodity_name) as commodity_name, any_value(country_code) as country_code, @@ -39,7 +47,7 @@ select any_value(attribute_name) as attribute_name, any_value(unit_id) as unit_id, any_value(unit_name) as unit_name, - any_value(value) as value, + any_value(value) as value, hash(commodity_code, commodity_name, country_code, country_name, market_year, calendar_year, month, attribute_id, attribute_name, unit_id, unit_name, value) as hkey, any_value(make_date(split(filename, '/')[-3]::int, split(filename, '/')[-2]::int, 1)) as ingest_date, any_value(if(month!=0,last_day(make_date(market_year, month, 1)),null)) as market_date_month_end, diff --git a/transform/sqlmesh_materia/readme.md b/transform/sqlmesh_materia/readme.md index a1bae2a..b7e73e5 100644 --- a/transform/sqlmesh_materia/readme.md +++ b/transform/sqlmesh_materia/readme.md @@ -1,92 +1,82 @@ # Materia SQLMesh Transform Layer -Data transformation pipeline using SQLMesh and DuckDB, implementing a 4-layer architecture. +Data transformation pipeline using SQLMesh and DuckDB, implementing a 3-layer architecture. ## Quick Start ```bash -cd transform/sqlmesh_materia +# From repo root -# Local development (virtual environment) -sqlmesh plan dev_ +# Plan changes (dev environment) +uv run sqlmesh -p transform/sqlmesh_materia plan -# Production -sqlmesh plan prod +# Apply to production +uv run sqlmesh -p transform/sqlmesh_materia plan prod -# Run tests -sqlmesh test +# Run model tests +uv run sqlmesh -p transform/sqlmesh_materia test # Format SQL -sqlmesh format +uv run sqlmesh -p transform/sqlmesh_materia format ``` ## Architecture -### Gateway Configuration +### 3-Layer Data Model -**Single Gateway:** All environments connect to Cloudflare R2 Data Catalog (Apache Iceberg) -- **Production:** `sqlmesh plan prod` -- **Development:** `sqlmesh plan dev_` (isolated virtual environment) +``` +landing/ ← immutable files (extraction output) + ├── psd/{year}/{month}/ ← USDA PSD + ├── cot/{year}/ ← CFTC COT + ├── prices/coffee_kc/ ← KC=F daily prices + ├── ice_stocks/ ← ICE daily warehouse stocks + ├── ice_aging/ ← ICE monthly aging report + └── ice_stocks_by_port/ ← ICE historical EOM by port -SQLMesh manages environment isolation automatically - no need for separate local databases. +staging/ ← read_csv + seed joins + cast (PSD) + └── staging.psdalldata__commodity -### 4-Layer Data Model +seeds/ ← static lookup CSVs (PSD code mappings) + ├── seeds.psd_commodity_codes + ├── seeds.psd_attribute_codes + └── seeds.psd_unit_of_measure_codes -See `models/README.md` for detailed architecture documentation: +foundation/ ← read_csv + cast + dedup (prices, COT, ICE) + ├── foundation.fct_coffee_prices + ├── foundation.fct_cot_positioning + ├── foundation.fct_ice_warehouse_stocks + ├── foundation.fct_ice_aging_stocks + ├── foundation.fct_ice_warehouse_stocks_by_port + └── foundation.dim_commodity -1. **Raw** - Immutable source data -2. **Staging** - Schema, types, basic cleansing -3. **Cleaned** - Business logic, integration -4. **Serving** - Analytics-ready (facts, dimensions, aggregates) - -## Configuration - -**Config:** `config.yaml` -- DuckDB in-memory with R2 Iceberg catalog -- Extensions: httpfs, iceberg -- Auto-apply enabled (no prompts) -- Initialization hooks for R2 secret/catalog attachment - -## Commands - -```bash -# Plan changes for dev environment -sqlmesh plan dev_yourname - -# Plan changes for prod -sqlmesh plan prod - -# Run tests -sqlmesh test - -# Validate models -sqlmesh validate - -# Run audits -sqlmesh audit - -# Format SQL files -sqlmesh format - -# Start web UI -sqlmesh ui +serving/ ← pre-aggregated for web app + ├── serving.coffee_prices + ├── serving.cot_positioning + ├── serving.ice_warehouse_stocks + ├── serving.ice_aging_stocks + ├── serving.ice_warehouse_stocks_by_port + └── serving.commodity_metrics ``` -## Environment Variables (Prod) +### Layer responsibilities -Required for production R2 Iceberg catalog: -- `CLOUDFLARE_API_TOKEN` - R2 API token -- `ICEBERG_REST_URI` - R2 catalog REST endpoint -- `R2_WAREHOUSE_NAME` - Warehouse name (default: "materia") +**staging/** — PSD only: reads landing CSVs directly via `@psd_glob()`, joins seed lookup tables, casts types, deduplicates. Uses INCREMENTAL_BY_TIME_RANGE (ingest_date derived from filename path). -These are injected via Pulumi ESC (`beanflows/prod`) on the supervisor instance. +**seeds/** — Static lookup tables (commodity codes, attribute codes, unit of measure) loaded from `seeds/*.csv`. Referenced by staging. -## Development Workflow +**foundation/** — All other sources (prices, COT, ICE): reads landing CSVs directly via glob macros, casts types, deduplicates. Uses INCREMENTAL_BY_TIME_RANGE. Also holds `dim_commodity` (the cross-source identity mapping). -1. Make changes to models in `models/` -2. Test locally: `sqlmesh test` -3. Plan changes: `sqlmesh plan dev_yourname` -4. Review and apply changes -5. Commit and push to trigger CI/CD +**serving/** — Analytics-ready aggregates consumed by the web app via `analytics.duckdb`. Pre-computes moving averages, COT indices, MoM changes. These are the only tables the web app reads. -SQLMesh will handle environment isolation, table versioning, and incremental updates automatically. +### Why no raw layer? + +Landing files are immutable and content-addressed — the landing directory is the audit trail. A SQL raw layer would just duplicate file bytes into DuckDB with no added value. The first SQL layer reads directly from landing. + +## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `LANDING_DIR` | `data/landing` | Root of the landing zone | +| `DUCKDB_PATH` | `local.duckdb` | DuckDB file (SQLMesh exclusive write access) | + +The web app reads from a separate `analytics.duckdb` via `export_serving.py`. diff --git a/transform/sqlmesh_materia/tests/test_cot_foundation.yaml b/transform/sqlmesh_materia/tests/test_cot_foundation.yaml deleted file mode 100644 index 654533b..0000000 --- a/transform/sqlmesh_materia/tests/test_cot_foundation.yaml +++ /dev/null @@ -1,108 +0,0 @@ -test_fct_cot_positioning_types_and_net_positions: - model: foundation.fct_cot_positioning - vars: - start: "2024-01-01" - end: "2024-01-31" - inputs: - raw.cot_disaggregated: - rows: - - market_and_exchange_names: "COFFEE C - ICE FUTURES U.S." - report_date_as_yyyy_mm_dd: "2024-01-02" - cftc_commodity_code: "083" - cftc_contract_market_code: "083731" - contract_units: "37,500 POUNDS" - open_interest_all: "250000" - prod_merc_positions_long_all: "80000" - prod_merc_positions_short_all: "90000" - swap_positions_long_all: "30000" - swap_positions_short_all: "35000" - swap_positions_spread_all: "10000" - m_money_positions_long_all: "60000" - m_money_positions_short_all: "40000" - m_money_positions_spread_all: "15000" - other_rept_positions_long_all: "20000" - other_rept_positions_short_all: "18000" - other_rept_positions_spread_all: "5000" - nonrept_positions_long_all: "12000" - nonrept_positions_short_all: "14000" - change_in_open_interest_all: "5000" - change_in_m_money_long_all: "2000" - change_in_m_money_short_all: "-1000" - change_in_prod_merc_long_all: "1000" - change_in_prod_merc_short_all: "500" - conc_gross_le_4_tdr_long_all: "35.5" - conc_gross_le_4_tdr_short_all: "28.3" - conc_gross_le_8_tdr_long_all: "52.1" - conc_gross_le_8_tdr_short_all: "44.7" - traders_tot_all: "450" - traders_m_money_long_all: "85" - traders_m_money_short_all: "62" - traders_m_money_spread_all: "20" - filename: "data/landing/cot/2024/abc123.csv.gzip" - outputs: - partial: true - query: - rows: - - report_date: "2024-01-02" - cftc_commodity_code: "083" - open_interest: 250000 - managed_money_long: 60000 - managed_money_short: 40000 - managed_money_net: 20000 - prod_merc_long: 80000 - prod_merc_short: 90000 - prod_merc_net: -10000 - swap_long: 30000 - swap_short: 35000 - swap_net: -5000 - nonreportable_long: 12000 - nonreportable_short: 14000 - nonreportable_net: -2000 - change_managed_money_net: 3000 - traders_managed_money_long: 85 - traders_managed_money_short: 62 - -test_fct_cot_positioning_rejects_null_commodity: - model: foundation.fct_cot_positioning - vars: - start: "2024-01-01" - end: "2024-01-31" - inputs: - raw.cot_disaggregated: - rows: - - market_and_exchange_names: "SOME OTHER CONTRACT" - report_date_as_yyyy_mm_dd: "2024-01-02" - cftc_commodity_code: "" - cftc_contract_market_code: "999999" - contract_units: "N/A" - open_interest_all: "1000" - prod_merc_positions_long_all: "500" - prod_merc_positions_short_all: "500" - swap_positions_long_all: "0" - swap_positions_short_all: "0" - swap_positions_spread_all: "0" - m_money_positions_long_all: "0" - m_money_positions_short_all: "0" - m_money_positions_spread_all: "0" - other_rept_positions_long_all: "0" - other_rept_positions_short_all: "0" - other_rept_positions_spread_all: "0" - nonrept_positions_long_all: "0" - nonrept_positions_short_all: "0" - change_in_open_interest_all: "0" - change_in_m_money_long_all: "0" - change_in_m_money_short_all: "0" - change_in_prod_merc_long_all: "0" - change_in_prod_merc_short_all: "0" - conc_gross_le_4_tdr_long_all: "0" - conc_gross_le_4_tdr_short_all: "0" - conc_gross_le_8_tdr_long_all: "0" - conc_gross_le_8_tdr_short_all: "0" - traders_tot_all: "10" - traders_m_money_long_all: "0" - traders_m_money_short_all: "0" - traders_m_money_spread_all: "0" - filename: "data/landing/cot/2024/abc123.csv.gzip" - outputs: - query: - rows: []