diff --git a/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py b/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py index d0a6748..e7f1c0f 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/overpass_tennis.py @@ -1,40 +1,77 @@ """Overpass API extractor — global tennis court locations from OpenStreetMap. -Queries the Overpass API for all nodes/ways/relations tagged sport=tennis. -Tennis court density near a location is a proxy for racket-sport culture — -areas with many tennis clubs are prime candidates for padel adoption. +Queries the Overpass API for all nodes/ways/relations tagged sport=tennis, +split across 10 geographic regions to avoid timeout on the ~150K+ global result. -The query returns ~150K+ results globally (vs ~5K for padel), so a higher -Overpass timeout is used. +Regional strategy: + - Each region is a bounding box covering a continent or sub-continent + - Each region is queried independently (POST with [bbox:...]) + - Overlapping bboxes are deduped on OSM element id + - One region per POST (~10-40K elements each, well within Overpass limits) + - Crash recovery: working JSONL accumulates completed regions; on restart + already-written IDs are skipped, completed regions produce 0 new elements -Landing: {LANDING_DIR}/overpass_tennis/{year}/{month}/courts.json.gz +Landing: {LANDING_DIR}/overpass_tennis/{year}/{month}/courts.jsonl.gz """ +import json import sqlite3 +import time from pathlib import Path import niquests -from ._shared import OVERPASS_TIMEOUT_SECONDS, run_extractor, setup_logging -from .utils import landing_path, write_gzip_atomic +from ._shared import run_extractor, setup_logging +from .utils import compress_jsonl_atomic, landing_path, load_partial_results logger = setup_logging("padelnomics.extract.overpass_tennis") EXTRACTOR_NAME = "overpass_tennis" OVERPASS_URL = "https://overpass-api.de/api/interpreter" -# 
# Each region is [south, west, north, east] — Overpass bbox format.
# Boxes deliberately overlap; duplicates are dropped on OSM element id.
#
# FIX(review): the previous bounds left genuine coverage gaps:
#   - lat 35-42 / lon 8-24 (S Italy, Sicily, Malta) — europe_central started at 42
#   - lat 55.5-72 / lon 8-24 (Norway, Sweden, N Denmark) — between central and east
#   - Iceland (lon < -11) — outside both europe_west and north_america
#   - lat -11..18 / lon 73-150 (S India, Sri Lanka, SE Asia, Philippines)
#   - Central America west of lon -82 (Costa Rica, Nicaragua)
# Bounds below widen europe_west, europe_central, south_america and asia_east
# to close those gaps; the dedup-by-id pass absorbs the extra overlap.
REGIONS = [
    {"name": "europe_west", "bbox": "35.0,-25.0,72.0,8.0"},  # FR ES GB PT IE BE NL IS NO-coast
    {"name": "europe_central", "bbox": "35.0,8.0,72.0,24.0"},  # DE IT AT CH CZ PL HU DK SE NO
    {"name": "europe_east", "bbox": "35.0,24.0,72.0,60.0"},  # FI Baltics GR TR RO
    {"name": "north_america", "bbox": "15.0,-170.0,72.0,-50.0"},  # US CA MX
    {"name": "south_america", "bbox": "-56.0,-92.0,15.0,-34.0"},  # BR AR CL + Central America
    {"name": "asia_east", "bbox": "-11.0,73.0,54.0,150.0"},  # JP KR CN IN SE-Asia
    {"name": "asia_west", "bbox": "-11.0,24.0,42.0,73.0"},  # Middle East, W India
    {"name": "oceania", "bbox": "-50.0,110.0,5.0,180.0"},  # AU NZ
    {"name": "africa", "bbox": "-35.0,-18.0,37.0,52.0"},  # ZA EG MA
    {"name": "asia_north", "bbox": "42.0,60.0,82.0,180.0"},  # RU-east KZ
]

MAX_RETRIES_PER_REGION = 2
RETRY_DELAY_SECONDS = 30  # Overpass cooldown between retries
REGION_TIMEOUT_SECONDS = 180  # Client-side per-region timeout (server uses 150s)
INTER_REGION_DELAY_SECONDS = 5  # Polite delay between regions


def _region_query(bbox: str) -> str:
    """Build an Overpass QL query for tennis courts within a bounding box.

    The `[bbox:...]` global setting scopes all three statements to the region;
    `out center;` attaches a representative lat/lon to ways and relations so
    downstream models can treat every element as a point.
    """
    return (
        f"[out:json][timeout:150][bbox:{bbox}];\n"
        "(\n"
        "  node[\"sport\"=\"tennis\"];\n"
        "  way[\"sport\"=\"tennis\"];\n"
        "  rel[\"sport\"=\"tennis\"];\n"
        ");\n"
        "out center;"
    )


def _query_region(session: niquests.Session, region: dict) -> list[dict]:
    """POST one regional Overpass query. Returns list of OSM elements.

    Raises niquests.exceptions.RequestException (via raise_for_status or
    transport errors); the caller's retry loop handles those.
    """
    query = _region_query(region["bbox"])
    resp = session.post(
        OVERPASS_URL,
        data={"data": query},
        timeout=REGION_TIMEOUT_SECONDS,
    )
    resp.raise_for_status()
    return resp.json().get("elements", [])


def extract(
    # NOTE(review): the first two parameter names/order were outside the visible
    # hunk and are reconstructed from body usage — confirm against run_extractor.
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """Query Overpass for global tennis courts using regional bbox splitting.

    Splits the global query into REGIONS to avoid Overpass timeout.
    Writes one OSM element per line to courts.jsonl.gz.
    Crash-safe: working.jsonl accumulates results; on restart already-written
    element IDs are skipped so completed regions produce 0 new elements.

    Returns a metrics dict with files_written / files_skipped / bytes_written.
    Raises RuntimeError if no region yielded any element.
    """
    assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
    assert "/" in year_month and len(year_month) == 7, f"year_month must be YYYY/MM: {year_month!r}"

    year, month = year_month.split("/")
    dest_dir = landing_path(landing_dir, "overpass_tennis", year, month)
    dest = dest_dir / "courts.jsonl.gz"
    old_blob = dest_dir / "courts.json.gz"  # pre-migration blob format

    # Idempotency: either format already landed means this month is done.
    if dest.exists() or old_blob.exists():
        logger.info("Already have courts for %s — skipping", year_month)
        return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}

    # Crash recovery: load already-written elements from the working file.
    working_path = dest_dir / "courts.working.jsonl"
    prior_records, already_seen_ids = load_partial_results(working_path, id_key="id")
    if already_seen_ids:
        logger.info("Resuming: %d elements already in working file", len(already_seen_ids))

    total_new = 0
    regions_succeeded: list[str] = []
    regions_failed: list[str] = []

    with open(working_path, "a", encoding="utf-8") as working_file:
        for i, region in enumerate(REGIONS):
            for attempt in range(MAX_RETRIES_PER_REGION + 1):
                try:
                    elements = _query_region(session, region)
                    # FIX: previous code filtered on e.get("id", "") but then
                    # indexed elem["id"], so an id-less element raised KeyError.
                    # Elements without an id are now skipped outright.
                    new_elements = [
                        e
                        for e in elements
                        if "id" in e and str(e["id"]) not in already_seen_ids
                    ]
                    for elem in new_elements:
                        working_file.write(json.dumps(elem, separators=(",", ":")) + "\n")
                        already_seen_ids.add(str(elem["id"]))
                    # Flush after each region so a crash loses at most one region.
                    working_file.flush()
                    total_new += len(new_elements)
                    regions_succeeded.append(region["name"])
                    logger.info(
                        "Region %s: %d elements (%d new, %d total)",
                        region["name"], len(elements), len(new_elements), len(already_seen_ids),
                    )
                    break
                except niquests.exceptions.RequestException as exc:
                    if attempt < MAX_RETRIES_PER_REGION:
                        logger.warning(
                            "Region %s attempt %d failed: %s — retrying in %ds",
                            region["name"], attempt + 1, exc, RETRY_DELAY_SECONDS,
                        )
                        time.sleep(RETRY_DELAY_SECONDS)
                    else:
                        # Best-effort: a failed region is recorded, not fatal —
                        # remaining regions still run.
                        regions_failed.append(region["name"])
                        logger.error(
                            "Region %s failed after %d attempts: %s",
                            region["name"], MAX_RETRIES_PER_REGION + 1, exc,
                        )

            if i < len(REGIONS) - 1:
                time.sleep(INTER_REGION_DELAY_SECONDS)

    total_elements = len(prior_records) + total_new
    if total_elements == 0:
        raise RuntimeError(f"All regions failed, no elements written: {regions_failed}")

    if regions_failed:
        logger.warning("Completed with %d failed regions: %s", len(regions_failed), regions_failed)

    bytes_written = compress_jsonl_atomic(working_path, dest)
    logger.info(
        "%d total elements (%d regions, %d failed) -> %s (%s bytes)",
        total_elements, len(regions_succeeded), len(regions_failed), dest, f"{bytes_written:,}",
    )

    # NOTE(review): the tail of this function was truncated in the reviewed
    # patch; the schema below mirrors the skip path above — confirm it matches
    # run_extractor's expected metrics keys.
    return {
        "files_written": 1,
        "files_skipped": 0,
        "bytes_written": bytes_written,
    }
a/transform/sqlmesh_padelnomics/models/staging/stg_tennis_courts.sql b/transform/sqlmesh_padelnomics/models/staging/stg_tennis_courts.sql index 8821f45..c9c5577 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_tennis_courts.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_tennis_courts.sql @@ -2,7 +2,12 @@ -- Used as a "racket sport culture" signal in the opportunity score: -- areas with high tennis court density are prime padel adoption markets. -- --- Source: data/landing/overpass_tennis/{year}/{month}/courts.json.gz +-- Supports two landing formats (UNION ALL during migration): +-- New: courts.jsonl.gz — one OSM element per line; nodes have lat/lon directly, +-- ways/relations have center.lat/center.lon (Overpass out center) +-- Old: courts.json.gz — {"elements": [...]} blob (UNNEST required) +-- +-- Source: data/landing/overpass_tennis/{year}/{month}/courts.{jsonl,json}.gz MODEL ( name staging.stg_tennis_courts, @@ -11,7 +16,39 @@ MODEL ( grain osm_id ); -WITH parsed AS ( +WITH +-- New format: one OSM element per JSONL line +jsonl_elements AS ( + SELECT + type AS osm_type, + TRY_CAST(id AS BIGINT) AS osm_id, + -- Nodes: lat/lon direct. Ways/relations: center object (Overpass out center). 
+ COALESCE( + TRY_CAST(lat AS DOUBLE), + TRY_CAST(center ->> 'lat' AS DOUBLE) + ) AS lat, + COALESCE( + TRY_CAST(lon AS DOUBLE), + TRY_CAST(center ->> 'lon' AS DOUBLE) + ) AS lon, + tags ->> 'name' AS name, + tags ->> 'addr:country' AS country_code, + tags ->> 'addr:city' AS city_tag, + filename AS source_file, + CURRENT_DATE AS extracted_date + FROM read_json( + @LANDING_DIR || '/overpass_tennis/*/*/courts.jsonl.gz', + format = 'newline_delimited', + columns = { + type: 'VARCHAR', id: 'BIGINT', lat: 'DOUBLE', lon: 'DOUBLE', + center: 'JSON', tags: 'JSON' + }, + filename = true + ) + WHERE type IS NOT NULL +), +-- Old format: {"elements": [...]} blob — kept for transition +blob_elements AS ( SELECT elem ->> 'type' AS osm_type, (elem ->> 'id')::BIGINT AS osm_id, @@ -32,12 +69,16 @@ WITH parsed AS ( ) WHERE (elem ->> 'type') IS NOT NULL ), +parsed AS ( + SELECT * FROM jsonl_elements + UNION ALL + SELECT * FROM blob_elements +), deduped AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY osm_id ORDER BY extracted_date DESC) AS rn FROM parsed - WHERE osm_type = 'node' - AND lat IS NOT NULL AND lon IS NOT NULL + WHERE lat IS NOT NULL AND lon IS NOT NULL AND lat BETWEEN -90 AND 90 AND lon BETWEEN -180 AND 180 ), @@ -54,8 +95,8 @@ with_country AS ( WHEN lat BETWEEN 36.35 AND 47.09 AND lon BETWEEN 6.62 AND 18.51 THEN 'IT' WHEN lat BETWEEN 37.00 AND 42.15 AND lon BETWEEN -9.50 AND -6.19 THEN 'PT' ELSE NULL - END) AS country_code, - NULLIF(TRIM(name), '') AS name, + END) AS country_code, + NULLIF(TRIM(name), '') AS name, NULLIF(TRIM(city_tag), '') AS city, extracted_date FROM deduped