merge(pipeline-lineage): conform geographic dimension hierarchy via city_slug

This commit is contained in:
Deeman
2026-02-27 13:31:44 +01:00
8 changed files with 14 additions and 7 deletions

View File

@@ -34,6 +34,7 @@ SELECT
v.tenant_id, v.tenant_id,
v.country_code, v.country_code,
v.city, v.city,
v.city_slug,
cc.active_court_count, cc.active_court_count,
ROUND(wh.hours_open_per_week, 1) AS hours_open_per_week, ROUND(wh.hours_open_per_week, 1) AS hours_open_per_week,
ROUND(wh.avg_hours_open_per_day, 1) AS avg_hours_open_per_day, ROUND(wh.avg_hours_open_per_day, 1) AS avg_hours_open_per_day,
@@ -42,6 +43,6 @@ SELECT
ROUND(cc.active_court_count * wh.avg_hours_open_per_day, 1) AS capacity_court_hours_per_day, ROUND(cc.active_court_count * wh.avg_hours_open_per_day, 1) AS capacity_court_hours_per_day,
-- Total bookable court-hours per week -- Total bookable court-hours per week
ROUND(cc.active_court_count * wh.hours_open_per_week, 1) AS capacity_court_hours_per_week ROUND(cc.active_court_count * wh.hours_open_per_week, 1) AS capacity_court_hours_per_week
FROM staging.stg_playtomic_venues v FROM foundation.dim_venues v
JOIN court_counts cc ON v.tenant_id = cc.tenant_id JOIN court_counts cc ON v.tenant_id = cc.tenant_id
JOIN weekly_hours wh ON v.tenant_id = wh.tenant_id JOIN weekly_hours wh ON v.tenant_id = wh.tenant_id

View File

@@ -98,6 +98,8 @@ SELECT
court_count, court_count,
indoor_court_count, indoor_court_count,
outdoor_court_count, outdoor_court_count,
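-- Conformed city key: enables deterministic joins to dim_cities / venue_pricing_benchmarks.
-- NOTE(review): the outer LOWER is redundant (the input is already lowercased). If this runs
-- on DuckDB, REGEXP_REPLACE replaces only the first match unless the 'g' option is passed, so
-- multi-word city names would keep their later separators. Non-ASCII letters are replaced by
-- '-' and a NULL city yields an empty slug. Confirm dim_cities derives its slug identically.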
LOWER(REGEXP_REPLACE(LOWER(COALESCE(city, '')), '[^a-z0-9]+', '-')) AS city_slug,
extracted_date extracted_date
FROM ranked FROM ranked
QUALIFY ROW_NUMBER() OVER ( QUALIFY ROW_NUMBER() OVER (

View File

@@ -44,6 +44,7 @@ SELECT
sa.tenant_id, sa.tenant_id,
cap.country_code, cap.country_code,
cap.city, cap.city,
cap.city_slug,
cap.active_court_count, cap.active_court_count,
cap.capacity_court_hours_per_day, cap.capacity_court_hours_per_day,
sa.available_slot_count, sa.available_slot_count,

View File

@@ -57,7 +57,7 @@ WITH base AS (
FROM foundation.dim_cities c FROM foundation.dim_cities c
LEFT JOIN serving.venue_pricing_benchmarks vpb LEFT JOIN serving.venue_pricing_benchmarks vpb
ON c.country_code = vpb.country_code ON c.country_code = vpb.country_code
AND LOWER(TRIM(c.city_name)) = LOWER(TRIM(vpb.city)) AND c.city_slug = vpb.city_slug
WHERE c.padel_venue_count > 0 WHERE c.padel_venue_count > 0
), ),
scored AS ( scored AS (

View File

@@ -21,6 +21,7 @@ city_benchmarks AS (
SELECT SELECT
country_code, country_code,
city, city,
city_slug,
median_peak_rate, median_peak_rate,
median_offpeak_rate, median_offpeak_rate,
median_occupancy_rate, median_occupancy_rate,
@@ -128,7 +129,7 @@ SELECT
FROM city_profiles cp FROM city_profiles cp
LEFT JOIN city_benchmarks cb LEFT JOIN city_benchmarks cb
ON cp.country_code = cb.country_code ON cp.country_code = cb.country_code
AND LOWER(TRIM(cp.city_name)) = LOWER(TRIM(cb.city)) AND cp.city_slug = cb.city_slug
LEFT JOIN country_benchmarks ctb LEFT JOIN country_benchmarks ctb
ON cp.country_code = ctb.country_code ON cp.country_code = ctb.country_code
LEFT JOIN hardcoded_fallbacks hf LEFT JOIN hardcoded_fallbacks hf

View File

@@ -41,6 +41,6 @@ FROM serving.venue_pricing_benchmarks vpb
-- Join city_market_profile to get the canonical city_slug and country metadata -- Join city_market_profile to get the canonical city_slug and country metadata
INNER JOIN serving.city_market_profile c INNER JOIN serving.city_market_profile c
ON vpb.country_code = c.country_code ON vpb.country_code = c.country_code
AND LOWER(TRIM(vpb.city)) = LOWER(TRIM(c.city_name)) AND vpb.city_slug = c.city_slug
-- Only cities with enough venues for meaningful pricing statistics -- Only cities with enough venues for meaningful pricing statistics
WHERE vpb.venue_count >= 2 WHERE vpb.venue_count >= 2

View File

@@ -17,6 +17,7 @@ WITH venue_stats AS (
da.tenant_id, da.tenant_id,
da.country_code, da.country_code,
da.city, da.city,
da.city_slug,
da.price_currency, da.price_currency,
AVG(da.occupancy_rate) AS avg_occupancy_rate, AVG(da.occupancy_rate) AS avg_occupancy_rate,
MEDIAN(da.median_price) AS median_hourly_rate, MEDIAN(da.median_price) AS median_hourly_rate,
@@ -29,12 +30,13 @@ WITH venue_stats AS (
WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days' WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days'
AND da.occupancy_rate IS NOT NULL AND da.occupancy_rate IS NOT NULL
AND da.occupancy_rate BETWEEN 0 AND 1.5 AND da.occupancy_rate BETWEEN 0 AND 1.5
GROUP BY da.tenant_id, da.country_code, da.city, da.price_currency GROUP BY da.tenant_id, da.country_code, da.city, da.city_slug, da.price_currency
HAVING COUNT(DISTINCT da.snapshot_date) >= 3 HAVING COUNT(DISTINCT da.snapshot_date) >= 3
) )
SELECT SELECT
country_code, country_code,
city, city,
city_slug,
price_currency, price_currency,
COUNT(*) AS venue_count, COUNT(*) AS venue_count,
-- Pricing benchmarks -- Pricing benchmarks
@@ -54,4 +56,4 @@ SELECT
SUM(days_observed) AS total_venue_days_observed, SUM(days_observed) AS total_venue_days_observed,
CURRENT_DATE AS refreshed_date CURRENT_DATE AS refreshed_date
FROM venue_stats FROM venue_stats
GROUP BY country_code, city, price_currency GROUP BY country_code, city, city_slug, price_currency

View File

@@ -100,7 +100,7 @@ _DAG: dict[str, list[str]] = {
"stg_regional_income", "stg_income_usa", "stg_padel_courts", "stg_tennis_courts", "stg_regional_income", "stg_income_usa", "stg_padel_courts", "stg_tennis_courts",
], ],
"dim_venue_capacity": [ "dim_venue_capacity": [
"stg_playtomic_venues", "stg_playtomic_resources", "stg_playtomic_opening_hours", "dim_venues", "stg_playtomic_resources", "stg_playtomic_opening_hours",
], ],
"fct_availability_slot": ["stg_playtomic_availability"], "fct_availability_slot": ["stg_playtomic_availability"],
"fct_daily_availability": ["fct_availability_slot", "dim_venue_capacity"], "fct_daily_availability": ["fct_availability_slot", "dim_venue_capacity"],