diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py index e086855..1fab6e2 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py @@ -16,7 +16,7 @@ records (a few seconds of work with 10 workers) are lost on crash. Recheck mode: re-queries venues with slots starting within the next 90 minutes. Writes a separate recheck file for more accurate occupancy measurement. -Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.json.gz +Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.jsonl.gz Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.json.gz """ @@ -34,7 +34,13 @@ import niquests from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler -from .utils import flush_partial_batch, landing_path, load_partial_results, write_gzip_atomic +from .utils import ( + compress_jsonl_atomic, + flush_partial_batch, + landing_path, + load_partial_results, + write_gzip_atomic, +) logger = setup_logging("padelnomics.extract.playtomic_availability") @@ -273,14 +279,14 @@ def extract( year, month = year_month.split("/") dest_dir = landing_path(landing_dir, "playtomic", year, month) - dest = dest_dir / f"availability_{target_date}.json.gz" - - if dest.exists(): - logger.info("Already have %s — skipping", dest) + dest = dest_dir / f"availability_{target_date}.jsonl.gz" + old_blob = dest_dir / f"availability_{target_date}.json.gz" + if dest.exists() or old_blob.exists(): + logger.info("Already have availability for %s — skipping", target_date) return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} - # Crash resumption: load already-fetched venues from partial file - partial_path = dest.with_suffix(".partial.jsonl") + # Crash resumption: load already-fetched venues from working file + partial_path = dest_dir / f"availability_{target_date}.working.jsonl" prior_results, already_done = load_partial_results(partial_path, id_key="tenant_id") if already_done: logger.info("Resuming: %d venues already fetched from partial file", len(already_done)) @@ -297,7 +303,10 @@ def extract( start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S") start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S") - # Partial file for incremental crash-safe progress + # Timestamp stamped into every JSONL line — computed once before the fetch loop. + captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") + + # Working file for incremental crash-safe progress (IS the final file). partial_file = open(partial_path, "a") # noqa: SIM115 partial_lock = threading.Lock() pending_batch: list[dict] = [] @@ -305,6 +314,9 @@ def extract( def _on_result(result: dict) -> None: # Called inside _fetch_venues_parallel's lock — no additional locking needed. # In serial mode, called single-threaded — also safe without extra locking. + # Inject date + captured_at so every JSONL line is self-contained. + result["date"] = target_date + result["captured_at_utc"] = captured_at pending_batch.append(result) if len(pending_batch) >= PARTIAL_FLUSH_SIZE: flush_partial_batch(partial_file, partial_lock, pending_batch) @@ -348,24 +360,13 @@ def extract( pending_batch.clear() partial_file.close() - # Consolidate prior (resumed) + new results into final file - venues_data = prior_results + new_venues_data - captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") - payload = json.dumps({ - "date": target_date, - "captured_at_utc": captured_at, - "venue_count": len(venues_data), - "venues_errored": venues_errored, - "venues": venues_data, - }).encode() - - bytes_written = write_gzip_atomic(dest, payload) - if partial_path.exists(): - partial_path.unlink() + # Working file IS the output — compress atomically (deletes source). + total_venues = len(prior_results) + len(new_venues_data) + bytes_written = compress_jsonl_atomic(partial_path, dest) logger.info( "%d venues scraped (%d errors) -> %s (%s bytes)", - len(venues_data), venues_errored, dest, f"{bytes_written:,}", + total_venues, venues_errored, dest, f"{bytes_written:,}", ) return { @@ -380,14 +381,36 @@ def extract( # Recheck mode — re-query venues with upcoming slots for accurate occupancy # --------------------------------------------------------------------------- +def _read_availability_jsonl(path: Path) -> dict: + """Read a JSONL availability file into the blob dict format recheck expects.""" + venues = [] + date_val = captured_at = None + with gzip.open(path, "rt") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + break # truncated last line on crash + if date_val is None: + date_val = record.get("date") + captured_at = record.get("captured_at_utc") + venues.append(record) + return {"date": date_val, "captured_at_utc": captured_at, "venues": venues} + + def _load_morning_availability(landing_dir: Path, target_date: str) -> dict | None: - """Load today's morning availability file. Returns parsed JSON or None.""" + """Load today's morning availability file (JSONL or blob). Returns dict or None.""" playtomic_dir = landing_dir / "playtomic" - # Search across year/month dirs for the target date + # Try JSONL first (new format), fall back to blob (old format) + matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.jsonl.gz")) + if matches: + return _read_availability_jsonl(matches[0]) matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.json.gz")) if not matches: return None - with gzip.open(matches[0], "rb") as f: return json.loads(f.read()) diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql index bf0b3f2..9092796 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql @@ -3,12 +3,17 @@ -- "Available" = the slot was NOT booked at capture time. Missing slots = booked. -- -- Reads BOTH morning snapshots and recheck files: --- Morning: availability_{date}.json.gz → snapshot_type = 'morning' --- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck' +-- Morning (new): availability_{date}.jsonl.gz → snapshot_type = 'morning' +-- Morning (old): availability_{date}.json.gz → snapshot_type = 'morning' +-- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck' -- -- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit). -- Price parsed from strings like "14.56 EUR" or "48 GBP". -- +-- Supports two morning landing formats (UNION ALL during migration): +-- New: availability_{date}.jsonl.gz — one venue per line, columns: tenant_id, slots, date, captured_at_utc +-- Old: availability_{date}.json.gz — {"date":..., "venues": [...]} blob (UNNEST required) +-- -- Requires: at least one availability file in the landing zone. -- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz) -- with empty venues[] ensures this model runs before real data arrives. @@ -20,77 +25,105 @@ MODEL ( grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc) ); --- Morning snapshots (filename does NOT contain '_recheck_') -WITH morning_files AS ( +WITH +-- New format: one venue per JSONL line — no outer UNNEST needed +morning_jsonl AS ( SELECT - *, - 'morning' AS snapshot_type, - NULL::INTEGER AS recheck_hour + date AS snapshot_date, + captured_at_utc, + 'morning' AS snapshot_type, + NULL::INTEGER AS recheck_hour, + tenant_id, + slots AS slots_json FROM read_json( - @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz', - format = 'auto', + @LANDING_DIR || '/playtomic/*/*/availability_*.jsonl.gz', + format = 'newline_delimited', columns = { date: 'VARCHAR', captured_at_utc: 'VARCHAR', - venues: 'JSON[]' + tenant_id: 'VARCHAR', + slots: 'JSON' }, - filename = true, - maximum_object_size = 134217728 -- 128 MB; daily files grow with venue count + filename = true ) WHERE filename NOT LIKE '%_recheck_%' - AND venues IS NOT NULL - AND json_array_length(venues) > 0 + AND tenant_id IS NOT NULL ), --- Recheck snapshots (filename contains '_recheck_') --- Use TRY_CAST on a regex-extracted hour to get the recheck_hour. --- If no recheck files exist yet, this CTE produces zero rows (safe). -recheck_files AS ( +-- Old format: {"date":..., "venues": [...]} blob — kept for transition +morning_blob AS ( SELECT - *, - 'recheck' AS snapshot_type, - TRY_CAST( - regexp_extract(filename, '_recheck_(\d+)', 1) AS INTEGER - ) AS recheck_hour - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz', - format = 'auto', - columns = { - date: 'VARCHAR', - captured_at_utc: 'VARCHAR', - venues: 'JSON[]' - }, - filename = true, - maximum_object_size = 134217728 -- 128 MB; matches morning snapshot limit - ) - WHERE venues IS NOT NULL - AND json_array_length(venues) > 0 -), -all_files AS ( - SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM morning_files - UNION ALL - SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM recheck_files -), -raw_venues AS ( - SELECT - af.date AS snapshot_date, + af.date AS snapshot_date, af.captured_at_utc, - af.snapshot_type, - af.recheck_hour, - venue_json - FROM all_files af, + 'morning' AS snapshot_type, + NULL::INTEGER AS recheck_hour, + venue_json ->> 'tenant_id' AS tenant_id, + venue_json -> 'slots' AS slots_json + FROM ( + SELECT date, captured_at_utc, venues + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz', + format = 'auto', + columns = { + date: 'VARCHAR', + captured_at_utc: 'VARCHAR', + venues: 'JSON[]' + }, + filename = true, + maximum_object_size = 134217728 -- 128 MB; daily files grow with venue count + ) + WHERE filename NOT LIKE '%_recheck_%' + AND venues IS NOT NULL + AND json_array_length(venues) > 0 + ) af, LATERAL UNNEST(af.venues) AS t(venue_json) ), +-- Recheck snapshots (blob format only — small files, no JSONL conversion needed) +recheck_blob AS ( + SELECT + rf.date AS snapshot_date, + rf.captured_at_utc, + 'recheck' AS snapshot_type, + TRY_CAST( + regexp_extract(rf.filename, '_recheck_(\d+)', 1) AS INTEGER + ) AS recheck_hour, + venue_json ->> 'tenant_id' AS tenant_id, + venue_json -> 'slots' AS slots_json + FROM ( + SELECT date, captured_at_utc, venues, filename + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz', + format = 'auto', + columns = { + date: 'VARCHAR', + captured_at_utc: 'VARCHAR', + venues: 'JSON[]' + }, + filename = true, + maximum_object_size = 134217728 -- 128 MB; matches morning snapshot limit + ) + WHERE venues IS NOT NULL + AND json_array_length(venues) > 0 + ) rf, + LATERAL UNNEST(rf.venues) AS t(venue_json) +), +all_venues AS ( + SELECT * FROM morning_jsonl + UNION ALL + SELECT * FROM morning_blob + UNION ALL + SELECT * FROM recheck_blob +), raw_resources AS ( SELECT - rv.snapshot_date, - rv.captured_at_utc, - rv.snapshot_type, - rv.recheck_hour, - rv.venue_json ->> 'tenant_id' AS tenant_id, + av.snapshot_date, + av.captured_at_utc, + av.snapshot_type, + av.recheck_hour, + av.tenant_id, resource_json - FROM raw_venues rv, + FROM all_venues av, LATERAL UNNEST( - from_json(rv.venue_json -> 'slots', '["JSON"]') + from_json(av.slots_json, '["JSON"]') ) AS t(resource_json) ), raw_slots AS (