From 9bef055e6d58dbb0e4bc792ed90edfd51b1dc752 Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 12:07:53 +0100 Subject: [PATCH] feat(extract): convert playtomic_tenants to JSONL output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - playtomic_tenants.py: write each tenant as a JSONL line after dedup, compress via compress_jsonl_atomic → tenants.jsonl.gz - playtomic_availability.py: update _load_tenant_ids() to prefer tenants.jsonl.gz, fall back to tenants.json.gz (transition) - stg_playtomic_venues.sql: UNION ALL jsonl+blob CTEs for transition; JSONL reads top-level columns directly, no UNNEST(tenants) needed - stg_playtomic_resources.sql: same UNION ALL pattern, single UNNEST for resources in JSONL path vs double UNNEST in blob path - stg_playtomic_opening_hours.sql: same UNION ALL pattern, opening_hours as top-level JSON column in JSONL path Co-Authored-By: Claude Sonnet 4.6 --- .../playtomic_availability.py | 38 +++++++--- .../padelnomics_extract/playtomic_tenants.py | 18 +++-- .../staging/stg_playtomic_opening_hours.sql | 29 +++++++- .../staging/stg_playtomic_resources.sql | 67 +++++++++++------ .../models/staging/stg_playtomic_venues.sql | 74 ++++++++++++++----- 5 files changed, 166 insertions(+), 60 deletions(-) diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py index e665ccd..e086855 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py @@ -65,28 +65,44 @@ _thread_local = threading.local() # --------------------------------------------------------------------------- def _load_tenant_ids(landing_dir: Path) -> list[str]: - """Read tenant IDs from the most recent tenants.json.gz file.""" + """Read tenant IDs from the most recent tenants file (JSONL or blob format).""" assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}" playtomic_dir = landing_dir / "playtomic" if not playtomic_dir.exists(): return [] - tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True) + # Prefer JSONL (new format), fall back to blob (old format) + tenant_files = sorted(playtomic_dir.glob("*/*/tenants.jsonl.gz"), reverse=True) + if not tenant_files: + tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True) if not tenant_files: return [] latest = tenant_files[0] logger.info("Loading tenant IDs from %s", latest) - - with gzip.open(latest, "rb") as f: - data = json.loads(f.read()) - - tenants = data.get("tenants", []) ids = [] - for t in tenants: - tid = t.get("tenant_id") or t.get("id") - if tid: - ids.append(tid) + + with gzip.open(latest, "rt") as f: + if latest.name.endswith(".jsonl.gz"): + # JSONL: one tenant object per line + for line in f: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + tid = record.get("tenant_id") or record.get("id") + if tid: + ids.append(tid) + except json.JSONDecodeError: + break # truncated last line + else: + # Blob: {"tenants": [...]} + data = json.loads(f.read()) + for t in data.get("tenants", []): + tid = t.get("tenant_id") or t.get("id") + if tid: + ids.append(tid) logger.info("Loaded %d tenant IDs", len(ids)) return ids diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py index 8feb5c4..ea95eca 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py @@ -1,8 +1,8 @@ """Playtomic tenants extractor — venue listings via unauthenticated API. Paginates through the global tenant list (sorted by UUID) using the `page` -parameter. Deduplicates on tenant_id and writes a single consolidated JSON -to the landing zone. +parameter. Deduplicates on tenant_id and writes a gzipped JSONL file to the +landing zone (one tenant object per line). API notes (discovered 2026-02): - bbox params (min_latitude etc.) are silently ignored by the API @@ -18,7 +18,7 @@ pages. Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2). -Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz +Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz """ import json @@ -31,7 +31,7 @@ import niquests from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging from .proxy import load_proxy_urls, make_round_robin_cycler -from .utils import landing_path, write_gzip_atomic +from .utils import compress_jsonl_atomic, landing_path logger = setup_logging("padelnomics.extract.playtomic_tenants") @@ -76,7 +76,7 @@ def extract( """Fetch all Playtomic venues via global pagination. Returns run metrics.""" year, month = year_month.split("/") dest_dir = landing_path(landing_dir, "playtomic", year, month) - dest = dest_dir / "tenants.json.gz" + dest = dest_dir / "tenants.jsonl.gz" proxy_urls = load_proxy_urls() next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None @@ -138,8 +138,12 @@ def extract( if not next_proxy: time.sleep(THROTTLE_SECONDS) - payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode() - bytes_written = write_gzip_atomic(dest, payload) + # Write each tenant as a JSONL line, then compress atomically + working_path = dest.with_suffix(".working.jsonl") + with open(working_path, "w") as f: + for tenant in all_tenants: + f.write(json.dumps(tenant, separators=(",", ":")) + "\n") + bytes_written = compress_jsonl_atomic(working_path, dest) logger.info("%d unique venues -> %s", len(all_tenants), dest) return { diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql index 08aa810..42e7bf9 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql @@ -5,8 +5,11 @@ -- DuckDB auto-infers opening_hours as STRUCT, so we access each day by literal -- key (no dynamic access) and UNION ALL to unpivot. -- --- Source: data/landing/playtomic/{year}/{month}/tenants.json.gz --- Each tenant has opening_hours: {MONDAY: {opening_time, closing_time}, ...} +-- Supports two landing formats (UNION ALL during migration): +-- New: tenants.jsonl.gz — one tenant per line, opening_hours is a top-level JSON column +-- Old: tenants.json.gz — {"tenants": [...]} blob (UNNEST required) +-- +-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz MODEL ( name staging.stg_playtomic_opening_hours, @@ -15,7 +18,22 @@ MODEL ( grain (tenant_id, day_of_week) ); -WITH venues AS ( +WITH +-- New format: one tenant per JSONL line +jsonl_venues AS ( + SELECT + tenant_id, + opening_hours AS oh + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz', + format = 'newline_delimited', + columns = {tenant_id: 'VARCHAR', opening_hours: 'JSON'} + ) + WHERE tenant_id IS NOT NULL + AND opening_hours IS NOT NULL +), +-- Old format: blob +blob_venues AS ( SELECT tenant ->> 'tenant_id' AS tenant_id, tenant -> 'opening_hours' AS oh @@ -30,6 +48,11 @@ WITH venues AS ( WHERE (tenant ->> 'tenant_id') IS NOT NULL AND (tenant -> 'opening_hours') IS NOT NULL ), +venues AS ( + SELECT * FROM jsonl_venues + UNION ALL + SELECT * FROM blob_venues +), -- Unpivot by UNION ALL — 7 literal key accesses unpivoted AS ( SELECT tenant_id, 'MONDAY' AS day_of_week, 1 AS day_number, diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql index 0907d6a..b6f6353 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql @@ -1,9 +1,12 @@ -- Individual court (resource) records from Playtomic venues. --- Reads resources array from the landing zone JSON directly (double UNNEST: --- tenants → resources) to extract court type, size, surface, and booking config. +-- Reads resources array from the landing zone to extract court type, size, +-- surface, and booking config. -- --- Source: data/landing/playtomic/{year}/{month}/tenants.json.gz --- Each tenant has a resources[] array of court objects. +-- Supports two landing formats (UNION ALL during migration): +-- New: tenants.jsonl.gz — one tenant per line, resources is a top-level JSON column +-- Old: tenants.json.gz — {"tenants": [...]} blob (double UNNEST: tenants → resources) +-- +-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz MODEL ( name staging.stg_playtomic_resources, @@ -12,36 +15,56 @@ MODEL ( grain (tenant_id, resource_id) ); -WITH raw AS ( - SELECT UNNEST(tenants) AS tenant - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/tenants.json.gz', - format = 'auto', - maximum_object_size = 134217728 - ) -), -unnested AS ( +WITH +-- New format: one tenant per JSONL line — single UNNEST for resources +jsonl_unnested AS ( SELECT - tenant ->> 'tenant_id' AS tenant_id, - UPPER(tenant -> 'address' ->> 'country_code') AS country_code, - UNNEST(from_json(tenant -> 'resources', '["JSON"]')) AS resource_json - FROM raw + tenant_id, + UPPER(address ->> 'country_code') AS country_code, + UNNEST(from_json(resources, '["JSON"]')) AS resource_json + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz', + format = 'newline_delimited', + columns = {tenant_id: 'VARCHAR', address: 'JSON', resources: 'JSON'} + ) + WHERE tenant_id IS NOT NULL + AND resources IS NOT NULL +), +-- Old format: blob — double UNNEST (tenants → resources) +blob_unnested AS ( + SELECT + tenant ->> 'tenant_id' AS tenant_id, + UPPER(tenant -> 'address' ->> 'country_code') AS country_code, + UNNEST(from_json(tenant -> 'resources', '["JSON"]')) AS resource_json + FROM ( + SELECT UNNEST(tenants) AS tenant + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/tenants.json.gz', + format = 'auto', + maximum_object_size = 134217728 + ) + ) WHERE (tenant ->> 'tenant_id') IS NOT NULL AND (tenant -> 'resources') IS NOT NULL +), +unnested AS ( + SELECT * FROM jsonl_unnested + UNION ALL + SELECT * FROM blob_unnested ) SELECT tenant_id, - resource_json ->> 'resource_id' AS resource_id, + resource_json ->> 'resource_id' AS resource_id, country_code, - NULLIF(TRIM(resource_json ->> 'name'), '') AS resource_name, - resource_json ->> 'sport_id' AS sport_id, + NULLIF(TRIM(resource_json ->> 'name'), '') AS resource_name, + resource_json ->> 'sport_id' AS sport_id, CASE WHEN LOWER(resource_json ->> 'is_active') IN ('true', '1') - THEN TRUE ELSE FALSE END AS is_active, + THEN TRUE ELSE FALSE END AS is_active, LOWER(resource_json -> 'properties' ->> 'resource_type') AS resource_type, LOWER(resource_json -> 'properties' ->> 'resource_size') AS resource_size, LOWER(resource_json -> 'properties' ->> 'resource_feature') AS resource_feature, CASE WHEN LOWER(resource_json -> 'booking_settings' ->> 'is_bookable_online') IN ('true', '1') - THEN TRUE ELSE FALSE END AS is_bookable_online + THEN TRUE ELSE FALSE END AS is_bookable_online FROM unnested WHERE (resource_json ->> 'resource_id') IS NOT NULL AND (resource_json ->> 'sport_id') = 'PADEL' diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql index de579b5..6240462 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql @@ -1,10 +1,13 @@ -- Playtomic padel venue records — full metadata extraction. --- Reads landing zone JSON, unnests tenant array, extracts all venue metadata +-- Reads landing zone tenants files, extracts all venue metadata -- including address, opening hours, court resources, VAT rate, and facilities. -- Deduplicates on tenant_id (keeps most recent extraction). -- --- Source: data/landing/playtomic/{year}/{month}/tenants.json.gz --- Format: {"tenants": [{tenant_id, tenant_name, address, resources, opening_hours, ...}]} +-- Supports two landing formats (UNION ALL during migration): +-- New: tenants.jsonl.gz — one tenant JSON object per line (no UNNEST needed) +-- Old: tenants.json.gz — {"tenants": [{...}]} blob (UNNEST required) +-- +-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz MODEL ( name staging.stg_playtomic_venues, @@ -13,9 +16,52 @@ MODEL ( grain tenant_id ); -WITH parsed AS ( +WITH +-- New format: one tenant per JSONL line — no UNNEST, access columns directly +jsonl_parsed AS ( + SELECT + tenant_id, + tenant_name, + slug, + tenant_type, + tenant_status, + playtomic_status, + booking_type, + address ->> 'street' AS street, + address ->> 'city' AS city, + address ->> 'postal_code' AS postal_code, + UPPER(address ->> 'country_code') AS country_code, + address ->> 'timezone' AS timezone, + address ->> 'administrative_area' AS administrative_area, + TRY_CAST(address -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat, + TRY_CAST(address -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon, + TRY_CAST(vat_rate AS DOUBLE) AS vat_rate, + default_currency, + TRY_CAST(booking_settings ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes, + opening_hours AS opening_hours_json, + resources AS resources_json, + created_at, + CAST(is_playtomic_partner AS VARCHAR) AS is_playtomic_partner_raw, + filename AS source_file, + CURRENT_DATE AS extracted_date + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz', + format = 'newline_delimited', + filename = true, + columns = { + tenant_id: 'VARCHAR', tenant_name: 'VARCHAR', slug: 'VARCHAR', + tenant_type: 'VARCHAR', tenant_status: 'VARCHAR', playtomic_status: 'VARCHAR', + booking_type: 'VARCHAR', address: 'JSON', vat_rate: 'DOUBLE', + default_currency: 'VARCHAR', booking_settings: 'JSON', + opening_hours: 'JSON', resources: 'JSON', + created_at: 'VARCHAR', is_playtomic_partner: 'VARCHAR' + } + ) + WHERE tenant_id IS NOT NULL +), +-- Old format: {"tenants": [...]} blob — keep for transition until old files rotate out +blob_parsed AS ( SELECT - -- Identity tenant ->> 'tenant_id' AS tenant_id, tenant ->> 'tenant_name' AS tenant_name, tenant ->> 'slug' AS slug, @@ -23,8 +69,6 @@ WITH parsed AS ( tenant ->> 'tenant_status' AS tenant_status, tenant ->> 'playtomic_status' AS playtomic_status, tenant ->> 'booking_type' AS booking_type, - - -- Address tenant -> 'address' ->> 'street' AS street, tenant -> 'address' ->> 'city' AS city, tenant -> 'address' ->> 'postal_code' AS postal_code, @@ -33,22 +77,13 @@ WITH parsed AS ( tenant -> 'address' ->> 'administrative_area' AS administrative_area, TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat, TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon, - - -- Commercial - TRY_CAST(tenant ->> 'vat_rate' AS DOUBLE) AS vat_rate, + TRY_CAST(tenant ->> 'vat_rate' AS DOUBLE) AS vat_rate, tenant ->> 'default_currency' AS default_currency, - - -- Booking settings (venue-level) TRY_CAST(tenant -> 'booking_settings' ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes, - - -- Opening hours and resources stored as JSON for downstream models tenant -> 'opening_hours' AS opening_hours_json, tenant -> 'resources' AS resources_json, - - -- Metadata tenant ->> 'created_at' AS created_at, tenant ->> 'is_playtomic_partner' AS is_playtomic_partner_raw, - filename AS source_file, CURRENT_DATE AS extracted_date FROM ( @@ -62,6 +97,11 @@ WITH parsed AS ( ) WHERE (tenant ->> 'tenant_id') IS NOT NULL ), +parsed AS ( + SELECT * FROM jsonl_parsed + UNION ALL + SELECT * FROM blob_parsed +), deduped AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY tenant_id ORDER BY source_file DESC) AS rn