feat: restructure extraction to one file per source

Split monolithic execute.py into per-source modules with separate CLI
entry points. Each extractor now uses the framework from utils.py:
- SQLite state tracking (start_run / end_run per extractor)
- Proper logging (replace print() with logger)
- Atomic gzip writes (write_gzip_atomic)
- Connection pooling (niquests.Session)
- Bounded pagination (MAX_PAGES_PER_BBOX = 500)

New entry points:
  extract              — run all 4 extractors sequentially
  extract-overpass     — OSM padel courts
  extract-eurostat     — city demographics (etag dedup)
  extract-playtomic-tenants      — venue listings
  extract-playtomic-availability — booking slots + pricing (NEW)

The availability extractor reads tenant IDs from the latest tenants.json.gz,
queries next-day slots for each venue, and stores daily consolidated snapshots.
Supports resumability via cursor and retry with backoff.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-22 18:56:41 +01:00
parent ea86940b78
commit 53e9bbd66b
10 changed files with 625 additions and 223 deletions

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "padelnomics_extract" name = "padelnomics_extract"
version = "0.1.0" version = "0.2.0"
description = "Data extraction pipelines for padelnomics" description = "Data extraction pipelines for padelnomics"
requires-python = ">=3.11" requires-python = ">=3.11"
dependencies = [ dependencies = [
@@ -9,7 +9,11 @@ dependencies = [
] ]
[project.scripts] [project.scripts]
extract = "padelnomics_extract.execute:extract_dataset" extract = "padelnomics_extract.all:main"
extract-overpass = "padelnomics_extract.overpass:main"
extract-eurostat = "padelnomics_extract.eurostat:main"
extract-playtomic-tenants = "padelnomics_extract.playtomic_tenants:main"
extract-playtomic-availability = "padelnomics_extract.playtomic_availability:main"
[build-system] [build-system]
requires = ["hatchling"] requires = ["hatchling"]

View File

@@ -0,0 +1,69 @@
"""Shared configuration and helpers for all extractors.
Each source module imports from here to get LANDING_DIR, logging setup,
and the run_extractor() wrapper that handles state tracking boilerplate.
"""
import logging
import os
import sys
from datetime import UTC, datetime
from pathlib import Path
import niquests
from .utils import end_run, open_state_db, start_run
LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing"))
HTTP_TIMEOUT_SECONDS = 30
OVERPASS_TIMEOUT_SECONDS = 90 # Overpass can be slow on global queries
def setup_logging(name: str) -> logging.Logger:
"""Configure and return a logger for the given extractor module."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
return logging.getLogger(name)
def run_extractor(
    extractor_name: str,
    func,
) -> None:
    """Boilerplate wrapper: open state DB, start run, call func, end run.

    func signature: func(landing_dir, year_month, conn, session) -> dict
    The dict must contain: files_written, files_skipped, bytes_written.
    Optional: cursor_value.

    On success the run row is closed with status="success" and the metrics
    from the returned dict; on any exception the row is closed with
    status="failed" (error message truncated to 500 chars) and the
    exception is re-raised to the caller.
    """
    LANDING_DIR.mkdir(parents=True, exist_ok=True)
    conn = open_state_db(LANDING_DIR)
    run_id = start_run(conn, extractor_name)
    today = datetime.now(UTC)
    year_month = today.strftime("%Y/%m")  # landing-zone partition, e.g. "2026/02"
    try:
        with niquests.Session() as session:
            result = func(LANDING_DIR, year_month, conn, session)
        # Explicit check rather than `assert`: asserts are stripped under -O,
        # and a non-dict return is a programming error we must never swallow.
        # Raising here is caught below, so the run row is marked failed too.
        if not isinstance(result, dict):
            raise TypeError(f"extractor must return a dict, got {type(result)}")
        end_run(
            conn,
            run_id,
            status="success",
            files_written=result.get("files_written", 0),
            files_skipped=result.get("files_skipped", 0),
            bytes_written=result.get("bytes_written", 0),
            cursor_value=result.get("cursor_value"),
        )
    except Exception as e:
        end_run(conn, run_id, status="failed", error_message=str(e)[:500])
        raise
    finally:
        conn.close()

View File

@@ -0,0 +1,42 @@
"""Run all extractors sequentially.
Entry point for the combined `uv run extract` command.
Each extractor gets its own state tracking row in .state.sqlite.
"""
from ._shared import run_extractor, setup_logging
from .eurostat import EXTRACTOR_NAME as EUROSTAT_NAME
from .eurostat import extract as extract_eurostat
from .overpass import EXTRACTOR_NAME as OVERPASS_NAME
from .overpass import extract as extract_overpass
from .playtomic_availability import EXTRACTOR_NAME as AVAILABILITY_NAME
from .playtomic_availability import extract as extract_availability
from .playtomic_tenants import EXTRACTOR_NAME as TENANTS_NAME
from .playtomic_tenants import extract as extract_tenants
logger = setup_logging("padelnomics.extract")
EXTRACTORS = [
(OVERPASS_NAME, extract_overpass),
(EUROSTAT_NAME, extract_eurostat),
(TENANTS_NAME, extract_tenants),
(AVAILABILITY_NAME, extract_availability),
]
def main() -> None:
    """Run every registered extractor in order, each with its own state row.

    A failing extractor is logged with its traceback and skipped, so the
    remaining extractors still get a chance to run.
    """
    total = len(EXTRACTORS)
    logger.info("Running %d extractors", total)
    for index, (name, func) in enumerate(EXTRACTORS, start=1):
        logger.info("[%d/%d] %s", index, total, name)
        try:
            run_extractor(name, func)
        except Exception:
            logger.exception("Extractor %s failed — continuing with next", name)
    logger.info("All extractors complete")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,102 @@
"""Eurostat extractor — city-level demographic datasets.
Fetches Eurostat Statistics API JSON datasets using etag-based deduplication.
Data only changes ~twice a year so most runs skip with 304 Not Modified.
Landing: {LANDING_DIR}/eurostat/{year}/{month}/{dataset_code}.json.gz
"""
import sqlite3
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import landing_path, write_gzip_atomic
logger = setup_logging("padelnomics.extract.eurostat")
EXTRACTOR_NAME = "eurostat"
EUROSTAT_BASE_URL = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data"
# Datasets to fetch
DATASETS = [
"urb_cpop1", # Urban Audit — city population
"ilc_di03", # Median equivalised net income by NUTS2
]
def _etag_path(dest: Path) -> Path:
"""Return the sibling .etag file path for a given dest."""
return dest.parent / (dest.name + ".etag")
def _fetch_with_etag(
    url: str,
    dest: Path,
    session: niquests.Session,
) -> int:
    """Conditional GET of *url*, landing the body gzipped at *dest*.

    Sends If-None-Match when a sidecar .etag file exists. Returns the
    number of compressed bytes written, or 0 when the server answered
    304 Not Modified.
    """
    etag_file = _etag_path(dest)
    request_headers: dict[str, str] = {}
    if etag_file.exists():
        request_headers["If-None-Match"] = etag_file.read_text().strip()
    resp = session.get(url, headers=request_headers, timeout=HTTP_TIMEOUT_SECONDS)
    if resp.status_code == 304:
        return 0  # unchanged upstream — nothing written
    resp.raise_for_status()
    written = write_gzip_atomic(dest, resp.content)
    new_etag = resp.headers.get("etag")
    if new_etag:
        etag_file.parent.mkdir(parents=True, exist_ok=True)
        etag_file.write_text(new_etag)
    return written
def extract(
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """Fetch all Eurostat datasets into the landing zone.

    Args:
        landing_dir: root of the landing zone.
        year_month: partition string "YYYY/MM".
        conn: state DB connection (unused here; part of the extractor contract).
        session: pooled HTTP session.

    Returns:
        Run metrics dict with files_written, files_skipped, bytes_written
        and cursor_value (the year_month partition).
    """
    year, month = year_month.split("/")
    # The destination directory is identical for every dataset — resolve and
    # create it once instead of once per loop iteration.
    dest_dir = landing_path(landing_dir, "eurostat", year, month)
    files_written = 0
    files_skipped = 0
    bytes_written_total = 0
    for dataset_code in DATASETS:
        url = f"{EUROSTAT_BASE_URL}/{dataset_code}?format=JSON&lang=EN"
        dest = dest_dir / f"{dataset_code}.json.gz"
        logger.info("GET %s", dataset_code)
        bytes_written = _fetch_with_etag(url, dest, session)
        if bytes_written > 0:
            logger.info("%s updated — %s bytes compressed", dataset_code, f"{bytes_written:,}")
            files_written += 1
            bytes_written_total += bytes_written
        else:
            # 0 bytes means the server answered 304 Not Modified.
            logger.info("%s not modified (304)", dataset_code)
            files_skipped += 1
    return {
        "files_written": files_written,
        "files_skipped": files_skipped,
        "bytes_written": bytes_written_total,
        "cursor_value": year_month,
    }
def main() -> None:
    """CLI entry point for `extract-eurostat`: run this extractor with state tracking."""
    run_extractor(EXTRACTOR_NAME, extract)
if __name__ == "__main__":
    main()

View File

@@ -1,220 +0,0 @@
"""
Extraction pipelines — downloads source data into the landing zone.
Environment:
LANDING_DIR — local path for landing zone (default: data/landing)
"""
import gzip
import json
import os
import time
from datetime import UTC, datetime
from pathlib import Path
import niquests
LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing"))
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
EUROSTAT_BASE_URL = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data"
PLAYTOMIC_TENANTS_URL = "https://api.playtomic.io/v1/tenants"
TIMEOUT_SECONDS = 30
OVERPASS_TIMEOUT_SECONDS = 90 # Overpass can be slow on global queries
# Eurostat datasets to fetch
EUROSTAT_DATASETS = [
"urb_cpop1", # Urban Audit — city population
"ilc_di03", # Median equivalised net income by NUTS2
]
# Playtomic geo-search bounding boxes [min_lat, min_lon, max_lat, max_lon]
# Target markets: Spain, UK, Germany, France
PLAYTOMIC_BBOXES = [
{"min_latitude": 35.95, "min_longitude": -9.39, "max_latitude": 43.79, "max_longitude": 4.33},
{"min_latitude": 49.90, "min_longitude": -8.62, "max_latitude": 60.85, "max_longitude": 1.77},
{"min_latitude": 47.27, "min_longitude": 5.87, "max_latitude": 55.06, "max_longitude": 15.04},
{"min_latitude": 41.36, "min_longitude": -5.14, "max_latitude": 51.09, "max_longitude": 9.56},
]
def _write_gz(dest: Path, data: bytes) -> None:
"""Write bytes as gzip to dest, creating parent dirs as needed."""
assert dest.suffix == ".gz", f"dest must end in .gz: {dest}"
dest.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(dest, "wb") as f:
f.write(data)
def _etag_path(dest: Path) -> Path:
"""Return the sibling .etag file path for a given dest."""
return dest.parent / (dest.name + ".etag")
def extract_file(url: str, dest: Path, *, use_etag: bool = True) -> bool:
    """GET *url* and gzip the response body into *dest*.

    Returns True when fresh data was downloaded, False when the server
    answered 304 Not Modified (only possible with use_etag and a cached
    ETag sidecar file).
    """
    assert url, "url must not be empty"
    etag_file = _etag_path(dest) if use_etag else None
    headers: dict[str, str] = {}
    if etag_file is not None and etag_file.exists():
        headers["If-None-Match"] = etag_file.read_text().strip()
    resp = niquests.get(url, headers=headers, timeout=TIMEOUT_SECONDS)
    if resp.status_code == 304:
        return False
    resp.raise_for_status()
    _write_gz(dest, resp.content)
    if etag_file is not None:
        new_etag = resp.headers.get("etag")
        if new_etag:
            etag_file.parent.mkdir(parents=True, exist_ok=True)
            etag_file.write_text(new_etag)
    return True
def extract_overpass(landing_dir: Path, year_month: str) -> None:
    """POST a global OverpassQL query for padel courts (sport=padel) and write
    the raw OSM JSON response to the landing zone.

    Landing: {landing_dir}/overpass/{year}/{month}/courts.json.gz
    """
    year, month = year_month.split("/")
    dest = landing_dir / "overpass" / year / month / "courts.json.gz"
    query = (
        "[out:json][timeout:60];\n"
        "(\n"
        ' node["sport"="padel"];\n'
        ' way["sport"="padel"];\n'
        ' relation["sport"="padel"];\n'
        ");\n"
        "out body;"
    )
    print(f" [overpass] POST {OVERPASS_URL}")
    resp = niquests.post(OVERPASS_URL, data={"data": query}, timeout=OVERPASS_TIMEOUT_SECONDS)
    resp.raise_for_status()
    print(f" [overpass] {len(resp.content):,} bytes received")
    _write_gz(dest, resp.content)
    print(f" [overpass] -> {dest}")
def extract_eurostat(landing_dir: Path, year_month: str) -> None:
    """Fetch Eurostat city-level demographic datasets (JSON format) into the
    landing zone, with ETag deduplication — the data changes only ~twice a year.

    Landing: {landing_dir}/eurostat/{year}/{month}/{dataset_code}.json.gz
    """
    year, month = year_month.split("/")
    base_dir = landing_dir / "eurostat" / year / month
    for dataset_code in EUROSTAT_DATASETS:
        url = f"{EUROSTAT_BASE_URL}/{dataset_code}?format=JSON&lang=EN"
        dest = base_dir / f"{dataset_code}.json.gz"
        print(f" [eurostat] GET {dataset_code}")
        if not extract_file(url, dest, use_etag=True):
            print(f" [eurostat] {dataset_code} not modified (304)")
            continue
        size_bytes = dest.stat().st_size
        print(f" [eurostat] {dataset_code} updated -> {dest} ({size_bytes:,} bytes compressed)")
def extract_playtomic_tenants(landing_dir: Path, year_month: str) -> None:
    """
    Fetch Playtomic venue listings via the unauthenticated tenant search endpoint.
    Iterates over target-market bounding boxes with pagination, deduplicates on
    tenant_id, and writes a single consolidated JSON to the landing zone.
    Rate: 1 req / 2 s as documented in the data-sources inventory.
    Landing: {landing_dir}/playtomic/{year}/{month}/tenants.json.gz
    """
    year, month = year_month.split("/")
    dest = landing_dir / "playtomic" / year / month / "tenants.json.gz"
    all_tenants: list[dict] = []
    # Bounding boxes can overlap at borders, so dedupe across all of them.
    seen_ids: set[str] = set()
    page_size = 20
    for bbox in PLAYTOMIC_BBOXES:
        page = 0
        # NOTE(review): unbounded pagination — loops until the API returns a
        # short page; a misbehaving endpoint could keep this spinning forever.
        while True:
            params = {
                "sport_ids": "PADEL",
                "min_latitude": bbox["min_latitude"],
                "min_longitude": bbox["min_longitude"],
                "max_latitude": bbox["max_latitude"],
                "max_longitude": bbox["max_longitude"],
                "offset": page * page_size,
                "size": page_size,
            }
            print(
                f" [playtomic] GET page={page} "
                f"bbox=({bbox['min_latitude']:.1f},{bbox['min_longitude']:.1f},"
                f"{bbox['max_latitude']:.1f},{bbox['max_longitude']:.1f})"
            )
            resp = niquests.get(PLAYTOMIC_TENANTS_URL, params=params, timeout=TIMEOUT_SECONDS)
            resp.raise_for_status()
            tenants = resp.json()
            assert isinstance(tenants, list), (
                f"Expected list from Playtomic API, got {type(tenants)}"
            )
            new_count = 0
            for tenant in tenants:
                # Listings expose either "tenant_id" or "id" depending on shape.
                tid = tenant.get("tenant_id") or tenant.get("id")
                if tid and tid not in seen_ids:
                    seen_ids.add(tid)
                    all_tenants.append(tenant)
                    new_count += 1
            print(f" [playtomic] page={page} got={len(tenants)} new={new_count} total={len(all_tenants)}")
            # A short page marks the end of this bbox's result set.
            if len(tenants) < page_size:
                break
            page += 1
            time.sleep(2)  # throttle
    payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode()
    _write_gz(dest, payload)
    print(f" [playtomic] {len(all_tenants)} unique venues -> {dest}")
def extract_dataset() -> None:
    """Entry point: run all extractors sequentially."""
    today = datetime.now(UTC)
    year_month = today.strftime("%Y/%m")
    print(f"extract_dataset start: landing_dir={LANDING_DIR} period={year_month}")
    # Table-driven dispatch; labels are printed exactly as before.
    steps = [
        ("Overpass API — padel courts (OSM)", extract_overpass),
        ("Eurostat — city demographics", extract_eurostat),
        ("Playtomic — venue listings (unauthenticated)", extract_playtomic_tenants),
    ]
    for step_number, (label, extractor) in enumerate(steps, start=1):
        print(f"\n[{step_number}/3] {label}")
        extractor(LANDING_DIR, year_month)
    print("\nextract_dataset: done")

View File

@@ -0,0 +1,71 @@
"""Overpass API extractor — global padel court locations from OpenStreetMap.
Queries the Overpass API for all nodes/ways/relations tagged sport=padel
and writes the raw OSM JSON to the landing zone.
Landing: {LANDING_DIR}/overpass/{year}/{month}/courts.json.gz
"""
import sqlite3
from pathlib import Path
import niquests
from ._shared import OVERPASS_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import landing_path, write_gzip_atomic
logger = setup_logging("padelnomics.extract.overpass")
EXTRACTOR_NAME = "overpass"
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
OVERPASS_QUERY = (
"[out:json][timeout:60];\n"
"(\n"
' node["sport"="padel"];\n'
' way["sport"="padel"];\n'
' relation["sport"="padel"];\n'
");\n"
"out body;"
)
def extract(
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """POST the OverpassQL query and land the raw OSM JSON. Returns run metrics."""
    year, month = year_month.split("/")
    dest = landing_path(landing_dir, "overpass", year, month) / "courts.json.gz"
    logger.info("POST %s", OVERPASS_URL)
    resp = session.post(
        OVERPASS_URL,
        data={"data": OVERPASS_QUERY},
        timeout=OVERPASS_TIMEOUT_SECONDS,
    )
    resp.raise_for_status()
    logger.info("%s bytes received", f"{len(resp.content):,}")
    bytes_written = write_gzip_atomic(dest, resp.content)
    logger.info("wrote %s (%s bytes compressed)", dest, f"{bytes_written:,}")
    return {
        "files_written": 1,
        "files_skipped": 0,
        "bytes_written": bytes_written,
        "cursor_value": year_month,
    }
def main() -> None:
    """CLI entry point for `extract-overpass`: run this extractor with state tracking."""
    run_extractor(EXTRACTOR_NAME, extract)
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,217 @@
"""Playtomic availability extractor — booking slot data for market intelligence.
Reads tenant IDs from the latest tenants.json.gz, then queries the
unauthenticated /v1/availability endpoint for each venue's next-day slots.
This is the highest-value source: daily snapshots enable occupancy rate
estimation, pricing benchmarking, and demand signal detection.
API constraint: max 25-hour window per request (see docs/data-sources-inventory.md §2.1).
Rate: 1 req / 2 s (conservative, unauthenticated endpoint).
Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.json.gz
"""
import gzip
import json
import sqlite3
import time
from datetime import UTC, datetime, timedelta
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import get_last_cursor, landing_path, write_gzip_atomic
logger = setup_logging("padelnomics.extract.playtomic_availability")
EXTRACTOR_NAME = "playtomic_availability"
AVAILABILITY_URL = "https://api.playtomic.io/v1/availability"
THROTTLE_SECONDS = 2
MAX_VENUES_PER_RUN = 10_000
MAX_RETRIES_PER_VENUE = 2
def _load_tenant_ids(landing_dir: Path) -> list[str]:
    """Collect tenant IDs from the newest tenants.json.gz under the landing zone.

    Returns an empty list when no tenants snapshot exists yet.
    """
    playtomic_dir = landing_dir / "playtomic"
    if not playtomic_dir.exists():
        return []
    # Year/month directories are zero-padded, so lexicographic max == newest.
    candidates = sorted(playtomic_dir.glob("*/*/tenants.json.gz"))
    if not candidates:
        return []
    latest = candidates[-1]
    logger.info("Loading tenant IDs from %s", latest)
    with gzip.open(latest, "rb") as fh:
        snapshot = json.load(fh)
    # Listings expose either "tenant_id" or "id"; skip entries with neither.
    tenant_ids = [
        tid
        for tenant in snapshot.get("tenants", [])
        if (tid := tenant.get("tenant_id") or tenant.get("id"))
    ]
    logger.info("Loaded %d tenant IDs", len(tenant_ids))
    return tenant_ids
def _parse_resume_cursor(cursor: str | None, target_date: str) -> int:
"""Parse cursor_value to find resume index. Returns 0 if no valid cursor."""
if not cursor:
return 0
# cursor format: "{date}:{index}"
parts = cursor.split(":", 1)
if len(parts) != 2:
return 0
cursor_date, cursor_index = parts
# Only resume if cursor is for today's target date
if cursor_date != target_date:
return 0
try:
return int(cursor_index)
except ValueError:
return 0
def extract(
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """Fetch next-day availability for all known Playtomic venues.

    Reads tenant IDs from the latest tenants snapshot, queries a 24-hour slot
    window for tomorrow per venue (retrying with backoff on 429/5xx/network
    errors), and writes one consolidated gzipped JSON per date.

    Returns run metrics: files_written, files_skipped, bytes_written, and a
    "{date}:{index}" cursor_value used for same-day resume.

    Bug fix: a venue whose FINAL retry attempt raised RequestException used
    to be counted twice in venues_errored — once in the except handler and
    once more in the for-else clause that runs when the retry loop finishes
    without break. The except handler now only logs; the for-else is the
    single place an exhausted venue is tallied.
    """
    tenant_ids = _load_tenant_ids(landing_dir)
    if not tenant_ids:
        logger.warning("No tenant IDs found — run extract-playtomic-tenants first")
        return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}
    # Query tomorrow's slots: a 24-hour window from midnight, within the
    # API's documented 25-hour-per-request limit.
    tomorrow = datetime.now(UTC) + timedelta(days=1)
    target_date = tomorrow.strftime("%Y-%m-%d")
    start_min = tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)
    start_max = start_min + timedelta(hours=24)
    year, month = year_month.split("/")
    dest_dir = landing_path(landing_dir, "playtomic", year, month)
    dest = dest_dir / f"availability_{target_date}.json.gz"
    # Idempotence: exactly one consolidated snapshot per target date.
    if dest.exists():
        logger.info("Already have %s — skipping", dest)
        return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
    # Resume from last cursor if a previous run crashed mid-way today.
    last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
    resume_index = _parse_resume_cursor(last_cursor, target_date)
    if resume_index > 0:
        logger.info("Resuming from index %d (cursor: %s)", resume_index, last_cursor)
    venues_data: list[dict] = []
    venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN]
    venues_errored = 0
    for i, tenant_id in enumerate(venues_to_process):
        if i < resume_index:
            continue  # already captured before the crash
        params = {
            "sport_id": "PADEL",
            "tenant_id": tenant_id,
            "start_min": start_min.strftime("%Y-%m-%dT%H:%M:%S"),
            "start_max": start_max.strftime("%Y-%m-%dT%H:%M:%S"),
        }
        for attempt in range(MAX_RETRIES_PER_VENUE + 1):
            try:
                resp = session.get(AVAILABILITY_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS)
                if resp.status_code == 429:
                    # Rate limited — back off (linearly growing wait) and retry.
                    wait_seconds = THROTTLE_SECONDS * (attempt + 2)
                    logger.warning("Rate limited on %s, waiting %ds", tenant_id, wait_seconds)
                    time.sleep(wait_seconds)
                    continue
                if resp.status_code >= 500:
                    logger.warning(
                        "Server error %d for %s (attempt %d)",
                        resp.status_code,
                        tenant_id,
                        attempt + 1,
                    )
                    time.sleep(THROTTLE_SECONDS)
                    continue
                resp.raise_for_status()
                venues_data.append({"tenant_id": tenant_id, "slots": resp.json()})
                break
            except niquests.exceptions.RequestException as e:
                # Log only — do NOT count here. Counting happens once, in the
                # for-else below, which fires whenever no attempt broke out.
                if attempt < MAX_RETRIES_PER_VENUE:
                    logger.warning(
                        "Request failed for %s (attempt %d): %s", tenant_id, attempt + 1, e
                    )
                    time.sleep(THROTTLE_SECONDS)
                else:
                    logger.error(
                        "Giving up on %s after %d attempts: %s",
                        tenant_id,
                        MAX_RETRIES_PER_VENUE + 1,
                        e,
                    )
        else:
            # All retries exhausted (loop completed without break).
            venues_errored += 1
        if (i + 1) % 100 == 0:
            logger.info(
                "Progress: %d/%d venues queried, %d errors",
                i + 1,
                len(venues_to_process),
                venues_errored,
            )
        time.sleep(THROTTLE_SECONDS)  # 1 req / 2 s politeness throttle
    # Write the consolidated daily snapshot atomically.
    captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
    payload = json.dumps(
        {
            "date": target_date,
            "captured_at_utc": captured_at,
            "venue_count": len(venues_data),
            "venues_errored": venues_errored,
            "venues": venues_data,
        }
    ).encode()
    bytes_written = write_gzip_atomic(dest, payload)
    logger.info(
        "%d venues scraped (%d errors) -> %s (%s bytes)",
        len(venues_data),
        venues_errored,
        dest,
        f"{bytes_written:,}",
    )
    return {
        "files_written": 1,
        "files_skipped": 0,
        "bytes_written": bytes_written,
        "cursor_value": f"{target_date}:{len(venues_to_process)}",
    }
def main() -> None:
    """CLI entry point for `extract-playtomic-availability`: run this extractor with state tracking."""
    run_extractor(EXTRACTOR_NAME, extract)
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,116 @@
"""Playtomic tenants extractor — venue listings via unauthenticated API.
Iterates over target-market bounding boxes with pagination, deduplicates
on tenant_id, and writes a single consolidated JSON to the landing zone.
Rate: 1 req / 2 s (see docs/data-sources-inventory.md §1.2).
Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz
"""
import json
import sqlite3
import time
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import landing_path, write_gzip_atomic
logger = setup_logging("padelnomics.extract.playtomic_tenants")
EXTRACTOR_NAME = "playtomic_tenants"
PLAYTOMIC_TENANTS_URL = "https://api.playtomic.io/v1/tenants"
THROTTLE_SECONDS = 2
PAGE_SIZE = 20
MAX_PAGES_PER_BBOX = 500 # safety bound — prevents infinite pagination
# Target markets: Spain, UK/Ireland, Germany, France
BBOXES = [
{"min_latitude": 35.95, "min_longitude": -9.39, "max_latitude": 43.79, "max_longitude": 4.33},
{"min_latitude": 49.90, "min_longitude": -8.62, "max_latitude": 60.85, "max_longitude": 1.77},
{"min_latitude": 47.27, "min_longitude": 5.87, "max_latitude": 55.06, "max_longitude": 15.04},
{"min_latitude": 41.36, "min_longitude": -5.14, "max_latitude": 51.09, "max_longitude": 9.56},
]
def extract(
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """Fetch all Playtomic venues across the target-market bounding boxes.

    Paginates each bbox (bounded by MAX_PAGES_PER_BBOX), deduplicates on
    tenant id across bboxes, and writes one consolidated gzipped JSON.
    Returns run metrics.
    """
    year, month = year_month.split("/")
    dest = landing_path(landing_dir, "playtomic", year, month) / "tenants.json.gz"
    unique_tenants: list[dict] = []
    seen_ids: set[str] = set()
    for bbox in BBOXES:
        page = 0
        while page < MAX_PAGES_PER_BBOX:
            params = {
                "sport_ids": "PADEL",
                "min_latitude": bbox["min_latitude"],
                "min_longitude": bbox["min_longitude"],
                "max_latitude": bbox["max_latitude"],
                "max_longitude": bbox["max_longitude"],
                "offset": page * PAGE_SIZE,
                "size": PAGE_SIZE,
            }
            logger.info(
                "GET page=%d bbox=(%.1f,%.1f,%.1f,%.1f)",
                page,
                bbox["min_latitude"],
                bbox["min_longitude"],
                bbox["max_latitude"],
                bbox["max_longitude"],
            )
            resp = session.get(PLAYTOMIC_TENANTS_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS)
            resp.raise_for_status()
            page_tenants = resp.json()
            assert isinstance(page_tenants, list), (
                f"Expected list from Playtomic API, got {type(page_tenants)}"
            )
            new_count = 0
            for venue in page_tenants:
                venue_id = venue.get("tenant_id") or venue.get("id")
                if venue_id and venue_id not in seen_ids:
                    seen_ids.add(venue_id)
                    unique_tenants.append(venue)
                    new_count += 1
            logger.info(
                "page=%d got=%d new=%d total=%d", page, len(page_tenants), new_count, len(unique_tenants)
            )
            # A short page marks the end of this bbox's results.
            if len(page_tenants) < PAGE_SIZE:
                break
            time.sleep(THROTTLE_SECONDS)
            page += 1
    payload = json.dumps({"tenants": unique_tenants, "count": len(unique_tenants)}).encode()
    bytes_written = write_gzip_atomic(dest, payload)
    logger.info("%d unique venues -> %s", len(unique_tenants), dest)
    return {
        "files_written": 1,
        "files_skipped": 0,
        "bytes_written": bytes_written,
        "cursor_value": year_month,
    }
def main() -> None:
    """CLI entry point for `extract-playtomic-tenants`: run this extractor with state tracking."""
    run_extractor(EXTRACTOR_NAME, extract)
if __name__ == "__main__":
    main()

View File

@@ -103,6 +103,7 @@ def get_last_cursor(conn: sqlite3.Connection, extractor: str) -> str | None:
# File I/O helpers # File I/O helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def landing_path(landing_dir: str | Path, *parts: str) -> Path: def landing_path(landing_dir: str | Path, *parts: str) -> Path:
"""Return path to a subdirectory of landing_dir, creating it if absent.""" """Return path to a subdirectory of landing_dir, creating it if absent."""
path = Path(landing_dir).joinpath(*parts) path = Path(landing_dir).joinpath(*parts)

2
uv.lock generated
View File

@@ -1180,7 +1180,7 @@ requires-dist = [
[[package]] [[package]]
name = "padelnomics-extract" name = "padelnomics-extract"
version = "0.1.0" version = "0.2.0"
source = { editable = "extract/padelnomics_extract" } source = { editable = "extract/padelnomics_extract" }
dependencies = [ dependencies = [
{ name = "niquests" }, { name = "niquests" },