diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql index a5ee10f..40ea9e9 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql @@ -2,22 +2,14 @@ -- One row per available 60-minute booking slot per court per venue per day. -- "Available" = the slot was NOT booked at capture time. Missing slots = booked. -- --- Reads BOTH morning snapshots and recheck files: --- Morning (new): availability_{date}.jsonl.gz → snapshot_type = 'morning' --- Morning (old): availability_{date}.json.gz → snapshot_type = 'morning' --- Recheck (new): availability_{date}_recheck_{HH}.jsonl.gz → snapshot_type = 'recheck' --- Recheck (old): availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck' +-- Reads morning snapshots and recheck files (JSONL format): +-- Morning: availability_{date}.jsonl.gz → snapshot_type = 'morning' +-- Recheck: availability_{date}_recheck_{HH}.jsonl.gz → snapshot_type = 'recheck' -- -- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit). -- Price parsed from strings like "14.56 EUR" or "48 GBP". -- --- Supports two morning landing formats (UNION ALL during migration): --- New: availability_{date}.jsonl.gz — one venue per line, columns: tenant_id, slots, date, captured_at_utc --- Old: availability_{date}.json.gz — {"date":..., "venues": [...]} blob (UNNEST required) --- --- Requires: at least one availability file in the landing zone. --- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz) --- with empty venues[] ensures this model runs before real data arrives. +-- Source: data/landing/playtomic/{year}/{month}/availability_*.jsonl.gz MODEL ( name staging.stg_playtomic_availability, @@ -27,7 +19,6 @@ MODEL ( ); WITH --- New format: one venue per JSONL line — no outer UNNEST needed morning_jsonl AS ( SELECT date AS snapshot_date, @@ -50,35 +41,6 @@ morning_jsonl AS ( WHERE filename NOT LIKE '%_recheck_%' AND tenant_id IS NOT NULL ), --- Old format: {"date":..., "venues": [...]} blob — kept for transition -morning_blob AS ( - SELECT - af.date AS snapshot_date, - af.captured_at_utc, - 'morning' AS snapshot_type, - NULL::INTEGER AS recheck_hour, - venue_json ->> 'tenant_id' AS tenant_id, - venue_json -> 'slots' AS slots_json - FROM ( - SELECT date, captured_at_utc, venues - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz', - format = 'auto', - columns = { - date: 'VARCHAR', - captured_at_utc: 'VARCHAR', - venues: 'JSON[]' - }, - filename = true, - maximum_object_size = 134217728 -- 128 MB; daily files grow with venue count - ) - WHERE filename NOT LIKE '%_recheck_%' - AND venues IS NOT NULL - AND json_array_length(venues) > 0 - ) af, - LATERAL UNNEST(af.venues) AS t(venue_json) -), --- Recheck snapshots (new JSONL format — one venue per line) recheck_jsonl AS ( SELECT date AS snapshot_date, @@ -101,43 +63,10 @@ recheck_jsonl AS ( ) WHERE tenant_id IS NOT NULL ), --- Recheck snapshots (old blob format, kept for transition) -recheck_blob AS ( - SELECT - rf.date AS snapshot_date, - rf.captured_at_utc, - 'recheck' AS snapshot_type, - TRY_CAST( - regexp_extract(rf.filename, '_recheck_(\d+)', 1) AS INTEGER - ) AS recheck_hour, - venue_json ->> 'tenant_id' AS tenant_id, - venue_json -> 'slots' AS slots_json - FROM ( - SELECT date, captured_at_utc, venues, filename - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz', - format = 'auto', - columns = { - date: 'VARCHAR', - captured_at_utc: 'VARCHAR', - venues: 'JSON[]' - }, - filename = true, - maximum_object_size = 134217728 -- 128 MB; matches morning snapshot limit - ) - WHERE venues IS NOT NULL - AND json_array_length(venues) > 0 - ) rf, - LATERAL UNNEST(rf.venues) AS t(venue_json) -), all_venues AS ( SELECT * FROM morning_jsonl UNION ALL - SELECT * FROM morning_blob - UNION ALL SELECT * FROM recheck_jsonl - UNION ALL - SELECT * FROM recheck_blob ), raw_resources AS ( SELECT diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql index b6d6bcc..fc31a11 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql @@ -5,11 +5,7 @@ -- DuckDB auto-infers opening_hours as STRUCT, so we access each day by literal -- key (no dynamic access) and UNION ALL to unpivot. -- --- Supports two landing formats (UNION ALL during migration): --- New: tenants.jsonl.gz — one tenant per line, opening_hours is a top-level JSON column --- Old: tenants.json.gz — {"tenants": [...]} blob (UNNEST required) --- --- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz +-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz MODEL ( name staging.stg_playtomic_opening_hours, @@ -19,40 +15,18 @@ MODEL ( ); WITH --- New format: one tenant per JSONL line -jsonl_venues AS ( +venues AS ( SELECT tenant_id, opening_hours AS oh FROM read_json( - @LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz', + @LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz', format = 'newline_delimited', columns = {tenant_id: 'VARCHAR', opening_hours: 'JSON'} ) WHERE tenant_id IS NOT NULL AND opening_hours IS NOT NULL ), --- Old format: blob -blob_venues AS ( - SELECT - tenant ->> 'tenant_id' AS tenant_id, - tenant -> 'opening_hours' AS oh - FROM ( - SELECT UNNEST(tenants) AS tenant - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/tenants.json.gz', - format = 'auto', - maximum_object_size = 134217728 - ) - ) - WHERE (tenant ->> 'tenant_id') IS NOT NULL - AND (tenant -> 'opening_hours') IS NOT NULL -), -venues AS ( - SELECT * FROM jsonl_venues - UNION ALL - SELECT * FROM blob_venues -), -- Unpivot by UNION ALL — 7 literal key accesses unpivoted AS ( SELECT tenant_id, 'MONDAY' AS day_of_week, 1 AS day_number, @@ -104,6 +78,4 @@ SELECT FROM unpivoted WHERE opening_time IS NOT NULL AND closing_time IS NOT NULL --- Enforce grain: if both old blob and new JSONL exist for the same month, --- the UNION ALL produces duplicate (tenant_id, day_of_week) pairs — deduplicate. QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, day_of_week ORDER BY tenant_id) = 1 diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql index 6c9484f..6dfc94c 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql @@ -2,11 +2,7 @@ -- Reads resources array from the landing zone to extract court type, size, -- surface, and booking config. -- --- Supports two landing formats (UNION ALL during migration): --- New: tenants.jsonl.gz — one tenant per line, resources is a top-level JSON column --- Old: tenants.json.gz — {"tenants": [...]} blob (double UNNEST: tenants → resources) --- --- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz +-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz MODEL ( name staging.stg_playtomic_resources, @@ -16,41 +12,18 @@ MODEL ( ); WITH --- New format: one tenant per JSONL line — single UNNEST for resources -jsonl_unnested AS ( +unnested AS ( SELECT tenant_id, UPPER(address ->> 'country_code') AS country_code, UNNEST(from_json(resources, '["JSON"]')) AS resource_json FROM read_json( - @LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz', + @LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz', format = 'newline_delimited', columns = {tenant_id: 'VARCHAR', address: 'JSON', resources: 'JSON'} ) WHERE tenant_id IS NOT NULL AND resources IS NOT NULL -), --- Old format: blob — double UNNEST (tenants → resources) -blob_unnested AS ( - SELECT - tenant ->> 'tenant_id' AS tenant_id, - UPPER(tenant -> 'address' ->> 'country_code') AS country_code, - UNNEST(from_json(tenant -> 'resources', '["JSON"]')) AS resource_json - FROM ( - SELECT UNNEST(tenants) AS tenant - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/tenants.json.gz', - format = 'auto', - maximum_object_size = 134217728 - ) - ) - WHERE (tenant ->> 'tenant_id') IS NOT NULL - AND (tenant -> 'resources') IS NOT NULL -), -unnested AS ( - SELECT * FROM jsonl_unnested - UNION ALL - SELECT * FROM blob_unnested ) SELECT tenant_id, @@ -68,6 +41,4 @@ SELECT FROM unnested WHERE (resource_json ->> 'resource_id') IS NOT NULL AND (resource_json ->> 'sport_id') = 'PADEL' --- Enforce grain: if both old blob and new JSONL exist for the same month, --- the UNION ALL produces duplicate (tenant_id, resource_id) pairs — deduplicate. QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, resource_json ->> 'resource_id' ORDER BY tenant_id) = 1 diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql index 6240462..cd39015 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql @@ -3,11 +3,7 @@ -- including address, opening hours, court resources, VAT rate, and facilities. -- Deduplicates on tenant_id (keeps most recent extraction). -- --- Supports two landing formats (UNION ALL during migration): --- New: tenants.jsonl.gz — one tenant JSON object per line (no UNNEST needed) --- Old: tenants.json.gz — {"tenants": [{...}]} blob (UNNEST required) --- --- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz +-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz MODEL ( name staging.stg_playtomic_venues, @@ -17,8 +13,7 @@ MODEL ( ); WITH --- New format: one tenant per JSONL line — no UNNEST, access columns directly -jsonl_parsed AS ( +parsed AS ( SELECT tenant_id, tenant_name, @@ -45,7 +40,7 @@ jsonl_parsed AS ( filename AS source_file, CURRENT_DATE AS extracted_date FROM read_json( - @LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz', + @LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz', format = 'newline_delimited', filename = true, columns = { @@ -59,49 +54,6 @@ jsonl_parsed AS ( ) WHERE tenant_id IS NOT NULL ), --- Old format: {"tenants": [...]} blob — keep for transition until old files rotate out -blob_parsed AS ( - SELECT - tenant ->> 'tenant_id' AS tenant_id, - tenant ->> 'tenant_name' AS tenant_name, - tenant ->> 'slug' AS slug, - tenant ->> 'tenant_type' AS tenant_type, - tenant ->> 'tenant_status' AS tenant_status, - tenant ->> 'playtomic_status' AS playtomic_status, - tenant ->> 'booking_type' AS booking_type, - tenant -> 'address' ->> 'street' AS street, - tenant -> 'address' ->> 'city' AS city, - tenant -> 'address' ->> 'postal_code' AS postal_code, - UPPER(tenant -> 'address' ->> 'country_code') AS country_code, - tenant -> 'address' ->> 'timezone' AS timezone, - tenant -> 'address' ->> 'administrative_area' AS administrative_area, - TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat, - TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon, - TRY_CAST(tenant ->> 'vat_rate' AS DOUBLE) AS vat_rate, - tenant ->> 'default_currency' AS default_currency, - TRY_CAST(tenant -> 'booking_settings' ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes, - tenant -> 'opening_hours' AS opening_hours_json, - tenant -> 'resources' AS resources_json, - tenant ->> 'created_at' AS created_at, - tenant ->> 'is_playtomic_partner' AS is_playtomic_partner_raw, - filename AS source_file, - CURRENT_DATE AS extracted_date - FROM ( - SELECT UNNEST(tenants) AS tenant, filename - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/tenants.json.gz', - format = 'auto', - filename = true, - maximum_object_size = 134217728 - ) - ) - WHERE (tenant ->> 'tenant_id') IS NOT NULL -), -parsed AS ( - SELECT * FROM jsonl_parsed - UNION ALL - SELECT * FROM blob_parsed -), deduped AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY tenant_id ORDER BY source_file DESC) AS rn