diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md
new file mode 100644
index 0000000..b3a614f
--- /dev/null
+++ b/.claude/CLAUDE.md
@@ -0,0 +1,106 @@
+# CLAUDE.md — Padelnomics
+
+This file tells Claude Code how to work in this repository.
+
+## Project Overview
+
+Padelnomics is a SaaS application built with Quart (async Python), HTMX, and SQLite.
+It includes a full data pipeline:
+
+```
+External APIs → extract → landing zone → SQLMesh transform → DuckDB → web app
+```
+
+**Packages** (uv workspace):
+- `web/` — Quart + HTMX web application (auth, billing, dashboard)
+- `extract/padelnomics_extract/` — data extraction to local landing zone
+- `transform/sqlmesh_padelnomics/` — 4-layer SQL transformation (raw → staging → foundation → serving)
+- `src/padelnomics/` — CLI utilities, export_serving helper
+
+## Skills: invoke these for domain tasks
+
+### Working on extraction or transformation?
+
+Use the **`data-engineer`** skill for:
+- Designing or reviewing SQLMesh model logic
+- Adding a new data source (extract + raw + staging models)
+- Performance tuning DuckDB queries
+- Data modeling decisions (dimensions, facts, aggregates)
+- Understanding the 4-layer architecture
+
+```
+/data-engineer (or ask Claude to invoke it)
+```
+
+### Working on the web app UI or frontend?
+
+Use the **`frontend-design`** skill for UI components, templates, or dashboard layouts.
+
+### Working on payments or subscriptions?
+
+Use the **`paddle-integration`** skill for billing, webhooks, and subscription logic.
+
+## Key commands
+
+```bash
+# Install all dependencies
+uv sync --all-packages
+
+# Lint & format
+ruff check .
+ruff format .
+
+# Run tests
+uv run pytest tests/ -v
+
+# Dev server
+./scripts/dev_run.sh
+
+# Extract data
+LANDING_DIR=data/landing uv run extract
+
+# SQLMesh: plan changes (dev), then apply to prod (from repo root)
+uv run sqlmesh -p transform/sqlmesh_padelnomics plan
+uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod
+
+# Export serving tables (run after SQLMesh)
+DUCKDB_PATH=local.duckdb SERVING_DUCKDB_PATH=analytics.duckdb \
+  uv run python -m padelnomics.export_serving
+```
+
+## Architecture documentation
+
+| Topic | File |
+|-------|------|
+| Extraction patterns, state tracking, adding new sources | `extract/padelnomics_extract/README.md` |
+| 4-layer SQLMesh architecture, materialization strategy | `transform/sqlmesh_padelnomics/README.md` |
+| Two-file DuckDB architecture (SQLMesh lock isolation) | `src/padelnomics/export_serving.py` docstring |
+
+## Pipeline data flow
+
+```
+data/landing/
+  └── padelnomics/{year}/{month:02d}/{etag}.csv.gz  ← extraction output
+
+local.duckdb      ← SQLMesh exclusive (raw → staging → foundation → serving)
+
+analytics.duckdb  ← serving tables only, web app read-only
+  └── serving.*   ← atomically replaced by export_serving.py
+```
+
+## Environment variables
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `LANDING_DIR` | `data/landing` | Landing zone root (extraction writes here) |
+| `DUCKDB_PATH` | `local.duckdb` | SQLMesh pipeline DB (exclusive write) |
+| `SERVING_DUCKDB_PATH` | `analytics.duckdb` | Read-only DB for web app |
+
+## Coding philosophy
+
+- **Simple and procedural** — functions over classes, no "Manager" patterns
+- **Idempotent operations** — running twice produces the same result
+- **Explicit assertions** — assert preconditions at function boundaries
+- **Bounded operations** — set timeouts, page limits, buffer sizes
+
+Read `coding_philosophy.md` in this directory for the full guide.
diff --git a/.claude/coding_philosophy.md b/.claude/coding_philosophy.md
new file mode 100644
index 0000000..a8b83c8
--- /dev/null
+++ b/.claude/coding_philosophy.md
@@ -0,0 +1,542 @@
+# Coding Philosophy & Engineering Principles
+
+This document defines the coding philosophy and engineering principles that guide all agent work. All agents should internalize and follow these principles.
+
+Influenced by Casey Muratori, Jonathan Blow, and [TigerStyle](https://github.com/tigerbeetle/tigerbeetle/blob/main/docs/TIGER_STYLE.md) (adapted for Python/SQL).
+
+## Core Values
+
+**Simple, Direct, Procedural Code**
+
+- Solve the actual problem, not the general case
+- Understand what the computer is doing
+- Explicit is better than clever
+- Code should be obvious, not impressive
+- Do it right the first time — feature gaps are acceptable, but what ships must meet design goals
+
+## Functions over Classes
+
+**Prefer:**
+- Pure functions that transform data
+- Simple procedures that do clear things
+- Explicit data structures (dicts, lists, named tuples)
+
+**Avoid:**
+- Classes that are just namespaces for functions
+- Objects hiding behavior behind methods
+- Inheritance hierarchies
+- "Manager" or "Handler" classes
+
+**Example - Good:**
+```python
+def calculate_user_metrics(events: list[dict]) -> dict:
+    """Calculate metrics from event list."""
+    total = len(events)
+    unique_sessions = len(set(e['session_id'] for e in events))
+
+    return {
+        'total_events': total,
+        'unique_sessions': unique_sessions,
+        'events_per_session': total / unique_sessions if unique_sessions > 0 else 0
+    }
+```
+
+**Example - Bad:**
+```python
+class UserMetricsCalculator:
+    def __init__(self):
+        self._events = []
+
+    def add_events(self, events: list[dict]):
+        self._events.extend(events)
+
+    def calculate(self) -> UserMetrics:
+        return UserMetrics(
+            total=self._calculate_total(),
+            sessions=self._calculate_sessions()
+        )
+```
+
+## Data-Oriented Thinking
+
+**Think about the data:**
+- What's the shape of the data?
+- How does it flow through the system?
+- What transformations are needed?
+- What's the memory layout?
+
+**Data is just data:**
+- Use simple structures (dicts, lists, tuples)
+- Don't hide data behind getters/setters
+- Make data transformations explicit
+- Consider performance implications
+
+**Example - Good:**
+```python
+# Data is data, functions transform it
+users = [
+    {'id': 1, 'name': 'Alice', 'active': True},
+    {'id': 2, 'name': 'Bob', 'active': False},
+]
+
+def filter_active(users: list[dict]) -> list[dict]:
+    return [u for u in users if u['active']]
+
+active_users = filter_active(users)
+```
+
+**Example - Bad:**
+```python
+# Data hidden behind objects
+class User:
+    def __init__(self, id, name, active):
+        self._id = id
+        self._name = name
+        self._active = active
+
+    def get_name(self):
+        return self._name
+
+    def is_active(self):
+        return self._active
+
+users = [User(1, 'Alice', True), User(2, 'Bob', False)]
+active_users = [u for u in users if u.is_active()]
+```
+
+## Keep It Simple
+
+**Simple control flow:**
+- Straightforward if/else over clever tricks
+- Explicit loops over list comprehensions when clearer
+- Early returns to reduce nesting
+- Avoid deeply nested logic
+
+**Simple naming:**
+- Descriptive variable names (`user_count` not `uc`)
+- Function names that say what they do (`calculate_total` not `process`)
+- No abbreviations unless universal (`id`, `url`, `sql`)
+- Include units in names: `timeout_seconds`, `size_bytes`, `latency_ms` — not `timeout`, `size`, `latency`
+- Place qualifiers last in descending significance: `latency_ms_max` not `max_latency_ms` (aligns related variables)
+
+**Simple structure:**
+- Functions should do one thing
+- Keep functions short (20-50 lines, hard limit ~70 — must fit on screen without scrolling)
+- If it's getting complex, break it up
+- But don't break it up "just because"
+
+## Minimize Scope and State
+
+**Declare variables close to where they're used:**
+- Don't introduce variables before they're needed
+- Remove them when no longer relevant
+- Minimize the number of variables in scope at any point
+- Reduces the probability of stale-state bugs (check something in one place, use it in another)
+
+**Don't duplicate state:**
+- One source of truth for each piece of data
+- Don't create aliases or copies that can drift out of sync
+- If you compute a value, use it directly — don't store it in a variable you'll use 50 lines later
+
+## Build the Minimum That Works
+
+**Start simple:**
+- Solve the immediate problem
+- Don't build for imagined future requirements
+- Add complexity only when actually needed
+- Prefer obvious solutions over clever ones
+
+**Avoid premature abstraction:**
+- Duplication is okay early on
+- Abstract only when the pattern is clear
+- Three examples before abstracting
+- Question every layer of indirection
+
+**Zero technical debt:**
+- Do it right the first time
+- A problem solved in design costs less than one solved in implementation, which costs less than one solved in production
+- Feature gaps are acceptable; broken or half-baked code is not
+
+## Explicit over Implicit
+
+**Be explicit about:**
+- Where data comes from
+- What transformations happen
+- Error conditions and handling
+- Dependencies and side effects
+
+**Avoid magic:**
+- Framework conventions that hide behavior
+- Implicit configuration
+- Action-at-a-distance
+- Metaprogramming tricks
+- Relying on library defaults — pass options explicitly at the call site
+
+## Set Limits on Everything
+
+**Nothing should run unbounded:**
+- Set max retries on network calls
+- Set timeouts on all external requests
+- Bound loop iterations where data size is unknown
+- Set max page counts on paginated API fetches
+- Cap queue/buffer sizes
+
+**Why:** Unbounded operations cause tail latency spikes, resource exhaustion, and silent hangs. A system that fails loudly at a known limit is better than one that degrades mysteriously.
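+
+A minimal sketch of these bounds in practice (illustrative only — the endpoint, paging scheme, and limit values are assumptions, not code from this repo):
+
+```python
+import urllib.request
+
+TIMEOUT_SECONDS = 30   # bound every network call
+RETRIES_MAX = 3        # bound retries on transient failures
+PAGES_MAX = 100        # bound pagination even if the API misbehaves
+
+def fetch_all_pages(base_url: str) -> list[bytes]:
+    pages: list[bytes] = []
+    for page in range(PAGES_MAX):
+        body = b""
+        for attempt in range(RETRIES_MAX):
+            try:
+                with urllib.request.urlopen(f"{base_url}?page={page}", timeout=TIMEOUT_SECONDS) as resp:
+                    body = resp.read()
+                break
+            except OSError:
+                if attempt == RETRIES_MAX - 1:
+                    raise  # retries exhausted — fail loudly
+        if not body:
+            return pages  # empty page — normal end of data, within bounds
+        pages.append(body)
+    raise RuntimeError(f"exceeded PAGES_MAX={PAGES_MAX}; source is larger than designed for")
+```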
+
+## Question Dependencies
+
+**Before adding a library:**
+- Can I write this simply myself?
+- What's the complexity budget?
+- Am I using 5% of a large framework?
+- Is this solving my actual problem?
+
+**Prefer:**
+- Standard library when possible
+- Small, focused libraries
+- Direct solutions
+- Understanding what code does
+
+**Approved dependencies (earn their place):**
+- `msgspec` — struct types and validation at system boundaries (external APIs, user input,
+  inter-process data). Use `msgspec.Struct` instead of dataclasses when you need: fast
+  encode/decode, built-in validation, or typed containers for boundary data.
+  **Rule:** use Structs at boundaries (API responses, HAR entries, MCP tool I/O) —
+  keep internal plumbing as plain dicts/tuples.
+
+## Performance Consciousness
+
+**Understand:**
+- Memory layout matters
+- Cache locality matters
+- Allocations have cost
+- Loops over data can be fast or slow
+
+**Common issues:**
+- N+1 queries (database or API)
+- Nested loops over large data
+- Copying large structures unnecessarily
+- Loading entire datasets into memory
+
+### Design for performance
+
+**Think about performance upfront during design, not just after profiling:**
+- The largest wins (100-1000x) happen in the design phase
+- Back-of-envelope sketch: estimate load across network, disk, memory, CPU
+- Optimize for the slowest resource first (network > disk > memory > CPU)
+- Compensate for frequency — a cheap operation called 10M times can dominate
+
+**Batching:**
+- Amortize costs via batching (network calls, disk writes, database inserts)
+- One batch insert of 1000 rows beats 1000 individual inserts
+- Distinguish control plane (rare, can be slow) from data plane (hot path, must be fast)
+
+**But don't prematurely optimize implementation details:**
+- Design for performance, then measure before micro-optimizing
+- Make it work, then make it fast
+- Optimize the hot path, not everything
+
+## Assert Invariants
+
+**Assert preconditions, postconditions, and invariants — especially in data pipelines:**
+
+```python
+def normalize_prices(prices: list[dict], currency: str) -> list[dict]:
+    assert len(prices) > 0, "prices must not be empty"
+    assert currency in ("USD", "EUR", "BRL"), f"unsupported currency: {currency}"
+
+    result = [convert_price(p, currency) for p in prices]
+
+    assert len(result) == len(prices), "normalization must not drop rows"
+    assert all(r['currency'] == currency for r in result), "all prices must be in target currency"
+    return result
+```
+
+**Guidelines:**
+- Assert function arguments and return values at boundaries
+- Assert data quality: row counts, non-null columns, expected ranges
+- Use assertions to document surprising or critical invariants
+- Split compound assertions: `assert a; assert b` not `assert a and b` (clearer error messages)
+- Assertions catch programmer errors — they should never be used for expected runtime conditions (use if/else for those)
+
+## Keep Logic in SQL
+
+**Good:**
+```sql
+-- Logic is clear, database does the work
+SELECT
+    user_id,
+    COUNT(*) as event_count,
+    COUNT(DISTINCT session_id) as session_count,
+    MAX(event_time) as last_active
+FROM events
+WHERE event_time >= CURRENT_DATE - 30
+GROUP BY user_id
+HAVING COUNT(*) >= 10
+```
+
+**Bad:**
+```python
+# Pulling too much data, doing work in Python
+events = db.query("SELECT * FROM events WHERE event_time >= CURRENT_DATE - 30")
+user_events = {}
+for event in events:  # Could be millions of rows!
+    if event.user_id not in user_events:
+        user_events[event.user_id] = []
+    user_events[event.user_id].append(event)
+
+results = []
+for user_id, events in user_events.items():
+    if len(events) >= 10:
+        results.append({'user_id': user_id, 'count': len(events)})
+```
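+
+The same rule applied from Python: send the query to the database and fetch only the aggregate. A sketch assuming a DuckDB file and the `events` table from the example above:
+
+```python
+import duckdb
+
+def user_activity_summary(db_path: str) -> list[tuple]:
+    # One round trip — the database scans millions of rows;
+    # Python receives only the small aggregated result set.
+    conn = duckdb.connect(db_path, read_only=True)
+    try:
+        return conn.execute("""
+            SELECT user_id, COUNT(*) AS event_count
+            FROM events
+            WHERE event_time >= CURRENT_DATE - 30
+            GROUP BY user_id
+            HAVING COUNT(*) >= 10
+        """).fetchall()
+    finally:
+        conn.close()
+```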
+
+### Readable SQL
+
+**Write readable SQL:**
+- Use CTEs for complex queries
+- One concept per CTE
+- Descriptive CTE names
+- Comments for non-obvious logic
+
+**Example:**
+```sql
+WITH active_users AS (
+    -- Users who logged in within last 30 days
+    SELECT DISTINCT user_id
+    FROM login_events
+    WHERE login_time >= CURRENT_DATE - 30
+),
+
+user_activity AS (
+    -- Count events for active users
+    SELECT
+        e.user_id,
+        COUNT(*) as event_count
+    FROM events e
+    INNER JOIN active_users au ON e.user_id = au.user_id
+    GROUP BY e.user_id
+)
+
+SELECT
+    user_id,
+    event_count,
+    event_count / 30.0 as avg_daily_events
+FROM user_activity
+ORDER BY event_count DESC
+```
+
+## Error Handling
+
+**Handle errors explicitly:**
+```python
+def get_user(user_id: str) -> dict | None:
+    """Get user by ID. Returns None if not found."""
+    result = db.query("SELECT * FROM users WHERE id = ?", [user_id])
+    return result[0] if result else None
+
+def process_user(user_id: str) -> dict | None:
+    user = get_user(user_id)
+    if user is None:
+        logger.warning(f"User {user_id} not found")
+        return None
+
+    # Process user...
+    return user
+```
+
+**Don't hide errors:**
+```python
+# Bad - silently catches everything
+try:
+    result = do_something()
+except:
+    result = None
+
+# Good - explicit about what can fail
+try:
+    result = do_something()
+except ValueError as e:
+    logger.error(f"Invalid value: {e}")
+    raise
+except ConnectionError as e:
+    logger.error(f"Connection failed: {e}")
+    return None
+```
+
+### Fail fast at boundaries
+
+- Validate inputs at boundaries
+- Check preconditions early
+- Return early on error conditions
+- Don't let bad data propagate
+- All errors must be handled — 92% of catastrophic system failures come from incorrect handling of non-fatal errors
+
+## Anti-Patterns to Avoid
+
+### Unnecessary OOP machinery
+
+- Repository pattern for simple CRUD
+- Service layer that just calls the database
+- Dependency injection containers
+- Abstract factories for concrete things
+- Interfaces with one implementation
+
+### Hidden behavior
+
+- ORM hiding N+1 queries
+- Decorators doing complex logic
+- Metaclass magic
+- Convention over configuration (when it hides behavior)
+
+### Premature flexibility
+
+- Creating interfaces "for future flexibility"
+- Generics for specific use cases
+- Configuration files for hardcoded values
+- Plugin systems for known features
+
+### Overengineering
+
+- Class hierarchies for classification
+- Design patterns "just because"
+- Microservices for a small app
+- Message queues for synchronous operations
+
+## Testing
+
+### Test behavior, not implementation
+
+**Focus on:**
+- What the function does (inputs → outputs)
+- Edge cases and boundaries
+- Error conditions
+- Data transformations
+
+**Don't test:**
+- Private implementation details
+- Framework internals
+- External libraries
+- Simple property access
+
+### Keep tests simple
+
+```python
+def test_user_aggregation():
+    # Arrange - simple, clear test data
+    events = [
+        {'user_id': 'u1', 'event': 'click'},
+        {'user_id': 'u1', 'event': 'view'},
+        {'user_id': 'u2', 'event': 'click'},
+    ]
+
+    # Act - call the function
+    result = aggregate_user_events(events)
+
+    # Assert - check the behavior
+    assert result == {'u1': 2, 'u2': 1}
+```
+
+### Test positive and negative space
+
+- Test valid inputs produce correct outputs (positive space)
+- Test invalid inputs are rejected or handled correctly (negative space)
+- For data pipelines: test with realistic data samples AND with malformed/missing data
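+
+A sketch of both spaces for the `normalize_prices` example from the assertions section (assumes that function is importable and assertions are enabled in the test environment):
+
+```python
+import pytest
+
+# from yourpkg.prices import normalize_prices  # hypothetical location
+
+def test_normalize_prices_converts_valid_input():
+    # Positive space: valid input produces converted rows
+    result = normalize_prices([{'amount': 10.0, 'currency': 'EUR'}], 'USD')
+    assert len(result) == 1
+    assert result[0]['currency'] == 'USD'
+
+def test_normalize_prices_rejects_unknown_currency():
+    # Negative space: bad input fails loudly instead of being silently coerced
+    with pytest.raises(AssertionError):
+        normalize_prices([{'amount': 10.0, 'currency': 'EUR'}], 'XYZ')
+
+def test_normalize_prices_rejects_empty_list():
+    with pytest.raises(AssertionError):
+        normalize_prices([], 'USD')
+```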
+
+### Integration tests
+
+- Test with a real database (DuckDB is fast)
+- Test actual SQL queries
+- Test end-to-end flows
+- Use realistic data samples
+
+## Documentation
+
+### Comments
+
+**Comment the "why":**
+```python
+# Use binary search because list is sorted and can be large (1M+ items)
+index = binary_search(sorted_items, target)
+
+# Cache for 5 minutes - balance freshness vs database load
+@cache(ttl=300)
+def get_user_stats(user_id):
+    ...
+```
+
+**Don't comment the "what":**
+```python
+# Bad - code is self-explanatory
+# Increment the counter
+counter += 1
+
+# Good - code is clear on its own
+counter += 1
+```
+
+**Always motivate decisions:**
+- Explain why you wrote code the way you did
+- Code alone isn't documentation — the reasoning matters
+- Comments are well-written prose, not margin scribblings
+
+### Self-documenting code
+
+- Use descriptive names
+- Keep functions focused
+- Make data flow obvious
+- Structure for readability
+
+## Summary
+
+**Key Principles:**
+1. **Simple, direct, procedural** — functions over classes
+2. **Data-oriented** — understand the data and its flow
+3. **Explicit over implicit** — no magic, no hiding
+4. **Build minimum that works** — solve actual problems, zero technical debt
+5. **Performance conscious** — design for performance, then measure before micro-optimizing
+6. **Keep logic in SQL** — let the database do the work
+7. **Handle errors explicitly** — no silent failures, all errors handled
+8. **Assert invariants** — use assertions to document and enforce correctness
+9. **Set limits on everything** — nothing runs unbounded
+10. **Question abstractions** — every layer needs justification
+
+**Ask yourself:**
+- Is this the simplest solution?
+- Can someone else understand this?
+- What is the computer actually doing?
+- Am I solving the real problem?
+- What are the bounds on this operation?
+
+When in doubt, go simpler.
diff --git a/.copier-answers.yml b/.copier-answers.yml
index 68716fb..9641c2e 100644
--- a/.copier-answers.yml
+++ b/.copier-answers.yml
@@ -1,5 +1,5 @@
 # Changes here will be overwritten by Copier; NEVER EDIT MANUALLY
-_commit: 29ac25b
+_commit: v0.9.0
 _src_path: /home/Deeman/Projects/quart_saas_boilerplate
 author_email: ''
 author_name: ''
diff --git a/.gitignore b/.gitignore
index 42d6f99..ce1a060 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,5 @@
 # Personal / project-root
-CLAUDE.md
+/CLAUDE.md
 .bedrockapikey
 .live-slot
 .worktrees/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 121f09b..d7d6bea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 ## [Unreleased]
 
 ### Added
+- Template sync: copier update from `29ac25b` → `v0.9.0` (29 template commits)
+  - `.claude/CLAUDE.md`: project-specific Claude Code instructions (skills, commands, architecture)
+  - `.claude/coding_philosophy.md`: engineering principles guide
+  - `extract/padelnomics_extract/README.md`: extraction patterns & state tracking docs
+  - `extract/padelnomics_extract/src/padelnomics_extract/utils.py`: SQLite state tracking
+    (`open_state_db`, `start_run`, `end_run`, `get_last_cursor`) + file I/O helpers
+    (`landing_path`, `content_hash`, `write_gzip_atomic`)
+  - `transform/sqlmesh_padelnomics/README.md`: 4-layer SQLMesh architecture guide
+  - Per-layer model READMEs (raw, staging, foundation, serving)
+  - `infra/supervisor/`: systemd service + supervisor script for pipeline orchestration
+- Copier answers file now includes `enable_daas`, `enable_cms`, `enable_directory`, `enable_i18n`
+  toggles (prevents accidental deletion on future copier updates)
+- Expanded programmatic SEO city coverage from 18 to 40 cities (+22 cities across ES, FR, IT, NL, AT, CH, SE, PT, BE, AE, AU, IE) — generates 80 articles (40 cities × 2 languages: EN + DE)
 - `scripts/refresh_from_daas.py`: syncs template_data rows from DuckDB `planner_defaults`
diff --git a/extract/padelnomics_extract/README.md b/extract/padelnomics_extract/README.md
new file mode 100644
index 0000000..5b36d06
--- /dev/null
+++ b/extract/padelnomics_extract/README.md
@@ -0,0 +1,90 @@
+# Padelnomics Extraction
+
+Fetches raw data from external sources into the local landing zone. The pipeline then reads from the landing zone — extraction and transformation are fully decoupled.
+
+## Running
+
+```bash
+# One-shot (most recent data only)
+LANDING_DIR=data/landing uv run extract
+
+# First-time full backfill (add your own backfill entry point)
+LANDING_DIR=data/landing uv run python -m padelnomics_extract.execute
+```
+
+## Design: filesystem as state
+
+The landing zone is an append-only store of raw files. Each file is named by its content fingerprint (etag or SHA256 hash), so:
+
+- **Idempotency**: running twice writes nothing if the source hasn't changed
+- **Debugging**: every historical raw file is preserved — reprocess any window by re-running transforms
+- **Safety**: extraction never mutates existing files, only appends new ones
+
+### Etag-based dedup (preferred)
+
+When the source provides an `ETag` header, use it as the filename:
+
+```
+data/landing/padelnomics/{year}/{month:02d}/{etag}.csv.gz
+```
+
+If the file already exists on disk, the content matches the server's current version — no download needed.
+
+### Hash-based dedup (fallback)
+
+When the source has no etag (static files that update in-place), download the content and use its SHA256 prefix as the filename:
+
+```
+data/landing/padelnomics/{year}/{date}_{sha256[:8]}.csv.gz
+```
+
+Two runs that produce identical content produce the same hash → same filename → skip.
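+
+A minimal sketch of the dedup decision, common to both schemes (illustrative — `fetch_bytes` is a hypothetical download callable; the real logic lives in `execute.py`):
+
+```python
+from pathlib import Path
+
+from padelnomics_extract.utils import write_gzip_atomic
+
+def extract_if_changed(target_dir: Path, fingerprint: str, fetch_bytes) -> bool:
+    """Return True if a new file was written, False if content is unchanged."""
+    path = target_dir / f"{fingerprint}.csv.gz"
+    if path.exists():
+        return False  # the filename IS the content fingerprint — nothing to do
+    data = fetch_bytes()
+    write_gzip_atomic(path, data)  # readers never see a partial file
+    return True
+```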
+
+## State tracking
+
+Every run writes one row to `data/landing/.state.sqlite`. Query it to answer operational questions:
+
+```bash
+# When did extraction last succeed?
+sqlite3 data/landing/.state.sqlite \
+  "SELECT extractor, started_at, status, files_written, files_skipped, cursor_value
+   FROM extraction_runs ORDER BY run_id DESC LIMIT 10"
+
+# Did anything fail in the last 7 days?
+sqlite3 data/landing/.state.sqlite \
+  "SELECT * FROM extraction_runs WHERE status = 'failed'
+   AND started_at > datetime('now', '-7 days')"
+```
+
+State table schema:
+
+| Column | Type | Description |
+|--------|------|-------------|
+| `run_id` | INTEGER | Auto-increment primary key |
+| `extractor` | TEXT | Extractor name (e.g. `padelnomics`) |
+| `started_at` | TEXT | ISO 8601 UTC timestamp |
+| `finished_at` | TEXT | ISO 8601 UTC timestamp, NULL if still running |
+| `status` | TEXT | `running` → `success` or `failed` |
+| `files_written` | INTEGER | New files written this run |
+| `files_skipped` | INTEGER | Files already present (content unchanged) |
+| `bytes_written` | INTEGER | Compressed bytes written |
+| `cursor_value` | TEXT | Last successful cursor (date, etag, page, etc.) |
+| `error_message` | TEXT | Exception message if status = `failed` |
+
+## Adding a new extractor
+
+1. Add a function in `execute.py` following the same pattern as `extract_file_by_etag()` or `extract_file_by_hash()` (see the skeleton at the end of this README)
+2. Call it from `extract_dataset()` with its own `extractor` name in `start_run()`
+3. Store files under a new subdirectory: `landing_path(LANDING_DIR, "my_new_source", year)`
+4. Add a new SQLMesh `raw/` model that reads from the new subdirectory glob
+
+## Landing zone structure
+
+```
+data/landing/
+├── .state.sqlite              # extraction run history
+└── padelnomics/               # one subdirectory per source
+    └── {year}/
+        └── {month:02d}/
+            └── {etag}.csv.gz  # immutable, content-addressed files
+```
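+
+Skeleton for step 1 above (the source name and `fetch_my_new_source()` are hypothetical placeholders — adapt to the real API):
+
+```python
+from padelnomics_extract.utils import (
+    content_hash, end_run, landing_path, open_state_db, start_run, write_gzip_atomic,
+)
+
+def extract_my_new_source(landing_dir: str, year: int) -> None:
+    conn = open_state_db(landing_dir)
+    run_id = start_run(conn, "my_new_source")
+    try:
+        data = fetch_my_new_source(year)  # hypothetical bounded HTTP fetch
+        target = landing_path(landing_dir, "my_new_source", str(year))
+        path = target / f"{year}_{content_hash(data)}.csv.gz"
+        if path.exists():
+            end_run(conn, run_id, status="success", files_skipped=1)
+        else:
+            size = write_gzip_atomic(path, data)
+            end_run(conn, run_id, status="success", files_written=1, bytes_written=size)
+    except Exception as exc:
+        end_run(conn, run_id, status="failed", error_message=str(exc))
+        raise
+    finally:
+        conn.close()
+```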
diff --git a/extract/padelnomics_extract/src/padelnomics_extract/utils.py b/extract/padelnomics_extract/src/padelnomics_extract/utils.py
new file mode 100644
index 0000000..42a9c44
--- /dev/null
+++ b/extract/padelnomics_extract/src/padelnomics_extract/utils.py
@@ -0,0 +1,129 @@
+"""Extraction utilities: SQLite state tracking, file I/O helpers.
+
+These are inline equivalents of the extract_core library used in larger
+multi-extractor pipelines. For a single-package project they live here;
+if you add multiple data sources, extract them to a shared workspace package.
+"""
+
+import gzip
+import hashlib
+import sqlite3
+from pathlib import Path
+
+# ---------------------------------------------------------------------------
+# State tracking (SQLite — transactional, stdlib, no extra dependency)
+# ---------------------------------------------------------------------------
+
+_CREATE_TABLE_SQL = """
+CREATE TABLE IF NOT EXISTS extraction_runs (
+    run_id INTEGER PRIMARY KEY AUTOINCREMENT,
+    extractor TEXT NOT NULL,
+    started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
+    finished_at TEXT,
+    status TEXT NOT NULL DEFAULT 'running',
+    files_written INTEGER DEFAULT 0,
+    files_skipped INTEGER DEFAULT 0,
+    bytes_written INTEGER DEFAULT 0,
+    cursor_value TEXT,
+    error_message TEXT
+)
+"""
+
+
+def open_state_db(landing_dir: str | Path) -> sqlite3.Connection:
+    """Open (or create) .state.sqlite inside landing_dir.
+
+    WAL mode allows concurrent reads while a run is in progress.
+    Caller is responsible for conn.close().
+    """
+    db_path = Path(landing_dir) / ".state.sqlite"
+    db_path.parent.mkdir(parents=True, exist_ok=True)
+    conn = sqlite3.connect(str(db_path))
+    conn.row_factory = sqlite3.Row
+    conn.execute("PRAGMA journal_mode=WAL")
+    conn.execute(_CREATE_TABLE_SQL)
+    conn.commit()
+    return conn
+
+
+def start_run(conn: sqlite3.Connection, extractor: str) -> int:
+    """Insert a 'running' row. Returns run_id."""
+    cur = conn.execute(
+        "INSERT INTO extraction_runs (extractor, status) VALUES (?, 'running')",
+        (extractor,),
+    )
+    conn.commit()
+    return cur.lastrowid
+
+
+def end_run(
+    conn: sqlite3.Connection,
+    run_id: int,
+    *,
+    status: str,
+    files_written: int = 0,
+    files_skipped: int = 0,
+    bytes_written: int = 0,
+    cursor_value: str | None = None,
+    error_message: str | None = None,
+) -> None:
+    """Update the run row to its final state."""
+    assert status in ("success", "failed")
+    conn.execute(
+        """
+        UPDATE extraction_runs
+        SET finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
+            status = ?,
+            files_written = ?,
+            files_skipped = ?,
+            bytes_written = ?,
+            cursor_value = ?,
+            error_message = ?
+        WHERE run_id = ?
+        """,
+        (status, files_written, files_skipped, bytes_written, cursor_value, error_message, run_id),
+    )
+    conn.commit()
+
+
+def get_last_cursor(conn: sqlite3.Connection, extractor: str) -> str | None:
+    """Return the cursor_value from the most recent successful run, or None."""
+    row = conn.execute(
+        """
+        SELECT cursor_value FROM extraction_runs
+        WHERE extractor = ? AND status = 'success' AND cursor_value IS NOT NULL
+        ORDER BY run_id DESC LIMIT 1
+        """,
+        (extractor,),
+    ).fetchone()
+    return row["cursor_value"] if row else None
+
+
+# ---------------------------------------------------------------------------
+# File I/O helpers
+# ---------------------------------------------------------------------------
+
+def landing_path(landing_dir: str | Path, *parts: str) -> Path:
+    """Return path to a subdirectory of landing_dir, creating it if absent."""
+    path = Path(landing_dir).joinpath(*parts)
+    path.mkdir(parents=True, exist_ok=True)
+    return path
+
+
+def content_hash(data: bytes, prefix_chars: int = 8) -> str:
+    """Short SHA256 fingerprint (first `prefix_chars` hex characters).
+
+    Used as the idempotency key in filenames.
+    """
+    assert data, "data must not be empty"
+    return hashlib.sha256(data).hexdigest()[:prefix_chars]
+
+
+def write_gzip_atomic(path: Path, data: bytes) -> int:
+    """Gzip-compress data and write to path atomically via a .tmp sibling.
+
+    Returns bytes written. Atomic write means readers never see a partial file.
+    """
+    assert data, "data must not be empty"
+    compressed = gzip.compress(data)
+    tmp = path.with_suffix(path.suffix + ".tmp")
+    tmp.write_bytes(compressed)
+    tmp.replace(path)  # atomic on POSIX; overwrites safely if the target exists
+    return len(compressed)
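+
+
+# ---------------------------------------------------------------------------
+# Illustrative usage of the run lifecycle (a sketch, not part of the API):
+#     python -m padelnomics_extract.utils /tmp/landing
+# ---------------------------------------------------------------------------
+
+if __name__ == "__main__":
+    import sys
+
+    landing_dir = sys.argv[1] if len(sys.argv) > 1 else "data/landing"
+    conn = open_state_db(landing_dir)
+    run_id = start_run(conn, "smoke_test")
+    try:
+        data = b"hello,world\n"
+        path = landing_path(landing_dir, "smoke_test") / f"{content_hash(data)}.csv.gz"
+        if path.exists():
+            end_run(conn, run_id, status="success", files_skipped=1)
+        else:
+            size = write_gzip_atomic(path, data)
+            end_run(conn, run_id, status="success", files_written=1, bytes_written=size)
+        print("last cursor:", get_last_cursor(conn, "smoke_test"))
+    finally:
+        conn.close()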
diff --git a/infra/supervisor/padelnomics-supervisor.service b/infra/supervisor/padelnomics-supervisor.service
new file mode 100644
index 0000000..ab82da7
--- /dev/null
+++ b/infra/supervisor/padelnomics-supervisor.service
@@ -0,0 +1,24 @@
+[Unit]
+Description=Padelnomics Supervisor — Pipeline Orchestration
+After=network-online.target
+Wants=network-online.target
+
+[Service]
+Type=simple
+User=root
+WorkingDirectory=/opt/padelnomics
+ExecStart=/opt/padelnomics/infra/supervisor/supervisor.sh
+Restart=always
+RestartSec=10
+EnvironmentFile=/opt/padelnomics/.env
+Environment=LANDING_DIR=/data/padelnomics/landing
+Environment=DUCKDB_PATH=/data/padelnomics/lakehouse.duckdb
+
+LimitNOFILE=65536
+
+StandardOutput=journal
+StandardError=journal
+SyslogIdentifier=padelnomics-supervisor
+
+[Install]
+WantedBy=multi-user.target
diff --git a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh
new file mode 100644
index 0000000..0376ca6
--- /dev/null
+++ b/infra/supervisor/supervisor.sh
@@ -0,0 +1,47 @@
+#!/bin/sh
+# Padelnomics Supervisor — continuous pipeline orchestration.
+# Inspired by TigerBeetle's CFO supervisor: simple, resilient, easy to understand.
+# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh
+#
+# Environment variables (set in systemd EnvironmentFile or .env):
+#   LANDING_DIR       — local path for extracted landing data
+#   DUCKDB_PATH       — path to DuckDB lakehouse file
+#   ALERT_WEBHOOK_URL — optional ntfy.sh / Slack / Telegram webhook for failures
+
+set -eu
+
+readonly REPO_DIR="/opt/padelnomics"
+
+while true
+do
+    (
+        if ! [ -d "$REPO_DIR/.git" ]; then
+            echo "Repository not found at $REPO_DIR — bootstrap required!"
+            exit 1
+        fi
+
+        cd "$REPO_DIR"
+
+        # Pull latest code
+        git fetch origin master
+        git switch --discard-changes --detach origin/master
+        uv sync
+
+        # Extract
+        LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \
+        DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
+            uv run --package padelnomics_extract extract
+
+        # Transform (point sqlmesh at the project dir; cwd is the repo root)
+        LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \
+        DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
+            uv run --package sqlmesh_padelnomics sqlmesh -p transform/sqlmesh_padelnomics run --select-model "serving.*"
+
+    ) || {
+        if [ -n "${ALERT_WEBHOOK_URL:-}" ]; then
+            curl -s -d "Padelnomics pipeline failed at $(date)" \
+                "$ALERT_WEBHOOK_URL" 2>/dev/null || true
+        fi
+        sleep 600  # back off 10 min on failure
+    }
+    sleep 300  # pace successful cycles; content-addressed dedup keeps reruns cheap
+done
diff --git a/transform/sqlmesh_padelnomics/README.md b/transform/sqlmesh_padelnomics/README.md
new file mode 100644
index 0000000..3fb4d2d
--- /dev/null
+++ b/transform/sqlmesh_padelnomics/README.md
@@ -0,0 +1,107 @@
+# Padelnomics Transform (SQLMesh)
+
+4-layer SQL transformation pipeline using SQLMesh + DuckDB. Reads from the landing zone, produces analytics-ready tables consumed by the web app.
+
+## Running
+
+```bash
+# From repo root — plan all changes (shows what will run)
+uv run sqlmesh -p transform/sqlmesh_padelnomics plan
+
+# Apply to production
+uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod
+
+# Run model tests
+uv run sqlmesh -p transform/sqlmesh_padelnomics test
+
+# Format SQL
+uv run sqlmesh -p transform/sqlmesh_padelnomics format
+```
+
+## 4-layer architecture
+
+```
+landing/        <- raw files (extraction output)
+  +-- padelnomics/
+      +-- {year}/{month:02d}/{etag}.csv.gz
+
+raw/            <- reads files verbatim
+  +-- raw.padelnomics
+
+staging/        <- type casting, deduplication
+  +-- staging.stg_padelnomics
+
+foundation/     <- business logic, dimensions, facts
+  +-- foundation.dim_category
+
+serving/        <- pre-aggregated for web app
+  +-- serving.padelnomics_metrics
+```
+
+### raw/ — verbatim source reads
+
+- Reads landing zone files directly with `read_csv(..., all_varchar=true)`
+- No transformations, no business logic
+- Column names match the source exactly
+- Uses a macro (`@padelnomics_glob()`) so new landing files are picked up automatically
+- Naming: `raw.<source>`
+
+### staging/ — type casting and cleansing
+
+- One model per raw model (1:1)
+- Cast all columns to correct types: `TRY_CAST(report_date AS DATE)`
+- Deduplicate if the source produces duplicates
+- Minimal renaming — only where raw names are genuinely unclear
+- Naming: `staging.stg_<source>`
+
+### foundation/ — business logic
+
+- Dimensions (`dim_*`): slowly changing attributes, one row per entity
+- Facts (`fact_*`): events and measurements, one row per event
+- May join across multiple staging models from different sources
+- Surrogate keys: `MD5(business_key)` for stable joins
+- Naming: `foundation.dim_<entity>`, `foundation.fact_<event>`
+
+### serving/ — analytics-ready aggregates
+
+- Pre-aggregated for specific web app query patterns
+- These are the only tables the web app reads
+- Queried from `analytics.py` via `fetch_analytics()`
+- Named to match what the frontend expects
+- Naming: `serving.<table>`
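+
+On the web-app side, the read is a short read-only DuckDB query. A minimal sketch of what `fetch_analytics()` might look like (the real implementation lives in the web package; this signature is an assumption):
+
+```python
+import os
+
+import duckdb
+
+def fetch_analytics(query: str, params: list | None = None) -> list[tuple]:
+    # Read-only connection: the web app must never take a write lock on the serving DB.
+    db_path = os.environ.get("SERVING_DUCKDB_PATH", "analytics.duckdb")
+    conn = duckdb.connect(db_path, read_only=True)
+    try:
+        return conn.execute(query, params or []).fetchall()
+    finally:
+        conn.close()
+
+# Example: fetch_analytics("SELECT * FROM serving.padelnomics_metrics")
+```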
+
+## Adding a new data source
+
+1. Add a landing zone directory in the extraction package
+2. Add a glob macro in `macros/__init__.py`:
+   ```python
+   import os
+
+   from sqlmesh import macro
+
+   @macro()
+   def my_source_glob(evaluator) -> str:
+       landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing")
+       return f"'{landing_dir}/my_source/**/*.csv.gz'"
+   ```
+3. Add a raw model: `models/raw/raw_my_source.sql`
+4. Add a staging model: `models/staging/stg_my_source.sql`
+5. Join into foundation or serving models as needed
+
+## Model materialization
+
+| Layer | Default kind | Rationale |
+|-------|-------------|-----------|
+| raw | FULL | Always re-reads all files; cheap with DuckDB parallel scan |
+| staging | FULL | 1:1 with raw; same cost |
+| foundation | FULL | Business logic rarely changes; recompute is fast |
+| serving | FULL | Small aggregates; web app needs latest at all times |
+
+For large historical tables, switch to `kind INCREMENTAL_BY_TIME_RANGE` with a time partition column. SQLMesh handles the incremental logic automatically.
+
+## Environment variables
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `LANDING_DIR` | `data/landing` | Root of the landing zone |
+| `DUCKDB_PATH` | `local.duckdb` | DuckDB file (SQLMesh exclusive write access) |
+
+The web app reads from a **separate** `analytics.duckdb` file via `export_serving.py`.
+Never point `DUCKDB_PATH` and `SERVING_DUCKDB_PATH` at the same file —
+SQLMesh holds an exclusive write lock during plan/run.
diff --git a/transform/sqlmesh_padelnomics/models/foundation/README.md b/transform/sqlmesh_padelnomics/models/foundation/README.md
new file mode 100644
index 0000000..9749b20
--- /dev/null
+++ b/transform/sqlmesh_padelnomics/models/foundation/README.md
@@ -0,0 +1,6 @@
+# foundation
+
+Business logic layer: dimensions, facts, conformed metrics.
+May join across staging models from different sources.
+
+Naming convention: `foundation.dim_<entity>`, `foundation.fact_<event>`
diff --git a/transform/sqlmesh_padelnomics/models/raw/README.md b/transform/sqlmesh_padelnomics/models/raw/README.md
new file mode 100644
index 0000000..4deaf69
--- /dev/null
+++ b/transform/sqlmesh_padelnomics/models/raw/README.md
@@ -0,0 +1,6 @@
+# raw
+
+Read raw landing zone files directly with `read_csv(..., all_varchar=true)`.
+No transformations — schema as-is from source.
+
+Naming convention: `raw.<source>`
diff --git a/transform/sqlmesh_padelnomics/models/serving/README.md b/transform/sqlmesh_padelnomics/models/serving/README.md
new file mode 100644
index 0000000..73cc013
--- /dev/null
+++ b/transform/sqlmesh_padelnomics/models/serving/README.md
@@ -0,0 +1,6 @@
+# serving
+
+Analytics-ready views consumed by the web app and programmatic SEO.
+Query these from `analytics.py` via a DuckDB read-only connection.
+
+Naming convention: `serving.<table>` (e.g. `serving.city_market_profile`)
diff --git a/transform/sqlmesh_padelnomics/models/staging/README.md b/transform/sqlmesh_padelnomics/models/staging/README.md
new file mode 100644
index 0000000..7d2dd31
--- /dev/null
+++ b/transform/sqlmesh_padelnomics/models/staging/README.md
@@ -0,0 +1,6 @@
+# staging
+
+Type casting, deduplication, null handling on top of raw models.
+One staging model per raw model.
+
+Naming convention: `staging.stg_<source>`