From da89c2bf6e48cb79f3303cce16fd781ad322b61a Mon Sep 17 00:00:00 2001 From: Deeman Date: Tue, 7 Oct 2025 22:20:48 +0200 Subject: [PATCH] update staging pipeline --- transform/sqlmesh_materia/config.yaml | 6 +++ .../sqlmesh_materia/models/raw/psd_data.sql | 31 +++++++------- .../staging/stg_psdalldata__commodity.sql | 40 ++++++++++++------- 3 files changed, 46 insertions(+), 31 deletions(-) diff --git a/transform/sqlmesh_materia/config.yaml b/transform/sqlmesh_materia/config.yaml index a3944bd..68e7334 100644 --- a/transform/sqlmesh_materia/config.yaml +++ b/transform/sqlmesh_materia/config.yaml @@ -10,6 +10,8 @@ gateways: database: materia_dev.db extensions: - name: zipfs + - name: httpfs + - name: iceberg prod: connection: @@ -17,9 +19,13 @@ gateways: # https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#connection # https://sqlmesh.readthedocs.io/en/stable/integrations/engines/duckdb/#connection-options type: duckdb + database: materia_prod.db extensions: - name: zipfs + - name: httpfs + - name: iceberg + default_gateway: dev diff --git a/transform/sqlmesh_materia/models/raw/psd_data.sql b/transform/sqlmesh_materia/models/raw/psd_data.sql index 71ed40e..62efe88 100644 --- a/transform/sqlmesh_materia/models/raw/psd_data.sql +++ b/transform/sqlmesh_materia/models/raw/psd_data.sql @@ -5,21 +5,20 @@ MODEL ( start '2006-08-01', cron '@daily', columns ( -commodity_code varchar, -commodity_description varchar, -country_code varchar, -country_name varchar, -market_year varchar, -calendar_year varchar, -month varchar, -attribute_id varchar, -attribute_description varchar, -unit_id varchar, -unit_description varchar, -value varchar, -filename varchar + commodity_code varchar, + commodity_description varchar, + country_code varchar, + country_name varchar, + market_year varchar, + calendar_year varchar, + month varchar, + attribute_id varchar, + attribute_description varchar, + unit_id varchar, + unit_description varchar, + value varchar, + filename varchar ) ); -SELECT - * - FROM read_csv('zip:///extract/psdonline/src/psdonline/data/**/*.zip/*.csv', header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true) +SELECT * + FROM read_csv('zip://extract/psdonline/src/psdonline/data/**/*.zip/*.csv', header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true) diff --git a/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql b/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql index bb1976e..c91cb12 100644 --- a/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql +++ b/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql @@ -7,7 +7,7 @@ MODEL ( cron '@daily' ); with cast_dtypes as ( -SELECT + SELECT raw.psd_alldata.commodity_code::int as commodity_code, coalesce(commodity_name, commodity_description) as commodity_name, country_code::varchar(3) as country_code, @@ -20,18 +20,31 @@ SELECT raw.psd_alldata.unit_id::int as unit_id, coalesce(unit_name, unit_description) as unit_name, value::float as value, - filename, + filename FROM raw.psd_alldata -left join raw.psd_commodity_codes on raw.psd_commodity_codes.commodity_code = raw.psd_alldata.commodity_code::int -left join raw.psd_unit_of_measure_codes on raw.psd_unit_of_measure_codes.unit_id = raw.psd_alldata.unit_id::int -left join raw.psd_attribute_codes on raw.psd_attribute_codes.attribute_id = raw.psd_alldata.attribute_id::int + left join raw.psd_commodity_codes on raw.psd_commodity_codes.commodity_code = raw.psd_alldata.commodity_code::int + left join raw.psd_unit_of_measure_codes on raw.psd_unit_of_measure_codes.unit_id = raw.psd_alldata.unit_id::int + left join raw.psd_attribute_codes on raw.psd_attribute_codes.attribute_id = raw.psd_alldata.attribute_id::int ), -metadata as ( -select *, - @GENERATE_SURROGATE_KEY(commodity_code, country_code, market_year, month, attribute_id) as hkey, - make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1) as ingest_date, - if(month!=0,last_day(make_date(market_year, month, 1)),null) as market_date_month_end - from cast_dtypes +metadata_and_deduplication as ( +select + any_value(commodity_code) as commodity_code, + any_value(commodity_name) as commodity_name, + any_value(country_code) as country_code, + any_value(country_name) as country_name, + any_value(market_year) as market_year, + any_value(calendar_year) as calendar_year, + any_value(month) as month, + any_value(attribute_id) as attribute_id, + any_value(attribute_name) as attribute_name, + any_value(unit_id) as unit_id, + any_value(unit_name) as unit_name, + any_value(value) as value, + hash(commodity_code, commodity_name, country_code, country_name, market_year, calendar_year, month, attribute_id, attribute_name, unit_id, unit_name, value) as hkey, + any_value(make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1)) as ingest_date, + any_value(if(month!=0,last_day(make_date(market_year, month, 1)),null)) as market_date_month_end, + from cast_dtypes + group by hkey ) select hkey, commodity_code, @@ -46,9 +59,6 @@ select hkey, unit_id, unit_name, value, - filename, ingest_date, - market_date_month_end - from metadata -qualify row_number() over (partition by hkey) = 1 +from metadata_and_deduplication where ingest_date between @start_ds and @end_ds;