feat(extract): convert playtomic_availability to JSONL output

- availability_{date}.jsonl.gz replaces .json.gz for morning snapshots
- Each JSONL line = one venue object with date + captured_at_utc injected
- Eliminates in-memory consolidation: working.jsonl IS the final file
  (compress_jsonl_atomic at end instead of write_gzip_atomic blob)
- Crash recovery unchanged: working.jsonl accumulates via flush_partial_batch
- _load_morning_availability tries .jsonl.gz first, falls back to .json.gz
- Skip check covers both formats during transition
- Recheck files stay blob format (small, infrequent)

stg_playtomic_availability: UNION ALL transition (morning_jsonl + morning_blob + recheck_blob)
- morning_jsonl: read_json JSONL, tenant_id direct column, no outer UNNEST
- morning_blob / recheck_blob: subquery + LATERAL UNNEST (unchanged semantics)
- All three produce (snapshot_date, captured_at_utc, snapshot_type, recheck_hour, tenant_id, slots_json)
- Downstream raw_resources / raw_slots CTEs unchanged

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
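For orientation before the diff: each line of the new availability_{date}.jsonl.gz is one self-contained venue object. A minimal sketch of the record shape with illustrative values; only tenant_id, slots, date, and captured_at_utc are confirmed by the diff, the inner slot fields are assumed:

```python
import json

# One JSONL line per venue; date and captured_at_utc are injected by the extractor.
line = json.dumps({
    "tenant_id": "abc-123",                     # venue identifier (confirmed column)
    "date": "2024-05-01",                       # injected: snapshot date
    "captured_at_utc": "2024-05-01T06:00:00Z",  # injected: capture timestamp
    "slots": [{"start_time": "07:00:00"}],      # raw slots payload; inner schema assumed
})
print(line)
```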
@@ -16,7 +16,7 @@ records (a few seconds of work with 10 workers) are lost on crash.
 Recheck mode: re-queries venues with slots starting within the next 90 minutes.
 Writes a separate recheck file for more accurate occupancy measurement.
 
-Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.json.gz
+Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.jsonl.gz
 Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.json.gz
 """
 
@@ -34,7 +34,13 @@ import niquests
 
 from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
 from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
-from .utils import flush_partial_batch, landing_path, load_partial_results, write_gzip_atomic
+from .utils import (
+    compress_jsonl_atomic,
+    flush_partial_batch,
+    landing_path,
+    load_partial_results,
+    write_gzip_atomic,
+)
 
 logger = setup_logging("padelnomics.extract.playtomic_availability")
 
@@ -273,14 +279,14 @@ def extract(
 
     year, month = year_month.split("/")
     dest_dir = landing_path(landing_dir, "playtomic", year, month)
-    dest = dest_dir / f"availability_{target_date}.json.gz"
-    if dest.exists():
-        logger.info("Already have %s — skipping", dest)
+    dest = dest_dir / f"availability_{target_date}.jsonl.gz"
+    old_blob = dest_dir / f"availability_{target_date}.json.gz"
+    if dest.exists() or old_blob.exists():
+        logger.info("Already have availability for %s — skipping", target_date)
         return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
 
-    # Crash resumption: load already-fetched venues from partial file
-    partial_path = dest.with_suffix(".partial.jsonl")
+    # Crash resumption: load already-fetched venues from working file
+    partial_path = dest_dir / f"availability_{target_date}.working.jsonl"
     prior_results, already_done = load_partial_results(partial_path, id_key="tenant_id")
     if already_done:
         logger.info("Resuming: %d venues already fetched from partial file", len(already_done))
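The resume path relies on load_partial_results from extract/utils.py, which the diff never shows. A plausible sketch of its contract, assuming it tolerates a truncated final line the same way _read_availability_jsonl does later in this commit:

```python
import json
from pathlib import Path


def load_partial_results(path: Path, id_key: str) -> tuple[list[dict], set[str]]:
    """Sketch: return (records, ids) from a working JSONL file, if it exists."""
    records: list[dict] = []
    ids: set[str] = set()
    if not path.exists():
        return records, ids
    with open(path) as f:
        for line in f:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                break  # crash mid-write: drop the truncated last line
            records.append(record)
            ids.add(record[id_key])
    return records, ids
```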
@@ -297,7 +303,10 @@ def extract(
     start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
     start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
 
-    # Partial file for incremental crash-safe progress
+    # Timestamp stamped into every JSONL line — computed once before the fetch loop.
+    captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
+
+    # Working file for incremental crash-safe progress (IS the final file).
     partial_file = open(partial_path, "a")  # noqa: SIM115
     partial_lock = threading.Lock()
     pending_batch: list[dict] = []
@@ -305,6 +314,9 @@ def extract(
     def _on_result(result: dict) -> None:
         # Called inside _fetch_venues_parallel's lock — no additional locking needed.
         # In serial mode, called single-threaded — also safe without extra locking.
+        # Inject date + captured_at so every JSONL line is self-contained.
+        result["date"] = target_date
+        result["captured_at_utc"] = captured_at
         pending_batch.append(result)
         if len(pending_batch) >= PARTIAL_FLUSH_SIZE:
             flush_partial_batch(partial_file, partial_lock, pending_batch)
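flush_partial_batch is likewise a utils helper the diff does not show. A sketch under the assumption that it serializes the batch under the lock and fsyncs, so a crash loses at most one unflushed batch (consistent with the "lost on crash" note in the docstring hunk above):

```python
import json
import os
import threading
from typing import IO


def flush_partial_batch(f: IO[str], lock: threading.Lock, batch: list[dict]) -> None:
    """Sketch: append each record as one JSONL line, then force it to disk."""
    with lock:
        for record in batch:
            f.write(json.dumps(record) + "\n")
        f.flush()
        os.fsync(f.fileno())  # bound the loss window to one unflushed batch
```

The caller clears pending_batch itself (visible at the top of the next hunk), so the helper leaves the list untouched.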
@@ -348,24 +360,13 @@ def extract(
         pending_batch.clear()
     partial_file.close()
 
-    # Consolidate prior (resumed) + new results into final file
-    venues_data = prior_results + new_venues_data
-    captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
-    payload = json.dumps({
-        "date": target_date,
-        "captured_at_utc": captured_at,
-        "venue_count": len(venues_data),
-        "venues_errored": venues_errored,
-        "venues": venues_data,
-    }).encode()
-
-    bytes_written = write_gzip_atomic(dest, payload)
-    if partial_path.exists():
-        partial_path.unlink()
+    # Working file IS the output — compress atomically (deletes source).
+    total_venues = len(prior_results) + len(new_venues_data)
+    bytes_written = compress_jsonl_atomic(partial_path, dest)
 
     logger.info(
         "%d venues scraped (%d errors) -> %s (%s bytes)",
-        len(venues_data), venues_errored, dest, f"{bytes_written:,}",
+        total_venues, venues_errored, dest, f"{bytes_written:,}",
     )
 
     return {
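compress_jsonl_atomic is the new utils helper this hunk calls but never defines. A minimal sketch consistent with the comment above (compress atomically, delete the source):

```python
import gzip
import os
import shutil
from pathlib import Path


def compress_jsonl_atomic(src: Path, dest: Path) -> int:
    """Sketch: gzip src into dest via temp file + rename; remove src; return gzipped size."""
    tmp = dest.parent / (dest.name + ".tmp")
    with open(src, "rb") as fin, gzip.open(tmp, "wb") as fout:
        shutil.copyfileobj(fin, fout)  # stream: no in-memory consolidation step
    os.replace(tmp, dest)              # atomic rename: readers never see a partial .gz
    src.unlink()                       # the working file IS the data; now safe to drop
    return dest.stat().st_size
```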
@@ -380,14 +381,36 @@ def extract(
 # Recheck mode — re-query venues with upcoming slots for accurate occupancy
 # ---------------------------------------------------------------------------
 
+def _read_availability_jsonl(path: Path) -> dict:
+    """Read a JSONL availability file into the blob dict format recheck expects."""
+    venues = []
+    date_val = captured_at = None
+    with gzip.open(path, "rt") as f:
+        for line in f:
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                record = json.loads(line)
+            except json.JSONDecodeError:
+                break  # truncated last line on crash
+            if date_val is None:
+                date_val = record.get("date")
+                captured_at = record.get("captured_at_utc")
+            venues.append(record)
+    return {"date": date_val, "captured_at_utc": captured_at, "venues": venues}
+
+
 def _load_morning_availability(landing_dir: Path, target_date: str) -> dict | None:
-    """Load today's morning availability file. Returns parsed JSON or None."""
+    """Load today's morning availability file (JSONL or blob). Returns dict or None."""
     playtomic_dir = landing_dir / "playtomic"
-    # Search across year/month dirs for the target date
+    # Try JSONL first (new format), fall back to blob (old format)
+    matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.jsonl.gz"))
+    if matches:
+        return _read_availability_jsonl(matches[0])
     matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.json.gz"))
     if not matches:
         return None
 
     with gzip.open(matches[0], "rb") as f:
         return json.loads(f.read())
 
@@ -3,12 +3,17 @@
 -- "Available" = the slot was NOT booked at capture time. Missing slots = booked.
 --
 -- Reads BOTH morning snapshots and recheck files:
--- Morning: availability_{date}.json.gz → snapshot_type = 'morning'
--- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
+-- Morning (new): availability_{date}.jsonl.gz → snapshot_type = 'morning'
+-- Morning (old): availability_{date}.json.gz → snapshot_type = 'morning'
+-- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
 --
 -- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit).
 -- Price parsed from strings like "14.56 EUR" or "48 GBP".
 --
+-- Supports two morning landing formats (UNION ALL during migration):
+-- New: availability_{date}.jsonl.gz — one venue per line, columns: tenant_id, slots, date, captured_at_utc
+-- Old: availability_{date}.json.gz — {"date":..., "venues": [...]} blob (UNNEST required)
+--
 -- Requires: at least one availability file in the landing zone.
 -- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz)
 -- with empty venues[] ensures this model runs before real data arrives.
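The price comment above refers to parsing further down the model, outside this hunk. A rough Python equivalent of the stated rule for strings like "14.56 EUR" or "48 GBP" (the actual SQL is not shown, so treat this as an approximation):

```python
import re


def parse_price(s: str) -> tuple[float | None, str | None]:
    """Split '14.56 EUR' into (14.56, 'EUR'); return (None, None) if unparseable."""
    m = re.fullmatch(r"\s*([\d.]+)\s+([A-Z]{3})\s*", s)
    return (float(m.group(1)), m.group(2)) if m else (None, None)


assert parse_price("14.56 EUR") == (14.56, "EUR")
assert parse_price("48 GBP") == (48.0, "GBP")
```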
@@ -20,77 +25,105 @@ MODEL (
   grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc)
 );
 
--- Morning snapshots (filename does NOT contain '_recheck_')
-WITH morning_files AS (
+WITH
+-- New format: one venue per JSONL line — no outer UNNEST needed
+morning_jsonl AS (
     SELECT
-        *,
-        'morning' AS snapshot_type,
-        NULL::INTEGER AS recheck_hour
+        date AS snapshot_date,
+        captured_at_utc,
+        'morning' AS snapshot_type,
+        NULL::INTEGER AS recheck_hour,
+        tenant_id,
+        slots AS slots_json
     FROM read_json(
-        @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz',
-        format = 'auto',
+        @LANDING_DIR || '/playtomic/*/*/availability_*.jsonl.gz',
+        format = 'newline_delimited',
         columns = {
             date: 'VARCHAR',
             captured_at_utc: 'VARCHAR',
-            venues: 'JSON[]'
+            tenant_id: 'VARCHAR',
+            slots: 'JSON'
         },
-        filename = true,
-        maximum_object_size = 134217728 -- 128 MB; daily files grow with venue count
+        filename = true
     )
     WHERE filename NOT LIKE '%_recheck_%'
-        AND venues IS NOT NULL
-        AND json_array_length(venues) > 0
+        AND tenant_id IS NOT NULL
 ),
--- Recheck snapshots (filename contains '_recheck_')
--- Use TRY_CAST on a regex-extracted hour to get the recheck_hour.
--- If no recheck files exist yet, this CTE produces zero rows (safe).
-recheck_files AS (
+-- Old format: {"date":..., "venues": [...]} blob — kept for transition
+morning_blob AS (
     SELECT
-        *,
-        'recheck' AS snapshot_type,
-        TRY_CAST(
-            regexp_extract(filename, '_recheck_(\d+)', 1) AS INTEGER
-        ) AS recheck_hour
-    FROM read_json(
-        @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz',
-        format = 'auto',
-        columns = {
-            date: 'VARCHAR',
-            captured_at_utc: 'VARCHAR',
-            venues: 'JSON[]'
-        },
-        filename = true,
-        maximum_object_size = 134217728 -- 128 MB; matches morning snapshot limit
-    )
-    WHERE venues IS NOT NULL
-        AND json_array_length(venues) > 0
-),
-all_files AS (
-    SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM morning_files
-    UNION ALL
-    SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM recheck_files
-),
-raw_venues AS (
-    SELECT
-        af.date AS snapshot_date,
+        af.date AS snapshot_date,
         af.captured_at_utc,
-        af.snapshot_type,
-        af.recheck_hour,
-        venue_json
-    FROM all_files af,
+        'morning' AS snapshot_type,
+        NULL::INTEGER AS recheck_hour,
+        venue_json ->> 'tenant_id' AS tenant_id,
+        venue_json -> 'slots' AS slots_json
+    FROM (
+        SELECT date, captured_at_utc, venues
+        FROM read_json(
+            @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz',
+            format = 'auto',
+            columns = {
+                date: 'VARCHAR',
+                captured_at_utc: 'VARCHAR',
+                venues: 'JSON[]'
+            },
+            filename = true,
+            maximum_object_size = 134217728 -- 128 MB; daily files grow with venue count
+        )
+        WHERE filename NOT LIKE '%_recheck_%'
+            AND venues IS NOT NULL
+            AND json_array_length(venues) > 0
+    ) af,
     LATERAL UNNEST(af.venues) AS t(venue_json)
 ),
+-- Recheck snapshots (blob format only — small files, no JSONL conversion needed)
+recheck_blob AS (
+    SELECT
+        rf.date AS snapshot_date,
+        rf.captured_at_utc,
+        'recheck' AS snapshot_type,
+        TRY_CAST(
+            regexp_extract(rf.filename, '_recheck_(\d+)', 1) AS INTEGER
+        ) AS recheck_hour,
+        venue_json ->> 'tenant_id' AS tenant_id,
+        venue_json -> 'slots' AS slots_json
+    FROM (
+        SELECT date, captured_at_utc, venues, filename
+        FROM read_json(
+            @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz',
+            format = 'auto',
+            columns = {
+                date: 'VARCHAR',
+                captured_at_utc: 'VARCHAR',
+                venues: 'JSON[]'
+            },
+            filename = true,
+            maximum_object_size = 134217728 -- 128 MB; matches morning snapshot limit
+        )
+        WHERE venues IS NOT NULL
+            AND json_array_length(venues) > 0
+    ) rf,
+    LATERAL UNNEST(rf.venues) AS t(venue_json)
+),
+all_venues AS (
+    SELECT * FROM morning_jsonl
+    UNION ALL
+    SELECT * FROM morning_blob
+    UNION ALL
+    SELECT * FROM recheck_blob
+),
 raw_resources AS (
     SELECT
-        rv.snapshot_date,
-        rv.captured_at_utc,
-        rv.snapshot_type,
-        rv.recheck_hour,
-        rv.venue_json ->> 'tenant_id' AS tenant_id,
+        av.snapshot_date,
+        av.captured_at_utc,
+        av.snapshot_type,
+        av.recheck_hour,
+        av.tenant_id,
         resource_json
-    FROM raw_venues rv,
+    FROM all_venues av,
     LATERAL UNNEST(
-        from_json(rv.venue_json -> 'slots', '["JSON"]')
+        from_json(av.slots_json, '["JSON"]')
     ) AS t(resource_json)
 ),
 raw_slots AS (
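To spot-check the new JSONL branch by hand, the same read_json arguments work from DuckDB's Python API; the concrete file path here is illustrative:

```python
import duckdb

rows = duckdb.sql("""
    SELECT tenant_id, date, captured_at_utc, json_array_length(slots) AS n_slots
    FROM read_json(
        'data/landing/playtomic/2024/05/availability_2024-05-01.jsonl.gz',
        format = 'newline_delimited',
        columns = {date: 'VARCHAR', captured_at_utc: 'VARCHAR',
                   tenant_id: 'VARCHAR', slots: 'JSON'}
    )
    LIMIT 5
""").fetchall()
print(rows)
```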