From a1faddbed673188e4e1b69f3d70671cea49cb0a4 Mon Sep 17 00:00:00 2001 From: Deeman Date: Mon, 23 Feb 2026 13:53:45 +0100 Subject: [PATCH 1/3] feat: Python supervisor + feature flags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Supervisor (replaces supervisor.sh): - supervisor.py — cron-based pipeline orchestration, reads workflows.toml on every tick, runs due extractors in topological waves with parallel execution, then SQLMesh transform + serving export - workflows.toml — workflow registry: overpass (monthly), eurostat (monthly), playtomic_tenants (weekly), playtomic_availability (daily), playtomic_recheck (hourly 6–23) - padelnomics-supervisor.service — updated ExecStart to Python supervisor Extraction enhancements: - proxy.py — optional round-robin/sticky proxy rotation via PROXY_URLS env - playtomic_availability.py — parallel fetch (EXTRACT_WORKERS), recheck mode (main_recheck) re-queries imminent slots for accurate occupancy measurement - _shared.py — realistic browser User-Agent on all extractor sessions - stg_playtomic_availability.sql — reads morning + recheck snapshots, tags each - fct_daily_availability.sql — prefers recheck over morning for same slot Feature flags (replaces WAITLIST_MODE env var): - migration 0019 — feature_flags table, 5 initial flags: markets (on), payments/planner_export/supplier_signup/lead_unlock (off) - core.py — is_flag_enabled() + feature_gate() decorator - routes — payments, markets, planner_export, supplier_signup, lead_unlock gated - admin flags UI — /admin/flags toggle page + nav link - app.py — flag() injected as Jinja2 global Co-Authored-By: Claude Opus 4.6 --- extract/padelnomics_extract/pyproject.toml | 1 + .../src/padelnomics_extract/_shared.py | 8 + .../playtomic_availability.py | 410 +++++++++++++---- .../src/padelnomics_extract/proxy.py | 57 +++ .../supervisor/padelnomics-supervisor.service | 3 +- infra/supervisor/workflows.toml | 33 ++ src/padelnomics/__init__.py | 0 src/padelnomics/supervisor.py | 416 ++++++++++++++++++ .../foundation/fct_daily_availability.sql | 31 +- .../staging/stg_playtomic_availability.sql | 72 ++- uv.lock | 2 + web/pyproject.toml | 1 + web/src/padelnomics/admin/routes.py | 40 ++ .../admin/templates/admin/base_admin.html | 4 + .../admin/templates/admin/flags.html | 72 +++ web/src/padelnomics/app.py | 3 +- web/src/padelnomics/auth/routes.py | 9 +- web/src/padelnomics/content/routes.py | 9 +- web/src/padelnomics/core.py | 54 ++- .../versions/0019_add_feature_flags.py | 28 ++ web/src/padelnomics/planner/routes.py | 4 +- web/src/padelnomics/suppliers/routes.py | 10 +- 22 files changed, 1133 insertions(+), 134 deletions(-) create mode 100644 extract/padelnomics_extract/src/padelnomics_extract/proxy.py create mode 100644 infra/supervisor/workflows.toml create mode 100644 src/padelnomics/__init__.py create mode 100644 src/padelnomics/supervisor.py create mode 100644 web/src/padelnomics/admin/templates/admin/flags.html create mode 100644 web/src/padelnomics/migrations/versions/0019_add_feature_flags.py diff --git a/extract/padelnomics_extract/pyproject.toml b/extract/padelnomics_extract/pyproject.toml index 3fe56b1..da52449 100644 --- a/extract/padelnomics_extract/pyproject.toml +++ b/extract/padelnomics_extract/pyproject.toml @@ -14,6 +14,7 @@ extract-overpass = "padelnomics_extract.overpass:main" extract-eurostat = "padelnomics_extract.eurostat:main" extract-playtomic-tenants = "padelnomics_extract.playtomic_tenants:main" extract-playtomic-availability = 
"padelnomics_extract.playtomic_availability:main" +extract-playtomic-recheck = "padelnomics_extract.playtomic_availability:main_recheck" [build-system] requires = ["hatchling"] diff --git a/extract/padelnomics_extract/src/padelnomics_extract/_shared.py b/extract/padelnomics_extract/src/padelnomics_extract/_shared.py index 38c9772..4df4355 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/_shared.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/_shared.py @@ -19,6 +19,13 @@ LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing")) HTTP_TIMEOUT_SECONDS = 30 OVERPASS_TIMEOUT_SECONDS = 90 # Overpass can be slow on global queries +# Realistic browser User-Agent — avoids bot detection on all extractors +USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/131.0.0.0 Safari/537.36" +) + def setup_logging(name: str) -> logging.Logger: """Configure and return a logger for the given extractor module.""" @@ -50,6 +57,7 @@ def run_extractor( try: with niquests.Session() as session: + session.headers["User-Agent"] = USER_AGENT result = func(LANDING_DIR, year_month, conn, session) assert isinstance(result, dict), f"extractor must return a dict, got {type(result)}" diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py index 3989062..bdc97bc 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py @@ -5,33 +5,51 @@ unauthenticated /v1/availability endpoint for each venue's next-day slots. This is the highest-value source: daily snapshots enable occupancy rate estimation, pricing benchmarking, and demand signal detection. -API constraint: max 25-hour window per request (see docs/data-sources-inventory.md §2.1). -Rate: 1 req / 2 s (conservative, unauthenticated endpoint). +Parallel mode: set EXTRACT_WORKERS=N and PROXY_URLS=... to fetch N venues +concurrently (one proxy per worker). Without proxies, runs single-threaded. + +Recheck mode: re-queries venues with slots starting within the next 90 minutes. +Writes a separate recheck file for more accurate occupancy measurement. 
Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.json.gz +Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.json.gz """ import gzip import json +import os import sqlite3 +import threading import time +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import UTC, datetime, timedelta from pathlib import Path import niquests -from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging +from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging +from .proxy import load_proxy_urls, make_round_robin_cycler from .utils import get_last_cursor, landing_path, write_gzip_atomic logger = setup_logging("padelnomics.extract.playtomic_availability") EXTRACTOR_NAME = "playtomic_availability" +RECHECK_EXTRACTOR_NAME = "playtomic_recheck" AVAILABILITY_URL = "https://api.playtomic.io/v1/availability" THROTTLE_SECONDS = 1 -MAX_VENUES_PER_RUN = 10_000 +MAX_VENUES_PER_RUN = 20_000 MAX_RETRIES_PER_VENUE = 2 +MAX_WORKERS = int(os.environ.get("EXTRACT_WORKERS", "1")) +RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "90")) +# Thread-local storage for per-worker sessions +_thread_local = threading.local() + + +# --------------------------------------------------------------------------- +# Tenant ID loading +# --------------------------------------------------------------------------- def _load_tenant_ids(landing_dir: Path) -> list[str]: """Read tenant IDs from the most recent tenants.json.gz file.""" @@ -39,7 +57,6 @@ def _load_tenant_ids(landing_dir: Path) -> list[str]: if not playtomic_dir.exists(): return [] - # Find the most recent tenants.json.gz across all year/month dirs tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True) if not tenant_files: return [] @@ -65,12 +82,10 @@ def _parse_resume_cursor(cursor: str | None, target_date: str) -> int: """Parse cursor_value to find resume index. Returns 0 if no valid cursor.""" if not cursor: return 0 - # cursor format: "{date}:{index}" parts = cursor.split(":", 1) if len(parts) != 2: return 0 cursor_date, cursor_index = parts - # Only resume if cursor is for today's target date if cursor_date != target_date: return 0 try: @@ -79,6 +94,125 @@ def _parse_resume_cursor(cursor: str | None, target_date: str) -> int: return 0 +# --------------------------------------------------------------------------- +# Per-venue fetch (used by both serial and parallel modes) +# --------------------------------------------------------------------------- + +def _get_thread_session(proxy_url: str | None) -> niquests.Session: + """Get or create a thread-local niquests.Session with optional proxy.""" + if not hasattr(_thread_local, "session") or _thread_local.session is None: + session = niquests.Session() + session.headers["User-Agent"] = USER_AGENT + if proxy_url: + session.proxies = {"http": proxy_url, "https": proxy_url} + _thread_local.session = session + return _thread_local.session + + +def _fetch_venue_availability( + tenant_id: str, + start_min_str: str, + start_max_str: str, + proxy_url: str | None, +) -> dict | None: + """Fetch availability for a single venue. 
Returns payload dict or None on failure.""" + session = _get_thread_session(proxy_url) + params = { + "sport_id": "PADEL", + "tenant_id": tenant_id, + "start_min": start_min_str, + "start_max": start_max_str, + } + + for attempt in range(MAX_RETRIES_PER_VENUE + 1): + try: + resp = session.get(AVAILABILITY_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS) + + if resp.status_code == 429: + wait_seconds = THROTTLE_SECONDS * (attempt + 2) + logger.warning("Rate limited on %s, waiting %ds", tenant_id, wait_seconds) + time.sleep(wait_seconds) + continue + + if resp.status_code >= 500: + logger.warning( + "Server error %d for %s (attempt %d)", + resp.status_code, tenant_id, attempt + 1, + ) + time.sleep(THROTTLE_SECONDS) + continue + + resp.raise_for_status() + time.sleep(THROTTLE_SECONDS) + return {"tenant_id": tenant_id, "slots": resp.json()} + + except niquests.exceptions.RequestException as e: + if attempt < MAX_RETRIES_PER_VENUE: + logger.warning("Request failed for %s (attempt %d): %s", tenant_id, attempt + 1, e) + time.sleep(THROTTLE_SECONDS) + else: + logger.error( + "Giving up on %s after %d attempts: %s", + tenant_id, MAX_RETRIES_PER_VENUE + 1, e, + ) + return None + + return None # all retries exhausted via 429/5xx + + +# --------------------------------------------------------------------------- +# Parallel fetch orchestrator +# --------------------------------------------------------------------------- + +def _fetch_venues_parallel( + tenant_ids: list[str], + start_min_str: str, + start_max_str: str, + worker_count: int, + proxy_cycler, +) -> tuple[list[dict], int]: + """Fetch availability for multiple venues in parallel. + + Returns (venues_data, venues_errored). + """ + venues_data: list[dict] = [] + venues_errored = 0 + completed_count = 0 + lock = threading.Lock() + + def _worker(tenant_id: str) -> dict | None: + proxy_url = proxy_cycler() + return _fetch_venue_availability(tenant_id, start_min_str, start_max_str, proxy_url) + + with ThreadPoolExecutor(max_workers=worker_count) as pool: + futures = {pool.submit(_worker, tid): tid for tid in tenant_ids} + + for future in as_completed(futures): + result = future.result() + with lock: + completed_count += 1 + if result is not None: + venues_data.append(result) + else: + venues_errored += 1 + + if completed_count % 500 == 0: + logger.info( + "Progress: %d/%d venues (%d errors, %d workers)", + completed_count, len(tenant_ids), venues_errored, worker_count, + ) + + logger.info( + "Parallel fetch complete: %d/%d venues (%d errors, %d workers)", + len(venues_data), len(tenant_ids), venues_errored, worker_count, + ) + return venues_data, venues_errored + + +# --------------------------------------------------------------------------- +# Main extraction function +# --------------------------------------------------------------------------- + def extract( landing_dir: Path, year_month: str, @@ -91,7 +225,7 @@ def extract( logger.warning("No tenant IDs found — run extract-playtomic-tenants first") return {"files_written": 0, "files_skipped": 0, "bytes_written": 0} - # Query tomorrow's slots (25-hour window starting at midnight local) + # Query tomorrow's slots tomorrow = datetime.now(UTC) + timedelta(days=1) target_date = tomorrow.strftime("%Y-%m-%d") start_min = tomorrow.replace(hour=0, minute=0, second=0, microsecond=0) @@ -101,117 +235,223 @@ def extract( dest_dir = landing_path(landing_dir, "playtomic", year, month) dest = dest_dir / f"availability_{target_date}.json.gz" - # Check if already completed for this date if dest.exists(): 
logger.info("Already have %s — skipping", dest) return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} - # Resume from last cursor if we crashed mid-run + # Resume from cursor if crashed mid-run last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) resume_index = _parse_resume_cursor(last_cursor, target_date) if resume_index > 0: logger.info("Resuming from index %d (cursor: %s)", resume_index, last_cursor) - venues_data: list[dict] = [] venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN] - venues_errored = 0 + if resume_index > 0: + venues_to_process = venues_to_process[resume_index:] - for i, tenant_id in enumerate(venues_to_process): - if i < resume_index: - continue + # Determine parallelism + proxy_urls = load_proxy_urls() + worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1 + proxy_cycler = make_round_robin_cycler(proxy_urls) - params = { - "sport_id": "PADEL", - "tenant_id": tenant_id, - "start_min": start_min.strftime("%Y-%m-%dT%H:%M:%S"), - "start_max": start_max.strftime("%Y-%m-%dT%H:%M:%S"), - } + start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S") + start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S") - for attempt in range(MAX_RETRIES_PER_VENUE + 1): - try: - resp = session.get(AVAILABILITY_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS) - - if resp.status_code == 429: - # Rate limited — back off and retry - wait_seconds = THROTTLE_SECONDS * (attempt + 2) - logger.warning("Rate limited on %s, waiting %ds", tenant_id, wait_seconds) - time.sleep(wait_seconds) - continue - - if resp.status_code >= 500: - logger.warning( - "Server error %d for %s (attempt %d)", - resp.status_code, - tenant_id, - attempt + 1, - ) - time.sleep(THROTTLE_SECONDS) - continue - - resp.raise_for_status() - venues_data.append({"tenant_id": tenant_id, "slots": resp.json()}) - break - - except niquests.exceptions.RequestException as e: - if attempt < MAX_RETRIES_PER_VENUE: - logger.warning( - "Request failed for %s (attempt %d): %s", tenant_id, attempt + 1, e - ) - time.sleep(THROTTLE_SECONDS) - else: - logger.error( - "Giving up on %s after %d attempts: %s", - tenant_id, - MAX_RETRIES_PER_VENUE + 1, - e, - ) - venues_errored += 1 - else: - # All retries exhausted (loop completed without break) - venues_errored += 1 - - if (i + 1) % 100 == 0: - logger.info( - "Progress: %d/%d venues queried, %d errors", - i + 1, - len(venues_to_process), - venues_errored, + if worker_count > 1: + logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls)) + venues_data, venues_errored = _fetch_venues_parallel( + venues_to_process, start_min_str, start_max_str, worker_count, proxy_cycler, + ) + else: + # Serial mode — same as before but uses shared fetch function + logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process)) + venues_data = [] + venues_errored = 0 + for i, tenant_id in enumerate(venues_to_process): + result = _fetch_venue_availability( + tenant_id, start_min_str, start_max_str, proxy_cycler(), ) + if result is not None: + venues_data.append(result) + else: + venues_errored += 1 - time.sleep(THROTTLE_SECONDS) + if (i + 1) % 100 == 0: + logger.info( + "Progress: %d/%d venues queried, %d errors", + i + 1, len(venues_to_process), venues_errored, + ) # Write consolidated file captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") - payload = json.dumps( - { - "date": target_date, - "captured_at_utc": captured_at, - "venue_count": len(venues_data), - "venues_errored": venues_errored, - "venues": venues_data, - } - ).encode() + payload 
= json.dumps({ + "date": target_date, + "captured_at_utc": captured_at, + "venue_count": len(venues_data), + "venues_errored": venues_errored, + "venues": venues_data, + }).encode() bytes_written = write_gzip_atomic(dest, payload) logger.info( "%d venues scraped (%d errors) -> %s (%s bytes)", - len(venues_data), - venues_errored, - dest, - f"{bytes_written:,}", + len(venues_data), venues_errored, dest, f"{bytes_written:,}", ) return { "files_written": 1, "files_skipped": 0, "bytes_written": bytes_written, - "cursor_value": f"{target_date}:{len(venues_to_process)}", + "cursor_value": f"{target_date}:{len(tenant_ids[:MAX_VENUES_PER_RUN])}", } +# --------------------------------------------------------------------------- +# Recheck mode — re-query venues with upcoming slots for accurate occupancy +# --------------------------------------------------------------------------- + +def _load_morning_availability(landing_dir: Path, target_date: str) -> dict | None: + """Load today's morning availability file. Returns parsed JSON or None.""" + playtomic_dir = landing_dir / "playtomic" + # Search across year/month dirs for the target date + matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.json.gz")) + if not matches: + return None + + with gzip.open(matches[0], "rb") as f: + return json.loads(f.read()) + + +def _find_venues_with_upcoming_slots( + morning_data: dict, window_start: datetime, window_end: datetime +) -> list[str]: + """Find tenant_ids that have available slots starting within the recheck window.""" + tenant_ids = set() + for venue in morning_data.get("venues", []): + tid = venue.get("tenant_id") + if not tid: + continue + for resource in venue.get("slots", []): + for slot in resource.get("slots", []): + start_time_str = slot.get("start_time") + if not start_time_str: + continue + try: + # Parse "2026-02-24T17:00:00" format + slot_start = datetime.fromisoformat(start_time_str).replace(tzinfo=UTC) + if window_start <= slot_start < window_end: + tenant_ids.add(tid) + break # found one upcoming slot, no need to check more + except ValueError: + continue + if tid in tenant_ids: + break # already found upcoming slot for this venue + + return sorted(tenant_ids) + + +def extract_recheck( + landing_dir: Path, + year_month: str, + conn: sqlite3.Connection, + session: niquests.Session, +) -> dict: + """Re-query venues with slots starting soon for accurate occupancy data.""" + now = datetime.now(UTC) + target_date = now.strftime("%Y-%m-%d") + + # Also check tomorrow if it's late evening + tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d") + + morning_data = _load_morning_availability(landing_dir, target_date) + if morning_data is None: + # Try tomorrow's file (morning extraction creates it for tomorrow) + morning_data = _load_morning_availability(landing_dir, tomorrow) + if morning_data is None: + logger.info("No morning availability file found — skipping recheck") + return {"files_written": 0, "files_skipped": 0, "bytes_written": 0} + target_date = tomorrow + + # Find venues with slots in the upcoming window + window_start = now + window_end = now + timedelta(minutes=RECHECK_WINDOW_MINUTES) + venues_to_recheck = _find_venues_with_upcoming_slots(morning_data, window_start, window_end) + + if not venues_to_recheck: + logger.info("No venues with upcoming slots in next %d min — skipping", RECHECK_WINDOW_MINUTES) + return {"files_written": 0, "files_skipped": 0, "bytes_written": 0} + + logger.info( + "Rechecking %d venues with slots starting in next %d min", + 
len(venues_to_recheck), RECHECK_WINDOW_MINUTES, + ) + + # Fetch availability for the recheck window + start_min_str = window_start.strftime("%Y-%m-%dT%H:%M:%S") + start_max_str = window_end.strftime("%Y-%m-%dT%H:%M:%S") + + # Determine parallelism + proxy_urls = load_proxy_urls() + worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1 + proxy_cycler = make_round_robin_cycler(proxy_urls) + + if worker_count > 1 and len(venues_to_recheck) > 10: + venues_data, venues_errored = _fetch_venues_parallel( + venues_to_recheck, start_min_str, start_max_str, worker_count, proxy_cycler, + ) + else: + venues_data = [] + venues_errored = 0 + for tid in venues_to_recheck: + result = _fetch_venue_availability(tid, start_min_str, start_max_str, proxy_cycler()) + if result is not None: + venues_data.append(result) + else: + venues_errored += 1 + + # Write recheck file + recheck_hour = now.hour + year, month = year_month.split("/") + dest_dir = landing_path(landing_dir, "playtomic", year, month) + dest = dest_dir / f"availability_{target_date}_recheck_{recheck_hour:02d}.json.gz" + + captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") + payload = json.dumps({ + "date": target_date, + "captured_at_utc": captured_at, + "recheck_hour": recheck_hour, + "recheck_window_minutes": RECHECK_WINDOW_MINUTES, + "rechecked_tenant_ids": venues_to_recheck, + "venue_count": len(venues_data), + "venues_errored": venues_errored, + "venues": venues_data, + }).encode() + + bytes_written = write_gzip_atomic(dest, payload) + logger.info( + "Recheck: %d/%d venues (%d errors) -> %s (%s bytes)", + len(venues_data), len(venues_to_recheck), venues_errored, dest, f"{bytes_written:,}", + ) + + return { + "files_written": 1, + "files_skipped": 0, + "bytes_written": bytes_written, + "cursor_value": f"{target_date}:recheck:{recheck_hour}", + } + + +# --------------------------------------------------------------------------- +# Entry points +# --------------------------------------------------------------------------- + def main() -> None: run_extractor(EXTRACTOR_NAME, extract) +def main_recheck() -> None: + run_extractor(RECHECK_EXTRACTOR_NAME, extract_recheck) + + if __name__ == "__main__": main() diff --git a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py new file mode 100644 index 0000000..b49cf85 --- /dev/null +++ b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py @@ -0,0 +1,57 @@ +"""Optional proxy rotation for parallel HTTP fetching. + +Proxies are configured via the PROXY_URLS environment variable (comma-separated). +When unset, all functions return None/no-op — extractors fall back to direct requests. + +Two routing modes: + round-robin — distribute requests evenly across proxies (default) + sticky — same key always maps to same proxy (for session-tracked sites) +""" + +import itertools +import os +import threading + + +def load_proxy_urls() -> list[str]: + """Read PROXY_URLS env var (comma-separated). Returns [] if unset. + + Format: http://user:pass@host:port or socks5://host:port + """ + raw = os.environ.get("PROXY_URLS", "") + urls = [u.strip() for u in raw.split(",") if u.strip()] + return urls + + +def make_round_robin_cycler(proxy_urls: list[str]): + """Thread-safe round-robin proxy cycler. + + Returns a callable: next_proxy() -> str | None + Returns None-returning callable if no proxies configured. 
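+
+    Example (hypothetical proxy URLs):
+        cycler = make_round_robin_cycler(["http://p1:8080", "http://p2:8080"])
+        cycler()  # "http://p1:8080"
+        cycler()  # "http://p2:8080"
+        cycler()  # "http://p1:8080" again: itertools.cycle wraps around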
+ """ + if not proxy_urls: + return lambda: None + + cycle = itertools.cycle(proxy_urls) + lock = threading.Lock() + + def next_proxy() -> str: + with lock: + return next(cycle) + + return next_proxy + + +def make_sticky_selector(proxy_urls: list[str]): + """Consistent-hash proxy selector — same key always maps to same proxy. + + Use when the target site tracks sessions by IP (e.g. Cloudflare). + Returns a callable: select_proxy(key: str) -> str | None + """ + if not proxy_urls: + return lambda key: None + + def select_proxy(key: str) -> str: + return proxy_urls[hash(key) % len(proxy_urls)] + + return select_proxy diff --git a/infra/supervisor/padelnomics-supervisor.service b/infra/supervisor/padelnomics-supervisor.service index ac05293..169fff5 100644 --- a/infra/supervisor/padelnomics-supervisor.service +++ b/infra/supervisor/padelnomics-supervisor.service @@ -7,10 +7,11 @@ Wants=network-online.target Type=simple User=root WorkingDirectory=/opt/padelnomics -ExecStart=/opt/padelnomics/infra/supervisor/supervisor.sh +ExecStart=/bin/sh -c 'exec uv run python src/padelnomics/supervisor.py' Restart=always RestartSec=10 EnvironmentFile=/opt/padelnomics/.env +Environment=PATH=/root/.local/bin:/usr/local/bin:/usr/bin:/bin Environment=LANDING_DIR=/data/padelnomics/landing Environment=DUCKDB_PATH=/data/padelnomics/lakehouse.duckdb Environment=SERVING_DUCKDB_PATH=/data/padelnomics/analytics.duckdb diff --git a/infra/supervisor/workflows.toml b/infra/supervisor/workflows.toml new file mode 100644 index 0000000..fc2e9da --- /dev/null +++ b/infra/supervisor/workflows.toml @@ -0,0 +1,33 @@ +# Workflow registry — the supervisor reads this file on every tick. +# To add a new extractor: add a [section] here and create the Python module. +# +# Fields: +# module — Python module path (must have a main() function) +# schedule — named preset ("hourly", "daily", "weekly", "monthly") +# or raw cron expression (e.g. "0 6-23 * * *") +# entry — optional: function name if not "main" (default: "main") +# depends_on — optional: list of workflow names that must run first +# proxy_mode — optional: "round-robin" (default) or "sticky" + +[overpass] +module = "padelnomics_extract.overpass" +schedule = "monthly" + +[eurostat] +module = "padelnomics_extract.eurostat" +schedule = "monthly" + +[playtomic_tenants] +module = "padelnomics_extract.playtomic_tenants" +schedule = "weekly" + +[playtomic_availability] +module = "padelnomics_extract.playtomic_availability" +schedule = "daily" +depends_on = ["playtomic_tenants"] + +[playtomic_recheck] +module = "padelnomics_extract.playtomic_availability" +entry = "main_recheck" +schedule = "0 6-23 * * *" +depends_on = ["playtomic_availability"] diff --git a/src/padelnomics/__init__.py b/src/padelnomics/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/padelnomics/supervisor.py b/src/padelnomics/supervisor.py new file mode 100644 index 0000000..e864f05 --- /dev/null +++ b/src/padelnomics/supervisor.py @@ -0,0 +1,416 @@ +"""Padelnomics Supervisor — schedule-aware pipeline orchestration. + +Replaces supervisor.sh with a Python supervisor that reads a TOML workflow +registry, runs extractors on cron-based schedules (with dependency ordering +and parallel execution), then runs SQLMesh transform + export. + +Crash safety: the main loop catches all exceptions and backs off, matching +the TigerBeetle CFO supervisor pattern. Combined with systemd Restart=always, +the supervisor is effectively unkillable. 
+ +Usage: + # Run the supervisor loop (production) + LANDING_DIR=data/landing uv run python src/padelnomics/supervisor.py + + # Show workflow status + LANDING_DIR=data/landing uv run python src/padelnomics/supervisor.py status +""" + +import importlib +import logging +import os +import subprocess +import sys +import time +import tomllib +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import UTC, datetime +from pathlib import Path + +from croniter import croniter + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +TICK_INTERVAL_SECONDS = 60 +BACKOFF_SECONDS = 600 # 10 min on tick failure (matches shell version) +SUBPROCESS_TIMEOUT_SECONDS = 14400 # 4 hours max per subprocess +REPO_DIR = Path(os.getenv("REPO_DIR", "/opt/padelnomics")) +LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) +DUCKDB_PATH = os.getenv("DUCKDB_PATH", "data/lakehouse.duckdb") +SERVING_DUCKDB_PATH = os.getenv("SERVING_DUCKDB_PATH", "analytics.duckdb") +ALERT_WEBHOOK_URL = os.getenv("ALERT_WEBHOOK_URL", "") +WORKFLOWS_PATH = Path(os.getenv("WORKFLOWS_PATH", "infra/supervisor/workflows.toml")) + +NAMED_SCHEDULES = { + "hourly": "0 * * * *", + "daily": "0 5 * * *", + "weekly": "0 3 * * 1", + "monthly": "0 4 1 * *", +} + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("padelnomics.supervisor") + + +# --------------------------------------------------------------------------- +# State DB helpers (reuse extraction state DB) +# --------------------------------------------------------------------------- + +def _open_state_db(): + """Open the extraction state DB. Reuses the same .state.sqlite as extractors.""" + # Import here to avoid circular deps at module level + from padelnomics_extract.utils import open_state_db + + return open_state_db(LANDING_DIR) + + +def _get_last_success_time(conn, workflow_name: str) -> datetime | None: + """Return the finish time of the last successful run, or None.""" + row = conn.execute( + "SELECT MAX(finished_at) AS t FROM extraction_runs " + "WHERE extractor = ? 
AND status = 'success'", + (workflow_name,), + ).fetchone() + if not row or not row["t"]: + return None + return datetime.fromisoformat(row["t"]).replace(tzinfo=UTC) + + +# --------------------------------------------------------------------------- +# Workflow loading + scheduling +# --------------------------------------------------------------------------- + +def load_workflows(path: Path) -> list[dict]: + """Load workflow definitions from TOML file.""" + assert path.exists(), f"Workflows file not found: {path}" + with open(path, "rb") as f: + data = tomllib.load(f) + + workflows = [] + for name, cfg in data.items(): + assert "module" in cfg, f"Workflow '{name}' missing 'module'" + assert "schedule" in cfg, f"Workflow '{name}' missing 'schedule'" + workflows.append({ + "name": name, + "module": cfg["module"], + "entry": cfg.get("entry", "main"), + "schedule": cfg["schedule"], + "depends_on": cfg.get("depends_on", []), + "proxy_mode": cfg.get("proxy_mode", "round-robin"), + }) + return workflows + + +def resolve_schedule(schedule: str) -> str: + """Resolve a named schedule to a cron expression, or pass through raw cron.""" + return NAMED_SCHEDULES.get(schedule, schedule) + + +def is_due(conn, workflow: dict) -> bool: + """Check if the most recent cron trigger hasn't been served yet.""" + cron_expr = resolve_schedule(workflow["schedule"]) + assert croniter.is_valid(cron_expr), f"Invalid cron: {cron_expr} for {workflow['name']}" + + last_success = _get_last_success_time(conn, workflow["name"]) + if last_success is None: + return True # never ran + + now_naive = datetime.now(UTC).replace(tzinfo=None) + prev_trigger = croniter(cron_expr, now_naive).get_prev(datetime).replace(tzinfo=UTC) + return last_success < prev_trigger + + +# --------------------------------------------------------------------------- +# Topological ordering +# --------------------------------------------------------------------------- + +def topological_waves(workflows: list[dict]) -> list[list[dict]]: + """Group workflows into dependency waves for parallel execution. + + Wave 0: no deps. Wave 1: depends only on wave 0. Etc. + Workflows whose dependencies aren't in the 'due' set are treated as having no deps. 
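+
+    Example: if the due set is A, C (no deps) and B (depends_on = ["A"]),
+    the waves are [[A, C], [B]]: A and C run in parallel, then B runs.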
+ """ + name_to_wf = {w["name"]: w for w in workflows} + due_names = set(name_to_wf.keys()) + + # Build in-degree map (only count deps that are in the due set) + in_degree: dict[str, int] = {} + dependents: dict[str, list[str]] = defaultdict(list) + for w in workflows: + deps_in_scope = [d for d in w["depends_on"] if d in due_names] + in_degree[w["name"]] = len(deps_in_scope) + for d in deps_in_scope: + dependents[d].append(w["name"]) + + waves = [] + remaining = set(due_names) + max_iterations = len(workflows) + 1 # safety bound + + for _ in range(max_iterations): + if not remaining: + break + # Wave = all workflows with in_degree 0 + wave = [name_to_wf[n] for n in remaining if in_degree[n] == 0] + assert wave, f"Circular dependency detected among: {remaining}" + waves.append(wave) + for w in wave: + remaining.discard(w["name"]) + for dep in dependents[w["name"]]: + if dep in remaining: + in_degree[dep] -= 1 + + return waves + + +# --------------------------------------------------------------------------- +# Workflow execution +# --------------------------------------------------------------------------- + +def run_workflow(conn, workflow: dict) -> None: + """Run a single workflow by importing its module and calling the entry function.""" + module_name = workflow["module"] + entry_name = workflow["entry"] + + logger.info("Running workflow: %s (%s.%s)", workflow["name"], module_name, entry_name) + + try: + module = importlib.import_module(module_name) + entry_fn = getattr(module, entry_name) + entry_fn() + logger.info("Workflow %s completed successfully", workflow["name"]) + except Exception: + logger.exception("Workflow %s failed", workflow["name"]) + send_alert(f"Workflow '{workflow['name']}' failed") + raise + + +def run_due_workflows(conn, workflows: list[dict]) -> bool: + """Run all due workflows. Independent ones run in parallel. Returns True if any ran.""" + due = [w for w in workflows if is_due(conn, w)] + if not due: + logger.info("No workflows due") + return False + + logger.info("Due workflows: %s", [w["name"] for w in due]) + waves = topological_waves(due) + + for i, wave in enumerate(waves): + wave_names = [w["name"] for w in wave] + logger.info("Wave %d: %s", i, wave_names) + + if len(wave) == 1: + try: + run_workflow(conn, wave[0]) + except Exception: + pass # already logged in run_workflow + else: + with ThreadPoolExecutor(max_workers=len(wave)) as pool: + futures = {pool.submit(run_workflow, conn, w): w for w in wave} + for future in as_completed(futures): + try: + future.result() + except Exception: + pass # already logged in run_workflow + + return True + + +# --------------------------------------------------------------------------- +# Transform + Export + Deploy +# --------------------------------------------------------------------------- + +def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bool: + """Run a shell command. 
Returns True on success.""" + logger.info("Shell: %s", cmd) + result = subprocess.run( + cmd, shell=True, capture_output=True, text=True, timeout=timeout_seconds + ) + if result.returncode != 0: + logger.error("Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s", + result.returncode, cmd, result.stdout[-500:], result.stderr[-500:]) + return False + return True + + +def run_transform() -> None: + """Run SQLMesh — it evaluates model staleness internally.""" + logger.info("Running SQLMesh transform") + ok = run_shell( + f"uv run sqlmesh -p transform/sqlmesh_padelnomics run", + ) + if not ok: + send_alert("SQLMesh transform failed") + + +def run_export() -> None: + """Export serving tables to analytics.duckdb.""" + logger.info("Exporting serving tables") + ok = run_shell( + f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} " + f"uv run python src/padelnomics/export_serving.py" + ) + if not ok: + send_alert("Serving export failed") + + +def web_code_changed() -> bool: + """Check if web app code changed since last deploy (after git pull).""" + result = subprocess.run( + ["git", "diff", "--name-only", "HEAD~1", "HEAD", "--", "web/", "Dockerfile"], + capture_output=True, text=True, timeout=30, + ) + return bool(result.stdout.strip()) + + +def git_pull_and_sync() -> None: + """Pull latest code and sync dependencies.""" + run_shell("git fetch origin master") + run_shell("git switch --discard-changes --detach origin/master") + run_shell("uv sync --all-packages") + + +# --------------------------------------------------------------------------- +# Alerting +# --------------------------------------------------------------------------- + +def send_alert(message: str) -> None: + """Send failure alert via webhook (ntfy.sh / Slack / Telegram).""" + if not ALERT_WEBHOOK_URL: + return + timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC") + try: + subprocess.run( + ["curl", "-s", "-d", f"[{timestamp}] {message}", ALERT_WEBHOOK_URL], + timeout=10, capture_output=True, + ) + except Exception: + logger.exception("Failed to send alert") + + +# --------------------------------------------------------------------------- +# Main loop +# --------------------------------------------------------------------------- + +def tick() -> None: + """One cycle: check schedules, run what's due, transform, export.""" + workflows = load_workflows(WORKFLOWS_PATH) + conn = _open_state_db() + + try: + # Git pull + sync (production only) + if os.getenv("SUPERVISOR_GIT_PULL"): + git_pull_and_sync() + + # Run due extractors + run_due_workflows(conn, workflows) + + # SQLMesh always runs (evaluates staleness internally) + run_transform() + + # Export serving tables + run_export() + + # Deploy web app if code changed + if os.getenv("SUPERVISOR_GIT_PULL") and web_code_changed(): + logger.info("Web code changed — deploying") + run_shell("./deploy.sh") + finally: + conn.close() + + +def supervisor_loop() -> None: + """Infinite supervisor loop — never exits unless killed.""" + logger.info("Supervisor starting (tick interval: %ds)", TICK_INTERVAL_SECONDS) + logger.info("Workflows: %s", WORKFLOWS_PATH) + logger.info("Landing dir: %s", LANDING_DIR) + + while True: + try: + tick() + except KeyboardInterrupt: + logger.info("Supervisor stopped (KeyboardInterrupt)") + break + except Exception: + logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS) + send_alert("Supervisor tick failed") + time.sleep(BACKOFF_SECONDS) + else: + time.sleep(TICK_INTERVAL_SECONDS) + + +# 
--------------------------------------------------------------------------- +# Status CLI +# --------------------------------------------------------------------------- + +def print_status() -> None: + """Print workflow status table.""" + workflows = load_workflows(WORKFLOWS_PATH) + conn = _open_state_db() + + now = datetime.now(UTC) + + # Header + print(f"{'Workflow':<28} {'Schedule':<18} {'Last Run':<20} {'Status':<8} {'Next'}") + print(f"{'─' * 28} {'─' * 18} {'─' * 20} {'─' * 8} {'─' * 12}") + + for w in workflows: + last_success = _get_last_success_time(conn, w["name"]) + cron_expr = resolve_schedule(w["schedule"]) + + # Last run info + if last_success: + last_str = last_success.strftime("%Y-%m-%d %H:%M") + status = "ok" + else: + last_str = "never" + status = "pending" + + # Last failure check + row = conn.execute( + "SELECT MAX(finished_at) AS t FROM extraction_runs " + "WHERE extractor = ? AND status = 'failed'", + (w["name"],), + ).fetchone() + if row and row["t"]: + last_fail = datetime.fromisoformat(row["t"]).replace(tzinfo=UTC) + if last_success is None or last_fail > last_success: + status = "FAILED" + + # Next trigger (croniter returns naive datetimes — treat as UTC) + now_naive = now.replace(tzinfo=None) + next_trigger = croniter(cron_expr, now_naive).get_next(datetime) + delta = next_trigger - now_naive + if delta.total_seconds() < 3600: + next_str = f"in {int(delta.total_seconds() / 60)}m" + elif delta.total_seconds() < 86400: + next_str = next_trigger.strftime("%H:%M") + else: + next_str = next_trigger.strftime("%b %d") + + schedule_display = w["schedule"] if w["schedule"] in NAMED_SCHEDULES else cron_expr + print(f"{w['name']:<28} {schedule_display:<18} {last_str:<20} {status:<8} {next_str}") + + conn.close() + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def main() -> None: + if len(sys.argv) > 1 and sys.argv[1] == "status": + print_status() + else: + supervisor_loop() + + +if __name__ == "__main__": + main() diff --git a/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql b/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql index 4a27e61..c211290 100644 --- a/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql +++ b/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql @@ -3,6 +3,10 @@ -- per-day statistics, then calculates occupancy by comparing available hours -- against total capacity from fct_venue_capacity. -- +-- Recheck-aware occupancy: for each (tenant, resource, slot_start_time), +-- prefer the latest snapshot (recheck > morning). A slot present in morning +-- but absent in the recheck = booked between snapshots → more accurate occupancy. +-- -- Occupancy = 1 - (available_court_hours / capacity_court_hours_per_day) -- Revenue estimate = booked_court_hours × avg_price_of_available_slots -- @@ -15,14 +19,31 @@ MODEL ( grain (snapshot_date, tenant_id) ); -WITH slot_agg AS ( +-- Prefer the latest snapshot for each slot: +-- If a recheck exists for a (date, tenant, resource, start_time), use it. +-- Otherwise fall back to the morning snapshot. 
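+-- Example (hypothetical slot): court A's 17:00 slot appears in both the
+-- morning file and the 16:00 recheck; the recheck row gets rn = 1, so the
+-- fresher snapshot is the one aggregated downstream.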
+WITH ranked_slots AS ( + SELECT + a.*, + ROW_NUMBER() OVER ( + PARTITION BY a.snapshot_date, a.tenant_id, a.resource_id, a.slot_start_time + ORDER BY + CASE WHEN a.snapshot_type = 'recheck' THEN 1 ELSE 2 END, + a.captured_at_utc DESC + ) AS rn + FROM staging.stg_playtomic_availability a + WHERE a.price_amount IS NOT NULL + AND a.price_amount > 0 +), +latest_slots AS ( + SELECT * FROM ranked_slots WHERE rn = 1 +), +slot_agg AS ( SELECT a.snapshot_date, a.tenant_id, - -- Slot counts: each row is one 60-min available slot on one court COUNT(*) AS available_slot_count, COUNT(DISTINCT a.resource_id) AS courts_with_availability, - -- Available (unbooked) court-hours: slots are on 30-min increments for 60-min bookings -- Each available start_time represents a 60-min bookable window ROUND(COUNT(*) * 1.0, 2) AS available_court_hours, -- Pricing stats (60-min slots only) @@ -42,9 +63,7 @@ WITH slot_agg AS ( ), 2) AS median_price_offpeak, MAX(a.price_currency) AS price_currency, MAX(a.captured_at_utc) AS captured_at_utc - FROM staging.stg_playtomic_availability a - WHERE a.price_amount IS NOT NULL - AND a.price_amount > 0 + FROM latest_slots a GROUP BY a.snapshot_date, a.tenant_id ) SELECT diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql index f6dd40b..65ecd85 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql @@ -2,28 +2,30 @@ -- One row per available 60-minute booking slot per court per venue per day. -- "Available" = the slot was NOT booked at capture time. Missing slots = booked. -- --- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit). --- The API returns 60/90/120-min variants per start_time — filtering to 60 avoids --- double-counting the same time window. +-- Reads BOTH morning snapshots and recheck files: +-- Morning: availability_{date}.json.gz → snapshot_type = 'morning' +-- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck' -- +-- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit). -- Price parsed from strings like "14.56 EUR" or "48 GBP". -- -- Requires: at least one availability file in the landing zone. -- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz) -- with empty venues[] ensures this model runs before real data arrives. 
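+-- Example (hypothetical date): availability_2026-02-24.json.gz is read as
+-- snapshot_type 'morning'; availability_2026-02-24_recheck_16.json.gz is read
+-- as snapshot_type 'recheck' with recheck_hour = 16.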
--- --- Source: data/landing/playtomic/{year}/{month}/availability_{date}.json.gz --- Format: {date, captured_at_utc, venues: [{tenant_id, slots: [{resource_id, start_date, slots: [...]}]}]} MODEL ( name staging.stg_playtomic_availability, kind FULL, cron '@daily', - grain (snapshot_date, tenant_id, resource_id, slot_start_time) + grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc) ); -WITH raw_files AS ( - SELECT * +-- Morning snapshots (filename does NOT contain '_recheck_') +WITH morning_files AS ( + SELECT + *, + 'morning' AS snapshot_type, + NULL::INTEGER AS recheck_hour FROM read_json( @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz', format = 'auto', @@ -31,24 +33,57 @@ WITH raw_files AS ( date: 'VARCHAR', captured_at_utc: 'VARCHAR', venues: 'JSON[]' - } + }, + filename = true + ) + WHERE filename NOT LIKE '%_recheck_%' + AND venues IS NOT NULL + AND json_array_length(venues) > 0 +), +-- Recheck snapshots (filename contains '_recheck_') +-- Use TRY_CAST on a regex-extracted hour to get the recheck_hour. +-- If no recheck files exist yet, this CTE produces zero rows (safe). +recheck_files AS ( + SELECT + *, + 'recheck' AS snapshot_type, + TRY_CAST( + regexp_extract(filename, '_recheck_(\d+)', 1) AS INTEGER + ) AS recheck_hour + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz', + format = 'auto', + columns = { + date: 'VARCHAR', + captured_at_utc: 'VARCHAR', + venues: 'JSON[]' + }, + filename = true ) WHERE venues IS NOT NULL AND json_array_length(venues) > 0 ), +all_files AS ( + SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM morning_files + UNION ALL + SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM recheck_files +), raw_venues AS ( SELECT - rf.date AS snapshot_date, - rf.captured_at_utc, + af.date AS snapshot_date, + af.captured_at_utc, + af.snapshot_type, + af.recheck_hour, venue_json - FROM raw_files rf, - LATERAL UNNEST(rf.venues) AS t(venue_json) + FROM all_files af, + LATERAL UNNEST(af.venues) AS t(venue_json) ), --- Each venue has: {tenant_id, slots: [{resource_id, start_date, slots: [...]}]} raw_resources AS ( SELECT rv.snapshot_date, rv.captured_at_utc, + rv.snapshot_type, + rv.recheck_hour, rv.venue_json ->> 'tenant_id' AS tenant_id, resource_json FROM raw_venues rv, @@ -56,11 +91,12 @@ raw_resources AS ( from_json(rv.venue_json -> 'slots', '["JSON"]') ) AS t(resource_json) ), --- Each resource has: {resource_id, start_date, slots: [{start_time, duration, price}]} raw_slots AS ( SELECT rr.snapshot_date, rr.captured_at_utc, + rr.snapshot_type, + rr.recheck_hour, rr.tenant_id, rr.resource_json ->> 'resource_id' AS resource_id, slot_json @@ -75,12 +111,12 @@ SELECT resource_id, slot_json ->> 'start_time' AS slot_start_time, TRY_CAST(slot_json ->> 'duration' AS INTEGER) AS duration_minutes, - -- Parse "14.56 EUR" → 14.56 TRY_CAST( SPLIT_PART(slot_json ->> 'price', ' ', 1) AS DOUBLE ) AS price_amount, - -- Parse "14.56 EUR" → EUR SPLIT_PART(slot_json ->> 'price', ' ', 2) AS price_currency, + snapshot_type, + recheck_hour, captured_at_utc FROM raw_slots WHERE resource_id IS NOT NULL diff --git a/uv.lock b/uv.lock index dfc7159..663c21c 100644 --- a/uv.lock +++ b/uv.lock @@ -1151,6 +1151,7 @@ version = "0.1.0" source = { editable = "web" } dependencies = [ { name = "aiosqlite" }, + { name = "croniter" }, { name = "duckdb" }, { name = "hypercorn" }, { name = "itsdangerous" }, @@ -1168,6 +1169,7 @@ dependencies = [ [package.metadata] requires-dist = [ { 
name = "aiosqlite", specifier = ">=0.19.0" }, + { name = "croniter", specifier = ">=6.0.0" }, { name = "duckdb", specifier = ">=1.0.0" }, { name = "hypercorn", specifier = ">=0.17.0" }, { name = "itsdangerous", specifier = ">=2.1.0" }, diff --git a/web/pyproject.toml b/web/pyproject.toml index 3738fc4..db3a321 100644 --- a/web/pyproject.toml +++ b/web/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "duckdb>=1.0.0", "pyarrow>=23.0.1", "pyyaml>=6.0", + "croniter>=6.0.0", ] [build-system] diff --git a/web/src/padelnomics/admin/routes.py b/web/src/padelnomics/admin/routes.py index 8bcf7c8..7bb8e74 100644 --- a/web/src/padelnomics/admin/routes.py +++ b/web/src/padelnomics/admin/routes.py @@ -807,6 +807,46 @@ async def supplier_tier(supplier_id: int): return redirect(url_for("admin.supplier_detail", supplier_id=supplier_id)) +# ============================================================================= +# Feature Flags +# ============================================================================= + +@bp.route("/flags") +@role_required("admin") +async def flags(): + """Feature flags management.""" + flag_list = await fetch_all("SELECT * FROM feature_flags ORDER BY name") + return await render_template("admin/flags.html", flags=flag_list, admin_page="flags") + + +@bp.route("/flags/toggle", methods=["POST"]) +@role_required("admin") +@csrf_protect +async def flag_toggle(): + """Toggle a feature flag on/off.""" + form = await request.form + flag_name = form.get("name", "").strip() + if not flag_name: + await flash("Flag name required.", "error") + return redirect(url_for("admin.flags")) + + # Get current state and flip it + row = await fetch_one("SELECT enabled FROM feature_flags WHERE name = ?", (flag_name,)) + if not row: + await flash(f"Flag '{flag_name}' not found.", "error") + return redirect(url_for("admin.flags")) + + new_enabled = 0 if row["enabled"] else 1 + now = datetime.utcnow().isoformat() + await execute( + "UPDATE feature_flags SET enabled = ?, updated_at = ? WHERE name = ?", + (new_enabled, now, flag_name), + ) + state = "enabled" if new_enabled else "disabled" + await flash(f"Flag '{flag_name}' {state}.", "success") + return redirect(url_for("admin.flags")) + + # ============================================================================= # Feedback Management # ============================================================================= diff --git a/web/src/padelnomics/admin/templates/admin/base_admin.html b/web/src/padelnomics/admin/templates/admin/base_admin.html index d826e97..6c277e9 100644 --- a/web/src/padelnomics/admin/templates/admin/base_admin.html +++ b/web/src/padelnomics/admin/templates/admin/base_admin.html @@ -87,6 +87,10 @@
           System
 
+
+          <a href="{{ url_for('admin.flags') }}">
+            Flags
+          </a>
           Tasks
diff --git a/web/src/padelnomics/admin/templates/admin/flags.html b/web/src/padelnomics/admin/templates/admin/flags.html
new file mode 100644
index 0000000..818d051
--- /dev/null
+++ b/web/src/padelnomics/admin/templates/admin/flags.html
@@ -0,0 +1,72 @@
+{% extends "admin/base_admin.html" %}
+
+{% block admin_head %}
+{% endblock %}
+
+{% block admin_content %}
+<h1>Feature Flags</h1>
+
+<p>
+  Toggle features on/off without redeployment. Changes take effect immediately.
+</p>
+
+<table>
+  <thead>
+    <tr>
+      <th>Flag</th>
+      <th>Description</th>
+      <th>Status</th>
+      <th>Last Updated</th>
+      <th></th>
+    </tr>
+  </thead>
+  <tbody>
+    {% for f in flags %}
+    <tr>
+      <td><code>{{ f.name }}</code></td>
+      <td>{{ f.description or '—' }}</td>
+      <td>
+        {% if f.enabled %}
+        <span class="badge">Enabled</span>
+        {% else %}
+        <span class="badge">Disabled</span>
+        {% endif %}
+      </td>
+      <td>{{ f.updated_at or '—' }}</td>
+      <td>
+        <form method="post" action="{{ url_for('admin.flag_toggle') }}">
+          <input type="hidden" name="csrf_token" value="{{ csrf_token }}">
+          <input type="hidden" name="name" value="{{ f.name }}">
+          {% if f.enabled %}
+          <button type="submit">Disable</button>
+          {% else %}
+          <button type="submit">Enable</button>
+          {% endif %}
+        </form>
+      </td>
+    </tr>
+    {% endfor %}
+  </tbody>
+</table>
+{% endblock %} diff --git a/web/src/padelnomics/app.py b/web/src/padelnomics/app.py index 30aad72..135cb6b 100644 --- a/web/src/padelnomics/app.py +++ b/web/src/padelnomics/app.py @@ -7,7 +7,7 @@ from pathlib import Path from quart import Quart, Response, abort, g, redirect, request, session, url_for from .analytics import close_analytics_db, open_analytics_db -from .core import close_db, config, get_csrf_token, init_db, setup_request_id +from .core import close_db, config, get_csrf_token, init_db, is_flag_enabled, setup_request_id from .i18n import LANG_BLUEPRINTS, SUPPORTED_LANGS, get_translations _ASSET_VERSION = str(int(time.time())) @@ -224,6 +224,7 @@ def create_app() -> Quart: "lang": effective_lang, "t": get_translations(effective_lang), "v": _ASSET_VERSION, + "flag": is_flag_enabled, } # ------------------------------------------------------------------------- diff --git a/web/src/padelnomics/auth/routes.py b/web/src/padelnomics/auth/routes.py index 77c4c3a..12a30d0 100644 --- a/web/src/padelnomics/auth/routes.py +++ b/web/src/padelnomics/auth/routes.py @@ -14,9 +14,10 @@ from ..core import ( config, csrf_protect, execute, + feature_gate, fetch_one, is_disposable_email, - waitlist_gate, + is_flag_enabled, ) from ..i18n import SUPPORTED_LANGS, get_translations @@ -248,14 +249,14 @@ async def login(): @bp.route("/signup", methods=["GET", "POST"]) @csrf_protect -@waitlist_gate("waitlist.html", plan=lambda: request.args.get("plan", "free")) +@feature_gate("payments", "waitlist.html", plan=lambda: request.args.get("plan", "free")) async def signup(): """Signup page - same as login but with different messaging.""" if g.get("user"): return redirect(url_for("dashboard.index")) - # Waitlist POST handling - if config.WAITLIST_MODE and request.method == "POST": + # Waitlist POST handling (when payments flag is disabled) + if not await is_flag_enabled("payments") and request.method == "POST": _t = get_translations(g.lang) form = await request.form email = form.get("email", "").strip().lower() diff --git a/web/src/padelnomics/content/routes.py b/web/src/padelnomics/content/routes.py index 324f174..049a71d 100644 --- a/web/src/padelnomics/content/routes.py +++ b/web/src/padelnomics/content/routes.py @@ -9,7 +9,7 @@ from jinja2 import Environment, FileSystemLoader from markupsafe import Markup from quart import Blueprint, abort, render_template, request -from ..core import capture_waitlist_email, config, csrf_protect, fetch_all, fetch_one, waitlist_gate +from ..core import capture_waitlist_email, csrf_protect, feature_gate, fetch_all, fetch_one from ..i18n import get_translations bp = Blueprint( @@ -106,10 +106,11 @@ async def bake_scenario_cards(html: str, lang: str = "en") -> str: @bp.route("/markets", methods=["GET", "POST"]) @csrf_protect -@waitlist_gate("markets_waitlist.html") +@feature_gate("markets", "markets_waitlist.html") async def markets(): """Hub page: search + country/region filter for articles.""" - if config.WAITLIST_MODE and request.method == "POST": + from ..core import is_flag_enabled + if not await is_flag_enabled("markets") and request.method == "POST": form = await request.form email = form.get("email", "").strip().lower() if email and "@" in email: @@ -147,7 +148,7 @@ async def markets(): @bp.route("/markets/results") -@waitlist_gate("markets_waitlist.html") +@feature_gate("markets", "markets_waitlist.html") async def market_results(): """HTMX partial: filtered article cards.""" q = request.args.get("q", "").strip() diff --git a/web/src/padelnomics/core.py 
b/web/src/padelnomics/core.py index bd925f2..1745d44 100644 --- a/web/src/padelnomics/core.py +++ b/web/src/padelnomics/core.py @@ -698,27 +698,61 @@ def ab_test(experiment: str, variants: tuple = ("control", "treatment")): return decorator -def waitlist_gate(template: str, **extra_context): - """Parameterized decorator that intercepts GET requests when WAITLIST_MODE is enabled. +async def is_flag_enabled(name: str, default: bool = False) -> bool: + """Check if a feature flag is enabled. Falls back to default if flag doesn't exist. - If WAITLIST_MODE is true and the request is a GET, renders the given template - instead of calling the wrapped function. POST requests and non-waitlist mode - always pass through. + Reads from the feature_flags table. Flags are toggled via the admin UI + and take effect immediately — no restart needed. + """ + db = await get_db() + row = await db.execute_fetchone( + "SELECT enabled FROM feature_flags WHERE name = ?", (name,) + ) + if row is None: + return default + return bool(row[0]) + + +def feature_gate(flag_name: str, waitlist_template: str, **extra_context): + """Gate a route behind a feature flag. Shows waitlist template if flag is disabled. + + Replaces the old waitlist_gate() which used a global WAITLIST_MODE env var. + This checks per-feature flags from the database instead. Args: - template: Template path to render in waitlist mode (e.g., "waitlist.html") - **extra_context: Additional context variables to pass to template. - Values can be callables (evaluated at request time) or static. + flag_name: Name of the feature flag (e.g., "payments", "supplier_signup") + waitlist_template: Template to render when the flag is OFF and method is GET + **extra_context: Additional context. Values can be callables (evaluated at request time). Usage: @bp.route("/signup", methods=["GET", "POST"]) @csrf_protect - @waitlist_gate("waitlist.html", plan=lambda: request.args.get("plan", "free")) + @feature_gate("payments", "waitlist.html", plan=lambda: request.args.get("plan", "free")) async def signup(): - # POST handling and normal signup code here ... """ + def decorator(f): + @wraps(f) + async def decorated(*args, **kwargs): + if not await is_flag_enabled(flag_name) and request.method == "GET": + ctx = {} + for key, val in extra_context.items(): + ctx[key] = val() if callable(val) else val + return await render_template(waitlist_template, **ctx) + return await f(*args, **kwargs) + + return decorated + + return decorator + + +def waitlist_gate(template: str, **extra_context): + """DEPRECATED: Use feature_gate() instead. Kept for backwards compatibility. + + Intercepts GET requests when WAITLIST_MODE is enabled. + """ + def decorator(f): @wraps(f) async def decorated(*args, **kwargs): diff --git a/web/src/padelnomics/migrations/versions/0019_add_feature_flags.py b/web/src/padelnomics/migrations/versions/0019_add_feature_flags.py new file mode 100644 index 0000000..4c497db --- /dev/null +++ b/web/src/padelnomics/migrations/versions/0019_add_feature_flags.py @@ -0,0 +1,28 @@ +"""Add feature_flags table for per-feature gating. + +Replaces the global WAITLIST_MODE env var with granular per-feature flags +that can be toggled at runtime via the admin UI. 
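+
+Consumers (wired elsewhere in this patch): routes are gated with the
+feature_gate() decorator, code calls `await is_flag_enabled(name)` directly,
+and templates use the flag() Jinja global.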
+""" + + +def up(conn): + conn.execute(""" + CREATE TABLE IF NOT EXISTS feature_flags ( + name TEXT PRIMARY KEY, + enabled INTEGER NOT NULL DEFAULT 0, + description TEXT, + updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) + ) + """) + + # Seed initial flags — markets live, everything else behind waitlist + conn.executemany( + "INSERT OR IGNORE INTO feature_flags (name, enabled, description) VALUES (?, ?, ?)", + [ + ("markets", 1, "Market/SEO content pages"), + ("payments", 0, "Paddle billing & checkout"), + ("planner_export", 0, "Business plan PDF export"), + ("supplier_signup", 0, "Supplier onboarding wizard"), + ("lead_unlock", 0, "Lead credit purchase & unlock"), + ], + ) diff --git a/web/src/padelnomics/planner/routes.py b/web/src/padelnomics/planner/routes.py index a636c0d..b5f6247 100644 --- a/web/src/padelnomics/planner/routes.py +++ b/web/src/padelnomics/planner/routes.py @@ -14,10 +14,10 @@ from ..core import ( config, csrf_protect, execute, + feature_gate, fetch_all, fetch_one, get_paddle_price, - waitlist_gate, ) from ..i18n import get_translations from .calculator import COUNTRY_CURRENCY, CURRENCY_DEFAULT, calc, validate_state @@ -596,7 +596,7 @@ async def set_default(scenario_id: int): @bp.route("/export") @login_required -@waitlist_gate("export_waitlist.html") +@feature_gate("planner_export", "export_waitlist.html") async def export(): """Export options page — language, scenario picker, pricing.""" scenarios = await get_scenarios(g.user["id"]) diff --git a/web/src/padelnomics/suppliers/routes.py b/web/src/padelnomics/suppliers/routes.py index 89ffb30..f7d2977 100644 --- a/web/src/padelnomics/suppliers/routes.py +++ b/web/src/padelnomics/suppliers/routes.py @@ -15,8 +15,9 @@ from ..core import ( execute, fetch_all, fetch_one, + feature_gate, get_paddle_price, - waitlist_gate, + is_flag_enabled, ) from ..i18n import get_translations @@ -243,8 +244,8 @@ def _lead_tier_required(f): @bp.route("/signup") -@waitlist_gate( - "suppliers/waitlist.html", +@feature_gate( + "supplier_signup", "suppliers/waitlist.html", plan=lambda: request.args.get("plan", "supplier_growth"), plans=lambda: PLAN_FEATURES, ) @@ -574,6 +575,9 @@ async def lead_feed(): @csrf_protect async def unlock_lead(token: str): """Spend credits to unlock a lead. 
Returns full-details card via HTMX.""" + if not await is_flag_enabled("lead_unlock"): + return jsonify({"error": "Lead unlock is not available yet."}), 403 + from ..credits import InsufficientCredits from ..credits import unlock_lead as do_unlock From b5c9a4e573536e03c0e45744e5309ebb0c32d242 Mon Sep 17 00:00:00 2001 From: Deeman Date: Mon, 23 Feb 2026 15:26:40 +0100 Subject: [PATCH 2/3] test: e2e + unit tests for supervisor, proxy, and feature flags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - test_supervisor.py: 28 tests covering load_workflows, resolve_schedule, is_due, topological_waves, and proxy round-robin / sticky selection - test_feature_flags.py: 31 tests covering migration 0019, is_flag_enabled, feature_gate decorator, admin toggle routes, and full toggle e2e flows - conftest.py: seed feature flags with production defaults (markets=1, others=0) so all routes behave consistently in tests - Fix is_flag_enabled bug: replace non-existent db.execute_fetchone() with fetch_one() helper - Update 4 test_waitlist / test_businessplan tests that relied on WAITLIST_MODE patches — now enable the relevant DB flag instead Co-Authored-By: Claude Opus 4.6 --- web/src/padelnomics/core.py | 5 +- web/tests/conftest.py | 14 ++ web/tests/test_businessplan.py | 4 + web/tests/test_feature_flags.py | 431 ++++++++++++++++++++++++++++++++ web/tests/test_supervisor.py | 285 +++++++++++++++++++++ web/tests/test_waitlist.py | 62 ++--- 6 files changed, 770 insertions(+), 31 deletions(-) create mode 100644 web/tests/test_feature_flags.py create mode 100644 web/tests/test_supervisor.py diff --git a/web/src/padelnomics/core.py b/web/src/padelnomics/core.py index 1745d44..2b640c8 100644 --- a/web/src/padelnomics/core.py +++ b/web/src/padelnomics/core.py @@ -704,13 +704,12 @@ async def is_flag_enabled(name: str, default: bool = False) -> bool: Reads from the feature_flags table. Flags are toggled via the admin UI and take effect immediately — no restart needed. """ - db = await get_db() - row = await db.execute_fetchone( + row = await fetch_one( "SELECT enabled FROM feature_flags WHERE name = ?", (name,) ) if row is None: return default - return bool(row[0]) + return bool(row["enabled"]) def feature_gate(flag_name: str, waitlist_template: str, **extra_context): diff --git a/web/tests/conftest.py b/web/tests/conftest.py index 207a0db..e5c1d49 100644 --- a/web/tests/conftest.py +++ b/web/tests/conftest.py @@ -54,6 +54,20 @@ async def db(): conn.row_factory = aiosqlite.Row await conn.execute("PRAGMA foreign_keys=ON") await conn.executescript(schema_ddl) + # Seed feature flags so routes that use feature_gate() pass through by default. + # Tests that specifically test gated behaviour set the flag to 0 via _set_flag(). 
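+    # Seed values deliberately mirror migration 0019 (markets on, everything
+    # else off) so test behaviour matches a freshly migrated production DB.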
+ await conn.executescript(""" + INSERT OR IGNORE INTO feature_flags (name, enabled, description) + VALUES ('markets', 1, 'Market/SEO content pages'); + INSERT OR IGNORE INTO feature_flags (name, enabled, description) + VALUES ('payments', 0, 'Paddle billing & checkout'); + INSERT OR IGNORE INTO feature_flags (name, enabled, description) + VALUES ('planner_export', 0, 'Business plan PDF export'); + INSERT OR IGNORE INTO feature_flags (name, enabled, description) + VALUES ('supplier_signup', 0, 'Supplier onboarding wizard'); + INSERT OR IGNORE INTO feature_flags (name, enabled, description) + VALUES ('lead_unlock', 0, 'Lead credit purchase & unlock'); + """) await conn.commit() original_db = core._db diff --git a/web/tests/test_businessplan.py b/web/tests/test_businessplan.py index 3230545..508d0e5 100644 --- a/web/tests/test_businessplan.py +++ b/web/tests/test_businessplan.py @@ -255,6 +255,10 @@ class TestExportRoutes: assert resp.status_code == 302 async def test_export_page_shows_scenarios(self, auth_client, db, scenario): + await db.execute( + "INSERT OR REPLACE INTO feature_flags (name, enabled, description) VALUES ('planner_export', 1, '')" + ) + await db.commit() resp = await auth_client.get("/en/planner/export") assert resp.status_code == 200 html = (await resp.data).decode() diff --git a/web/tests/test_feature_flags.py b/web/tests/test_feature_flags.py new file mode 100644 index 0000000..5dac2d8 --- /dev/null +++ b/web/tests/test_feature_flags.py @@ -0,0 +1,431 @@ +""" +Tests for feature flags — migration, is_flag_enabled helper, feature_gate +decorator, admin toggle routes, and per-feature gating across all routes. + +Unit tests cover is_flag_enabled and feature_gate in isolation. +Integration tests exercise full request/response flows via Quart test client. 
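+
+conftest.py seeds the production defaults (markets=1, all other flags=0);
+tests that need a different state flip flags with the _set_flag() helper below.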
+""" + +import sqlite3 +from datetime import datetime +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +from padelnomics import core +from padelnomics.migrations.migrate import migrate + + +# ── Fixtures & helpers ──────────────────────────────────────────── + + +@pytest.fixture(autouse=True) +def mock_csrf_validation(): + """Mock CSRF validation to always pass in all tests in this file.""" + with patch("padelnomics.core.validate_csrf_token", return_value=True): + yield + + +@pytest.fixture +async def admin_client(app, db): + """Test client with an admin-role user session (module-level, follows test_content.py).""" + now = datetime.utcnow().isoformat() + async with db.execute( + "INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)", + ("flags_admin@test.com", "Flags Admin", now), + ) as cursor: + admin_id = cursor.lastrowid + await db.execute( + "INSERT INTO user_roles (user_id, role) VALUES (?, 'admin')", (admin_id,) + ) + await db.commit() + async with app.test_client() as c: + async with c.session_transaction() as sess: + sess["user_id"] = admin_id + yield c + + +async def _set_flag(db, name: str, enabled: bool, description: str = ""): + """Insert or replace a flag in the test DB.""" + await db.execute( + """INSERT OR REPLACE INTO feature_flags (name, enabled, description) + VALUES (?, ?, ?)""", + (name, 1 if enabled else 0, description), + ) + await db.commit() + + +async def _flag_value(db, name: str) -> int | None: + async with db.execute( + "SELECT enabled FROM feature_flags WHERE name = ?", (name,) + ) as cursor: + row = await cursor.fetchone() + return row[0] if row else None + + +async def _seed_all_flags(db): + """Seed the five default flags matching migration 0019 defaults.""" + flags = [ + ("markets", 1), + ("payments", 0), + ("planner_export", 0), + ("supplier_signup", 0), + ("lead_unlock", 0), + ] + for name, enabled in flags: + await db.execute( + "INSERT OR REPLACE INTO feature_flags (name, enabled, description) VALUES (?, ?, ?)", + (name, enabled, ""), + ) + await db.commit() + + +def _column_names(conn, table): + return [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()] + + +def _table_names(conn): + rows = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + " AND name NOT LIKE 'sqlite_%' ORDER BY name" + ).fetchall() + return [r[0] for r in rows] + + +# ── Migration 0019 ──────────────────────────────────────────────── + + +class TestMigration0019: + """Migration 0019 creates feature_flags table and seeds initial flags.""" + + def test_creates_feature_flags_table(self, tmp_path): + db_path = str(tmp_path / "test.db") + migrate(db_path) + conn = sqlite3.connect(db_path) + assert "feature_flags" in _table_names(conn) + conn.close() + + def test_table_has_correct_columns(self, tmp_path): + db_path = str(tmp_path / "test.db") + migrate(db_path) + conn = sqlite3.connect(db_path) + cols = _column_names(conn, "feature_flags") + conn.close() + assert "name" in cols + assert "enabled" in cols + assert "description" in cols + assert "updated_at" in cols + + def test_seeds_markets_enabled(self, tmp_path): + db_path = str(tmp_path / "test.db") + migrate(db_path) + conn = sqlite3.connect(db_path) + row = conn.execute( + "SELECT enabled FROM feature_flags WHERE name = 'markets'" + ).fetchone() + conn.close() + assert row is not None and row[0] == 1 + + def test_seeds_payments_disabled(self, tmp_path): + db_path = str(tmp_path / "test.db") + migrate(db_path) + conn = sqlite3.connect(db_path) + row = 
conn.execute( + "SELECT enabled FROM feature_flags WHERE name = 'payments'" + ).fetchone() + conn.close() + assert row is not None and row[0] == 0 + + def test_seeds_all_five_flags(self, tmp_path): + db_path = str(tmp_path / "test.db") + migrate(db_path) + conn = sqlite3.connect(db_path) + names = {r[0] for r in conn.execute("SELECT name FROM feature_flags").fetchall()} + conn.close() + assert names == {"markets", "payments", "planner_export", "supplier_signup", "lead_unlock"} + + def test_idempotent_on_rerun(self, tmp_path): + """Running migrate twice does not duplicate seed rows.""" + db_path = str(tmp_path / "test.db") + migrate(db_path) + migrate(db_path) + conn = sqlite3.connect(db_path) + count = conn.execute("SELECT COUNT(*) FROM feature_flags").fetchone()[0] + conn.close() + assert count == 5 + + +# ── is_flag_enabled ─────────────────────────────────────────────── + + +class TestIsFlagEnabled: + """Unit tests for is_flag_enabled() helper.""" + + @pytest.mark.asyncio + async def test_returns_true_for_enabled_flag(self, db): + await _set_flag(db, "markets", True) + assert await core.is_flag_enabled("markets") is True + + @pytest.mark.asyncio + async def test_returns_false_for_disabled_flag(self, db): + await _set_flag(db, "payments", False) + assert await core.is_flag_enabled("payments") is False + + @pytest.mark.asyncio + async def test_returns_default_false_for_unknown_flag(self, db): + assert await core.is_flag_enabled("nonexistent_flag") is False + + @pytest.mark.asyncio + async def test_returns_custom_default_for_unknown_flag(self, db): + assert await core.is_flag_enabled("nonexistent_flag", default=True) is True + + @pytest.mark.asyncio + async def test_reflects_live_db_change(self, db): + """Change takes effect without restart — reads live DB every call.""" + await _set_flag(db, "payments", False) + assert await core.is_flag_enabled("payments") is False + await _set_flag(db, "payments", True) + assert await core.is_flag_enabled("payments") is True + + +# ── feature_gate decorator ──────────────────────────────────────── + + +class TestFeatureGateDecorator: + """feature_gate blocks GET when flag is disabled, passes through when enabled.""" + + @pytest.mark.asyncio + async def test_shows_waitlist_on_get_when_flag_disabled(self, client, db): + """GET /auth/signup shows waitlist page when payments flag is off.""" + await _set_flag(db, "payments", False) + response = await client.get("/auth/signup") + assert response.status_code == 200 + html = await response.get_data(as_text=True) + # payments waitlist.html shows "join" or "launching soon" messaging + assert "launching soon" in html.lower() or "join" in html.lower() or "waitlist" in html.lower() + + @pytest.mark.asyncio + async def test_shows_normal_form_on_get_when_flag_enabled(self, client, db): + """GET /auth/signup shows normal signup form when payments flag is on.""" + await _set_flag(db, "payments", True) + response = await client.get("/auth/signup") + assert response.status_code == 200 + html = await response.get_data(as_text=True) + # Normal signup shows create account form + assert "create" in html.lower() or "sign up" in html.lower() or "email" in html.lower() + + @pytest.mark.asyncio + async def test_post_passes_through_when_flag_disabled(self, client, db): + """POST is never blocked by feature_gate (gates GET only).""" + await _set_flag(db, "payments", False) + with patch("padelnomics.worker.enqueue", new_callable=AsyncMock): + response = await client.post("/auth/signup", form={ + "csrf_token": "test_token", + 
"email": "test@example.com", + }) + assert response.status_code in (200, 302) + + @pytest.mark.asyncio + async def test_markets_route_gated_by_markets_flag(self, client, db): + """markets flag controls /en/markets access.""" + # Disabled → shows gated page + await _set_flag(db, "markets", False) + response = await client.get("/en/markets") + assert response.status_code == 200 + html = await response.get_data(as_text=True) + assert "coming soon" in html.lower() or "intelligence" in html.lower() + + # Enabled → passes through + await _set_flag(db, "markets", True) + response = await client.get("/en/markets") + assert response.status_code == 200 + html = await response.get_data(as_text=True) + # Normal markets page doesn't show the "coming soon" waitlist title + assert "coming soon" not in html.lower() + + @pytest.mark.asyncio + async def test_supplier_signup_gated_by_flag(self, client, db): + """supplier_signup flag controls /en/suppliers/signup.""" + await _set_flag(db, "supplier_signup", False) + response = await client.get("/en/suppliers/signup") + assert response.status_code == 200 + html = await response.get_data(as_text=True) + assert "waitlist" in html.lower() or "supplier" in html.lower() + + @pytest.mark.asyncio + async def test_planner_export_gated_for_authenticated_user(self, auth_client, db): + """planner_export flag controls /en/planner/export.""" + await _set_flag(db, "planner_export", False) + response = await auth_client.get("/en/planner/export") + assert response.status_code == 200 + html = await response.get_data(as_text=True) + assert "coming soon" in html.lower() or "waitlist" in html.lower() + + @pytest.mark.asyncio + async def test_planner_export_accessible_when_enabled(self, auth_client, db): + """Normal planner export page shown when planner_export flag is on.""" + await _set_flag(db, "planner_export", True) + response = await auth_client.get("/en/planner/export") + assert response.status_code == 200 + html = await response.get_data(as_text=True) + assert "coming soon" not in html.lower() + + +# ── lead_unlock gate ────────────────────────────────────────────── + + +class TestLeadUnlockGate: + """lead_unlock flag controls whether the unlock endpoint is reachable.""" + + @pytest.mark.asyncio + async def test_is_flag_disabled_by_default(self, db): + """lead_unlock flag seeded as disabled — is_flag_enabled returns False.""" + await _set_flag(db, "lead_unlock", False) + assert await core.is_flag_enabled("lead_unlock") is False + + @pytest.mark.asyncio + async def test_is_flag_enabled_after_toggle(self, db): + """is_flag_enabled returns True after flag is enabled.""" + await _set_flag(db, "lead_unlock", True) + assert await core.is_flag_enabled("lead_unlock") is True + + @pytest.mark.asyncio + async def test_route_imports_is_flag_enabled(self): + """suppliers/routes.py imports is_flag_enabled (gate is wired up).""" + from padelnomics.suppliers.routes import unlock_lead + import inspect + src = inspect.getsource(unlock_lead) + assert "is_flag_enabled" in src + assert "lead_unlock" in src + assert "403" in src + + +# ── Admin flag toggle routes ────────────────────────────────────── + + +class TestAdminFlagRoutes: + """Admin /admin/flags endpoints require admin role and toggle flags correctly.""" + + @pytest.mark.asyncio + async def test_flags_page_requires_admin(self, client, db): + """Anonymous request to /admin/flags redirects to login.""" + response = await client.get("/admin/flags", follow_redirects=False) + assert response.status_code == 302 + + @pytest.mark.asyncio + 
async def test_flags_page_accessible_to_admin(self, admin_client, db): + await _seed_all_flags(db) + response = await admin_client.get("/admin/flags") + assert response.status_code == 200 + + @pytest.mark.asyncio + async def test_flags_page_lists_all_seeded_flags(self, admin_client, db): + await _seed_all_flags(db) + response = await admin_client.get("/admin/flags") + assert response.status_code == 200 + html = await response.get_data(as_text=True) + for flag in ("markets", "payments", "planner_export", "supplier_signup", "lead_unlock"): + assert flag in html + + @pytest.mark.asyncio + async def test_toggle_enables_disabled_flag(self, admin_client, db): + await _set_flag(db, "payments", False) + await admin_client.post("/admin/flags/toggle", form={ + "csrf_token": "test_token", + "name": "payments", + }) + assert await _flag_value(db, "payments") == 1 + + @pytest.mark.asyncio + async def test_toggle_disables_enabled_flag(self, admin_client, db): + await _set_flag(db, "markets", True) + await admin_client.post("/admin/flags/toggle", form={ + "csrf_token": "test_token", + "name": "markets", + }) + assert await _flag_value(db, "markets") == 0 + + @pytest.mark.asyncio + async def test_toggle_unknown_flag_redirects_with_flash(self, admin_client, db): + response = await admin_client.post("/admin/flags/toggle", form={ + "csrf_token": "test_token", + "name": "nonexistent_flag", + }, follow_redirects=True) + assert response.status_code == 200 + html = await response.get_data(as_text=True) + assert "not found" in html.lower() + + @pytest.mark.asyncio + async def test_toggle_requires_admin(self, client, db): + """Anonymous POST to toggle is rejected.""" + response = await client.post("/admin/flags/toggle", form={ + "csrf_token": "test_token", + "name": "markets", + }, follow_redirects=False) + assert response.status_code == 302 + + +# ── Full toggle flow (e2e) ──────────────────────────────────────── + + +class TestFlagToggleFlow: + """End-to-end: admin toggles flag → route behaviour changes immediately.""" + + @pytest.mark.asyncio + async def test_disable_markets_shows_gated_page(self, admin_client, client, db): + """Disable markets → /en/markets shows coming soon. 
Enable → shows content."""
+        # Seed markets as enabled
+        await _set_flag(db, "markets", True)
+        response = await client.get("/en/markets")
+        html = await response.get_data(as_text=True)
+        assert "coming soon" not in html.lower()
+
+        # Admin disables via toggle
+        await admin_client.post("/admin/flags/toggle", form={
+            "csrf_token": "test_token",
+            "name": "markets",
+        })
+
+        # Now shows gated page
+        response = await client.get("/en/markets")
+        html = await response.get_data(as_text=True)
+        assert "coming soon" in html.lower()
+
+    @pytest.mark.asyncio
+    async def test_enable_supplier_signup_passes_through(self, admin_client, client, db):
+        """Enable supplier_signup → normal wizard shown (no waitlist page)."""
+        # Start with flag disabled
+        await _set_flag(db, "supplier_signup", False)
+        response = await client.get("/en/suppliers/signup")
+        html = await response.get_data(as_text=True)
+        # Gated: shows waitlist/supplier promo content
+        assert "waitlist" in html.lower() or "supplier" in html.lower()
+
+        # Admin enables
+        await admin_client.post("/admin/flags/toggle", form={
+            "csrf_token": "test_token",
+            "name": "supplier_signup",
+        })
+
+        # Now passes through (wizard is shown)
+        response = await client.get("/en/suppliers/signup")
+        assert response.status_code == 200
+        html = await response.get_data(as_text=True)
+        assert "waitlist" not in html.lower()
+
+    @pytest.mark.asyncio
+    async def test_double_toggle_restores_original_state(self, admin_client, db):
+        """Toggling a flag twice returns it to its original value."""
+        await _set_flag(db, "payments", False)
+        assert await _flag_value(db, "payments") == 0
+
+        await admin_client.post("/admin/flags/toggle", form={
+            "csrf_token": "test_token", "name": "payments",
+        })
+        assert await _flag_value(db, "payments") == 1
+
+        await admin_client.post("/admin/flags/toggle", form={
+            "csrf_token": "test_token", "name": "payments",
+        })
+        assert await _flag_value(db, "payments") == 0
diff --git a/web/tests/test_supervisor.py b/web/tests/test_supervisor.py
new file mode 100644
index 0000000..8f6eb3f
--- /dev/null
+++ b/web/tests/test_supervisor.py
@@ -0,0 +1,285 @@
+"""
+Unit tests for supervisor.py and proxy.py.
+
+Tests cover pure-Python logic only — no DB, no subprocesses, no network.
+DB-dependent functions (is_due, _get_last_success_time) are tested via mocks.
+
+supervisor.py lives in src/padelnomics/ (not a uv workspace package), so we
+load it directly by path with importlib instead of importing it by name.
+"""
+
+import textwrap
+from datetime import UTC, datetime, timedelta
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+# Load supervisor.py directly by path — avoids clashing with the web app's
+# 'padelnomics' namespace (which is the installed web package).
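+# importlib.util.spec_from_file_location() builds a module spec for an
+# arbitrary source file, and exec_module() then executes it, so no sys.path
+# edits or package install are needed to get a module object for the script.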
+import importlib.util as _ilu + +_SUP_PATH = Path(__file__).parent.parent.parent / "src" / "padelnomics" / "supervisor.py" +_spec = _ilu.spec_from_file_location("padelnomics_supervisor", _SUP_PATH) +sup = _ilu.module_from_spec(_spec) +_spec.loader.exec_module(sup) + +from padelnomics_extract.proxy import ( + load_proxy_urls, + make_round_robin_cycler, + make_sticky_selector, +) + + +# ── load_workflows ──────────────────────────────────────────────── + + +class TestLoadWorkflows: + def test_loads_all_fields(self, tmp_path): + toml = tmp_path / "workflows.toml" + toml.write_text(textwrap.dedent("""\ + [extract_a] + module = "mypkg.extract_a" + schedule = "daily" + + [extract_b] + module = "mypkg.extract_b" + schedule = "weekly" + entry = "run" + depends_on = ["extract_a"] + proxy_mode = "sticky" + """)) + wfs = sup.load_workflows(toml) + assert len(wfs) == 2 + + a = next(w for w in wfs if w["name"] == "extract_a") + assert a["module"] == "mypkg.extract_a" + assert a["schedule"] == "daily" + assert a["entry"] == "main" # default + assert a["depends_on"] == [] # default + assert a["proxy_mode"] == "round-robin" # default + + b = next(w for w in wfs if w["name"] == "extract_b") + assert b["entry"] == "run" + assert b["depends_on"] == ["extract_a"] + assert b["proxy_mode"] == "sticky" + + def test_raises_on_missing_module(self, tmp_path): + toml = tmp_path / "bad.toml" + toml.write_text("[wf]\nschedule = 'daily'\n") + with pytest.raises(AssertionError, match="missing 'module'"): + sup.load_workflows(toml) + + def test_raises_on_missing_schedule(self, tmp_path): + toml = tmp_path / "bad.toml" + toml.write_text("[wf]\nmodule = 'mypkg.wf'\n") + with pytest.raises(AssertionError, match="missing 'schedule'"): + sup.load_workflows(toml) + + def test_raises_if_file_missing(self, tmp_path): + with pytest.raises(AssertionError, match="not found"): + sup.load_workflows(tmp_path / "nonexistent.toml") + + +# ── resolve_schedule ────────────────────────────────────────────── + + +class TestResolveSchedule: + def test_maps_named_presets(self): + assert sup.resolve_schedule("hourly") == "0 * * * *" + assert sup.resolve_schedule("daily") == "0 5 * * *" + assert sup.resolve_schedule("weekly") == "0 3 * * 1" + assert sup.resolve_schedule("monthly") == "0 4 1 * *" + + def test_passes_through_raw_cron(self): + expr = "0 6-23 * * *" + assert sup.resolve_schedule(expr) == expr + + def test_unknown_name_passes_through(self): + assert sup.resolve_schedule("quarterly") == "quarterly" + + +# ── is_due ──────────────────────────────────────────────────────── + + +class TestIsDue: + def _wf(self, schedule="daily", name="test_wf"): + return {"name": name, "schedule": schedule} + + def test_never_ran_is_due(self): + conn = MagicMock() + with patch.object(sup, "_get_last_success_time", return_value=None): + assert sup.is_due(conn, self._wf()) is True + + def test_ran_after_last_trigger_is_not_due(self): + """Last run was AFTER the most recent trigger — not due.""" + conn = MagicMock() + # Use a daily schedule — trigger fires at 05:00 UTC + # Simulate last success = today at 06:00, so trigger already covered + now = datetime.now(UTC) + last_success = now.replace(hour=6, minute=0, second=0, microsecond=0) + with patch.object(sup, "_get_last_success_time", return_value=last_success): + assert sup.is_due(conn, self._wf(schedule="daily")) is False + + def test_ran_before_last_trigger_is_due(self): + """Last run was BEFORE the most recent trigger — due again.""" + conn = MagicMock() + # Monthly fires on the 1st at 04:00 — 
simulate running last month + last_success = datetime.now(UTC) - timedelta(days=35) + with patch.object(sup, "_get_last_success_time", return_value=last_success): + assert sup.is_due(conn, self._wf(schedule="monthly")) is True + + +# ── topological_waves ───────────────────────────────────────────── + + +class TestTopologicalWaves: + def _wf(self, name, depends_on=None): + return {"name": name, "depends_on": depends_on or []} + + def test_no_deps_single_wave(self): + wfs = [self._wf("a"), self._wf("b"), self._wf("c")] + waves = sup.topological_waves(wfs) + assert len(waves) == 1 + assert {w["name"] for w in waves[0]} == {"a", "b", "c"} + + def test_simple_chain_two_waves(self): + wfs = [self._wf("a"), self._wf("b", depends_on=["a"])] + waves = sup.topological_waves(wfs) + assert len(waves) == 2 + assert waves[0][0]["name"] == "a" + assert waves[1][0]["name"] == "b" + + def test_diamond_three_waves(self): + """a → b,c → d""" + wfs = [ + self._wf("a"), + self._wf("b", depends_on=["a"]), + self._wf("c", depends_on=["a"]), + self._wf("d", depends_on=["b", "c"]), + ] + waves = sup.topological_waves(wfs) + assert len(waves) == 3 + assert waves[0][0]["name"] == "a" + assert {w["name"] for w in waves[1]} == {"b", "c"} + assert waves[2][0]["name"] == "d" + + def test_dep_outside_due_set_ignored(self): + """Dependency not in the due set is treated as satisfied.""" + wfs = [self._wf("b", depends_on=["a"])] # "a" not in due set + waves = sup.topological_waves(wfs) + assert len(waves) == 1 + assert waves[0][0]["name"] == "b" + + def test_circular_dep_raises(self): + wfs = [ + self._wf("a", depends_on=["b"]), + self._wf("b", depends_on=["a"]), + ] + with pytest.raises(AssertionError, match="Circular dependency"): + sup.topological_waves(wfs) + + def test_empty_list_returns_empty(self): + assert sup.topological_waves([]) == [] + + def test_real_workflows_toml(self): + """The actual workflows.toml in the repo parses and produces valid waves.""" + repo_root = Path(__file__).parent.parent.parent + wf_path = repo_root / "infra" / "supervisor" / "workflows.toml" + if not wf_path.exists(): + pytest.skip("workflows.toml not found") + wfs = sup.load_workflows(wf_path) + waves = sup.topological_waves(wfs) + # playtomic_availability must come after playtomic_tenants + all_names = [w["name"] for wave in waves for w in wave] + tenants_idx = all_names.index("playtomic_tenants") + avail_idx = all_names.index("playtomic_availability") + assert tenants_idx < avail_idx + + +# ── proxy.py ───────────────────────────────────────────────────── + + +class TestLoadProxyUrls: + def test_returns_empty_when_unset(self, monkeypatch): + monkeypatch.delenv("PROXY_URLS", raising=False) + assert load_proxy_urls() == [] + + def test_parses_comma_separated_urls(self, monkeypatch): + monkeypatch.setenv( + "PROXY_URLS", + "http://p1:8080,http://p2:8080,http://p3:8080", + ) + urls = load_proxy_urls() + assert urls == ["http://p1:8080", "http://p2:8080", "http://p3:8080"] + + def test_strips_whitespace(self, monkeypatch): + monkeypatch.setenv("PROXY_URLS", " http://p1:8080 , http://p2:8080 ") + urls = load_proxy_urls() + assert urls == ["http://p1:8080", "http://p2:8080"] + + def test_ignores_empty_segments(self, monkeypatch): + monkeypatch.setenv("PROXY_URLS", "http://p1:8080,,http://p2:8080,") + urls = load_proxy_urls() + assert urls == ["http://p1:8080", "http://p2:8080"] + + +class TestRoundRobinCycler: + def test_returns_none_callable_when_no_proxies(self): + fn = make_round_robin_cycler([]) + assert fn() is None + + def 
test_cycles_through_proxies(self): + urls = ["http://p1", "http://p2", "http://p3"] + fn = make_round_robin_cycler(urls) + results = [fn() for _ in range(6)] + assert results == ["http://p1", "http://p2", "http://p3"] * 2 + + def test_thread_safe_independent_calls(self): + """Concurrent calls each get a proxy — no exceptions.""" + import threading + urls = ["http://p1", "http://p2"] + fn = make_round_robin_cycler(urls) + results = [] + lock = threading.Lock() + + def worker(): + proxy = fn() + with lock: + results.append(proxy) + + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(results) == 10 + assert all(r in urls for r in results) + + +class TestStickySelectorProxy: + def test_returns_none_callable_when_no_proxies(self): + fn = make_sticky_selector([]) + assert fn("any_key") is None + + def test_same_key_always_same_proxy(self): + urls = ["http://p1", "http://p2", "http://p3"] + fn = make_sticky_selector(urls) + proxy = fn("tenant_abc") + for _ in range(10): + assert fn("tenant_abc") == proxy + + def test_different_keys_can_map_to_different_proxies(self): + urls = ["http://p1", "http://p2", "http://p3"] + fn = make_sticky_selector(urls) + results = {fn(f"key_{i}") for i in range(30)} + assert len(results) > 1 # distribution across proxies + + def test_all_results_are_valid_proxies(self): + urls = ["http://p1", "http://p2"] + fn = make_sticky_selector(urls) + for i in range(20): + assert fn(f"key_{i}") in urls diff --git a/web/tests/test_waitlist.py b/web/tests/test_waitlist.py index d8c352d..4f080af 100644 --- a/web/tests/test_waitlist.py +++ b/web/tests/test_waitlist.py @@ -251,13 +251,16 @@ class TestAuthRoutes: @pytest.mark.asyncio async def test_normal_signup_when_waitlist_disabled(self, client, db): - """Normal signup flow when WAITLIST_MODE is false.""" - with patch.object(core.config, "WAITLIST_MODE", False): - response = await client.get("/auth/signup") - assert response.status_code == 200 - html = await response.get_data(as_text=True) - # Should see normal signup form, not waitlist form - assert "Create Free Account" in html or "Sign Up" in html + """Normal signup flow when payments flag is enabled (WAITLIST_MODE replaced by feature flag).""" + await db.execute( + "INSERT OR REPLACE INTO feature_flags (name, enabled, description) VALUES ('payments', 1, '')" + ) + await db.commit() + response = await client.get("/auth/signup") + assert response.status_code == 200 + html = await response.get_data(as_text=True) + # Should see normal signup form, not waitlist form + assert "Create Free Account" in html or "Sign Up" in html @pytest.mark.asyncio async def test_shows_waitlist_form_when_enabled(self, client, db): @@ -713,13 +716,16 @@ class TestWaitlistGateDecorator: """Test waitlist_gate decorator via integration tests.""" @pytest.mark.asyncio - async def test_passes_through_when_waitlist_disabled(self, client): - """Decorator passes through to normal flow when WAITLIST_MODE=false.""" - with patch.object(core.config, "WAITLIST_MODE", False): - response = await client.get("/auth/signup") - html = await response.get_data(as_text=True) - # Should see normal signup, not waitlist - assert "waitlist" not in html.lower() or "create" in html.lower() + async def test_passes_through_when_waitlist_disabled(self, client, db): + """feature_gate passes through to normal form when payments flag is enabled.""" + await db.execute( + "INSERT OR REPLACE INTO feature_flags (name, enabled, description) VALUES 
('payments', 1, '')" + ) + await db.commit() + response = await client.get("/auth/signup") + html = await response.get_data(as_text=True) + # Should see normal signup, not waitlist + assert "waitlist" not in html.lower() or "create" in html.lower() @pytest.mark.asyncio async def test_intercepts_get_when_waitlist_enabled(self, client): @@ -940,21 +946,21 @@ class TestIntegration: @pytest.mark.asyncio async def test_toggle_off_reverts_to_normal_signup(self, client, db): - """Setting WAITLIST_MODE=false reverts to normal signup flow.""" - # First, enable waitlist mode - with patch.object(core.config, "WAITLIST_MODE", True): - response = await client.get("/auth/signup") - html = await response.get_data(as_text=True) - assert "waitlist" in html.lower() + """Disabling payments flag shows waitlist; enabling it shows normal signup.""" + # payments=0 (default) → shows waitlist page + response = await client.get("/auth/signup") + html = await response.get_data(as_text=True) + assert "waitlist" in html.lower() - # Then, disable waitlist mode - with patch.object(core.config, "WAITLIST_MODE", False): - response = await client.get("/auth/signup") - html = await response.get_data(as_text=True) - # Should see normal signup, not waitlist - assert "create" in html.lower() or "sign up" in html.lower() - # Should NOT see waitlist messaging - assert "join the waitlist" not in html.lower() + # Enable payments flag → shows normal signup + await db.execute( + "INSERT OR REPLACE INTO feature_flags (name, enabled, description) VALUES ('payments', 1, '')" + ) + await db.commit() + response = await client.get("/auth/signup") + html = await response.get_data(as_text=True) + assert "create" in html.lower() or "sign up" in html.lower() + assert "join the waitlist" not in html.lower() @pytest.mark.asyncio async def test_same_email_different_intents_both_captured(self, client, db): From 8b7d474ede565fb5edcd8f84436cb4fa51ab866c Mon Sep 17 00:00:00 2001 From: Deeman Date: Mon, 23 Feb 2026 15:27:23 +0100 Subject: [PATCH 3/3] docs: update CHANGELOG and PROJECT.md for supervisor + feature flags Co-Authored-By: Claude Opus 4.6 --- CHANGELOG.md | 20 ++++++++++++++++++++ PROJECT.md | 6 ++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93cf8a7..257c110 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,26 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). 
## [Unreleased]

### Added
+- **Python supervisor** (`src/padelnomics/supervisor.py`) — replaces `supervisor.sh`;
+  reads `infra/supervisor/workflows.toml` (module, schedule, entry, depends_on,
+  proxy_mode); runs due workflows in topological waves (parallel within each wave);
+  croniter-based `is_due()` check; systemd service updated to use `uv run python`
+- **`workflows.toml` workflow registry** — 5 extractors registered: overpass,
+  eurostat, playtomic_tenants, playtomic_availability, playtomic_recheck; cron
+  presets hourly/daily/weekly/monthly plus raw cron (recheck runs `0 6-23 * * *`);
+  `playtomic_availability` depends on `playtomic_tenants`
+- **`proxy.py` proxy rotation** (`extract/padelnomics_extract/src/padelnomics_extract/proxy.py`)
+  — reads `PROXY_URLS` env var; `make_round_robin_cycler()` for thread-safe
+  round-robin; `make_sticky_selector()` for per-tenant (hash-based) assignment
+- **DB-backed feature flags** — `feature_flags` table (migration 0019);
+  `is_flag_enabled(name, default)` helper; `feature_gate(flag, template)` decorator
+  replaces `WAITLIST_MODE`/`waitlist_gate`; 5 flags seeded: `markets` (on),
+  `payments`, `planner_export`, `supplier_signup`, `lead_unlock` (all off)
+- **Admin feature flags UI** — `/admin/flags` lists all flags with toggle;
+  `POST /admin/flags/toggle` flips enabled bit; requires admin role; flash message
+  on unknown flag
+- **`lead_unlock` gate** — `unlock_lead` route returns HTTP 403 when `lead_unlock`
+  flag is disabled
 - **Playtomic full data extraction** — expanded venue bounding boxes from 4 regions
   (ES, UK, DE, FR) to 23 globally (Italy, Portugal, NL, BE, AT, CH, Nordics, Mexico,
   Argentina, Middle East, USA); PAGE_SIZE increased from 20 to 100; availability
diff --git a/PROJECT.md b/PROJECT.md
index 93adf10..72862ba 100644
--- a/PROJECT.md
+++ b/PROJECT.md
@@ -1,7 +1,7 @@
 # Padelnomics — Project Tracker
 
 > Move tasks across columns as you work. Add new tasks at the top of the relevant column.
-> Last updated: 2026-02-22.
+> Last updated: 2026-02-23.
 
 ---
 
@@ -59,7 +59,9 @@
 - [x] Boost purchases (logo, highlight, verified, card color, sticky week/month)
 - [x] Credit pack purchases (25/50/100/250)
 - [x] Supplier subscription tiers (Basic free / Growth €149 / Pro €399, monthly + annual)
-- [x] `WAITLIST_MODE` toggle — gates supplier signup + export on GET (default: false)
+- [x] **Feature flags** (DB-backed, migration 0019) — `is_flag_enabled()` + `feature_gate()` decorator replace `WAITLIST_MODE`; 5 flags (markets, payments, planner_export, supplier_signup, lead_unlock); admin UI at `/admin/flags` with toggle
+- [x] **Python supervisor** (`src/padelnomics/supervisor.py`) + `workflows.toml` — replaces `supervisor.sh`; topological wave scheduling; croniter-based `is_due()`; systemd service updated
+- [x] **Proxy rotation** (`extract/padelnomics_extract/src/padelnomics_extract/proxy.py`) — round-robin + sticky hash-based selector via `PROXY_URLS` env var
 - [x] Resend email integration (transactional: magic link, welcome, quote verify, lead forward, enquiry)
 - [x] Auto-create Resend audiences per blueprint (waitlist, planner nurture)
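
Reviewer note: the sticky-selector tests in test_supervisor.py pin down only
the observable contract (the same key always maps to the same proxy, an empty
proxy list yields None, and every result comes from the configured URLs). As
a reading aid, here is a minimal sketch consistent with that contract; it is
illustrative only, not the actual proxy.py implementation, and the choice of
SHA-256 as the stable hash is an assumption:

    import hashlib

    def make_sticky_selector(urls: list[str]):
        """Return a selector mapping a key (e.g. a tenant id) to a stable proxy URL."""
        def select(key: str) -> str | None:
            if not urls:
                return None  # no proxies configured; callers fetch directly
            # hashlib rather than built-in hash(): hash() is salted per process,
            # so stickiness would not survive a supervisor restart.
            digest = hashlib.sha256(key.encode("utf-8")).digest()
            return urls[int.from_bytes(digest[:4], "big") % len(urls)]
        return select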