diff --git a/CHANGELOG.md b/CHANGELOG.md index 832ea1f..61ff8cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,27 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - **Admin email gallery** (`/admin/emails/gallery`) — card grid of all email types; preview page with EN/DE language toggle renders each template in a sandboxed iframe (`srcdoc`); "View in sent log →" cross-link; gallery link added to admin sidebar - **Compose live preview** — two-column compose layout: form on the left, HTMX-powered preview iframe on the right; `hx-trigger="input delay:500ms"` on the textarea; `POST /admin/emails/compose/preview` endpoint supports plain body or branded wrapper via `wrap` checkbox - 50 new tests covering all template renders (EN + DE), registry structure, gallery routes (access control, list, preview, lang fallback), and compose preview endpoint +- **JSONL streaming landing format** — extractors now write one JSON object per line (`.jsonl.gz`) instead of a single large blob, eliminating in-memory accumulation and `maximum_object_size` workarounds: + - `playtomic_tenants.py` → `tenants.jsonl.gz` (one tenant per line; dedup still happens in memory before write) + - `playtomic_availability.py` → `availability_{date}.jsonl.gz` (one venue per line with `date`/`captured_at_utc` injected; working file IS the final file — eliminates the consolidation step) + - `geonames.py` → `cities_global.jsonl.gz` (one city per line; eliminates 30 MB blob and its `maximum_object_size` workaround) + - `compress_jsonl_atomic(jsonl_path, dest_path)` utility added to `utils.py` — streams compression in 1 MB chunks, atomic `.tmp` rename, deletes source +- **Regional Overpass splitting for tennis courts** — replaces single global query (150K+ elements, timed out) with 10 regional bbox queries (~10-40K elements each, 150s server / 180s client): + - Regions: europe\_west, europe\_central, europe\_east, north\_america, south\_america, asia\_east, asia\_west, oceania, africa, asia\_north + - Per-region retry (2 attempts, 30s cooldown) + 5s inter-region polite delay + - Crash recovery via `working.jsonl` accumulation — already-written element IDs skipped on restart; completed regions produce 0 new elements on re-query + - Output: `courts.jsonl.gz` (one OSM element per line) +- **`scripts/init_landing_seeds.py`** — creates minimal `.jsonl.gz` and `.json.gz` seed files in `1970/01/` so SQLMesh staging models can run before real extraction data arrives; idempotent + +### Changed +- All modified staging SQL models use **UNION ALL transition CTEs** — both JSONL (new) and blob (old) formats are readable simultaneously; old `.json.gz` files in the landing zone continue working until they rotate out naturally: + - `stg_playtomic_venues`, `stg_playtomic_resources`, `stg_playtomic_opening_hours` — JSONL top-level columns (no `UNNEST(tenants)`) + - `stg_playtomic_availability` — JSONL morning files + blob morning files + blob recheck files + - `stg_population_geonames` — JSONL city rows (no `UNNEST(rows)`, no `maximum_object_size`) + - `stg_tennis_courts` — JSONL elements with `COALESCE(lat, center.lat)` for way/relation centre coords; blob UNNEST kept for old files ### Removed - `_email_wrap()` and `_email_button()` helper functions removed from `worker.py` — replaced by templates - - **Marketplace admin dashboard** (`/admin/marketplace`) — single-screen health view for the two-sided market: - **Lead funnel** — total / verified-new (ready to unlock) / unlocked / won / conversion rate - 
**Credit economy** — total credits issued, consumed (lead unlocks), outstanding balance across all paid suppliers, 30-day burn rate diff --git a/PROJECT.md b/PROJECT.md index 9feaa0c..0a96835 100644 --- a/PROJECT.md +++ b/PROJECT.md @@ -93,6 +93,9 @@ - [x] `dim_venues` (OSM + Playtomic deduped), `dim_cities` (Eurostat population) - [x] `city_market_profile` (market score OBT), `planner_defaults` (per-city calculator pre-fill) - [x] DuckDB analytics reader in app lifecycle +- [x] **JSONL streaming landing format** — extractors write `.jsonl.gz` (one record per line); constant-memory compression via `compress_jsonl_atomic()`; eliminates `maximum_object_size` workarounds; all modified staging models use UNION ALL transition to support both formats +- [x] **Regional Overpass tennis splitting** — 10 regional bbox queries replace the single global 150K-element query that timed out; crash recovery via `working.jsonl` accumulation +- [x] **`init_landing_seeds.py`** — creates minimal seed files for both JSONL and blob formats so SQLMesh can run before real data arrives ### i18n - [x] Full i18n across entire app (EN + DE) diff --git a/extract/padelnomics_extract/src/padelnomics_extract/geonames.py b/extract/padelnomics_extract/src/padelnomics_extract/geonames.py index b6d6a8d..0e83498 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/geonames.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/geonames.py @@ -10,14 +10,14 @@ highest padel investment opportunity (white space markets). Requires: GEONAMES_USERNAME env var (free registration at geonames.org) -Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.json.gz -Output: {"rows": [{"geoname_id": 2950159, "city_name": "Berlin", - "country_code": "DE", "population": 3644826, - "lat": 52.524, "lon": 13.411, - "admin1_code": "16", "admin2_code": "00", - "ref_year": 2024}], "count": N} +Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.jsonl.gz +Output: one JSON object per line, e.g.: + {"geoname_id": 2950159, "city_name": "Berlin", "country_code": "DE", + "population": 3644826, "lat": 52.524, "lon": 13.411, + "admin1_code": "16", "admin2_code": "00", "ref_year": 2024} """ +import gzip import io import json import os @@ -28,7 +28,7 @@ from pathlib import Path import niquests from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging -from .utils import get_last_cursor, landing_path, write_gzip_atomic +from .utils import compress_jsonl_atomic, get_last_cursor, landing_path logger = setup_logging("padelnomics.extract.geonames") @@ -131,9 +131,12 @@ def extract( logger.warning("GEONAMES_USERNAME not set — writing empty placeholder so SQLMesh models can run") year, month = year_month.split("/") dest_dir = landing_path(landing_dir, "geonames", year, month) - dest = dest_dir / "cities_global.json.gz" + dest = dest_dir / "cities_global.jsonl.gz" if not dest.exists(): - write_gzip_atomic(dest, b'{"rows": [], "count": 0}') + tmp = dest.with_suffix(".gz.tmp") + with gzip.open(tmp, "wt") as f: + f.write('{"geoname_id":null}\n') # filtered by WHERE geoname_id IS NOT NULL + tmp.rename(dest) return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) @@ -164,9 +167,12 @@ def extract( logger.info("parsed %d global locations (pop ≥1K)", len(rows)) dest_dir = landing_path(landing_dir, "geonames", year, month) - dest = dest_dir / "cities_global.json.gz" - payload = json.dumps({"rows": rows, "count": len(rows)}).encode() - bytes_written = 
write_gzip_atomic(dest, payload) + dest = dest_dir / "cities_global.jsonl.gz" + working_path = dest.with_suffix(".working.jsonl") + with open(working_path, "w") as f: + for row in rows: + f.write(json.dumps(row, separators=(",", ":")) + "\n") + bytes_written = compress_jsonl_atomic(working_path, dest) logger.info("written %s bytes compressed", f"{bytes_written:,}") return { diff --git a/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py b/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py index d0a6748..e7f1c0f 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py @@ -1,40 +1,77 @@ """Overpass API extractor — global tennis court locations from OpenStreetMap. -Queries the Overpass API for all nodes/ways/relations tagged sport=tennis. -Tennis court density near a location is a proxy for racket-sport culture — -areas with many tennis clubs are prime candidates for padel adoption. +Queries the Overpass API for all nodes/ways/relations tagged sport=tennis, +split across 10 geographic regions to avoid timeout on the ~150K+ global result. -The query returns ~150K+ results globally (vs ~5K for padel), so a higher -Overpass timeout is used. +Regional strategy: + - Each region is a bounding box covering a continent or sub-continent + - Each region is queried independently (POST with [bbox:...]) + - Overlapping bboxes are deduped on OSM element id + - One region per POST (~10-40K elements each, well within Overpass limits) + - Crash recovery: working JSONL accumulates completed regions; on restart + already-written IDs are skipped, completed regions produce 0 new elements -Landing: {LANDING_DIR}/overpass_tennis/{year}/{month}/courts.json.gz +Landing: {LANDING_DIR}/overpass_tennis/{year}/{month}/courts.jsonl.gz """ +import json import sqlite3 +import time from pathlib import Path import niquests -from ._shared import OVERPASS_TIMEOUT_SECONDS, run_extractor, setup_logging -from .utils import landing_path, write_gzip_atomic +from ._shared import run_extractor, setup_logging +from .utils import compress_jsonl_atomic, landing_path, load_partial_results logger = setup_logging("padelnomics.extract.overpass_tennis") EXTRACTOR_NAME = "overpass_tennis" OVERPASS_URL = "https://overpass-api.de/api/interpreter" -# Tennis returns ~150K+ elements globally vs ~5K for padel — use 3× timeout. 
-TENNIS_OVERPASS_TIMEOUT_SECONDS = OVERPASS_TIMEOUT_SECONDS * 3 +# Each region is [south, west, north, east] — Overpass bbox format +REGIONS = [ + {"name": "europe_west", "bbox": "35.0,-11.0,61.0,8.0"}, # FR ES GB PT IE BE NL + {"name": "europe_central", "bbox": "42.0,8.0,55.5,24.0"}, # DE IT AT CH CZ PL HU + {"name": "europe_east", "bbox": "35.0,24.0,72.0,60.0"}, # Nordics Baltics GR TR RO + {"name": "north_america", "bbox": "15.0,-170.0,72.0,-50.0"}, # US CA MX + {"name": "south_america", "bbox": "-56.0,-82.0,15.0,-34.0"}, # BR AR CL + {"name": "asia_east", "bbox": "18.0,73.0,54.0,150.0"}, # JP KR CN + {"name": "asia_west", "bbox": "-11.0,24.0,42.0,73.0"}, # Middle East India + {"name": "oceania", "bbox": "-50.0,110.0,5.0,180.0"}, # AU NZ + {"name": "africa", "bbox": "-35.0,-18.0,37.0,52.0"}, # ZA EG MA + {"name": "asia_north", "bbox": "42.0,60.0,82.0,180.0"}, # RU-east KZ +] -OVERPASS_QUERY = ( - "[out:json][timeout:300];\n" - "(\n" - ' node["sport"="tennis"];\n' - ' way["sport"="tennis"];\n' - ' relation["sport"="tennis"];\n' - ");\n" - "out center;" -) +MAX_RETRIES_PER_REGION = 2 +RETRY_DELAY_SECONDS = 30 # Overpass cooldown between retries +REGION_TIMEOUT_SECONDS = 180 # Client-side per-region timeout (server uses 150s) +INTER_REGION_DELAY_SECONDS = 5 # Polite delay between regions + + +def _region_query(bbox: str) -> str: + """Build an Overpass QL query for tennis courts within a bounding box.""" + return ( + f"[out:json][timeout:150][bbox:{bbox}];\n" + "(\n" + " node[\"sport\"=\"tennis\"];\n" + " way[\"sport\"=\"tennis\"];\n" + " rel[\"sport\"=\"tennis\"];\n" + ");\n" + "out center;" + ) + + +def _query_region(session: niquests.Session, region: dict) -> list[dict]: + """POST one regional Overpass query. Returns list of OSM elements.""" + query = _region_query(region["bbox"]) + resp = session.post( + OVERPASS_URL, + data={"data": query}, + timeout=REGION_TIMEOUT_SECONDS, + ) + resp.raise_for_status() + return resp.json().get("elements", []) def extract( @@ -43,24 +80,84 @@ def extract( conn: sqlite3.Connection, session: niquests.Session, ) -> dict: - """POST OverpassQL query for tennis courts and write raw OSM JSON. Returns run metrics.""" + """Query Overpass for global tennis courts using regional bbox splitting. + + Splits the global query into REGIONS to avoid Overpass timeout. + Writes one OSM element per line to courts.jsonl.gz. + Crash-safe: working.jsonl accumulates results; on restart already-written + element IDs are skipped so completed regions produce 0 new elements. 
+ """ + assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}" + assert "/" in year_month and len(year_month) == 7, f"year_month must be YYYY/MM: {year_month!r}" + year, month = year_month.split("/") dest_dir = landing_path(landing_dir, "overpass_tennis", year, month) - dest = dest_dir / "courts.json.gz" + dest = dest_dir / "courts.jsonl.gz" + old_blob = dest_dir / "courts.json.gz" - logger.info("POST %s (sport=tennis, ~150K+ results expected)", OVERPASS_URL) - resp = session.post( - OVERPASS_URL, - data={"data": OVERPASS_QUERY}, - timeout=TENNIS_OVERPASS_TIMEOUT_SECONDS, + if dest.exists() or old_blob.exists(): + logger.info("Already have courts for %s — skipping", year_month) + return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} + + # Crash recovery: load already-written elements from the working file + working_path = dest_dir / "courts.working.jsonl" + prior_records, already_seen_ids = load_partial_results(working_path, id_key="id") + if already_seen_ids: + logger.info("Resuming: %d elements already in working file", len(already_seen_ids)) + + total_new = 0 + regions_succeeded: list[str] = [] + regions_failed: list[str] = [] + + working_file = open(working_path, "a") # noqa: SIM115 + try: + for i, region in enumerate(REGIONS): + for attempt in range(MAX_RETRIES_PER_REGION + 1): + try: + elements = _query_region(session, region) + new_elements = [e for e in elements if str(e.get("id", "")) not in already_seen_ids] + for elem in new_elements: + working_file.write(json.dumps(elem, separators=(",", ":")) + "\n") + already_seen_ids.add(str(elem["id"])) + working_file.flush() + total_new += len(new_elements) + regions_succeeded.append(region["name"]) + logger.info( + "Region %s: %d elements (%d new, %d total)", + region["name"], len(elements), len(new_elements), len(already_seen_ids), + ) + break + except niquests.exceptions.RequestException as exc: + if attempt < MAX_RETRIES_PER_REGION: + logger.warning( + "Region %s attempt %d failed: %s — retrying in %ds", + region["name"], attempt + 1, exc, RETRY_DELAY_SECONDS, + ) + time.sleep(RETRY_DELAY_SECONDS) + else: + regions_failed.append(region["name"]) + logger.error( + "Region %s failed after %d attempts: %s", + region["name"], MAX_RETRIES_PER_REGION + 1, exc, + ) + + if i < len(REGIONS) - 1: + time.sleep(INTER_REGION_DELAY_SECONDS) + finally: + working_file.close() + + total_elements = len(prior_records) + total_new + if total_elements == 0: + raise RuntimeError(f"All regions failed, no elements written: {regions_failed}") + + if regions_failed: + logger.warning("Completed with %d failed regions: %s", len(regions_failed), regions_failed) + + bytes_written = compress_jsonl_atomic(working_path, dest) + logger.info( + "%d total elements (%d regions, %d failed) -> %s (%s bytes)", + total_elements, len(regions_succeeded), len(regions_failed), dest, f"{bytes_written:,}", ) - resp.raise_for_status() - - size_bytes = len(resp.content) - logger.info("%s bytes received", f"{size_bytes:,}") - - bytes_written = write_gzip_atomic(dest, resp.content) - logger.info("wrote %s (%s bytes compressed)", dest, f"{bytes_written:,}") return { "files_written": 1, diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py index e665ccd..1fab6e2 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py @@ -16,7 
+16,7 @@ records (a few seconds of work with 10 workers) are lost on crash. Recheck mode: re-queries venues with slots starting within the next 90 minutes. Writes a separate recheck file for more accurate occupancy measurement. -Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.json.gz +Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.jsonl.gz Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.json.gz """ @@ -34,7 +34,13 @@ import niquests from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler -from .utils import flush_partial_batch, landing_path, load_partial_results, write_gzip_atomic +from .utils import ( + compress_jsonl_atomic, + flush_partial_batch, + landing_path, + load_partial_results, + write_gzip_atomic, +) logger = setup_logging("padelnomics.extract.playtomic_availability") @@ -65,28 +71,44 @@ _thread_local = threading.local() # --------------------------------------------------------------------------- def _load_tenant_ids(landing_dir: Path) -> list[str]: - """Read tenant IDs from the most recent tenants.json.gz file.""" + """Read tenant IDs from the most recent tenants file (JSONL or blob format).""" assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}" playtomic_dir = landing_dir / "playtomic" if not playtomic_dir.exists(): return [] - tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True) + # Prefer JSONL (new format), fall back to blob (old format) + tenant_files = sorted(playtomic_dir.glob("*/*/tenants.jsonl.gz"), reverse=True) + if not tenant_files: + tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True) if not tenant_files: return [] latest = tenant_files[0] logger.info("Loading tenant IDs from %s", latest) - - with gzip.open(latest, "rb") as f: - data = json.loads(f.read()) - - tenants = data.get("tenants", []) ids = [] - for t in tenants: - tid = t.get("tenant_id") or t.get("id") - if tid: - ids.append(tid) + + with gzip.open(latest, "rt") as f: + if latest.name.endswith(".jsonl.gz"): + # JSONL: one tenant object per line + for line in f: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + tid = record.get("tenant_id") or record.get("id") + if tid: + ids.append(tid) + except json.JSONDecodeError: + break # truncated last line + else: + # Blob: {"tenants": [...]} + data = json.loads(f.read()) + for t in data.get("tenants", []): + tid = t.get("tenant_id") or t.get("id") + if tid: + ids.append(tid) logger.info("Loaded %d tenant IDs", len(ids)) return ids @@ -257,14 +279,14 @@ def extract( year, month = year_month.split("/") dest_dir = landing_path(landing_dir, "playtomic", year, month) - dest = dest_dir / f"availability_{target_date}.json.gz" - - if dest.exists(): - logger.info("Already have %s — skipping", dest) + dest = dest_dir / f"availability_{target_date}.jsonl.gz" + old_blob = dest_dir / f"availability_{target_date}.json.gz" + if dest.exists() or old_blob.exists(): + logger.info("Already have availability for %s — skipping", target_date) return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} - # Crash resumption: load already-fetched venues from partial file - partial_path = dest.with_suffix(".partial.jsonl") + # Crash resumption: load already-fetched venues from working file + partial_path = dest_dir / f"availability_{target_date}.working.jsonl" prior_results, already_done = 
load_partial_results(partial_path, id_key="tenant_id") if already_done: logger.info("Resuming: %d venues already fetched from partial file", len(already_done)) @@ -281,7 +303,10 @@ def extract( start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S") start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S") - # Partial file for incremental crash-safe progress + # Timestamp stamped into every JSONL line — computed once before the fetch loop. + captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") + + # Working file for incremental crash-safe progress (IS the final file). partial_file = open(partial_path, "a") # noqa: SIM115 partial_lock = threading.Lock() pending_batch: list[dict] = [] @@ -289,6 +314,9 @@ def extract( def _on_result(result: dict) -> None: # Called inside _fetch_venues_parallel's lock — no additional locking needed. # In serial mode, called single-threaded — also safe without extra locking. + # Inject date + captured_at so every JSONL line is self-contained. + result["date"] = target_date + result["captured_at_utc"] = captured_at pending_batch.append(result) if len(pending_batch) >= PARTIAL_FLUSH_SIZE: flush_partial_batch(partial_file, partial_lock, pending_batch) @@ -332,24 +360,13 @@ def extract( pending_batch.clear() partial_file.close() - # Consolidate prior (resumed) + new results into final file - venues_data = prior_results + new_venues_data - captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") - payload = json.dumps({ - "date": target_date, - "captured_at_utc": captured_at, - "venue_count": len(venues_data), - "venues_errored": venues_errored, - "venues": venues_data, - }).encode() - - bytes_written = write_gzip_atomic(dest, payload) - if partial_path.exists(): - partial_path.unlink() + # Working file IS the output — compress atomically (deletes source). + total_venues = len(prior_results) + len(new_venues_data) + bytes_written = compress_jsonl_atomic(partial_path, dest) logger.info( "%d venues scraped (%d errors) -> %s (%s bytes)", - len(venues_data), venues_errored, dest, f"{bytes_written:,}", + total_venues, venues_errored, dest, f"{bytes_written:,}", ) return { @@ -364,14 +381,36 @@ def extract( # Recheck mode — re-query venues with upcoming slots for accurate occupancy # --------------------------------------------------------------------------- +def _read_availability_jsonl(path: Path) -> dict: + """Read a JSONL availability file into the blob dict format recheck expects.""" + venues = [] + date_val = captured_at = None + with gzip.open(path, "rt") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + break # truncated last line on crash + if date_val is None: + date_val = record.get("date") + captured_at = record.get("captured_at_utc") + venues.append(record) + return {"date": date_val, "captured_at_utc": captured_at, "venues": venues} + + def _load_morning_availability(landing_dir: Path, target_date: str) -> dict | None: - """Load today's morning availability file. Returns parsed JSON or None.""" + """Load today's morning availability file (JSONL or blob). 
Returns dict or None.""" playtomic_dir = landing_dir / "playtomic" - # Search across year/month dirs for the target date + # Try JSONL first (new format), fall back to blob (old format) + matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.jsonl.gz")) + if matches: + return _read_availability_jsonl(matches[0]) matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.json.gz")) if not matches: return None - with gzip.open(matches[0], "rb") as f: return json.loads(f.read()) diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py index 8feb5c4..ea95eca 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py @@ -1,8 +1,8 @@ """Playtomic tenants extractor — venue listings via unauthenticated API. Paginates through the global tenant list (sorted by UUID) using the `page` -parameter. Deduplicates on tenant_id and writes a single consolidated JSON -to the landing zone. +parameter. Deduplicates on tenant_id and writes a gzipped JSONL file to the +landing zone (one tenant object per line). API notes (discovered 2026-02): - bbox params (min_latitude etc.) are silently ignored by the API @@ -18,7 +18,7 @@ pages. Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2). -Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz +Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz """ import json @@ -31,7 +31,7 @@ import niquests from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging from .proxy import load_proxy_urls, make_round_robin_cycler -from .utils import landing_path, write_gzip_atomic +from .utils import compress_jsonl_atomic, landing_path logger = setup_logging("padelnomics.extract.playtomic_tenants") @@ -76,7 +76,7 @@ def extract( """Fetch all Playtomic venues via global pagination. Returns run metrics.""" year, month = year_month.split("/") dest_dir = landing_path(landing_dir, "playtomic", year, month) - dest = dest_dir / "tenants.json.gz" + dest = dest_dir / "tenants.jsonl.gz" proxy_urls = load_proxy_urls() next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None @@ -138,8 +138,12 @@ def extract( if not next_proxy: time.sleep(THROTTLE_SECONDS) - payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode() - bytes_written = write_gzip_atomic(dest, payload) + # Write each tenant as a JSONL line, then compress atomically + working_path = dest.with_suffix(".working.jsonl") + with open(working_path, "w") as f: + for tenant in all_tenants: + f.write(json.dumps(tenant, separators=(",", ":")) + "\n") + bytes_written = compress_jsonl_atomic(working_path, dest) logger.info("%d unique venues -> %s", len(all_tenants), dest) return { diff --git a/extract/padelnomics_extract/src/padelnomics_extract/utils.py b/extract/padelnomics_extract/src/padelnomics_extract/utils.py index 15777f0..451c365 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/utils.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/utils.py @@ -174,3 +174,23 @@ def write_gzip_atomic(path: Path, data: bytes) -> int: tmp.write_bytes(compressed) tmp.rename(path) return len(compressed) + + +def compress_jsonl_atomic(jsonl_path: Path, dest_path: Path) -> int: + """Compress a JSONL working file to .jsonl.gz atomically, then delete the source. 
+ + Streams compression in 1MB chunks (constant memory regardless of file size). + Atomic via .tmp rename — readers never see a partial .jsonl.gz. + Deletes the uncompressed working file after successful compression. + Returns compressed bytes written. + """ + assert jsonl_path.exists(), f"source must exist: {jsonl_path}" + assert jsonl_path.stat().st_size > 0, f"source must not be empty: {jsonl_path}" + tmp = dest_path.with_suffix(dest_path.suffix + ".tmp") + with open(jsonl_path, "rb") as f_in, gzip.open(tmp, "wb") as f_out: + while chunk := f_in.read(1_048_576): # 1 MB chunks + f_out.write(chunk) + bytes_written = tmp.stat().st_size + tmp.rename(dest_path) + jsonl_path.unlink() + return bytes_written diff --git a/scripts/init_landing_seeds.py b/scripts/init_landing_seeds.py new file mode 100644 index 0000000..cc61bd5 --- /dev/null +++ b/scripts/init_landing_seeds.py @@ -0,0 +1,101 @@ +"""Create minimal landing zone seed files so SQLMesh models can run before real data arrives. + +Each seed contains one null/empty record that is filtered out by the staging model's +WHERE clause. Seeds live in the 1970/01 epoch so they're never confused with real data. + +Usage: + uv run python scripts/init_landing_seeds.py [--landing-dir data/landing] + +Idempotent: skips existing files. +""" + +import argparse +import gzip +import json +from pathlib import Path + + +def create_seed(dest: Path, content: bytes) -> None: + """Write content to a gzip file atomically. Skips if the file already exists.""" + if dest.exists(): + return + dest.parent.mkdir(parents=True, exist_ok=True) + tmp = dest.with_suffix(dest.suffix + ".tmp") + with gzip.open(tmp, "wb") as f: + f.write(content) + tmp.rename(dest) + print(f" created: {dest}") + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--landing-dir", default="data/landing", type=Path) + args = parser.parse_args() + base: Path = args.landing_dir + + seeds = { + # --- Playtomic tenants --- + # JSONL: one null tenant (filtered by WHERE tenant_id IS NOT NULL) + "playtomic/1970/01/tenants.jsonl.gz": + b'{"tenant_id":null}\n', + # Blob: empty tenants array + "playtomic/1970/01/tenants.json.gz": + json.dumps({"tenants": [], "count": 0}).encode(), + + # --- Playtomic availability (morning) --- + # JSONL: one null venue (filtered by WHERE tenant_id IS NOT NULL) + "playtomic/1970/01/availability_1970-01-01.jsonl.gz": + b'{"tenant_id":null,"date":"1970-01-01","captured_at_utc":"1970-01-01T00:00:00Z","slots":null}\n', + # Blob: empty venues array + "playtomic/1970/01/availability_1970-01-01.json.gz": + json.dumps({"date": "1970-01-01", "captured_at_utc": "1970-01-01T00:00:00Z", + "venue_count": 0, "venues": []}).encode(), + + # --- Playtomic recheck (blob only, small format) --- + "playtomic/1970/01/availability_1970-01-01_recheck_00.json.gz": + json.dumps({"date": "1970-01-01", "captured_at_utc": "1970-01-01T00:00:00Z", + "recheck_hour": 0, "venues": []}).encode(), + + # --- GeoNames --- + # JSONL: one null city (filtered by WHERE geoname_id IS NOT NULL) + "geonames/1970/01/cities_global.jsonl.gz": + b'{"geoname_id":null}\n', + # Blob: empty rows array + "geonames/1970/01/cities_global.json.gz": + json.dumps({"rows": [], "count": 0}).encode(), + + # --- Overpass tennis --- + # JSONL: one null element (filtered by WHERE type IS NOT NULL) + "overpass_tennis/1970/01/courts.jsonl.gz": + b'{"type":null,"id":null}\n', + # Blob: empty elements array + "overpass_tennis/1970/01/courts.json.gz": + json.dumps({"version": 0.6, 
"elements": []}).encode(), + + # --- Overpass padel (unchanged format) --- + "overpass/1970/01/courts.json.gz": + json.dumps({"version": 0.6, "elements": []}).encode(), + + # --- Eurostat --- + "eurostat/1970/01/urb_cpop1.json.gz": + json.dumps({"rows": [], "count": 0}).encode(), + "eurostat/1970/01/ilc_di03.json.gz": + json.dumps({"rows": [], "count": 0}).encode(), + "eurostat_city_labels/1970/01/cities_codelist.json.gz": + json.dumps({"rows": [], "count": 0}).encode(), + + # --- National statistics --- + "ons_uk/1970/01/lad_population.json.gz": + json.dumps({"rows": [], "count": 0}).encode(), + "census_usa/1970/01/acs5_places.json.gz": + json.dumps({"rows": [], "count": 0}).encode(), + } + + print(f"Initialising landing seeds in: {base}") + for rel_path, content in seeds.items(): + create_seed(base / rel_path, content) + print("Done.") + + +if __name__ == "__main__": + main() diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql index bf0b3f2..9092796 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql @@ -3,12 +3,17 @@ -- "Available" = the slot was NOT booked at capture time. Missing slots = booked. -- -- Reads BOTH morning snapshots and recheck files: --- Morning: availability_{date}.json.gz → snapshot_type = 'morning' --- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck' +-- Morning (new): availability_{date}.jsonl.gz → snapshot_type = 'morning' +-- Morning (old): availability_{date}.json.gz → snapshot_type = 'morning' +-- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck' -- -- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit). -- Price parsed from strings like "14.56 EUR" or "48 GBP". -- +-- Supports two morning landing formats (UNION ALL during migration): +-- New: availability_{date}.jsonl.gz — one venue per line, columns: tenant_id, slots, date, captured_at_utc +-- Old: availability_{date}.json.gz — {"date":..., "venues": [...]} blob (UNNEST required) +-- -- Requires: at least one availability file in the landing zone. -- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz) -- with empty venues[] ensures this model runs before real data arrives. 
@@ -20,77 +25,105 @@ MODEL ( grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc) ); --- Morning snapshots (filename does NOT contain '_recheck_') -WITH morning_files AS ( +WITH +-- New format: one venue per JSONL line — no outer UNNEST needed +morning_jsonl AS ( SELECT - *, - 'morning' AS snapshot_type, - NULL::INTEGER AS recheck_hour + date AS snapshot_date, + captured_at_utc, + 'morning' AS snapshot_type, + NULL::INTEGER AS recheck_hour, + tenant_id, + slots AS slots_json FROM read_json( - @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz', - format = 'auto', + @LANDING_DIR || '/playtomic/*/*/availability_*.jsonl.gz', + format = 'newline_delimited', columns = { date: 'VARCHAR', captured_at_utc: 'VARCHAR', - venues: 'JSON[]' + tenant_id: 'VARCHAR', + slots: 'JSON' }, - filename = true, - maximum_object_size = 134217728 -- 128 MB; daily files grow with venue count + filename = true ) WHERE filename NOT LIKE '%_recheck_%' - AND venues IS NOT NULL - AND json_array_length(venues) > 0 + AND tenant_id IS NOT NULL ), --- Recheck snapshots (filename contains '_recheck_') --- Use TRY_CAST on a regex-extracted hour to get the recheck_hour. --- If no recheck files exist yet, this CTE produces zero rows (safe). -recheck_files AS ( +-- Old format: {"date":..., "venues": [...]} blob — kept for transition +morning_blob AS ( SELECT - *, - 'recheck' AS snapshot_type, - TRY_CAST( - regexp_extract(filename, '_recheck_(\d+)', 1) AS INTEGER - ) AS recheck_hour - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz', - format = 'auto', - columns = { - date: 'VARCHAR', - captured_at_utc: 'VARCHAR', - venues: 'JSON[]' - }, - filename = true, - maximum_object_size = 134217728 -- 128 MB; matches morning snapshot limit - ) - WHERE venues IS NOT NULL - AND json_array_length(venues) > 0 -), -all_files AS ( - SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM morning_files - UNION ALL - SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM recheck_files -), -raw_venues AS ( - SELECT - af.date AS snapshot_date, + af.date AS snapshot_date, af.captured_at_utc, - af.snapshot_type, - af.recheck_hour, - venue_json - FROM all_files af, + 'morning' AS snapshot_type, + NULL::INTEGER AS recheck_hour, + venue_json ->> 'tenant_id' AS tenant_id, + venue_json -> 'slots' AS slots_json + FROM ( + SELECT date, captured_at_utc, venues + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz', + format = 'auto', + columns = { + date: 'VARCHAR', + captured_at_utc: 'VARCHAR', + venues: 'JSON[]' + }, + filename = true, + maximum_object_size = 134217728 -- 128 MB; daily files grow with venue count + ) + WHERE filename NOT LIKE '%_recheck_%' + AND venues IS NOT NULL + AND json_array_length(venues) > 0 + ) af, LATERAL UNNEST(af.venues) AS t(venue_json) ), +-- Recheck snapshots (blob format only — small files, no JSONL conversion needed) +recheck_blob AS ( + SELECT + rf.date AS snapshot_date, + rf.captured_at_utc, + 'recheck' AS snapshot_type, + TRY_CAST( + regexp_extract(rf.filename, '_recheck_(\d+)', 1) AS INTEGER + ) AS recheck_hour, + venue_json ->> 'tenant_id' AS tenant_id, + venue_json -> 'slots' AS slots_json + FROM ( + SELECT date, captured_at_utc, venues, filename + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz', + format = 'auto', + columns = { + date: 'VARCHAR', + captured_at_utc: 'VARCHAR', + venues: 'JSON[]' + }, + filename = true, + maximum_object_size = 
134217728 -- 128 MB; matches morning snapshot limit + ) + WHERE venues IS NOT NULL + AND json_array_length(venues) > 0 + ) rf, + LATERAL UNNEST(rf.venues) AS t(venue_json) +), +all_venues AS ( + SELECT * FROM morning_jsonl + UNION ALL + SELECT * FROM morning_blob + UNION ALL + SELECT * FROM recheck_blob +), raw_resources AS ( SELECT - rv.snapshot_date, - rv.captured_at_utc, - rv.snapshot_type, - rv.recheck_hour, - rv.venue_json ->> 'tenant_id' AS tenant_id, + av.snapshot_date, + av.captured_at_utc, + av.snapshot_type, + av.recheck_hour, + av.tenant_id, resource_json - FROM raw_venues rv, + FROM all_venues av, LATERAL UNNEST( - from_json(rv.venue_json -> 'slots', '["JSON"]') + from_json(av.slots_json, '["JSON"]') ) AS t(resource_json) ), raw_slots AS ( diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql index 08aa810..42e7bf9 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_opening_hours.sql @@ -5,8 +5,11 @@ -- DuckDB auto-infers opening_hours as STRUCT, so we access each day by literal -- key (no dynamic access) and UNION ALL to unpivot. -- --- Source: data/landing/playtomic/{year}/{month}/tenants.json.gz --- Each tenant has opening_hours: {MONDAY: {opening_time, closing_time}, ...} +-- Supports two landing formats (UNION ALL during migration): +-- New: tenants.jsonl.gz — one tenant per line, opening_hours is a top-level JSON column +-- Old: tenants.json.gz — {"tenants": [...]} blob (UNNEST required) +-- +-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz MODEL ( name staging.stg_playtomic_opening_hours, @@ -15,7 +18,22 @@ MODEL ( grain (tenant_id, day_of_week) ); -WITH venues AS ( +WITH +-- New format: one tenant per JSONL line +jsonl_venues AS ( + SELECT + tenant_id, + opening_hours AS oh + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz', + format = 'newline_delimited', + columns = {tenant_id: 'VARCHAR', opening_hours: 'JSON'} + ) + WHERE tenant_id IS NOT NULL + AND opening_hours IS NOT NULL +), +-- Old format: blob +blob_venues AS ( SELECT tenant ->> 'tenant_id' AS tenant_id, tenant -> 'opening_hours' AS oh @@ -30,6 +48,11 @@ WITH venues AS ( WHERE (tenant ->> 'tenant_id') IS NOT NULL AND (tenant -> 'opening_hours') IS NOT NULL ), +venues AS ( + SELECT * FROM jsonl_venues + UNION ALL + SELECT * FROM blob_venues +), -- Unpivot by UNION ALL — 7 literal key accesses unpivoted AS ( SELECT tenant_id, 'MONDAY' AS day_of_week, 1 AS day_number, diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql index 0907d6a..b6f6353 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_resources.sql @@ -1,9 +1,12 @@ -- Individual court (resource) records from Playtomic venues. --- Reads resources array from the landing zone JSON directly (double UNNEST: --- tenants → resources) to extract court type, size, surface, and booking config. +-- Reads resources array from the landing zone to extract court type, size, +-- surface, and booking config. -- --- Source: data/landing/playtomic/{year}/{month}/tenants.json.gz --- Each tenant has a resources[] array of court objects. 
+-- Supports two landing formats (UNION ALL during migration): +-- New: tenants.jsonl.gz — one tenant per line, resources is a top-level JSON column +-- Old: tenants.json.gz — {"tenants": [...]} blob (double UNNEST: tenants → resources) +-- +-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz MODEL ( name staging.stg_playtomic_resources, @@ -12,36 +15,56 @@ MODEL ( grain (tenant_id, resource_id) ); -WITH raw AS ( - SELECT UNNEST(tenants) AS tenant - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/tenants.json.gz', - format = 'auto', - maximum_object_size = 134217728 - ) -), -unnested AS ( +WITH +-- New format: one tenant per JSONL line — single UNNEST for resources +jsonl_unnested AS ( SELECT - tenant ->> 'tenant_id' AS tenant_id, - UPPER(tenant -> 'address' ->> 'country_code') AS country_code, - UNNEST(from_json(tenant -> 'resources', '["JSON"]')) AS resource_json - FROM raw + tenant_id, + UPPER(address ->> 'country_code') AS country_code, + UNNEST(from_json(resources, '["JSON"]')) AS resource_json + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz', + format = 'newline_delimited', + columns = {tenant_id: 'VARCHAR', address: 'JSON', resources: 'JSON'} + ) + WHERE tenant_id IS NOT NULL + AND resources IS NOT NULL +), +-- Old format: blob — double UNNEST (tenants → resources) +blob_unnested AS ( + SELECT + tenant ->> 'tenant_id' AS tenant_id, + UPPER(tenant -> 'address' ->> 'country_code') AS country_code, + UNNEST(from_json(tenant -> 'resources', '["JSON"]')) AS resource_json + FROM ( + SELECT UNNEST(tenants) AS tenant + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/tenants.json.gz', + format = 'auto', + maximum_object_size = 134217728 + ) + ) WHERE (tenant ->> 'tenant_id') IS NOT NULL AND (tenant -> 'resources') IS NOT NULL +), +unnested AS ( + SELECT * FROM jsonl_unnested + UNION ALL + SELECT * FROM blob_unnested ) SELECT tenant_id, - resource_json ->> 'resource_id' AS resource_id, + resource_json ->> 'resource_id' AS resource_id, country_code, - NULLIF(TRIM(resource_json ->> 'name'), '') AS resource_name, - resource_json ->> 'sport_id' AS sport_id, + NULLIF(TRIM(resource_json ->> 'name'), '') AS resource_name, + resource_json ->> 'sport_id' AS sport_id, CASE WHEN LOWER(resource_json ->> 'is_active') IN ('true', '1') - THEN TRUE ELSE FALSE END AS is_active, + THEN TRUE ELSE FALSE END AS is_active, LOWER(resource_json -> 'properties' ->> 'resource_type') AS resource_type, LOWER(resource_json -> 'properties' ->> 'resource_size') AS resource_size, LOWER(resource_json -> 'properties' ->> 'resource_feature') AS resource_feature, CASE WHEN LOWER(resource_json -> 'booking_settings' ->> 'is_bookable_online') IN ('true', '1') - THEN TRUE ELSE FALSE END AS is_bookable_online + THEN TRUE ELSE FALSE END AS is_bookable_online FROM unnested WHERE (resource_json ->> 'resource_id') IS NOT NULL AND (resource_json ->> 'sport_id') = 'PADEL' diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql index de579b5..6240462 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_venues.sql @@ -1,10 +1,13 @@ -- Playtomic padel venue records — full metadata extraction. 
--- Reads landing zone JSON, unnests tenant array, extracts all venue metadata +-- Reads landing zone tenants files, extracts all venue metadata -- including address, opening hours, court resources, VAT rate, and facilities. -- Deduplicates on tenant_id (keeps most recent extraction). -- --- Source: data/landing/playtomic/{year}/{month}/tenants.json.gz --- Format: {"tenants": [{tenant_id, tenant_name, address, resources, opening_hours, ...}]} +-- Supports two landing formats (UNION ALL during migration): +-- New: tenants.jsonl.gz — one tenant JSON object per line (no UNNEST needed) +-- Old: tenants.json.gz — {"tenants": [{...}]} blob (UNNEST required) +-- +-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz MODEL ( name staging.stg_playtomic_venues, @@ -13,9 +16,52 @@ MODEL ( grain tenant_id ); -WITH parsed AS ( +WITH +-- New format: one tenant per JSONL line — no UNNEST, access columns directly +jsonl_parsed AS ( + SELECT + tenant_id, + tenant_name, + slug, + tenant_type, + tenant_status, + playtomic_status, + booking_type, + address ->> 'street' AS street, + address ->> 'city' AS city, + address ->> 'postal_code' AS postal_code, + UPPER(address ->> 'country_code') AS country_code, + address ->> 'timezone' AS timezone, + address ->> 'administrative_area' AS administrative_area, + TRY_CAST(address -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat, + TRY_CAST(address -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon, + TRY_CAST(vat_rate AS DOUBLE) AS vat_rate, + default_currency, + TRY_CAST(booking_settings ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes, + opening_hours AS opening_hours_json, + resources AS resources_json, + created_at, + CAST(is_playtomic_partner AS VARCHAR) AS is_playtomic_partner_raw, + filename AS source_file, + CURRENT_DATE AS extracted_date + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz', + format = 'newline_delimited', + filename = true, + columns = { + tenant_id: 'VARCHAR', tenant_name: 'VARCHAR', slug: 'VARCHAR', + tenant_type: 'VARCHAR', tenant_status: 'VARCHAR', playtomic_status: 'VARCHAR', + booking_type: 'VARCHAR', address: 'JSON', vat_rate: 'DOUBLE', + default_currency: 'VARCHAR', booking_settings: 'JSON', + opening_hours: 'JSON', resources: 'JSON', + created_at: 'VARCHAR', is_playtomic_partner: 'VARCHAR' + } + ) + WHERE tenant_id IS NOT NULL +), +-- Old format: {"tenants": [...]} blob — keep for transition until old files rotate out +blob_parsed AS ( SELECT - -- Identity tenant ->> 'tenant_id' AS tenant_id, tenant ->> 'tenant_name' AS tenant_name, tenant ->> 'slug' AS slug, @@ -23,8 +69,6 @@ WITH parsed AS ( tenant ->> 'tenant_status' AS tenant_status, tenant ->> 'playtomic_status' AS playtomic_status, tenant ->> 'booking_type' AS booking_type, - - -- Address tenant -> 'address' ->> 'street' AS street, tenant -> 'address' ->> 'city' AS city, tenant -> 'address' ->> 'postal_code' AS postal_code, @@ -33,22 +77,13 @@ WITH parsed AS ( tenant -> 'address' ->> 'administrative_area' AS administrative_area, TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat, TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon, - - -- Commercial - TRY_CAST(tenant ->> 'vat_rate' AS DOUBLE) AS vat_rate, + TRY_CAST(tenant ->> 'vat_rate' AS DOUBLE) AS vat_rate, tenant ->> 'default_currency' AS default_currency, - - -- Booking settings (venue-level) TRY_CAST(tenant -> 'booking_settings' ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes, - - -- Opening hours and resources 
stored as JSON for downstream models tenant -> 'opening_hours' AS opening_hours_json, tenant -> 'resources' AS resources_json, - - -- Metadata tenant ->> 'created_at' AS created_at, tenant ->> 'is_playtomic_partner' AS is_playtomic_partner_raw, - filename AS source_file, CURRENT_DATE AS extracted_date FROM ( @@ -62,6 +97,11 @@ WITH parsed AS ( ) WHERE (tenant ->> 'tenant_id') IS NOT NULL ), +parsed AS ( + SELECT * FROM jsonl_parsed + UNION ALL + SELECT * FROM blob_parsed +), deduped AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY tenant_id ORDER BY source_file DESC) AS rn diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_population_geonames.sql b/transform/sqlmesh_padelnomics/models/staging/stg_population_geonames.sql index 699c90e..82f4826 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_population_geonames.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_population_geonames.sql @@ -3,7 +3,11 @@ -- Broad coverage (140K+ locations) enables Gemeinde-level market intelligence. -- One row per geoname_id (GeoNames stable numeric identifier). -- --- Source: data/landing/geonames/{year}/{month}/cities_global.json.gz +-- Supports two landing formats (UNION ALL during migration): +-- New: cities_global.jsonl.gz — one city per line, columns directly accessible +-- Old: cities_global.json.gz — {"rows": [...]} blob (UNNEST required) +-- +-- Source: data/landing/geonames/{year}/{month}/cities_global.{jsonl,json}.gz MODEL ( name staging.stg_population_geonames, @@ -12,7 +16,33 @@ MODEL ( grain geoname_id ); -WITH parsed AS ( +WITH +-- New format: one city per JSONL line +jsonl_rows AS ( + SELECT + TRY_CAST(geoname_id AS INTEGER) AS geoname_id, + city_name, + country_code, + TRY_CAST(lat AS DOUBLE) AS lat, + TRY_CAST(lon AS DOUBLE) AS lon, + admin1_code, + admin2_code, + TRY_CAST(population AS BIGINT) AS population, + TRY_CAST(ref_year AS INTEGER) AS ref_year, + CURRENT_DATE AS extracted_date + FROM read_json( + @LANDING_DIR || '/geonames/*/*/cities_global.jsonl.gz', + format = 'newline_delimited', + columns = { + geoname_id: 'INTEGER', city_name: 'VARCHAR', country_code: 'VARCHAR', + lat: 'DOUBLE', lon: 'DOUBLE', admin1_code: 'VARCHAR', admin2_code: 'VARCHAR', + population: 'BIGINT', ref_year: 'INTEGER' + } + ) + WHERE geoname_id IS NOT NULL +), +-- Old format: {"rows": [...]} blob — kept for transition +blob_rows AS ( SELECT TRY_CAST(row ->> 'geoname_id' AS INTEGER) AS geoname_id, row ->> 'city_name' AS city_name, @@ -33,11 +63,16 @@ WITH parsed AS ( ) ) WHERE (row ->> 'geoname_id') IS NOT NULL +), +all_rows AS ( + SELECT * FROM jsonl_rows + UNION ALL + SELECT * FROM blob_rows ) SELECT geoname_id, - TRIM(city_name) AS city_name, - UPPER(country_code) AS country_code, + TRIM(city_name) AS city_name, + UPPER(country_code) AS country_code, lat, lon, NULLIF(TRIM(admin1_code), '') AS admin1_code, @@ -45,7 +80,7 @@ SELECT population, ref_year, extracted_date -FROM parsed +FROM all_rows WHERE population IS NOT NULL AND population > 0 AND geoname_id IS NOT NULL diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_tennis_courts.sql b/transform/sqlmesh_padelnomics/models/staging/stg_tennis_courts.sql index 8821f45..c9c5577 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_tennis_courts.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_tennis_courts.sql @@ -2,7 +2,12 @@ -- Used as a "racket sport culture" signal in the opportunity score: -- areas with high tennis court density are prime padel adoption markets. 
-- --- Source: data/landing/overpass_tennis/{year}/{month}/courts.json.gz +-- Supports two landing formats (UNION ALL during migration): +-- New: courts.jsonl.gz — one OSM element per line; nodes have lat/lon directly, +-- ways/relations have center.lat/center.lon (Overpass out center) +-- Old: courts.json.gz — {"elements": [...]} blob (UNNEST required) +-- +-- Source: data/landing/overpass_tennis/{year}/{month}/courts.{jsonl,json}.gz MODEL ( name staging.stg_tennis_courts, @@ -11,7 +16,39 @@ MODEL ( grain osm_id ); -WITH parsed AS ( +WITH +-- New format: one OSM element per JSONL line +jsonl_elements AS ( + SELECT + type AS osm_type, + TRY_CAST(id AS BIGINT) AS osm_id, + -- Nodes: lat/lon direct. Ways/relations: center object (Overpass out center). + COALESCE( + TRY_CAST(lat AS DOUBLE), + TRY_CAST(center ->> 'lat' AS DOUBLE) + ) AS lat, + COALESCE( + TRY_CAST(lon AS DOUBLE), + TRY_CAST(center ->> 'lon' AS DOUBLE) + ) AS lon, + tags ->> 'name' AS name, + tags ->> 'addr:country' AS country_code, + tags ->> 'addr:city' AS city_tag, + filename AS source_file, + CURRENT_DATE AS extracted_date + FROM read_json( + @LANDING_DIR || '/overpass_tennis/*/*/courts.jsonl.gz', + format = 'newline_delimited', + columns = { + type: 'VARCHAR', id: 'BIGINT', lat: 'DOUBLE', lon: 'DOUBLE', + center: 'JSON', tags: 'JSON' + }, + filename = true + ) + WHERE type IS NOT NULL +), +-- Old format: {"elements": [...]} blob — kept for transition +blob_elements AS ( SELECT elem ->> 'type' AS osm_type, (elem ->> 'id')::BIGINT AS osm_id, @@ -32,12 +69,16 @@ WITH parsed AS ( ) WHERE (elem ->> 'type') IS NOT NULL ), +parsed AS ( + SELECT * FROM jsonl_elements + UNION ALL + SELECT * FROM blob_elements +), deduped AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY osm_id ORDER BY extracted_date DESC) AS rn FROM parsed - WHERE osm_type = 'node' - AND lat IS NOT NULL AND lon IS NOT NULL + WHERE lat IS NOT NULL AND lon IS NOT NULL AND lat BETWEEN -90 AND 90 AND lon BETWEEN -180 AND 180 ), @@ -54,8 +95,8 @@ with_country AS ( WHEN lat BETWEEN 36.35 AND 47.09 AND lon BETWEEN 6.62 AND 18.51 THEN 'IT' WHEN lat BETWEEN 37.00 AND 42.15 AND lon BETWEEN -9.50 AND -6.19 THEN 'PT' ELSE NULL - END) AS country_code, - NULLIF(TRIM(name), '') AS name, + END) AS country_code, + NULLIF(TRIM(name), '') AS name, NULLIF(TRIM(city_tag), '') AS city, extracted_date FROM deduped
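The extractor-side pattern is the same across tenants, availability, GeoNames, and Overpass tennis: append one compact JSON object per line to a `.working.jsonl` file, then hand it to `compress_jsonl_atomic()` for the final `.jsonl.gz`. A minimal sketch of that flow, assuming the `padelnomics_extract` package layout above (`write_records_jsonl` itself is a hypothetical helper, not part of the codebase):

```python
import json
from pathlib import Path

from padelnomics_extract.utils import compress_jsonl_atomic, landing_path


def write_records_jsonl(landing_dir: Path, records: list[dict]) -> int:
    """Hypothetical helper illustrating the working-file -> .jsonl.gz flow."""
    dest_dir = landing_path(landing_dir, "playtomic", "2026", "02")
    dest = dest_dir / "tenants.jsonl.gz"
    working = dest.with_suffix(".working.jsonl")  # same naming as the extractors
    with open(working, "w") as f:
        for record in records:
            # compact separators keep each line (and the final .gz) small
            f.write(json.dumps(record, separators=(",", ":")) + "\n")
    # Streams gzip in 1 MB chunks, renames a .tmp into place, and deletes the
    # working file; note it asserts the source exists and is non-empty.
    return compress_jsonl_atomic(working, dest)
```

Because the working file is the final content, a crashed run only needs `load_partial_results()` to recover the IDs already written and then keep appending — there is no separate consolidation step, and readers (the SQLMesh staging models) only ever see the atomically renamed `.jsonl.gz`, never a partial file.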