feat: migrate transform to 3-layer architecture with per-layer schemas
Remove raw/ layer — staging models now read landing JSON directly. Rename all model schemas from padelnomics.* to staging.*/foundation.*/serving.*. Web app queries updated to serving.planner_defaults via SERVING_DUCKDB_PATH. Supervisor gets daily sleep interval between pipeline runs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -5,86 +5,121 @@ Fetches raw data from external sources to the local landing zone. The pipeline t
|
||||
## Running
|
||||
|
||||
```bash
|
||||
# One-shot (most recent data only)
|
||||
# Run all extractors sequentially
|
||||
LANDING_DIR=data/landing uv run extract
|
||||
|
||||
# First-time full backfill (add your own backfill entry point)
|
||||
LANDING_DIR=data/landing uv run python -m padelnomics_extract.execute
|
||||
# Run a single extractor
|
||||
LANDING_DIR=data/landing uv run extract-overpass
|
||||
LANDING_DIR=data/landing uv run extract-eurostat
|
||||
LANDING_DIR=data/landing uv run extract-playtomic-tenants
|
||||
LANDING_DIR=data/landing uv run extract-playtomic-availability
|
||||
```
|
||||
|
||||
## Architecture: one file per source
|
||||
|
||||
Each data source lives in its own module with a dedicated CLI entry point:
|
||||
|
||||
```
|
||||
src/padelnomics_extract/
|
||||
├── __init__.py
|
||||
├── _shared.py # LANDING_DIR, logger, run_extractor() wrapper
|
||||
├── utils.py # SQLite state tracking, atomic I/O helpers
|
||||
├── overpass.py # OSM padel courts via Overpass API
|
||||
├── eurostat.py # Eurostat city demographics (urb_cpop1, ilc_di03)
|
||||
├── playtomic_tenants.py # Playtomic venue listings (tenant search)
|
||||
├── playtomic_availability.py # Playtomic booking slots (next-day availability)
|
||||
└── all.py # Runs all extractors sequentially
|
||||
```
|
||||
|
||||
### Adding a new extractor
|
||||
|
||||
1. Create `my_source.py` following the pattern:
|
||||
|
||||
```python
|
||||
from ._shared import run_extractor, setup_logging
|
||||
from .utils import landing_path, write_gzip_atomic
|
||||
|
||||
logger = setup_logging("padelnomics.extract.my_source")
|
||||
EXTRACTOR_NAME = "my_source"
|
||||
|
||||
def extract(landing_dir, year_month, conn, session):
|
||||
"""Returns {"files_written": N, "bytes_written": N, ...}."""
|
||||
year, month = year_month.split("/")
|
||||
dest_dir = landing_path(landing_dir, "my_source", year, month)
|
||||
# ... fetch data, write to dest_dir ...
|
||||
return {"files_written": 1, "files_skipped": 0, "bytes_written": n}
|
||||
|
||||
def main():
|
||||
run_extractor(EXTRACTOR_NAME, extract)
|
||||
```
|
||||
|
||||
2. Add entry point to `pyproject.toml`:
|
||||
```toml
|
||||
extract-my-source = "padelnomics_extract.my_source:main"
|
||||
```
|
||||
|
||||
3. Import in `all.py` and add to `EXTRACTORS` list.
|
||||
|
||||
4. Add a staging model in `transform/sqlmesh_padelnomics/models/staging/`.
|
||||
|
||||
## Design: filesystem as state
|
||||
|
||||
The landing zone is an append-only store of raw files. Each file is named by its content fingerprint (etag or SHA256 hash), so:
|
||||
The landing zone is an append-only store of raw files:
|
||||
|
||||
- **Idempotency**: running twice writes nothing if the source hasn't changed
|
||||
- **Debugging**: every historical raw file is preserved — reprocess any window by re-running transforms
|
||||
- **Debugging**: every historical raw file is preserved
|
||||
- **Safety**: extraction never mutates existing files, only appends new ones
|
||||
|
||||
### Etag-based dedup (preferred)
|
||||
### Etag-based dedup (Eurostat)
|
||||
|
||||
When the source provides an `ETag` header, use it as the filename:
|
||||
When the source provides an `ETag` header, store it in a sibling `.etag` file.
|
||||
On the next request, send `If-None-Match` — 304 means skip.
|
||||
|
||||
```
|
||||
data/landing/padelnomics/{year}/{month:02d}/{etag}.csv.gz
|
||||
```
|
||||
### Content-addressed (Overpass, Playtomic)
|
||||
|
||||
The file existing on disk means the content matches the server's current version. No content download needed.
|
||||
|
||||
### Hash-based dedup (fallback)
|
||||
|
||||
When the source has no etag (static files that update in-place), download the content and use its SHA256 prefix as the filename:
|
||||
|
||||
```
|
||||
data/landing/padelnomics/{year}/{date}_{sha256[:8]}.csv.gz
|
||||
```
|
||||
|
||||
Two runs that produce identical content produce the same hash → same filename → skip.
|
||||
Files named by date or content. `write_gzip_atomic()` writes to a `.tmp` sibling
|
||||
then renames — never leaves partial files on crash.
|
||||
|
||||
## State tracking
|
||||
|
||||
Every run writes one row to `data/landing/.state.sqlite`. Query it to answer operational questions:
|
||||
Every run writes one row to `data/landing/.state.sqlite`:
|
||||
|
||||
```bash
|
||||
# When did extraction last succeed?
|
||||
sqlite3 data/landing/.state.sqlite \
|
||||
"SELECT extractor, started_at, status, files_written, files_skipped, cursor_value
|
||||
"SELECT extractor, started_at, status, files_written, cursor_value
|
||||
FROM extraction_runs ORDER BY run_id DESC LIMIT 10"
|
||||
|
||||
# Did anything fail in the last 7 days?
|
||||
sqlite3 data/landing/.state.sqlite \
|
||||
"SELECT * FROM extraction_runs WHERE status = 'failed'
|
||||
AND started_at > datetime('now', '-7 days')"
|
||||
```
|
||||
|
||||
State table schema:
|
||||
|
||||
| Column | Type | Description |
|
||||
|--------|------|-------------|
|
||||
| `run_id` | INTEGER | Auto-increment primary key |
|
||||
| `extractor` | TEXT | Extractor name (e.g. `padelnomics`) |
|
||||
| `extractor` | TEXT | Extractor name (e.g. `overpass`, `eurostat`) |
|
||||
| `started_at` | TEXT | ISO 8601 UTC timestamp |
|
||||
| `finished_at` | TEXT | ISO 8601 UTC timestamp, NULL if still running |
|
||||
| `finished_at` | TEXT | ISO 8601 UTC timestamp |
|
||||
| `status` | TEXT | `running` → `success` or `failed` |
|
||||
| `files_written` | INTEGER | New files written this run |
|
||||
| `files_skipped` | INTEGER | Files already present (content unchanged) |
|
||||
| `files_skipped` | INTEGER | Files already present |
|
||||
| `bytes_written` | INTEGER | Compressed bytes written |
|
||||
| `cursor_value` | TEXT | Last successful cursor (date, etag, page, etc.) |
|
||||
| `error_message` | TEXT | Exception message if status = `failed` |
|
||||
|
||||
## Adding a new extractor
|
||||
|
||||
1. Add a function in `execute.py` following the same pattern as `extract_file_by_etag()` or `extract_file_by_hash()`
|
||||
2. Call it from `extract_dataset()` with its own `extractor` name in `start_run()`
|
||||
3. Store files under a new subdirectory: `landing_path(LANDING_DIR, "my_new_source", year)`
|
||||
4. Add a new SQLMesh `raw/` model that reads from the new subdirectory glob
|
||||
| `cursor_value` | TEXT | Resume cursor (date, index, etc.) |
|
||||
| `error_message` | TEXT | Exception message if failed |
|
||||
|
||||
## Landing zone structure
|
||||
|
||||
```
|
||||
data/landing/
|
||||
├── .state.sqlite # extraction run history
|
||||
└── padelnomics/ # one subdirectory per source
|
||||
└── {year}/
|
||||
└── {month:02d}/
|
||||
└── {etag}.csv.gz # immutable, content-addressed files
|
||||
├── .state.sqlite
|
||||
├── overpass/{year}/{month}/courts.json.gz
|
||||
├── eurostat/{year}/{month}/urb_cpop1.json.gz
|
||||
├── eurostat/{year}/{month}/ilc_di03.json.gz
|
||||
├── playtomic/{year}/{month}/tenants.json.gz
|
||||
└── playtomic/{year}/{month}/availability_{date}.json.gz
|
||||
```
|
||||
|
||||
## Data sources
|
||||
|
||||
| Source | Module | Schedule | Notes |
|
||||
|--------|--------|----------|-------|
|
||||
| Overpass API | `overpass.py` | Daily | OSM padel courts, ~5K nodes |
|
||||
| Eurostat | `eurostat.py` | Daily (304 most runs) | urb_cpop1, ilc_di03 — etag dedup |
|
||||
| Playtomic tenants | `playtomic_tenants.py` | Daily | ~8K venues, bounded pagination |
|
||||
| Playtomic availability | `playtomic_availability.py` | Daily | Next-day slots, ~4.5h runtime |
|
||||
|
||||
Reference in New Issue
Block a user