diff --git a/CHANGELOG.md b/CHANGELOG.md
index b94e75a..9f70d95 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -36,7 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 - 50 new tests covering all template renders (EN + DE), registry structure, gallery routes (access control, list, preview, lang fallback), and compose preview endpoint
 - **JSONL streaming landing format** — extractors now write one JSON object per line (`.jsonl.gz`) instead of a single large blob, eliminating in-memory accumulation and `maximum_object_size` workarounds:
   - `playtomic_tenants.py` → `tenants.jsonl.gz` (one tenant per line; dedup still happens in memory before write)
-  - `playtomic_availability.py` → `availability_{date}.jsonl.gz` (one venue per line with `date`/`captured_at_utc` injected; working file IS the final file — eliminates the consolidation step)
+  - `playtomic_availability.py` → `availability_{date}.jsonl.gz` (morning) + `availability_{date}_recheck_{HH}.jsonl.gz` (recheck); one venue per line with `date`/`captured_at_utc`/`recheck_hour` injected
   - `geonames.py` → `cities_global.jsonl.gz` (one city per line; eliminates 30 MB blob and its `maximum_object_size` workaround)
   - `compress_jsonl_atomic(jsonl_path, dest_path)` utility added to `utils.py` — streams compression in 1 MB chunks, atomic `.tmp` rename, deletes source
 - **Regional Overpass splitting for tennis courts** — replaces single global query (150K+ elements, timed out) with 10 regional bbox queries (~10-40K elements each, 150s server / 180s client):
@@ -49,7 +49,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 ### Changed
 - All modified staging SQL models use **UNION ALL transition CTEs** — both JSONL (new) and blob (old) formats are readable simultaneously; old `.json.gz` files in the landing zone continue working until they rotate out naturally:
   - `stg_playtomic_venues`, `stg_playtomic_resources`, `stg_playtomic_opening_hours` — JSONL top-level columns (no `UNNEST(tenants)`)
-  - `stg_playtomic_availability` — JSONL morning files + blob morning files + blob recheck files
+  - `stg_playtomic_availability` — JSONL morning + recheck files; blob morning + recheck kept for transition
   - `stg_population_geonames` — JSONL city rows (no `UNNEST(rows)`, no `maximum_object_size`)
   - `stg_tennis_courts` — JSONL elements with `COALESCE(lat, center.lat)` for way/relation centre coords; blob UNNEST kept for old files
 
diff --git a/docs/data-sources-inventory.md b/docs/data-sources-inventory.md
index ea5f370..d94e45a 100644
--- a/docs/data-sources-inventory.md
+++ b/docs/data-sources-inventory.md
@@ -118,7 +118,7 @@ Playtomic covers 16,000+ courts globally. The platform is dominant in Spain, UK,
 
 **Pipeline implementation (tenants):** ✅ Ingested
 - Extractor: `extract-playtomic-tenants` — paginated global scrape of `GET /v1/tenants?sport_ids=PADEL`, page size 100, up to 500 pages
-- Landing: `data/landing/playtomic/{year}/{month}/tenants.json.gz` (~14K venues as of Feb 2026)
+- Landing: `data/landing/playtomic/{year}/{month}/tenants.jsonl.gz` (~14K venues as of Feb 2026)
 - Throttle: 2 s between pages; deduplicates on `tenant_id`
 - Staging models (all grain `tenant_id` or `(tenant_id, resource_id)`):
   - `stg_playtomic_venues` — venue metadata: name, address, city, country, coordinates, booking type, status
@@ -127,9 +127,10 @@ Playtomic covers 16,000+ courts globally. The platform is dominant in Spain, UK,
 
 **Pipeline implementation (availability):** ✅ Ingested
 - Extractor: `extract-playtomic-availability` — reads tenant IDs from latest tenants file, queries `GET /v1/availability` for next-day slots per venue
-- Landing: `data/landing/playtomic/{year}/{month}/{date}/availability_morning.json.gz` + `availability_recheck.json.gz`
-- Recheck mode: re-queries slots starting within 90 min (controlled by `RECHECK_WINDOW_MINUTES`); captures near-real-time fill rates
-- Parallelism: `EXTRACT_WORKERS` env var; `PROXY_URLS` for distributed rate limiting; throttle 1 s per venue per worker
+- Landing: `data/landing/playtomic/{year}/{month}/availability_{date}.jsonl.gz` (morning) + `availability_{date}_recheck_{HH}.jsonl.gz` (recheck)
+- Old blob format (`.json.gz`) retained in landing zone alongside JSONL; staging reads both
+- Recheck mode: re-queries slots starting within `RECHECK_WINDOW_MINUTES` (default 30); captures near-real-time fill rates
+- Parallelism: worker count derived from `PROXY_URLS` length; throttle 1 s per venue per worker
 - Staging: `stg_playtomic_availability`, grain `(snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc)`
 
 ---
diff --git a/extract/padelnomics_extract/README.md b/extract/padelnomics_extract/README.md
index 4a6d75e..eb2518a 100644
--- a/extract/padelnomics_extract/README.md
+++ b/extract/padelnomics_extract/README.md
@@ -37,7 +37,7 @@ src/padelnomics_extract/
 
 ```python
 from ._shared import run_extractor, setup_logging
-from .utils import landing_path, write_gzip_atomic
+from .utils import compress_jsonl_atomic, landing_path
 
 logger = setup_logging("padelnomics.extract.my_source")
 EXTRACTOR_NAME = "my_source"
@@ -108,18 +108,23 @@ sqlite3 data/landing/.state.sqlite \
 ```
 data/landing/
 ├── .state.sqlite
-├── overpass/{year}/{month}/courts.json.gz
+├── overpass/{year}/{month}/courts.{jsonl,json}.gz
+├── overpass_tennis/{year}/{month}/courts.{jsonl,json}.gz
 ├── eurostat/{year}/{month}/urb_cpop1.json.gz
 ├── eurostat/{year}/{month}/ilc_di03.json.gz
-├── playtomic/{year}/{month}/tenants.json.gz
-└── playtomic/{year}/{month}/availability_{date}.json.gz
+├── geonames/{year}/{month}/cities_global.{jsonl,json}.gz
+├── playtomic/{year}/{month}/tenants.{jsonl,json}.gz
+├── playtomic/{year}/{month}/availability_{date}.{jsonl,json}.gz
+└── playtomic/{year}/{month}/availability_{date}_recheck_{HH}.{jsonl,json}.gz
 ```
 
 ## Data sources
 
 | Source | Module | Schedule | Notes |
 |--------|--------|----------|-------|
-| Overpass API | `overpass.py` | Daily | OSM padel courts, ~5K nodes |
+| Overpass API (padel) | `overpass.py` | Daily | OSM padel courts, ~5K nodes; JSONL output |
+| Overpass API (tennis) | `overpass_tennis.py` | Daily | OSM tennis courts, 150K+ nodes; regional splits; JSONL output |
 | Eurostat | `eurostat.py` | Daily (304 most runs) | urb_cpop1, ilc_di03 — etag dedup |
-| Playtomic tenants | `playtomic_tenants.py` | Daily | ~8K venues, bounded pagination |
-| Playtomic availability | `playtomic_availability.py` | Daily | Next-day slots, ~4.5h runtime |
+| GeoNames | `geonames.py` | Daily | ~140K locations (pop ≥1K); JSONL output |
+| Playtomic tenants | `playtomic_tenants.py` | Daily | ~14K venues, bounded pagination; JSONL output |
+| Playtomic availability | `playtomic_availability.py` | Daily + recheck | Morning: next-day slots; recheck: near-real-time fill; JSONL output |
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
index 1fab6e2..63f1d4a 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
@@ -17,7 +17,7 @@ Recheck mode: re-queries venues with slots starting within the next 90 minutes.
 Writes a separate recheck file for more accurate occupancy measurement.
 
 Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.jsonl.gz
-Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.json.gz
+Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.jsonl.gz
 """
 
 import gzip
@@ -39,7 +39,7 @@ from .utils import (
+    compress_jsonl_atomic,
     flush_partial_batch,
     landing_path,
     load_partial_results,
-    write_gzip_atomic,
 )
 
 logger = setup_logging("padelnomics.extract.playtomic_availability")
@@ -510,25 +509,22 @@ def extract_recheck(
             logger.error("Circuit open with no fallback — writing partial recheck results")
             break
 
-    # Write recheck file
+    # Write recheck file as JSONL — one venue per line with metadata injected
     recheck_hour = now.hour
     year, month = year_month.split("/")
     dest_dir = landing_path(landing_dir, "playtomic", year, month)
-    dest = dest_dir / f"availability_{target_date}_recheck_{recheck_hour:02d}.json.gz"
+    dest = dest_dir / f"availability_{target_date}_recheck_{recheck_hour:02d}.jsonl.gz"
     captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
-    payload = json.dumps({
-        "date": target_date,
-        "captured_at_utc": captured_at,
-        "recheck_hour": recheck_hour,
-        "recheck_window_minutes": RECHECK_WINDOW_MINUTES,
-        "rechecked_tenant_ids": venues_to_recheck,
-        "venue_count": len(venues_data),
-        "venues_errored": venues_errored,
-        "venues": venues_data,
-    }).encode()
+    working_path = dest.with_suffix("").with_suffix(".working.jsonl")
+    with open(working_path, "w") as f:
+        for venue in venues_data:
+            venue["date"] = target_date
+            venue["captured_at_utc"] = captured_at
+            venue["recheck_hour"] = recheck_hour
+            f.write(json.dumps(venue, separators=(",", ":")) + "\n")
+    bytes_written = compress_jsonl_atomic(working_path, dest)
 
-    bytes_written = write_gzip_atomic(dest, payload)
     logger.info(
         "Recheck: %d/%d venues (%d errors) -> %s (%s bytes)",
         len(venues_data), len(venues_to_recheck), venues_errored, dest, f"{bytes_written:,}",
     )
diff --git a/scripts/init_landing_seeds.py b/scripts/init_landing_seeds.py
index cc61bd5..9ffc743 100644
--- a/scripts/init_landing_seeds.py
+++ b/scripts/init_landing_seeds.py
@@ -51,7 +51,11 @@ def main() -> None:
         json.dumps({"date": "1970-01-01", "captured_at_utc": "1970-01-01T00:00:00Z", "venue_count": 0, "venues": []}).encode(),
-    # --- Playtomic recheck (blob only, small format) ---
+    # --- Playtomic recheck ---
+    # JSONL: one null venue (filtered by WHERE tenant_id IS NOT NULL)
+    "playtomic/1970/01/availability_1970-01-01_recheck_00.jsonl.gz":
+        b'{"tenant_id":null,"date":"1970-01-01","captured_at_utc":"1970-01-01T00:00:00Z","recheck_hour":0,"slots":null}\n',
+    # Blob: empty venues array (old format, kept for transition)
     "playtomic/1970/01/availability_1970-01-01_recheck_00.json.gz":
         json.dumps({"date": "1970-01-01", "captured_at_utc": "1970-01-01T00:00:00Z", "recheck_hour": 0, "venues": []}).encode(),
diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql
index 9092796..a5ee10f 100644
--- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql
+++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql
@@ -5,7 +5,8 @@
 -- Reads BOTH morning snapshots and recheck files:
 --   Morning (new): availability_{date}.jsonl.gz → snapshot_type = 'morning'
 --   Morning (old): availability_{date}.json.gz → snapshot_type = 'morning'
---   Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
+--   Recheck (new): availability_{date}_recheck_{HH}.jsonl.gz → snapshot_type = 'recheck'
+--   Recheck (old): availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
 --
 -- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit).
 -- Price parsed from strings like "14.56 EUR" or "48 GBP".
@@ -77,7 +78,30 @@ morning_blob AS (
     ) af,
     LATERAL UNNEST(af.venues) AS t(venue_json)
 ),
--- Recheck snapshots (blob format only — small files, no JSONL conversion needed)
+-- Recheck snapshots (new JSONL format — one venue per line)
+recheck_jsonl AS (
+    SELECT
+        date AS snapshot_date,
+        captured_at_utc,
+        'recheck' AS snapshot_type,
+        TRY_CAST(recheck_hour AS INTEGER) AS recheck_hour,
+        tenant_id,
+        slots AS slots_json
+    FROM read_json(
+        @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.jsonl.gz',
+        format = 'newline_delimited',
+        columns = {
+            date: 'VARCHAR',
+            captured_at_utc: 'VARCHAR',
+            recheck_hour: 'VARCHAR',
+            tenant_id: 'VARCHAR',
+            slots: 'JSON'
+        },
+        filename = true
+    )
+    WHERE tenant_id IS NOT NULL
+),
+-- Recheck snapshots (old blob format, kept for transition)
 recheck_blob AS (
     SELECT
         rf.date AS snapshot_date,
@@ -111,6 +135,8 @@ all_venues AS (
     UNION ALL
     SELECT * FROM morning_blob
     UNION ALL
+    SELECT * FROM recheck_jsonl
+    UNION ALL
     SELECT * FROM recheck_blob
 ),
 raw_resources AS (
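
Reviewer note: the `compress_jsonl_atomic(jsonl_path, dest_path)` helper this patch relies on lives in `utils.py` and is not part of the diff. A minimal sketch of what the changelog entry describes (1 MB streaming chunks, atomic `.tmp` rename, source deletion) might look like the following — the return value (compressed size, matching the `bytes_written` logging) and the tmp-file naming are assumptions, not the repo's actual implementation:

```python
# Hypothetical sketch of compress_jsonl_atomic — NOT the utils.py source.
# Streams a plaintext JSONL working file into a gzip destination in 1 MB
# chunks (no full-file buffering), renames atomically, deletes the source.
import gzip
import os
from pathlib import Path


def compress_jsonl_atomic(jsonl_path: Path, dest_path: Path) -> int:
    """Gzip jsonl_path into dest_path atomically; return compressed size in bytes."""
    tmp_path = dest_path.parent / (dest_path.name + ".tmp")
    with open(jsonl_path, "rb") as src, gzip.open(tmp_path, "wb") as dst:
        while chunk := src.read(1024 * 1024):  # 1 MB chunks
            dst.write(chunk)
    os.replace(tmp_path, dest_path)  # atomic rename on the same filesystem
    jsonl_path.unlink()              # working file no longer needed
    return dest_path.stat().st_size
```

The `.tmp`-then-`os.replace` dance means readers (e.g. DuckDB globbing the landing zone) never observe a half-written `.jsonl.gz`.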
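
Reviewer note: the metadata-injection loop in `extract_recheck` implies a simple read path — every line is a complete record, so consumers never hold one large blob in memory. A hedged round-trip illustration (the venue fields and helper names here are invented; the real extractor writes a plaintext working file and compresses it separately, whereas this sketch gzips directly for brevity):

```python
# Illustrative JSONL round-trip for the recheck landing format: one venue
# object per line with date / captured_at_utc / recheck_hour injected.
# Helper names and venue fields are examples, not taken from the repo.
import gzip
import json
from pathlib import Path


def write_recheck_jsonl(path: Path, venues: list[dict], target_date: str,
                        captured_at: str, recheck_hour: int) -> None:
    # Mirrors the per-venue injection loop in extract_recheck.
    with gzip.open(path, "wt") as f:
        for venue in venues:
            venue["date"] = target_date
            venue["captured_at_utc"] = captured_at
            venue["recheck_hour"] = recheck_hour
            f.write(json.dumps(venue, separators=(",", ":")) + "\n")


def read_recheck_jsonl(path: Path) -> list[dict]:
    # Streaming read: one record per line — the property the staging model's
    # newline_delimited read_json depends on.
    with gzip.open(path, "rt") as f:
        return [json.loads(line) for line in f if line.strip()]
```

This is also why the seed file can carry a single `{"tenant_id": null, ...}` line: it keeps the glob non-empty while the `WHERE tenant_id IS NOT NULL` filter drops the placeholder row.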