feat: copier update v0.9.0 — extraction docs, state tracking, architecture guides
Sync template from 29ac25b → v0.9.0 (29 template commits). Because of the template's _subdirectory migration, new files were rendered manually rather than auto-merged by copier.

New files:
- .claude/CLAUDE.md + coding_philosophy.md (agent instructions)
- extract utils.py: SQLite state tracking for extraction runs
- extract/transform READMEs: architecture & pattern documentation
- infra/supervisor: systemd service + orchestration script
- Per-layer model READMEs (raw, staging, foundation, serving)

Also fixes copier-answers.yml (adds 4 feature toggles, removes stale payment_provider key) and scopes the CLAUDE.md gitignore rule to the repository root only.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
extract/padelnomics_extract/README.md (new file, 90 lines)
# Padelnomics Extraction

Fetches raw data from external sources into the local landing zone. The pipeline then reads from the landing zone; extraction and transformation are fully decoupled.

## Running

```bash
# One-shot (most recent data only)
LANDING_DIR=data/landing uv run extract

# First-time full backfill (add your own backfill entry point)
LANDING_DIR=data/landing uv run python -m padelnomics_extract.execute
```
## Design: filesystem as state

The landing zone is an append-only store of raw files. Each file is named by its content fingerprint (etag or SHA256 hash), so:

- **Idempotency**: running twice writes nothing if the source hasn't changed
- **Debugging**: every historical raw file is preserved; reprocess any window by re-running transforms
- **Safety**: extraction never mutates existing files, only appends new ones
### Etag-based dedup (preferred)

When the source provides an `ETag` header, use it as the filename:

```
data/landing/padelnomics/{year}/{month:02d}/{etag}.csv.gz
```

If the file already exists on disk, its content matches the server's current version, so no content download is needed.
### Hash-based dedup (fallback)

When the source has no etag (e.g. static files that are updated in place), download the content and use its SHA256 prefix as the filename:

```
data/landing/padelnomics/{year}/{date}_{sha256[:8]}.csv.gz
```

Two runs that produce identical content produce the same hash → same filename → skip.
## State tracking

Every run writes one row to `data/landing/.state.sqlite`. Query it to answer operational questions:

```bash
# When did extraction last succeed?
sqlite3 data/landing/.state.sqlite \
  "SELECT extractor, started_at, status, files_written, files_skipped, cursor_value
   FROM extraction_runs ORDER BY run_id DESC LIMIT 10"

# Did anything fail in the last 7 days?
sqlite3 data/landing/.state.sqlite \
  "SELECT * FROM extraction_runs WHERE status = 'failed'
   AND started_at > datetime('now', '-7 days')"
```
State table schema:

| Column | Type | Description |
|--------|------|-------------|
| `run_id` | INTEGER | Auto-increment primary key |
| `extractor` | TEXT | Extractor name (e.g. `padelnomics`) |
| `started_at` | TEXT | ISO 8601 UTC timestamp |
| `finished_at` | TEXT | ISO 8601 UTC timestamp; NULL while still running |
| `status` | TEXT | `running` → `success` or `failed` |
| `files_written` | INTEGER | New files written this run |
| `files_skipped` | INTEGER | Files already present (content unchanged) |
| `bytes_written` | INTEGER | Compressed bytes written |
| `cursor_value` | TEXT | Last successful cursor (date, etag, page, etc.) |
| `error_message` | TEXT | Exception message if status = `failed` |
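The same questions can be asked from Python. A small sketch; the query relies on SQLite's documented bare-column behavior, where the non-aggregated columns come from the row that supplied `MAX(run_id)`:

```python
import sqlite3


def latest_runs(db_path: str) -> list[tuple]:
    """Latest run per extractor: (extractor, run_id, status, cursor_value)."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(
            """
            SELECT extractor, MAX(run_id), status, cursor_value
            FROM extraction_runs
            GROUP BY extractor
            """
        ).fetchall()
    finally:
        conn.close()
```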
## Adding a new extractor

1. Add a function in `execute.py` following the same pattern as `extract_file_by_etag()` or `extract_file_by_hash()`.
2. Call it from `extract_dataset()`, passing its own `extractor` name to `start_run()`.
3. Store files under a new subdirectory: `landing_path(LANDING_DIR, "my_new_source", year)`.
4. Add a new SQLMesh `raw/` model that reads from the new subdirectory's glob.
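A hypothetical end-to-end sketch of steps 1–3 for a source called `my_new_source`. To run standalone it inlines the state SQL that `start_run()`/`end_run()` wrap and stubs the download with a `fetch` callable; in the real package you would import the helpers from `utils.py` instead:

```python
import gzip
import hashlib
import sqlite3
from pathlib import Path


def extract_my_new_source(
    conn: sqlite3.Connection,
    landing_dir: str,
    fetch=lambda: b"col_a,col_b\n1,2\n",  # stub for the real download
) -> int:
    """Land one file for the hypothetical 'my_new_source' extractor."""
    # start_run(): record a 'running' row, keep its run_id.
    run_id = conn.execute(
        "INSERT INTO extraction_runs (extractor, status) VALUES (?, 'running')",
        ("my_new_source",),
    ).lastrowid
    try:
        data = fetch()
        # Step 3: own subdirectory under the landing zone (landing_path()).
        target = Path(landing_dir) / "my_new_source" / "2024"
        target.mkdir(parents=True, exist_ok=True)
        path = target / f"{hashlib.sha256(data).hexdigest()[:8]}.csv.gz"
        if path.exists():  # content unchanged: idempotent skip
            written, skipped, nbytes = 0, 1, 0
        else:  # atomic gzip write (write_gzip_atomic())
            tmp = path.with_suffix(path.suffix + ".tmp")
            tmp.write_bytes(gzip.compress(data))
            tmp.replace(path)
            written, skipped, nbytes = 1, 0, path.stat().st_size
        # end_run(): finalize the row as success.
        conn.execute(
            "UPDATE extraction_runs SET status = 'success', files_written = ?,"
            " files_skipped = ?, bytes_written = ? WHERE run_id = ?",
            (written, skipped, nbytes, run_id),
        )
        conn.commit()
        return run_id
    except Exception as exc:
        # end_run(): finalize the row as failed, then re-raise.
        conn.execute(
            "UPDATE extraction_runs SET status = 'failed', error_message = ?"
            " WHERE run_id = ?",
            (str(exc), run_id),
        )
        conn.commit()
        raise
```

Running it twice against the same landing directory records one `success` run with `files_written = 1` and a second with `files_skipped = 1`, which is the idempotency guarantee described above.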
## Landing zone structure

```
data/landing/
├── .state.sqlite            # extraction run history
└── padelnomics/             # one subdirectory per source
    └── {year}/
        └── {month:02d}/
            └── {etag}.csv.gz    # immutable, content-addressed files
```
extract/padelnomics_extract/src/padelnomics_extract/utils.py (new file, 129 lines)
"""Extraction utilities: SQLite state tracking, file I/O helpers.

These are inline equivalents of the extract_core library used in larger
multi-extractor pipelines. For a single-package project they live here;
if you add multiple data sources, extract them to a shared workspace package.
"""

import gzip
import hashlib
import sqlite3
from pathlib import Path
# ---------------------------------------------------------------------------
# State tracking (SQLite: transactional, stdlib, no extra dependency)
# ---------------------------------------------------------------------------

_CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS extraction_runs (
    run_id INTEGER PRIMARY KEY AUTOINCREMENT,
    extractor TEXT NOT NULL,
    started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
    finished_at TEXT,
    status TEXT NOT NULL DEFAULT 'running',
    files_written INTEGER DEFAULT 0,
    files_skipped INTEGER DEFAULT 0,
    bytes_written INTEGER DEFAULT 0,
    cursor_value TEXT,
    error_message TEXT
)
"""

def open_state_db(landing_dir: str | Path) -> sqlite3.Connection:
    """Open (or create) .state.sqlite inside landing_dir.

    WAL mode allows concurrent reads while a run is in progress.
    Caller is responsible for conn.close().
    """
    db_path = Path(landing_dir) / ".state.sqlite"
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute(_CREATE_TABLE_SQL)
    conn.commit()
    return conn

def start_run(conn: sqlite3.Connection, extractor: str) -> int:
    """Insert a 'running' row. Returns run_id."""
    cur = conn.execute(
        "INSERT INTO extraction_runs (extractor, status) VALUES (?, 'running')",
        (extractor,),
    )
    conn.commit()
    return cur.lastrowid

def end_run(
    conn: sqlite3.Connection,
    run_id: int,
    *,
    status: str,
    files_written: int = 0,
    files_skipped: int = 0,
    bytes_written: int = 0,
    cursor_value: str | None = None,
    error_message: str | None = None,
) -> None:
    """Update the run row to its final state."""
    assert status in ("success", "failed")
    conn.execute(
        """
        UPDATE extraction_runs
        SET finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
            status = ?,
            files_written = ?,
            files_skipped = ?,
            bytes_written = ?,
            cursor_value = ?,
            error_message = ?
        WHERE run_id = ?
        """,
        (status, files_written, files_skipped, bytes_written, cursor_value, error_message, run_id),
    )
    conn.commit()

def get_last_cursor(conn: sqlite3.Connection, extractor: str) -> str | None:
    """Return the cursor_value from the most recent successful run, or None."""
    row = conn.execute(
        """
        SELECT cursor_value FROM extraction_runs
        WHERE extractor = ? AND status = 'success' AND cursor_value IS NOT NULL
        ORDER BY run_id DESC LIMIT 1
        """,
        (extractor,),
    ).fetchone()
    return row["cursor_value"] if row else None

# ---------------------------------------------------------------------------
# File I/O helpers
# ---------------------------------------------------------------------------

def landing_path(landing_dir: str | Path, *parts: str) -> Path:
    """Return path to a subdirectory of landing_dir, creating it if absent."""
    path = Path(landing_dir).joinpath(*parts)
    path.mkdir(parents=True, exist_ok=True)
    return path

def content_hash(data: bytes, prefix_len: int = 8) -> str:
    """SHA256 content fingerprint, used as the idempotency key in filenames.

    Returns the first prefix_len hex characters of the digest.
    """
    assert data, "data must not be empty"
    return hashlib.sha256(data).hexdigest()[:prefix_len]

def write_gzip_atomic(path: Path, data: bytes) -> int:
    """Gzip-compress data and write to path atomically via a .tmp sibling.

    Returns bytes written. The atomic replace means readers never see a
    partial file.
    """
    assert data, "data must not be empty"
    compressed = gzip.compress(data)
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_bytes(compressed)
    tmp.replace(path)  # unlike rename(), also overwrites an existing target on Windows
    return len(compressed)