update staging pipeline

This commit is contained in:
Deeman
2025-10-07 22:20:48 +02:00
parent 0a409acbea
commit da89c2bf6e
3 changed files with 46 additions and 31 deletions

View File

@@ -10,6 +10,8 @@ gateways:
database: materia_dev.db database: materia_dev.db
extensions: extensions:
- name: zipfs - name: zipfs
- name: httpfs
- name: iceberg
prod: prod:
connection: connection:
@@ -17,9 +19,13 @@ gateways:
# https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#connection # https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#connection
# https://sqlmesh.readthedocs.io/en/stable/integrations/engines/duckdb/#connection-options # https://sqlmesh.readthedocs.io/en/stable/integrations/engines/duckdb/#connection-options
type: duckdb type: duckdb
database: materia_prod.db database: materia_prod.db
extensions: extensions:
- name: zipfs - name: zipfs
- name: httpfs
- name: iceberg
default_gateway: dev default_gateway: dev

View File

@@ -5,21 +5,20 @@ MODEL (
start '2006-08-01', start '2006-08-01',
cron '@daily', cron '@daily',
columns ( columns (
commodity_code varchar, commodity_code varchar,
commodity_description varchar, commodity_description varchar,
country_code varchar, country_code varchar,
country_name varchar, country_name varchar,
market_year varchar, market_year varchar,
calendar_year varchar, calendar_year varchar,
month varchar, month varchar,
attribute_id varchar, attribute_id varchar,
attribute_description varchar, attribute_description varchar,
unit_id varchar, unit_id varchar,
unit_description varchar, unit_description varchar,
value varchar, value varchar,
filename varchar filename varchar
) )
); );
SELECT SELECT *
* FROM read_csv('zip://extract/psdonline/src/psdonline/data/**/*.zip/*.csv', header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true)
FROM read_csv('zip:///extract/psdonline/src/psdonline/data/**/*.zip/*.csv', header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true)

View File

@@ -7,7 +7,7 @@ MODEL (
cron '@daily' cron '@daily'
); );
with cast_dtypes as ( with cast_dtypes as (
SELECT SELECT
raw.psd_alldata.commodity_code::int as commodity_code, raw.psd_alldata.commodity_code::int as commodity_code,
coalesce(commodity_name, commodity_description) as commodity_name, coalesce(commodity_name, commodity_description) as commodity_name,
country_code::varchar(3) as country_code, country_code::varchar(3) as country_code,
@@ -20,18 +20,31 @@ SELECT
raw.psd_alldata.unit_id::int as unit_id, raw.psd_alldata.unit_id::int as unit_id,
coalesce(unit_name, unit_description) as unit_name, coalesce(unit_name, unit_description) as unit_name,
value::float as value, value::float as value,
filename, filename
FROM raw.psd_alldata FROM raw.psd_alldata
left join raw.psd_commodity_codes on raw.psd_commodity_codes.commodity_code = raw.psd_alldata.commodity_code::int left join raw.psd_commodity_codes on raw.psd_commodity_codes.commodity_code = raw.psd_alldata.commodity_code::int
left join raw.psd_unit_of_measure_codes on raw.psd_unit_of_measure_codes.unit_id = raw.psd_alldata.unit_id::int left join raw.psd_unit_of_measure_codes on raw.psd_unit_of_measure_codes.unit_id = raw.psd_alldata.unit_id::int
left join raw.psd_attribute_codes on raw.psd_attribute_codes.attribute_id = raw.psd_alldata.attribute_id::int left join raw.psd_attribute_codes on raw.psd_attribute_codes.attribute_id = raw.psd_alldata.attribute_id::int
), ),
metadata as ( metadata_and_deduplication as (
select *, select
@GENERATE_SURROGATE_KEY(commodity_code, country_code, market_year, month, attribute_id) as hkey, any_value(commodity_code) as commodity_code,
make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1) as ingest_date, any_value(commodity_name) as commodity_name,
if(month!=0,last_day(make_date(market_year, month, 1)),null) as market_date_month_end any_value(country_code) as country_code,
any_value(country_name) as country_name,
any_value(market_year) as market_year,
any_value(calendar_year) as calendar_year,
any_value(month) as month,
any_value(attribute_id) as attribute_id,
any_value(attribute_name) as attribute_name,
any_value(unit_id) as unit_id,
any_value(unit_name) as unit_name,
any_value(value) as value,
hash(commodity_code, commodity_name, country_code, country_name, market_year, calendar_year, month, attribute_id, attribute_name, unit_id, unit_name, value) as hkey,
any_value(make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1)) as ingest_date,
any_value(if(month!=0,last_day(make_date(market_year, month, 1)),null)) as market_date_month_end,
from cast_dtypes from cast_dtypes
group by hkey
) )
select hkey, select hkey,
commodity_code, commodity_code,
@@ -46,9 +59,6 @@ select hkey,
unit_id, unit_id,
unit_name, unit_name,
value, value,
filename,
ingest_date, ingest_date,
market_date_month_end from metadata_and_deduplication
from metadata
qualify row_number() over (partition by hkey) = 1
where ingest_date between @start_ds and @end_ds; where ingest_date between @start_ds and @end_ds;