feat: standardise recheck availability to JSONL output

- extract_recheck() now writes availability_{date}_recheck_{HH}.jsonl.gz
  (one venue per line with date/captured_at_utc/recheck_hour injected);
  uses compress_jsonl_atomic; removes write_gzip_atomic import
- stg_playtomic_availability: add recheck_jsonl CTE (newline_delimited
  read_json on *.jsonl.gz recheck files); include in all_venues UNION ALL;
  old recheck_blob CTE kept for transition
- init_landing_seeds.py: add JSONL recheck seed alongside blob seed
- Docs: README landing structure + data sources table updated; CHANGELOG
  availability bullets updated; data-sources-inventory paths corrected

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@@ -9,7 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 ### Added
 - **JSONL streaming landing format** — extractors now write one JSON object per line (`.jsonl.gz`) instead of a single large blob, eliminating in-memory accumulation and `maximum_object_size` workarounds:
   - `playtomic_tenants.py` → `tenants.jsonl.gz` (one tenant per line; dedup still happens in memory before write)
-  - `playtomic_availability.py` → `availability_{date}.jsonl.gz` (one venue per line with `date`/`captured_at_utc` injected; working file IS the final file — eliminates the consolidation step)
+  - `playtomic_availability.py` → `availability_{date}.jsonl.gz` (morning) + `availability_{date}_recheck_{HH}.jsonl.gz` (recheck); one venue per line with `date`/`captured_at_utc`/`recheck_hour` injected
   - `geonames.py` → `cities_global.jsonl.gz` (one city per line; eliminates 30 MB blob and its `maximum_object_size` workaround)
   - `compress_jsonl_atomic(jsonl_path, dest_path)` utility added to `utils.py` — streams compression in 1 MB chunks, atomic `.tmp` rename, deletes source
 - **Regional Overpass splitting for tennis courts** — replaces single global query (150K+ elements, timed out) with 10 regional bbox queries (~10-40K elements each, 150s server / 180s client):
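The changelog entry above names `compress_jsonl_atomic` but the diff never shows its body, so here is a minimal sketch of the write-then-compress pattern as described (1 MB chunks, atomic `.tmp` rename, source deleted). The function body is an assumption inferred from the bullet, not the repository's actual implementation:

```python
import gzip
import json
import os
import tempfile
from pathlib import Path

def compress_jsonl_atomic_sketch(jsonl_path: Path, dest_path: Path) -> int:
    """Sketch: stream-compress a working JSONL file in 1 MB chunks,
    write to a .tmp sibling, rename atomically, delete the source."""
    tmp_path = dest_path.with_suffix(dest_path.suffix + ".tmp")
    with open(jsonl_path, "rb") as src, gzip.open(tmp_path, "wb") as dst:
        while chunk := src.read(1024 * 1024):  # never holds the whole file in memory
            dst.write(chunk)
    os.replace(tmp_path, dest_path)  # readers never observe a half-written .gz
    jsonl_path.unlink()              # working file is gone once the .gz exists
    return dest_path.stat().st_size

tmp = Path(tempfile.mkdtemp())
work, dest = tmp / "tenants.working.jsonl", tmp / "tenants.jsonl.gz"
with open(work, "w") as f:
    for tenant in ({"tenant_id": "t1"}, {"tenant_id": "t2"}):
        f.write(json.dumps(tenant, separators=(",", ":")) + "\n")  # one object per line

n_bytes = compress_jsonl_atomic_sketch(work, dest)
with gzip.open(dest, "rt") as f:
    lines = f.read().splitlines()
```

Because the rename is the last visible step, a crash mid-compression leaves only a `.tmp` file behind, never a truncated `.jsonl.gz` that staging could pick up.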
@@ -22,7 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 ### Changed
 - All modified staging SQL models use **UNION ALL transition CTEs** — both JSONL (new) and blob (old) formats are readable simultaneously; old `.json.gz` files in the landing zone continue working until they rotate out naturally:
   - `stg_playtomic_venues`, `stg_playtomic_resources`, `stg_playtomic_opening_hours` — JSONL top-level columns (no `UNNEST(tenants)`)
-  - `stg_playtomic_availability` — JSONL morning files + blob morning files + blob recheck files
+  - `stg_playtomic_availability` — JSONL morning + recheck files; blob morning + recheck kept for transition
   - `stg_population_geonames` — JSONL city rows (no `UNNEST(rows)`, no `maximum_object_size`)
   - `stg_tennis_courts` — JSONL elements with `COALESCE(lat, center.lat)` for way/relation centre coords; blob UNNEST kept for old files
 - **Marketplace admin dashboard** (`/admin/marketplace`) — single-screen health view for the two-sided market:
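The transition idea behind the UNION ALL CTEs — read both the old blob and the new JSONL landing files and normalise them to one row shape — can be sketched outside SQL. This pure-Python equivalent (file names and fields are illustrative, not the models' real columns) mirrors what the staging models do:

```python
import gzip
import json
import tempfile
from pathlib import Path

tmp = Path(tempfile.mkdtemp())

# Old blob format: one gzipped JSON document holding a "venues" array
with gzip.open(tmp / "availability_2026-02-01.json.gz", "wt") as f:
    json.dump({"date": "2026-02-01",
               "venues": [{"tenant_id": "a"}, {"tenant_id": "b"}]}, f)

# New JSONL format: one venue object per line, metadata already injected
with gzip.open(tmp / "availability_2026-02-02.jsonl.gz", "wt") as f:
    f.write(json.dumps({"tenant_id": "c", "date": "2026-02-02"}) + "\n")

def venues_from_blob(path):
    with gzip.open(path, "rt") as f:
        doc = json.load(f)
    for v in doc["venues"]:                 # ~ LATERAL UNNEST(venues)
        yield {"tenant_id": v["tenant_id"], "date": doc["date"]}

def venues_from_jsonl(path):
    with gzip.open(path, "rt") as f:
        for line in f:                      # ~ read_json(format = 'newline_delimited')
            yield json.loads(line)

# ~ UNION ALL: both formats land in one uniform row shape
rows = [*venues_from_blob(tmp / "availability_2026-02-01.json.gz"),
        *venues_from_jsonl(tmp / "availability_2026-02-02.jsonl.gz")]
```

Once every old `.json.gz` has rotated out of the landing zone, the blob branch can simply be dropped without touching the downstream column contract.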
@@ -118,7 +118,7 @@ Playtomic covers 16,000+ courts globally. The platform is dominant in Spain, UK,
 
 **Pipeline implementation (tenants):** ✅ Ingested
 - Extractor: `extract-playtomic-tenants` — paginated global scrape of `GET /v1/tenants?sport_ids=PADEL`, page size 100, up to 500 pages
-- Landing: `data/landing/playtomic/{year}/{month}/tenants.json.gz` (~14K venues as of Feb 2026)
+- Landing: `data/landing/playtomic/{year}/{month}/tenants.jsonl.gz` (~14K venues as of Feb 2026)
- Throttle: 2 s between pages; deduplicates on `tenant_id`
 - Staging models (all grain `tenant_id` or `(tenant_id, resource_id)`):
   - `stg_playtomic_venues` — venue metadata: name, address, city, country, coordinates, booking type, status
@@ -127,9 +127,10 @@ Playtomic covers 16,000+ courts globally. The platform is dominant in Spain, UK,
 
 **Pipeline implementation (availability):** ✅ Ingested
 - Extractor: `extract-playtomic-availability` — reads tenant IDs from latest tenants file, queries `GET /v1/availability` for next-day slots per venue
-- Landing: `data/landing/playtomic/{year}/{month}/{date}/availability_morning.json.gz` + `availability_recheck.json.gz`
-- Recheck mode: re-queries slots starting within 90 min (controlled by `RECHECK_WINDOW_MINUTES`); captures near-real-time fill rates
-- Parallelism: `EXTRACT_WORKERS` env var; `PROXY_URLS` for distributed rate limiting; throttle 1 s per venue per worker
+- Landing: `data/landing/playtomic/{year}/{month}/availability_{date}.jsonl.gz` (morning) + `availability_{date}_recheck_{HH}.jsonl.gz` (recheck)
+- Old blob format (`.json.gz`) retained in landing zone alongside JSONL; staging reads both
+- Recheck mode: re-queries slots starting within `RECHECK_WINDOW_MINUTES` (default 30); captures near-real-time fill rates
+- Parallelism: worker count derived from `PROXY_URLS` length; throttle 1 s per venue per worker
 - Staging: `stg_playtomic_availability`, grain `(snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc)`
 
 ---
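The recheck selection rule in the bullets above — a slot qualifies when it starts within the next `RECHECK_WINDOW_MINUTES` — is simple enough to pin down in a few lines. The helper name and the inclusive bounds are assumptions for illustration, not the extractor's actual code:

```python
from datetime import datetime, timedelta, timezone

RECHECK_WINDOW_MINUTES = 30  # default cited in the docs above

def needs_recheck(slot_start: datetime, now: datetime,
                  window_minutes: int = RECHECK_WINDOW_MINUTES) -> bool:
    """True when the slot starts within the next recheck window.
    Slots already started (negative offset) are excluded."""
    return now <= slot_start <= now + timedelta(minutes=window_minutes)

now = datetime(2026, 2, 1, 17, 0, tzinfo=timezone.utc)
offsets = (10, 25, 45, -5)  # minutes from now, one slot per offset
due = [m for m in offsets
       if needs_recheck(now + timedelta(minutes=m), now)]
```

Keeping the window small is what makes the recheck snapshot a near-real-time fill-rate measurement rather than a second morning snapshot.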
@@ -37,7 +37,7 @@ src/padelnomics_extract/
 
 ```python
 from ._shared import run_extractor, setup_logging
-from .utils import landing_path, write_gzip_atomic
+from .utils import compress_jsonl_atomic, landing_path
 
 logger = setup_logging("padelnomics.extract.my_source")
 EXTRACTOR_NAME = "my_source"
@@ -108,18 +108,23 @@ sqlite3 data/landing/.state.sqlite \
 ```
 data/landing/
 ├── .state.sqlite
-├── overpass/{year}/{month}/courts.json.gz
+├── overpass/{year}/{month}/courts.{jsonl,json}.gz
+├── overpass_tennis/{year}/{month}/courts.{jsonl,json}.gz
 ├── eurostat/{year}/{month}/urb_cpop1.json.gz
 ├── eurostat/{year}/{month}/ilc_di03.json.gz
-├── playtomic/{year}/{month}/tenants.json.gz
-└── playtomic/{year}/{month}/availability_{date}.json.gz
+├── geonames/{year}/{month}/cities_global.{jsonl,json}.gz
+├── playtomic/{year}/{month}/tenants.{jsonl,json}.gz
+├── playtomic/{year}/{month}/availability_{date}.{jsonl,json}.gz
+└── playtomic/{year}/{month}/availability_{date}_recheck_{HH}.{jsonl,json}.gz
 ```
 
 ## Data sources
 
 | Source | Module | Schedule | Notes |
 |--------|--------|----------|-------|
-| Overpass API | `overpass.py` | Daily | OSM padel courts, ~5K nodes |
+| Overpass API (padel) | `overpass.py` | Daily | OSM padel courts, ~5K nodes; JSONL output |
+| Overpass API (tennis) | `overpass_tennis.py` | Daily | OSM tennis courts, ~150K+ nodes; regional splits; JSONL output |
 | Eurostat | `eurostat.py` | Daily (304 most runs) | urb_cpop1, ilc_di03 — etag dedup |
-| Playtomic tenants | `playtomic_tenants.py` | Daily | ~8K venues, bounded pagination |
-| Playtomic availability | `playtomic_availability.py` | Daily | Next-day slots, ~4.5h runtime |
+| GeoNames | `geonames.py` | Daily | ~140K locations (pop ≥1K); JSONL output |
+| Playtomic tenants | `playtomic_tenants.py` | Daily | ~14K venues, bounded pagination; JSONL output |
+| Playtomic availability | `playtomic_availability.py` | Daily + recheck | Morning: next-day slots; recheck: near-real-time fill; JSONL output |
@@ -17,7 +17,7 @@ Recheck mode: re-queries venues with slots starting within the next 90 minutes.
 Writes a separate recheck file for more accurate occupancy measurement.
 
 Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.jsonl.gz
-Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.json.gz
+Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.jsonl.gz
 """
 
 import gzip
@@ -39,7 +39,6 @@ from .utils import (
     flush_partial_batch,
     landing_path,
     load_partial_results,
-    write_gzip_atomic,
 )
 
 logger = setup_logging("padelnomics.extract.playtomic_availability")
@@ -510,25 +509,22 @@ def extract_recheck(
             logger.error("Circuit open with no fallback — writing partial recheck results")
             break
 
-    # Write recheck file
+    # Write recheck file as JSONL — one venue per line with metadata injected
     recheck_hour = now.hour
     year, month = year_month.split("/")
     dest_dir = landing_path(landing_dir, "playtomic", year, month)
-    dest = dest_dir / f"availability_{target_date}_recheck_{recheck_hour:02d}.json.gz"
+    dest = dest_dir / f"availability_{target_date}_recheck_{recheck_hour:02d}.jsonl.gz"
 
     captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
-    payload = json.dumps({
-        "date": target_date,
-        "captured_at_utc": captured_at,
-        "recheck_hour": recheck_hour,
-        "recheck_window_minutes": RECHECK_WINDOW_MINUTES,
-        "rechecked_tenant_ids": venues_to_recheck,
-        "venue_count": len(venues_data),
-        "venues_errored": venues_errored,
-        "venues": venues_data,
-    }).encode()
-
-    bytes_written = write_gzip_atomic(dest, payload)
+    working_path = dest.with_suffix("").with_suffix(".working.jsonl")
+    with open(working_path, "w") as f:
+        for venue in venues_data:
+            venue["date"] = target_date
+            venue["captured_at_utc"] = captured_at
+            venue["recheck_hour"] = recheck_hour
+            f.write(json.dumps(venue, separators=(",", ":")) + "\n")
+    bytes_written = compress_jsonl_atomic(working_path, dest)
 
     logger.info(
         "Recheck: %d/%d venues (%d errors) -> %s (%s bytes)",
         len(venues_data), len(venues_to_recheck), venues_errored, dest, f"{bytes_written:,}",
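Two details in the hunk above are easy to misread: the chained `with_suffix` calls that derive the working path from the `.jsonl.gz` destination, and the compact `separators=(",", ":")` encoding that keeps each line free of padding. A standalone check with hypothetical values:

```python
import json
from pathlib import Path

dest = Path("availability_2026-02-01_recheck_17.jsonl.gz")
# First with_suffix("") strips ".gz"; the second swaps ".jsonl"
# for ".working.jsonl", yielding the uncompressed scratch file.
working_path = dest.with_suffix("").with_suffix(".working.jsonl")

venue = {"tenant_id": "t1", "slots": []}
venue["date"] = "2026-02-01"                     # metadata injected per line
venue["captured_at_utc"] = "2026-02-01T17:05:00Z"
venue["recheck_hour"] = 17
line = json.dumps(venue, separators=(",", ":"))  # no space after "," or ":"
```

The compact separators matter at this scale: the default `json.dumps` padding adds a byte per key and value across millions of lines, for no benefit once the file is gzipped and read by machine.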
@@ -51,7 +51,11 @@ def main() -> None:
         json.dumps({"date": "1970-01-01", "captured_at_utc": "1970-01-01T00:00:00Z",
                     "venue_count": 0, "venues": []}).encode(),
 
-        # --- Playtomic recheck (blob only, small format) ---
+        # --- Playtomic recheck ---
+        # JSONL: one null venue (filtered by WHERE tenant_id IS NOT NULL)
+        "playtomic/1970/01/availability_1970-01-01_recheck_00.jsonl.gz":
+            b'{"tenant_id":null,"date":"1970-01-01","captured_at_utc":"1970-01-01T00:00:00Z","recheck_hour":0,"slots":null}\n',
+        # Blob: empty venues array (old format, kept for transition)
         "playtomic/1970/01/availability_1970-01-01_recheck_00.json.gz":
         json.dumps({"date": "1970-01-01", "captured_at_utc": "1970-01-01T00:00:00Z",
                     "recheck_hour": 0, "venues": []}).encode(),
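The JSONL seed is a single null venue rather than an empty file: the row parses as valid JSONL so the staging glob always matches something, and the `WHERE tenant_id IS NOT NULL` filter then drops it, as the diff comment says. A quick round-trip check of that exact seed line:

```python
import json

seed = (b'{"tenant_id":null,"date":"1970-01-01",'
        b'"captured_at_utc":"1970-01-01T00:00:00Z",'
        b'"recheck_hour":0,"slots":null}\n')
row = json.loads(seed)

# Staging drops this row via WHERE tenant_id IS NOT NULL, so the seed
# satisfies the file pattern without contributing any availability data.
kept = [r for r in [row] if r["tenant_id"] is not None]
```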
@@ -5,7 +5,8 @@
 -- Reads BOTH morning snapshots and recheck files:
 -- Morning (new): availability_{date}.jsonl.gz → snapshot_type = 'morning'
 -- Morning (old): availability_{date}.json.gz → snapshot_type = 'morning'
--- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
+-- Recheck (new): availability_{date}_recheck_{HH}.jsonl.gz → snapshot_type = 'recheck'
+-- Recheck (old): availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
 --
 -- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit).
 -- Price parsed from strings like "14.56 EUR" or "48 GBP".
@@ -77,7 +78,30 @@ morning_blob AS (
     ) af,
     LATERAL UNNEST(af.venues) AS t(venue_json)
 ),
--- Recheck snapshots (blob format only — small files, no JSONL conversion needed)
+-- Recheck snapshots (new JSONL format — one venue per line)
+recheck_jsonl AS (
+    SELECT
+        date AS snapshot_date,
+        captured_at_utc,
+        'recheck' AS snapshot_type,
+        TRY_CAST(recheck_hour AS INTEGER) AS recheck_hour,
+        tenant_id,
+        slots AS slots_json
+    FROM read_json(
+        @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.jsonl.gz',
+        format = 'newline_delimited',
+        columns = {
+            date: 'VARCHAR',
+            captured_at_utc: 'VARCHAR',
+            recheck_hour: 'VARCHAR',
+            tenant_id: 'VARCHAR',
+            slots: 'JSON'
+        },
+        filename = true
+    )
+    WHERE tenant_id IS NOT NULL
+),
+-- Recheck snapshots (old blob format, kept for transition)
 recheck_blob AS (
     SELECT
         rf.date AS snapshot_date,
@@ -111,6 +135,8 @@ all_venues AS (
     UNION ALL
     SELECT * FROM morning_blob
     UNION ALL
+    SELECT * FROM recheck_jsonl
+    UNION ALL
     SELECT * FROM recheck_blob
 ),
 raw_resources AS (
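In the `recheck_jsonl` CTE above, `recheck_hour` is declared `VARCHAR` and converted with `TRY_CAST`, which yields NULL on malformed input instead of failing the whole model run. In Python terms (a semantic sketch of the DuckDB behaviour, not its implementation):

```python
def try_cast_int(value):
    """DuckDB-style TRY_CAST(x AS INTEGER): None on failure, never an error.
    recheck_hour arrives as a string from read_json, so the cast must be safe
    against nulls and junk values in old or hand-edited landing files."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None
```

This is the defensive counterpart of the null-venue seed: a bad line degrades to a NULL column rather than aborting the staging build.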