perf(extract): auto-detect workers from proxies, skip throttle on success, crash-safe partial JSONL
- proxy.py: delete unused make_sticky_selector() - utils.py: add load_partial_results() + flush_partial_batch() for crash-resumable extraction - playtomic_availability.py: - drop MAX_WORKERS / EXTRACT_WORKERS — worker_count = len(proxy_urls) or 1 - skip time.sleep(THROTTLE_SECONDS) on success when proxy_url is set; keep sleeps for 429/5xx - replace cursor-based resumption with .partial.jsonl sidecar (flush every 50 records) - _fetch_venues_parallel accepts on_result callback for incremental partial-file flushing - mirror auto-detect worker count in extract_recheck() Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -5,8 +5,13 @@ unauthenticated /v1/availability endpoint for each venue's next-day slots.
|
|||||||
This is the highest-value source: daily snapshots enable occupancy rate
|
This is the highest-value source: daily snapshots enable occupancy rate
|
||||||
estimation, pricing benchmarking, and demand signal detection.
|
estimation, pricing benchmarking, and demand signal detection.
|
||||||
|
|
||||||
Parallel mode: set EXTRACT_WORKERS=N and PROXY_URLS=... to fetch N venues
|
Parallel mode: worker count is derived from PROXY_URLS (one worker per proxy).
|
||||||
concurrently (one proxy per worker). Without proxies, runs single-threaded.
|
Without proxies, runs single-threaded with per-request throttling.
|
||||||
|
|
||||||
|
Crash resumption: progress is flushed to a .partial.jsonl sidecar file every
|
||||||
|
PARTIAL_FLUSH_SIZE records. On restart the already-fetched venues are skipped
|
||||||
|
and prior results are merged into the final file. At most PARTIAL_FLUSH_SIZE
|
||||||
|
records (a few seconds of work with 10 workers) are lost on crash.
|
||||||
|
|
||||||
Recheck mode: re-queries venues with slots starting within the next 90 minutes.
|
Recheck mode: re-queries venues with slots starting within the next 90 minutes.
|
||||||
Writes a separate recheck file for more accurate occupancy measurement.
|
Writes a separate recheck file for more accurate occupancy measurement.
|
||||||
@@ -29,7 +34,7 @@ import niquests
|
|||||||
|
|
||||||
from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
|
from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
|
||||||
from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
|
from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
|
||||||
from .utils import get_last_cursor, landing_path, write_gzip_atomic
|
from .utils import flush_partial_batch, landing_path, load_partial_results, write_gzip_atomic
|
||||||
|
|
||||||
logger = setup_logging("padelnomics.extract.playtomic_availability")
|
logger = setup_logging("padelnomics.extract.playtomic_availability")
|
||||||
|
|
||||||
@@ -40,7 +45,6 @@ AVAILABILITY_URL = "https://api.playtomic.io/v1/availability"
|
|||||||
THROTTLE_SECONDS = 1
|
THROTTLE_SECONDS = 1
|
||||||
MAX_VENUES_PER_RUN = 20_000
|
MAX_VENUES_PER_RUN = 20_000
|
||||||
MAX_RETRIES_PER_VENUE = 2
|
MAX_RETRIES_PER_VENUE = 2
|
||||||
MAX_WORKERS = int(os.environ.get("EXTRACT_WORKERS", "1"))
|
|
||||||
RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "90"))
|
RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "90"))
|
||||||
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD", "10"))
|
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD", "10"))
|
||||||
|
|
||||||
@@ -49,6 +53,9 @@ CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD", "10"
|
|||||||
# batch still complete.
|
# batch still complete.
|
||||||
PARALLEL_BATCH_SIZE = 100
|
PARALLEL_BATCH_SIZE = 100
|
||||||
|
|
||||||
|
# Flush partial results to disk every N records — lose at most this many on crash.
|
||||||
|
PARTIAL_FLUSH_SIZE = 50
|
||||||
|
|
||||||
# Thread-local storage for per-worker sessions
|
# Thread-local storage for per-worker sessions
|
||||||
_thread_local = threading.local()
|
_thread_local = threading.local()
|
||||||
|
|
||||||
@@ -84,22 +91,6 @@ def _load_tenant_ids(landing_dir: Path) -> list[str]:
|
|||||||
return ids
|
return ids
|
||||||
|
|
||||||
|
|
||||||
def _parse_resume_cursor(cursor: str | None, target_date: str) -> int:
|
|
||||||
"""Parse cursor_value to find resume index. Returns 0 if no valid cursor."""
|
|
||||||
if not cursor:
|
|
||||||
return 0
|
|
||||||
parts = cursor.split(":", 1)
|
|
||||||
if len(parts) != 2:
|
|
||||||
return 0
|
|
||||||
cursor_date, cursor_index = parts
|
|
||||||
if cursor_date != target_date:
|
|
||||||
return 0
|
|
||||||
try:
|
|
||||||
return int(cursor_index)
|
|
||||||
except ValueError:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Per-venue fetch (used by both serial and parallel modes)
|
# Per-venue fetch (used by both serial and parallel modes)
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -149,7 +140,8 @@ def _fetch_venue_availability(
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
time.sleep(THROTTLE_SECONDS)
|
if not proxy_url:
|
||||||
|
time.sleep(THROTTLE_SECONDS)
|
||||||
return {"tenant_id": tenant_id, "slots": resp.json()}
|
return {"tenant_id": tenant_id, "slots": resp.json()}
|
||||||
|
|
||||||
except niquests.exceptions.RequestException as e:
|
except niquests.exceptions.RequestException as e:
|
||||||
@@ -177,6 +169,7 @@ def _fetch_venues_parallel(
|
|||||||
worker_count: int,
|
worker_count: int,
|
||||||
cycler: dict,
|
cycler: dict,
|
||||||
fallback_urls: list[str],
|
fallback_urls: list[str],
|
||||||
|
on_result=None,
|
||||||
) -> tuple[list[dict], int]:
|
) -> tuple[list[dict], int]:
|
||||||
"""Fetch availability for multiple venues in parallel.
|
"""Fetch availability for multiple venues in parallel.
|
||||||
|
|
||||||
@@ -184,6 +177,9 @@ def _fetch_venues_parallel(
|
|||||||
completes, checks the circuit breaker: if it opened and there is no
|
completes, checks the circuit breaker: if it opened and there is no
|
||||||
fallback configured, stops submitting further batches.
|
fallback configured, stops submitting further batches.
|
||||||
|
|
||||||
|
on_result: optional callable(result: dict) invoked inside the lock for
|
||||||
|
each successful result — used for incremental partial-file flushing.
|
||||||
|
|
||||||
Returns (venues_data, venues_errored).
|
Returns (venues_data, venues_errored).
|
||||||
"""
|
"""
|
||||||
venues_data: list[dict] = []
|
venues_data: list[dict] = []
|
||||||
@@ -215,6 +211,8 @@ def _fetch_venues_parallel(
|
|||||||
if result is not None:
|
if result is not None:
|
||||||
venues_data.append(result)
|
venues_data.append(result)
|
||||||
cycler["record_success"]()
|
cycler["record_success"]()
|
||||||
|
if on_result is not None:
|
||||||
|
on_result(result)
|
||||||
else:
|
else:
|
||||||
venues_errored += 1
|
venues_errored += 1
|
||||||
cycler["record_failure"]()
|
cycler["record_failure"]()
|
||||||
@@ -262,41 +260,56 @@ def extract(
|
|||||||
logger.info("Already have %s — skipping", dest)
|
logger.info("Already have %s — skipping", dest)
|
||||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||||
|
|
||||||
# Resume from cursor if crashed mid-run
|
# Crash resumption: load already-fetched venues from partial file
|
||||||
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
|
partial_path = dest.with_suffix(".partial.jsonl")
|
||||||
resume_index = _parse_resume_cursor(last_cursor, target_date)
|
prior_results, already_done = load_partial_results(partial_path, id_key="tenant_id")
|
||||||
if resume_index > 0:
|
if already_done:
|
||||||
logger.info("Resuming from index %d (cursor: %s)", resume_index, last_cursor)
|
logger.info("Resuming: %d venues already fetched from partial file", len(already_done))
|
||||||
|
|
||||||
venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN]
|
all_venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN]
|
||||||
if resume_index > 0:
|
venues_to_process = [tid for tid in all_venues_to_process if tid not in already_done]
|
||||||
venues_to_process = venues_to_process[resume_index:]
|
|
||||||
|
|
||||||
# Set up tiered proxy cycler with circuit breaker
|
# Set up tiered proxy cycler with circuit breaker
|
||||||
proxy_urls = load_proxy_urls()
|
proxy_urls = load_proxy_urls()
|
||||||
fallback_urls = load_fallback_proxy_urls()
|
fallback_urls = load_fallback_proxy_urls()
|
||||||
worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1
|
worker_count = len(proxy_urls) if proxy_urls else 1
|
||||||
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
|
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
|
||||||
|
|
||||||
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
|
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
|
||||||
start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
|
start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
|
||||||
|
|
||||||
|
# Partial file for incremental crash-safe progress
|
||||||
|
partial_file = open(partial_path, "a") # noqa: SIM115
|
||||||
|
partial_lock = threading.Lock()
|
||||||
|
pending_batch: list[dict] = []
|
||||||
|
|
||||||
|
def _on_result(result: dict) -> None:
|
||||||
|
# Called inside _fetch_venues_parallel's lock — no additional locking needed.
|
||||||
|
# In serial mode, called single-threaded — also safe without extra locking.
|
||||||
|
pending_batch.append(result)
|
||||||
|
if len(pending_batch) >= PARTIAL_FLUSH_SIZE:
|
||||||
|
flush_partial_batch(partial_file, partial_lock, pending_batch)
|
||||||
|
pending_batch.clear()
|
||||||
|
|
||||||
|
new_venues_data: list[dict] = []
|
||||||
|
venues_errored = 0
|
||||||
|
|
||||||
if worker_count > 1:
|
if worker_count > 1:
|
||||||
logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls))
|
logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls))
|
||||||
venues_data, venues_errored = _fetch_venues_parallel(
|
new_venues_data, venues_errored = _fetch_venues_parallel(
|
||||||
venues_to_process, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
|
venues_to_process, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
|
||||||
|
on_result=_on_result,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process))
|
logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process))
|
||||||
venues_data = []
|
|
||||||
venues_errored = 0
|
|
||||||
for i, tenant_id in enumerate(venues_to_process):
|
for i, tenant_id in enumerate(venues_to_process):
|
||||||
result = _fetch_venue_availability(
|
result = _fetch_venue_availability(
|
||||||
tenant_id, start_min_str, start_max_str, cycler["next_proxy"](),
|
tenant_id, start_min_str, start_max_str, cycler["next_proxy"](),
|
||||||
)
|
)
|
||||||
if result is not None:
|
if result is not None:
|
||||||
venues_data.append(result)
|
new_venues_data.append(result)
|
||||||
cycler["record_success"]()
|
cycler["record_success"]()
|
||||||
|
_on_result(result)
|
||||||
else:
|
else:
|
||||||
venues_errored += 1
|
venues_errored += 1
|
||||||
circuit_opened = cycler["record_failure"]()
|
circuit_opened = cycler["record_failure"]()
|
||||||
@@ -310,7 +323,14 @@ def extract(
|
|||||||
i + 1, len(venues_to_process), venues_errored,
|
i + 1, len(venues_to_process), venues_errored,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Write consolidated file
|
# Final flush of any remaining partial batch
|
||||||
|
if pending_batch:
|
||||||
|
flush_partial_batch(partial_file, partial_lock, pending_batch)
|
||||||
|
pending_batch.clear()
|
||||||
|
partial_file.close()
|
||||||
|
|
||||||
|
# Consolidate prior (resumed) + new results into final file
|
||||||
|
venues_data = prior_results + new_venues_data
|
||||||
captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
|
captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
payload = json.dumps({
|
payload = json.dumps({
|
||||||
"date": target_date,
|
"date": target_date,
|
||||||
@@ -321,6 +341,9 @@ def extract(
|
|||||||
}).encode()
|
}).encode()
|
||||||
|
|
||||||
bytes_written = write_gzip_atomic(dest, payload)
|
bytes_written = write_gzip_atomic(dest, payload)
|
||||||
|
if partial_path.exists():
|
||||||
|
partial_path.unlink()
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"%d venues scraped (%d errors) -> %s (%s bytes)",
|
"%d venues scraped (%d errors) -> %s (%s bytes)",
|
||||||
len(venues_data), venues_errored, dest, f"{bytes_written:,}",
|
len(venues_data), venues_errored, dest, f"{bytes_written:,}",
|
||||||
@@ -330,7 +353,7 @@ def extract(
|
|||||||
"files_written": 1,
|
"files_written": 1,
|
||||||
"files_skipped": 0,
|
"files_skipped": 0,
|
||||||
"bytes_written": bytes_written,
|
"bytes_written": bytes_written,
|
||||||
"cursor_value": f"{target_date}:{len(tenant_ids[:MAX_VENUES_PER_RUN])}",
|
"cursor_value": f"{target_date}:{len(all_venues_to_process)}",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -421,7 +444,7 @@ def extract_recheck(
|
|||||||
# Set up tiered proxy cycler with circuit breaker
|
# Set up tiered proxy cycler with circuit breaker
|
||||||
proxy_urls = load_proxy_urls()
|
proxy_urls = load_proxy_urls()
|
||||||
fallback_urls = load_fallback_proxy_urls()
|
fallback_urls = load_fallback_proxy_urls()
|
||||||
worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1
|
worker_count = len(proxy_urls) if proxy_urls else 1
|
||||||
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
|
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
|
||||||
|
|
||||||
if worker_count > 1 and len(venues_to_recheck) > 10:
|
if worker_count > 1 and len(venues_to_recheck) > 10:
|
||||||
|
|||||||
@@ -3,10 +3,6 @@
|
|||||||
Proxies are configured via the PROXY_URLS environment variable (comma-separated).
|
Proxies are configured via the PROXY_URLS environment variable (comma-separated).
|
||||||
When unset, all functions return None/no-op — extractors fall back to direct requests.
|
When unset, all functions return None/no-op — extractors fall back to direct requests.
|
||||||
|
|
||||||
Two routing modes:
|
|
||||||
round-robin — distribute requests evenly across proxies (default)
|
|
||||||
sticky — same key always maps to same proxy (for session-tracked sites)
|
|
||||||
|
|
||||||
Tiered proxy with circuit breaker:
|
Tiered proxy with circuit breaker:
|
||||||
Primary tier (PROXY_URLS) is used by default — typically cheap datacenter proxies.
|
Primary tier (PROXY_URLS) is used by default — typically cheap datacenter proxies.
|
||||||
Fallback tier (PROXY_URLS_FALLBACK) activates once consecutive failures >= threshold.
|
Fallback tier (PROXY_URLS_FALLBACK) activates once consecutive failures >= threshold.
|
||||||
@@ -141,17 +137,3 @@ def make_tiered_cycler(
|
|||||||
"is_fallback_active": is_fallback_active,
|
"is_fallback_active": is_fallback_active,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def make_sticky_selector(proxy_urls: list[str]):
|
|
||||||
"""Consistent-hash proxy selector — same key always maps to same proxy.
|
|
||||||
|
|
||||||
Use when the target site tracks sessions by IP (e.g. Cloudflare).
|
|
||||||
Returns a callable: select_proxy(key: str) -> str | None
|
|
||||||
"""
|
|
||||||
if not proxy_urls:
|
|
||||||
return lambda key: None
|
|
||||||
|
|
||||||
def select_proxy(key: str) -> str:
|
|
||||||
return proxy_urls[hash(key) % len(proxy_urls)]
|
|
||||||
|
|
||||||
return select_proxy
|
|
||||||
|
|||||||
@@ -7,7 +7,9 @@ if you add multiple data sources, extract them to a shared workspace package.
|
|||||||
|
|
||||||
import gzip
|
import gzip
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import json
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import threading
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -117,6 +119,50 @@ def content_hash(data: bytes, prefix_bytes: int = 8) -> str:
|
|||||||
return hashlib.sha256(data).hexdigest()[:prefix_bytes]
|
return hashlib.sha256(data).hexdigest()[:prefix_bytes]
|
||||||
|
|
||||||
|
|
||||||
|
def load_partial_results(partial_path: Path, id_key: str) -> tuple[list[dict], set[str]]:
|
||||||
|
"""Load already-completed records from a partial JSONL file (crash recovery).
|
||||||
|
|
||||||
|
Returns (records, seen_ids). If the file doesn't exist, returns ([], set()).
|
||||||
|
Gracefully handles a truncated last line from a mid-write crash.
|
||||||
|
"""
|
||||||
|
records: list[dict] = []
|
||||||
|
seen_ids: set[str] = set()
|
||||||
|
if not partial_path.exists():
|
||||||
|
return records, seen_ids
|
||||||
|
with open(partial_path) as f:
|
||||||
|
for line in f:
|
||||||
|
line = line.strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
record = json.loads(line)
|
||||||
|
records.append(record)
|
||||||
|
rid = record.get(id_key)
|
||||||
|
if rid:
|
||||||
|
seen_ids.add(rid)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
break # truncated last line from crash — skip it
|
||||||
|
return records, seen_ids
|
||||||
|
|
||||||
|
|
||||||
|
def flush_partial_batch(
|
||||||
|
partial_file,
|
||||||
|
lock: threading.Lock,
|
||||||
|
batch: list[dict],
|
||||||
|
) -> None:
|
||||||
|
"""Thread-safe batch write of JSON records to the partial JSONL file.
|
||||||
|
|
||||||
|
Writes all records in one lock acquisition with a single flush.
|
||||||
|
Call with batches of ~50 records for good I/O throughput vs crash safety tradeoff.
|
||||||
|
On crash, at most one batch worth of records is lost.
|
||||||
|
"""
|
||||||
|
assert batch, "batch must not be empty"
|
||||||
|
with lock:
|
||||||
|
for record in batch:
|
||||||
|
partial_file.write(json.dumps(record, separators=(",", ":")) + "\n")
|
||||||
|
partial_file.flush()
|
||||||
|
|
||||||
|
|
||||||
def write_gzip_atomic(path: Path, data: bytes) -> int:
|
def write_gzip_atomic(path: Path, data: bytes) -> int:
|
||||||
"""Gzip compress data and write to path atomically via .tmp sibling.
|
"""Gzip compress data and write to path atomically via .tmp sibling.
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user