diff --git a/extract/padelnomics_extract/src/padelnomics_extract/_shared.py b/extract/padelnomics_extract/src/padelnomics_extract/_shared.py
index 4df4355..be4ad1b 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/_shared.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/_shared.py
@@ -41,12 +41,17 @@ def setup_logging(name: str) -> logging.Logger:
 def run_extractor(
     extractor_name: str,
     func,
+    proxy_url: str | None = None,
 ) -> None:
     """Boilerplate wrapper: open state DB, start run, call func, end run.
 
     func signature: func(landing_dir, year_month, conn, session) -> dict
     The dict must contain: files_written, files_skipped, bytes_written.
     Optional: cursor_value.
+
+    proxy_url: if set, configure the session proxy before calling func.
+        Extractors that manage their own proxy logic (e.g. playtomic_availability)
+        ignore the shared session and are unaffected.
     """
     LANDING_DIR.mkdir(parents=True, exist_ok=True)
     conn = open_state_db(LANDING_DIR)
@@ -58,6 +63,8 @@ def run_extractor(
     try:
         with niquests.Session() as session:
             session.headers["User-Agent"] = USER_AGENT
+            if proxy_url:
+                session.proxies = {"http": proxy_url, "https": proxy_url}
             result = func(LANDING_DIR, year_month, conn, session)
 
             assert isinstance(result, dict), f"extractor must return a dict, got {type(result)}"
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/all.py b/extract/padelnomics_extract/src/padelnomics_extract/all.py
index 15c153f..8b93c94 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/all.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/all.py
@@ -1,9 +1,20 @@
-"""Run all extractors sequentially.
+"""Run all extractors with dependency-aware parallel execution.
 
 Entry point for the combined `uv run extract` command.
-Each extractor gets its own state tracking row in .state.sqlite.
+
+Extractors are declared as a dict mapping name → (func, [dependencies]).
+A graphlib.TopologicalSorter schedules them: tasks with no unmet dependencies
+run immediately in parallel; each completion may unlock new tasks.
+
+Current dependency graph:
+  - All 8 non-availability extractors have no dependencies (run in parallel)
+  - playtomic_availability depends on playtomic_tenants (starts as soon as
+    tenants finishes, even if other extractors are still running)
 """
 
+from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
+from graphlib import TopologicalSorter
+
 from ._shared import run_extractor, setup_logging
 from .census_usa import EXTRACTOR_NAME as CENSUS_USA_NAME
 from .census_usa import extract as extract_census_usa
@@ -26,31 +37,68 @@ from .playtomic_tenants import extract as extract_tenants
 
 logger = setup_logging("padelnomics.extract")
 
-EXTRACTORS = [
-    (OVERPASS_NAME, extract_overpass),
-    (OVERPASS_TENNIS_NAME, extract_overpass_tennis),
-    (EUROSTAT_NAME, extract_eurostat),
-    (EUROSTAT_CITY_LABELS_NAME, extract_eurostat_city_labels),
-    (CENSUS_USA_NAME, extract_census_usa),
-    (ONS_UK_NAME, extract_ons_uk),
-    (GEONAMES_NAME, extract_geonames),
-    (TENANTS_NAME, extract_tenants),
-    (AVAILABILITY_NAME, extract_availability),
-]
+# Declarative: name → (func, [dependency names])
+# Add new extractors here; the scheduler handles ordering and parallelism.
+EXTRACTORS: dict[str, tuple] = {
+    OVERPASS_NAME: (extract_overpass, []),
+    OVERPASS_TENNIS_NAME: (extract_overpass_tennis, []),
+    EUROSTAT_NAME: (extract_eurostat, []),
+    EUROSTAT_CITY_LABELS_NAME: (extract_eurostat_city_labels, []),
+    CENSUS_USA_NAME: (extract_census_usa, []),
+    ONS_UK_NAME: (extract_ons_uk, []),
+    GEONAMES_NAME: (extract_geonames, []),
+    TENANTS_NAME: (extract_tenants, []),
+    AVAILABILITY_NAME: (extract_availability, [TENANTS_NAME]),
+}
+
+
+def _run_safe(name: str) -> bool:
+    """Run one extractor, return True on success."""
+    func, _ = EXTRACTORS[name]
+    try:
+        run_extractor(name, func)
+        return True
+    except Exception:
+        logger.exception("Extractor %s failed", name)
+        return False
 
 
 def main() -> None:
-    """Run all extractors. Each gets its own state row."""
+    """Run all extractors respecting declared dependencies, maximally parallel."""
     logger.info("Running %d extractors", len(EXTRACTORS))
 
-    for i, (name, func) in enumerate(EXTRACTORS, 1):
-        logger.info("[%d/%d] %s", i, len(EXTRACTORS), name)
-        try:
-            run_extractor(name, func)
-        except Exception:
-            logger.exception("Extractor %s failed — continuing with next", name)
+    graph = {name: set(deps) for name, (_, deps) in EXTRACTORS.items()}
+    ts = TopologicalSorter(graph)
+    ts.prepare()
 
-    logger.info("All extractors complete")
+    failed: list[str] = []
+    with ThreadPoolExecutor(max_workers=len(EXTRACTORS)) as pool:
+        futures: dict = {}
+
+        # Submit all initially ready tasks (no dependencies)
+        for name in ts.get_ready():
+            futures[pool.submit(_run_safe, name)] = name
+
+        # Process completions and submit newly-unblocked tasks
+        while futures:
+            done_set, _ = wait(futures, return_when=FIRST_COMPLETED)
+            for f in done_set:
+                name = futures.pop(f)
+                ok = f.result()
+                if ok:
+                    logger.info("done: %s", name)
+                else:
+                    failed.append(name)
+                    logger.warning("FAILED: %s", name)
+                ts.done(name)
+
+            for ready in ts.get_ready():
+                futures[pool.submit(_run_safe, ready)] = ready
+
+    if failed:
+        logger.warning("Completed with %d failure(s): %s", len(failed), ", ".join(failed))
+    else:
+        logger.info("All %d extractors complete", len(EXTRACTORS))
 
 
 if __name__ == "__main__":
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py b/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py
index 79c75e7..d0a6748 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py
@@ -27,13 +27,13 @@ OVERPASS_URL = "https://overpass-api.de/api/interpreter"
 TENNIS_OVERPASS_TIMEOUT_SECONDS = OVERPASS_TIMEOUT_SECONDS * 3
 
 OVERPASS_QUERY = (
-    "[out:json][timeout:180];\n"
+    "[out:json][timeout:300];\n"
     "(\n"
    ' node["sport"="tennis"];\n'
    ' way["sport"="tennis"];\n'
    ' relation["sport"="tennis"];\n'
    ");\n"
-    "out body;"
+    "out center;"
 )
 
 
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py
index a80636a..699ace2 100644
--- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py
+++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py
@@ -10,7 +10,8 @@ API notes (discovered 2026-02):
 - `size=100` is the maximum effective page size
 - ~14K venues globally as of Feb 2026
 
-Rate: 1 req / 2 s (see docs/data-sources-inventory.md §1.2).
+Rate: 1 req / 2 s when running directly (see docs/data-sources-inventory.md §1.2).
+  No throttle when PROXY_URLS is set; rotating IPs keeps each IP's request rate low.
 
 Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz
 """
@@ -23,6 +24,7 @@ from pathlib import Path
 import niquests
 
 from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
+from .proxy import load_proxy_urls, make_round_robin_cycler
 from .utils import landing_path, write_gzip_atomic
 
 logger = setup_logging("padelnomics.extract.playtomic_tenants")
@@ -46,10 +48,22 @@ def extract(
     dest_dir = landing_path(landing_dir, "playtomic", year, month)
     dest = dest_dir / "tenants.json.gz"
 
+    proxy_urls = load_proxy_urls()
+    cycler = make_round_robin_cycler(proxy_urls) if proxy_urls else None
+    if cycler:
+        logger.info("proxy rotation enabled (%d proxies, no throttle)", len(proxy_urls))
+    else:
+        logger.info("no proxies configured — throttle %ds per page", THROTTLE_SECONDS)
+
     all_tenants: list[dict] = []
     seen_ids: set[str] = set()
 
     for page in range(MAX_PAGES):
+        if cycler:
+            proxy = cycler["next_proxy"]()
+            if proxy:
+                session.proxies = {"http": proxy, "https": proxy}
+
         params = {
             "sport_ids": "PADEL",
             "size": PAGE_SIZE,
@@ -82,7 +96,8 @@
         if len(tenants) < PAGE_SIZE:
             break
 
-        time.sleep(THROTTLE_SECONDS)
+        if not cycler:
+            time.sleep(THROTTLE_SECONDS)
 
     payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode()
     bytes_written = write_gzip_atomic(dest, payload)
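
Reviewer note on the new `.proxy` helpers: `load_proxy_urls` and `make_round_robin_cycler` are imported above but their implementation is not part of this diff. For orientation only, here is a minimal sketch of the shape the call sites imply, assuming PROXY_URLS is a comma-separated environment variable and that the cycler is a plain dict exposing a `next_proxy` callable (matching `cycler["next_proxy"]()` in the extractor). Names and formats beyond that are assumptions, not the module's actual code.

# Sketch only (not in this diff): assumed shape of the .proxy module.
# PROXY_URLS parsing (comma-separated env var) and the dict-of-closures
# return value are inferred from the call sites above, not confirmed.
import os
from itertools import cycle
from typing import Callable


def load_proxy_urls() -> list[str]:
    """Read proxy URLs from the PROXY_URLS env var; empty list when unset."""
    raw = os.environ.get("PROXY_URLS", "")
    return [url.strip() for url in raw.split(",") if url.strip()]


def make_round_robin_cycler(proxy_urls: list[str]) -> dict[str, Callable[[], str | None]]:
    """Return {"next_proxy": fn}; fn yields proxies in round-robin order."""
    pool = cycle(proxy_urls)

    def next_proxy() -> str | None:
        return next(pool) if proxy_urls else None

    return {"next_proxy": next_proxy}

Only the "next_proxy" key matters to this diff; if the real module returns a richer structure, the extractor code shown here is unaffected.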