Change layer naming

This commit is contained in:
Deeman
2025-09-10 18:46:18 +02:00
parent f5f2dbc7a5
commit 85704a4bf1
5 changed files with 315 additions and 0 deletions


@@ -0,0 +1 @@
[]


@@ -0,0 +1,103 @@
# Data Engineering Pipeline Layers & Naming Conventions
This document outlines the standard layered architecture and model naming conventions for our data platform. Adhering to these standards is crucial for maintaining a clean, scalable, and understandable project.
---
## Data Pipeline Layers
Each layer has a distinct purpose, transforming data from its raw state into a curated, analysis-ready format.
### 1. Raw Layer
The initial landing zone for all data ingested from source systems.
* **Purpose:** To create a permanent, immutable archive of source data.
* **Key Activities:**
* Data is ingested and stored in its original, unaltered format.
* Serves as the definitive source of truth, enabling reprocessing of the entire pipeline if needed.
* No transformations or schema enforcement occur at this stage.
### 2. Staging Layer
A workspace for initial data preparation and technical validation.
* **Purpose:** To convert raw data into a structured, technically sound format.
* **Key Activities:**
* **Schema Application:** A schema is applied to the raw data.
* **Data Typing:** Columns are cast to their correct data types (e.g., string to timestamp, integer to decimal).
* **Basic Cleansing:** Handles technical errors like malformed records and standardizes null values.
### 3. Cleaned Layer
The integrated core of the data platform, designed to create a "single version of the facts."
* **Purpose:** To integrate data from various sources into a unified, consistent, and historically accurate model.
* **Key Activities:**
* **Business Logic:** Complex business rules are applied to conform and validate the data.
* **Integration:** Data from different sources is combined using business keys.
* **Core Modeling:** Data is structured into a robust, integrated model (e.g., a Data Vault) that represents core business processes.
### 4. Serving Layer
The final, presentation-ready layer optimized for analytics, reporting, and business intelligence.
* **Purpose:** To provide high-performance, easy-to-query data for end-users.
* **Key Activities:**
* **Analytics Modeling:** Data from the Cleaned Layer is transformed into user-friendly models, such as **Fact and Dimension tables** (star schemas).
* **Aggregation:** Key business metrics and KPIs are pre-calculated to accelerate queries.
* **Consumption:** This layer feeds dashboards, reports, and analytical tools. It is often loaded into a dedicated Data Warehouse for optimal performance.
---
## Model Naming Conventions
A consistent naming convention helps us understand a model's purpose at a glance.
### Guiding Principles
1. **Be Explicit:** Names should clearly state the layer, source, and entity.
2. **Be Consistent:** Use the same patterns and abbreviations everywhere.
3. **Use Prefixes:** Start filenames and model names with the layer to group them logically.
### Layer-by-Layer Naming Scheme
#### 1. Raw / Sources Layer
This layer is for defining sources, not models. The convention is to name the source after the system it comes from.
* **Source Name:** `[source_system]` (e.g., `salesforce`, `google_ads`)
* **Table Name:** `[original_table_name]` (e.g., `account`, `ads_performance`)
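Under this convention, a raw source declaration might look like the sketch below (a dbt-style `sources.yml`, as referenced in the summary table; the `stripe` system and its tables are illustrative, not taken from this project):

```yaml
# Hypothetical source declaration -- system and table names are examples only.
sources:
  - name: stripe              # [source_system]
    schema: raw
    tables:
      - name: charges         # [original_table_name]
      - name: customers
```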
#### 2. Staging Layer
Staging models have a 1:1 relationship with a source table.
* **Pattern:** `stg_[source_system]__[entity_name]`
* **Examples:**
* `stg_stripe__charges.sql`
* `stg_google_ads__campaigns.sql`
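A staging model under this pattern is typically a thin, 1:1 cast-and-rename over its source table. A minimal sketch of `stg_stripe__charges.sql`; the column names and the `raw.stripe_charges` table are assumptions, not a real schema:

```sql
-- Hypothetical stg_stripe__charges.sql: rename and type-cast, nothing more.
select
    id::varchar            as charge_id,
    customer::varchar      as customer_id,
    amount::decimal(18, 2) as amount,
    created::timestamp     as created_at
from raw.stripe_charges
```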
#### 3. Cleaned Layer
This is the integration layer for building unified business entities or a Data Vault.
* **Pattern (Integrated Entity):** `cln_[entity_name]`
* **Pattern (Data Vault):** `cln_[vault_component]_[entity_name]`
* **Examples:**
* `cln_customers.sql`
* `cln_hub_customers.sql`
* `cln_sat_customer_details.sql`
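For the Data Vault variant, a hub holds one row per distinct business key. A minimal sketch of `cln_hub_customers.sql`, reusing the `@GENERATE_SURROGATE_KEY` macro that this commit's staging model uses; the `stg_crm__customers` source and its key column are hypothetical:

```sql
-- Hypothetical cln_hub_customers.sql: one row per distinct business key.
select distinct
    @GENERATE_SURROGATE_KEY(customer_business_key) as hkey,
    customer_business_key
from staging.stg_crm__customers
```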
#### 4. Serving Layer
This layer contains business-friendly models for consumption.
* **Pattern (Dimension):** `dim_[entity_name]`
* **Pattern (Fact):** `fct_[business_process]`
* **Pattern (Aggregate):** `agg_[aggregation_description]`
* **Examples:**
* `dim_customers.sql`
* `fct_orders.sql`
* `agg_monthly_revenue_by_region.sql`
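A serving-layer dimension is then a business-friendly projection of a cleaned entity. A minimal sketch of `dim_customers.sql`, assuming a `cleaned.cln_customers` model exposing these columns:

```sql
-- Hypothetical dim_customers.sql: rename the surrogate key and expose
-- only business-facing attributes.
select
    hkey as customer_key,
    customer_name,
    country_code,
    country_name
from cleaned.cln_customers
```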
### Summary Table
| Layer | Purpose | Filename / Model Name Example | Notes |
| :------ | :---------------------- | :---------------------------------------- | :---------------------------------------------- |
| Raw | Source Declaration | `sources.yml` (for `stripe`, `charges`) | No models, just declarations. |
| Staging | Basic Cleansing & Typing | `stg_stripe__charges.sql` | 1:1 with source tables. |
| Cleaned | Integration & Core Models | `cln_customers.sql` or `cln_hub_customers.sql` | Integrates sources. Your Data Vault lives here. |
| Serving | Analytics & BI | `dim_customers.sql` or `fct_orders.sql` | Business-facing, optimized for queries. |
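Applied together, the conventions above imply a project layout along these lines (directory names are illustrative):

```text
models/
├── staging/
│   ├── stg_stripe__charges.sql
│   └── stg_google_ads__campaigns.sql
├── cleaned/
│   ├── cln_customers.sql
│   └── cln_hub_customers.sql
└── serving/
    ├── dim_customers.sql
    └── fct_orders.sql
```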


@@ -0,0 +1,57 @@
MODEL (
name cleaned.psdalldata__commodity_pivoted,
kind INCREMENTAL_BY_TIME_RANGE (
time_column ingest_date
),
start '2006-08-01',
cron '@daily'
);
-- Pivot attribute rows into one column per metric via conditional aggregation.
SELECT
  max(hkey) as hkey, -- one representative surrogate key per group
commodity_code,
commodity_name,
country_code,
country_name,
ingest_date,
COALESCE(SUM(CASE WHEN attribute_name = 'Production' THEN value END), 0) AS Production,
COALESCE(SUM(CASE WHEN attribute_name = 'Imports' THEN value END), 0) AS Imports,
COALESCE(SUM(CASE WHEN attribute_name = 'Exports' THEN value END), 0) AS Exports,
COALESCE(SUM(CASE WHEN attribute_name = 'Total Distribution' THEN value END), 0) AS Total_Distribution,
COALESCE(SUM(CASE WHEN attribute_name = 'Ending Stocks' THEN value END), 0) AS Ending_Stocks,
COALESCE(SUM(CASE WHEN attribute_name = 'Beginning Stocks' THEN value END), 0) AS Beginning_Stocks,
COALESCE(SUM(CASE WHEN attribute_name = 'Total Supply' THEN value END), 0) AS Total_Supply,
COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Consumption' THEN value END), 0) AS Domestic_Consumption,
COALESCE(SUM(CASE WHEN attribute_name = 'Domestic Demand' THEN value END), 0) AS Domestic_Demand,
COALESCE(SUM(CASE WHEN attribute_name = 'Food Use' THEN value END), 0) AS Food_Use,
COALESCE(SUM(CASE WHEN attribute_name = 'Industrial Use' THEN value END), 0) AS Industrial_Use,
COALESCE(SUM(CASE WHEN attribute_name = 'Seed Use' THEN value END), 0) AS Seed_Use,
COALESCE(SUM(CASE WHEN attribute_name = 'Waste' THEN value END), 0) AS Waste,
COALESCE(SUM(CASE WHEN attribute_name = 'Feed Use' THEN value END), 0) AS Feed_Use
FROM staging.psdalldata__commodity
WHERE attribute_name IN (
'Production',
'Imports',
'Exports',
'Total Distribution',
'Ending Stocks',
'Beginning Stocks',
'Total Supply',
'Domestic Consumption',
'Domestic Demand',
'Food Use',
'Industrial Use',
'Seed Use',
'Waste',
'Feed Use'
)
GROUP BY
commodity_code,
commodity_name,
country_code,
country_name,
ingest_date
ORDER BY
commodity_name,
country_name,
ingest_date;


@@ -0,0 +1,100 @@
MODEL (
name serving.commodity_metrics,
kind INCREMENTAL_BY_TIME_RANGE (
time_column ingest_date
),
start '2006-08-01',
cron '@daily'
);
-- CTE to calculate country-level derived metrics
WITH country_metrics AS (
SELECT
commodity_code,
commodity_name,
country_code,
country_name,
ingest_date,
Production,
Imports,
Exports,
Total_Distribution,
Ending_Stocks,
-- Derived metrics per country, mirroring Python script
(Production + Imports - Exports) AS Net_Supply,
(Exports - Imports) AS Trade_Balance,
(Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance,
-- Handle division by zero for Stock-to-Use Ratio
(Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct,
-- Calculate Production YoY percentage change using a window function
    (Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY ingest_date))
      / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code, country_code ORDER BY ingest_date), 0)
      * 100 AS Production_YoY_pct
FROM cleaned.psdalldata__commodity_pivoted
),
global_aggregates AS (
SELECT
commodity_code,
commodity_name,
NULL::TEXT AS country_code, -- Use NULL for global aggregates
'Global' AS country_name,
ingest_date,
SUM(Production) AS Production,
SUM(Imports) AS Imports,
SUM(Exports) AS Exports,
SUM(Total_Distribution) AS Total_Distribution,
SUM(Ending_Stocks) AS Ending_Stocks
FROM cleaned.psdalldata__commodity_pivoted
GROUP BY
commodity_code,
commodity_name,
ingest_date
),
-- CTE to calculate derived metrics for global aggregates
global_metrics AS (
SELECT
commodity_code,
commodity_name,
country_code,
country_name,
ingest_date,
Production,
Imports,
Exports,
Total_Distribution,
Ending_Stocks,
(Production + Imports - Exports) AS Net_Supply,
(Exports - Imports) AS Trade_Balance,
(Production + Imports - Exports) - Total_Distribution AS Supply_Demand_Balance,
(Ending_Stocks / NULLIF(Total_Distribution, 0)) * 100 AS Stock_to_Use_Ratio_pct,
    (Production - LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY ingest_date))
      / NULLIF(LAG(Production, 1, 0) OVER (PARTITION BY commodity_code ORDER BY ingest_date), 0)
      * 100 AS Production_YoY_pct
FROM global_aggregates
)
-- Combine country-level and global-level data into a single output
SELECT
commodity_code,
commodity_name,
country_code,
country_name,
ingest_date,
Production,
Imports,
Exports,
Total_Distribution,
Ending_Stocks,
Net_Supply,
Trade_Balance,
Supply_Demand_Balance,
Stock_to_Use_Ratio_pct,
Production_YoY_pct
FROM (
SELECT
*
FROM country_metrics
UNION ALL
SELECT
*
FROM global_metrics
) AS combined_data
ORDER BY
commodity_name,
country_name,
ingest_date;


@@ -0,0 +1,54 @@
MODEL (
name staging.psdalldata__commodity,
kind INCREMENTAL_BY_TIME_RANGE (
time_column ingest_date
),
start '2006-08-01',
cron '@daily'
);
-- Cast raw string columns to typed values and resolve code lookup tables.
with cast_dtypes as (
SELECT
raw.psd_alldata.commodity_code::int as commodity_code,
coalesce(commodity_name, commodity_description) as commodity_name,
country_code::varchar(3) as country_code,
country_name,
market_year::int as market_year,
calendar_year::int as calendar_year,
month::int as month,
raw.psd_alldata.attribute_id::int as attribute_id,
coalesce(attribute_name, attribute_description) as attribute_name,
raw.psd_alldata.unit_id::int as unit_id,
coalesce(unit_name, unit_description) as unit_name,
value::float as value,
    filename
FROM raw.psd_alldata
left join raw.psd_commodity_codes on raw.psd_commodity_codes.commodity_code = raw.psd_alldata.commodity_code::int
left join raw.psd_unit_of_measure_codes on raw.psd_unit_of_measure_codes.unit_id = raw.psd_alldata.unit_id::int
left join raw.psd_attribute_codes on raw.psd_attribute_codes.attribute_id = raw.psd_alldata.attribute_id::int
),
-- Derive the surrogate key, the ingest date (from the file path), and a month-end date.
metadata as (
select *,
@GENERATE_SURROGATE_KEY(commodity_code, country_code, market_year, month, attribute_id) as hkey,
make_date(split(filename, '/')[-4]::int, split(filename, '/')[-3]::int, 1) as ingest_date,
if(month!=0,last_day(make_date(market_year, month, 1)),null) as market_date_month_end
from cast_dtypes
)
select hkey,
commodity_code,
commodity_name,
country_code,
country_name,
market_year,
calendar_year,
month,
attribute_id,
attribute_name,
unit_id,
unit_name,
value,
filename,
ingest_date,
market_date_month_end
from metadata
where ingest_date between @start_ds and @end_ds
qualify row_number() over (partition by hkey) = 1;