Padelnomics Extraction

Fetches raw data from external sources into the local landing zone. The transform pipeline then reads only from the landing zone — extraction and transformation are fully decoupled.

Running

# Run all extractors sequentially
LANDING_DIR=data/landing uv run extract

# Run a single extractor
LANDING_DIR=data/landing uv run extract-overpass
LANDING_DIR=data/landing uv run extract-eurostat
LANDING_DIR=data/landing uv run extract-playtomic-tenants
LANDING_DIR=data/landing uv run extract-playtomic-availability

Architecture: one file per source

Each data source lives in its own module with a dedicated CLI entry point:

src/padelnomics_extract/
├── __init__.py
├── _shared.py                  # LANDING_DIR, logger, run_extractor() wrapper
├── utils.py                    # SQLite state tracking, atomic I/O helpers
├── overpass.py                 # OSM padel courts via Overpass API
├── eurostat.py                 # Eurostat city demographics (urb_cpop1, ilc_di03)
├── playtomic_tenants.py        # Playtomic venue listings (tenant search)
├── playtomic_availability.py   # Playtomic booking slots (next-day availability)
└── all.py                      # Runs all extractors sequentially

Adding a new extractor

  1. Create my_source.py following the pattern:
from ._shared import run_extractor, setup_logging
from .utils import landing_path, write_gzip_atomic

logger = setup_logging("padelnomics.extract.my_source")
EXTRACTOR_NAME = "my_source"

def extract(landing_dir, year_month, conn, session):
    """Returns {"files_written": N, "bytes_written": N, ...}."""
    year, month = year_month.split("/")
    dest_dir = landing_path(landing_dir, "my_source", year, month)
    # ... fetch data, write to dest_dir ...
    return {"files_written": 1, "files_skipped": 0, "bytes_written": n}

def main():
    run_extractor(EXTRACTOR_NAME, extract)
  2. Add entry point to pyproject.toml:
extract-my-source = "padelnomics_extract.my_source:main"
  3. Import in all.py and add to the EXTRACTORS list.

  4. Add a staging model in transform/sqlmesh_padelnomics/models/staging/.

Design: filesystem as state

The landing zone is an append-only store of raw files:

  • Idempotency: running twice writes nothing if the source hasn't changed
  • Debugging: every historical raw file is preserved
  • Safety: extraction never mutates existing files, only appends new ones
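The append-only contract above can be sketched as follows. This is an illustrative helper, not the actual codebase's implementation — the function name and signature are assumptions:

```python
from pathlib import Path

def write_new_files(dest_dir: Path, records: dict[str, bytes]) -> dict:
    """Hypothetical sketch of the append-only contract: files that already
    exist are counted as skipped and never rewritten or deleted, so a
    second run over unchanged input is a no-op."""
    written = skipped = nbytes = 0
    dest_dir.mkdir(parents=True, exist_ok=True)
    for name, payload in records.items():
        dest = dest_dir / name
        if dest.exists():
            skipped += 1           # idempotent: re-runs write nothing
            continue
        dest.write_bytes(payload)  # append-only: only ever add new files
        written += 1
        nbytes += len(payload)
    return {"files_written": written, "files_skipped": skipped,
            "bytes_written": nbytes}
```

The returned dict matches the stats contract that extract() functions report (see "Adding a new extractor" above).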

ETag-based dedup (Eurostat)

When the source provides an ETag header, store it in a sibling .etag file. On the next request, send If-None-Match — 304 means skip.
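The ETag dance above looks roughly like this. A minimal sketch, assuming a requests.Session-like object — names and signature are illustrative, not the actual eurostat.py code:

```python
from pathlib import Path

def fetch_with_etag(session, url: str, dest: Path) -> bool:
    """Send If-None-Match with the stored tag; a 304 means the source is
    unchanged and nothing is written. `session` is any object with a
    requests.Session-style get() method."""
    etag_file = dest.with_name(dest.name + ".etag")  # sibling .etag file
    headers = {}
    if etag_file.exists():
        headers["If-None-Match"] = etag_file.read_text().strip()
    resp = session.get(url, headers=headers, timeout=60)
    if resp.status_code == 304:
        return False  # Not Modified: skip this run
    resp.raise_for_status()
    dest.parent.mkdir(parents=True, exist_ok=True)
    dest.write_bytes(resp.content)
    if "ETag" in resp.headers:
        etag_file.write_text(resp.headers["ETag"])  # remember for next run
    return True
```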

Content-addressed (Overpass, Playtomic)

Files named by date or content. write_gzip_atomic() writes to a .tmp sibling then renames — never leaves partial files on crash.
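The tmp-then-rename pattern can be sketched like this (the real write_gzip_atomic() lives in utils.py and may differ in signature):

```python
import gzip
import os
from pathlib import Path

def write_gzip_atomic(dest: Path, payload: bytes) -> int:
    """Compress to a .tmp sibling, then rename into place. A crash
    mid-write leaves only the .tmp file, never a partial dest."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_name(dest.name + ".tmp")
    data = gzip.compress(payload)
    tmp.write_bytes(data)
    os.replace(tmp, dest)  # atomic rename within one filesystem
    return len(data)       # compressed bytes, for the run stats
```

os.replace() is the key design choice: within a single filesystem the rename is atomic, so readers see either the old state or the complete new file, never a partial write.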

State tracking

Every run writes one row to data/landing/.state.sqlite:

sqlite3 data/landing/.state.sqlite \
  "SELECT extractor, started_at, status, files_written, cursor_value
   FROM extraction_runs ORDER BY run_id DESC LIMIT 10"
Column         Type     Description
run_id         INTEGER  Auto-increment primary key
extractor      TEXT     Extractor name (e.g. overpass, eurostat)
started_at     TEXT     ISO 8601 UTC timestamp
finished_at    TEXT     ISO 8601 UTC timestamp
status         TEXT     running, success, or failed
files_written  INTEGER  New files written this run
files_skipped  INTEGER  Files already present
bytes_written  INTEGER  Compressed bytes written
cursor_value   TEXT     Resume cursor (date, index, etc.)
error_message  TEXT     Exception message if failed
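Recording a row with this schema might look like the sketch below. The column names follow the table above; the function itself is hypothetical, not the actual run_extractor() internals:

```python
import sqlite3

DDL = """CREATE TABLE IF NOT EXISTS extraction_runs (
    run_id INTEGER PRIMARY KEY AUTOINCREMENT,
    extractor TEXT, started_at TEXT, finished_at TEXT,
    status TEXT, files_written INTEGER, files_skipped INTEGER,
    bytes_written INTEGER, cursor_value TEXT, error_message TEXT)"""

def record_run(db_path: str, extractor: str, started_at: str,
               finished_at: str, status: str, stats: dict) -> int:
    """Persist one row per extraction run; returns the new run_id."""
    with sqlite3.connect(db_path) as conn:  # commits on clean exit
        conn.execute(DDL)
        cur = conn.execute(
            "INSERT INTO extraction_runs (extractor, started_at,"
            " finished_at, status, files_written, files_skipped,"
            " bytes_written, cursor_value, error_message)"
            " VALUES (?,?,?,?,?,?,?,?,?)",
            (extractor, started_at, finished_at, status,
             stats.get("files_written", 0), stats.get("files_skipped", 0),
             stats.get("bytes_written", 0), stats.get("cursor_value"),
             stats.get("error_message")))
        return cur.lastrowid
```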

Landing zone structure

data/landing/
├── .state.sqlite
├── overpass/{year}/{month}/courts.json.gz
├── eurostat/{year}/{month}/urb_cpop1.json.gz
├── eurostat/{year}/{month}/ilc_di03.json.gz
├── playtomic/{year}/{month}/tenants.json.gz
└── playtomic/{year}/{month}/availability_{date}.json.gz
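The {source}/{year}/{month} layout above maps to a small path helper. The real landing_path() lives in utils.py; this signature is an assumption:

```python
from pathlib import Path

def landing_path(landing_dir: str, source: str, year: str, month: str) -> Path:
    # One directory per source/year/month, matching the tree above.
    return Path(landing_dir) / source / year / month
```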

Data sources

Source                  Module                     Schedule               Notes
Overpass API            overpass.py                Daily                  OSM padel courts, ~5K nodes
Eurostat                eurostat.py                Daily (304 most runs)  urb_cpop1, ilc_di03 — ETag dedup
Playtomic tenants       playtomic_tenants.py       Daily                  ~8K venues, bounded pagination
Playtomic availability  playtomic_availability.py  Daily                  Next-day slots, ~4.5h runtime