Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
85b6aa0d0a | ||
|
|
e62aad148b | ||
|
|
6fb1e990e3 | ||
|
|
6edf8ba65e |
@@ -2,22 +2,14 @@
|
|||||||
-- One row per available 60-minute booking slot per court per venue per day.
|
-- One row per available 60-minute booking slot per court per venue per day.
|
||||||
-- "Available" = the slot was NOT booked at capture time. Missing slots = booked.
|
-- "Available" = the slot was NOT booked at capture time. Missing slots = booked.
|
||||||
--
|
--
|
||||||
-- Reads BOTH morning snapshots and recheck files:
|
-- Reads morning snapshots and recheck files (JSONL format):
|
||||||
-- Morning (new): availability_{date}.jsonl.gz → snapshot_type = 'morning'
|
-- Morning: availability_{date}.jsonl.gz → snapshot_type = 'morning'
|
||||||
-- Morning (old): availability_{date}.json.gz → snapshot_type = 'morning'
|
-- Recheck: availability_{date}_recheck_{HH}.jsonl.gz → snapshot_type = 'recheck'
|
||||||
-- Recheck (new): availability_{date}_recheck_{HH}.jsonl.gz → snapshot_type = 'recheck'
|
|
||||||
-- Recheck (old): availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
|
|
||||||
--
|
--
|
||||||
-- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit).
|
-- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit).
|
||||||
-- Price parsed from strings like "14.56 EUR" or "48 GBP".
|
-- Price parsed from strings like "14.56 EUR" or "48 GBP".
|
||||||
--
|
--
|
||||||
-- Supports two morning landing formats (UNION ALL during migration):
|
-- Source: data/landing/playtomic/{year}/{month}/availability_*.jsonl.gz
|
||||||
-- New: availability_{date}.jsonl.gz — one venue per line, columns: tenant_id, slots, date, captured_at_utc
|
|
||||||
-- Old: availability_{date}.json.gz — {"date":..., "venues": [...]} blob (UNNEST required)
|
|
||||||
--
|
|
||||||
-- Requires: at least one availability file in the landing zone.
|
|
||||||
-- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz)
|
|
||||||
-- with empty venues[] ensures this model runs before real data arrives.
|
|
||||||
|
|
||||||
MODEL (
|
MODEL (
|
||||||
name staging.stg_playtomic_availability,
|
name staging.stg_playtomic_availability,
|
||||||
@@ -27,7 +19,6 @@ MODEL (
|
|||||||
);
|
);
|
||||||
|
|
||||||
WITH
|
WITH
|
||||||
-- New format: one venue per JSONL line — no outer UNNEST needed
|
|
||||||
morning_jsonl AS (
|
morning_jsonl AS (
|
||||||
SELECT
|
SELECT
|
||||||
date AS snapshot_date,
|
date AS snapshot_date,
|
||||||
@@ -50,35 +41,6 @@ morning_jsonl AS (
|
|||||||
WHERE filename NOT LIKE '%_recheck_%'
|
WHERE filename NOT LIKE '%_recheck_%'
|
||||||
AND tenant_id IS NOT NULL
|
AND tenant_id IS NOT NULL
|
||||||
),
|
),
|
||||||
-- Old format: {"date":..., "venues": [...]} blob — kept for transition
|
|
||||||
morning_blob AS (
|
|
||||||
SELECT
|
|
||||||
af.date AS snapshot_date,
|
|
||||||
af.captured_at_utc,
|
|
||||||
'morning' AS snapshot_type,
|
|
||||||
NULL::INTEGER AS recheck_hour,
|
|
||||||
venue_json ->> 'tenant_id' AS tenant_id,
|
|
||||||
venue_json -> 'slots' AS slots_json
|
|
||||||
FROM (
|
|
||||||
SELECT date, captured_at_utc, venues
|
|
||||||
FROM read_json(
|
|
||||||
@LANDING_DIR || '/playtomic/*/*/availability_*.json.gz',
|
|
||||||
format = 'auto',
|
|
||||||
columns = {
|
|
||||||
date: 'VARCHAR',
|
|
||||||
captured_at_utc: 'VARCHAR',
|
|
||||||
venues: 'JSON[]'
|
|
||||||
},
|
|
||||||
filename = true,
|
|
||||||
maximum_object_size = 134217728 -- 128 MB; daily files grow with venue count
|
|
||||||
)
|
|
||||||
WHERE filename NOT LIKE '%_recheck_%'
|
|
||||||
AND venues IS NOT NULL
|
|
||||||
AND json_array_length(venues) > 0
|
|
||||||
) af,
|
|
||||||
LATERAL UNNEST(af.venues) AS t(venue_json)
|
|
||||||
),
|
|
||||||
-- Recheck snapshots (new JSONL format — one venue per line)
|
|
||||||
recheck_jsonl AS (
|
recheck_jsonl AS (
|
||||||
SELECT
|
SELECT
|
||||||
date AS snapshot_date,
|
date AS snapshot_date,
|
||||||
@@ -101,43 +63,10 @@ recheck_jsonl AS (
|
|||||||
)
|
)
|
||||||
WHERE tenant_id IS NOT NULL
|
WHERE tenant_id IS NOT NULL
|
||||||
),
|
),
|
||||||
-- Recheck snapshots (old blob format, kept for transition)
|
|
||||||
recheck_blob AS (
|
|
||||||
SELECT
|
|
||||||
rf.date AS snapshot_date,
|
|
||||||
rf.captured_at_utc,
|
|
||||||
'recheck' AS snapshot_type,
|
|
||||||
TRY_CAST(
|
|
||||||
regexp_extract(rf.filename, '_recheck_(\d+)', 1) AS INTEGER
|
|
||||||
) AS recheck_hour,
|
|
||||||
venue_json ->> 'tenant_id' AS tenant_id,
|
|
||||||
venue_json -> 'slots' AS slots_json
|
|
||||||
FROM (
|
|
||||||
SELECT date, captured_at_utc, venues, filename
|
|
||||||
FROM read_json(
|
|
||||||
@LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz',
|
|
||||||
format = 'auto',
|
|
||||||
columns = {
|
|
||||||
date: 'VARCHAR',
|
|
||||||
captured_at_utc: 'VARCHAR',
|
|
||||||
venues: 'JSON[]'
|
|
||||||
},
|
|
||||||
filename = true,
|
|
||||||
maximum_object_size = 134217728 -- 128 MB; matches morning snapshot limit
|
|
||||||
)
|
|
||||||
WHERE venues IS NOT NULL
|
|
||||||
AND json_array_length(venues) > 0
|
|
||||||
) rf,
|
|
||||||
LATERAL UNNEST(rf.venues) AS t(venue_json)
|
|
||||||
),
|
|
||||||
all_venues AS (
|
all_venues AS (
|
||||||
SELECT * FROM morning_jsonl
|
SELECT * FROM morning_jsonl
|
||||||
UNION ALL
|
UNION ALL
|
||||||
SELECT * FROM morning_blob
|
|
||||||
UNION ALL
|
|
||||||
SELECT * FROM recheck_jsonl
|
SELECT * FROM recheck_jsonl
|
||||||
UNION ALL
|
|
||||||
SELECT * FROM recheck_blob
|
|
||||||
),
|
),
|
||||||
raw_resources AS (
|
raw_resources AS (
|
||||||
SELECT
|
SELECT
|
||||||
|
|||||||
@@ -5,11 +5,7 @@
|
|||||||
-- DuckDB auto-infers opening_hours as STRUCT, so we access each day by literal
|
-- DuckDB auto-infers opening_hours as STRUCT, so we access each day by literal
|
||||||
-- key (no dynamic access) and UNION ALL to unpivot.
|
-- key (no dynamic access) and UNION ALL to unpivot.
|
||||||
--
|
--
|
||||||
-- Supports two landing formats (UNION ALL during migration):
|
-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz
|
||||||
-- New: tenants.jsonl.gz — one tenant per line, opening_hours is a top-level JSON column
|
|
||||||
-- Old: tenants.json.gz — {"tenants": [...]} blob (UNNEST required)
|
|
||||||
--
|
|
||||||
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
|
|
||||||
|
|
||||||
MODEL (
|
MODEL (
|
||||||
name staging.stg_playtomic_opening_hours,
|
name staging.stg_playtomic_opening_hours,
|
||||||
@@ -19,40 +15,18 @@ MODEL (
|
|||||||
);
|
);
|
||||||
|
|
||||||
WITH
|
WITH
|
||||||
-- New format: one tenant per JSONL line
|
venues AS (
|
||||||
jsonl_venues AS (
|
|
||||||
SELECT
|
SELECT
|
||||||
tenant_id,
|
tenant_id,
|
||||||
opening_hours AS oh
|
opening_hours AS oh
|
||||||
FROM read_json(
|
FROM read_json(
|
||||||
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
|
@LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz',
|
||||||
format = 'newline_delimited',
|
format = 'newline_delimited',
|
||||||
columns = {tenant_id: 'VARCHAR', opening_hours: 'JSON'}
|
columns = {tenant_id: 'VARCHAR', opening_hours: 'JSON'}
|
||||||
)
|
)
|
||||||
WHERE tenant_id IS NOT NULL
|
WHERE tenant_id IS NOT NULL
|
||||||
AND opening_hours IS NOT NULL
|
AND opening_hours IS NOT NULL
|
||||||
),
|
),
|
||||||
-- Old format: blob
|
|
||||||
blob_venues AS (
|
|
||||||
SELECT
|
|
||||||
tenant ->> 'tenant_id' AS tenant_id,
|
|
||||||
tenant -> 'opening_hours' AS oh
|
|
||||||
FROM (
|
|
||||||
SELECT UNNEST(tenants) AS tenant
|
|
||||||
FROM read_json(
|
|
||||||
@LANDING_DIR || '/playtomic/*/*/tenants.json.gz',
|
|
||||||
format = 'auto',
|
|
||||||
maximum_object_size = 134217728
|
|
||||||
)
|
|
||||||
)
|
|
||||||
WHERE (tenant ->> 'tenant_id') IS NOT NULL
|
|
||||||
AND (tenant -> 'opening_hours') IS NOT NULL
|
|
||||||
),
|
|
||||||
venues AS (
|
|
||||||
SELECT * FROM jsonl_venues
|
|
||||||
UNION ALL
|
|
||||||
SELECT * FROM blob_venues
|
|
||||||
),
|
|
||||||
-- Unpivot by UNION ALL — 7 literal key accesses
|
-- Unpivot by UNION ALL — 7 literal key accesses
|
||||||
unpivoted AS (
|
unpivoted AS (
|
||||||
SELECT tenant_id, 'MONDAY' AS day_of_week, 1 AS day_number,
|
SELECT tenant_id, 'MONDAY' AS day_of_week, 1 AS day_number,
|
||||||
@@ -104,6 +78,4 @@ SELECT
|
|||||||
FROM unpivoted
|
FROM unpivoted
|
||||||
WHERE opening_time IS NOT NULL
|
WHERE opening_time IS NOT NULL
|
||||||
AND closing_time IS NOT NULL
|
AND closing_time IS NOT NULL
|
||||||
-- Enforce grain: if both old blob and new JSONL exist for the same month,
|
|
||||||
-- the UNION ALL produces duplicate (tenant_id, day_of_week) pairs — deduplicate.
|
|
||||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, day_of_week ORDER BY tenant_id) = 1
|
QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, day_of_week ORDER BY tenant_id) = 1
|
||||||
|
|||||||
@@ -2,11 +2,7 @@
|
|||||||
-- Reads resources array from the landing zone to extract court type, size,
|
-- Reads resources array from the landing zone to extract court type, size,
|
||||||
-- surface, and booking config.
|
-- surface, and booking config.
|
||||||
--
|
--
|
||||||
-- Supports two landing formats (UNION ALL during migration):
|
-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz
|
||||||
-- New: tenants.jsonl.gz — one tenant per line, resources is a top-level JSON column
|
|
||||||
-- Old: tenants.json.gz — {"tenants": [...]} blob (double UNNEST: tenants → resources)
|
|
||||||
--
|
|
||||||
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
|
|
||||||
|
|
||||||
MODEL (
|
MODEL (
|
||||||
name staging.stg_playtomic_resources,
|
name staging.stg_playtomic_resources,
|
||||||
@@ -16,41 +12,18 @@ MODEL (
|
|||||||
);
|
);
|
||||||
|
|
||||||
WITH
|
WITH
|
||||||
-- New format: one tenant per JSONL line — single UNNEST for resources
|
unnested AS (
|
||||||
jsonl_unnested AS (
|
|
||||||
SELECT
|
SELECT
|
||||||
tenant_id,
|
tenant_id,
|
||||||
UPPER(address ->> 'country_code') AS country_code,
|
UPPER(address ->> 'country_code') AS country_code,
|
||||||
UNNEST(from_json(resources, '["JSON"]')) AS resource_json
|
UNNEST(from_json(resources, '["JSON"]')) AS resource_json
|
||||||
FROM read_json(
|
FROM read_json(
|
||||||
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
|
@LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz',
|
||||||
format = 'newline_delimited',
|
format = 'newline_delimited',
|
||||||
columns = {tenant_id: 'VARCHAR', address: 'JSON', resources: 'JSON'}
|
columns = {tenant_id: 'VARCHAR', address: 'JSON', resources: 'JSON'}
|
||||||
)
|
)
|
||||||
WHERE tenant_id IS NOT NULL
|
WHERE tenant_id IS NOT NULL
|
||||||
AND resources IS NOT NULL
|
AND resources IS NOT NULL
|
||||||
),
|
|
||||||
-- Old format: blob — double UNNEST (tenants → resources)
|
|
||||||
blob_unnested AS (
|
|
||||||
SELECT
|
|
||||||
tenant ->> 'tenant_id' AS tenant_id,
|
|
||||||
UPPER(tenant -> 'address' ->> 'country_code') AS country_code,
|
|
||||||
UNNEST(from_json(tenant -> 'resources', '["JSON"]')) AS resource_json
|
|
||||||
FROM (
|
|
||||||
SELECT UNNEST(tenants) AS tenant
|
|
||||||
FROM read_json(
|
|
||||||
@LANDING_DIR || '/playtomic/*/*/tenants.json.gz',
|
|
||||||
format = 'auto',
|
|
||||||
maximum_object_size = 134217728
|
|
||||||
)
|
|
||||||
)
|
|
||||||
WHERE (tenant ->> 'tenant_id') IS NOT NULL
|
|
||||||
AND (tenant -> 'resources') IS NOT NULL
|
|
||||||
),
|
|
||||||
unnested AS (
|
|
||||||
SELECT * FROM jsonl_unnested
|
|
||||||
UNION ALL
|
|
||||||
SELECT * FROM blob_unnested
|
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
tenant_id,
|
tenant_id,
|
||||||
@@ -68,6 +41,4 @@ SELECT
|
|||||||
FROM unnested
|
FROM unnested
|
||||||
WHERE (resource_json ->> 'resource_id') IS NOT NULL
|
WHERE (resource_json ->> 'resource_id') IS NOT NULL
|
||||||
AND (resource_json ->> 'sport_id') = 'PADEL'
|
AND (resource_json ->> 'sport_id') = 'PADEL'
|
||||||
-- Enforce grain: if both old blob and new JSONL exist for the same month,
|
|
||||||
-- the UNION ALL produces duplicate (tenant_id, resource_id) pairs — deduplicate.
|
|
||||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, resource_json ->> 'resource_id' ORDER BY tenant_id) = 1
|
QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, resource_json ->> 'resource_id' ORDER BY tenant_id) = 1
|
||||||
|
|||||||
@@ -3,11 +3,7 @@
|
|||||||
-- including address, opening hours, court resources, VAT rate, and facilities.
|
-- including address, opening hours, court resources, VAT rate, and facilities.
|
||||||
-- Deduplicates on tenant_id (keeps most recent extraction).
|
-- Deduplicates on tenant_id (keeps most recent extraction).
|
||||||
--
|
--
|
||||||
-- Supports two landing formats (UNION ALL during migration):
|
-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz
|
||||||
-- New: tenants.jsonl.gz — one tenant JSON object per line (no UNNEST needed)
|
|
||||||
-- Old: tenants.json.gz — {"tenants": [{...}]} blob (UNNEST required)
|
|
||||||
--
|
|
||||||
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
|
|
||||||
|
|
||||||
MODEL (
|
MODEL (
|
||||||
name staging.stg_playtomic_venues,
|
name staging.stg_playtomic_venues,
|
||||||
@@ -17,8 +13,7 @@ MODEL (
|
|||||||
);
|
);
|
||||||
|
|
||||||
WITH
|
WITH
|
||||||
-- New format: one tenant per JSONL line — no UNNEST, access columns directly
|
parsed AS (
|
||||||
jsonl_parsed AS (
|
|
||||||
SELECT
|
SELECT
|
||||||
tenant_id,
|
tenant_id,
|
||||||
tenant_name,
|
tenant_name,
|
||||||
@@ -45,7 +40,7 @@ jsonl_parsed AS (
|
|||||||
filename AS source_file,
|
filename AS source_file,
|
||||||
CURRENT_DATE AS extracted_date
|
CURRENT_DATE AS extracted_date
|
||||||
FROM read_json(
|
FROM read_json(
|
||||||
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
|
@LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz',
|
||||||
format = 'newline_delimited',
|
format = 'newline_delimited',
|
||||||
filename = true,
|
filename = true,
|
||||||
columns = {
|
columns = {
|
||||||
@@ -59,49 +54,6 @@ jsonl_parsed AS (
|
|||||||
)
|
)
|
||||||
WHERE tenant_id IS NOT NULL
|
WHERE tenant_id IS NOT NULL
|
||||||
),
|
),
|
||||||
-- Old format: {"tenants": [...]} blob — keep for transition until old files rotate out
|
|
||||||
blob_parsed AS (
|
|
||||||
SELECT
|
|
||||||
tenant ->> 'tenant_id' AS tenant_id,
|
|
||||||
tenant ->> 'tenant_name' AS tenant_name,
|
|
||||||
tenant ->> 'slug' AS slug,
|
|
||||||
tenant ->> 'tenant_type' AS tenant_type,
|
|
||||||
tenant ->> 'tenant_status' AS tenant_status,
|
|
||||||
tenant ->> 'playtomic_status' AS playtomic_status,
|
|
||||||
tenant ->> 'booking_type' AS booking_type,
|
|
||||||
tenant -> 'address' ->> 'street' AS street,
|
|
||||||
tenant -> 'address' ->> 'city' AS city,
|
|
||||||
tenant -> 'address' ->> 'postal_code' AS postal_code,
|
|
||||||
UPPER(tenant -> 'address' ->> 'country_code') AS country_code,
|
|
||||||
tenant -> 'address' ->> 'timezone' AS timezone,
|
|
||||||
tenant -> 'address' ->> 'administrative_area' AS administrative_area,
|
|
||||||
TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat,
|
|
||||||
TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon,
|
|
||||||
TRY_CAST(tenant ->> 'vat_rate' AS DOUBLE) AS vat_rate,
|
|
||||||
tenant ->> 'default_currency' AS default_currency,
|
|
||||||
TRY_CAST(tenant -> 'booking_settings' ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes,
|
|
||||||
tenant -> 'opening_hours' AS opening_hours_json,
|
|
||||||
tenant -> 'resources' AS resources_json,
|
|
||||||
tenant ->> 'created_at' AS created_at,
|
|
||||||
tenant ->> 'is_playtomic_partner' AS is_playtomic_partner_raw,
|
|
||||||
filename AS source_file,
|
|
||||||
CURRENT_DATE AS extracted_date
|
|
||||||
FROM (
|
|
||||||
SELECT UNNEST(tenants) AS tenant, filename
|
|
||||||
FROM read_json(
|
|
||||||
@LANDING_DIR || '/playtomic/*/*/tenants.json.gz',
|
|
||||||
format = 'auto',
|
|
||||||
filename = true,
|
|
||||||
maximum_object_size = 134217728
|
|
||||||
)
|
|
||||||
)
|
|
||||||
WHERE (tenant ->> 'tenant_id') IS NOT NULL
|
|
||||||
),
|
|
||||||
parsed AS (
|
|
||||||
SELECT * FROM jsonl_parsed
|
|
||||||
UNION ALL
|
|
||||||
SELECT * FROM blob_parsed
|
|
||||||
),
|
|
||||||
deduped AS (
|
deduped AS (
|
||||||
SELECT *,
|
SELECT *,
|
||||||
ROW_NUMBER() OVER (PARTITION BY tenant_id ORDER BY source_file DESC) AS rn
|
ROW_NUMBER() OVER (PARTITION BY tenant_id ORDER BY source_file DESC) AS rn
|
||||||
|
|||||||
@@ -3,11 +3,7 @@
|
|||||||
-- Broad coverage (140K+ locations) enables Gemeinde-level market intelligence.
|
-- Broad coverage (140K+ locations) enables Gemeinde-level market intelligence.
|
||||||
-- One row per geoname_id (GeoNames stable numeric identifier).
|
-- One row per geoname_id (GeoNames stable numeric identifier).
|
||||||
--
|
--
|
||||||
-- Supports two landing formats (UNION ALL during migration):
|
-- Source: data/landing/geonames/{year}/{month}/cities_global.jsonl.gz
|
||||||
-- New: cities_global.jsonl.gz — one city per line, columns directly accessible
|
|
||||||
-- Old: cities_global.json.gz — {"rows": [...]} blob (UNNEST required)
|
|
||||||
--
|
|
||||||
-- Source: data/landing/geonames/{year}/{month}/cities_global.{jsonl,json}.gz
|
|
||||||
|
|
||||||
MODEL (
|
MODEL (
|
||||||
name staging.stg_population_geonames,
|
name staging.stg_population_geonames,
|
||||||
@@ -16,74 +12,29 @@ MODEL (
|
|||||||
grain geoname_id
|
grain geoname_id
|
||||||
);
|
);
|
||||||
|
|
||||||
WITH
|
|
||||||
-- New format: one city per JSONL line
|
|
||||||
jsonl_rows AS (
|
|
||||||
SELECT
|
|
||||||
TRY_CAST(geoname_id AS INTEGER) AS geoname_id,
|
|
||||||
city_name,
|
|
||||||
country_code,
|
|
||||||
TRY_CAST(lat AS DOUBLE) AS lat,
|
|
||||||
TRY_CAST(lon AS DOUBLE) AS lon,
|
|
||||||
admin1_code,
|
|
||||||
admin2_code,
|
|
||||||
TRY_CAST(population AS BIGINT) AS population,
|
|
||||||
TRY_CAST(ref_year AS INTEGER) AS ref_year,
|
|
||||||
CURRENT_DATE AS extracted_date
|
|
||||||
FROM read_json(
|
|
||||||
@LANDING_DIR || '/geonames/*/*/cities_global.jsonl.gz',
|
|
||||||
format = 'newline_delimited',
|
|
||||||
columns = {
|
|
||||||
geoname_id: 'INTEGER', city_name: 'VARCHAR', country_code: 'VARCHAR',
|
|
||||||
lat: 'DOUBLE', lon: 'DOUBLE', admin1_code: 'VARCHAR', admin2_code: 'VARCHAR',
|
|
||||||
population: 'BIGINT', ref_year: 'INTEGER'
|
|
||||||
}
|
|
||||||
)
|
|
||||||
WHERE geoname_id IS NOT NULL
|
|
||||||
),
|
|
||||||
-- Old format: {"rows": [...]} blob — kept for transition
|
|
||||||
blob_rows AS (
|
|
||||||
SELECT
|
|
||||||
TRY_CAST(row ->> 'geoname_id' AS INTEGER) AS geoname_id,
|
|
||||||
row ->> 'city_name' AS city_name,
|
|
||||||
row ->> 'country_code' AS country_code,
|
|
||||||
TRY_CAST(row ->> 'lat' AS DOUBLE) AS lat,
|
|
||||||
TRY_CAST(row ->> 'lon' AS DOUBLE) AS lon,
|
|
||||||
row ->> 'admin1_code' AS admin1_code,
|
|
||||||
row ->> 'admin2_code' AS admin2_code,
|
|
||||||
TRY_CAST(row ->> 'population' AS BIGINT) AS population,
|
|
||||||
TRY_CAST(row ->> 'ref_year' AS INTEGER) AS ref_year,
|
|
||||||
CURRENT_DATE AS extracted_date
|
|
||||||
FROM (
|
|
||||||
SELECT UNNEST(rows) AS row
|
|
||||||
FROM read_json(
|
|
||||||
@LANDING_DIR || '/geonames/*/*/cities_global.json.gz',
|
|
||||||
auto_detect = true,
|
|
||||||
maximum_object_size = 40000000
|
|
||||||
)
|
|
||||||
)
|
|
||||||
WHERE (row ->> 'geoname_id') IS NOT NULL
|
|
||||||
),
|
|
||||||
all_rows AS (
|
|
||||||
SELECT * FROM jsonl_rows
|
|
||||||
UNION ALL
|
|
||||||
SELECT * FROM blob_rows
|
|
||||||
)
|
|
||||||
SELECT
|
SELECT
|
||||||
geoname_id,
|
TRY_CAST(geoname_id AS INTEGER) AS geoname_id,
|
||||||
TRIM(city_name) AS city_name,
|
TRIM(city_name) AS city_name,
|
||||||
UPPER(country_code) AS country_code,
|
UPPER(country_code) AS country_code,
|
||||||
lat,
|
TRY_CAST(lat AS DOUBLE) AS lat,
|
||||||
lon,
|
TRY_CAST(lon AS DOUBLE) AS lon,
|
||||||
NULLIF(TRIM(admin1_code), '') AS admin1_code,
|
NULLIF(TRIM(admin1_code), '') AS admin1_code,
|
||||||
NULLIF(TRIM(admin2_code), '') AS admin2_code,
|
NULLIF(TRIM(admin2_code), '') AS admin2_code,
|
||||||
population,
|
TRY_CAST(population AS BIGINT) AS population,
|
||||||
ref_year,
|
TRY_CAST(ref_year AS INTEGER) AS ref_year,
|
||||||
extracted_date
|
CURRENT_DATE AS extracted_date
|
||||||
FROM all_rows
|
FROM read_json(
|
||||||
WHERE population IS NOT NULL
|
@LANDING_DIR || '/geonames/*/*/cities_global.jsonl.gz',
|
||||||
|
format = 'newline_delimited',
|
||||||
|
columns = {
|
||||||
|
geoname_id: 'INTEGER', city_name: 'VARCHAR', country_code: 'VARCHAR',
|
||||||
|
lat: 'DOUBLE', lon: 'DOUBLE', admin1_code: 'VARCHAR', admin2_code: 'VARCHAR',
|
||||||
|
population: 'BIGINT', ref_year: 'INTEGER'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
WHERE geoname_id IS NOT NULL
|
||||||
|
AND population IS NOT NULL
|
||||||
AND population > 0
|
AND population > 0
|
||||||
AND geoname_id IS NOT NULL
|
|
||||||
AND city_name IS NOT NULL
|
AND city_name IS NOT NULL
|
||||||
AND lat IS NOT NULL
|
AND lat IS NOT NULL
|
||||||
AND lon IS NOT NULL
|
AND lon IS NOT NULL
|
||||||
|
|||||||
@@ -1,22 +1,19 @@
|
|||||||
"""Create minimal seed files for SQLMesh staging models that require landing data."""
|
"""Create minimal seed files for SQLMesh staging models that require landing data.
|
||||||
|
|
||||||
|
Seeds are empty JSONL gzip files — they satisfy DuckDB's file-not-found check
|
||||||
|
while contributing zero rows to the staging models.
|
||||||
|
"""
|
||||||
import gzip
|
import gzip
|
||||||
import json
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
seed = {
|
# stg_playtomic_availability requires at least one morning and one recheck file
|
||||||
"date": "1970-01-01",
|
morning = Path("data/landing/playtomic/1970/01/availability_1970-01-01.jsonl.gz")
|
||||||
"captured_at_utc": "1970-01-01T00:00:00Z",
|
recheck = Path("data/landing/playtomic/1970/01/availability_1970-01-01_recheck_00.jsonl.gz")
|
||||||
"venue_count": 0,
|
|
||||||
"venues_errored": 0,
|
|
||||||
"venues": [],
|
|
||||||
}
|
|
||||||
morning = Path("data/landing/playtomic/1970/01/availability_1970-01-01.json.gz")
|
|
||||||
recheck = Path("data/landing/playtomic/1970/01/availability_1970-01-01_recheck_00.json.gz")
|
|
||||||
morning.parent.mkdir(parents=True, exist_ok=True)
|
morning.parent.mkdir(parents=True, exist_ok=True)
|
||||||
for p in [morning, recheck]:
|
for p in [morning, recheck]:
|
||||||
if not p.exists():
|
if not p.exists():
|
||||||
with gzip.open(p, "wt") as f:
|
with gzip.open(p, "wb") as f:
|
||||||
json.dump(seed, f)
|
pass # empty JSONL — 0 rows, no error
|
||||||
print("created", p)
|
print("created", p)
|
||||||
else:
|
else:
|
||||||
print("exists ", p)
|
print("exists ", p)
|
||||||
|
|||||||
Reference in New Issue
Block a user