/* Foundation fact: daily KC=F Coffee C futures prices. */
/* Reads the gzip-compressed CSV files matched by @prices_glob() with every column */
/* loaded as VARCHAR, casts them to proper types, and deduplicates via hash key. */
/* Declared grain: one row per trade_date. */
/* Dedup: hkey = HASH(Date, Close) over the raw text values. Rows sharing an hkey */
/* collapse to one row; for each column the first non-NULL value in source_file */
/* order is kept, so repeated runs over the same files produce the same output. */
/* NOTE(review): a corrected close for an existing date hashes to a new hkey, so */
/* the old and the corrected row both survive dedup and trade_date stops being */
/* unique. No ingest-order column is visible here to pick a winner; confirm intent. */
MODEL (
  name foundation.fct_coffee_prices,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column trade_date
  ),
  grain (
    trade_date
  ),
  start '1971-08-16',
  cron '@daily'
);

WITH src AS (
  /* Raw landing-zone rows; filename = TRUE adds a column with the originating file. */
  SELECT
    *
  FROM READ_CSV(
    @prices_glob(),
    compression = 'gzip',
    header = TRUE,
    union_by_name = TRUE,
    filename = TRUE,
    all_varchar = TRUE
  )
), cast_and_clean AS (
  /* TRY_CAST yields NULL for text that does not parse; rows without a usable */
  /* date or close are dropped, other columns may stay NULL. */
  SELECT
    TRY_CAST(Date AS DATE) AS trade_date,
    TRY_CAST(Open AS DOUBLE) AS open,
    TRY_CAST(High AS DOUBLE) AS high,
    TRY_CAST(Low AS DOUBLE) AS low,
    TRY_CAST(Close AS DOUBLE) AS close,
    TRY_CAST(Adj_Close AS DOUBLE) AS adj_close,
    TRY_CAST(Volume AS BIGINT) AS volume,
    filename AS source_file, /* Filename encodes the content hash — use as ingest identifier */
    HASH(Date, Close) AS hkey /* Dedup key: trade date + close price (raw text values) */
  FROM src
  WHERE
    TRY_CAST(Date AS DATE) IS NOT NULL
    AND TRY_CAST(Close AS DOUBLE) IS NOT NULL
), deduplicated AS (
  /* One row per hkey. Ordering every pick by source_file makes the chosen values */
  /* reproducible; an unordered ANY_VALUE depends on the scan order of the files. */
  SELECT
    ANY_VALUE(trade_date ORDER BY source_file) AS trade_date,
    ANY_VALUE(open ORDER BY source_file) AS open,
    ANY_VALUE(high ORDER BY source_file) AS high,
    ANY_VALUE(low ORDER BY source_file) AS low,
    ANY_VALUE(close ORDER BY source_file) AS close,
    ANY_VALUE(adj_close ORDER BY source_file) AS adj_close,
    ANY_VALUE(volume ORDER BY source_file) AS volume,
    MIN(source_file) AS source_file, /* First file in sort order that carries this hkey */
    hkey
  FROM cast_and_clean
  GROUP BY
    hkey
)
/* Restrict the output to the interval SQLMesh is processing (inclusive DATE bounds). */
SELECT
  trade_date,
  open,
  high,
  low,
  close,
  adj_close,
  volume,
  source_file,
  hkey
FROM deduplicated
WHERE
  trade_date BETWEEN @start_ds AND @end_ds