From 53e9bbd66bb4aae48330e3c3b82e371674796e41 Mon Sep 17 00:00:00 2001 From: Deeman Date: Sun, 22 Feb 2026 18:56:41 +0100 Subject: [PATCH] feat: restructure extraction to one file per source MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split monolithic execute.py into per-source modules with separate CLI entry points. Each extractor now uses the framework from utils.py: - SQLite state tracking (start_run / end_run per extractor) - Proper logging (replace print() with logger) - Atomic gzip writes (write_gzip_atomic) - Connection pooling (niquests.Session) - Bounded pagination (MAX_PAGES_PER_BBOX = 500) New entry points: extract — run all 4 extractors sequentially extract-overpass — OSM padel courts extract-eurostat — city demographics (etag dedup) extract-playtomic-tenants — venue listings extract-playtomic-availability — booking slots + pricing (NEW) The availability extractor reads tenant IDs from the latest tenants.json.gz, queries next-day slots for each venue, and stores daily consolidated snapshots. Supports resumability via cursor and retry with backoff. Co-Authored-By: Claude Opus 4.6 --- extract/padelnomics_extract/pyproject.toml | 8 +- .../src/padelnomics_extract/_shared.py | 69 ++++++ .../src/padelnomics_extract/all.py | 42 ++++ .../src/padelnomics_extract/eurostat.py | 102 ++++++++ .../src/padelnomics_extract/execute.py | 220 ------------------ .../src/padelnomics_extract/overpass.py | 71 ++++++ .../playtomic_availability.py | 217 +++++++++++++++++ .../padelnomics_extract/playtomic_tenants.py | 116 +++++++++ .../src/padelnomics_extract/utils.py | 1 + uv.lock | 2 +- 10 files changed, 625 insertions(+), 223 deletions(-) create mode 100644 extract/padelnomics_extract/src/padelnomics_extract/_shared.py create mode 100644 extract/padelnomics_extract/src/padelnomics_extract/all.py create mode 100644 extract/padelnomics_extract/src/padelnomics_extract/eurostat.py delete mode 100644 extract/padelnomics_extract/src/padelnomics_extract/execute.py create mode 100644 extract/padelnomics_extract/src/padelnomics_extract/overpass.py create mode 100644 extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py create mode 100644 extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py diff --git a/extract/padelnomics_extract/pyproject.toml b/extract/padelnomics_extract/pyproject.toml index b3157df..3fe56b1 100644 --- a/extract/padelnomics_extract/pyproject.toml +++ b/extract/padelnomics_extract/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "padelnomics_extract" -version = "0.1.0" +version = "0.2.0" description = "Data extraction pipelines for padelnomics" requires-python = ">=3.11" dependencies = [ @@ -9,7 +9,11 @@ dependencies = [ ] [project.scripts] -extract = "padelnomics_extract.execute:extract_dataset" +extract = "padelnomics_extract.all:main" +extract-overpass = "padelnomics_extract.overpass:main" +extract-eurostat = "padelnomics_extract.eurostat:main" +extract-playtomic-tenants = "padelnomics_extract.playtomic_tenants:main" +extract-playtomic-availability = "padelnomics_extract.playtomic_availability:main" [build-system] requires = ["hatchling"] diff --git a/extract/padelnomics_extract/src/padelnomics_extract/_shared.py b/extract/padelnomics_extract/src/padelnomics_extract/_shared.py new file mode 100644 index 0000000..38c9772 --- /dev/null +++ b/extract/padelnomics_extract/src/padelnomics_extract/_shared.py @@ -0,0 +1,69 @@ +"""Shared configuration and helpers for all extractors. 
+
+Each source module imports from here to get LANDING_DIR, logging setup,
+and the run_extractor() wrapper that handles state tracking boilerplate.
+"""
+
+import logging
+import os
+import sqlite3
+import sys
+from collections.abc import Callable
+from datetime import UTC, datetime
+from pathlib import Path
+
+import niquests
+
+from .utils import end_run, open_state_db, start_run
+
+LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing"))
+
+HTTP_TIMEOUT_SECONDS = 30
+OVERPASS_TIMEOUT_SECONDS = 90  # Overpass can be slow on global queries
+
+
+def setup_logging(name: str) -> logging.Logger:
+    """Configure and return a logger for the given extractor module."""
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s %(name)s %(levelname)s %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+        handlers=[logging.StreamHandler(sys.stdout)],
+    )
+    return logging.getLogger(name)
+
+
+def run_extractor(
+    extractor_name: str,
+    func: Callable[[Path, str, sqlite3.Connection, niquests.Session], dict],
+) -> None:
+    """Boilerplate wrapper: open state DB, start run, call func, end run.
+
+    func signature: func(landing_dir, year_month, conn, session) -> dict
+    The dict must contain: files_written, files_skipped, bytes_written.
+    Optional: cursor_value.
+    """
+    LANDING_DIR.mkdir(parents=True, exist_ok=True)
+    conn = open_state_db(LANDING_DIR)
+    run_id = start_run(conn, extractor_name)
+
+    today = datetime.now(UTC)
+    year_month = today.strftime("%Y/%m")
+
+    try:
+        with niquests.Session() as session:
+            result = func(LANDING_DIR, year_month, conn, session)
+
+        assert isinstance(result, dict), f"extractor must return a dict, got {type(result)}"
+        end_run(
+            conn,
+            run_id,
+            status="success",
+            files_written=result.get("files_written", 0),
+            files_skipped=result.get("files_skipped", 0),
+            bytes_written=result.get("bytes_written", 0),
+            cursor_value=result.get("cursor_value"),
+        )
+    except Exception as e:
+        end_run(conn, run_id, status="failed", error_message=str(e)[:500])
+        raise
+    finally:
+        conn.close()
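
Note: the contract run_extractor() enforces is small enough to show in full. A minimal sketch of a conforming extractor (the "example" name and the no-op body are hypothetical, not part of this patch):

    from pathlib import Path
    import sqlite3

    import niquests

    from padelnomics_extract._shared import run_extractor


    def extract(
        landing_dir: Path,
        year_month: str,
        conn: sqlite3.Connection,
        session: niquests.Session,
    ) -> dict:
        # Fetch nothing; report an empty but successful run.
        return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}


    if __name__ == "__main__":
        run_extractor("example", extract)  # records one run row in .state.sqlite

Whatever the function raises is recorded as a failed run and re-raised, so callers such as all.py can decide whether to continue.
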
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/all.py b/extract/padelnomics_extract/src/padelnomics_extract/all.py
new file mode 100644
index 0000000..3004753
--- /dev/null
+++ b/extract/padelnomics_extract/src/padelnomics_extract/all.py
@@ -0,0 +1,42 @@
+"""Run all extractors sequentially.
+
+Entry point for the combined `uv run extract` command.
+Each extractor gets its own state tracking row in .state.sqlite.
+"""
+
+from ._shared import run_extractor, setup_logging
+from .eurostat import EXTRACTOR_NAME as EUROSTAT_NAME
+from .eurostat import extract as extract_eurostat
+from .overpass import EXTRACTOR_NAME as OVERPASS_NAME
+from .overpass import extract as extract_overpass
+from .playtomic_availability import EXTRACTOR_NAME as AVAILABILITY_NAME
+from .playtomic_availability import extract as extract_availability
+from .playtomic_tenants import EXTRACTOR_NAME as TENANTS_NAME
+from .playtomic_tenants import extract as extract_tenants
+
+logger = setup_logging("padelnomics.extract")
+
+# Order matters: availability reads the tenants file, so tenants runs first.
+EXTRACTORS = [
+    (OVERPASS_NAME, extract_overpass),
+    (EUROSTAT_NAME, extract_eurostat),
+    (TENANTS_NAME, extract_tenants),
+    (AVAILABILITY_NAME, extract_availability),
+]
+
+
+def main() -> None:
+    """Run all extractors. Each gets its own state row."""
+    logger.info("Running %d extractors", len(EXTRACTORS))
+
+    for i, (name, func) in enumerate(EXTRACTORS, 1):
+        logger.info("[%d/%d] %s", i, len(EXTRACTORS), name)
+        try:
+            run_extractor(name, func)
+        except Exception:
+            logger.exception("Extractor %s failed — continuing with next", name)
+
+    logger.info("All extractors complete")
+
+
+if __name__ == "__main__":
+    main()
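
A subtlety of the etag scheme below: the sidecar file sits next to the destination, and the destination path embeds {year}/{month}, so the 304 deduplication only spans runs within the same month. The first run of a new month finds no sidecar and always refetches. Layout under the default LANDING_DIR (illustrative):

    data/landing/eurostat/2026/02/urb_cpop1.json.gz        # gzipped payload
    data/landing/eurostat/2026/02/urb_cpop1.json.gz.etag   # sidecar read by _fetch_with_etag
    data/landing/eurostat/2026/03/                         # new month: no sidecar, full GET

Since Eurostat revises these datasets roughly twice a year, one unconditional fetch per month is an acceptable cost.
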
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py b/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py
new file mode 100644
index 0000000..6dfba92
--- /dev/null
+++ b/extract/padelnomics_extract/src/padelnomics_extract/eurostat.py
@@ -0,0 +1,102 @@
+"""Eurostat extractor — city-level demographic datasets.
+
+Fetches Eurostat Statistics API JSON datasets using etag-based deduplication.
+Data only changes ~twice a year so most runs skip with 304 Not Modified.
+
+Landing: {LANDING_DIR}/eurostat/{year}/{month}/{dataset_code}.json.gz
+"""
+
+import sqlite3
+from pathlib import Path
+
+import niquests
+
+from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
+from .utils import landing_path, write_gzip_atomic
+
+logger = setup_logging("padelnomics.extract.eurostat")
+
+EXTRACTOR_NAME = "eurostat"
+EUROSTAT_BASE_URL = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data"
+
+# Datasets to fetch
+DATASETS = [
+    "urb_cpop1",  # Urban Audit — city population
+    "ilc_di03",  # Median equivalised net income by NUTS2
+]
+
+
+def _etag_path(dest: Path) -> Path:
+    """Return the sibling .etag file path for a given dest."""
+    return dest.parent / (dest.name + ".etag")
+
+
+def _fetch_with_etag(
+    url: str,
+    dest: Path,
+    session: niquests.Session,
+) -> int:
+    """GET url with If-None-Match etag. Returns bytes_written (0 if 304)."""
+    etag_file = _etag_path(dest)
+    headers: dict[str, str] = {}
+
+    if etag_file.exists():
+        headers["If-None-Match"] = etag_file.read_text().strip()
+
+    resp = session.get(url, headers=headers, timeout=HTTP_TIMEOUT_SECONDS)
+
+    if resp.status_code == 304:
+        return 0
+
+    resp.raise_for_status()
+    bytes_written = write_gzip_atomic(dest, resp.content)
+
+    if etag := resp.headers.get("etag"):
+        etag_file.parent.mkdir(parents=True, exist_ok=True)
+        etag_file.write_text(etag)
+
+    return bytes_written
+
+
+def extract(
+    landing_dir: Path,
+    year_month: str,
+    conn: sqlite3.Connection,
+    session: niquests.Session,
+) -> dict:
+    """Fetch all Eurostat datasets. Returns run metrics."""
+    year, month = year_month.split("/")
+    files_written = 0
+    files_skipped = 0
+    bytes_written_total = 0
+
+    for dataset_code in DATASETS:
+        url = f"{EUROSTAT_BASE_URL}/{dataset_code}?format=JSON&lang=EN"
+        dest_dir = landing_path(landing_dir, "eurostat", year, month)
+        dest = dest_dir / f"{dataset_code}.json.gz"
+
+        logger.info("GET %s", dataset_code)
+        bytes_written = _fetch_with_etag(url, dest, session)
+
+        if bytes_written > 0:
+            logger.info("%s updated — %s bytes compressed", dataset_code, f"{bytes_written:,}")
+            files_written += 1
+            bytes_written_total += bytes_written
+        else:
+            logger.info("%s not modified (304)", dataset_code)
+            files_skipped += 1
+
+    return {
+        "files_written": files_written,
+        "files_skipped": files_skipped,
+        "bytes_written": bytes_written_total,
+        "cursor_value": year_month,
+    }
+
+
+def main() -> None:
+    run_extractor(EXTRACTOR_NAME, extract)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/execute.py b/extract/padelnomics_extract/src/padelnomics_extract/execute.py
deleted file mode 100644
index 2d7b534..0000000
--- a/extract/padelnomics_extract/src/padelnomics_extract/execute.py
+++ /dev/null
@@ -1,220 +0,0 @@
-"""
-Extraction pipelines — downloads source data into the landing zone.
-
-Environment:
-    LANDING_DIR — local path for landing zone (default: data/landing)
-"""
-import gzip
-import json
-import os
-import time
-from datetime import UTC, datetime
-from pathlib import Path
-
-import niquests
-
-LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing"))
-
-OVERPASS_URL = "https://overpass-api.de/api/interpreter"
-EUROSTAT_BASE_URL = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data"
-PLAYTOMIC_TENANTS_URL = "https://api.playtomic.io/v1/tenants"
-
-TIMEOUT_SECONDS = 30
-OVERPASS_TIMEOUT_SECONDS = 90  # Overpass can be slow on global queries
-
-# Eurostat datasets to fetch
-EUROSTAT_DATASETS = [
-    "urb_cpop1",  # Urban Audit — city population
-    "ilc_di03",  # Median equivalised net income by NUTS2
-]
-
-# Playtomic geo-search bounding boxes [min_lat, min_lon, max_lat, max_lon]
-# Target markets: Spain, UK, Germany, France
-PLAYTOMIC_BBOXES = [
-    {"min_latitude": 35.95, "min_longitude": -9.39, "max_latitude": 43.79, "max_longitude": 4.33},
-    {"min_latitude": 49.90, "min_longitude": -8.62, "max_latitude": 60.85, "max_longitude": 1.77},
-    {"min_latitude": 47.27, "min_longitude": 5.87, "max_latitude": 55.06, "max_longitude": 15.04},
-    {"min_latitude": 41.36, "min_longitude": -5.14, "max_latitude": 51.09, "max_longitude": 9.56},
-]
-
-
-def _write_gz(dest: Path, data: bytes) -> None:
-    """Write bytes as gzip to dest, creating parent dirs as needed."""
-    assert dest.suffix == ".gz", f"dest must end in .gz: {dest}"
-    dest.parent.mkdir(parents=True, exist_ok=True)
-    with gzip.open(dest, "wb") as f:
-        f.write(data)
-
-
-def _etag_path(dest: Path) -> Path:
-    """Return the sibling .etag file path for a given dest."""
-    return dest.parent / (dest.name + ".etag")
-
-
-def extract_file(url: str, dest: Path, *, use_etag: bool = True) -> bool:
-    """
-    GET url and write response body to dest as gzip. Returns True if new data
-    was fetched, False if the server returned 304 Not Modified.
- """ - assert url, "url must not be empty" - - headers: dict[str, str] = {} - etag_file = _etag_path(dest) if use_etag else None - - if etag_file and etag_file.exists(): - headers["If-None-Match"] = etag_file.read_text().strip() - - resp = niquests.get(url, headers=headers, timeout=TIMEOUT_SECONDS) - - if resp.status_code == 304: - return False - - resp.raise_for_status() - _write_gz(dest, resp.content) - - if etag_file and (etag := resp.headers.get("etag")): - etag_file.parent.mkdir(parents=True, exist_ok=True) - etag_file.write_text(etag) - - return True - - -def extract_overpass(landing_dir: Path, year_month: str) -> None: - """ - POST a global OverpassQL query for padel courts (sport=padel) and write raw - OSM JSON to the landing zone. - - Landing: {landing_dir}/overpass/{year}/{month}/courts.json.gz - """ - year, month = year_month.split("/") - dest = landing_dir / "overpass" / year / month / "courts.json.gz" - - query = ( - "[out:json][timeout:60];\n" - "(\n" - ' node["sport"="padel"];\n' - ' way["sport"="padel"];\n' - ' relation["sport"="padel"];\n' - ");\n" - "out body;" - ) - - print(f" [overpass] POST {OVERPASS_URL}") - resp = niquests.post( - OVERPASS_URL, - data={"data": query}, - timeout=OVERPASS_TIMEOUT_SECONDS, - ) - resp.raise_for_status() - - size_bytes = len(resp.content) - print(f" [overpass] {size_bytes:,} bytes received") - _write_gz(dest, resp.content) - print(f" [overpass] -> {dest}") - - -def extract_eurostat(landing_dir: Path, year_month: str) -> None: - """ - Fetch Eurostat city-level demographic datasets (JSON format) and write to - the landing zone. Uses etag deduplication — data only changes ~twice a year. - - Landing: {landing_dir}/eurostat/{year}/{month}/{dataset_code}.json.gz - """ - year, month = year_month.split("/") - - for dataset_code in EUROSTAT_DATASETS: - url = f"{EUROSTAT_BASE_URL}/{dataset_code}?format=JSON&lang=EN" - dest = landing_dir / "eurostat" / year / month / f"{dataset_code}.json.gz" - - print(f" [eurostat] GET {dataset_code}") - fetched = extract_file(url, dest, use_etag=True) - - if fetched: - size_bytes = dest.stat().st_size - print(f" [eurostat] {dataset_code} updated -> {dest} ({size_bytes:,} bytes compressed)") - else: - print(f" [eurostat] {dataset_code} not modified (304)") - - -def extract_playtomic_tenants(landing_dir: Path, year_month: str) -> None: - """ - Fetch Playtomic venue listings via the unauthenticated tenant search endpoint. - Iterates over target-market bounding boxes with pagination, deduplicates on - tenant_id, and writes a single consolidated JSON to the landing zone. - - Rate: 1 req / 2 s as documented in the data-sources inventory. 
- - Landing: {landing_dir}/playtomic/{year}/{month}/tenants.json.gz - """ - year, month = year_month.split("/") - dest = landing_dir / "playtomic" / year / month / "tenants.json.gz" - - all_tenants: list[dict] = [] - seen_ids: set[str] = set() - page_size = 20 - - for bbox in PLAYTOMIC_BBOXES: - page = 0 - while True: - params = { - "sport_ids": "PADEL", - "min_latitude": bbox["min_latitude"], - "min_longitude": bbox["min_longitude"], - "max_latitude": bbox["max_latitude"], - "max_longitude": bbox["max_longitude"], - "offset": page * page_size, - "size": page_size, - } - - print( - f" [playtomic] GET page={page} " - f"bbox=({bbox['min_latitude']:.1f},{bbox['min_longitude']:.1f}," - f"{bbox['max_latitude']:.1f},{bbox['max_longitude']:.1f})" - ) - - resp = niquests.get(PLAYTOMIC_TENANTS_URL, params=params, timeout=TIMEOUT_SECONDS) - resp.raise_for_status() - - tenants = resp.json() - assert isinstance(tenants, list), ( - f"Expected list from Playtomic API, got {type(tenants)}" - ) - - new_count = 0 - for tenant in tenants: - tid = tenant.get("tenant_id") or tenant.get("id") - if tid and tid not in seen_ids: - seen_ids.add(tid) - all_tenants.append(tenant) - new_count += 1 - - print(f" [playtomic] page={page} got={len(tenants)} new={new_count} total={len(all_tenants)}") - - if len(tenants) < page_size: - break - - page += 1 - time.sleep(2) # throttle - - payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode() - _write_gz(dest, payload) - print(f" [playtomic] {len(all_tenants)} unique venues -> {dest}") - - -def extract_dataset() -> None: - """Entry point: run all extractors sequentially.""" - today = datetime.now(UTC) - year_month = today.strftime("%Y/%m") - - print(f"extract_dataset start: landing_dir={LANDING_DIR} period={year_month}") - - print("\n[1/3] Overpass API — padel courts (OSM)") - extract_overpass(LANDING_DIR, year_month) - - print("\n[2/3] Eurostat — city demographics") - extract_eurostat(LANDING_DIR, year_month) - - print("\n[3/3] Playtomic — venue listings (unauthenticated)") - extract_playtomic_tenants(LANDING_DIR, year_month) - - print("\nextract_dataset: done") diff --git a/extract/padelnomics_extract/src/padelnomics_extract/overpass.py b/extract/padelnomics_extract/src/padelnomics_extract/overpass.py new file mode 100644 index 0000000..950dcdc --- /dev/null +++ b/extract/padelnomics_extract/src/padelnomics_extract/overpass.py @@ -0,0 +1,71 @@ +"""Overpass API extractor — global padel court locations from OpenStreetMap. + +Queries the Overpass API for all nodes/ways/relations tagged sport=padel +and writes the raw OSM JSON to the landing zone. + +Landing: {LANDING_DIR}/overpass/{year}/{month}/courts.json.gz +""" + +import sqlite3 +from pathlib import Path + +import niquests + +from ._shared import OVERPASS_TIMEOUT_SECONDS, run_extractor, setup_logging +from .utils import landing_path, write_gzip_atomic + +logger = setup_logging("padelnomics.extract.overpass") + +EXTRACTOR_NAME = "overpass" +OVERPASS_URL = "https://overpass-api.de/api/interpreter" + +OVERPASS_QUERY = ( + "[out:json][timeout:60];\n" + "(\n" + ' node["sport"="padel"];\n' + ' way["sport"="padel"];\n' + ' relation["sport"="padel"];\n' + ");\n" + "out body;" +) + + +def extract( + landing_dir: Path, + year_month: str, + conn: sqlite3.Connection, + session: niquests.Session, +) -> dict: + """POST OverpassQL query and write raw OSM JSON. 
Returns run metrics."""
+    year, month = year_month.split("/")
+    dest_dir = landing_path(landing_dir, "overpass", year, month)
+    dest = dest_dir / "courts.json.gz"
+
+    logger.info("POST %s", OVERPASS_URL)
+    resp = session.post(
+        OVERPASS_URL,
+        data={"data": OVERPASS_QUERY},
+        timeout=OVERPASS_TIMEOUT_SECONDS,
+    )
+    resp.raise_for_status()
+
+    size_bytes = len(resp.content)
+    logger.info("%s bytes received", f"{size_bytes:,}")
+
+    bytes_written = write_gzip_atomic(dest, resp.content)
+    logger.info("wrote %s (%s bytes compressed)", dest, f"{bytes_written:,}")
+
+    return {
+        "files_written": 1,
+        "files_skipped": 0,
+        "bytes_written": bytes_written,
+        "cursor_value": year_month,
+    }
+
+
+def main() -> None:
+    run_extractor(EXTRACTOR_NAME, extract)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
new file mode 100644
index 0000000..7f4127f
--- /dev/null
+++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py
@@ -0,0 +1,217 @@
+"""Playtomic availability extractor — booking slot data for market intelligence.
+
+Reads tenant IDs from the latest tenants.json.gz, then queries the
+unauthenticated /v1/availability endpoint for each venue's next-day slots.
+This is the highest-value source: daily snapshots enable occupancy rate
+estimation, pricing benchmarking, and demand signal detection.
+
+API constraint: max 25-hour window per request (see docs/data-sources-inventory.md §2.1).
+Rate: 1 req / 2 s (conservative, unauthenticated endpoint).
+
+Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.json.gz
+"""
+
+import gzip
+import json
+import sqlite3
+import time
+from datetime import UTC, datetime, timedelta
+from pathlib import Path
+
+import niquests
+
+from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
+from .utils import get_last_cursor, landing_path, write_gzip_atomic
+
+logger = setup_logging("padelnomics.extract.playtomic_availability")
+
+EXTRACTOR_NAME = "playtomic_availability"
+AVAILABILITY_URL = "https://api.playtomic.io/v1/availability"
+
+THROTTLE_SECONDS = 2
+MAX_VENUES_PER_RUN = 10_000  # ~5.5 h at the 2 s throttle
+MAX_RETRIES_PER_VENUE = 2
+
+
+def _load_tenant_ids(landing_dir: Path) -> list[str]:
+    """Read tenant IDs from the most recent tenants.json.gz file."""
+    playtomic_dir = landing_dir / "playtomic"
+    if not playtomic_dir.exists():
+        return []
+
+    # Most recent tenants.json.gz across year/month dirs; lexicographic sort
+    # works because path segments are zero-padded {year}/{month}
+    tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True)
+    if not tenant_files:
+        return []
+
+    latest = tenant_files[0]
+    logger.info("Loading tenant IDs from %s", latest)
+
+    with gzip.open(latest, "rb") as f:
+        data = json.loads(f.read())
+
+    tenants = data.get("tenants", [])
+    ids: list[str] = []
+    for t in tenants:
+        tid = t.get("tenant_id") or t.get("id")
+        if tid:
+            ids.append(tid)
+
+    logger.info("Loaded %d tenant IDs", len(ids))
+    return ids
+
+
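
_load_tenant_ids() above assumes the shape that playtomic_tenants.py writes. Trimmed to what is actually read here (the "..." stands for venue fields that pass through untouched; the values are illustrative):

    {
      "count": 2,
      "tenants": [
        {"tenant_id": "abc-123", ...},
        {"id": "def-456", ...}
      ]
    }

Only tenant_id (with id as fallback) is consumed; the same fallback is the dedup key used when the tenants file is built.
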
+def _parse_resume_cursor(cursor: str | None, target_date: str) -> int:
+    """Parse cursor_value to find resume index. Returns 0 if no valid cursor."""
+    if not cursor:
+        return 0
+    # cursor format: "{date}:{index}"
+    parts = cursor.split(":", 1)
+    if len(parts) != 2:
+        return 0
+    cursor_date, cursor_index = parts
+    # Only resume if cursor is for today's target date
+    if cursor_date != target_date:
+        return 0
+    try:
+        return int(cursor_index)
+    except ValueError:
+        return 0
+
+
+def extract(
+    landing_dir: Path,
+    year_month: str,
+    conn: sqlite3.Connection,
+    session: niquests.Session,
+) -> dict:
+    """Fetch next-day availability for all known Playtomic venues."""
+    tenant_ids = _load_tenant_ids(landing_dir)
+    if not tenant_ids:
+        logger.warning("No tenant IDs found — run extract-playtomic-tenants first")
+        return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}
+
+    # Query tomorrow's slots: a 24-hour window from UTC midnight, inside the
+    # API's 25-hour-per-request cap
+    tomorrow = datetime.now(UTC) + timedelta(days=1)
+    target_date = tomorrow.strftime("%Y-%m-%d")
+    start_min = tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)
+    start_max = start_min + timedelta(hours=24)
+
+    year, month = year_month.split("/")
+    dest_dir = landing_path(landing_dir, "playtomic", year, month)
+    dest = dest_dir / f"availability_{target_date}.json.gz"
+
+    # Check if already completed for this date
+    if dest.exists():
+        logger.info("Already have %s — skipping", dest)
+        return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
+
+    # Resume from the last recorded cursor when it matches today's target date
+    last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
+    resume_index = _parse_resume_cursor(last_cursor, target_date)
+    if resume_index > 0:
+        logger.info("Resuming from index %d (cursor: %s)", resume_index, last_cursor)
+
+    venues_data: list[dict] = []
+    venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN]
+    venues_errored = 0
+
+    for i, tenant_id in enumerate(venues_to_process):
+        if i < resume_index:
+            continue
+
+        params = {
+            "sport_id": "PADEL",
+            "tenant_id": tenant_id,
+            "start_min": start_min.strftime("%Y-%m-%dT%H:%M:%S"),
+            "start_max": start_max.strftime("%Y-%m-%dT%H:%M:%S"),
+        }
+
+        for attempt in range(MAX_RETRIES_PER_VENUE + 1):
+            try:
+                resp = session.get(AVAILABILITY_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS)
+
+                if resp.status_code == 429:
+                    # Rate limited — back off harder each retry (4 s, 6 s, 8 s)
+                    wait_seconds = THROTTLE_SECONDS * (attempt + 2)
+                    logger.warning("Rate limited on %s, waiting %ds", tenant_id, wait_seconds)
+                    time.sleep(wait_seconds)
+                    continue
+
+                if resp.status_code >= 500:
+                    logger.warning(
+                        "Server error %d for %s (attempt %d)",
+                        resp.status_code,
+                        tenant_id,
+                        attempt + 1,
+                    )
+                    time.sleep(THROTTLE_SECONDS)
+                    continue
+
+                resp.raise_for_status()
+                venues_data.append({"tenant_id": tenant_id, "slots": resp.json()})
+                break
+
+            except niquests.exceptions.RequestException as e:
+                if attempt < MAX_RETRIES_PER_VENUE:
+                    logger.warning(
+                        "Request failed for %s (attempt %d): %s", tenant_id, attempt + 1, e
+                    )
+                    time.sleep(THROTTLE_SECONDS)
+                else:
+                    logger.error(
+                        "Giving up on %s after %d attempts: %s",
+                        tenant_id,
+                        MAX_RETRIES_PER_VENUE + 1,
+                        e,
+                    )
+        else:
+            # No attempt succeeded (loop finished without break). Count the
+            # venue exactly once here, whether it failed by exception or by
+            # exhausting its 429/5xx retries.
+            venues_errored += 1
+
+        if (i + 1) % 100 == 0:
+            logger.info(
+                "Progress: %d/%d venues queried, %d errors",
+                i + 1,
+                len(venues_to_process),
+                venues_errored,
+            )
+
+        time.sleep(THROTTLE_SECONDS)
+
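
The consolidated snapshot written below is what downstream stages consume. A minimal reading sketch (hypothetical consumer, not part of this patch; the contents of each "slots" entry are whatever the availability endpoint returned, unnormalized):

    import gzip
    import json
    from pathlib import Path

    snap_path = Path("data/landing/playtomic/2026/02/availability_2026-02-23.json.gz")
    with gzip.open(snap_path, "rb") as f:
        snap = json.loads(f.read())

    print(snap["date"], snap["venue_count"], "venues,", snap["venues_errored"], "errors")
    for venue in snap["venues"]:
        # venue["slots"] is the raw API response for that tenant's next-day window
        print(venue["tenant_id"], len(venue["slots"]))

The len() call assumes the endpoint returns a list per tenant; treat it as a sanity check, not a guarantee.
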
+    # Write consolidated file
+    captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
+    payload = json.dumps(
+        {
+            "date": target_date,
+            "captured_at_utc": captured_at,
+            "venue_count": len(venues_data),
+            "venues_errored": venues_errored,
+            "venues": venues_data,
+        }
+    ).encode()
+
+    bytes_written = write_gzip_atomic(dest, payload)
+    logger.info(
+        "%d venues scraped (%d errors) -> %s (%s bytes)",
+        len(venues_data),
+        venues_errored,
+        dest,
+        f"{bytes_written:,}",
+    )
+
+    return {
+        "files_written": 1,
+        "files_skipped": 0,
+        "bytes_written": bytes_written,
+        # Final cursor marks this date as fully processed
+        "cursor_value": f"{target_date}:{len(venues_to_process)}",
+    }
+
+
+def main() -> None:
+    run_extractor(EXTRACTOR_NAME, extract)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py
new file mode 100644
index 0000000..b73a2df
--- /dev/null
+++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py
@@ -0,0 +1,116 @@
+"""Playtomic tenants extractor — venue listings via unauthenticated API.
+
+Iterates over target-market bounding boxes with pagination, deduplicates
+on tenant_id, and writes a single consolidated JSON to the landing zone.
+
+Rate: 1 req / 2 s (see docs/data-sources-inventory.md §1.2).
+
+Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz
+"""
+
+import json
+import sqlite3
+import time
+from pathlib import Path
+
+import niquests
+
+from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
+from .utils import landing_path, write_gzip_atomic
+
+logger = setup_logging("padelnomics.extract.playtomic_tenants")
+
+EXTRACTOR_NAME = "playtomic_tenants"
+PLAYTOMIC_TENANTS_URL = "https://api.playtomic.io/v1/tenants"
+
+THROTTLE_SECONDS = 2
+PAGE_SIZE = 20
+MAX_PAGES_PER_BBOX = 500  # safety bound — prevents infinite pagination
+
+# Target markets, one bbox each
+BBOXES = [
+    {"min_latitude": 35.95, "min_longitude": -9.39, "max_latitude": 43.79, "max_longitude": 4.33},  # Spain
+    {"min_latitude": 49.90, "min_longitude": -8.62, "max_latitude": 60.85, "max_longitude": 1.77},  # UK/Ireland
+    {"min_latitude": 47.27, "min_longitude": 5.87, "max_latitude": 55.06, "max_longitude": 15.04},  # Germany
+    {"min_latitude": 41.36, "min_longitude": -5.14, "max_latitude": 51.09, "max_longitude": 9.56},  # France
+]
+
+
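
Extending coverage is a one-line change: append a bounding box above. A hypothetical entry for Italy (coordinates approximate, not part of this patch):

    {"min_latitude": 36.62, "min_longitude": 6.62, "max_latitude": 47.10, "max_longitude": 18.52},  # Italy

Overlapping boxes are safe because results are deduplicated on tenant_id below, but each box costs up to MAX_PAGES_PER_BBOX throttled requests (about 17 minutes at the 2 s throttle), so prefer tight boxes per market.
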
+def extract(
+    landing_dir: Path,
+    year_month: str,
+    conn: sqlite3.Connection,
+    session: niquests.Session,
+) -> dict:
+    """Fetch all Playtomic venues across target markets. Returns run metrics."""
+    year, month = year_month.split("/")
+    dest_dir = landing_path(landing_dir, "playtomic", year, month)
+    dest = dest_dir / "tenants.json.gz"
+
+    all_tenants: list[dict] = []
+    seen_ids: set[str] = set()
+
+    for bbox in BBOXES:
+        for page in range(MAX_PAGES_PER_BBOX):
+            params = {
+                "sport_ids": "PADEL",
+                "min_latitude": bbox["min_latitude"],
+                "min_longitude": bbox["min_longitude"],
+                "max_latitude": bbox["max_latitude"],
+                "max_longitude": bbox["max_longitude"],
+                "offset": page * PAGE_SIZE,
+                "size": PAGE_SIZE,
+            }
+
+            logger.info(
+                "GET page=%d bbox=(%.1f,%.1f,%.1f,%.1f)",
+                page,
+                bbox["min_latitude"],
+                bbox["min_longitude"],
+                bbox["max_latitude"],
+                bbox["max_longitude"],
+            )
+
+            resp = session.get(PLAYTOMIC_TENANTS_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS)
+            resp.raise_for_status()
+
+            tenants = resp.json()
+            assert isinstance(tenants, list), (
+                f"Expected list from Playtomic API, got {type(tenants)}"
+            )
+
+            new_count = 0
+            for tenant in tenants:
+                tid = tenant.get("tenant_id") or tenant.get("id")
+                if tid and tid not in seen_ids:
+                    seen_ids.add(tid)
+                    all_tenants.append(tenant)
+                    new_count += 1
+
+            logger.info(
+                "page=%d got=%d new=%d total=%d", page, len(tenants), new_count, len(all_tenants)
+            )
+
+            if len(tenants) < PAGE_SIZE:
+                break
+
+            time.sleep(THROTTLE_SECONDS)
+        else:
+            # Safety bound reached without a short page; log it so silent
+            # truncation of a dense market is visible in the run output
+            logger.warning("Hit MAX_PAGES_PER_BBOX (%d) for bbox %s", MAX_PAGES_PER_BBOX, bbox)
+
+    payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode()
+    bytes_written = write_gzip_atomic(dest, payload)
+    logger.info("%d unique venues -> %s", len(all_tenants), dest)
+
+    return {
+        "files_written": 1,
+        "files_skipped": 0,
+        "bytes_written": bytes_written,
+        "cursor_value": year_month,
+    }
+
+
+def main() -> None:
+    run_extractor(EXTRACTOR_NAME, extract)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/utils.py b/extract/padelnomics_extract/src/padelnomics_extract/utils.py
index 42a9c44..3cb2562 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/utils.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/utils.py
@@ -103,6 +103,7 @@ def get_last_cursor(conn: sqlite3.Connection, extractor: str) -> str | None:
 # File I/O helpers
 # ---------------------------------------------------------------------------
 
+
 def landing_path(landing_dir: str | Path, *parts: str) -> Path:
     """Return path to a subdirectory of landing_dir, creating it if absent."""
     path = Path(landing_dir).joinpath(*parts)
diff --git a/uv.lock b/uv.lock
index 4bc456d..ee0ef11 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1180,7 +1180,7 @@ requires-dist = [
 
 [[package]]
 name = "padelnomics-extract"
-version = "0.1.0"
+version = "0.2.0"
 source = { editable = "extract/padelnomics_extract" }
 dependencies = [
     { name = "niquests" },