# Padelnomics Extraction

Fetches raw data from external sources into the local landing zone. The pipeline then reads from the landing zone: extraction and transformation are fully decoupled.

## Running

```bash
# One-shot (most recent data only)
LANDING_DIR=data/landing uv run extract

# First-time full backfill (add your own backfill entry point)
LANDING_DIR=data/landing uv run python -m padelnomics_extract.execute
```

## Design: filesystem as state

The landing zone is an append-only store of raw files. Each file is named by its content fingerprint (server ETag or SHA256 hash), so:

- **Idempotency**: running twice writes nothing if the source hasn't changed
- **Debugging**: every historical raw file is preserved; reprocess any window by re-running transforms
- **Safety**: extraction never mutates existing files, only appends new ones

### Etag-based dedup (preferred)

When the source provides an `ETag` header, use it as the filename:

```
data/landing/padelnomics/{year}/{month:02d}/{etag}.csv.gz
```

If the file already exists on disk, its content matches the server's current version and no download is needed (see the first sketch under *Implementation sketches* below).

### Hash-based dedup (fallback)

When the source has no etag (static files that update in place), download the content and use its SHA256 prefix as the filename:

```
data/landing/padelnomics/{year}/{date}_{sha256[:8]}.csv.gz
```

Two runs that produce identical content produce the same hash → same filename → skip.

## State tracking

Every run writes one row to `data/landing/.state.sqlite`. Query it to answer operational questions:

```bash
# When did extraction last succeed?
sqlite3 data/landing/.state.sqlite \
  "SELECT extractor, started_at, status, files_written, files_skipped, cursor_value FROM extraction_runs ORDER BY run_id DESC LIMIT 10"

# Did anything fail in the last 7 days?
sqlite3 data/landing/.state.sqlite \
  "SELECT * FROM extraction_runs WHERE status = 'failed' AND started_at > datetime('now', '-7 days')"
```

State table schema:

| Column | Type | Description |
|--------|------|-------------|
| `run_id` | INTEGER | Auto-increment primary key |
| `extractor` | TEXT | Extractor name (e.g. `padelnomics`) |
| `started_at` | TEXT | ISO 8601 UTC timestamp |
| `finished_at` | TEXT | ISO 8601 UTC timestamp, NULL while the run is in progress |
| `status` | TEXT | `running` → `success` or `failed` |
| `files_written` | INTEGER | New files written this run |
| `files_skipped` | INTEGER | Files already present (content unchanged) |
| `bytes_written` | INTEGER | Compressed bytes written |
| `cursor_value` | TEXT | Last successful cursor (date, etag, page, etc.) |
| `error_message` | TEXT | Exception message if status = `failed` |

## Adding a new extractor

1. Add a function in `execute.py` following the same pattern as `extract_file_by_etag()` or `extract_file_by_hash()`
2. Call it from `extract_dataset()` with its own `extractor` name in `start_run()` (see the wiring sketch under *Implementation sketches* below)
3. Store files under a new subdirectory: `landing_path(LANDING_DIR, "my_new_source", year)`
4. Add a new SQLMesh `raw/` model that reads from the new subdirectory glob

## Landing zone structure

```
data/landing/
├── .state.sqlite               # extraction run history
└── padelnomics/                # one subdirectory per source
    └── {year}/
        └── {month:02d}/
            └── {etag}.csv.gz   # immutable, content-addressed files
```
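
## Implementation sketches

The sketches below illustrate the dedup and state-tracking patterns described above. They are minimal, hedged examples: `SOURCE_URL`, the use of `requests`, and the function bodies are assumptions for illustration, not copies of the real implementations in `execute.py`.

First, the ETag check: a single `HEAD` request decides whether a download is needed at all.

```python
# Sketch of etag-based dedup, assuming a plain HTTPS source and the
# `requests` library. SOURCE_URL and fetch_if_changed are illustrative;
# the real logic is execute.py's extract_file_by_etag().
from pathlib import Path

import requests

SOURCE_URL = "https://example.com/padelnomics.csv.gz"  # hypothetical endpoint


def fetch_if_changed(landing_dir: str, year: int, month: int) -> Path | None:
    """Download only when the server's ETag names a file we don't have yet."""
    head = requests.head(SOURCE_URL, timeout=30)
    head.raise_for_status()
    # Strip the surrounding quotes so the etag is filename-safe
    # (weak etags, "W/...", would need extra sanitizing).
    etag = head.headers["ETag"].strip('"')

    target = (
        Path(landing_dir) / "padelnomics" / str(year) / f"{month:02d}" / f"{etag}.csv.gz"
    )
    if target.exists():
        return None  # content unchanged since the last run: skip

    resp = requests.get(SOURCE_URL, timeout=300)
    resp.raise_for_status()
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp = target.with_suffix(".tmp")  # temp name + rename: readers never see a partial file
    tmp.write_bytes(resp.content)
    tmp.rename(target)
    return target
```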
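
Next, the hash fallback. Because the filename also carries a date prefix, this sketch dedups on the hash suffix via a glob, so identical content fetched on a later date still skips; the exact skip rule in `execute.py` may differ.

```python
# Sketch of hash-based dedup: download, fingerprint, skip if already landed.
import hashlib
from datetime import date
from pathlib import Path

import requests


def land_by_hash(landing_dir: str, url: str, year: int) -> Path | None:
    """Land a no-etag source, keyed by the SHA256 prefix of its content."""
    resp = requests.get(url, timeout=300)
    resp.raise_for_status()
    digest = hashlib.sha256(resp.content).hexdigest()[:8]

    year_dir = Path(landing_dir) / "padelnomics" / str(year)
    if any(year_dir.glob(f"*_{digest}.csv.gz")):
        return None  # same content already landed under an earlier date

    year_dir.mkdir(parents=True, exist_ok=True)
    target = year_dir / f"{date.today().isoformat()}_{digest}.csv.gz"
    target.write_bytes(resp.content)
    return target
```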
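
The state table can be maintained with the standard-library `sqlite3` module. The DDL below is reconstructed from the schema table above; `start_run()` is named in this README, but its signature and the `finish_run()` helper are assumptions.

```python
# Sketch of run-state bookkeeping against data/landing/.state.sqlite.
import sqlite3
from datetime import datetime, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS extraction_runs (
    run_id        INTEGER PRIMARY KEY AUTOINCREMENT,
    extractor     TEXT NOT NULL,
    started_at    TEXT NOT NULL,
    finished_at   TEXT,
    status        TEXT NOT NULL,
    files_written INTEGER NOT NULL DEFAULT 0,
    files_skipped INTEGER NOT NULL DEFAULT 0,
    bytes_written INTEGER NOT NULL DEFAULT 0,
    cursor_value  TEXT,
    error_message TEXT
)
"""


def _utcnow() -> str:
    return datetime.now(timezone.utc).isoformat()


def start_run(db: sqlite3.Connection, extractor: str) -> int:
    """Open a run in 'running' state; returns its run_id."""
    db.execute(SCHEMA)
    cur = db.execute(
        "INSERT INTO extraction_runs (extractor, started_at, status) "
        "VALUES (?, ?, 'running')",
        (extractor, _utcnow()),
    )
    db.commit()
    return cur.lastrowid


def finish_run(db, run_id, status, written=0, skipped=0, nbytes=0,
               cursor=None, error=None):
    """Close a run as 'success' or 'failed', recording the counters."""
    db.execute(
        "UPDATE extraction_runs SET finished_at = ?, status = ?, "
        "files_written = ?, files_skipped = ?, bytes_written = ?, "
        "cursor_value = ?, error_message = ? WHERE run_id = ?",
        (_utcnow(), status, written, skipped, nbytes, cursor, error, run_id),
    )
    db.commit()
```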
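
Finally, how a new extractor might wire into that bookkeeping (step 2 of "Adding a new extractor"), reusing `start_run()`/`finish_run()` from the previous sketch. This is a guess at the pattern, not a copy of `extract_dataset()`.

```python
# Hypothetical wiring for a new extractor; extract_my_new_source stands in
# for whatever fetch function you added in step 1.
import os
import sqlite3


def extract_my_new_source(landing_dir: str) -> tuple[int, int]:
    """Fetch the new source; return (files_written, files_skipped)."""
    written = skipped = 0
    # ... call fetch_if_changed() / land_by_hash() here and count results ...
    return written, skipped


def run_my_new_source() -> None:
    landing_dir = os.environ.get("LANDING_DIR", "data/landing")
    db = sqlite3.connect(os.path.join(landing_dir, ".state.sqlite"))
    run_id = start_run(db, "my_new_source")
    try:
        written, skipped = extract_my_new_source(landing_dir)
        finish_run(db, run_id, "success", written=written, skipped=skipped)
    except Exception as exc:
        finish_run(db, run_id, "failed", error=str(exc))
        raise
```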