refactor(transform): remove raw layer, read landing zone directly

- Delete 6 raw data models (coffee_prices, cot_disaggregated, ice_*,
  psd_alldata) — pure read_csv passthroughs with no added value
- Move 3 PSD seed models raw/ → seeds/, rename schema raw.* → seeds.*
- Update staging.psdalldata__commodity: read_csv(@psd_glob()) directly,
  join seeds.psd_* instead of raw.psd_*
- Update 5 foundation models: inline read_csv() with src CTE, removing
  raw.* dependency (fct_coffee_prices, fct_cot_positioning, fct_ice_*)
- Remove fixture-based SQLMesh test that depended on raw.cot_disaggregated
  (unit tests incompatible with inline read_csv; integration run covers this)
- Update readme.md: 3-layer architecture (staging/foundation → serving)

Landing files are immutable and content-addressed — the landing directory
is the audit trail. A raw SQL layer duplicated file bytes into DuckDB
with no added value.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-22 17:30:18 +01:00
parent 1814a76e74
commit c3c8333407
18 changed files with 266 additions and 643 deletions

View File

@@ -6,8 +6,8 @@
-- As new commodities are added (cocoa, sugar), rows are added here.
--
-- References:
-- usda_commodity_code → raw.psd_alldata.commodity_code (numeric string, e.g. '0711100')
-- cftc_commodity_code → raw.cot_disaggregated.cftc_commodity_code (3-char, e.g. '083')
-- usda_commodity_code → staging.psdalldata__commodity.commodity_code (numeric string, e.g. '0711100')
-- cftc_commodity_code → foundation.fct_cot_positioning.cftc_commodity_code (3-char, e.g. '083')
--
-- NOTE: Defined as FULL model (not SEED) to guarantee leading-zero preservation.
-- Pandas CSV loading converts '083' → 83 even with varchar column declarations.

View File

@@ -1,6 +1,7 @@
-- Foundation fact: daily KC=F Coffee C futures prices.
--
-- Casts raw varchar columns to proper types and deduplicates via hash key.
-- Reads directly from the landing zone, casts varchar columns to proper types,
-- and deduplicates via hash key.
-- Covers all available history from the landing directory.
--
-- Grain: one row per trade_date.
@@ -17,7 +18,18 @@ MODEL (
cron '@daily'
);
WITH cast_and_clean AS (
WITH src AS (
SELECT * FROM read_csv(
@prices_glob(),
compression = 'gzip',
header = true,
union_by_name = true,
filename = true,
all_varchar = true
)
),
cast_and_clean AS (
SELECT
TRY_CAST(Date AS date) AS trade_date,
TRY_CAST(Open AS double) AS open,
@@ -32,7 +44,7 @@ WITH cast_and_clean AS (
-- Dedup key: trade date + close price
hash(Date, Close) AS hkey
FROM raw.coffee_prices
FROM src
WHERE TRY_CAST(Date AS date) IS NOT NULL
AND TRY_CAST(Close AS double) IS NOT NULL
),

View File

@@ -1,8 +1,8 @@
-- Foundation fact: CFTC COT positioning, weekly grain, all commodities.
--
-- Casts raw varchar columns to proper types, cleans column names,
-- computes net positions (long - short) per trader category, and
-- deduplicates via hash key. Covers all commodities — filtering to
-- Reads directly from the landing zone, casts varchar columns to proper types,
-- cleans column names, computes net positions (long - short) per trader category,
-- and deduplicates via hash key. Covers all commodities — filtering to
-- a specific commodity happens in the serving layer.
--
-- Grain: one row per (cftc_commodity_code, report_date, cftc_contract_market_code)
@@ -19,7 +19,19 @@ MODEL (
cron '@daily'
);
WITH cast_and_clean AS (
WITH src AS (
SELECT * FROM read_csv(
@cot_glob(),
compression = 'gzip',
header = true,
union_by_name = true,
filename = true,
all_varchar = true,
max_line_size = 10000000
)
),
cast_and_clean AS (
SELECT
-- Identifiers
trim(market_and_exchange_names) AS market_and_exchange_name,
@@ -103,7 +115,7 @@ WITH cast_and_clean AS (
prod_merc_positions_long_all,
prod_merc_positions_short_all
) AS hkey
FROM raw.cot_disaggregated
FROM src
-- Reject rows with null commodity code or malformed date
WHERE trim(cftc_commodity_code) IS NOT NULL
AND len(trim(cftc_commodity_code)) > 0

View File

@@ -1,58 +1,70 @@
-- Foundation fact: ICE certified Coffee C (Arabica) aging report.
--
-- Casts raw varchar columns to proper types and deduplicates via hash key.
-- Grain: one row per (report_date, age_bucket).
-- Age buckets represent how long coffee has been in certified storage.
-- Port columns are in bags (60kg).
--
-- Incremental by report_date: each run emits only the rows whose report_date
-- falls inside the interval being processed (see the final WHERE clause).
MODEL (
name foundation.fct_ice_aging_stocks,
kind INCREMENTAL_BY_TIME_RANGE (
time_column report_date
),
grain (report_date, age_bucket),
start '2020-01-01',
cron '@daily'
);
-- cast_and_clean: type every column with TRY_CAST (unparseable values become
-- NULL instead of raising) and drop rows that lack a parseable report_date or
-- carry a NULL/empty age_bucket.
WITH cast_and_clean AS (
SELECT
TRY_CAST(report_date AS date) AS report_date,
age_bucket,
TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags,
TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags,
TRY_CAST(houston_bags AS bigint) AS houston_bags,
TRY_CAST(miami_bags AS bigint) AS miami_bags,
TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags,
TRY_CAST(new_york_bags AS bigint) AS new_york_bags,
TRY_CAST(total_bags AS bigint) AS total_bags,
-- Lineage: landing file the row was read from
filename AS source_file,
-- Dedup key: report date + age bucket + total bags
hash(report_date, age_bucket, total_bags) AS hkey
FROM raw.ice_aging_stocks
WHERE TRY_CAST(report_date AS date) IS NOT NULL
AND age_bucket IS NOT NULL
AND age_bucket != ''
),
-- deduplicated: collapse all rows sharing an hkey into one row; any_value
-- picks an arbitrary representative for each non-key column.
deduplicated AS (
SELECT
any_value(report_date) AS report_date,
any_value(age_bucket) AS age_bucket,
any_value(antwerp_bags) AS antwerp_bags,
any_value(hamburg_bremen_bags) AS hamburg_bremen_bags,
any_value(houston_bags) AS houston_bags,
any_value(miami_bags) AS miami_bags,
any_value(new_orleans_bags) AS new_orleans_bags,
any_value(new_york_bags) AS new_york_bags,
any_value(total_bags) AS total_bags,
any_value(source_file) AS source_file,
hkey
FROM cast_and_clean
GROUP BY hkey
)
-- Restrict the output to the interval of the current incremental run
-- (BETWEEN is inclusive on both ends).
SELECT *
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds
-- Foundation fact: ICE certified Coffee C (Arabica) aging report.
--
-- Reads directly from the landing zone, casts varchar columns to proper types,
-- and deduplicates via hash key.
-- Grain: one row per (report_date, age_bucket).
-- Age buckets represent how long coffee has been in certified storage.
-- Port columns are in bags (60kg).
--
-- Incremental by report_date: each run emits only the rows whose report_date
-- falls inside the interval being processed (see the final WHERE clause).
MODEL (
    name foundation.fct_ice_aging_stocks,
    kind INCREMENTAL_BY_TIME_RANGE (
        time_column report_date
    ),
    grain (report_date, age_bucket),
    start '2020-01-01',
    cron '@daily'
);

-- src: every line of every landing file matched by the glob, all columns varchar.
-- The column list documents the landing schema this model depends on.
-- delim is pinned so the CSV sniffer cannot pick a different separator, and
-- ignore_errors keeps one structurally broken line from failing the whole run;
-- both match how these files were ingested before the raw layer was removed.
WITH src AS (
    SELECT
        report_date,
        age_bucket,
        antwerp_bags,
        hamburg_bremen_bags,
        houston_bags,
        miami_bags,
        new_orleans_bags,
        new_york_bags,
        total_bags,
        filename
    FROM read_csv(
        @ice_aging_glob(),
        delim = ',',
        compression = 'gzip',
        header = true,
        union_by_name = true,
        filename = true,
        all_varchar = true,
        ignore_errors = true
    )
),

-- cast_and_clean: type every column with TRY_CAST (unparseable values become
-- NULL instead of raising) and drop rows that lack a parseable report_date or
-- carry a NULL/empty age_bucket.
cast_and_clean AS (
    SELECT
        TRY_CAST(report_date AS date)          AS report_date,
        age_bucket,
        TRY_CAST(antwerp_bags AS bigint)        AS antwerp_bags,
        TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags,
        TRY_CAST(houston_bags AS bigint)        AS houston_bags,
        TRY_CAST(miami_bags AS bigint)          AS miami_bags,
        TRY_CAST(new_orleans_bags AS bigint)    AS new_orleans_bags,
        TRY_CAST(new_york_bags AS bigint)       AS new_york_bags,
        TRY_CAST(total_bags AS bigint)          AS total_bags,
        -- Lineage: landing file the row was read from
        filename                                AS source_file,
        -- Dedup key: report date + age bucket + total bags
        hash(report_date, age_bucket, total_bags) AS hkey
    FROM src
    WHERE TRY_CAST(report_date AS date) IS NOT NULL
      AND age_bucket IS NOT NULL
      AND age_bucket != ''
),

-- deduplicated: collapse all rows sharing an hkey into one row; any_value
-- picks an arbitrary representative for each non-key column.
deduplicated AS (
    SELECT
        any_value(report_date)         AS report_date,
        any_value(age_bucket)          AS age_bucket,
        any_value(antwerp_bags)        AS antwerp_bags,
        any_value(hamburg_bremen_bags) AS hamburg_bremen_bags,
        any_value(houston_bags)        AS houston_bags,
        any_value(miami_bags)          AS miami_bags,
        any_value(new_orleans_bags)    AS new_orleans_bags,
        any_value(new_york_bags)       AS new_york_bags,
        any_value(total_bags)          AS total_bags,
        any_value(source_file)         AS source_file,
        hkey
    FROM cast_and_clean
    GROUP BY hkey
)

-- Restrict the output to the interval of the current incremental run
-- (BETWEEN is inclusive on both ends).
SELECT
    report_date,
    age_bucket,
    antwerp_bags,
    hamburg_bremen_bags,
    houston_bags,
    miami_bags,
    new_orleans_bags,
    new_york_bags,
    total_bags,
    source_file,
    hkey
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds

View File

@@ -1,6 +1,7 @@
-- Foundation fact: ICE certified Coffee C (Arabica) warehouse stocks.
--
-- Casts raw varchar columns to proper types and deduplicates via hash key.
-- Reads directly from the landing zone, casts varchar columns to proper types,
-- and deduplicates via hash key.
-- "Certified" means Coffee C graded and stamped as delivery-eligible
-- against ICE futures contracts — a key physical supply indicator.
--
@@ -16,7 +17,18 @@ MODEL (
cron '@daily'
);
WITH cast_and_clean AS (
WITH src AS (
SELECT * FROM read_csv(
@ice_stocks_glob(),
compression = 'gzip',
header = true,
union_by_name = true,
filename = true,
all_varchar = true
)
),
cast_and_clean AS (
SELECT
TRY_CAST(report_date AS date) AS report_date,
TRY_CAST(total_certified_bags AS bigint) AS total_certified_bags,
@@ -26,7 +38,7 @@ WITH cast_and_clean AS (
-- Dedup key: report date + total bags
hash(report_date, total_certified_bags) AS hkey
FROM raw.ice_warehouse_stocks
FROM src
WHERE TRY_CAST(report_date AS date) IS NOT NULL
AND TRY_CAST(total_certified_bags AS bigint) IS NOT NULL
),

View File

@@ -1,60 +1,72 @@
-- Foundation fact: ICE historical end-of-month Coffee C certified warehouse stocks by port.
--
-- Covers November 1996 to present (30-year history). Casts raw varchar columns
-- to proper types and deduplicates via hash key.
--
-- Grain: one row per report_date (end-of-month).
-- Port columns are in bags (60kg).
--
-- Incremental by report_date: each run emits only the rows whose report_date
-- falls inside the interval being processed (see the final WHERE clause).
MODEL (
name foundation.fct_ice_warehouse_stocks_by_port,
kind INCREMENTAL_BY_TIME_RANGE (
time_column report_date
),
grain (report_date),
start '1996-11-01',
cron '@daily'
);
-- cast_and_clean: type every column with TRY_CAST (unparseable values become
-- NULL instead of raising) and keep only rows whose report_date and total_bags
-- both parse.
WITH cast_and_clean AS (
SELECT
TRY_CAST(report_date AS date) AS report_date,
TRY_CAST(new_york_bags AS bigint) AS new_york_bags,
TRY_CAST(new_orleans_bags AS bigint) AS new_orleans_bags,
TRY_CAST(houston_bags AS bigint) AS houston_bags,
TRY_CAST(miami_bags AS bigint) AS miami_bags,
TRY_CAST(antwerp_bags AS bigint) AS antwerp_bags,
TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags,
TRY_CAST(barcelona_bags AS bigint) AS barcelona_bags,
TRY_CAST(virginia_bags AS bigint) AS virginia_bags,
TRY_CAST(total_bags AS bigint) AS total_bags,
-- Lineage: landing file the row was read from
filename AS source_file,
-- Dedup key: report date + total bags
hash(report_date, total_bags) AS hkey
FROM raw.ice_warehouse_stocks_by_port
WHERE TRY_CAST(report_date AS date) IS NOT NULL
AND TRY_CAST(total_bags AS bigint) IS NOT NULL
),
-- deduplicated: collapse all rows sharing an hkey into one row; any_value
-- picks an arbitrary representative for each non-key column.
deduplicated AS (
SELECT
any_value(report_date) AS report_date,
any_value(new_york_bags) AS new_york_bags,
any_value(new_orleans_bags) AS new_orleans_bags,
any_value(houston_bags) AS houston_bags,
any_value(miami_bags) AS miami_bags,
any_value(antwerp_bags) AS antwerp_bags,
any_value(hamburg_bremen_bags) AS hamburg_bremen_bags,
any_value(barcelona_bags) AS barcelona_bags,
any_value(virginia_bags) AS virginia_bags,
any_value(total_bags) AS total_bags,
any_value(source_file) AS source_file,
hkey
FROM cast_and_clean
GROUP BY hkey
)
-- Restrict the output to the interval of the current incremental run
-- (BETWEEN is inclusive on both ends).
SELECT *
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds
-- Foundation fact: ICE historical end-of-month Coffee C certified warehouse stocks by port.
--
-- Reads directly from the landing zone, casts varchar columns to proper types,
-- and deduplicates via hash key.
-- Covers November 1996 to present (30-year history).
--
-- Grain: one row per report_date (end-of-month).
-- Port columns are in bags (60kg).
--
-- Incremental by report_date: each run emits only the rows whose report_date
-- falls inside the interval being processed (see the final WHERE clause).
MODEL (
    name foundation.fct_ice_warehouse_stocks_by_port,
    kind INCREMENTAL_BY_TIME_RANGE (
        time_column report_date
    ),
    grain (report_date),
    start '1996-11-01',
    cron '@daily'
);

-- src: every line of every landing file matched by the glob, all columns varchar.
-- The column list documents the landing schema this model depends on.
-- delim is pinned so the CSV sniffer cannot pick a different separator, and
-- ignore_errors keeps one structurally broken line from failing the whole run;
-- both match how these files were ingested before the raw layer was removed.
WITH src AS (
    SELECT
        report_date,
        new_york_bags,
        new_orleans_bags,
        houston_bags,
        miami_bags,
        antwerp_bags,
        hamburg_bremen_bags,
        barcelona_bags,
        virginia_bags,
        total_bags,
        filename
    FROM read_csv(
        @ice_stocks_by_port_glob(),
        delim = ',',
        compression = 'gzip',
        header = true,
        union_by_name = true,
        filename = true,
        all_varchar = true,
        ignore_errors = true
    )
),

-- cast_and_clean: type every column with TRY_CAST (unparseable values become
-- NULL instead of raising) and keep only rows whose report_date and total_bags
-- both parse.
cast_and_clean AS (
    SELECT
        TRY_CAST(report_date AS date)          AS report_date,
        TRY_CAST(new_york_bags AS bigint)       AS new_york_bags,
        TRY_CAST(new_orleans_bags AS bigint)    AS new_orleans_bags,
        TRY_CAST(houston_bags AS bigint)        AS houston_bags,
        TRY_CAST(miami_bags AS bigint)          AS miami_bags,
        TRY_CAST(antwerp_bags AS bigint)        AS antwerp_bags,
        TRY_CAST(hamburg_bremen_bags AS bigint) AS hamburg_bremen_bags,
        TRY_CAST(barcelona_bags AS bigint)      AS barcelona_bags,
        TRY_CAST(virginia_bags AS bigint)       AS virginia_bags,
        TRY_CAST(total_bags AS bigint)          AS total_bags,
        -- Lineage: landing file the row was read from
        filename                                AS source_file,
        -- Dedup key: report date + total bags
        hash(report_date, total_bags)           AS hkey
    FROM src
    WHERE TRY_CAST(report_date AS date) IS NOT NULL
      AND TRY_CAST(total_bags AS bigint) IS NOT NULL
),

-- deduplicated: collapse all rows sharing an hkey into one row; any_value
-- picks an arbitrary representative for each non-key column.
deduplicated AS (
    SELECT
        any_value(report_date)         AS report_date,
        any_value(new_york_bags)       AS new_york_bags,
        any_value(new_orleans_bags)    AS new_orleans_bags,
        any_value(houston_bags)        AS houston_bags,
        any_value(miami_bags)          AS miami_bags,
        any_value(antwerp_bags)        AS antwerp_bags,
        any_value(hamburg_bremen_bags) AS hamburg_bremen_bags,
        any_value(barcelona_bags)      AS barcelona_bags,
        any_value(virginia_bags)       AS virginia_bags,
        any_value(total_bags)          AS total_bags,
        any_value(source_file)         AS source_file,
        hkey
    FROM cast_and_clean
    GROUP BY hkey
)

-- Restrict the output to the interval of the current incremental run
-- (BETWEEN is inclusive on both ends).
SELECT
    report_date,
    new_york_bags,
    new_orleans_bags,
    houston_bags,
    miami_bags,
    antwerp_bags,
    hamburg_bremen_bags,
    barcelona_bags,
    virginia_bags,
    total_bags,
    source_file,
    hkey
FROM deduplicated
WHERE report_date BETWEEN @start_ds AND @end_ds

View File

@@ -1,46 +0,0 @@
-- Landing-zone ingestion for KC=F Coffee C futures prices (raw layer).
--
-- Exposes the daily OHLCV gzip CSVs found in the landing directory without
-- typing anything: every column stays varchar and is cast later in
-- foundation.fct_coffee_prices.
--
-- Source:    Yahoo Finance via yfinance (KC=F ticker)
-- Coverage:  1971-present (historical futures data)
-- Frequency: daily (trading days only)
MODEL (
    name raw.coffee_prices,
    kind FULL,
    grain (Date),
    cron '@daily',
    columns (
        Date      varchar,
        Open      varchar,
        High      varchar,
        Low       varchar,
        Close     varchar,
        Adj_Close varchar,
        Volume    varchar,
        filename  varchar
    )
);

-- landing: one row per CSV line across all files matched by the glob.
WITH landing AS (
    SELECT *
    FROM read_csv(
        @prices_glob(),
        header = true,
        delim = ',',
        encoding = 'utf-8',
        compression = 'gzip',
        all_varchar = true,
        union_by_name = true,
        ignore_errors = true,
        filename = true
    )
)

-- Project the CSV headers onto the declared column names; "Adj Close" is the
-- only header that needs renaming (space -> underscore).
SELECT
    landing."Date"      AS Date,
    landing."Open"      AS Open,
    landing."High"      AS High,
    landing."Low"       AS Low,
    landing."Close"     AS Close,
    landing."Adj Close" AS Adj_Close,
    landing."Volume"    AS Volume,
    landing.filename
FROM landing

View File

@@ -1,120 +0,0 @@
-- Raw CFTC Commitment of Traders — Disaggregated Futures Only.
--
-- Technical ingestion layer only: reads gzip CSVs from the landing directory
-- and surfaces the columns needed by downstream foundation models.
-- All values are varchar; casting happens in foundation.
--
-- Source: CFTC yearly ZIPs at
-- https://www.cftc.gov/files/dea/history/fut_disagg_txt_{year}.zip
-- Coverage: June 2006 – present (new file every Friday at 3:30 PM ET)
--
-- The SELECT below maps each mixed-case CSV header to a lower-case snake_case
-- column name; the declared columns list must stay in sync with it.
MODEL (
name raw.cot_disaggregated,
kind FULL,
grain (cftc_commodity_code, report_date_as_yyyy_mm_dd, cftc_contract_market_code),
start '2006-06-13',
cron '@daily',
columns (
market_and_exchange_names varchar,
report_date_as_yyyy_mm_dd varchar,
cftc_commodity_code varchar,
cftc_contract_market_code varchar,
contract_units varchar,
open_interest_all varchar,
prod_merc_positions_long_all varchar,
prod_merc_positions_short_all varchar,
swap_positions_long_all varchar,
swap_positions_short_all varchar,
swap_positions_spread_all varchar,
m_money_positions_long_all varchar,
m_money_positions_short_all varchar,
m_money_positions_spread_all varchar,
other_rept_positions_long_all varchar,
other_rept_positions_short_all varchar,
other_rept_positions_spread_all varchar,
nonrept_positions_long_all varchar,
nonrept_positions_short_all varchar,
change_in_open_interest_all varchar,
change_in_m_money_long_all varchar,
change_in_m_money_short_all varchar,
change_in_prod_merc_long_all varchar,
change_in_prod_merc_short_all varchar,
conc_gross_le_4_tdr_long_all varchar,
conc_gross_le_4_tdr_short_all varchar,
conc_gross_le_8_tdr_long_all varchar,
conc_gross_le_8_tdr_short_all varchar,
traders_tot_all varchar,
traders_m_money_long_all varchar,
traders_m_money_short_all varchar,
traders_m_money_spread_all varchar,
filename varchar
)
);
SELECT
-- Identifiers
"Market_and_Exchange_Names" AS market_and_exchange_names,
-- The source header contains hyphens, so it must stay double-quoted.
"Report_Date_as_YYYY-MM-DD" AS report_date_as_yyyy_mm_dd,
"CFTC_Commodity_Code" AS cftc_commodity_code,
"CFTC_Contract_Market_Code" AS cftc_contract_market_code,
"Contract_Units" AS contract_units,
-- Open interest
"Open_Interest_All" AS open_interest_all,
-- Producer / Merchant / Processor / User (commercial hedgers)
"Prod_Merc_Positions_Long_All" AS prod_merc_positions_long_all,
"Prod_Merc_Positions_Short_All" AS prod_merc_positions_short_all,
-- Swap dealers
-- NOTE(review): the short/spread headers carry a double underscore ("Swap__")
-- while the long header has a single one — presumably this mirrors the CFTC
-- file header verbatim; confirm before "fixing" it.
"Swap_Positions_Long_All" AS swap_positions_long_all,
"Swap__Positions_Short_All" AS swap_positions_short_all,
"Swap__Positions_Spread_All" AS swap_positions_spread_all,
-- Managed money (hedge funds, CTAs — key speculative signal)
"M_Money_Positions_Long_All" AS m_money_positions_long_all,
"M_Money_Positions_Short_All" AS m_money_positions_short_all,
"M_Money_Positions_Spread_All" AS m_money_positions_spread_all,
-- Other reportables
"Other_Rept_Positions_Long_All" AS other_rept_positions_long_all,
"Other_Rept_Positions_Short_All" AS other_rept_positions_short_all,
"Other_Rept_Positions_Spread_All" AS other_rept_positions_spread_all,
-- Non-reportable (small speculators)
"NonRept_Positions_Long_All" AS nonrept_positions_long_all,
"NonRept_Positions_Short_All" AS nonrept_positions_short_all,
-- Week-over-week changes
"Change_in_Open_Interest_All" AS change_in_open_interest_all,
"Change_in_M_Money_Long_All" AS change_in_m_money_long_all,
"Change_in_M_Money_Short_All" AS change_in_m_money_short_all,
"Change_in_Prod_Merc_Long_All" AS change_in_prod_merc_long_all,
"Change_in_Prod_Merc_Short_All" AS change_in_prod_merc_short_all,
-- Concentration (% of OI held by top 4 and top 8 traders)
"Conc_Gross_LE_4_TDR_Long_All" AS conc_gross_le_4_tdr_long_all,
"Conc_Gross_LE_4_TDR_Short_All" AS conc_gross_le_4_tdr_short_all,
"Conc_Gross_LE_8_TDR_Long_All" AS conc_gross_le_8_tdr_long_all,
"Conc_Gross_LE_8_TDR_Short_All" AS conc_gross_le_8_tdr_short_all,
-- Trader counts
"Traders_Tot_All" AS traders_tot_all,
"Traders_M_Money_Long_All" AS traders_m_money_long_all,
"Traders_M_Money_Short_All" AS traders_m_money_short_all,
"Traders_M_Money_Spread_All" AS traders_m_money_spread_all,
-- Lineage
filename
-- All landing files matched by the glob are read as varchar and unioned by
-- column name; ignore_errors skips lines the CSV reader cannot parse.
FROM read_csv(
@cot_glob(),
delim = ',',
encoding = 'utf-8',
compression = 'gzip',
header = true,
union_by_name = true,
filename = true,
all_varchar = true,
max_line_size = 10000000,
ignore_errors = true
)

View File

@@ -1,49 +0,0 @@
-- Landing-zone ingestion for the ICE certified stock aging report (raw layer).
--
-- Exposes the monthly aging-report gzip CSVs from the landing directory as-is:
-- every column is varchar, and typing is left to
-- foundation.fct_ice_aging_stocks.
--
-- Source:    ICE Report Center (Certified Stock Aging Report)
-- Coverage:  varies by download history
-- Frequency: monthly (ICE updates after each delivery month)
MODEL (
    name raw.ice_aging_stocks,
    kind FULL,
    cron '@daily',
    columns (
        report_date         varchar,
        age_bucket          varchar,
        antwerp_bags        varchar,
        hamburg_bremen_bags varchar,
        houston_bags        varchar,
        miami_bags          varchar,
        new_orleans_bags    varchar,
        new_york_bags       varchar,
        total_bags          varchar,
        filename            varchar
    )
);

-- landing: one row per CSV line across all files matched by the glob.
WITH landing AS (
    SELECT *
    FROM read_csv(
        @ice_aging_glob(),
        header = true,
        delim = ',',
        encoding = 'utf-8',
        compression = 'gzip',
        all_varchar = true,
        union_by_name = true,
        ignore_errors = true,
        filename = true
    )
)

-- Keep only the declared columns, in declaration order.
SELECT
    landing.report_date,
    landing.age_bucket,
    landing.antwerp_bags,
    landing.hamburg_bremen_bags,
    landing.houston_bags,
    landing.miami_bags,
    landing.new_orleans_bags,
    landing.new_york_bags,
    landing.total_bags,
    landing.filename
FROM landing

View File

@@ -1,37 +0,0 @@
-- Landing-zone ingestion for ICE certified warehouse stocks (raw layer).
--
-- Exposes the daily stock-report gzip CSVs from the landing directory as-is:
-- every column is varchar, and typing is left to
-- foundation.fct_ice_warehouse_stocks.
--
-- Source:    ICE Report Center (Coffee C certified warehouse stocks)
-- Coverage:  varies by download history
-- Frequency: daily (ICE updates after market close)
MODEL (
    name raw.ice_warehouse_stocks,
    kind FULL,
    cron '@daily',
    columns (
        report_date          varchar,
        total_certified_bags varchar,
        pending_grading_bags varchar,
        filename             varchar
    )
);

-- landing: one row per CSV line across all files matched by the glob.
WITH landing AS (
    SELECT *
    FROM read_csv(
        @ice_stocks_glob(),
        header = true,
        delim = ',',
        encoding = 'utf-8',
        compression = 'gzip',
        all_varchar = true,
        union_by_name = true,
        ignore_errors = true,
        filename = true
    )
)

-- Keep only the declared columns, in declaration order.
SELECT
    landing.report_date,
    landing.total_certified_bags,
    landing.pending_grading_bags,
    landing.filename
FROM landing

View File

@@ -1,51 +0,0 @@
-- Landing-zone ingestion for ICE historical end-of-month warehouse stocks by
-- port (raw layer).
--
-- Exposes the historical by-port gzip CSVs from the landing directory as-is:
-- every column is varchar, and typing is left to
-- foundation.fct_ice_warehouse_stocks_by_port.
--
-- Source:    ICE (EOM_KC_cert_stox_by_port_nov96-present.xls)
-- Coverage:  November 1996 to present
-- Frequency: monthly (ICE updates the static file monthly)
MODEL (
    name raw.ice_warehouse_stocks_by_port,
    kind FULL,
    cron '@daily',
    columns (
        report_date         varchar,
        new_york_bags       varchar,
        new_orleans_bags    varchar,
        houston_bags        varchar,
        miami_bags          varchar,
        antwerp_bags        varchar,
        hamburg_bremen_bags varchar,
        barcelona_bags      varchar,
        virginia_bags       varchar,
        total_bags          varchar,
        filename            varchar
    )
);

-- landing: one row per CSV line across all files matched by the glob.
WITH landing AS (
    SELECT *
    FROM read_csv(
        @ice_stocks_by_port_glob(),
        header = true,
        delim = ',',
        encoding = 'utf-8',
        compression = 'gzip',
        all_varchar = true,
        union_by_name = true,
        ignore_errors = true,
        filename = true
    )
)

-- Keep only the declared columns, in declaration order.
SELECT
    landing.report_date,
    landing.new_york_bags,
    landing.new_orleans_bags,
    landing.houston_bags,
    landing.miami_bags,
    landing.antwerp_bags,
    landing.hamburg_bremen_bags,
    landing.barcelona_bags,
    landing.virginia_bags,
    landing.total_bags,
    landing.filename
FROM landing

View File

@@ -1,24 +0,0 @@
-- Raw PSD all-data ingestion (raw layer).
--
-- Reads the gzip CSVs matched by @psd_glob() with every value as varchar.
-- Column names are supplied explicitly through the names list rather than
-- taken from whatever casing the file headers use; filename is appended by
-- the reader for lineage.
MODEL (
    name raw.psd_alldata,
    kind FULL,
    grain (
        commodity_code,
        country_code,
        market_year,
        calendar_year,
        month,
        attribute_id,
        unit_id
    ),
    start '2006-08-01',
    cron '@daily',
    columns (
        commodity_code        varchar,
        commodity_description varchar,
        country_code          varchar,
        country_name          varchar,
        market_year           varchar,
        calendar_year         varchar,
        month                 varchar,
        attribute_id          varchar,
        attribute_description varchar,
        unit_id               varchar,
        unit_description      varchar,
        value                 varchar,
        filename              varchar
    )
);

SELECT *
FROM read_csv(
    @psd_glob(),
    delim = ',',
    encoding = 'utf-8',
    compression = 'gzip',
    max_line_size = 10000000,
    header = true,
    union_by_name = true,
    filename = true,
    names = [
        'commodity_code',
        'commodity_description',
        'country_code',
        'country_name',
        'market_year',
        'calendar_year',
        'month',
        'attribute_id',
        'attribute_description',
        'unit_id',
        'unit_description',
        'value'
    ],
    all_varchar = true
)

View File

@@ -1,5 +1,5 @@
MODEL (
name raw.psd_attribute_codes,
name seeds.psd_attribute_codes,
kind SEED (
path '$root/seeds/psd_attribute_codes.csv',
csv_settings (

View File

@@ -1,5 +1,5 @@
MODEL (
name raw.psd_commodity_codes,
name seeds.psd_commodity_codes,
kind SEED (
path '$root/seeds/psd_commodity_codes.csv',
csv_settings (

View File

@@ -1,5 +1,5 @@
MODEL (
name raw.psd_unit_of_measure_codes,
name seeds.psd_unit_of_measure_codes,
kind SEED (
path '$root/seeds/psd_unit_of_measure_codes.csv',
csv_settings (

View File

@@ -8,26 +8,34 @@ MODEL (
);
with cast_dtypes as (
SELECT
raw.psd_alldata.commodity_code::int as commodity_code,
src.commodity_code::int as commodity_code,
coalesce(commodity_name, commodity_description) as commodity_name,
country_code::varchar(3) as country_code,
country_name,
market_year::int as market_year,
calendar_year::int as calendar_year,
month::int as month,
raw.psd_alldata.attribute_id::int as attribute_id,
src.attribute_id::int as attribute_id,
coalesce(attribute_name, attribute_description) as attribute_name,
raw.psd_alldata.unit_id::int as unit_id,
src.unit_id::int as unit_id,
coalesce(unit_name, unit_description) as unit_name,
value::float as value,
filename
FROM raw.psd_alldata
left join raw.psd_commodity_codes on raw.psd_commodity_codes.commodity_code = raw.psd_alldata.commodity_code::int
left join raw.psd_unit_of_measure_codes on raw.psd_unit_of_measure_codes.unit_id = raw.psd_alldata.unit_id::int
left join raw.psd_attribute_codes on raw.psd_attribute_codes.attribute_id = raw.psd_alldata.attribute_id::int
FROM read_csv(
@psd_glob(),
compression = 'gzip',
header = true,
union_by_name = true,
filename = true,
all_varchar = true,
max_line_size = 10000000
) AS src
left join seeds.psd_commodity_codes on seeds.psd_commodity_codes.commodity_code = src.commodity_code::int
left join seeds.psd_unit_of_measure_codes on seeds.psd_unit_of_measure_codes.unit_id = src.unit_id::int
left join seeds.psd_attribute_codes on seeds.psd_attribute_codes.attribute_id = src.attribute_id::int
),
metadata_and_deduplication as (
select
select
any_value(commodity_code) as commodity_code,
any_value(commodity_name) as commodity_name,
any_value(country_code) as country_code,
@@ -39,7 +47,7 @@ select
any_value(attribute_name) as attribute_name,
any_value(unit_id) as unit_id,
any_value(unit_name) as unit_name,
any_value(value) as value,
any_value(value) as value,
hash(commodity_code, commodity_name, country_code, country_name, market_year, calendar_year, month, attribute_id, attribute_name, unit_id, unit_name, value) as hkey,
any_value(make_date(split(filename, '/')[-3]::int, split(filename, '/')[-2]::int, 1)) as ingest_date,
any_value(if(month!=0,last_day(make_date(market_year, month, 1)),null)) as market_date_month_end,