feat(extract): regional overpass_tennis splitting + JSONL output
Replace single global Overpass query (150K+ elements, times out) with
10 regional bbox queries (~10-40K elements each, 150s server / 180s client).
- REGIONS: 10 bboxes covering all continents
- Crash recovery: working.jsonl accumulates per-region results;
already_seen_ids deduplication skips re-written elements on restart
- Overlapping bbox elements deduped by OSM id across regions
- Retry per region: up to 2 retries with 30s cooldown
- Polite 5s inter-region delay
- Skip if courts.jsonl.gz or courts.json.gz already exists for the month
stg_tennis_courts: UNION ALL transition (jsonl_elements + blob_elements)
- jsonl_elements: JSONL, explicit columns, COALESCE lat/lon with center coords
(supports both node direct lat/lon and way/relation Overpass out center)
- blob_elements: existing UNNEST(elements) pattern, unchanged
- Removed osm_type='node' filter — ways/relations now usable via center coords
- Dedup on (osm_id, extracted_date DESC) unchanged
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,40 +1,77 @@
|
||||
"""Overpass API extractor — global tennis court locations from OpenStreetMap.
|
||||
|
||||
Queries the Overpass API for all nodes/ways/relations tagged sport=tennis.
|
||||
Tennis court density near a location is a proxy for racket-sport culture —
|
||||
areas with many tennis clubs are prime candidates for padel adoption.
|
||||
Queries the Overpass API for all nodes/ways/relations tagged sport=tennis,
|
||||
split across 10 geographic regions to avoid timeout on the ~150K+ global result.
|
||||
|
||||
The query returns ~150K+ results globally (vs ~5K for padel), so a higher
|
||||
Overpass timeout is used.
|
||||
Regional strategy:
|
||||
- Each region is a bounding box covering a continent or sub-continent
|
||||
- Each region is queried independently (POST with [bbox:...])
|
||||
- Overlapping bboxes are deduped on OSM element id
|
||||
- One region per POST (~10-40K elements each, well within Overpass limits)
|
||||
- Crash recovery: working JSONL accumulates completed regions; on restart
|
||||
already-written IDs are skipped, completed regions produce 0 new elements
|
||||
|
||||
Landing: {LANDING_DIR}/overpass_tennis/{year}/{month}/courts.json.gz
|
||||
Landing: {LANDING_DIR}/overpass_tennis/{year}/{month}/courts.jsonl.gz
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import niquests
|
||||
|
||||
from ._shared import OVERPASS_TIMEOUT_SECONDS, run_extractor, setup_logging
|
||||
from .utils import landing_path, write_gzip_atomic
|
||||
from ._shared import run_extractor, setup_logging
|
||||
from .utils import compress_jsonl_atomic, landing_path, load_partial_results
|
||||
|
||||
logger = setup_logging("padelnomics.extract.overpass_tennis")
|
||||
|
||||
EXTRACTOR_NAME = "overpass_tennis"
|
||||
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
|
||||
|
||||
# Tennis returns ~150K+ elements globally vs ~5K for padel — use 3× timeout.
|
||||
TENNIS_OVERPASS_TIMEOUT_SECONDS = OVERPASS_TIMEOUT_SECONDS * 3
|
||||
# Each region is [south, west, north, east] — Overpass bbox format
|
||||
REGIONS = [
|
||||
{"name": "europe_west", "bbox": "35.0,-11.0,61.0,8.0"}, # FR ES GB PT IE BE NL
|
||||
{"name": "europe_central", "bbox": "42.0,8.0,55.5,24.0"}, # DE IT AT CH CZ PL HU
|
||||
{"name": "europe_east", "bbox": "35.0,24.0,72.0,60.0"}, # Nordics Baltics GR TR RO
|
||||
{"name": "north_america", "bbox": "15.0,-170.0,72.0,-50.0"}, # US CA MX
|
||||
{"name": "south_america", "bbox": "-56.0,-82.0,15.0,-34.0"}, # BR AR CL
|
||||
{"name": "asia_east", "bbox": "18.0,73.0,54.0,150.0"}, # JP KR CN
|
||||
{"name": "asia_west", "bbox": "-11.0,24.0,42.0,73.0"}, # Middle East India
|
||||
{"name": "oceania", "bbox": "-50.0,110.0,5.0,180.0"}, # AU NZ
|
||||
{"name": "africa", "bbox": "-35.0,-18.0,37.0,52.0"}, # ZA EG MA
|
||||
{"name": "asia_north", "bbox": "42.0,60.0,82.0,180.0"}, # RU-east KZ
|
||||
]
|
||||
|
||||
OVERPASS_QUERY = (
|
||||
"[out:json][timeout:300];\n"
|
||||
MAX_RETRIES_PER_REGION = 2
|
||||
RETRY_DELAY_SECONDS = 30 # Overpass cooldown between retries
|
||||
REGION_TIMEOUT_SECONDS = 180 # Client-side per-region timeout (server uses 150s)
|
||||
INTER_REGION_DELAY_SECONDS = 5 # Polite delay between regions
|
||||
|
||||
|
||||
def _region_query(bbox: str) -> str:
|
||||
"""Build an Overpass QL query for tennis courts within a bounding box."""
|
||||
return (
|
||||
f"[out:json][timeout:150][bbox:{bbox}];\n"
|
||||
"(\n"
|
||||
' node["sport"="tennis"];\n'
|
||||
' way["sport"="tennis"];\n'
|
||||
' relation["sport"="tennis"];\n'
|
||||
" node[\"sport\"=\"tennis\"];\n"
|
||||
" way[\"sport\"=\"tennis\"];\n"
|
||||
" rel[\"sport\"=\"tennis\"];\n"
|
||||
");\n"
|
||||
"out center;"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def _query_region(session: niquests.Session, region: dict) -> list[dict]:
    """POST one regional Overpass query. Returns list of OSM elements."""
    payload = {"data": _region_query(region["bbox"])}
    response = session.post(OVERPASS_URL, data=payload, timeout=REGION_TIMEOUT_SECONDS)
    response.raise_for_status()
    # Missing "elements" key (e.g. an empty result envelope) yields [].
    return response.json().get("elements", [])
||||
|
||||
|
||||
def extract(
|
||||
@@ -43,24 +80,84 @@ def extract(
|
||||
conn: sqlite3.Connection,
|
||||
session: niquests.Session,
|
||||
) -> dict:
|
||||
"""POST OverpassQL query for tennis courts and write raw OSM JSON. Returns run metrics."""
|
||||
"""Query Overpass for global tennis courts using regional bbox splitting.
|
||||
|
||||
Splits the global query into REGIONS to avoid Overpass timeout.
|
||||
Writes one OSM element per line to courts.jsonl.gz.
|
||||
Crash-safe: working.jsonl accumulates results; on restart already-written
|
||||
element IDs are skipped so completed regions produce 0 new elements.
|
||||
"""
|
||||
assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
|
||||
assert "/" in year_month and len(year_month) == 7, f"year_month must be YYYY/MM: {year_month!r}"
|
||||
|
||||
year, month = year_month.split("/")
|
||||
dest_dir = landing_path(landing_dir, "overpass_tennis", year, month)
|
||||
dest = dest_dir / "courts.json.gz"
|
||||
dest = dest_dir / "courts.jsonl.gz"
|
||||
old_blob = dest_dir / "courts.json.gz"
|
||||
|
||||
logger.info("POST %s (sport=tennis, ~150K+ results expected)", OVERPASS_URL)
|
||||
resp = session.post(
|
||||
OVERPASS_URL,
|
||||
data={"data": OVERPASS_QUERY},
|
||||
timeout=TENNIS_OVERPASS_TIMEOUT_SECONDS,
|
||||
if dest.exists() or old_blob.exists():
|
||||
logger.info("Already have courts for %s — skipping", year_month)
|
||||
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||
|
||||
# Crash recovery: load already-written elements from the working file
|
||||
working_path = dest_dir / "courts.working.jsonl"
|
||||
prior_records, already_seen_ids = load_partial_results(working_path, id_key="id")
|
||||
if already_seen_ids:
|
||||
logger.info("Resuming: %d elements already in working file", len(already_seen_ids))
|
||||
|
||||
total_new = 0
|
||||
regions_succeeded: list[str] = []
|
||||
regions_failed: list[str] = []
|
||||
|
||||
working_file = open(working_path, "a") # noqa: SIM115
|
||||
try:
|
||||
for i, region in enumerate(REGIONS):
|
||||
for attempt in range(MAX_RETRIES_PER_REGION + 1):
|
||||
try:
|
||||
elements = _query_region(session, region)
|
||||
new_elements = [e for e in elements if str(e.get("id", "")) not in already_seen_ids]
|
||||
for elem in new_elements:
|
||||
working_file.write(json.dumps(elem, separators=(",", ":")) + "\n")
|
||||
already_seen_ids.add(str(elem["id"]))
|
||||
working_file.flush()
|
||||
total_new += len(new_elements)
|
||||
regions_succeeded.append(region["name"])
|
||||
logger.info(
|
||||
"Region %s: %d elements (%d new, %d total)",
|
||||
region["name"], len(elements), len(new_elements), len(already_seen_ids),
|
||||
)
|
||||
break
|
||||
except niquests.exceptions.RequestException as exc:
|
||||
if attempt < MAX_RETRIES_PER_REGION:
|
||||
logger.warning(
|
||||
"Region %s attempt %d failed: %s — retrying in %ds",
|
||||
region["name"], attempt + 1, exc, RETRY_DELAY_SECONDS,
|
||||
)
|
||||
time.sleep(RETRY_DELAY_SECONDS)
|
||||
else:
|
||||
regions_failed.append(region["name"])
|
||||
logger.error(
|
||||
"Region %s failed after %d attempts: %s",
|
||||
region["name"], MAX_RETRIES_PER_REGION + 1, exc,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
|
||||
size_bytes = len(resp.content)
|
||||
logger.info("%s bytes received", f"{size_bytes:,}")
|
||||
if i < len(REGIONS) - 1:
|
||||
time.sleep(INTER_REGION_DELAY_SECONDS)
|
||||
finally:
|
||||
working_file.close()
|
||||
|
||||
bytes_written = write_gzip_atomic(dest, resp.content)
|
||||
logger.info("wrote %s (%s bytes compressed)", dest, f"{bytes_written:,}")
|
||||
total_elements = len(prior_records) + total_new
|
||||
if total_elements == 0:
|
||||
raise RuntimeError(f"All regions failed, no elements written: {regions_failed}")
|
||||
|
||||
if regions_failed:
|
||||
logger.warning("Completed with %d failed regions: %s", len(regions_failed), regions_failed)
|
||||
|
||||
bytes_written = compress_jsonl_atomic(working_path, dest)
|
||||
logger.info(
|
||||
"%d total elements (%d regions, %d failed) -> %s (%s bytes)",
|
||||
total_elements, len(regions_succeeded), len(regions_failed), dest, f"{bytes_written:,}",
|
||||
)
|
||||
|
||||
return {
|
||||
"files_written": 1,
|
||||
|
||||
@@ -2,7 +2,12 @@
|
||||
-- Used as a "racket sport culture" signal in the opportunity score:
|
||||
-- areas with high tennis court density are prime padel adoption markets.
|
||||
--
|
||||
-- Source: data/landing/overpass_tennis/{year}/{month}/courts.json.gz
|
||||
-- Supports two landing formats (UNION ALL during migration):
|
||||
-- New: courts.jsonl.gz — one OSM element per line; nodes have lat/lon directly,
|
||||
-- ways/relations have center.lat/center.lon (Overpass out center)
|
||||
-- Old: courts.json.gz — {"elements": [...]} blob (UNNEST required)
|
||||
--
|
||||
-- Source: data/landing/overpass_tennis/{year}/{month}/courts.{jsonl,json}.gz
|
||||
|
||||
MODEL (
|
||||
name staging.stg_tennis_courts,
|
||||
@@ -11,7 +16,39 @@ MODEL (
|
||||
grain osm_id
|
||||
);
|
||||
|
||||
WITH parsed AS (
|
||||
WITH
|
||||
-- New format: one OSM element per JSONL line
|
||||
jsonl_elements AS (
|
||||
SELECT
|
||||
type AS osm_type,
|
||||
TRY_CAST(id AS BIGINT) AS osm_id,
|
||||
-- Nodes: lat/lon direct. Ways/relations: center object (Overpass out center).
|
||||
COALESCE(
|
||||
TRY_CAST(lat AS DOUBLE),
|
||||
TRY_CAST(center ->> 'lat' AS DOUBLE)
|
||||
) AS lat,
|
||||
COALESCE(
|
||||
TRY_CAST(lon AS DOUBLE),
|
||||
TRY_CAST(center ->> 'lon' AS DOUBLE)
|
||||
) AS lon,
|
||||
tags ->> 'name' AS name,
|
||||
tags ->> 'addr:country' AS country_code,
|
||||
tags ->> 'addr:city' AS city_tag,
|
||||
filename AS source_file,
|
||||
CURRENT_DATE AS extracted_date
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/overpass_tennis/*/*/courts.jsonl.gz',
|
||||
format = 'newline_delimited',
|
||||
columns = {
|
||||
type: 'VARCHAR', id: 'BIGINT', lat: 'DOUBLE', lon: 'DOUBLE',
|
||||
center: 'JSON', tags: 'JSON'
|
||||
},
|
||||
filename = true
|
||||
)
|
||||
WHERE type IS NOT NULL
|
||||
),
|
||||
-- Old format: {"elements": [...]} blob — kept for transition
|
||||
blob_elements AS (
|
||||
SELECT
|
||||
elem ->> 'type' AS osm_type,
|
||||
(elem ->> 'id')::BIGINT AS osm_id,
|
||||
@@ -32,12 +69,16 @@ WITH parsed AS (
|
||||
)
|
||||
WHERE (elem ->> 'type') IS NOT NULL
|
||||
),
|
||||
parsed AS (
|
||||
SELECT * FROM jsonl_elements
|
||||
UNION ALL
|
||||
SELECT * FROM blob_elements
|
||||
),
|
||||
deduped AS (
|
||||
SELECT *,
|
||||
ROW_NUMBER() OVER (PARTITION BY osm_id ORDER BY extracted_date DESC) AS rn
|
||||
FROM parsed
|
||||
WHERE osm_type = 'node'
|
||||
AND lat IS NOT NULL AND lon IS NOT NULL
|
||||
WHERE lat IS NOT NULL AND lon IS NOT NULL
|
||||
AND lat BETWEEN -90 AND 90
|
||||
AND lon BETWEEN -180 AND 180
|
||||
),
|
||||
|
||||
Reference in New Issue
Block a user