Files
padelnomics/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
Deeman b7c8568265 fix(extract): recheck window 90→60 min (correct reasoning this time)
60-min window + hourly rechecks = each slot caught exactly once, 0-60 min
before it starts. 90-min window causes double-querying (T-90 and T-30).
Slot duration is irrelevant — it doesn't affect when the slot appears in
the window.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 09:37:17 +01:00

520 lines
20 KiB
Python

"""Playtomic availability extractor — booking slot data for market intelligence.
Reads tenant IDs from the latest tenants.json.gz, then queries the
unauthenticated /v1/availability endpoint for each venue's next-day slots.
This is the highest-value source: daily snapshots enable occupancy rate
estimation, pricing benchmarking, and demand signal detection.
Parallel mode: worker count is derived from PROXY_URLS (one worker per proxy).
Without proxies, runs single-threaded with per-request throttling.
Crash resumption: progress is flushed to a .partial.jsonl sidecar file every
PARTIAL_FLUSH_SIZE records. On restart the already-fetched venues are skipped
and prior results are merged into the final file. At most PARTIAL_FLUSH_SIZE
records (a few seconds of work with 10 workers) are lost on crash.
Recheck mode: re-queries venues with slots starting within the next
RECHECK_WINDOW_MINUTES minutes (default 60).
Writes a separate recheck file for more accurate occupancy measurement.
Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.json.gz
Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.json.gz
"""
import gzip
import json
import os
import sqlite3
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import UTC, datetime, timedelta
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
from .utils import flush_partial_batch, landing_path, load_partial_results, write_gzip_atomic
# Module-level logger, obtained from the shared setup_logging helper.
logger = setup_logging("padelnomics.extract.playtomic_availability")
# Names passed to run_extractor by the two entry points of this module.
EXTRACTOR_NAME = "playtomic_availability"
RECHECK_EXTRACTOR_NAME = "playtomic_recheck"

AVAILABILITY_URL = "https://api.playtomic.io/v1/availability"

# Base delay in seconds: slept after each successful un-proxied request and
# used as the unit for retry back-off.
THROTTLE_SECONDS = 1
# Upper bound on the number of tenant IDs processed by one extraction run.
MAX_VENUES_PER_RUN = 20_000
# Retries per venue in addition to the first attempt.
MAX_RETRIES_PER_VENUE = 2

# Both settings are overridable via the environment. `or` (rather than a
# .get() default) makes an empty-string variable fall back to the default
# instead of crashing on int("").
RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES") or "60")
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10")

# Parallel mode submits futures in batches so the circuit breaker can stop
# new submissions after it opens. Already-inflight futures in the current
# batch still complete.
PARALLEL_BATCH_SIZE = 100

# Flush partial results to disk every N records — lose at most this many on crash.
PARTIAL_FLUSH_SIZE = 50

# Thread-local storage for per-worker sessions
_thread_local = threading.local()
# ---------------------------------------------------------------------------
# Tenant ID loading
# ---------------------------------------------------------------------------
def _load_tenant_ids(landing_dir: Path) -> list[str]:
    """Return the tenant IDs stored in the newest tenants.json.gz snapshot.

    Snapshots are looked up under ``<landing_dir>/playtomic/*/*/`` and the
    path that sorts last is treated as the most recent one. An empty list is
    returned when the playtomic directory or any snapshot is missing.
    """
    assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"

    playtomic_root = landing_dir / "playtomic"
    if playtomic_root.exists():
        snapshots = sorted(playtomic_root.glob("*/*/tenants.json.gz"), reverse=True)
    else:
        snapshots = []
    if not snapshots:
        return []

    newest = snapshots[0]
    logger.info("Loading tenant IDs from %s", newest)
    with gzip.open(newest, "rb") as fh:
        document = json.loads(fh.read())

    # Each entry may carry its identifier under "tenant_id" or "id";
    # entries with neither (or a falsy value) are dropped.
    candidate_ids = (
        entry.get("tenant_id") or entry.get("id")
        for entry in document.get("tenants", [])
    )
    found = [tid for tid in candidate_ids if tid]

    logger.info("Loaded %d tenant IDs", len(found))
    return found
# ---------------------------------------------------------------------------
# Per-venue fetch (used by both serial and parallel modes)
# ---------------------------------------------------------------------------
def _get_thread_session(proxy_url: str | None) -> niquests.Session:
    """Return the calling thread's niquests.Session, routed via ``proxy_url``.

    A session is created lazily once per thread and then reused. The proxy
    mapping is re-applied whenever the requested proxy differs from the one
    last applied in this thread; previously the first proxy stuck for the
    lifetime of the session, so later ``proxy_url`` values (rotation, or the
    switch to fallback proxies) were silently ignored.

    Args:
        proxy_url: Proxy to use for both http and https, or None for no
            explicit proxy.

    Returns:
        The thread-local session with the User-Agent header set.
    """
    session = getattr(_thread_local, "session", None)
    if session is None:
        session = niquests.Session()
        session.headers["User-Agent"] = USER_AGENT
        _thread_local.session = session
        _thread_local.proxy_url = None  # nothing applied yet

    if proxy_url != getattr(_thread_local, "proxy_url", None):
        # Switching back to "no proxy" clears the explicit mapping.
        session.proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else {}
        _thread_local.proxy_url = proxy_url
    return session
def _fetch_venue_availability(
    tenant_id: str,
    start_min_str: str,
    start_max_str: str,
    proxy_url: str | None,
) -> dict | None:
    """Query the availability endpoint for one venue, retrying on failure.

    Up to ``MAX_RETRIES_PER_VENUE + 1`` attempts are made. HTTP 429 and 5xx
    responses as well as request exceptions trigger a sleep and another
    attempt. Returns ``{"tenant_id": ..., "slots": <decoded JSON>}`` on
    success and None once all attempts are used up.
    """
    http = _get_thread_session(proxy_url)
    query = dict(
        sport_id="PADEL",
        tenant_id=tenant_id,
        start_min=start_min_str,
        start_max=start_max_str,
    )
    total_attempts = MAX_RETRIES_PER_VENUE + 1

    for attempt_no in range(1, total_attempts + 1):
        try:
            response = http.get(AVAILABILITY_URL, params=query, timeout=HTTP_TIMEOUT_SECONDS)

            if response.status_code == 429:
                # Back off longer on each successive rate-limit response.
                backoff = THROTTLE_SECONDS * (attempt_no + 1)
                logger.warning("Rate limited on %s, waiting %ds", tenant_id, backoff)
                time.sleep(backoff)
                continue

            if response.status_code >= 500:
                logger.warning(
                    "Server error %d for %s (attempt %d)",
                    response.status_code, tenant_id, attempt_no,
                )
                time.sleep(THROTTLE_SECONDS)
                continue

            response.raise_for_status()
            # Without a proxy, pace every successful request.
            if not proxy_url:
                time.sleep(THROTTLE_SECONDS)
            return {"tenant_id": tenant_id, "slots": response.json()}
        except niquests.exceptions.RequestException as exc:
            if attempt_no >= total_attempts:
                logger.error(
                    "Giving up on %s after %d attempts: %s",
                    tenant_id, total_attempts, exc,
                )
                return None
            logger.warning("Request failed for %s (attempt %d): %s", tenant_id, attempt_no, exc)
            time.sleep(THROTTLE_SECONDS)

    # Every attempt ended in a 429 or 5xx response.
    return None
# ---------------------------------------------------------------------------
# Parallel fetch orchestrator
# ---------------------------------------------------------------------------
def _fetch_venues_parallel(
    tenant_ids: list[str],
    start_min_str: str,
    start_max_str: str,
    worker_count: int,
    cycler: dict,
    fallback_urls: list[str],
    on_result=None,
) -> tuple[list[dict], int]:
    """Fetch availability for many venues using a thread pool.

    Work is submitted in slices of PARALLEL_BATCH_SIZE. Before each slice the
    circuit breaker is consulted: when the cycler reports fallback as active
    and no fallback URLs are configured, no further slices are submitted.

    on_result: optional callable(result: dict), called while the lock is held
    for every successful result — used for incremental partial-file flushing.

    Returns (venues_data, venues_errored).
    """
    collected: list[dict] = []
    error_total = 0
    done_total = 0
    venue_total = len(tenant_ids)
    guard = threading.Lock()

    def _fetch_one(tenant_id: str) -> dict | None:
        # A proxy is drawn from the cycler for every single venue.
        return _fetch_venue_availability(
            tenant_id, start_min_str, start_max_str, cycler["next_proxy"](),
        )

    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        offset = 0
        while offset < venue_total:
            # Stop submitting new work if circuit is open with no fallback
            if cycler["is_fallback_active"]() and not fallback_urls:
                logger.error(
                    "Circuit open with no fallback — stopping after %d/%d venues",
                    done_total, venue_total,
                )
                break

            chunk = tenant_ids[offset:offset + PARALLEL_BATCH_SIZE]
            offset += PARALLEL_BATCH_SIZE
            in_flight = {pool.submit(_fetch_one, tid): tid for tid in chunk}

            for finished in as_completed(in_flight):
                outcome = finished.result()
                with guard:
                    done_total += 1
                    if outcome is None:
                        error_total += 1
                        cycler["record_failure"]()
                    else:
                        collected.append(outcome)
                        cycler["record_success"]()
                        if on_result is not None:
                            on_result(outcome)
                    if done_total % 500 == 0:
                        logger.info(
                            "Progress: %d/%d venues (%d errors, %d workers)",
                            done_total, venue_total, error_total, worker_count,
                        )

    logger.info(
        "Parallel fetch complete: %d/%d venues (%d errors, %d workers)",
        len(collected), venue_total, error_total, worker_count,
    )
    return collected, error_total
# ---------------------------------------------------------------------------
# Main extraction function
# ---------------------------------------------------------------------------
def extract(
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """Fetch next-day availability for all known Playtomic venues.

    Loads tenant IDs, queries every venue's slots for tomorrow (UTC) and
    writes one consolidated gzip file. While fetching, results are appended
    to a ``.partial.jsonl`` sidecar so an interrupted run can resume; the
    sidecar is deleted once the final file has been written.

    Args:
        landing_dir: Existing root landing directory.
        year_month: Destination partition, formatted ``YYYY/MM``.
        conn: Not used by this function.
        session: Not used by this function; requests go through the
            per-thread sessions of ``_fetch_venue_availability``.

    Returns:
        Dict with ``files_written``, ``files_skipped`` and ``bytes_written``;
        ``cursor_value`` is added when a file was written.
    """
    assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
    assert "/" in year_month and len(year_month) == 7, f"year_month must be YYYY/MM: {year_month!r}"

    tenant_ids = _load_tenant_ids(landing_dir)
    if not tenant_ids:
        logger.warning("No tenant IDs found — run extract-playtomic-tenants first")
        return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}

    # Query tomorrow's slots
    tomorrow = datetime.now(UTC) + timedelta(days=1)
    target_date = tomorrow.strftime("%Y-%m-%d")
    start_min = tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)
    start_max = start_min + timedelta(hours=24)

    year, month = year_month.split("/")
    dest_dir = landing_path(landing_dir, "playtomic", year, month)
    dest = dest_dir / f"availability_{target_date}.json.gz"
    if dest.exists():
        logger.info("Already have %s — skipping", dest)
        return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}

    # Crash resumption: load already-fetched venues from partial file
    partial_path = dest.with_suffix(".partial.jsonl")
    prior_results, already_done = load_partial_results(partial_path, id_key="tenant_id")
    if already_done:
        logger.info("Resuming: %d venues already fetched from partial file", len(already_done))

    all_venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN]
    venues_to_process = [tid for tid in all_venues_to_process if tid not in already_done]

    # Set up tiered proxy cycler with circuit breaker
    proxy_urls = load_proxy_urls()
    fallback_urls = load_fallback_proxy_urls()
    worker_count = len(proxy_urls) if proxy_urls else 1
    cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)

    start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
    start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")

    partial_lock = threading.Lock()
    pending_batch: list[dict] = []
    new_venues_data: list[dict] = []
    venues_errored = 0

    # Partial file for incremental crash-safe progress. The context manager
    # guarantees the handle is closed even when fetching raises, so whatever
    # was already flushed stays usable for the next (resumed) run.
    with open(partial_path, "a") as partial_file:

        def _on_result(result: dict) -> None:
            # Called inside _fetch_venues_parallel's lock — no additional locking needed.
            # In serial mode, called single-threaded — also safe without extra locking.
            pending_batch.append(result)
            if len(pending_batch) >= PARTIAL_FLUSH_SIZE:
                flush_partial_batch(partial_file, partial_lock, pending_batch)
                pending_batch.clear()

        if worker_count > 1:
            logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls))
            new_venues_data, venues_errored = _fetch_venues_parallel(
                venues_to_process, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
                on_result=_on_result,
            )
        else:
            logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process))
            for i, tenant_id in enumerate(venues_to_process):
                result = _fetch_venue_availability(
                    tenant_id, start_min_str, start_max_str, cycler["next_proxy"](),
                )
                if result is not None:
                    new_venues_data.append(result)
                    cycler["record_success"]()
                    _on_result(result)
                else:
                    venues_errored += 1
                    circuit_opened = cycler["record_failure"]()
                    if circuit_opened and not fallback_urls:
                        logger.error("Circuit open with no fallback — writing partial results")
                        break
                if (i + 1) % 100 == 0:
                    logger.info(
                        "Progress: %d/%d venues queried, %d errors",
                        i + 1, len(venues_to_process), venues_errored,
                    )

        # Final flush of any remaining partial batch
        if pending_batch:
            flush_partial_batch(partial_file, partial_lock, pending_batch)
            pending_batch.clear()

    # Consolidate prior (resumed) + new results into final file
    venues_data = prior_results + new_venues_data
    captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
    payload = json.dumps({
        "date": target_date,
        "captured_at_utc": captured_at,
        "venue_count": len(venues_data),
        "venues_errored": venues_errored,
        "venues": venues_data,
    }).encode()
    bytes_written = write_gzip_atomic(dest, payload)

    # The sidecar is only needed until the final file exists.
    if partial_path.exists():
        partial_path.unlink()

    logger.info(
        "%d venues scraped (%d errors) -> %s (%s bytes)",
        len(venues_data), venues_errored, dest, f"{bytes_written:,}",
    )
    return {
        "files_written": 1,
        "files_skipped": 0,
        "bytes_written": bytes_written,
        "cursor_value": f"{target_date}:{len(all_venues_to_process)}",
    }
# ---------------------------------------------------------------------------
# Recheck mode — re-query venues with upcoming slots for accurate occupancy
# ---------------------------------------------------------------------------
def _load_morning_availability(landing_dir: Path, target_date: str) -> dict | None:
"""Load today's morning availability file. Returns parsed JSON or None."""
playtomic_dir = landing_dir / "playtomic"
# Search across year/month dirs for the target date
matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.json.gz"))
if not matches:
return None
with gzip.open(matches[0], "rb") as f:
return json.loads(f.read())
def _find_venues_with_upcoming_slots(
morning_data: dict, window_start: datetime, window_end: datetime
) -> list[str]:
"""Find tenant_ids that have available slots starting within the recheck window."""
tenant_ids = set()
for venue in morning_data.get("venues", []):
tid = venue.get("tenant_id")
if not tid:
continue
for resource in venue.get("slots", []):
for slot in resource.get("slots", []):
start_time_str = slot.get("start_time")
if not start_time_str:
continue
try:
# Parse "2026-02-24T17:00:00" format
slot_start = datetime.fromisoformat(start_time_str).replace(tzinfo=UTC)
if window_start <= slot_start < window_end:
tenant_ids.add(tid)
break # found one upcoming slot, no need to check more
except ValueError:
continue
if tid in tenant_ids:
break # already found upcoming slot for this venue
return sorted(tenant_ids)
def extract_recheck(
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """Re-query venues whose slots start soon and write a recheck snapshot.

    The availability file for today (or, if absent, for tomorrow) supplies
    the list of venues with a slot starting inside the next
    RECHECK_WINDOW_MINUTES. Those venues are fetched again for exactly that
    window and the result is written to an hourly recheck file. Returns the
    usual files/bytes counters, plus ``cursor_value`` when a file is written.
    """
    assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
    assert "/" in year_month and len(year_month) == 7, f"year_month must be YYYY/MM: {year_month!r}"

    run_started = datetime.now(UTC)
    today_str = run_started.strftime("%Y-%m-%d")
    tomorrow_str = (run_started + timedelta(days=1)).strftime("%Y-%m-%d")

    # Prefer today's file; fall back to the one dated tomorrow
    # (morning extraction creates it for tomorrow).
    target_date = today_str
    baseline = _load_morning_availability(landing_dir, today_str)
    if baseline is None:
        baseline = _load_morning_availability(landing_dir, tomorrow_str)
        if baseline is None:
            logger.info("No morning availability file found — skipping recheck")
            return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}
        target_date = tomorrow_str

    # Venues with at least one slot starting inside the upcoming window.
    window_start = run_started
    window_end = run_started + timedelta(minutes=RECHECK_WINDOW_MINUTES)
    venues_to_recheck = _find_venues_with_upcoming_slots(baseline, window_start, window_end)
    if not venues_to_recheck:
        logger.info("No venues with upcoming slots in next %d min — skipping", RECHECK_WINDOW_MINUTES)
        return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}

    logger.info(
        "Rechecking %d venues with slots starting in next %d min",
        len(venues_to_recheck), RECHECK_WINDOW_MINUTES,
    )

    # The API query is limited to the recheck window itself.
    timestamp_format = "%Y-%m-%dT%H:%M:%S"
    start_min_str = window_start.strftime(timestamp_format)
    start_max_str = window_end.strftime(timestamp_format)

    # Tiered proxy cycler with circuit breaker.
    proxy_urls = load_proxy_urls()
    fallback_urls = load_fallback_proxy_urls()
    worker_count = len(proxy_urls) if proxy_urls else 1
    cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)

    # Small rechecks are fetched serially even when several proxies exist.
    use_pool = worker_count > 1 and len(venues_to_recheck) > 10
    if use_pool:
        venues_data, venues_errored = _fetch_venues_parallel(
            venues_to_recheck, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
        )
    else:
        venues_data = []
        venues_errored = 0
        for venue_id in venues_to_recheck:
            fetched = _fetch_venue_availability(
                venue_id, start_min_str, start_max_str, cycler["next_proxy"](),
            )
            if fetched is None:
                venues_errored += 1
                circuit_opened = cycler["record_failure"]()
                if circuit_opened and not fallback_urls:
                    logger.error("Circuit open with no fallback — writing partial recheck results")
                    break
            else:
                venues_data.append(fetched)
                cycler["record_success"]()

    # One recheck file per UTC hour of the run.
    recheck_hour = run_started.hour
    year, month = year_month.split("/")
    dest = (
        landing_path(landing_dir, "playtomic", year, month)
        / f"availability_{target_date}_recheck_{recheck_hour:02d}.json.gz"
    )

    captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
    document = {
        "date": target_date,
        "captured_at_utc": captured_at,
        "recheck_hour": recheck_hour,
        "recheck_window_minutes": RECHECK_WINDOW_MINUTES,
        "rechecked_tenant_ids": venues_to_recheck,
        "venue_count": len(venues_data),
        "venues_errored": venues_errored,
        "venues": venues_data,
    }
    bytes_written = write_gzip_atomic(dest, json.dumps(document).encode())

    logger.info(
        "Recheck: %d/%d venues (%d errors) -> %s (%s bytes)",
        len(venues_data), len(venues_to_recheck), venues_errored, dest, f"{bytes_written:,}",
    )
    return {
        "files_written": 1,
        "files_skipped": 0,
        "bytes_written": bytes_written,
        "cursor_value": f"{target_date}:recheck:{recheck_hour}",
    }
# ---------------------------------------------------------------------------
# Entry points
# ---------------------------------------------------------------------------
def main() -> None:
    """Entry point for the next-day extraction: passes ``extract`` to run_extractor."""
    run_extractor(EXTRACTOR_NAME, extract)
def main_recheck() -> None:
    """Entry point for recheck mode: passes ``extract_recheck`` to run_extractor."""
    run_extractor(RECHECK_EXTRACTOR_NAME, extract_recheck)
# Executing this module as a script runs the next-day extraction (not the recheck).
if __name__ == "__main__":
    main()