feat(extract): parallel DAG scheduler + proxy rotation for tenants
- all.py: replace sequential loop with graphlib.TopologicalSorter + ThreadPoolExecutor
- EXTRACTORS dict declares (func, [deps]) — self-documenting dependency graph
- 8 extractors run in parallel immediately; availability starts as soon as
tenants finishes (not after all others complete)
- max_workers=len(EXTRACTORS) — all I/O-bound, no CPU contention
- playtomic_tenants.py: add proxy rotation via make_round_robin_cycler
- no throttle when PROXY_URLS set (IP rotation removes per-IP rate concern)
- keeps 2s throttle for direct runs
- _shared.py: add optional proxy_url param to run_extractor()
- any extractor can opt in to proxy support via the shared session
- overpass_tennis.py: fix Overpass query timeouts (output mode `out body` → `out center`; timeout 180 s → 300 s)
- out center returns centroids only, not full geometry — fits within server limits
- playtomic_availability.py: fix crash when CIRCUIT_BREAKER_THRESHOLD is set to an empty string
- int(os.environ.get(..., "10")) → int(os.environ.get(...) or "10")
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -41,12 +41,17 @@ def setup_logging(name: str) -> logging.Logger:
|
|||||||
def run_extractor(
|
def run_extractor(
|
||||||
extractor_name: str,
|
extractor_name: str,
|
||||||
func,
|
func,
|
||||||
|
proxy_url: str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Boilerplate wrapper: open state DB, start run, call func, end run.
|
"""Boilerplate wrapper: open state DB, start run, call func, end run.
|
||||||
|
|
||||||
func signature: func(landing_dir, year_month, conn, session) -> dict
|
func signature: func(landing_dir, year_month, conn, session) -> dict
|
||||||
The dict must contain: files_written, files_skipped, bytes_written.
|
The dict must contain: files_written, files_skipped, bytes_written.
|
||||||
Optional: cursor_value.
|
Optional: cursor_value.
|
||||||
|
|
||||||
|
proxy_url: if set, configure the session proxy before calling func.
|
||||||
|
Extractors that manage their own proxy logic (e.g. playtomic_availability)
|
||||||
|
ignore the shared session and are unaffected.
|
||||||
"""
|
"""
|
||||||
LANDING_DIR.mkdir(parents=True, exist_ok=True)
|
LANDING_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
conn = open_state_db(LANDING_DIR)
|
conn = open_state_db(LANDING_DIR)
|
||||||
@@ -58,6 +63,8 @@ def run_extractor(
|
|||||||
try:
|
try:
|
||||||
with niquests.Session() as session:
|
with niquests.Session() as session:
|
||||||
session.headers["User-Agent"] = USER_AGENT
|
session.headers["User-Agent"] = USER_AGENT
|
||||||
|
if proxy_url:
|
||||||
|
session.proxies = {"http": proxy_url, "https": proxy_url}
|
||||||
result = func(LANDING_DIR, year_month, conn, session)
|
result = func(LANDING_DIR, year_month, conn, session)
|
||||||
|
|
||||||
assert isinstance(result, dict), f"extractor must return a dict, got {type(result)}"
|
assert isinstance(result, dict), f"extractor must return a dict, got {type(result)}"
|
||||||
|
|||||||
@@ -1,9 +1,20 @@
|
|||||||
"""Run all extractors sequentially.
|
"""Run all extractors with dependency-aware parallel execution.
|
||||||
|
|
||||||
Entry point for the combined `uv run extract` command.
|
Entry point for the combined `uv run extract` command.
|
||||||
Each extractor gets its own state tracking row in .state.sqlite.
|
|
||||||
|
Extractors are declared as a dict mapping name → (func, [dependencies]).
|
||||||
|
A graphlib.TopologicalSorter schedules them: tasks with no unmet dependencies
|
||||||
|
run immediately in parallel; each completion may unlock new tasks.
|
||||||
|
|
||||||
|
Current dependency graph:
|
||||||
|
- All 8 non-availability extractors have no dependencies (run in parallel)
|
||||||
|
- playtomic_availability depends on playtomic_tenants (starts as soon as
|
||||||
|
tenants finishes, even if other extractors are still running)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
|
||||||
|
from graphlib import TopologicalSorter
|
||||||
|
|
||||||
from ._shared import run_extractor, setup_logging
|
from ._shared import run_extractor, setup_logging
|
||||||
from .census_usa import EXTRACTOR_NAME as CENSUS_USA_NAME
|
from .census_usa import EXTRACTOR_NAME as CENSUS_USA_NAME
|
||||||
from .census_usa import extract as extract_census_usa
|
from .census_usa import extract as extract_census_usa
|
||||||
@@ -26,31 +37,68 @@ from .playtomic_tenants import extract as extract_tenants
|
|||||||
|
|
||||||
logger = setup_logging("padelnomics.extract")
|
logger = setup_logging("padelnomics.extract")
|
||||||
|
|
||||||
EXTRACTORS = [
|
# Declarative: name → (func, [dependency names])
|
||||||
(OVERPASS_NAME, extract_overpass),
|
# Add new extractors here; the scheduler handles ordering and parallelism.
|
||||||
(OVERPASS_TENNIS_NAME, extract_overpass_tennis),
|
EXTRACTORS: dict[str, tuple] = {
|
||||||
(EUROSTAT_NAME, extract_eurostat),
|
OVERPASS_NAME: (extract_overpass, []),
|
||||||
(EUROSTAT_CITY_LABELS_NAME, extract_eurostat_city_labels),
|
OVERPASS_TENNIS_NAME: (extract_overpass_tennis, []),
|
||||||
(CENSUS_USA_NAME, extract_census_usa),
|
EUROSTAT_NAME: (extract_eurostat, []),
|
||||||
(ONS_UK_NAME, extract_ons_uk),
|
EUROSTAT_CITY_LABELS_NAME: (extract_eurostat_city_labels, []),
|
||||||
(GEONAMES_NAME, extract_geonames),
|
CENSUS_USA_NAME: (extract_census_usa, []),
|
||||||
(TENANTS_NAME, extract_tenants),
|
ONS_UK_NAME: (extract_ons_uk, []),
|
||||||
(AVAILABILITY_NAME, extract_availability),
|
GEONAMES_NAME: (extract_geonames, []),
|
||||||
]
|
TENANTS_NAME: (extract_tenants, []),
|
||||||
|
AVAILABILITY_NAME: (extract_availability, [TENANTS_NAME]),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _run_safe(name: str) -> bool:
|
||||||
|
"""Run one extractor, return True on success."""
|
||||||
|
func, _ = EXTRACTORS[name]
|
||||||
|
try:
|
||||||
|
run_extractor(name, func)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Extractor %s failed", name)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
"""Run all extractors. Each gets its own state row."""
|
"""Run all extractors respecting declared dependencies, maximally parallel."""
|
||||||
logger.info("Running %d extractors", len(EXTRACTORS))
|
logger.info("Running %d extractors", len(EXTRACTORS))
|
||||||
|
|
||||||
for i, (name, func) in enumerate(EXTRACTORS, 1):
|
graph = {name: set(deps) for name, (_, deps) in EXTRACTORS.items()}
|
||||||
logger.info("[%d/%d] %s", i, len(EXTRACTORS), name)
|
ts = TopologicalSorter(graph)
|
||||||
try:
|
ts.prepare()
|
||||||
run_extractor(name, func)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Extractor %s failed — continuing with next", name)
|
|
||||||
|
|
||||||
logger.info("All extractors complete")
|
failed: list[str] = []
|
||||||
|
with ThreadPoolExecutor(max_workers=len(EXTRACTORS)) as pool:
|
||||||
|
futures: dict = {}
|
||||||
|
|
||||||
|
# Submit all initially ready tasks (no dependencies)
|
||||||
|
for name in ts.get_ready():
|
||||||
|
futures[pool.submit(_run_safe, name)] = name
|
||||||
|
|
||||||
|
# Process completions and submit newly-unblocked tasks
|
||||||
|
while futures:
|
||||||
|
done_set, _ = wait(futures, return_when=FIRST_COMPLETED)
|
||||||
|
for f in done_set:
|
||||||
|
name = futures.pop(f)
|
||||||
|
ok = f.result()
|
||||||
|
if ok:
|
||||||
|
logger.info("done: %s", name)
|
||||||
|
else:
|
||||||
|
failed.append(name)
|
||||||
|
logger.warning("FAILED: %s", name)
|
||||||
|
ts.done(name)
|
||||||
|
|
||||||
|
for ready in ts.get_ready():
|
||||||
|
futures[pool.submit(_run_safe, ready)] = ready
|
||||||
|
|
||||||
|
if failed:
|
||||||
|
logger.warning("Completed with %d failure(s): %s", len(failed), ", ".join(failed))
|
||||||
|
else:
|
||||||
|
logger.info("All %d extractors complete", len(EXTRACTORS))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -27,13 +27,13 @@ OVERPASS_URL = "https://overpass-api.de/api/interpreter"
|
|||||||
TENNIS_OVERPASS_TIMEOUT_SECONDS = OVERPASS_TIMEOUT_SECONDS * 3
|
TENNIS_OVERPASS_TIMEOUT_SECONDS = OVERPASS_TIMEOUT_SECONDS * 3
|
||||||
|
|
||||||
OVERPASS_QUERY = (
|
OVERPASS_QUERY = (
|
||||||
"[out:json][timeout:180];\n"
|
"[out:json][timeout:300];\n"
|
||||||
"(\n"
|
"(\n"
|
||||||
' node["sport"="tennis"];\n'
|
' node["sport"="tennis"];\n'
|
||||||
' way["sport"="tennis"];\n'
|
' way["sport"="tennis"];\n'
|
||||||
' relation["sport"="tennis"];\n'
|
' relation["sport"="tennis"];\n'
|
||||||
");\n"
|
");\n"
|
||||||
"out body;"
|
"out center;"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,8 @@ API notes (discovered 2026-02):
|
|||||||
- `size=100` is the maximum effective page size
|
- `size=100` is the maximum effective page size
|
||||||
- ~14K venues globally as of Feb 2026
|
- ~14K venues globally as of Feb 2026
|
||||||
|
|
||||||
Rate: 1 req / 2 s (see docs/data-sources-inventory.md §1.2).
|
Rate: 1 req / 2 s when running direct (see docs/data-sources-inventory.md §1.2).
|
||||||
|
No throttle when PROXY_URLS is set — IP rotation removes per-IP rate concern.
|
||||||
|
|
||||||
Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz
|
Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz
|
||||||
"""
|
"""
|
||||||
@@ -23,6 +24,7 @@ from pathlib import Path
|
|||||||
import niquests
|
import niquests
|
||||||
|
|
||||||
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
||||||
|
from .proxy import load_proxy_urls, make_round_robin_cycler
|
||||||
from .utils import landing_path, write_gzip_atomic
|
from .utils import landing_path, write_gzip_atomic
|
||||||
|
|
||||||
logger = setup_logging("padelnomics.extract.playtomic_tenants")
|
logger = setup_logging("padelnomics.extract.playtomic_tenants")
|
||||||
@@ -46,10 +48,22 @@ def extract(
|
|||||||
dest_dir = landing_path(landing_dir, "playtomic", year, month)
|
dest_dir = landing_path(landing_dir, "playtomic", year, month)
|
||||||
dest = dest_dir / "tenants.json.gz"
|
dest = dest_dir / "tenants.json.gz"
|
||||||
|
|
||||||
|
proxy_urls = load_proxy_urls()
|
||||||
|
cycler = make_round_robin_cycler(proxy_urls) if proxy_urls else None
|
||||||
|
if cycler:
|
||||||
|
logger.info("proxy rotation enabled (%d proxies, no throttle)", len(proxy_urls))
|
||||||
|
else:
|
||||||
|
logger.info("no proxies configured — throttle %ds per page", THROTTLE_SECONDS)
|
||||||
|
|
||||||
all_tenants: list[dict] = []
|
all_tenants: list[dict] = []
|
||||||
seen_ids: set[str] = set()
|
seen_ids: set[str] = set()
|
||||||
|
|
||||||
for page in range(MAX_PAGES):
|
for page in range(MAX_PAGES):
|
||||||
|
if cycler:
|
||||||
|
proxy = cycler["next_proxy"]()
|
||||||
|
if proxy:
|
||||||
|
session.proxies = {"http": proxy, "https": proxy}
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
"sport_ids": "PADEL",
|
"sport_ids": "PADEL",
|
||||||
"size": PAGE_SIZE,
|
"size": PAGE_SIZE,
|
||||||
@@ -82,7 +96,8 @@ def extract(
|
|||||||
if len(tenants) < PAGE_SIZE:
|
if len(tenants) < PAGE_SIZE:
|
||||||
break
|
break
|
||||||
|
|
||||||
time.sleep(THROTTLE_SECONDS)
|
if not cycler:
|
||||||
|
time.sleep(THROTTLE_SECONDS)
|
||||||
|
|
||||||
payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode()
|
payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode()
|
||||||
bytes_written = write_gzip_atomic(dest, payload)
|
bytes_written = write_gzip_atomic(dest, payload)
|
||||||
|
|||||||
Reference in New Issue
Block a user