Update 3 files
- /notebooks/03_Extraction.ipynb - /transform/sqlmesh_materia/models/staging/stg_psd_alldata_1_filter_silver_layer.sql - /transform/sqlmesh_materia/models/staging/stg_psd_alldata_2_filter_gold_layer.sql
This commit is contained in:
@@ -24,7 +24,7 @@
|
|||||||
"\n",
|
"\n",
|
||||||
"data = \"../data/\"\n",
|
"data = \"../data/\"\n",
|
||||||
"df = pd.read_csv(\"../data/psd_alldata.csv\", encoding=\"latin1\")\n",
|
"df = pd.read_csv(\"../data/psd_alldata.csv\", encoding=\"latin1\")\n",
|
||||||
"\"\"\"\n",
|
"\n",
|
||||||
"df.rename(columns={\n",
|
"df.rename(columns={\n",
|
||||||
" 'Commodity_Description': 'commodity',\n",
|
" 'Commodity_Description': 'commodity',\n",
|
||||||
" 'Country_Name': 'country',\n",
|
" 'Country_Name': 'country',\n",
|
||||||
@@ -115,8 +115,7 @@
|
|||||||
" 'Total Distribution', 'Ending Stocks', 'Net Supply',\n",
|
" 'Total Distribution', 'Ending Stocks', 'Net Supply',\n",
|
||||||
" 'Supply-Demand Balance', 'Stock-to-Use Ratio (%)']]\n",
|
" 'Supply-Demand Balance', 'Stock-to-Use Ratio (%)']]\n",
|
||||||
"combined_global.to_csv(\"global_summary_all.csv\", index=False)\n",
|
"combined_global.to_csv(\"global_summary_all.csv\", index=False)\n",
|
||||||
"print(\"🌐 Combined global summary saved as 'global_summary_all.csv'\")\n",
|
"print(\"🌐 Combined global summary saved as 'global_summary_all.csv'\")\n"
|
||||||
"\"\"\""
|
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -0,0 +1,64 @@
|
|||||||
|
/*
|
||||||
|
* Silver layer: Pivots the raw PSD data into a wide format,
|
||||||
|
* with key attributes ('Production', 'Imports', etc.) as columns.
|
||||||
|
* This is equivalent to step 2 of the Python script 03_Extraction.
|
||||||
|
*/
|
||||||
|
MODEL (
|
||||||
|
name transform.sqlmesh_materia.models.staging.stg_psd_alldata_1_filter_silver_layer,
|
||||||
|
kind INCREMENTAL_BY_TIME_RANGE (
|
||||||
|
time_column ingest_date
|
||||||
|
),
|
||||||
|
start '2006-08-01',
|
||||||
|
cron '@daily'
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
commodity_code,
|
||||||
|
commodity_name,
|
||||||
|
country_code,
|
||||||
|
country_name,
|
||||||
|
ingest_date,
|
||||||
|
-- Replicate the Python script's pivot by using conditional aggregation
|
||||||
|
-- This creates a single row for each commodity-country-date combination
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Production' THEN value END), 0) AS Production,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Imports' THEN value END), 0) AS Imports,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Exports' THEN value END), 0) AS Exports,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Total Distribution' THEN value END), 0) AS Total_Distribution,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Ending Stocks' THEN value END), 0) AS Ending_Stocks,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Beginning Stocks' THEN value END), 0) AS Beginning_Stocks,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Total Supply' THEN value END), 0) AS Total_Supply,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Consumption' THEN value END), 0) AS Domestic_Consumption,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Demand' THEN value END), 0) AS Domestic_Demand,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Food Use' THEN value END), 0) AS Food_Use,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Industrial Use' THEN value END), 0) AS Industrial_Use,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Seed Use' THEN value END), 0) AS Seed_Use,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Waste' THEN value END), 0) AS Waste,
|
||||||
|
COALESCE(SUM(CASE WHEN attribute_name = 'Feed Use' THEN value END), 0) AS Feed_Use
|
||||||
|
FROM transform.sqlmesh_materia.models.staging.stg_psd_alldata_0
|
||||||
|
-- Filter for the specific attributes used in the pivot table for efficiency
|
||||||
|
WHERE attribute_name IN (
|
||||||
|
'Production',
|
||||||
|
'Imports',
|
||||||
|
'Exports',
|
||||||
|
'Total Distribution',
|
||||||
|
'Ending Stocks',
|
||||||
|
'Beginning Stocks',
|
||||||
|
'Total Supply',
|
||||||
|
'Domestic Consumption',
|
||||||
|
'Domestic Demand',
|
||||||
|
'Food Use',
|
||||||
|
'Industrial Use',
|
||||||
|
'Seed Use',
|
||||||
|
'Waste',
|
||||||
|
'Feed Use'
|
||||||
|
)
|
||||||
|
GROUP BY
|
||||||
|
commodity_code,
|
||||||
|
commodity_name,
|
||||||
|
country_code,
|
||||||
|
country_name,
|
||||||
|
ingest_date
|
||||||
|
ORDER BY
|
||||||
|
commodity_name,
|
||||||
|
country_name,
|
||||||
|
ingest_date;
|
||||||
@@ -0,0 +1,110 @@
|
|||||||
|
/*
|
||||||
|
* Gold layer: Calculates derived metrics like Net Supply, Trade Balance,
|
||||||
|
* and Stock-to-Use Ratio based on the pivoted silver layer data.
|
||||||
|
* This also includes the global aggregates, mimicking steps 3 and 4
|
||||||
|
* of the Python script 03_Extraction.
|
||||||
|
*/
|
||||||
|
MODEL (
|
||||||
|
name transform.sqlmesh_materia.models.staging.stg_psd_alldata_2_filter_gold_layer,
|
||||||
|
kind INCREMENTAL_BY_TIME_RANGE (
|
||||||
|
time_column ingest_date
|
||||||
|
),
|
||||||
|
start '2006-08-01',
|
||||||
|
cron '@daily'
|
||||||
|
);
|
||||||
|
|
||||||
|
-- CTE to calculate country-level derived metrics
|
||||||
|
WITH country_metrics AS (
|
||||||
|
SELECT
|
||||||
|
commodity_code,
|
||||||
|
commodity_name,
|
||||||
|
country_code,
|
||||||
|
country_name,
|
||||||
|
ingest_date,
|
||||||
|
Production,
|
||||||
|
Imports,
|
||||||
|
Exports,
|
||||||
|
Total_Distribution,
|
||||||
|
Ending_Stocks,
|
||||||
|
-- Derived metrics per country, mirroring Python script
|
||||||
|
(Production + Imports - Exports) AS Net_Supply,
|
||||||
|
(Exports - Imports) AS Trade_Balance,
|
||||||
|
(Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance,
|
||||||
|
-- Handle division by zero for Stock-to-Use Ratio
|
||||||
|
(Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct,
|
||||||
|
-- Calculate Production YoY percentage change using a window function
|
||||||
|
(Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY ingest_date)) / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY ingest_date), 0) * 100 AS Production_YoY_pct
|
||||||
|
FROM transform.sqlmesh_materia.models.staging.stg_psd_alldata_1_filter_silver_layer
|
||||||
|
),
|
||||||
|
-- CTE to calculate global aggregates by summing up country-level data
|
||||||
|
global_aggregates AS (
|
||||||
|
SELECT
|
||||||
|
commodity_code,
|
||||||
|
commodity_name,
|
||||||
|
NULL::TEXT AS country_code, -- Use NULL for global aggregates
|
||||||
|
'Global' AS country_name,
|
||||||
|
ingest_date,
|
||||||
|
SUM(Production) AS Production,
|
||||||
|
SUM(Imports) AS Imports,
|
||||||
|
SUM(Exports) AS Exports,
|
||||||
|
SUM(Total_Distribution) AS Total_Distribution,
|
||||||
|
SUM(Ending_Stocks) AS Ending_Stocks
|
||||||
|
FROM transform.sqlmesh_materia.models.staging.stg_psd_alldata_1_filter_silver_layer
|
||||||
|
GROUP BY
|
||||||
|
commodity_code,
|
||||||
|
commodity_name,
|
||||||
|
ingest_date
|
||||||
|
),
|
||||||
|
-- CTE to calculate derived metrics for global aggregates
|
||||||
|
global_metrics AS (
|
||||||
|
SELECT
|
||||||
|
commodity_code,
|
||||||
|
commodity_name,
|
||||||
|
country_code,
|
||||||
|
country_name,
|
||||||
|
ingest_date,
|
||||||
|
Production,
|
||||||
|
Imports,
|
||||||
|
Exports,
|
||||||
|
Total_Distribution,
|
||||||
|
Ending_Stocks,
|
||||||
|
(Production + Imports - Exports) AS Net_Supply,
|
||||||
|
(Exports - Imports) AS Trade_Balance,
|
||||||
|
(Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance,
|
||||||
|
(Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct,
|
||||||
|
(Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY ingest_date)) / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY ingest_date), 0) * 100 AS Production_YoY_pct
|
||||||
|
FROM global_aggregates
|
||||||
|
)
|
||||||
|
-- Combine country-level and global-level data into a single output
|
||||||
|
SELECT
|
||||||
|
hkey,
|
||||||
|
commodity_code,
|
||||||
|
commodity_name,
|
||||||
|
country_code,
|
||||||
|
country_name,
|
||||||
|
ingest_date,
|
||||||
|
Production,
|
||||||
|
Imports,
|
||||||
|
Exports,
|
||||||
|
Total_Distribution,
|
||||||
|
Ending_Stocks,
|
||||||
|
Net_Supply,
|
||||||
|
Trade_Balance,
|
||||||
|
Supply_Demand_Balance,
|
||||||
|
Stock_to_Use_Ratio_pct,
|
||||||
|
Production_YoY_pct
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
@GENERATE_SURROGATE_KEY(commodity_code, country_code, ingest_date) AS hkey,
|
||||||
|
*
|
||||||
|
FROM country_metrics
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
@GENERATE_SURROGATE_KEY(commodity_code, country_name, ingest_date) AS hkey,
|
||||||
|
*
|
||||||
|
FROM global_metrics
|
||||||
|
) AS combined_data
|
||||||
|
ORDER BY
|
||||||
|
commodity_name,
|
||||||
|
country_name,
|
||||||
|
ingest_date;
|
||||||
Reference in New Issue
Block a user