diff --git a/notebooks/03_Extraction.ipynb b/notebooks/03_Extraction.ipynb index 9f2ec50..cebf785 100644 --- a/notebooks/03_Extraction.ipynb +++ b/notebooks/03_Extraction.ipynb @@ -24,7 +24,7 @@ "\n", "data = \"../data/\"\n", "df = pd.read_csv(\"../data/psd_alldata.csv\", encoding=\"latin1\")\n", - "\"\"\"\n", + "\n", "df.rename(columns={\n", " 'Commodity_Description': 'commodity',\n", " 'Country_Name': 'country',\n", @@ -115,8 +115,7 @@ " 'Total Distribution', 'Ending Stocks', 'Net Supply',\n", " 'Supply-Demand Balance', 'Stock-to-Use Ratio (%)']]\n", "combined_global.to_csv(\"global_summary_all.csv\", index=False)\n", - "print(\"🌐 Combined global summary saved as 'global_summary_all.csv'\")\n", - "\"\"\"" + "print(\"🌐 Combined global summary saved as 'global_summary_all.csv'\")\n" ] }, { diff --git a/transform/sqlmesh_materia/models/staging/stg_psd_alldata_1_filter_silver_layer.sql b/transform/sqlmesh_materia/models/staging/stg_psd_alldata_1_filter_silver_layer.sql index e69de29..0d77482 100644 --- a/transform/sqlmesh_materia/models/staging/stg_psd_alldata_1_filter_silver_layer.sql +++ b/transform/sqlmesh_materia/models/staging/stg_psd_alldata_1_filter_silver_layer.sql @@ -0,0 +1,64 @@ +/* + * Silver layer: Pivots the raw PSD data into a wide format, + * with key attributes ('Production', 'Imports', etc.) as columns. + * This is equivalent to step 2 of the Python script 03_Extraction. + */ +MODEL ( + name transform.sqlmesh_materia.models.staging.stg_psd_alldata_1_filter_silver_layer, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ingest_date + ), + start '2006-08-01', + cron '@daily' +); + +SELECT + commodity_code, + commodity_name, + country_code, + country_name, + ingest_date, + -- Replicate the Python script's pivot by using conditional aggregation + -- This creates a single row for each commodity-country-date combination + COALESCE(SUM(CASE WHEN attribute_name = 'Production' THEN value END), 0) AS Production, + COALESCE(SUM(CASE WHEN attribute_name = 'Imports' THEN value END), 0) AS Imports, + COALESCE(SUM(CASE WHEN attribute_name = 'Exports' THEN value END), 0) AS Exports, + COALESCE(SUM(CASE WHEN attribute_name = 'Total Distribution' THEN value END), 0) AS Total_Distribution, + COALESCE(SUM(CASE WHEN attribute_name = 'Ending Stocks' THEN value END), 0) AS Ending_Stocks, + COALESCE(SUM(CASE WHEN attribute_name = 'Beginning Stocks' THEN value END), 0) AS Beginning_Stocks, + COALESCE(SUM(CASE WHEN attribute_name = 'Total Supply' THEN value END), 0) AS Total_Supply, + COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Consumption' THEN value END), 0) AS Domestic_Consumption, + COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Demand' THEN value END), 0) AS Domestic_Demand, + COALESCE(SUM(CASE WHEN attribute_name = 'Food Use' THEN value END), 0) AS Food_Use, + COALESCE(SUM(CASE WHEN attribute_name = 'Industrial Use' THEN value END), 0) AS Industrial_Use, + COALESCE(SUM(CASE WHEN attribute_name = 'Seed Use' THEN value END), 0) AS Seed_Use, + COALESCE(SUM(CASE WHEN attribute_name = 'Waste' THEN value END), 0) AS Waste, + COALESCE(SUM(CASE WHEN attribute_name = 'Feed Use' THEN value END), 0) AS Feed_Use +FROM transform.sqlmesh_materia.models.staging.stg_psd_alldata_0 +-- Filter for the specific attributes used in the pivot table for efficiency +WHERE attribute_name IN ( + 'Production', + 'Imports', + 'Exports', + 'Total Distribution', + 'Ending Stocks', + 'Beginning Stocks', + 'Total Supply', + 'Domestic Consumption', + 'Domestic Demand', + 'Food Use', + 'Industrial Use', + 'Seed Use', + 'Waste', + 'Feed Use' + ) +GROUP BY + commodity_code, + commodity_name, + country_code, + country_name, + ingest_date +ORDER BY + commodity_name, + country_name, + ingest_date; \ No newline at end of file diff --git a/transform/sqlmesh_materia/models/staging/stg_psd_alldata_2_filter_gold_layer.sql b/transform/sqlmesh_materia/models/staging/stg_psd_alldata_2_filter_gold_layer.sql index e69de29..ccf5ca7 100644 --- a/transform/sqlmesh_materia/models/staging/stg_psd_alldata_2_filter_gold_layer.sql +++ b/transform/sqlmesh_materia/models/staging/stg_psd_alldata_2_filter_gold_layer.sql @@ -0,0 +1,110 @@ +/* + * Gold layer: Calculates derived metrics like Net Supply, Trade Balance, + * and Stock-to-Use Ratio based on the pivoted silver layer data. + * This also includes the global aggregates, mimicking steps 3 and 4 + * of the Python script 03_Extraction. + */ +MODEL ( + name transform.sqlmesh_materia.models.staging.stg_psd_alldata_2_filter_gold_layer, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ingest_date + ), + start '2006-08-01', + cron '@daily' +); + +-- CTE to calculate country-level derived metrics +WITH country_metrics AS ( + SELECT + commodity_code, + commodity_name, + country_code, + country_name, + ingest_date, + Production, + Imports, + Exports, + Total_Distribution, + Ending_Stocks, + -- Derived metrics per country, mirroring Python script + (Production + Imports - Exports) AS Net_Supply, + (Exports - Imports) AS Trade_Balance, + (Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance, + -- Handle division by zero for Stock-to-Use Ratio + (Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct, + -- Calculate Production YoY percentage change using a window function + (Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY ingest_date)) / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY ingest_date), 0) * 100 AS Production_YoY_pct + FROM transform.sqlmesh_materia.models.staging.stg_psd_alldata_1_filter_silver_layer +), +-- CTE to calculate global aggregates by summing up country-level data +global_aggregates AS ( + SELECT + commodity_code, + commodity_name, + NULL::TEXT AS country_code, -- Use NULL for global aggregates + 'Global' AS country_name, + ingest_date, + SUM(Production) AS Production, + SUM(Imports) AS Imports, + SUM(Exports) AS Exports, + SUM(Total_Distribution) AS Total_Distribution, + SUM(Ending_Stocks) AS Ending_Stocks + FROM transform.sqlmesh_materia.models.staging.stg_psd_alldata_1_filter_silver_layer + GROUP BY + commodity_code, + commodity_name, + ingest_date +), +-- CTE to calculate derived metrics for global aggregates +global_metrics AS ( + SELECT + commodity_code, + commodity_name, + country_code, + country_name, + ingest_date, + Production, + Imports, + Exports, + Total_Distribution, + Ending_Stocks, + (Production + Imports - Exports) AS Net_Supply, + (Exports - Imports) AS Trade_Balance, + (Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance, + (Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct, + (Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY ingest_date)) / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY ingest_date), 0) * 100 AS Production_YoY_pct + FROM global_aggregates +) +-- Combine country-level and global-level data into a single output +SELECT + hkey, + commodity_code, + commodity_name, + country_code, + country_name, + ingest_date, + Production, + Imports, + Exports, + Total_Distribution, + Ending_Stocks, + Net_Supply, + Trade_Balance, + Supply_Demand_Balance, + Stock_to_Use_Ratio_pct, + Production_YoY_pct +FROM ( + SELECT + @GENERATE_SURROGATE_KEY(commodity_code, country_code, ingest_date) AS hkey, + * + FROM country_metrics + UNION ALL + SELECT + @GENERATE_SURROGATE_KEY(commodity_code, country_name, ingest_date) AS hkey, + * + FROM global_metrics +) AS combined_data +ORDER BY + commodity_name, + country_name, + ingest_date; \ No newline at end of file