Files
beanflows/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql

146 lines
8.2 KiB
SQL

/* Foundation fact: CFTC COT positioning, weekly grain, all commodities. */
/* Reads directly from the landing zone, casts varchar columns to proper types, */
/* cleans column names, computes net positions (long - short) per trader category, */
/* and deduplicates via hash key. Covers all commodities — filtering to */
/* a specific commodity happens in the serving layer. */
/* Grain: one row per (cftc_commodity_code, report_date, cftc_contract_market_code, report_type, ingest_date). */
/* History: revisions appear as new rows with a later ingest_date. */
/* Serving layer picks max(ingest_date) per business key (the grain without ingest_date) for the latest view. */
MODEL (
  name foundation.fct_cot_positioning,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column report_date /* the query filters report_date to the @start_ds .. @end_ds window */
  ),
  grain (
    cftc_commodity_code,
    report_date,
    cftc_contract_market_code,
    ingest_date, /* part of the grain: revisions are stored as additional rows */
    report_type /* part of the grain: both CFTC report variants are stored side by side */
  ),
  start '2006-06-13',
  cron '@daily'
);
WITH src AS (
  /* Raw landing-zone rows from the two file sets (@cot_glob and @cot_combined_glob), stacked by column name. */
  /* all_varchar = TRUE keeps every field as text so that typing is done explicitly in cast_and_clean; */
  /* filename = TRUE adds the source path, which is used below to derive ingest_date and to order duplicates. */
  SELECT
    *
  FROM READ_CSV(
    @cot_glob(),
    compression = 'gzip',
    header = TRUE,
    union_by_name = TRUE,
    filename = TRUE,
    all_varchar = TRUE,
    max_line_size = 10000000
  )
  UNION ALL BY NAME
  SELECT
    *
  FROM READ_CSV(
    @cot_combined_glob(),
    compression = 'gzip',
    header = TRUE,
    union_by_name = TRUE,
    filename = TRUE,
    all_varchar = TRUE,
    max_line_size = 10000000
  )
), cast_and_clean AS (
  /* Types and renames the raw text columns, derives net positions, ingest_date and the dedup hash. */
  /* CFTC uses '.' as null for any field, so every conversion is a TRY_CAST: an unparseable value */
  /* becomes NULL instead of aborting the whole model run. */
  SELECT
    /* Identifiers */
    TRIM(market_and_exchange_names) AS market_and_exchange_name,
    TRY_CAST("Report_Date_as_YYYY-MM-DD" AS DATE) AS report_date, /* NULL when malformed; such rows are dropped by the WHERE below */
    TRIM(cftc_commodity_code) AS cftc_commodity_code,
    TRIM(cftc_contract_market_code) AS cftc_contract_market_code,
    TRIM(contract_units) AS contract_units,
    TRIM("FutOnly_or_Combined") AS report_type, /* 'FutOnly' or 'Combined' — discriminates the two CFTC report variants */
    /* Open interest */
    TRY_CAST(open_interest_all AS INT) AS open_interest,
    /* Producer / Merchant (commercial hedgers: exporters, processors) */
    TRY_CAST(prod_merc_positions_long_all AS INT) AS prod_merc_long,
    TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_short,
    /* Swap dealers */
    TRY_CAST(swap_positions_long_all AS INT) AS swap_long,
    TRY_CAST("Swap__Positions_Short_All" AS INT) AS swap_short,
    TRY_CAST("Swap__Positions_Spread_All" AS INT) AS swap_spread,
    /* Managed money (hedge funds, CTAs — the primary speculative signal) */
    TRY_CAST(m_money_positions_long_all AS INT) AS managed_money_long,
    TRY_CAST(m_money_positions_short_all AS INT) AS managed_money_short,
    TRY_CAST(m_money_positions_spread_all AS INT) AS managed_money_spread,
    /* Other reportables */
    TRY_CAST(other_rept_positions_long_all AS INT) AS other_reportable_long,
    TRY_CAST(other_rept_positions_short_all AS INT) AS other_reportable_short,
    TRY_CAST(other_rept_positions_spread_all AS INT) AS other_reportable_spread,
    /* Non-reportable (small speculators, below reporting threshold) */
    TRY_CAST(nonrept_positions_long_all AS INT) AS nonreportable_long,
    TRY_CAST(nonrept_positions_short_all AS INT) AS nonreportable_short,
    /* Net positions (long minus short per category); NULL when either side is NULL */
    TRY_CAST(prod_merc_positions_long_all AS INT) - TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_net,
    TRY_CAST(m_money_positions_long_all AS INT) - TRY_CAST(m_money_positions_short_all AS INT) AS managed_money_net,
    TRY_CAST(swap_positions_long_all AS INT) - TRY_CAST("Swap__Positions_Short_All" AS INT) AS swap_net,
    TRY_CAST(other_rept_positions_long_all AS INT) - TRY_CAST(other_rept_positions_short_all AS INT) AS other_reportable_net,
    TRY_CAST(nonrept_positions_long_all AS INT) - TRY_CAST(nonrept_positions_short_all AS INT) AS nonreportable_net,
    /* Week-over-week changes */
    TRY_CAST(change_in_open_interest_all AS INT) AS change_open_interest,
    TRY_CAST(change_in_m_money_long_all AS INT) AS change_managed_money_long,
    TRY_CAST(change_in_m_money_short_all AS INT) AS change_managed_money_short,
    TRY_CAST(change_in_m_money_long_all AS INT) - TRY_CAST(change_in_m_money_short_all AS INT) AS change_managed_money_net,
    TRY_CAST(change_in_prod_merc_long_all AS INT) AS change_prod_merc_long,
    TRY_CAST(change_in_prod_merc_short_all AS INT) AS change_prod_merc_short,
    /* Concentration ratios (% of OI held by top 4 / top 8 traders) */
    TRY_CAST(conc_gross_le_4_tdr_long_all AS REAL) AS concentration_top4_long_pct,
    TRY_CAST(conc_gross_le_4_tdr_short_all AS REAL) AS concentration_top4_short_pct,
    TRY_CAST(conc_gross_le_8_tdr_long_all AS REAL) AS concentration_top8_long_pct,
    TRY_CAST(conc_gross_le_8_tdr_short_all AS REAL) AS concentration_top8_short_pct,
    /* Trader counts */
    TRY_CAST(traders_tot_all AS INT) AS traders_total,
    TRY_CAST(traders_m_money_long_all AS INT) AS traders_managed_money_long,
    TRY_CAST(traders_m_money_short_all AS INT) AS traders_managed_money_short,
    TRY_CAST(traders_m_money_spread_all AS INT) AS traders_managed_money_spread,
    /* Ingest date: derived from landing path year directory */
    /* Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2]; resolution is therefore January 1st of that year. */
    /* Deliberately a hard cast: a path that does not match this layout should fail loudly. */
    MAKE_DATE(STR_SPLIT(filename, '/')[-2]::INT, 1, 1) AS ingest_date,
    /* Dedup key: hash of business grain + key metrics; includes report variant so fut-only and combined rows get distinct keys. */
    /* Computed over the raw (untrimmed, uncast) text values. */
    HASH(
      cftc_commodity_code,
      "Report_Date_as_YYYY-MM-DD",
      cftc_contract_market_code,
      "FutOnly_or_Combined",
      open_interest_all,
      m_money_positions_long_all,
      m_money_positions_short_all,
      prod_merc_positions_long_all,
      prod_merc_positions_short_all
    ) AS hkey,
    /* Source path, carried only to order duplicates in the next step; not part of the model output. */
    filename AS source_file
  FROM src
  /* Reject rows with null/blank commodity code or a null/malformed date */
  WHERE
    TRIM(cftc_commodity_code) IS NOT NULL
    AND LENGTH(TRIM(cftc_commodity_code)) > 0
    AND TRY_CAST("Report_Date_as_YYYY-MM-DD" AS DATE) IS NOT NULL
), deduplicated AS (
  /* One row per hkey. A whole source row is kept (earliest ingest_date, then lowest source path) */
  /* so that every output row corresponds to a single input row and ingest_date is stable across runs. */
  /* Rows sharing an hkey within the same file remain an arbitrary pick among themselves. */
  SELECT
    market_and_exchange_name,
    report_date,
    cftc_commodity_code,
    cftc_contract_market_code,
    contract_units,
    open_interest,
    prod_merc_long,
    prod_merc_short,
    prod_merc_net,
    swap_long,
    swap_short,
    swap_spread,
    swap_net,
    managed_money_long,
    managed_money_short,
    managed_money_spread,
    managed_money_net,
    other_reportable_long,
    other_reportable_short,
    other_reportable_spread,
    other_reportable_net,
    nonreportable_long,
    nonreportable_short,
    nonreportable_net,
    change_open_interest,
    change_managed_money_long,
    change_managed_money_short,
    change_managed_money_net,
    change_prod_merc_long,
    change_prod_merc_short,
    concentration_top4_long_pct,
    concentration_top4_short_pct,
    concentration_top8_long_pct,
    concentration_top8_short_pct,
    traders_total,
    traders_managed_money_long,
    traders_managed_money_short,
    traders_managed_money_spread,
    ingest_date,
    report_type,
    hkey
  FROM cast_and_clean
  QUALIFY
    ROW_NUMBER() OVER (PARTITION BY hkey ORDER BY ingest_date, source_file) = 1
)
/* Restrict the output to the interval being processed (both bounds inclusive). */
SELECT
  *
FROM deduplicated
WHERE
  report_date BETWEEN @start_ds AND @end_ds