diff --git a/transform/sqlmesh_materia/external_models.yaml b/transform/sqlmesh_materia/external_models.yaml
new file mode 100644
index 0000000..fe51488
--- /dev/null
+++ b/transform/sqlmesh_materia/external_models.yaml
@@ -0,0 +1 @@
+[]
diff --git a/transform/sqlmesh_materia/models/README.md b/transform/sqlmesh_materia/models/README.md
new file mode 100644
index 0000000..f7eef27
--- /dev/null
+++ b/transform/sqlmesh_materia/models/README.md
@@ -0,0 +1,103 @@
+# Data Engineering Pipeline Layers & Naming Conventions
+
+This document outlines the standard layered architecture and model naming conventions for our data platform. Adhering to these standards is crucial for maintaining a clean, scalable, and understandable project.
+
+---
+
+## Data Pipeline Layers
+
+Each layer has a distinct purpose, transforming data from its raw state into a curated, analysis-ready format.
+
+### 1. Raw Layer
+
+The initial landing zone for all data ingested from source systems.
+
+* **Purpose:** To create a permanent, immutable archive of source data.
+* **Key Activities:**
+    * Data is ingested and stored in its original, unaltered format.
+    * Serves as the definitive source of truth, enabling reprocessing of the entire pipeline if needed.
+    * No transformations or schema enforcement occur at this stage.
+
+### 2. Staging Layer
+
+A workspace for initial data preparation and technical validation.
+
+* **Purpose:** To convert raw data into a structured, technically sound format.
+* **Key Activities:**
+    * **Schema Application:** A schema is applied to the raw data.
+    * **Data Typing:** Columns are cast to their correct data types (e.g., string to timestamp, integer to decimal).
+    * **Basic Cleansing:** Handles technical errors like malformed records and standardizes null values.
+
+### 3. Cleaned Layer
+
+The integrated core of the data platform, designed to create a "single version of the facts."
+
+* **Purpose:** To integrate data from various sources into a unified, consistent, and historically accurate model.
+* **Key Activities:**
+    * **Business Logic:** Complex business rules are applied to conform and validate the data.
+    * **Integration:** Data from different sources is combined using business keys.
+    * **Core Modeling:** Data is structured into a robust, integrated model (e.g., a Data Vault) that represents core business processes.
+
+### 4. Serving Layer
+
+The final, presentation-ready layer optimized for analytics, reporting, and business intelligence.
+
+* **Purpose:** To provide high-performance, easy-to-query data for end-users.
+* **Key Activities:**
+    * **Analytics Modeling:** Data from the Cleaned Layer is transformed into user-friendly models, such as **Fact and Dimension tables** (star schemas).
+    * **Aggregation:** Key business metrics and KPIs are pre-calculated to accelerate queries.
+    * **Consumption:** This layer feeds dashboards, reports, and analytical tools. It is often loaded into a dedicated Data Warehouse for optimal performance.
+
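+A minimal sketch of what this layer can look like, using hypothetical `cleaned.customers` and `cleaned.orders` models (these tables and columns are illustrative only, not part of this project):
+
+```sql
+-- dim_customers: one row per customer, descriptive attributes only
+SELECT
+    customer_id,       -- business key carried over from the Cleaned Layer
+    customer_name,
+    customer_segment,
+    signup_date
+FROM cleaned.customers;
+
+-- fct_orders: one row per order, foreign keys plus additive measures
+SELECT
+    order_id,
+    customer_id,       -- joins to dim_customers
+    order_date,
+    order_amount
+FROM cleaned.orders;
+```
+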
+---
+
+## Model Naming Conventions
+
+A consistent naming convention helps us understand a model's purpose at a glance.
+
+### Guiding Principles
+
+1. **Be Explicit:** Names should clearly state the layer, source, and entity.
+2. **Be Consistent:** Use the same patterns and abbreviations everywhere.
+3. **Use Prefixes:** Start filenames and model names with the layer to group them logically.
+
+### Layer-by-Layer Naming Scheme
+
+#### 1. Raw / Sources Layer
+This layer is for defining sources, not models. The convention is to name the source after the system it comes from. In this SQLMesh project, raw tables are declared as external models in `external_models.yaml`.
+* **Source Name:** `[source_system]` (e.g., `salesforce`, `google_ads`)
+* **Table Name:** `[original_table_name]` (e.g., `account`, `ads_performance`)
+
+#### 2. Staging Layer
+Staging models have a 1:1 relationship with a source table.
+* **Pattern:** `stg_[source_system]__[entity_name]`
+* **Examples:**
+    * `stg_stripe__charges.sql`
+    * `stg_google_ads__campaigns.sql`
+
+#### 3. Cleaned Layer
+This is the integration layer for building unified business entities or a Data Vault.
+* **Pattern (Integrated Entity):** `cln_[entity_name]`
+* **Pattern (Data Vault):** `cln_[vault_component]_[entity_name]`
+* **Examples:**
+    * `cln_customers.sql`
+    * `cln_hub_customers.sql`
+    * `cln_sat_customer_details.sql`
+
+#### 4. Serving Layer
+This layer contains business-friendly models for consumption.
+* **Pattern (Dimension):** `dim_[entity_name]`
+* **Pattern (Fact):** `fct_[business_process]`
+* **Pattern (Aggregate):** `agg_[aggregation_description]`
+* **Pattern (One Big Table):** `obt_[subject_area]`
+* **Examples:**
+    * `dim_customers.sql`
+    * `fct_orders.sql`
+    * `agg_monthly_revenue_by_region.sql`
+    * `obt_commodity_metrics.sql`
+
+### Summary Table
+
+| Layer   | Purpose                    | Filename / Model Name Example                    | Notes                                            |
+| :------ | :------------------------- | :----------------------------------------------- | :----------------------------------------------- |
+| Raw     | Source Declaration         | `external_models.yaml` (e.g., `stripe.charges`)   | No models, just declarations.                     |
+| Staging | Basic Cleansing & Typing   | `stg_stripe__charges.sql`                         | 1:1 with source tables.                           |
+| Cleaned | Integration & Core Models  | `cln_customers.sql` or `cln_hub_customers.sql`    | Integrates sources. Your Data Vault lives here.   |
+| Serving | Analytics & BI             | `dim_customers.sql` or `fct_orders.sql`           | Business-facing, optimized for queries.           |
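+
+### Example: Filename vs. SQLMesh Model Name
+
+In this SQLMesh project the layer prefix lives in the filename, while the registered model name uses the layer schema (for instance, `models/staging/stg_psdalldata__commodity.sql` defines `staging.psdalldata__commodity`). A minimal sketch for the hypothetical Stripe source used in the examples above (source table, columns, and settings are illustrative only):
+
+```sql
+-- models/staging/stg_stripe__charges.sql
+MODEL (
+  name staging.stripe__charges,
+  kind INCREMENTAL_BY_TIME_RANGE (
+    time_column ingest_date
+  ),
+  cron '@daily'
+);
+
+SELECT
+  id::varchar AS charge_id,          -- cast raw types explicitly in staging
+  amount::int AS amount_cents,
+  created::timestamp AS created_at,
+  ingest_date
+FROM raw.stripe_charges
+WHERE ingest_date BETWEEN @start_ds AND @end_ds;
+```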
diff --git a/transform/sqlmesh_materia/models/cleaned/cln_psdalldata__commodity_pivoted.sql b/transform/sqlmesh_materia/models/cleaned/cln_psdalldata__commodity_pivoted.sql
new file mode 100644
index 0000000..1c16d0b
--- /dev/null
+++ b/transform/sqlmesh_materia/models/cleaned/cln_psdalldata__commodity_pivoted.sql
@@ -0,0 +1,57 @@
+MODEL (
+  name cleaned.psdalldata__commodity_pivoted,
+  kind INCREMENTAL_BY_TIME_RANGE (
+    time_column ingest_date
+  ),
+  start '2006-08-01',
+  cron '@daily'
+);
+
+-- Pivot the long-format staging attributes into one column per attribute
+SELECT
+    max(hkey) AS hkey,
+    commodity_code,
+    commodity_name,
+    country_code,
+    country_name,
+    ingest_date,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Production' THEN value END), 0) AS Production,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Imports' THEN value END), 0) AS Imports,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Exports' THEN value END), 0) AS Exports,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Total Distribution' THEN value END), 0) AS Total_Distribution,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Ending Stocks' THEN value END), 0) AS Ending_Stocks,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Beginning Stocks' THEN value END), 0) AS Beginning_Stocks,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Total Supply' THEN value END), 0) AS Total_Supply,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Consumption' THEN value END), 0) AS Domestic_Consumption,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Demand' THEN value END), 0) AS Domestic_Demand,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Food Use' THEN value END), 0) AS Food_Use,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Industrial Use' THEN value END), 0) AS Industrial_Use,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Seed Use' THEN value END), 0) AS Seed_Use,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Waste' THEN value END), 0) AS Waste,
+    COALESCE(SUM(CASE WHEN attribute_name = 'Feed Use' THEN value END), 0) AS Feed_Use
+FROM staging.psdalldata__commodity
+WHERE attribute_name IN (
+        'Production',
+        'Imports',
+        'Exports',
+        'Total Distribution',
+        'Ending Stocks',
+        'Beginning Stocks',
+        'Total Supply',
+        'Domestic Consumption',
+        'Domestic Demand',
+        'Food Use',
+        'Industrial Use',
+        'Seed Use',
+        'Waste',
+        'Feed Use'
+    )
+    -- process only the current incremental interval, matching the staging model
+    AND ingest_date BETWEEN @start_ds AND @end_ds
+GROUP BY
+    commodity_code,
+    commodity_name,
+    country_code,
+    country_name,
+    ingest_date
+ORDER BY
+    commodity_name,
+    country_name,
+    ingest_date;
diff --git a/transform/sqlmesh_materia/models/serving/obt_commodity_metrics.sql b/transform/sqlmesh_materia/models/serving/obt_commodity_metrics.sql
new file mode 100644
index 0000000..943c5c5
--- /dev/null
+++ b/transform/sqlmesh_materia/models/serving/obt_commodity_metrics.sql
@@ -0,0 +1,100 @@
+MODEL (
+  name serving.commodity_metrics,
+  kind INCREMENTAL_BY_TIME_RANGE (
+    time_column ingest_date
+  ),
+  start '2006-08-01',
+  cron '@daily'
+);
+
+-- CTE to calculate country-level derived metrics
+WITH country_metrics AS (
+    SELECT
+        commodity_code,
+        commodity_name,
+        country_code,
+        country_name,
+        ingest_date,
+        Production,
+        Imports,
+        Exports,
+        Total_Distribution,
+        Ending_Stocks,
+        -- Derived metrics per country, mirroring the original Python script
+        (Production + Imports - Exports) AS Net_Supply,
+        (Exports - Imports) AS Trade_Balance,
+        (Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance,
+        -- Handle division by zero for Stock-to-Use Ratio
+        (Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct,
+        -- Calculate Production YoY percentage change using a window function
+        (Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY ingest_date))
+            / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY ingest_date), 0) * 100 AS Production_YoY_pct
+    FROM cleaned.psdalldata__commodity_pivoted
+),
+-- CTE to roll all countries up into a single 'Global' row per commodity and date
+global_aggregates AS (
+    SELECT
+        commodity_code,
+        commodity_name,
+        NULL::TEXT AS country_code, -- Use NULL for global aggregates
+        'Global' AS country_name,
+        ingest_date,
+        SUM(Production) AS Production,
+        SUM(Imports) AS Imports,
+        SUM(Exports) AS Exports,
+        SUM(Total_Distribution) AS Total_Distribution,
+        SUM(Ending_Stocks) AS Ending_Stocks
+    FROM cleaned.psdalldata__commodity_pivoted
+    GROUP BY
+        commodity_code,
+        commodity_name,
+        ingest_date
+),
+-- CTE to calculate derived metrics for global aggregates
+global_metrics AS (
+    SELECT
+        commodity_code,
+        commodity_name,
+        country_code,
+        country_name,
+        ingest_date,
+        Production,
+        Imports,
+        Exports,
+        Total_Distribution,
+        Ending_Stocks,
+        (Production + Imports - Exports) AS Net_Supply,
+        (Exports - Imports) AS Trade_Balance,
+        (Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance,
+        (Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct,
+        (Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY ingest_date))
+            / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY ingest_date), 0) * 100 AS Production_YoY_pct
+    FROM global_aggregates
+)
+-- Combine country-level and global-level data into a single output
+SELECT
+    commodity_code,
+    commodity_name,
+    country_code,
+    country_name,
+    ingest_date,
+    Production,
+    Imports,
+    Exports,
+    Total_Distribution,
+    Ending_Stocks,
+    Net_Supply,
+    Trade_Balance,
+    Supply_Demand_Balance,
+    Stock_to_Use_Ratio_pct,
+    Production_YoY_pct
+FROM (
+    SELECT *
+    FROM country_metrics
+    UNION ALL
+    SELECT *
+    FROM global_metrics
+) AS combined_data
+-- Emit only the current incremental interval; the CTEs above scan full history so LAG() still sees prior periods
+WHERE ingest_date BETWEEN @start_ds AND @end_ds
+ORDER BY
+    commodity_name,
+    country_name,
+    ingest_date;
diff --git a/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql b/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql
new file mode 100644
index 0000000..bb1976e
--- /dev/null
+++ b/transform/sqlmesh_materia/models/staging/stg_psdalldata__commodity.sql
@@ -0,0 +1,54 @@
+MODEL (
+  name staging.psdalldata__commodity,
+  kind INCREMENTAL_BY_TIME_RANGE (
+    time_column ingest_date
+  ),
+  start '2006-08-01',
+  cron '@daily'
+);
+
+with cast_dtypes as (
+    select
+        raw.psd_alldata.commodity_code::int as commodity_code,
+        coalesce(commodity_name, commodity_description) as commodity_name,
+        country_code::varchar(3) as country_code,
+        country_name,
+        market_year::int as market_year,
+        calendar_year::int as calendar_year,
+        month::int as month,
+        raw.psd_alldata.attribute_id::int as attribute_id,
+        coalesce(attribute_name, attribute_description) as attribute_name,
+        raw.psd_alldata.unit_id::int as unit_id,
+        coalesce(unit_name, unit_description) as unit_name,
+        value::float as value,
+        filename
+    from raw.psd_alldata
+    left join raw.psd_commodity_codes on raw.psd_commodity_codes.commodity_code = raw.psd_alldata.commodity_code::int
+    left join raw.psd_unit_of_measure_codes on raw.psd_unit_of_measure_codes.unit_id = raw.psd_alldata.unit_id::int
+    left join raw.psd_attribute_codes on raw.psd_attribute_codes.attribute_id = raw.psd_alldata.attribute_id::int
+),
+metadata as (
+    select
+        *,
+        @GENERATE_SURROGATE_KEY(commodity_code, country_code, market_year, month, attribute_id) as hkey,
+        -- ingest_date comes from the year/month directory segments of the source file path
+        make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1) as ingest_date,
+        if(month != 0, last_day(make_date(market_year, month, 1)), null) as market_date_month_end
+    from cast_dtypes
+)
+select
+    hkey,
+    commodity_code,
+    commodity_name,
+    country_code,
+    country_name,
+    market_year,
+    calendar_year,
+    month,
+    attribute_id,
+    attribute_name,
+    unit_id,
+    unit_name,
+    value,
+    filename,
+    ingest_date,
+    market_date_month_end
+from metadata
+where ingest_date between @start_ds and @end_ds
+-- deduplicate on the surrogate key, keeping a single row per hkey
+qualify row_number() over (partition by hkey) = 1;
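
As a quick sanity check on the new serving model, the query below inspects the synthesized global rows for a single commodity; it is only a sketch (the `'Corn'` literal is illustrative, any `commodity_name` present in the data works):

```sql
-- Global stock-to-use trend for one commodity from serving.commodity_metrics
SELECT
    ingest_date,
    Production,
    Ending_Stocks,
    Stock_to_Use_Ratio_pct
FROM serving.commodity_metrics
WHERE country_name = 'Global'      -- 'Global' rows are built in obt_commodity_metrics.sql
  AND commodity_name = 'Corn'      -- illustrative value
ORDER BY ingest_date;
```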