refactor
This commit is contained in:
@@ -1,26 +1,25 @@
|
||||
MODEL (
|
||||
name raw.psd_alldata,
|
||||
kind FULL,
|
||||
grain ( commodity_code, country_code, market_year, calendar_year, month, attribute_id,unit_id ),
|
||||
start '2006-08-01',
|
||||
cron '@daily'
|
||||
cron '@daily',
|
||||
columns (
|
||||
commodity_code varchar,
|
||||
commodity_description varchar,
|
||||
country_code varchar,
|
||||
country_name varchar,
|
||||
market_year varchar,
|
||||
calendar_year varchar,
|
||||
month varchar,
|
||||
attribute_id varchar,
|
||||
attribute_description varchar,
|
||||
unit_id varchar,
|
||||
unit_description varchar,
|
||||
value varchar,
|
||||
filename varchar
|
||||
)
|
||||
);
|
||||
|
||||
SELECT
|
||||
*
|
||||
--format('{}-{}-01',split(filename, '/')[-4],split(filename, '/')[-3])::date as ingest_date
|
||||
FROM read_csv('zip:///home/deeman/projects/materia/extract/psdonline/src/psdonline/data/**/*.zip/*.csv', header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], types=
|
||||
{
|
||||
'commodity_code' : 'VARCHAR',
|
||||
'commodity_description' :'VARCHAR',
|
||||
'country_code' : 'VARCHAR',
|
||||
'country_name' : 'VARCHAR',
|
||||
'market_year' : 'BIGINT' ,
|
||||
'calendar_year' : 'BIGINT' ,
|
||||
'month' : 'VARCHAR',
|
||||
'attribute_id' : 'VARCHAR',
|
||||
'attribute_description' :'VARCHAR',
|
||||
'unit_id' : 'VARCHAR',
|
||||
'unit_description' : 'VARCHAR',
|
||||
'value' : 'DOUBLE'
|
||||
}
|
||||
)
|
||||
FROM read_csv('zip:///home/deeman/projects/materia/extract/psdonline/src/psdonline/data/**/*.zip/*.csv', header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true)
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
MODEL (
|
||||
name staging.psd_alldata,
|
||||
kind INCREMENTAL_BY_TIME_RANGE (
|
||||
time_column ingest_date
|
||||
),
|
||||
start '2006-08-01',
|
||||
cron '@daily'
|
||||
);
|
||||
|
||||
SELECT
|
||||
@GENERATE_SURROGATE_KEY(commodity_code, country_code, market_year, month, attribute_id) as hkey,
|
||||
commodity_code,
|
||||
coalesce(commodity_name, commodity_description) as commodity_name,
|
||||
country_code,
|
||||
country_name,
|
||||
market_year,
|
||||
calendar_year,
|
||||
month,
|
||||
attribute_id,
|
||||
coalesce(attribute_name, attribute_description) as attribute_name,
|
||||
unit_id,
|
||||
coalesce(unit_name, unit_description) as unit_name,
|
||||
value,
|
||||
filename,
|
||||
format('{}-{}-01',split(filename, '/')[-4],split(filename, '/')[-3])::date as ingest_date
|
||||
FROM raw.psd_alldata
|
||||
left join raw.psd_commodity_codes using (commodity_code)
|
||||
left join raw.psd_unit_of_measure_codes using (unit_id)
|
||||
left join raw.psd_attribute_codes using (attribute_id)
|
||||
@@ -1,64 +0,0 @@
|
||||
/*
|
||||
* Silver layer: Pivots the raw PSD data into a wide format,
|
||||
* with key attributes ('Production', 'Imports', etc.) as columns.
|
||||
* This is equivalent to step 2 of the Python script 03_Extraction.
|
||||
*/
|
||||
MODEL (
|
||||
name transform.sqlmesh_materia.models.staging.stg_psd_alldata_1_filter_silver_layer,
|
||||
kind INCREMENTAL_BY_TIME_RANGE (
|
||||
time_column ingest_date
|
||||
),
|
||||
start '2006-08-01',
|
||||
cron '@daily'
|
||||
);
|
||||
|
||||
SELECT
|
||||
commodity_code,
|
||||
commodity_name,
|
||||
country_code,
|
||||
country_name,
|
||||
ingest_date,
|
||||
-- Replicate the Python script's pivot by using conditional aggregation
|
||||
-- This creates a single row for each commodity-country-date combination
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Production' THEN value END), 0) AS Production,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Imports' THEN value END), 0) AS Imports,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Exports' THEN value END), 0) AS Exports,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Total Distribution' THEN value END), 0) AS Total_Distribution,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Ending Stocks' THEN value END), 0) AS Ending_Stocks,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Beginning Stocks' THEN value END), 0) AS Beginning_Stocks,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Total Supply' THEN value END), 0) AS Total_Supply,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Consumption' THEN value END), 0) AS Domestic_Consumption,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Demand' THEN value END), 0) AS Domestic_Demand,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Food Use' THEN value END), 0) AS Food_Use,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Industrial Use' THEN value END), 0) AS Industrial_Use,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Seed Use' THEN value END), 0) AS Seed_Use,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Waste' THEN value END), 0) AS Waste,
|
||||
COALESCE(SUM(CASE WHEN attribute_name = 'Feed Use' THEN value END), 0) AS Feed_Use
|
||||
FROM transform.sqlmesh_materia.models.staging.stg_psd_alldata_0
|
||||
-- Filter for the specific attributes used in the pivot table for efficiency
|
||||
WHERE attribute_name IN (
|
||||
'Production',
|
||||
'Imports',
|
||||
'Exports',
|
||||
'Total Distribution',
|
||||
'Ending Stocks',
|
||||
'Beginning Stocks',
|
||||
'Total Supply',
|
||||
'Domestic Consumption',
|
||||
'Domestic Demand',
|
||||
'Food Use',
|
||||
'Industrial Use',
|
||||
'Seed Use',
|
||||
'Waste',
|
||||
'Feed Use'
|
||||
)
|
||||
GROUP BY
|
||||
commodity_code,
|
||||
commodity_name,
|
||||
country_code,
|
||||
country_name,
|
||||
ingest_date
|
||||
ORDER BY
|
||||
commodity_name,
|
||||
country_name,
|
||||
ingest_date;
|
||||
@@ -1,110 +0,0 @@
|
||||
/*
|
||||
* Gold layer: Calculates derived metrics like Net Supply, Trade Balance,
|
||||
* and Stock-to-Use Ratio based on the pivoted silver layer data.
|
||||
* This also includes the global aggregates, mimicking steps 3 and 4
|
||||
* of the Python script 03_Extraction.
|
||||
*/
|
||||
MODEL (
|
||||
name transform.sqlmesh_materia.models.staging.stg_psd_alldata_2_filter_gold_layer,
|
||||
kind INCREMENTAL_BY_TIME_RANGE (
|
||||
time_column ingest_date
|
||||
),
|
||||
start '2006-08-01',
|
||||
cron '@daily'
|
||||
);
|
||||
|
||||
-- CTE to calculate country-level derived metrics
|
||||
WITH country_metrics AS (
|
||||
SELECT
|
||||
commodity_code,
|
||||
commodity_name,
|
||||
country_code,
|
||||
country_name,
|
||||
ingest_date,
|
||||
Production,
|
||||
Imports,
|
||||
Exports,
|
||||
Total_Distribution,
|
||||
Ending_Stocks,
|
||||
-- Derived metrics per country, mirroring Python script
|
||||
(Production + Imports - Exports) AS Net_Supply,
|
||||
(Exports - Imports) AS Trade_Balance,
|
||||
(Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance,
|
||||
-- Handle division by zero for Stock-to-Use Ratio
|
||||
(Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct,
|
||||
-- Calculate Production YoY percentage change using a window function
|
||||
(Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY ingest_date)) / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY ingest_date), 0) * 100 AS Production_YoY_pct
|
||||
FROM transform.sqlmesh_materia.models.staging.stg_psd_alldata_1_filter_silver_layer
|
||||
),
|
||||
-- CTE to calculate global aggregates by summing up country-level data
|
||||
global_aggregates AS (
|
||||
SELECT
|
||||
commodity_code,
|
||||
commodity_name,
|
||||
NULL::TEXT AS country_code, -- Use NULL for global aggregates
|
||||
'Global' AS country_name,
|
||||
ingest_date,
|
||||
SUM(Production) AS Production,
|
||||
SUM(Imports) AS Imports,
|
||||
SUM(Exports) AS Exports,
|
||||
SUM(Total_Distribution) AS Total_Distribution,
|
||||
SUM(Ending_Stocks) AS Ending_Stocks
|
||||
FROM transform.sqlmesh_materia.models.staging.stg_psd_alldata_1_filter_silver_layer
|
||||
GROUP BY
|
||||
commodity_code,
|
||||
commodity_name,
|
||||
ingest_date
|
||||
),
|
||||
-- CTE to calculate derived metrics for global aggregates
|
||||
global_metrics AS (
|
||||
SELECT
|
||||
commodity_code,
|
||||
commodity_name,
|
||||
country_code,
|
||||
country_name,
|
||||
ingest_date,
|
||||
Production,
|
||||
Imports,
|
||||
Exports,
|
||||
Total_Distribution,
|
||||
Ending_Stocks,
|
||||
(Production + Imports - Exports) AS Net_Supply,
|
||||
(Exports - Imports) AS Trade_Balance,
|
||||
(Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance,
|
||||
(Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct,
|
||||
(Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY ingest_date)) / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY ingest_date), 0) * 100 AS Production_YoY_pct
|
||||
FROM global_aggregates
|
||||
)
|
||||
-- Combine country-level and global-level data into a single output
|
||||
SELECT
|
||||
hkey,
|
||||
commodity_code,
|
||||
commodity_name,
|
||||
country_code,
|
||||
country_name,
|
||||
ingest_date,
|
||||
Production,
|
||||
Imports,
|
||||
Exports,
|
||||
Total_Distribution,
|
||||
Ending_Stocks,
|
||||
Net_Supply,
|
||||
Trade_Balance,
|
||||
Supply_Demand_Balance,
|
||||
Stock_to_Use_Ratio_pct,
|
||||
Production_YoY_pct
|
||||
FROM (
|
||||
SELECT
|
||||
@GENERATE_SURROGATE_KEY(commodity_code, country_code, ingest_date) AS hkey,
|
||||
*
|
||||
FROM country_metrics
|
||||
UNION ALL
|
||||
SELECT
|
||||
@GENERATE_SURROGATE_KEY(commodity_code, country_name, ingest_date) AS hkey,
|
||||
*
|
||||
FROM global_metrics
|
||||
) AS combined_data
|
||||
ORDER BY
|
||||
commodity_name,
|
||||
country_name,
|
||||
ingest_date;
|
||||
Reference in New Issue
Block a user