# Changelog (from commit message):
# Add proxy_failure_limit param to make_tiered_cycler (default 3). Individual
# proxies hitting the limit are marked dead and permanently skipped.
# next_proxy() auto-escalates when all proxies in the active tier are dead.
# Both mechanisms coexist: per-proxy dead tracking removes broken individuals;
# tier-level threshold catches systemic failure.
# - proxy.py: dead_proxies set + proxy_failure_counts dict in state; next_proxy
#   skips dead proxies with bounded loop; record_failure/record_success accept
#   optional proxy_url; dead_proxy_count() added
# - playtomic_tenants.py: pass proxy_url to record_success/record_failure
# - playtomic_availability.py: _worker returns (proxy_url, result); serial
#   loops in extract + extract_recheck capture proxy_url
# - test_supervisor.py: 11 new tests in TestTieredCyclerDeadProxyTracking
"""Playtomic availability extractor — booking slot data for market intelligence.
|
|
|
|
Reads tenant IDs from the latest tenants.json.gz, then queries the
|
|
unauthenticated /v1/availability endpoint for each venue's next-day slots.
|
|
This is the highest-value source: daily snapshots enable occupancy rate
|
|
estimation, pricing benchmarking, and demand signal detection.
|
|
|
|
Parallel mode: worker count is derived from PROXY_URLS (one worker per proxy).
|
|
Without proxies, runs single-threaded with per-request throttling.
|
|
|
|
Crash resumption: progress is flushed to a .partial.jsonl sidecar file every
|
|
PARTIAL_FLUSH_SIZE records. On restart the already-fetched venues are skipped
|
|
and prior results are merged into the final file. At most PARTIAL_FLUSH_SIZE
|
|
records (a few seconds of work with 10 workers) are lost on crash.
|
|
|
|
Recheck mode: re-queries venues with slots starting within the next 90 minutes.
|
|
Writes a separate recheck file for more accurate occupancy measurement.
|
|
|
|
Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.jsonl.gz
|
|
Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.jsonl.gz
|
|
"""
|
|
|
|
import gzip
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import niquests
|
|
|
|
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
|
|
from .proxy import load_proxy_tiers, make_tiered_cycler
|
|
from .utils import (
|
|
compress_jsonl_atomic,
|
|
flush_partial_batch,
|
|
landing_path,
|
|
load_partial_results,
|
|
)
|
|
|
|
logger = setup_logging("padelnomics.extract.playtomic_availability")

EXTRACTOR_NAME = "playtomic_availability"
RECHECK_EXTRACTOR_NAME = "playtomic_recheck"
AVAILABILITY_URL = "https://api.playtomic.io/v1/availability"

# Base throttle/backoff unit in seconds: serial mode sleeps this long between
# requests, and retries multiply it for 429 backoff.
THROTTLE_SECONDS = 1
# Hard cap on venues processed in a single run.
MAX_VENUES_PER_RUN = 20_000
# Extra attempts per venue after the first request fails.
MAX_RETRIES_PER_VENUE = 2
# How far ahead (minutes) recheck mode looks for upcoming slots.
RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "30"))
# Consecutive-failure threshold handed to the tiered proxy cycler.
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10")
# Worker count: defaults to MAX_PROXY_CONCURRENCY (200). Override via PROXY_CONCURRENCY env var.
_PROXY_CONCURRENCY = os.environ.get("PROXY_CONCURRENCY", "").strip()
MAX_PROXY_CONCURRENCY = 200

# Parallel mode submits futures in batches so the circuit breaker can stop
# new submissions after it opens. Already-inflight futures in the current
# batch still complete.
PARALLEL_BATCH_SIZE = 100

# Flush partial results to disk every N records — lose at most this many on crash.
PARTIAL_FLUSH_SIZE = 50

# Thread-local storage for per-worker sessions
_thread_local = threading.local()
|
|
# ---------------------------------------------------------------------------
|
|
# Tenant ID loading
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _load_tenant_ids(landing_dir: Path) -> list[str]:
    """Read tenant IDs from the most recent tenants file (JSONL or blob format)."""
    assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
    playtomic_dir = landing_dir / "playtomic"
    if not playtomic_dir.exists():
        return []

    # Prefer the daily partition (YYYY/MM/DD); fall back to older
    # monthly/weekly partitions, then to the legacy blob format.
    candidates: list[Path] = []
    for pattern in (
        "*/*/*/tenants.jsonl.gz",
        "*/*/tenants.jsonl.gz",
        "*/*/tenants.json.gz",
    ):
        candidates = sorted(playtomic_dir.glob(pattern), reverse=True)
        if candidates:
            break
    if not candidates:
        return []

    latest = candidates[0]
    logger.info("Loading tenant IDs from %s", latest)
    ids: list[str] = []

    with gzip.open(latest, "rt") as fh:
        if latest.name.endswith(".jsonl.gz"):
            # JSONL: one tenant object per line.
            for raw in fh:
                text = raw.strip()
                if not text:
                    continue
                try:
                    record = json.loads(text)
                except json.JSONDecodeError:
                    break  # truncated last line
                tid = record.get("tenant_id") or record.get("id")
                if tid:
                    ids.append(tid)
        else:
            # Blob: {"tenants": [...]}
            payload = json.loads(fh.read())
            ids.extend(
                tid
                for tenant in payload.get("tenants", [])
                if (tid := tenant.get("tenant_id") or tenant.get("id"))
            )

    logger.info("Loaded %d tenant IDs", len(ids))
    return ids
|
# ---------------------------------------------------------------------------
|
|
# Per-venue fetch (used by both serial and parallel modes)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_thread_session(proxy_url: str | None) -> niquests.Session:
    """Get or create a thread-local niquests.Session for the given proxy.

    Bug fix: the previous version created the session once per thread and
    ignored the proxy_url on every later call — so a thread stayed pinned to
    whatever proxy it first saw, even though callers obtain a fresh proxy
    from the cycler per request and report success/failure against it. That
    made the cycler's per-proxy bookkeeping attribute outcomes to the wrong
    proxy. Now the session is rebuilt whenever the requested proxy changes.
    """
    cached = getattr(_thread_local, "session", None)
    if cached is not None and getattr(_thread_local, "proxy_url", None) == proxy_url:
        return cached

    # Proxy changed (or first call on this thread): drop the stale session
    # so its pooled connections to the old proxy are released.
    if cached is not None:
        cached.close()

    session = niquests.Session()
    session.headers["User-Agent"] = ua_for_proxy(proxy_url)
    if proxy_url:
        session.proxies = {"http": proxy_url, "https": proxy_url}
    _thread_local.session = session
    _thread_local.proxy_url = proxy_url
    return session
|
|
|
def _fetch_venue_availability(
    tenant_id: str,
    start_min_str: str,
    start_max_str: str,
    proxy_url: str | None,
) -> dict | None:
    """Fetch availability for a single venue. Returns payload dict or None on failure.

    Makes up to MAX_RETRIES_PER_VENUE + 1 attempts. 429 responses back off
    with a linearly increasing wait; 5xx responses and transport errors wait
    a flat THROTTLE_SECONDS before retrying. Other non-2xx responses raise
    via raise_for_status and are handled by the RequestException branch,
    consuming an attempt. On success returns {"tenant_id": ..., "slots": ...}.
    """
    session = _get_thread_session(proxy_url)
    params = {
        "sport_id": "PADEL",
        "tenant_id": tenant_id,
        "start_min": start_min_str,
        "start_max": start_max_str,
    }

    for attempt in range(MAX_RETRIES_PER_VENUE + 1):
        try:
            resp = session.get(AVAILABILITY_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS)

            if resp.status_code == 429:
                # Back off harder on each successive rate-limit hit:
                # 2x, 3x, ... THROTTLE_SECONDS.
                wait_seconds = THROTTLE_SECONDS * (attempt + 2)
                logger.warning("Rate limited on %s, waiting %ds", tenant_id, wait_seconds)
                time.sleep(wait_seconds)
                continue

            if resp.status_code >= 500:
                logger.warning(
                    "Server error %d for %s (attempt %d)",
                    resp.status_code, tenant_id, attempt + 1,
                )
                time.sleep(THROTTLE_SECONDS)
                continue

            resp.raise_for_status()
            # Serial (proxy-less) mode throttles between requests; parallel
            # mode relies on one-proxy-per-worker isolation instead.
            if not proxy_url:
                time.sleep(THROTTLE_SECONDS)
            return {"tenant_id": tenant_id, "slots": resp.json()}

        except niquests.exceptions.RequestException as e:
            if attempt < MAX_RETRIES_PER_VENUE:
                logger.warning("Request failed for %s (attempt %d): %s", tenant_id, attempt + 1, e)
                time.sleep(THROTTLE_SECONDS)
            else:
                logger.error(
                    "Giving up on %s after %d attempts: %s",
                    tenant_id, MAX_RETRIES_PER_VENUE + 1, e,
                )
                return None

    return None  # all retries exhausted via 429/5xx
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parallel fetch orchestrator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _fetch_venues_parallel(
    tenant_ids: list[str],
    start_min_str: str,
    start_max_str: str,
    worker_count: int,
    cycler: dict,
    on_result=None,
) -> tuple[list[dict], int]:
    """Fetch availability for multiple venues in parallel.

    Submits futures in batches of PARALLEL_BATCH_SIZE. After each batch
    completes, checks the circuit breaker: if all proxy tiers are exhausted,
    stops submitting further batches.

    on_result: optional callable(result: dict) invoked inside the lock for
    each successful result — used for incremental partial-file flushing.

    Returns (venues_data, venues_errored).
    """
    venues_data: list[dict] = []
    venues_errored = 0
    completed_count = 0
    # Single lock serializes result-list appends, cycler bookkeeping, and
    # the on_result callback across worker threads.
    lock = threading.Lock()

    def _worker(tenant_id: str) -> tuple[str | None, dict | None]:
        # Returns the proxy alongside the result so success/failure can be
        # attributed to the specific proxy by the cycler's dead-proxy tracking.
        proxy_url = cycler["next_proxy"]()
        result = _fetch_venue_availability(tenant_id, start_min_str, start_max_str, proxy_url)
        return proxy_url, result

    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        for batch_start in range(0, len(tenant_ids), PARALLEL_BATCH_SIZE):
            # Stop submitting new work if all proxy tiers are exhausted
            if cycler["is_exhausted"]():
                logger.error(
                    "All proxy tiers exhausted — stopping after %d/%d venues",
                    completed_count, len(tenant_ids),
                )
                break

            batch = tenant_ids[batch_start:batch_start + PARALLEL_BATCH_SIZE]
            batch_futures = {pool.submit(_worker, tid): tid for tid in batch}

            for future in as_completed(batch_futures):
                proxy_url, result = future.result()
                with lock:
                    completed_count += 1
                    if result is not None:
                        venues_data.append(result)
                        cycler["record_success"](proxy_url)
                        if on_result is not None:
                            on_result(result)
                    else:
                        venues_errored += 1
                        cycler["record_failure"](proxy_url)

                    if completed_count % 500 == 0:
                        logger.info(
                            "Progress: %d/%d venues (%d errors, %d workers)",
                            completed_count, len(tenant_ids), venues_errored, worker_count,
                        )

    logger.info(
        "Parallel fetch complete: %d/%d venues (%d errors, %d workers)",
        len(venues_data), len(tenant_ids), venues_errored, worker_count,
    )
    return venues_data, venues_errored
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main extraction function
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def extract(
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """Fetch next-day availability for all known Playtomic venues.

    Queries tomorrow's 24-hour slot window for every tenant ID found by
    _load_tenant_ids — in parallel when proxies are configured, serially
    otherwise. Progress is appended to a .working.jsonl file so a crashed
    run can resume; on completion the working file is compressed atomically
    into the final .jsonl.gz landing file.

    Args:
        landing_dir: Root of the landing area (must exist).
        year_month: Partition key in "YYYY/MM" form.
        conn: SQLite connection — part of the extractor contract; unused here.
        session: Shared HTTP session — unused here; workers build their own.

    Returns:
        Stats dict with files_written / files_skipped / bytes_written, plus
        a cursor_value of "{date}:{venue_count}" when a file is written.
    """
    assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
    assert "/" in year_month and len(year_month) == 7, f"year_month must be YYYY/MM: {year_month!r}"
    tenant_ids = _load_tenant_ids(landing_dir)
    if not tenant_ids:
        logger.warning("No tenant IDs found — run extract-playtomic-tenants first")
        return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}

    # Query tomorrow's slots
    tomorrow = datetime.now(UTC) + timedelta(days=1)
    target_date = tomorrow.strftime("%Y-%m-%d")
    start_min = tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)
    start_max = start_min + timedelta(hours=24)

    year, month = year_month.split("/")
    dest_dir = landing_path(landing_dir, "playtomic", year, month)
    dest = dest_dir / f"availability_{target_date}.jsonl.gz"
    old_blob = dest_dir / f"availability_{target_date}.json.gz"
    if dest.exists() or old_blob.exists():
        logger.info("Already have availability for %s — skipping", target_date)
        return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}

    # Crash resumption: load already-fetched venues from working file
    partial_path = dest_dir / f"availability_{target_date}.working.jsonl"
    prior_results, already_done = load_partial_results(partial_path, id_key="tenant_id")
    if already_done:
        logger.info("Resuming: %d venues already fetched from partial file", len(already_done))

    all_venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN]
    venues_to_process = [tid for tid in all_venues_to_process if tid not in already_done]

    # Set up tiered proxy cycler with circuit breaker
    tiers = load_proxy_tiers()
    worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else (MAX_PROXY_CONCURRENCY if tiers else 1)
    cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)

    start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
    start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")

    # Timestamp stamped into every JSONL line — computed once before the fetch loop.
    captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")

    partial_lock = threading.Lock()
    pending_batch: list[dict] = []
    new_venues_data: list[dict] = []
    venues_errored = 0

    # Fix: the working-file handle was previously opened without a context
    # manager and leaked if the fetch raised. The `with` guarantees it is
    # closed on every exit path; already-flushed lines survive for resumption.
    with open(partial_path, "a") as partial_file:

        def _on_result(result: dict) -> None:
            # Called inside _fetch_venues_parallel's lock — no additional locking needed.
            # In serial mode, called single-threaded — also safe without extra locking.
            # Inject date + captured_at so every JSONL line is self-contained.
            result["date"] = target_date
            result["captured_at_utc"] = captured_at
            pending_batch.append(result)
            if len(pending_batch) >= PARTIAL_FLUSH_SIZE:
                flush_partial_batch(partial_file, partial_lock, pending_batch)
                pending_batch.clear()

        if worker_count > 1:
            logger.info("Parallel mode: %d workers, %d tier(s)", worker_count, len(tiers))
            new_venues_data, venues_errored = _fetch_venues_parallel(
                venues_to_process, start_min_str, start_max_str, worker_count, cycler,
                on_result=_on_result,
            )
        else:
            logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process))
            for i, tenant_id in enumerate(venues_to_process):
                proxy_url = cycler["next_proxy"]()
                result = _fetch_venue_availability(
                    tenant_id, start_min_str, start_max_str, proxy_url,
                )
                if result is not None:
                    new_venues_data.append(result)
                    cycler["record_success"](proxy_url)
                    _on_result(result)
                else:
                    venues_errored += 1
                    cycler["record_failure"](proxy_url)
                    if cycler["is_exhausted"]():
                        logger.error("All proxy tiers exhausted — writing partial results")
                        break

                if (i + 1) % 100 == 0:
                    logger.info(
                        "Progress: %d/%d venues queried, %d errors",
                        i + 1, len(venues_to_process), venues_errored,
                    )

        # Final flush of any remaining partial batch
        if pending_batch:
            flush_partial_batch(partial_file, partial_lock, pending_batch)
            pending_batch.clear()

    # Working file IS the output — compress atomically (deletes source).
    total_venues = len(prior_results) + len(new_venues_data)
    bytes_written = compress_jsonl_atomic(partial_path, dest)

    logger.info(
        "%d venues scraped (%d errors) -> %s (%s bytes)",
        total_venues, venues_errored, dest, f"{bytes_written:,}",
    )

    return {
        "files_written": 1,
        "files_skipped": 0,
        "bytes_written": bytes_written,
        "cursor_value": f"{target_date}:{len(all_venues_to_process)}",
    }
|
|
# ---------------------------------------------------------------------------
|
|
# Recheck mode — re-query venues with upcoming slots for accurate occupancy
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _read_availability_jsonl(path: Path) -> dict:
|
|
"""Read a JSONL availability file into the blob dict format recheck expects."""
|
|
venues = []
|
|
date_val = captured_at = None
|
|
with gzip.open(path, "rt") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
record = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
break # truncated last line on crash
|
|
if date_val is None:
|
|
date_val = record.get("date")
|
|
captured_at = record.get("captured_at_utc")
|
|
venues.append(record)
|
|
return {"date": date_val, "captured_at_utc": captured_at, "venues": venues}
|
|
|
|
|
|
def _load_morning_availability(landing_dir: Path, target_date: str) -> dict | None:
|
|
"""Load today's morning availability file (JSONL or blob). Returns dict or None."""
|
|
playtomic_dir = landing_dir / "playtomic"
|
|
# Try JSONL first (new format), fall back to blob (old format)
|
|
matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.jsonl.gz"))
|
|
if matches:
|
|
return _read_availability_jsonl(matches[0])
|
|
matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.json.gz"))
|
|
if not matches:
|
|
return None
|
|
with gzip.open(matches[0], "rb") as f:
|
|
return json.loads(f.read())
|
|
|
|
|
|
def _find_venues_with_upcoming_slots(
|
|
morning_data: dict, window_start: datetime, window_end: datetime
|
|
) -> list[str]:
|
|
"""Find tenant_ids that have available slots starting within the recheck window."""
|
|
tenant_ids = set()
|
|
for venue in morning_data.get("venues", []):
|
|
tid = venue.get("tenant_id")
|
|
if not tid:
|
|
continue
|
|
for resource in venue.get("slots", []):
|
|
for slot in resource.get("slots", []):
|
|
start_time_str = slot.get("start_time")
|
|
if not start_time_str:
|
|
continue
|
|
try:
|
|
# Parse "2026-02-24T17:00:00" format
|
|
slot_start = datetime.fromisoformat(start_time_str).replace(tzinfo=UTC)
|
|
if window_start <= slot_start < window_end:
|
|
tenant_ids.add(tid)
|
|
break # found one upcoming slot, no need to check more
|
|
except ValueError:
|
|
continue
|
|
if tid in tenant_ids:
|
|
break # already found upcoming slot for this venue
|
|
|
|
return sorted(tenant_ids)
|
|
|
|
|
|
def extract_recheck(
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """Re-query venues with slots starting soon for accurate occupancy data.

    Loads the morning availability file for today (or tomorrow, since the
    morning run writes next-day data), selects venues with open slots
    starting within RECHECK_WINDOW_MINUTES, re-fetches only those venues,
    and writes an hourly recheck JSONL file alongside the morning file.

    Args:
        landing_dir: Root of the landing area (must exist).
        year_month: Partition key in "YYYY/MM" form.
        conn: SQLite connection — part of the extractor contract; unused here.
        session: Shared HTTP session — unused here; workers build their own.

    Returns:
        Stats dict with files_written / files_skipped / bytes_written and,
        on a write, a cursor_value of "{date}:recheck:{hour}".
    """
    assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
    assert "/" in year_month and len(year_month) == 7, f"year_month must be YYYY/MM: {year_month!r}"
    now = datetime.now(UTC)
    target_date = now.strftime("%Y-%m-%d")

    # Also check tomorrow if it's late evening
    tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d")

    morning_data = _load_morning_availability(landing_dir, target_date)
    if morning_data is None:
        # Try tomorrow's file (morning extraction creates it for tomorrow)
        morning_data = _load_morning_availability(landing_dir, tomorrow)
        if morning_data is None:
            logger.info("No morning availability file found — skipping recheck")
            return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}
        target_date = tomorrow

    # Find venues with slots in the upcoming window
    window_start = now
    window_end = now + timedelta(minutes=RECHECK_WINDOW_MINUTES)
    venues_to_recheck = _find_venues_with_upcoming_slots(morning_data, window_start, window_end)

    if not venues_to_recheck:
        logger.info("No venues with upcoming slots in next %d min — skipping", RECHECK_WINDOW_MINUTES)
        return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}

    logger.info(
        "Rechecking %d venues with slots starting in next %d min",
        len(venues_to_recheck), RECHECK_WINDOW_MINUTES,
    )

    # Fetch availability for the recheck window
    start_min_str = window_start.strftime("%Y-%m-%dT%H:%M:%S")
    start_max_str = window_end.strftime("%Y-%m-%dT%H:%M:%S")

    # Set up tiered proxy cycler with circuit breaker
    tiers = load_proxy_tiers()
    worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else (MAX_PROXY_CONCURRENCY if tiers else 1)
    cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)

    # Parallel fetch only pays off for a non-trivial venue count.
    if worker_count > 1 and len(venues_to_recheck) > 10:
        venues_data, venues_errored = _fetch_venues_parallel(
            venues_to_recheck, start_min_str, start_max_str, worker_count, cycler,
        )
    else:
        venues_data = []
        venues_errored = 0
        for tid in venues_to_recheck:
            proxy_url = cycler["next_proxy"]()
            result = _fetch_venue_availability(tid, start_min_str, start_max_str, proxy_url)
            if result is not None:
                venues_data.append(result)
                cycler["record_success"](proxy_url)
            else:
                venues_errored += 1
                cycler["record_failure"](proxy_url)
                if cycler["is_exhausted"]():
                    logger.error("All proxy tiers exhausted — writing partial recheck results")
                    break

    # Write recheck file as JSONL — one venue per line with metadata injected
    recheck_hour = now.hour
    year, month = year_month.split("/")
    dest_dir = landing_path(landing_dir, "playtomic", year, month)
    dest = dest_dir / f"availability_{target_date}_recheck_{recheck_hour:02d}.jsonl.gz"

    captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
    # Suffix chain: strip ".gz", then swap ".jsonl" for ".working.jsonl" —
    # yields availability_{date}_recheck_{HH}.working.jsonl next to dest.
    working_path = dest.with_suffix("").with_suffix(".working.jsonl")
    with open(working_path, "w") as f:
        for venue in venues_data:
            venue["date"] = target_date
            venue["captured_at_utc"] = captured_at
            venue["recheck_hour"] = recheck_hour
            f.write(json.dumps(venue, separators=(",", ":")) + "\n")
    # Compress atomically into the final file (deletes the working source).
    bytes_written = compress_jsonl_atomic(working_path, dest)

    logger.info(
        "Recheck: %d/%d venues (%d errors) -> %s (%s bytes)",
        len(venues_data), len(venues_to_recheck), venues_errored, dest, f"{bytes_written:,}",
    )

    return {
        "files_written": 1,
        "files_skipped": 0,
        "bytes_written": bytes_written,
        "cursor_value": f"{target_date}:recheck:{recheck_hour}",
    }
|
|
# ---------------------------------------------------------------------------
|
|
# Entry points
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> None:
    """CLI entry point: run the daily next-day availability extraction."""
    run_extractor(EXTRACTOR_NAME, extract)
|
|
|
def main_recheck() -> None:
    """CLI entry point: run the hourly recheck extraction."""
    run_extractor(RECHECK_EXTRACTOR_NAME, extract_recheck)
|
|
|
if __name__ == "__main__":
    # Direct invocation runs the daily extraction; recheck mode has its own
    # entry point (main_recheck) wired up elsewhere.
    main()