feat(daas): merge extraction pipelines (Overpass, Eurostat, Playtomic)

This commit is contained in:
Deeman
2026-02-21 21:48:26 +01:00
7 changed files with 436 additions and 0 deletions

View File

@@ -7,6 +7,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]
### Added
- `extract/padelnomics_extract` workspace member: Overpass API (padel courts via OSM), Eurostat city demographics (`urb_cpop1`, `ilc_di03`), and Playtomic unauthenticated tenant search extractors
- Landing zone structure at `data/landing/` with per-source subdirectories: `overpass/`, `eurostat/`, `playtomic/`
- `.env.example` entries for `DUCKDB_PATH` and `LANDING_DIR`
- content: `scripts/seed_content.py` — seeds two article templates (EN + DE) and 18 cities × 2 language rows into the database; run with `uv run python -m padelnomics.scripts.seed_content --generate` to produce 36 pre-built SEO articles covering Germany (8 cities), USA (6 cities), and UK (4 cities); each city has realistic per-market overrides for rates, rent, utilities, permits, and court configuration so the financial model produces genuinely unique output per article
- content: EN template (`city-padel-cost-en`) at `/padel-cost/{{ city_slug }}` and DE template (`city-padel-cost-de`) at `/padel-kosten/{{ city_slug }}` with Jinja2 Markdown bodies embedding `[scenario:slug:section]` cards for summary, CAPEX, operating, cashflow, and returns

View File

@@ -60,3 +60,7 @@ LITESTREAM_R2_BUCKET=
LITESTREAM_R2_ACCESS_KEY_ID=
LITESTREAM_R2_SECRET_ACCESS_KEY=
LITESTREAM_R2_ENDPOINT=
# DaaS analytics
DUCKDB_PATH=data/lakehouse.duckdb
LANDING_DIR=data/landing

View File

@@ -0,0 +1,19 @@
# Packaging metadata for the padelnomics data-extraction workspace member.
[project]
name = "padelnomics_extract"
version = "0.1.0"
description = "Data extraction pipelines for padelnomics"
requires-python = ">=3.11"
dependencies = [
"niquests>=3.14.0",
"python-dotenv>=1.0.0",
]
# Console entry point: `extract` runs all pipelines sequentially.
[project.scripts]
extract = "padelnomics_extract.execute:extract_dataset"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
# src-layout: the wheel ships only the package under src/.
[tool.hatch.build.targets.wheel]
packages = ["src/padelnomics_extract"]

View File

@@ -0,0 +1,220 @@
"""
Extraction pipelines — downloads source data into the landing zone.
Environment:
LANDING_DIR — local path for landing zone (default: data/landing)
"""
import gzip
import json
import os
import time
from datetime import UTC, datetime
from pathlib import Path
import niquests
# Landing-zone root; overridable via the LANDING_DIR environment variable.
LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing"))
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
EUROSTAT_BASE_URL = "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data"
PLAYTOMIC_TENANTS_URL = "https://api.playtomic.io/v1/tenants"
# Default HTTP timeout (seconds) for Eurostat/Playtomic requests.
TIMEOUT_SECONDS = 30
OVERPASS_TIMEOUT_SECONDS = 90  # Overpass can be slow on global queries
# Eurostat datasets to fetch
EUROSTAT_DATASETS = [
"urb_cpop1",  # Urban Audit — city population
"ilc_di03",  # Median equivalised net income by NUTS2
]
# Playtomic geo-search bounding boxes [min_lat, min_lon, max_lat, max_lon]
# Target markets: Spain, UK, Germany, France
PLAYTOMIC_BBOXES = [
{"min_latitude": 35.95, "min_longitude": -9.39, "max_latitude": 43.79, "max_longitude": 4.33},  # Spain (Iberia)
{"min_latitude": 49.90, "min_longitude": -8.62, "max_latitude": 60.85, "max_longitude": 1.77},  # UK
{"min_latitude": 47.27, "min_longitude": 5.87, "max_latitude": 55.06, "max_longitude": 15.04},  # Germany
{"min_latitude": 41.36, "min_longitude": -5.14, "max_latitude": 51.09, "max_longitude": 9.56},  # France
]
def _write_gz(dest: Path, data: bytes) -> None:
    """Write *data* gzip-compressed to *dest*, creating parent dirs as needed.

    Args:
        dest: Target path; must carry a ``.gz`` suffix so downstream readers
            can recognize the compression.
        data: Raw bytes to compress and write.

    Raises:
        ValueError: If *dest* does not end in ``.gz``. (Was an ``assert``,
            which is silently stripped under ``python -O``.)
    """
    if dest.suffix != ".gz":
        raise ValueError(f"dest must end in .gz: {dest}")
    dest.parent.mkdir(parents=True, exist_ok=True)
    with gzip.open(dest, "wb") as f:
        f.write(data)
def _etag_path(dest: Path) -> Path:
    """Location of the ETag cache file that accompanies *dest* (``<dest>.etag``)."""
    return dest.with_name(f"{dest.name}.etag")
def extract_file(url: str, dest: Path, *, use_etag: bool = True) -> bool:
    """
    GET *url* and write the response body to *dest* as gzip.

    When *use_etag* is True, a sibling ``<dest>.etag`` file caches the server's
    ETag and is sent back as ``If-None-Match`` on subsequent runs, so unchanged
    resources are skipped.

    Args:
        url: Source URL; must be non-empty.
        dest: Target ``.gz`` path in the landing zone.
        use_etag: Enable ETag-based conditional fetching (default True).

    Returns:
        True if new data was fetched and written, False if the server answered
        304 Not Modified (dest and the etag file are left untouched).

    Raises:
        ValueError: If *url* is empty. (Was an ``assert``, stripped under -O.)
        Exception: ``raise_for_status()`` propagates HTTP errors — presumably
            an ``niquests.HTTPError``; confirm against the niquests API.
    """
    if not url:
        raise ValueError("url must not be empty")
    headers: dict[str, str] = {}
    etag_file = _etag_path(dest) if use_etag else None
    if etag_file and etag_file.exists():
        headers["If-None-Match"] = etag_file.read_text(encoding="utf-8").strip()
    resp = niquests.get(url, headers=headers, timeout=TIMEOUT_SECONDS)
    if resp.status_code == 304:
        return False
    resp.raise_for_status()
    # _write_gz creates dest.parent, which is the same directory the etag
    # sibling lives in — no extra mkdir needed before writing the etag.
    _write_gz(dest, resp.content)
    if etag_file and (etag := resp.headers.get("etag")):
        etag_file.write_text(etag, encoding="utf-8")
    return True
def extract_overpass(landing_dir: Path, year_month: str) -> None:
    """
    Download every padel court known to OpenStreetMap via a single global
    Overpass query (nodes, ways and relations tagged ``sport=padel``) and
    store the raw OSM JSON response gzip-compressed in the landing zone.

    Landing: {landing_dir}/overpass/{year}/{month}/courts.json.gz
    """
    year, month = year_month.split("/")
    out_path = landing_dir / "overpass" / year / month / "courts.json.gz"
    # OverpassQL: one union over the three OSM element kinds.
    overpass_ql = "\n".join(
        [
            "[out:json][timeout:60];",
            "(",
            ' node["sport"="padel"];',
            ' way["sport"="padel"];',
            ' relation["sport"="padel"];',
            ");",
            "out body;",
        ]
    )
    print(f" [overpass] POST {OVERPASS_URL}")
    response = niquests.post(
        OVERPASS_URL,
        data={"data": overpass_ql},
        timeout=OVERPASS_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    body = response.content
    print(f" [overpass] {len(body):,} bytes received")
    _write_gz(out_path, body)
    print(f" [overpass] -> {out_path}")
def extract_eurostat(landing_dir: Path, year_month: str) -> None:
    """
    Fetch the configured Eurostat city-level demographic datasets (JSON
    format) into the landing zone. Relies on ETag deduplication via
    extract_file — the underlying data only changes about twice a year.

    Landing: {landing_dir}/eurostat/{year}/{month}/{dataset_code}.json.gz
    """
    year, month = year_month.split("/")
    for code in EUROSTAT_DATASETS:
        endpoint = f"{EUROSTAT_BASE_URL}/{code}?format=JSON&lang=EN"
        target = landing_dir / "eurostat" / year / month / f"{code}.json.gz"
        print(f" [eurostat] GET {code}")
        if extract_file(endpoint, target, use_etag=True):
            compressed = target.stat().st_size
            print(f" [eurostat] {code} updated -> {target} ({compressed:,} bytes compressed)")
        else:
            print(f" [eurostat] {code} not modified (304)")
def extract_playtomic_tenants(landing_dir: Path, year_month: str) -> None:
    """
    Fetch Playtomic venue listings via the unauthenticated tenant search endpoint.

    Iterates over target-market bounding boxes with pagination, deduplicates on
    tenant_id (falling back to "id"), and writes a single consolidated JSON to
    the landing zone.
    Rate: 1 req / 2 s as documented in the data-sources inventory.
    Landing: {landing_dir}/playtomic/{year}/{month}/tenants.json.gz

    Raises:
        TypeError: If the API returns something other than a JSON list.
            (Was an ``assert``, which is stripped under ``python -O``.)
    """
    year, month = year_month.split("/")
    dest = landing_dir / "playtomic" / year / month / "tenants.json.gz"
    all_tenants: list[dict] = []
    seen_ids: set[str] = set()
    page_size = 20
    first_request = True
    for bbox in PLAYTOMIC_BBOXES:
        page = 0
        while True:
            # Throttle EVERY request after the first — the old version only
            # slept between pages of the same bbox, so the first request of
            # each new bbox fired back-to-back with the previous one,
            # violating the documented 1 req / 2 s rate.
            if not first_request:
                time.sleep(2)
            first_request = False
            params = {
                "sport_ids": "PADEL",
                "min_latitude": bbox["min_latitude"],
                "min_longitude": bbox["min_longitude"],
                "max_latitude": bbox["max_latitude"],
                "max_longitude": bbox["max_longitude"],
                "offset": page * page_size,
                "size": page_size,
            }
            print(
                f" [playtomic] GET page={page} "
                f"bbox=({bbox['min_latitude']:.1f},{bbox['min_longitude']:.1f},"
                f"{bbox['max_latitude']:.1f},{bbox['max_longitude']:.1f})"
            )
            resp = niquests.get(PLAYTOMIC_TENANTS_URL, params=params, timeout=TIMEOUT_SECONDS)
            resp.raise_for_status()
            tenants = resp.json()
            if not isinstance(tenants, list):
                raise TypeError(f"Expected list from Playtomic API, got {type(tenants)}")
            new_count = 0
            for tenant in tenants:
                # Dedup key: tenants can appear in overlapping bboxes/pages.
                tid = tenant.get("tenant_id") or tenant.get("id")
                if tid and tid not in seen_ids:
                    seen_ids.add(tid)
                    all_tenants.append(tenant)
                    new_count += 1
            print(f" [playtomic] page={page} got={len(tenants)} new={new_count} total={len(all_tenants)}")
            # A short page means this bbox is exhausted.
            if len(tenants) < page_size:
                break
            page += 1
    payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode()
    _write_gz(dest, payload)
    print(f" [playtomic] {len(all_tenants)} unique venues -> {dest}")
def extract_dataset() -> None:
    """Entry point: run all extractors sequentially for the current UTC month."""
    period = datetime.now(UTC).strftime("%Y/%m")
    print(f"extract_dataset start: landing_dir={LANDING_DIR} period={period}")
    # Fixed pipeline order: banners mirror the source inventory numbering.
    steps = (
        ("\n[1/3] Overpass API — padel courts (OSM)", extract_overpass),
        ("\n[2/3] Eurostat — city demographics", extract_eurostat),
        ("\n[3/3] Playtomic — venue listings (unauthenticated)", extract_playtomic_tenants),
    )
    for banner, runner in steps:
        print(banner)
        runner(LANDING_DIR, period)
    print("\nextract_dataset: done")

View File

@@ -1,11 +1,13 @@
[tool.uv.workspace]
members = [
"web",
"extract/padelnomics_extract",
]
[dependency-groups]
dev = [
"hypothesis>=6.151.6",
"niquests>=3.14.0",
"playwright>=1.58.0",
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",

188
padelnomics/uv.lock generated

File diff suppressed because one or more lines are too long