merge: JSONL streaming landing format + regional overpass_tennis splitting

Converts extractors from large JSON blobs to streaming JSONL (.jsonl.gz),
eliminating in-memory accumulation, maximum_object_size workarounds, and
the playtomic availability consolidation step.

- compress_jsonl_atomic(): 1MB-chunk streaming compression, atomic rename
- playtomic_tenants → tenants.jsonl.gz (one tenant per line after dedup)
- playtomic_availability → availability_{date}.jsonl.gz (working file IS the output)
- geonames → cities_global.jsonl.gz (eliminates 30MB blob)
- overpass_tennis → 10 regional bbox queries + courts.jsonl.gz with crash recovery
- All modified staging SQL uses UNION ALL (JSONL + blob) for smooth transition
- init_landing_seeds.py: bootstrap seeds for both formats in 1970/01

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

# Conflicts:
#	CHANGELOG.md
This commit is contained in:
Deeman
2026-02-25 12:34:03 +01:00
14 changed files with 682 additions and 200 deletions
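
The converted extractors now share one write path: stream records into an uncompressed `*.working.jsonl` file, then hand it to `compress_jsonl_atomic()`. A minimal, self-contained sketch of that pattern (illustrative only; the function name `write_landing_jsonl` and the inlined compression step are stand-ins for the real extractor code and the `utils.py` helper shown in the diffs below):

```python
import gzip
import json
from pathlib import Path


def write_landing_jsonl(records: list[dict], dest: Path) -> int:
    """Sketch of the JSONL landing write path: working file first, then atomic .jsonl.gz."""
    # 1. Stream one JSON object per line into an uncompressed working file.
    working = dest.with_suffix(".working.jsonl")
    with open(working, "w") as f:
        for record in records:
            f.write(json.dumps(record, separators=(",", ":")) + "\n")
    # 2. Same steps as compress_jsonl_atomic() in utils.py: compress in 1 MB chunks,
    #    rename atomically so readers never see a partial file, delete the source.
    tmp = dest.with_suffix(dest.suffix + ".tmp")
    with open(working, "rb") as f_in, gzip.open(tmp, "wb") as f_out:
        while chunk := f_in.read(1_048_576):
            f_out.write(chunk)
    tmp.rename(dest)
    working.unlink()
    return dest.stat().st_size
```

Staging models then read the file with DuckDB's `read_json(..., format = 'newline_delimited')`, as the modified SQL models below do.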

View File

@@ -16,10 +16,27 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- **Admin email gallery** (`/admin/emails/gallery`) — card grid of all email types; preview page with EN/DE language toggle renders each template in a sandboxed iframe (`srcdoc`); "View in sent log →" cross-link; gallery link added to admin sidebar
- **Compose live preview** — two-column compose layout: form on the left, HTMX-powered preview iframe on the right; `hx-trigger="input delay:500ms"` on the textarea; `POST /admin/emails/compose/preview` endpoint supports plain body or branded wrapper via `wrap` checkbox
- 50 new tests covering all template renders (EN + DE), registry structure, gallery routes (access control, list, preview, lang fallback), and compose preview endpoint
- **JSONL streaming landing format** — extractors now write one JSON object per line (`.jsonl.gz`) instead of a single large blob, eliminating in-memory accumulation and `maximum_object_size` workarounds:
- `playtomic_tenants.py` → `tenants.jsonl.gz` (one tenant per line; dedup still happens in memory before write)
- `playtomic_availability.py` → `availability_{date}.jsonl.gz` (one venue per line with `date`/`captured_at_utc` injected; working file IS the final file — eliminates the consolidation step)
- `geonames.py` → `cities_global.jsonl.gz` (one city per line; eliminates 30 MB blob and its `maximum_object_size` workaround)
- `compress_jsonl_atomic(jsonl_path, dest_path)` utility added to `utils.py` — streams compression in 1 MB chunks, atomic `.tmp` rename, deletes source
- **Regional Overpass splitting for tennis courts** — replaces single global query (150K+ elements, timed out) with 10 regional bbox queries (~10-40K elements each, 150s server / 180s client):
- Regions: europe_west, europe_central, europe_east, north_america, south_america, asia_east, asia_west, oceania, africa, asia_north
- Per-region retry (2 attempts, 30s cooldown) + 5s inter-region polite delay
- Crash recovery via `working.jsonl` accumulation — already-written element IDs skipped on restart; completed regions produce 0 new elements on re-query
- Output: `courts.jsonl.gz` (one OSM element per line)
- **`scripts/init_landing_seeds.py`** — creates minimal `.jsonl.gz` and `.json.gz` seed files in `1970/01/` so SQLMesh staging models can run before real extraction data arrives; idempotent
### Changed
- All modified staging SQL models use **UNION ALL transition CTEs** — both JSONL (new) and blob (old) formats are readable simultaneously; old `.json.gz` files in the landing zone continue working until they rotate out naturally:
- `stg_playtomic_venues`, `stg_playtomic_resources`, `stg_playtomic_opening_hours` — JSONL top-level columns (no `UNNEST(tenants)`)
- `stg_playtomic_availability` — JSONL morning files + blob morning files + blob recheck files
- `stg_population_geonames` — JSONL city rows (no `UNNEST(rows)`, no `maximum_object_size`)
- `stg_tennis_courts` — JSONL elements with `COALESCE(lat, center.lat)` for way/relation centre coords; blob UNNEST kept for old files
### Removed
- `_email_wrap()` and `_email_button()` helper functions removed from `worker.py` — replaced by templates
- **Marketplace admin dashboard** (`/admin/marketplace`) — single-screen health view for the two-sided market:
- **Lead funnel** — total / verified-new (ready to unlock) / unlocked / won / conversion rate
- **Credit economy** — total credits issued, consumed (lead unlocks), outstanding balance across all paid suppliers, 30-day burn rate

View File

@@ -93,6 +93,9 @@
- [x] `dim_venues` (OSM + Playtomic deduped), `dim_cities` (Eurostat population)
- [x] `city_market_profile` (market score OBT), `planner_defaults` (per-city calculator pre-fill)
- [x] DuckDB analytics reader in app lifecycle
- [x] **JSONL streaming landing format** — extractors write `.jsonl.gz` (one record per line); constant-memory compression via `compress_jsonl_atomic()`; eliminates `maximum_object_size` workarounds; all modified staging models use UNION ALL transition to support both formats
- [x] **Regional Overpass tennis splitting** — 10 regional bbox queries replace the single global 150K-element query that timed out; crash recovery via `working.jsonl` accumulation
- [x] **`init_landing_seeds.py`** — creates minimal seed files for both JSONL and blob formats so SQLMesh can run before real data arrives
### i18n
- [x] Full i18n across entire app (EN + DE)

View File

@@ -10,14 +10,14 @@ highest padel investment opportunity (white space markets).
Requires: GEONAMES_USERNAME env var (free registration at geonames.org)
-Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.json.gz
+Landing: {LANDING_DIR}/geonames/{year}/{month}/cities_global.jsonl.gz
-Output: {"rows": [{"geoname_id": 2950159, "city_name": "Berlin",
-        "country_code": "DE", "population": 3644826,
-        "lat": 52.524, "lon": 13.411,
-        "admin1_code": "16", "admin2_code": "00",
-        "ref_year": 2024}], "count": N}
+Output: one JSON object per line, e.g.:
+  {"geoname_id": 2950159, "city_name": "Berlin", "country_code": "DE",
+   "population": 3644826, "lat": 52.524, "lon": 13.411,
+   "admin1_code": "16", "admin2_code": "00", "ref_year": 2024}
"""
+import gzip
import io
import json
import os
@@ -28,7 +28,7 @@ from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
-from .utils import get_last_cursor, landing_path, write_gzip_atomic
+from .utils import compress_jsonl_atomic, get_last_cursor, landing_path
logger = setup_logging("padelnomics.extract.geonames")
@@ -131,9 +131,12 @@ def extract(
logger.warning("GEONAMES_USERNAME not set — writing empty placeholder so SQLMesh models can run")
year, month = year_month.split("/")
dest_dir = landing_path(landing_dir, "geonames", year, month)
-dest = dest_dir / "cities_global.json.gz"
+dest = dest_dir / "cities_global.jsonl.gz"
if not dest.exists():
-    write_gzip_atomic(dest, b'{"rows": [], "count": 0}')
+    tmp = dest.with_suffix(".gz.tmp")
+    with gzip.open(tmp, "wt") as f:
+        f.write('{"geoname_id":null}\n')  # filtered by WHERE geoname_id IS NOT NULL
+    tmp.rename(dest)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
@@ -164,9 +167,12 @@ def extract(
logger.info("parsed %d global locations (pop ≥1K)", len(rows))
dest_dir = landing_path(landing_dir, "geonames", year, month)
-dest = dest_dir / "cities_global.json.gz"
+dest = dest_dir / "cities_global.jsonl.gz"
-payload = json.dumps({"rows": rows, "count": len(rows)}).encode()
-bytes_written = write_gzip_atomic(dest, payload)
+working_path = dest.with_suffix(".working.jsonl")
+with open(working_path, "w") as f:
+    for row in rows:
+        f.write(json.dumps(row, separators=(",", ":")) + "\n")
+bytes_written = compress_jsonl_atomic(working_path, dest)
logger.info("written %s bytes compressed", f"{bytes_written:,}")
return {

View File

@@ -1,66 +1,163 @@
"""Overpass API extractor — global tennis court locations from OpenStreetMap. """Overpass API extractor — global tennis court locations from OpenStreetMap.
Queries the Overpass API for all nodes/ways/relations tagged sport=tennis. Queries the Overpass API for all nodes/ways/relations tagged sport=tennis,
Tennis court density near a location is a proxy for racket-sport culture — split across 10 geographic regions to avoid timeout on the ~150K+ global result.
areas with many tennis clubs are prime candidates for padel adoption.
The query returns ~150K+ results globally (vs ~5K for padel), so a higher Regional strategy:
Overpass timeout is used. - Each region is a bounding box covering a continent or sub-continent
- Each region is queried independently (POST with [bbox:...])
- Overlapping bboxes are deduped on OSM element id
- One region per POST (~10-40K elements each, well within Overpass limits)
- Crash recovery: working JSONL accumulates completed regions; on restart
already-written IDs are skipped, completed regions produce 0 new elements
Landing: {LANDING_DIR}/overpass_tennis/{year}/{month}/courts.json.gz Landing: {LANDING_DIR}/overpass_tennis/{year}/{month}/courts.jsonl.gz
""" """
import json
import sqlite3 import sqlite3
import time
from pathlib import Path from pathlib import Path
import niquests import niquests
from ._shared import OVERPASS_TIMEOUT_SECONDS, run_extractor, setup_logging from ._shared import run_extractor, setup_logging
from .utils import landing_path, write_gzip_atomic from .utils import compress_jsonl_atomic, landing_path, load_partial_results
logger = setup_logging("padelnomics.extract.overpass_tennis") logger = setup_logging("padelnomics.extract.overpass_tennis")
EXTRACTOR_NAME = "overpass_tennis" EXTRACTOR_NAME = "overpass_tennis"
OVERPASS_URL = "https://overpass-api.de/api/interpreter" OVERPASS_URL = "https://overpass-api.de/api/interpreter"
# Tennis returns ~150K+ elements globally vs ~5K for padel — use 3× timeout. # Each region is [south, west, north, east] — Overpass bbox format
TENNIS_OVERPASS_TIMEOUT_SECONDS = OVERPASS_TIMEOUT_SECONDS * 3 REGIONS = [
{"name": "europe_west", "bbox": "35.0,-11.0,61.0,8.0"}, # FR ES GB PT IE BE NL
{"name": "europe_central", "bbox": "42.0,8.0,55.5,24.0"}, # DE IT AT CH CZ PL HU
{"name": "europe_east", "bbox": "35.0,24.0,72.0,60.0"}, # Nordics Baltics GR TR RO
{"name": "north_america", "bbox": "15.0,-170.0,72.0,-50.0"}, # US CA MX
{"name": "south_america", "bbox": "-56.0,-82.0,15.0,-34.0"}, # BR AR CL
{"name": "asia_east", "bbox": "18.0,73.0,54.0,150.0"}, # JP KR CN
{"name": "asia_west", "bbox": "-11.0,24.0,42.0,73.0"}, # Middle East India
{"name": "oceania", "bbox": "-50.0,110.0,5.0,180.0"}, # AU NZ
{"name": "africa", "bbox": "-35.0,-18.0,37.0,52.0"}, # ZA EG MA
{"name": "asia_north", "bbox": "42.0,60.0,82.0,180.0"}, # RU-east KZ
]
-OVERPASS_QUERY = (
-"[out:json][timeout:300];\n"
+MAX_RETRIES_PER_REGION = 2
+RETRY_DELAY_SECONDS = 30  # Overpass cooldown between retries
+REGION_TIMEOUT_SECONDS = 180  # Client-side per-region timeout (server uses 150s)
+INTER_REGION_DELAY_SECONDS = 5  # Polite delay between regions
def _region_query(bbox: str) -> str:
"""Build an Overpass QL query for tennis courts within a bounding box."""
return (
f"[out:json][timeout:150][bbox:{bbox}];\n"
"(\n" "(\n"
' node["sport"="tennis"];\n' " node[\"sport\"=\"tennis\"];\n"
' way["sport"="tennis"];\n' " way[\"sport\"=\"tennis\"];\n"
' relation["sport"="tennis"];\n' " rel[\"sport\"=\"tennis\"];\n"
");\n" ");\n"
"out center;" "out center;"
) )
def _query_region(session: niquests.Session, region: dict) -> list[dict]:
"""POST one regional Overpass query. Returns list of OSM elements."""
query = _region_query(region["bbox"])
resp = session.post(
OVERPASS_URL,
data={"data": query},
timeout=REGION_TIMEOUT_SECONDS,
)
resp.raise_for_status()
return resp.json().get("elements", [])
def extract(
landing_dir: Path,
year_month: str,
conn: sqlite3.Connection,
session: niquests.Session,
) -> dict:
-"""POST OverpassQL query for tennis courts and write raw OSM JSON. Returns run metrics."""
+"""Query Overpass for global tennis courts using regional bbox splitting.
+Splits the global query into REGIONS to avoid Overpass timeout.
+Writes one OSM element per line to courts.jsonl.gz.
+Crash-safe: working.jsonl accumulates results; on restart already-written
+element IDs are skipped so completed regions produce 0 new elements.
+"""
+assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
+assert "/" in year_month and len(year_month) == 7, f"year_month must be YYYY/MM: {year_month!r}"
year, month = year_month.split("/")
dest_dir = landing_path(landing_dir, "overpass_tennis", year, month)
-dest = dest_dir / "courts.json.gz"
+dest = dest_dir / "courts.jsonl.gz"
+old_blob = dest_dir / "courts.json.gz"
-logger.info("POST %s (sport=tennis, ~150K+ results expected)", OVERPASS_URL)
-resp = session.post(
-OVERPASS_URL,
-data={"data": OVERPASS_QUERY},
-timeout=TENNIS_OVERPASS_TIMEOUT_SECONDS,
+if dest.exists() or old_blob.exists():
+logger.info("Already have courts for %s — skipping", year_month)
+return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
+# Crash recovery: load already-written elements from the working file
+working_path = dest_dir / "courts.working.jsonl"
+prior_records, already_seen_ids = load_partial_results(working_path, id_key="id")
+if already_seen_ids:
+logger.info("Resuming: %d elements already in working file", len(already_seen_ids))
total_new = 0
regions_succeeded: list[str] = []
regions_failed: list[str] = []
working_file = open(working_path, "a") # noqa: SIM115
try:
for i, region in enumerate(REGIONS):
for attempt in range(MAX_RETRIES_PER_REGION + 1):
try:
elements = _query_region(session, region)
new_elements = [e for e in elements if str(e.get("id", "")) not in already_seen_ids]
for elem in new_elements:
working_file.write(json.dumps(elem, separators=(",", ":")) + "\n")
already_seen_ids.add(str(elem["id"]))
working_file.flush()
total_new += len(new_elements)
regions_succeeded.append(region["name"])
logger.info(
"Region %s: %d elements (%d new, %d total)",
region["name"], len(elements), len(new_elements), len(already_seen_ids),
)
break
except niquests.exceptions.RequestException as exc:
if attempt < MAX_RETRIES_PER_REGION:
logger.warning(
"Region %s attempt %d failed: %s — retrying in %ds",
region["name"], attempt + 1, exc, RETRY_DELAY_SECONDS,
)
time.sleep(RETRY_DELAY_SECONDS)
else:
regions_failed.append(region["name"])
logger.error(
"Region %s failed after %d attempts: %s",
region["name"], MAX_RETRIES_PER_REGION + 1, exc,
+)
-)
-resp.raise_for_status()
-size_bytes = len(resp.content)
-logger.info("%s bytes received", f"{size_bytes:,}")
+if i < len(REGIONS) - 1:
+time.sleep(INTER_REGION_DELAY_SECONDS)
+finally:
+working_file.close()
-bytes_written = write_gzip_atomic(dest, resp.content)
-logger.info("wrote %s (%s bytes compressed)", dest, f"{bytes_written:,}")
+total_elements = len(prior_records) + total_new
+if total_elements == 0:
+raise RuntimeError(f"All regions failed, no elements written: {regions_failed}")
+if regions_failed:
+logger.warning("Completed with %d failed regions: %s", len(regions_failed), regions_failed)
+bytes_written = compress_jsonl_atomic(working_path, dest)
+logger.info(
+"%d total elements (%d regions, %d failed) -> %s (%s bytes)",
+total_elements, len(regions_succeeded), len(regions_failed), dest, f"{bytes_written:,}",
+)
return {
"files_written": 1,

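Note: `load_partial_results()` is imported from `utils` above but its body is not part of this diff. Judging from the call sites (it returns the prior records plus a set of already-seen IDs, and it has to tolerate a truncated last line after a crash), a plausible sketch, not the actual implementation, would be:

```python
import json
from pathlib import Path


def load_partial_results(path: Path, id_key: str) -> tuple[list[dict], set[str]]:
    """Hypothetical sketch of the crash-recovery helper (not shown in this diff)."""
    records: list[dict] = []
    seen_ids: set[str] = set()
    if not path.exists():
        return records, seen_ids
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                break  # truncated last line from a crash, safe to drop
            records.append(record)
            seen_ids.add(str(record.get(id_key, "")))
    return records, seen_ids
```
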
View File

@@ -16,7 +16,7 @@ records (a few seconds of work with 10 workers) are lost on crash.
Recheck mode: re-queries venues with slots starting within the next 90 minutes.
Writes a separate recheck file for more accurate occupancy measurement.
-Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.json.gz
+Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.jsonl.gz
Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.json.gz
"""
@@ -34,7 +34,13 @@ import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
-from .utils import flush_partial_batch, landing_path, load_partial_results, write_gzip_atomic
+from .utils import (
compress_jsonl_atomic,
flush_partial_batch,
landing_path,
load_partial_results,
write_gzip_atomic,
)
logger = setup_logging("padelnomics.extract.playtomic_availability")
@@ -65,25 +71,41 @@ _thread_local = threading.local()
# ---------------------------------------------------------------------------
def _load_tenant_ids(landing_dir: Path) -> list[str]:
-"""Read tenant IDs from the most recent tenants.json.gz file."""
+"""Read tenant IDs from the most recent tenants file (JSONL or blob format)."""
assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
playtomic_dir = landing_dir / "playtomic"
if not playtomic_dir.exists():
return []
+# Prefer JSONL (new format), fall back to blob (old format)
+tenant_files = sorted(playtomic_dir.glob("*/*/tenants.jsonl.gz"), reverse=True)
+if not tenant_files:
tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True)
if not tenant_files:
return []
latest = tenant_files[0]
logger.info("Loading tenant IDs from %s", latest)
-with gzip.open(latest, "rb") as f:
-data = json.loads(f.read())
-tenants = data.get("tenants", [])
ids = []
-for t in tenants:
with gzip.open(latest, "rt") as f:
if latest.name.endswith(".jsonl.gz"):
# JSONL: one tenant object per line
for line in f:
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
tid = record.get("tenant_id") or record.get("id")
if tid:
ids.append(tid)
except json.JSONDecodeError:
break # truncated last line
else:
# Blob: {"tenants": [...]}
data = json.loads(f.read())
for t in data.get("tenants", []):
tid = t.get("tenant_id") or t.get("id")
if tid:
ids.append(tid)
@@ -257,14 +279,14 @@ def extract(
year, month = year_month.split("/")
dest_dir = landing_path(landing_dir, "playtomic", year, month)
-dest = dest_dir / f"availability_{target_date}.json.gz"
+dest = dest_dir / f"availability_{target_date}.jsonl.gz"
+old_blob = dest_dir / f"availability_{target_date}.json.gz"
-if dest.exists():
-logger.info("Already have %s — skipping", dest)
+if dest.exists() or old_blob.exists():
+logger.info("Already have availability for %s — skipping", target_date)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
-# Crash resumption: load already-fetched venues from partial file
-partial_path = dest.with_suffix(".partial.jsonl")
+# Crash resumption: load already-fetched venues from working file
+partial_path = dest_dir / f"availability_{target_date}.working.jsonl"
prior_results, already_done = load_partial_results(partial_path, id_key="tenant_id")
if already_done:
logger.info("Resuming: %d venues already fetched from partial file", len(already_done))
@@ -281,7 +303,10 @@ def extract(
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
-# Partial file for incremental crash-safe progress
+# Timestamp stamped into every JSONL line — computed once before the fetch loop.
+captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
+# Working file for incremental crash-safe progress (IS the final file).
partial_file = open(partial_path, "a")  # noqa: SIM115
partial_lock = threading.Lock()
pending_batch: list[dict] = []
@@ -289,6 +314,9 @@ def extract(
def _on_result(result: dict) -> None:
# Called inside _fetch_venues_parallel's lock — no additional locking needed.
# In serial mode, called single-threaded — also safe without extra locking.
+# Inject date + captured_at so every JSONL line is self-contained.
+result["date"] = target_date
+result["captured_at_utc"] = captured_at
pending_batch.append(result)
if len(pending_batch) >= PARTIAL_FLUSH_SIZE:
flush_partial_batch(partial_file, partial_lock, pending_batch)
@@ -332,24 +360,13 @@ def extract(
pending_batch.clear()
partial_file.close()
-# Consolidate prior (resumed) + new results into final file
-venues_data = prior_results + new_venues_data
-captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
-payload = json.dumps({
-"date": target_date,
-"captured_at_utc": captured_at,
-"venue_count": len(venues_data),
-"venues_errored": venues_errored,
-"venues": venues_data,
-}).encode()
-bytes_written = write_gzip_atomic(dest, payload)
-if partial_path.exists():
-partial_path.unlink()
+# Working file IS the output — compress atomically (deletes source).
+total_venues = len(prior_results) + len(new_venues_data)
+bytes_written = compress_jsonl_atomic(partial_path, dest)
logger.info(
"%d venues scraped (%d errors) -> %s (%s bytes)",
-len(venues_data), venues_errored, dest, f"{bytes_written:,}",
+total_venues, venues_errored, dest, f"{bytes_written:,}",
)
return {
@@ -364,14 +381,36 @@ def extract(
# Recheck mode — re-query venues with upcoming slots for accurate occupancy
# ---------------------------------------------------------------------------
def _read_availability_jsonl(path: Path) -> dict:
"""Read a JSONL availability file into the blob dict format recheck expects."""
venues = []
date_val = captured_at = None
with gzip.open(path, "rt") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
except json.JSONDecodeError:
break # truncated last line on crash
if date_val is None:
date_val = record.get("date")
captured_at = record.get("captured_at_utc")
venues.append(record)
return {"date": date_val, "captured_at_utc": captured_at, "venues": venues}
def _load_morning_availability(landing_dir: Path, target_date: str) -> dict | None:
-"""Load today's morning availability file. Returns parsed JSON or None."""
+"""Load today's morning availability file (JSONL or blob). Returns dict or None."""
playtomic_dir = landing_dir / "playtomic"
-# Search across year/month dirs for the target date
+# Try JSONL first (new format), fall back to blob (old format)
+matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.jsonl.gz"))
+if matches:
+return _read_availability_jsonl(matches[0])
matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.json.gz"))
if not matches:
return None
with gzip.open(matches[0], "rb") as f:
return json.loads(f.read())

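`flush_partial_batch()` is likewise imported but not shown in this diff. From how it is called here (an open working file, a `threading.Lock`, and the pending list of result dicts, with the caller clearing the batch afterwards), a rough sketch might be (an assumption, not the project's actual helper):

```python
import json
import threading
from typing import IO


def flush_partial_batch(partial_file: IO[str], lock: threading.Lock, batch: list[dict]) -> None:
    """Hypothetical sketch: append pending results as JSONL lines and flush to disk."""
    with lock:
        for record in batch:
            partial_file.write(json.dumps(record, separators=(",", ":")) + "\n")
        partial_file.flush()  # at most one small batch is lost on a crash
```
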
View File

@@ -1,8 +1,8 @@
"""Playtomic tenants extractor — venue listings via unauthenticated API. """Playtomic tenants extractor — venue listings via unauthenticated API.
Paginates through the global tenant list (sorted by UUID) using the `page` Paginates through the global tenant list (sorted by UUID) using the `page`
parameter. Deduplicates on tenant_id and writes a single consolidated JSON parameter. Deduplicates on tenant_id and writes a gzipped JSONL file to the
to the landing zone. landing zone (one tenant object per line).
API notes (discovered 2026-02): API notes (discovered 2026-02):
- bbox params (min_latitude etc.) are silently ignored by the API - bbox params (min_latitude etc.) are silently ignored by the API
@@ -18,7 +18,7 @@ pages.
Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2). Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2).
Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz
""" """
import json import json
@@ -31,7 +31,7 @@ import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
from .proxy import load_proxy_urls, make_round_robin_cycler from .proxy import load_proxy_urls, make_round_robin_cycler
from .utils import landing_path, write_gzip_atomic from .utils import compress_jsonl_atomic, landing_path
logger = setup_logging("padelnomics.extract.playtomic_tenants") logger = setup_logging("padelnomics.extract.playtomic_tenants")
@@ -76,7 +76,7 @@ def extract(
"""Fetch all Playtomic venues via global pagination. Returns run metrics.""" """Fetch all Playtomic venues via global pagination. Returns run metrics."""
year, month = year_month.split("/") year, month = year_month.split("/")
dest_dir = landing_path(landing_dir, "playtomic", year, month) dest_dir = landing_path(landing_dir, "playtomic", year, month)
dest = dest_dir / "tenants.json.gz" dest = dest_dir / "tenants.jsonl.gz"
proxy_urls = load_proxy_urls() proxy_urls = load_proxy_urls()
next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None
@@ -138,8 +138,12 @@ def extract(
if not next_proxy: if not next_proxy:
time.sleep(THROTTLE_SECONDS) time.sleep(THROTTLE_SECONDS)
payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode() # Write each tenant as a JSONL line, then compress atomically
bytes_written = write_gzip_atomic(dest, payload) working_path = dest.with_suffix(".working.jsonl")
with open(working_path, "w") as f:
for tenant in all_tenants:
f.write(json.dumps(tenant, separators=(",", ":")) + "\n")
bytes_written = compress_jsonl_atomic(working_path, dest)
logger.info("%d unique venues -> %s", len(all_tenants), dest) logger.info("%d unique venues -> %s", len(all_tenants), dest)
return { return {

View File

@@ -174,3 +174,23 @@ def write_gzip_atomic(path: Path, data: bytes) -> int:
tmp.write_bytes(compressed)
tmp.rename(path)
return len(compressed)
def compress_jsonl_atomic(jsonl_path: Path, dest_path: Path) -> int:
"""Compress a JSONL working file to .jsonl.gz atomically, then delete the source.
Streams compression in 1MB chunks (constant memory regardless of file size).
Atomic via .tmp rename — readers never see a partial .jsonl.gz.
Deletes the uncompressed working file after successful compression.
Returns compressed bytes written.
"""
assert jsonl_path.exists(), f"source must exist: {jsonl_path}"
assert jsonl_path.stat().st_size > 0, f"source must not be empty: {jsonl_path}"
tmp = dest_path.with_suffix(dest_path.suffix + ".tmp")
with open(jsonl_path, "rb") as f_in, gzip.open(tmp, "wb") as f_out:
while chunk := f_in.read(1_048_576): # 1 MB chunks
f_out.write(chunk)
bytes_written = tmp.stat().st_size
tmp.rename(dest_path)
jsonl_path.unlink()
return bytes_written
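
A quick round-trip of the new helper, written as a pytest-style check (illustrative; the import path is inferred from the logger names in this commit and `tmp_path` is pytest's built-in fixture):

```python
import gzip
import json

from padelnomics.extract.utils import compress_jsonl_atomic  # assumed module path


def test_compress_jsonl_atomic_roundtrip(tmp_path):
    working = tmp_path / "tenants.working.jsonl"
    records = [{"tenant_id": "a"}, {"tenant_id": "b"}]
    working.write_text("".join(json.dumps(r) + "\n" for r in records))

    dest = tmp_path / "tenants.jsonl.gz"
    bytes_written = compress_jsonl_atomic(working, dest)

    assert bytes_written == dest.stat().st_size  # returns the compressed size
    assert not working.exists()                  # uncompressed source is deleted
    with gzip.open(dest, "rt") as f:
        assert [json.loads(line) for line in f] == records
```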

View File

@@ -0,0 +1,101 @@
"""Create minimal landing zone seed files so SQLMesh models can run before real data arrives.
Each seed contains one null/empty record that is filtered out by the staging model's
WHERE clause. Seeds live in the 1970/01 epoch so they're never confused with real data.
Usage:
uv run python scripts/init_landing_seeds.py [--landing-dir data/landing]
Idempotent: skips existing files.
"""
import argparse
import gzip
import json
from pathlib import Path
def create_seed(dest: Path, content: bytes) -> None:
"""Write content to a gzip file atomically. Skips if the file already exists."""
if dest.exists():
return
dest.parent.mkdir(parents=True, exist_ok=True)
tmp = dest.with_suffix(dest.suffix + ".tmp")
with gzip.open(tmp, "wb") as f:
f.write(content)
tmp.rename(dest)
print(f" created: {dest}")
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--landing-dir", default="data/landing", type=Path)
args = parser.parse_args()
base: Path = args.landing_dir
seeds = {
# --- Playtomic tenants ---
# JSONL: one null tenant (filtered by WHERE tenant_id IS NOT NULL)
"playtomic/1970/01/tenants.jsonl.gz":
b'{"tenant_id":null}\n',
# Blob: empty tenants array
"playtomic/1970/01/tenants.json.gz":
json.dumps({"tenants": [], "count": 0}).encode(),
# --- Playtomic availability (morning) ---
# JSONL: one null venue (filtered by WHERE tenant_id IS NOT NULL)
"playtomic/1970/01/availability_1970-01-01.jsonl.gz":
b'{"tenant_id":null,"date":"1970-01-01","captured_at_utc":"1970-01-01T00:00:00Z","slots":null}\n',
# Blob: empty venues array
"playtomic/1970/01/availability_1970-01-01.json.gz":
json.dumps({"date": "1970-01-01", "captured_at_utc": "1970-01-01T00:00:00Z",
"venue_count": 0, "venues": []}).encode(),
# --- Playtomic recheck (blob only, small format) ---
"playtomic/1970/01/availability_1970-01-01_recheck_00.json.gz":
json.dumps({"date": "1970-01-01", "captured_at_utc": "1970-01-01T00:00:00Z",
"recheck_hour": 0, "venues": []}).encode(),
# --- GeoNames ---
# JSONL: one null city (filtered by WHERE geoname_id IS NOT NULL)
"geonames/1970/01/cities_global.jsonl.gz":
b'{"geoname_id":null}\n',
# Blob: empty rows array
"geonames/1970/01/cities_global.json.gz":
json.dumps({"rows": [], "count": 0}).encode(),
# --- Overpass tennis ---
# JSONL: one null element (filtered by WHERE type IS NOT NULL)
"overpass_tennis/1970/01/courts.jsonl.gz":
b'{"type":null,"id":null}\n',
# Blob: empty elements array
"overpass_tennis/1970/01/courts.json.gz":
json.dumps({"version": 0.6, "elements": []}).encode(),
# --- Overpass padel (unchanged format) ---
"overpass/1970/01/courts.json.gz":
json.dumps({"version": 0.6, "elements": []}).encode(),
# --- Eurostat ---
"eurostat/1970/01/urb_cpop1.json.gz":
json.dumps({"rows": [], "count": 0}).encode(),
"eurostat/1970/01/ilc_di03.json.gz":
json.dumps({"rows": [], "count": 0}).encode(),
"eurostat_city_labels/1970/01/cities_codelist.json.gz":
json.dumps({"rows": [], "count": 0}).encode(),
# --- National statistics ---
"ons_uk/1970/01/lad_population.json.gz":
json.dumps({"rows": [], "count": 0}).encode(),
"census_usa/1970/01/acs5_places.json.gz":
json.dumps({"rows": [], "count": 0}).encode(),
}
print(f"Initialising landing seeds in: {base}")
for rel_path, content in seeds.items():
create_seed(base / rel_path, content)
print("Done.")
if __name__ == "__main__":
main()

View File

@@ -3,12 +3,17 @@
-- "Available" = the slot was NOT booked at capture time. Missing slots = booked. -- "Available" = the slot was NOT booked at capture time. Missing slots = booked.
-- --
-- Reads BOTH morning snapshots and recheck files: -- Reads BOTH morning snapshots and recheck files:
-- Morning: availability_{date}.json.gz → snapshot_type = 'morning' -- Morning (new): availability_{date}.jsonl.gz → snapshot_type = 'morning'
-- Morning (old): availability_{date}.json.gz → snapshot_type = 'morning'
-- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck' -- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
-- --
-- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit). -- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit).
-- Price parsed from strings like "14.56 EUR" or "48 GBP". -- Price parsed from strings like "14.56 EUR" or "48 GBP".
-- --
-- Supports two morning landing formats (UNION ALL during migration):
-- New: availability_{date}.jsonl.gz — one venue per line, columns: tenant_id, slots, date, captured_at_utc
-- Old: availability_{date}.json.gz — {"date":..., "venues": [...]} blob (UNNEST required)
--
-- Requires: at least one availability file in the landing zone. -- Requires: at least one availability file in the landing zone.
-- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz) -- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz)
-- with empty venues[] ensures this model runs before real data arrives. -- with empty venues[] ensures this model runs before real data arrives.
@@ -20,12 +25,41 @@ MODEL (
grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc)
);
-- Morning snapshots (filename does NOT contain '_recheck_') WITH
WITH morning_files AS ( -- New format: one venue per JSONL line — no outer UNNEST needed
morning_jsonl AS (
SELECT SELECT
*, date AS snapshot_date,
captured_at_utc,
'morning' AS snapshot_type, 'morning' AS snapshot_type,
NULL::INTEGER AS recheck_hour NULL::INTEGER AS recheck_hour,
tenant_id,
slots AS slots_json
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/availability_*.jsonl.gz',
format = 'newline_delimited',
columns = {
date: 'VARCHAR',
captured_at_utc: 'VARCHAR',
tenant_id: 'VARCHAR',
slots: 'JSON'
},
filename = true
)
WHERE filename NOT LIKE '%_recheck_%'
AND tenant_id IS NOT NULL
),
-- Old format: {"date":..., "venues": [...]} blob — kept for transition
morning_blob AS (
SELECT
af.date AS snapshot_date,
af.captured_at_utc,
'morning' AS snapshot_type,
NULL::INTEGER AS recheck_hour,
venue_json ->> 'tenant_id' AS tenant_id,
venue_json -> 'slots' AS slots_json
FROM (
SELECT date, captured_at_utc, venues
FROM read_json( FROM read_json(
@LANDING_DIR || '/playtomic/*/*/availability_*.json.gz', @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz',
format = 'auto', format = 'auto',
@@ -40,17 +74,22 @@ WITH morning_files AS (
WHERE filename NOT LIKE '%_recheck_%' WHERE filename NOT LIKE '%_recheck_%'
AND venues IS NOT NULL AND venues IS NOT NULL
AND json_array_length(venues) > 0 AND json_array_length(venues) > 0
) af,
LATERAL UNNEST(af.venues) AS t(venue_json)
), ),
-- Recheck snapshots (filename contains '_recheck_') -- Recheck snapshots (blob format only — small files, no JSONL conversion needed)
-- Use TRY_CAST on a regex-extracted hour to get the recheck_hour. recheck_blob AS (
-- If no recheck files exist yet, this CTE produces zero rows (safe).
recheck_files AS (
SELECT SELECT
*, rf.date AS snapshot_date,
rf.captured_at_utc,
'recheck' AS snapshot_type, 'recheck' AS snapshot_type,
TRY_CAST( TRY_CAST(
regexp_extract(filename, '_recheck_(\d+)', 1) AS INTEGER regexp_extract(rf.filename, '_recheck_(\d+)', 1) AS INTEGER
) AS recheck_hour ) AS recheck_hour,
venue_json ->> 'tenant_id' AS tenant_id,
venue_json -> 'slots' AS slots_json
FROM (
SELECT date, captured_at_utc, venues, filename
FROM read_json( FROM read_json(
@LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz', @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz',
format = 'auto', format = 'auto',
@@ -64,33 +103,27 @@ recheck_files AS (
) )
WHERE venues IS NOT NULL WHERE venues IS NOT NULL
AND json_array_length(venues) > 0 AND json_array_length(venues) > 0
) rf,
LATERAL UNNEST(rf.venues) AS t(venue_json)
), ),
all_files AS ( all_venues AS (
SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM morning_files SELECT * FROM morning_jsonl
UNION ALL UNION ALL
SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM recheck_files SELECT * FROM morning_blob
), UNION ALL
raw_venues AS ( SELECT * FROM recheck_blob
SELECT
af.date AS snapshot_date,
af.captured_at_utc,
af.snapshot_type,
af.recheck_hour,
venue_json
FROM all_files af,
LATERAL UNNEST(af.venues) AS t(venue_json)
), ),
raw_resources AS ( raw_resources AS (
SELECT SELECT
rv.snapshot_date, av.snapshot_date,
rv.captured_at_utc, av.captured_at_utc,
rv.snapshot_type, av.snapshot_type,
rv.recheck_hour, av.recheck_hour,
rv.venue_json ->> 'tenant_id' AS tenant_id, av.tenant_id,
resource_json resource_json
FROM raw_venues rv, FROM all_venues av,
LATERAL UNNEST( LATERAL UNNEST(
from_json(rv.venue_json -> 'slots', '["JSON"]') from_json(av.slots_json, '["JSON"]')
) AS t(resource_json) ) AS t(resource_json)
), ),
raw_slots AS ( raw_slots AS (

View File

@@ -5,8 +5,11 @@
-- DuckDB auto-infers opening_hours as STRUCT, so we access each day by literal
-- key (no dynamic access) and UNION ALL to unpivot.
--
-- Supports two landing formats (UNION ALL during migration):
--   New: tenants.jsonl.gz — one tenant per line, opening_hours is a top-level JSON column
--   Old: tenants.json.gz — {"tenants": [...]} blob (UNNEST required)
--
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
MODEL (
name staging.stg_playtomic_opening_hours,
@@ -15,7 +18,22 @@ MODEL (
grain (tenant_id, day_of_week)
);
WITH venues AS ( WITH
-- New format: one tenant per JSONL line
jsonl_venues AS (
SELECT
tenant_id,
opening_hours AS oh
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
format = 'newline_delimited',
columns = {tenant_id: 'VARCHAR', opening_hours: 'JSON'}
)
WHERE tenant_id IS NOT NULL
AND opening_hours IS NOT NULL
),
-- Old format: blob
blob_venues AS (
SELECT SELECT
tenant ->> 'tenant_id' AS tenant_id, tenant ->> 'tenant_id' AS tenant_id,
tenant -> 'opening_hours' AS oh tenant -> 'opening_hours' AS oh
@@ -30,6 +48,11 @@ WITH venues AS (
WHERE (tenant ->> 'tenant_id') IS NOT NULL WHERE (tenant ->> 'tenant_id') IS NOT NULL
AND (tenant -> 'opening_hours') IS NOT NULL AND (tenant -> 'opening_hours') IS NOT NULL
), ),
venues AS (
SELECT * FROM jsonl_venues
UNION ALL
SELECT * FROM blob_venues
),
-- Unpivot by UNION ALL — 7 literal key accesses -- Unpivot by UNION ALL — 7 literal key accesses
unpivoted AS ( unpivoted AS (
SELECT tenant_id, 'MONDAY' AS day_of_week, 1 AS day_number, SELECT tenant_id, 'MONDAY' AS day_of_week, 1 AS day_number,

View File

@@ -1,9 +1,12 @@
-- Individual court (resource) records from Playtomic venues.
-- Reads resources array from the landing zone to extract court type, size,
-- surface, and booking config.
--
-- Supports two landing formats (UNION ALL during migration):
--   New: tenants.jsonl.gz — one tenant per line, resources is a top-level JSON column
--   Old: tenants.json.gz — {"tenants": [...]} blob (double UNNEST: tenants → resources)
--
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
MODEL (
name staging.stg_playtomic_resources,
@@ -12,22 +15,42 @@ MODEL (
grain (tenant_id, resource_id)
);
WITH raw AS ( WITH
-- New format: one tenant per JSONL line — single UNNEST for resources
jsonl_unnested AS (
SELECT
tenant_id,
UPPER(address ->> 'country_code') AS country_code,
UNNEST(from_json(resources, '["JSON"]')) AS resource_json
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
format = 'newline_delimited',
columns = {tenant_id: 'VARCHAR', address: 'JSON', resources: 'JSON'}
)
WHERE tenant_id IS NOT NULL
AND resources IS NOT NULL
),
-- Old format: blob — double UNNEST (tenants → resources)
blob_unnested AS (
SELECT
tenant ->> 'tenant_id' AS tenant_id,
UPPER(tenant -> 'address' ->> 'country_code') AS country_code,
UNNEST(from_json(tenant -> 'resources', '["JSON"]')) AS resource_json
FROM (
SELECT UNNEST(tenants) AS tenant SELECT UNNEST(tenants) AS tenant
FROM read_json( FROM read_json(
@LANDING_DIR || '/playtomic/*/*/tenants.json.gz', @LANDING_DIR || '/playtomic/*/*/tenants.json.gz',
format = 'auto', format = 'auto',
maximum_object_size = 134217728 maximum_object_size = 134217728
) )
), )
unnested AS (
SELECT
tenant ->> 'tenant_id' AS tenant_id,
UPPER(tenant -> 'address' ->> 'country_code') AS country_code,
UNNEST(from_json(tenant -> 'resources', '["JSON"]')) AS resource_json
FROM raw
WHERE (tenant ->> 'tenant_id') IS NOT NULL WHERE (tenant ->> 'tenant_id') IS NOT NULL
AND (tenant -> 'resources') IS NOT NULL AND (tenant -> 'resources') IS NOT NULL
),
unnested AS (
SELECT * FROM jsonl_unnested
UNION ALL
SELECT * FROM blob_unnested
) )
SELECT SELECT
tenant_id, tenant_id,

View File

@@ -1,10 +1,13 @@
-- Playtomic padel venue records — full metadata extraction.
-- Reads landing zone tenants files, extracts all venue metadata
-- including address, opening hours, court resources, VAT rate, and facilities.
-- Deduplicates on tenant_id (keeps most recent extraction).
--
-- Supports two landing formats (UNION ALL during migration):
--   New: tenants.jsonl.gz — one tenant JSON object per line (no UNNEST needed)
--   Old: tenants.json.gz — {"tenants": [{...}]} blob (UNNEST required)
--
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
MODEL (
name staging.stg_playtomic_venues,
@@ -13,9 +16,52 @@ MODEL (
grain tenant_id
);
WITH parsed AS ( WITH
-- New format: one tenant per JSONL line — no UNNEST, access columns directly
jsonl_parsed AS (
SELECT
tenant_id,
tenant_name,
slug,
tenant_type,
tenant_status,
playtomic_status,
booking_type,
address ->> 'street' AS street,
address ->> 'city' AS city,
address ->> 'postal_code' AS postal_code,
UPPER(address ->> 'country_code') AS country_code,
address ->> 'timezone' AS timezone,
address ->> 'administrative_area' AS administrative_area,
TRY_CAST(address -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat,
TRY_CAST(address -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon,
TRY_CAST(vat_rate AS DOUBLE) AS vat_rate,
default_currency,
TRY_CAST(booking_settings ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes,
opening_hours AS opening_hours_json,
resources AS resources_json,
created_at,
CAST(is_playtomic_partner AS VARCHAR) AS is_playtomic_partner_raw,
filename AS source_file,
CURRENT_DATE AS extracted_date
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
format = 'newline_delimited',
filename = true,
columns = {
tenant_id: 'VARCHAR', tenant_name: 'VARCHAR', slug: 'VARCHAR',
tenant_type: 'VARCHAR', tenant_status: 'VARCHAR', playtomic_status: 'VARCHAR',
booking_type: 'VARCHAR', address: 'JSON', vat_rate: 'DOUBLE',
default_currency: 'VARCHAR', booking_settings: 'JSON',
opening_hours: 'JSON', resources: 'JSON',
created_at: 'VARCHAR', is_playtomic_partner: 'VARCHAR'
}
)
WHERE tenant_id IS NOT NULL
),
-- Old format: {"tenants": [...]} blob — keep for transition until old files rotate out
blob_parsed AS (
SELECT SELECT
-- Identity
tenant ->> 'tenant_id' AS tenant_id, tenant ->> 'tenant_id' AS tenant_id,
tenant ->> 'tenant_name' AS tenant_name, tenant ->> 'tenant_name' AS tenant_name,
tenant ->> 'slug' AS slug, tenant ->> 'slug' AS slug,
@@ -23,8 +69,6 @@ WITH parsed AS (
tenant ->> 'tenant_status' AS tenant_status, tenant ->> 'tenant_status' AS tenant_status,
tenant ->> 'playtomic_status' AS playtomic_status, tenant ->> 'playtomic_status' AS playtomic_status,
tenant ->> 'booking_type' AS booking_type, tenant ->> 'booking_type' AS booking_type,
-- Address
tenant -> 'address' ->> 'street' AS street, tenant -> 'address' ->> 'street' AS street,
tenant -> 'address' ->> 'city' AS city, tenant -> 'address' ->> 'city' AS city,
tenant -> 'address' ->> 'postal_code' AS postal_code, tenant -> 'address' ->> 'postal_code' AS postal_code,
@@ -33,22 +77,13 @@ WITH parsed AS (
tenant -> 'address' ->> 'administrative_area' AS administrative_area, tenant -> 'address' ->> 'administrative_area' AS administrative_area,
TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat, TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat,
TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon, TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon,
-- Commercial
TRY_CAST(tenant ->> 'vat_rate' AS DOUBLE) AS vat_rate, TRY_CAST(tenant ->> 'vat_rate' AS DOUBLE) AS vat_rate,
tenant ->> 'default_currency' AS default_currency, tenant ->> 'default_currency' AS default_currency,
-- Booking settings (venue-level)
TRY_CAST(tenant -> 'booking_settings' ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes, TRY_CAST(tenant -> 'booking_settings' ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes,
-- Opening hours and resources stored as JSON for downstream models
tenant -> 'opening_hours' AS opening_hours_json, tenant -> 'opening_hours' AS opening_hours_json,
tenant -> 'resources' AS resources_json, tenant -> 'resources' AS resources_json,
-- Metadata
tenant ->> 'created_at' AS created_at, tenant ->> 'created_at' AS created_at,
tenant ->> 'is_playtomic_partner' AS is_playtomic_partner_raw, tenant ->> 'is_playtomic_partner' AS is_playtomic_partner_raw,
filename AS source_file, filename AS source_file,
CURRENT_DATE AS extracted_date CURRENT_DATE AS extracted_date
FROM ( FROM (
@@ -62,6 +97,11 @@ WITH parsed AS (
) )
WHERE (tenant ->> 'tenant_id') IS NOT NULL WHERE (tenant ->> 'tenant_id') IS NOT NULL
), ),
parsed AS (
SELECT * FROM jsonl_parsed
UNION ALL
SELECT * FROM blob_parsed
),
deduped AS ( deduped AS (
SELECT *, SELECT *,
ROW_NUMBER() OVER (PARTITION BY tenant_id ORDER BY source_file DESC) AS rn ROW_NUMBER() OVER (PARTITION BY tenant_id ORDER BY source_file DESC) AS rn

View File

@@ -3,7 +3,11 @@
-- Broad coverage (140K+ locations) enables Gemeinde-level market intelligence.
-- One row per geoname_id (GeoNames stable numeric identifier).
--
-- Supports two landing formats (UNION ALL during migration):
--   New: cities_global.jsonl.gz — one city per line, columns directly accessible
--   Old: cities_global.json.gz — {"rows": [...]} blob (UNNEST required)
--
-- Source: data/landing/geonames/{year}/{month}/cities_global.{jsonl,json}.gz
MODEL (
name staging.stg_population_geonames,
@@ -12,7 +16,33 @@ MODEL (
grain geoname_id
);
WITH parsed AS ( WITH
-- New format: one city per JSONL line
jsonl_rows AS (
SELECT
TRY_CAST(geoname_id AS INTEGER) AS geoname_id,
city_name,
country_code,
TRY_CAST(lat AS DOUBLE) AS lat,
TRY_CAST(lon AS DOUBLE) AS lon,
admin1_code,
admin2_code,
TRY_CAST(population AS BIGINT) AS population,
TRY_CAST(ref_year AS INTEGER) AS ref_year,
CURRENT_DATE AS extracted_date
FROM read_json(
@LANDING_DIR || '/geonames/*/*/cities_global.jsonl.gz',
format = 'newline_delimited',
columns = {
geoname_id: 'INTEGER', city_name: 'VARCHAR', country_code: 'VARCHAR',
lat: 'DOUBLE', lon: 'DOUBLE', admin1_code: 'VARCHAR', admin2_code: 'VARCHAR',
population: 'BIGINT', ref_year: 'INTEGER'
}
)
WHERE geoname_id IS NOT NULL
),
-- Old format: {"rows": [...]} blob — kept for transition
blob_rows AS (
SELECT SELECT
TRY_CAST(row ->> 'geoname_id' AS INTEGER) AS geoname_id, TRY_CAST(row ->> 'geoname_id' AS INTEGER) AS geoname_id,
row ->> 'city_name' AS city_name, row ->> 'city_name' AS city_name,
@@ -33,6 +63,11 @@ WITH parsed AS (
) )
) )
WHERE (row ->> 'geoname_id') IS NOT NULL WHERE (row ->> 'geoname_id') IS NOT NULL
),
all_rows AS (
SELECT * FROM jsonl_rows
UNION ALL
SELECT * FROM blob_rows
) )
SELECT SELECT
geoname_id, geoname_id,
@@ -45,7 +80,7 @@ SELECT
population, population,
ref_year, ref_year,
extracted_date extracted_date
FROM parsed FROM all_rows
WHERE population IS NOT NULL WHERE population IS NOT NULL
AND population > 0 AND population > 0
AND geoname_id IS NOT NULL AND geoname_id IS NOT NULL

View File

@@ -2,7 +2,12 @@
-- Used as a "racket sport culture" signal in the opportunity score: -- Used as a "racket sport culture" signal in the opportunity score:
-- areas with high tennis court density are prime padel adoption markets. -- areas with high tennis court density are prime padel adoption markets.
-- --
-- Source: data/landing/overpass_tennis/{year}/{month}/courts.json.gz -- Supports two landing formats (UNION ALL during migration):
-- New: courts.jsonl.gz — one OSM element per line; nodes have lat/lon directly,
-- ways/relations have center.lat/center.lon (Overpass out center)
-- Old: courts.json.gz — {"elements": [...]} blob (UNNEST required)
--
-- Source: data/landing/overpass_tennis/{year}/{month}/courts.{jsonl,json}.gz
MODEL ( MODEL (
name staging.stg_tennis_courts, name staging.stg_tennis_courts,
@@ -11,7 +16,39 @@ MODEL (
grain osm_id grain osm_id
); );
WITH parsed AS ( WITH
-- New format: one OSM element per JSONL line
jsonl_elements AS (
SELECT
type AS osm_type,
TRY_CAST(id AS BIGINT) AS osm_id,
-- Nodes: lat/lon direct. Ways/relations: center object (Overpass out center).
COALESCE(
TRY_CAST(lat AS DOUBLE),
TRY_CAST(center ->> 'lat' AS DOUBLE)
) AS lat,
COALESCE(
TRY_CAST(lon AS DOUBLE),
TRY_CAST(center ->> 'lon' AS DOUBLE)
) AS lon,
tags ->> 'name' AS name,
tags ->> 'addr:country' AS country_code,
tags ->> 'addr:city' AS city_tag,
filename AS source_file,
CURRENT_DATE AS extracted_date
FROM read_json(
@LANDING_DIR || '/overpass_tennis/*/*/courts.jsonl.gz',
format = 'newline_delimited',
columns = {
type: 'VARCHAR', id: 'BIGINT', lat: 'DOUBLE', lon: 'DOUBLE',
center: 'JSON', tags: 'JSON'
},
filename = true
)
WHERE type IS NOT NULL
),
-- Old format: {"elements": [...]} blob — kept for transition
blob_elements AS (
SELECT SELECT
elem ->> 'type' AS osm_type, elem ->> 'type' AS osm_type,
(elem ->> 'id')::BIGINT AS osm_id, (elem ->> 'id')::BIGINT AS osm_id,
@@ -32,12 +69,16 @@ WITH parsed AS (
) )
WHERE (elem ->> 'type') IS NOT NULL WHERE (elem ->> 'type') IS NOT NULL
), ),
parsed AS (
SELECT * FROM jsonl_elements
UNION ALL
SELECT * FROM blob_elements
),
deduped AS ( deduped AS (
SELECT *, SELECT *,
ROW_NUMBER() OVER (PARTITION BY osm_id ORDER BY extracted_date DESC) AS rn ROW_NUMBER() OVER (PARTITION BY osm_id ORDER BY extracted_date DESC) AS rn
FROM parsed FROM parsed
WHERE osm_type = 'node' WHERE lat IS NOT NULL AND lon IS NOT NULL
AND lat IS NOT NULL AND lon IS NOT NULL
AND lat BETWEEN -90 AND 90 AND lat BETWEEN -90 AND 90
AND lon BETWEEN -180 AND 180 AND lon BETWEEN -180 AND 180
), ),