- fct_cot_positioning: quote Swap__Positions_Short_All and Swap__Positions_Spread_All (CSV uses double underscore; DuckDB preserves header names exactly) - fct_cot_positioning: quote Report_Date_as_YYYY-MM-DD (dashes preserved in header) - fct_coffee_prices: quote "Adj Close" (space in CSV header) - openmeteo/execute.py: skip API call in backfill when all daily files already exist (_count_existing_files pre-check prevents 429 rate limit on re-runs) - dev_run.sh: open browser as admin@beanflows.coffee instead of pro@ Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
58 lines
1.8 KiB
SQL
58 lines
1.8 KiB
SQL
/* Foundation fact: daily KC=F Coffee C futures prices. */ /* Reads directly from the landing zone, casts varchar columns to proper types, */ /* and deduplicates via hash key. */ /* Covers all available history from the landing directory. */ /* Grain: one row per trade_date. */ /* Dedup: hash of (trade_date, close) — if Yahoo Finance corrects a price, */ /* the new hash triggers a re-ingest on the next incremental run. */
|
|
MODEL (
|
|
name foundation.fct_coffee_prices,
|
|
kind INCREMENTAL_BY_TIME_RANGE (
|
|
time_column trade_date
|
|
),
|
|
grain (
|
|
trade_date
|
|
),
|
|
start '1971-08-16',
|
|
cron '@daily'
|
|
);
|
|
|
|
WITH src AS (
|
|
SELECT
|
|
*
|
|
FROM READ_CSV(
|
|
@prices_glob(),
|
|
compression = 'gzip',
|
|
header = TRUE,
|
|
union_by_name = TRUE,
|
|
filename = TRUE,
|
|
all_varchar = TRUE
|
|
)
|
|
), cast_and_clean AS (
|
|
SELECT
|
|
TRY_CAST(Date AS DATE) AS trade_date,
|
|
TRY_CAST(Open AS DOUBLE) AS open,
|
|
TRY_CAST(High AS DOUBLE) AS high,
|
|
TRY_CAST(Low AS DOUBLE) AS low,
|
|
TRY_CAST(Close AS DOUBLE) AS close,
|
|
TRY_CAST("Adj Close" AS DOUBLE) AS adj_close,
|
|
TRY_CAST(Volume AS BIGINT) AS volume,
|
|
filename AS source_file, /* Filename encodes the content hash — use as ingest identifier */
|
|
HASH(Date, Close) AS hkey /* Dedup key: trade date + close price */
|
|
FROM src
|
|
WHERE
|
|
NOT TRY_CAST(Date AS DATE) IS NULL AND NOT TRY_CAST(Close AS DOUBLE) IS NULL
|
|
), deduplicated AS (
|
|
SELECT
|
|
ANY_VALUE(trade_date) AS trade_date,
|
|
ANY_VALUE(open) AS open,
|
|
ANY_VALUE(high) AS high,
|
|
ANY_VALUE(low) AS low,
|
|
ANY_VALUE(close) AS close,
|
|
ANY_VALUE(adj_close) AS adj_close,
|
|
ANY_VALUE(volume) AS volume,
|
|
ANY_VALUE(source_file) AS source_file,
|
|
hkey
|
|
FROM cast_and_clean
|
|
GROUP BY
|
|
hkey
|
|
)
|
|
SELECT
|
|
*
|
|
FROM deduplicated
|
|
WHERE
|
|
trade_date BETWEEN @start_ds AND @end_ds |