diff --git a/extract/padelnomics_extract/pyproject.toml b/extract/padelnomics_extract/pyproject.toml index 3fe56b1..da52449 100644 --- a/extract/padelnomics_extract/pyproject.toml +++ b/extract/padelnomics_extract/pyproject.toml @@ -14,6 +14,7 @@ extract-overpass = "padelnomics_extract.overpass:main" extract-eurostat = "padelnomics_extract.eurostat:main" extract-playtomic-tenants = "padelnomics_extract.playtomic_tenants:main" extract-playtomic-availability = "padelnomics_extract.playtomic_availability:main" +extract-playtomic-recheck = "padelnomics_extract.playtomic_availability:main_recheck" [build-system] requires = ["hatchling"] diff --git a/extract/padelnomics_extract/src/padelnomics_extract/_shared.py b/extract/padelnomics_extract/src/padelnomics_extract/_shared.py index 38c9772..4df4355 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/_shared.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/_shared.py @@ -19,6 +19,13 @@ LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing")) HTTP_TIMEOUT_SECONDS = 30 OVERPASS_TIMEOUT_SECONDS = 90 # Overpass can be slow on global queries +# Realistic browser User-Agent — avoids bot detection on all extractors +USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/131.0.0.0 Safari/537.36" +) + def setup_logging(name: str) -> logging.Logger: """Configure and return a logger for the given extractor module.""" @@ -50,6 +57,7 @@ def run_extractor( try: with niquests.Session() as session: + session.headers["User-Agent"] = USER_AGENT result = func(LANDING_DIR, year_month, conn, session) assert isinstance(result, dict), f"extractor must return a dict, got {type(result)}" diff --git a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py index 3989062..bdc97bc 100644 --- 
a/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py @@ -5,33 +5,51 @@ unauthenticated /v1/availability endpoint for each venue's next-day slots. This is the highest-value source: daily snapshots enable occupancy rate estimation, pricing benchmarking, and demand signal detection. -API constraint: max 25-hour window per request (see docs/data-sources-inventory.md §2.1). -Rate: 1 req / 2 s (conservative, unauthenticated endpoint). +Parallel mode: set EXTRACT_WORKERS=N and PROXY_URLS=... to fetch N venues +concurrently (one proxy per worker). Without proxies, runs single-threaded. + +Recheck mode: re-queries venues with slots starting within the next 90 minutes. +Writes a separate recheck file for more accurate occupancy measurement. Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.json.gz +Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.json.gz """ import gzip import json +import os import sqlite3 +import threading import time +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import UTC, datetime, timedelta from pathlib import Path import niquests -from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging +from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging +from .proxy import load_proxy_urls, make_round_robin_cycler from .utils import get_last_cursor, landing_path, write_gzip_atomic logger = setup_logging("padelnomics.extract.playtomic_availability") EXTRACTOR_NAME = "playtomic_availability" +RECHECK_EXTRACTOR_NAME = "playtomic_recheck" AVAILABILITY_URL = "https://api.playtomic.io/v1/availability" THROTTLE_SECONDS = 1 -MAX_VENUES_PER_RUN = 10_000 +MAX_VENUES_PER_RUN = 20_000 MAX_RETRIES_PER_VENUE = 2 +MAX_WORKERS = int(os.environ.get("EXTRACT_WORKERS", "1")) +RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", 
"90")) +# Thread-local storage for per-worker sessions +_thread_local = threading.local() + + +# --------------------------------------------------------------------------- +# Tenant ID loading +# --------------------------------------------------------------------------- def _load_tenant_ids(landing_dir: Path) -> list[str]: """Read tenant IDs from the most recent tenants.json.gz file.""" @@ -39,7 +57,6 @@ def _load_tenant_ids(landing_dir: Path) -> list[str]: if not playtomic_dir.exists(): return [] - # Find the most recent tenants.json.gz across all year/month dirs tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True) if not tenant_files: return [] @@ -65,12 +82,10 @@ def _parse_resume_cursor(cursor: str | None, target_date: str) -> int: """Parse cursor_value to find resume index. Returns 0 if no valid cursor.""" if not cursor: return 0 - # cursor format: "{date}:{index}" parts = cursor.split(":", 1) if len(parts) != 2: return 0 cursor_date, cursor_index = parts - # Only resume if cursor is for today's target date if cursor_date != target_date: return 0 try: @@ -79,6 +94,125 @@ def _parse_resume_cursor(cursor: str | None, target_date: str) -> int: return 0 +# --------------------------------------------------------------------------- +# Per-venue fetch (used by both serial and parallel modes) +# --------------------------------------------------------------------------- + +def _get_thread_session(proxy_url: str | None) -> niquests.Session: + """Get or create a thread-local niquests.Session with optional proxy.""" + if not hasattr(_thread_local, "session") or _thread_local.session is None: + session = niquests.Session() + session.headers["User-Agent"] = USER_AGENT + if proxy_url: + session.proxies = {"http": proxy_url, "https": proxy_url} + _thread_local.session = session + return _thread_local.session + + +def _fetch_venue_availability( + tenant_id: str, + start_min_str: str, + start_max_str: str, + proxy_url: str | None, +) -> dict | 
None: + """Fetch availability for a single venue. Returns payload dict or None on failure.""" + session = _get_thread_session(proxy_url) + params = { + "sport_id": "PADEL", + "tenant_id": tenant_id, + "start_min": start_min_str, + "start_max": start_max_str, + } + + for attempt in range(MAX_RETRIES_PER_VENUE + 1): + try: + resp = session.get(AVAILABILITY_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS) + + if resp.status_code == 429: + wait_seconds = THROTTLE_SECONDS * (attempt + 2) + logger.warning("Rate limited on %s, waiting %ds", tenant_id, wait_seconds) + time.sleep(wait_seconds) + continue + + if resp.status_code >= 500: + logger.warning( + "Server error %d for %s (attempt %d)", + resp.status_code, tenant_id, attempt + 1, + ) + time.sleep(THROTTLE_SECONDS) + continue + + resp.raise_for_status() + time.sleep(THROTTLE_SECONDS) + return {"tenant_id": tenant_id, "slots": resp.json()} + + except niquests.exceptions.RequestException as e: + if attempt < MAX_RETRIES_PER_VENUE: + logger.warning("Request failed for %s (attempt %d): %s", tenant_id, attempt + 1, e) + time.sleep(THROTTLE_SECONDS) + else: + logger.error( + "Giving up on %s after %d attempts: %s", + tenant_id, MAX_RETRIES_PER_VENUE + 1, e, + ) + return None + + return None # all retries exhausted via 429/5xx + + +# --------------------------------------------------------------------------- +# Parallel fetch orchestrator +# --------------------------------------------------------------------------- + +def _fetch_venues_parallel( + tenant_ids: list[str], + start_min_str: str, + start_max_str: str, + worker_count: int, + proxy_cycler, +) -> tuple[list[dict], int]: + """Fetch availability for multiple venues in parallel. + + Returns (venues_data, venues_errored). 
+ """ + venues_data: list[dict] = [] + venues_errored = 0 + completed_count = 0 + lock = threading.Lock() + + def _worker(tenant_id: str) -> dict | None: + proxy_url = proxy_cycler() + return _fetch_venue_availability(tenant_id, start_min_str, start_max_str, proxy_url) + + with ThreadPoolExecutor(max_workers=worker_count) as pool: + futures = {pool.submit(_worker, tid): tid for tid in tenant_ids} + + for future in as_completed(futures): + result = future.result() + with lock: + completed_count += 1 + if result is not None: + venues_data.append(result) + else: + venues_errored += 1 + + if completed_count % 500 == 0: + logger.info( + "Progress: %d/%d venues (%d errors, %d workers)", + completed_count, len(tenant_ids), venues_errored, worker_count, + ) + + logger.info( + "Parallel fetch complete: %d/%d venues (%d errors, %d workers)", + len(venues_data), len(tenant_ids), venues_errored, worker_count, + ) + return venues_data, venues_errored + + +# --------------------------------------------------------------------------- +# Main extraction function +# --------------------------------------------------------------------------- + def extract( landing_dir: Path, year_month: str, @@ -91,7 +225,7 @@ def extract( logger.warning("No tenant IDs found — run extract-playtomic-tenants first") return {"files_written": 0, "files_skipped": 0, "bytes_written": 0} - # Query tomorrow's slots (25-hour window starting at midnight local) + # Query tomorrow's slots tomorrow = datetime.now(UTC) + timedelta(days=1) target_date = tomorrow.strftime("%Y-%m-%d") start_min = tomorrow.replace(hour=0, minute=0, second=0, microsecond=0) @@ -101,117 +235,223 @@ def extract( dest_dir = landing_path(landing_dir, "playtomic", year, month) dest = dest_dir / f"availability_{target_date}.json.gz" - # Check if already completed for this date if dest.exists(): logger.info("Already have %s — skipping", dest) return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} - # Resume from last cursor if 
we crashed mid-run + # Resume from cursor if crashed mid-run last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) resume_index = _parse_resume_cursor(last_cursor, target_date) if resume_index > 0: logger.info("Resuming from index %d (cursor: %s)", resume_index, last_cursor) - venues_data: list[dict] = [] venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN] - venues_errored = 0 + if resume_index > 0: + venues_to_process = venues_to_process[resume_index:] - for i, tenant_id in enumerate(venues_to_process): - if i < resume_index: - continue + # Determine parallelism + proxy_urls = load_proxy_urls() + worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1 + proxy_cycler = make_round_robin_cycler(proxy_urls) - params = { - "sport_id": "PADEL", - "tenant_id": tenant_id, - "start_min": start_min.strftime("%Y-%m-%dT%H:%M:%S"), - "start_max": start_max.strftime("%Y-%m-%dT%H:%M:%S"), - } + start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S") + start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S") - for attempt in range(MAX_RETRIES_PER_VENUE + 1): - try: - resp = session.get(AVAILABILITY_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS) - - if resp.status_code == 429: - # Rate limited — back off and retry - wait_seconds = THROTTLE_SECONDS * (attempt + 2) - logger.warning("Rate limited on %s, waiting %ds", tenant_id, wait_seconds) - time.sleep(wait_seconds) - continue - - if resp.status_code >= 500: - logger.warning( - "Server error %d for %s (attempt %d)", - resp.status_code, - tenant_id, - attempt + 1, - ) - time.sleep(THROTTLE_SECONDS) - continue - - resp.raise_for_status() - venues_data.append({"tenant_id": tenant_id, "slots": resp.json()}) - break - - except niquests.exceptions.RequestException as e: - if attempt < MAX_RETRIES_PER_VENUE: - logger.warning( - "Request failed for %s (attempt %d): %s", tenant_id, attempt + 1, e - ) - time.sleep(THROTTLE_SECONDS) - else: - logger.error( - "Giving up on %s after %d attempts: %s", - tenant_id, - 
MAX_RETRIES_PER_VENUE + 1, - e, - ) - venues_errored += 1 - else: - # All retries exhausted (loop completed without break) - venues_errored += 1 - - if (i + 1) % 100 == 0: - logger.info( - "Progress: %d/%d venues queried, %d errors", - i + 1, - len(venues_to_process), - venues_errored, + if worker_count > 1: + logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls)) + venues_data, venues_errored = _fetch_venues_parallel( + venues_to_process, start_min_str, start_max_str, worker_count, proxy_cycler, + ) + else: + # Serial mode — same as before but uses shared fetch function + logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process)) + venues_data = [] + venues_errored = 0 + for i, tenant_id in enumerate(venues_to_process): + result = _fetch_venue_availability( + tenant_id, start_min_str, start_max_str, proxy_cycler(), ) + if result is not None: + venues_data.append(result) + else: + venues_errored += 1 - time.sleep(THROTTLE_SECONDS) + if (i + 1) % 100 == 0: + logger.info( + "Progress: %d/%d venues queried, %d errors", + i + 1, len(venues_to_process), venues_errored, + ) # Write consolidated file captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") - payload = json.dumps( - { - "date": target_date, - "captured_at_utc": captured_at, - "venue_count": len(venues_data), - "venues_errored": venues_errored, - "venues": venues_data, - } - ).encode() + payload = json.dumps({ + "date": target_date, + "captured_at_utc": captured_at, + "venue_count": len(venues_data), + "venues_errored": venues_errored, + "venues": venues_data, + }).encode() bytes_written = write_gzip_atomic(dest, payload) logger.info( "%d venues scraped (%d errors) -> %s (%s bytes)", - len(venues_data), - venues_errored, - dest, - f"{bytes_written:,}", + len(venues_data), venues_errored, dest, f"{bytes_written:,}", ) return { "files_written": 1, "files_skipped": 0, "bytes_written": bytes_written, - "cursor_value": f"{target_date}:{len(venues_to_process)}", + 
"cursor_value": f"{target_date}:{len(tenant_ids[:MAX_VENUES_PER_RUN])}", } +# --------------------------------------------------------------------------- +# Recheck mode — re-query venues with upcoming slots for accurate occupancy +# --------------------------------------------------------------------------- + +def _load_morning_availability(landing_dir: Path, target_date: str) -> dict | None: + """Load today's morning availability file. Returns parsed JSON or None.""" + playtomic_dir = landing_dir / "playtomic" + # Search across year/month dirs for the target date + matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.json.gz")) + if not matches: + return None + + with gzip.open(matches[0], "rb") as f: + return json.loads(f.read()) + + +def _find_venues_with_upcoming_slots( + morning_data: dict, window_start: datetime, window_end: datetime +) -> list[str]: + """Find tenant_ids that have available slots starting within the recheck window.""" + tenant_ids = set() + for venue in morning_data.get("venues", []): + tid = venue.get("tenant_id") + if not tid: + continue + for resource in venue.get("slots", []): + for slot in resource.get("slots", []): + start_time_str = slot.get("start_time") + if not start_time_str: + continue + try: + # Parse "2026-02-24T17:00:00" format + slot_start = datetime.fromisoformat(start_time_str).replace(tzinfo=UTC) + if window_start <= slot_start < window_end: + tenant_ids.add(tid) + break # found one upcoming slot, no need to check more + except ValueError: + continue + if tid in tenant_ids: + break # already found upcoming slot for this venue + + return sorted(tenant_ids) + + +def extract_recheck( + landing_dir: Path, + year_month: str, + conn: sqlite3.Connection, + session: niquests.Session, +) -> dict: + """Re-query venues with slots starting soon for accurate occupancy data.""" + now = datetime.now(UTC) + target_date = now.strftime("%Y-%m-%d") + + # Also check tomorrow if it's late evening + tomorrow = (now + 
timedelta(days=1)).strftime("%Y-%m-%d") + + morning_data = _load_morning_availability(landing_dir, target_date) + if morning_data is None: + # Try tomorrow's file (morning extraction creates it for tomorrow) + morning_data = _load_morning_availability(landing_dir, tomorrow) + if morning_data is None: + logger.info("No morning availability file found — skipping recheck") + return {"files_written": 0, "files_skipped": 0, "bytes_written": 0} + target_date = tomorrow + + # Find venues with slots in the upcoming window + window_start = now + window_end = now + timedelta(minutes=RECHECK_WINDOW_MINUTES) + venues_to_recheck = _find_venues_with_upcoming_slots(morning_data, window_start, window_end) + + if not venues_to_recheck: + logger.info("No venues with upcoming slots in next %d min — skipping", RECHECK_WINDOW_MINUTES) + return {"files_written": 0, "files_skipped": 0, "bytes_written": 0} + + logger.info( + "Rechecking %d venues with slots starting in next %d min", + len(venues_to_recheck), RECHECK_WINDOW_MINUTES, + ) + + # Fetch availability for the recheck window + start_min_str = window_start.strftime("%Y-%m-%dT%H:%M:%S") + start_max_str = window_end.strftime("%Y-%m-%dT%H:%M:%S") + + # Determine parallelism + proxy_urls = load_proxy_urls() + worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1 + proxy_cycler = make_round_robin_cycler(proxy_urls) + + if worker_count > 1 and len(venues_to_recheck) > 10: + venues_data, venues_errored = _fetch_venues_parallel( + venues_to_recheck, start_min_str, start_max_str, worker_count, proxy_cycler, + ) + else: + venues_data = [] + venues_errored = 0 + for tid in venues_to_recheck: + result = _fetch_venue_availability(tid, start_min_str, start_max_str, proxy_cycler()) + if result is not None: + venues_data.append(result) + else: + venues_errored += 1 + + # Write recheck file + recheck_hour = now.hour + year, month = year_month.split("/") + dest_dir = landing_path(landing_dir, "playtomic", year, month) + dest = 
dest_dir / f"availability_{target_date}_recheck_{recheck_hour:02d}.json.gz" + + captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") + payload = json.dumps({ + "date": target_date, + "captured_at_utc": captured_at, + "recheck_hour": recheck_hour, + "recheck_window_minutes": RECHECK_WINDOW_MINUTES, + "rechecked_tenant_ids": venues_to_recheck, + "venue_count": len(venues_data), + "venues_errored": venues_errored, + "venues": venues_data, + }).encode() + + bytes_written = write_gzip_atomic(dest, payload) + logger.info( + "Recheck: %d/%d venues (%d errors) -> %s (%s bytes)", + len(venues_data), len(venues_to_recheck), venues_errored, dest, f"{bytes_written:,}", + ) + + return { + "files_written": 1, + "files_skipped": 0, + "bytes_written": bytes_written, + "cursor_value": f"{target_date}:recheck:{recheck_hour}", + } + + +# --------------------------------------------------------------------------- +# Entry points +# --------------------------------------------------------------------------- + def main() -> None: run_extractor(EXTRACTOR_NAME, extract) +def main_recheck() -> None: + run_extractor(RECHECK_EXTRACTOR_NAME, extract_recheck) + + if __name__ == "__main__": main() diff --git a/extract/padelnomics_extract/src/padelnomics_extract/proxy.py b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py new file mode 100644 index 0000000..b49cf85 --- /dev/null +++ b/extract/padelnomics_extract/src/padelnomics_extract/proxy.py @@ -0,0 +1,57 @@ +"""Optional proxy rotation for parallel HTTP fetching. + +Proxies are configured via the PROXY_URLS environment variable (comma-separated). +When unset, all functions return None/no-op — extractors fall back to direct requests. 
+ +Two routing modes: + round-robin — distribute requests evenly across proxies (default) + sticky — same key always maps to same proxy (for session-tracked sites) +""" + +import itertools +import os +import threading + + +def load_proxy_urls() -> list[str]: + """Read PROXY_URLS env var (comma-separated). Returns [] if unset. + + Format: http://user:pass@host:port or socks5://host:port + """ + raw = os.environ.get("PROXY_URLS", "") + urls = [u.strip() for u in raw.split(",") if u.strip()] + return urls + + +def make_round_robin_cycler(proxy_urls: list[str]): + """Thread-safe round-robin proxy cycler. + + Returns a callable: next_proxy() -> str | None + Returns None-returning callable if no proxies configured. + """ + if not proxy_urls: + return lambda: None + + cycle = itertools.cycle(proxy_urls) + lock = threading.Lock() + + def next_proxy() -> str: + with lock: + return next(cycle) + + return next_proxy + + +def make_sticky_selector(proxy_urls: list[str]): + """Consistent-hash proxy selector — same key always maps to same proxy. + + Use when the target site tracks sessions by IP (e.g. Cloudflare). 
+ Returns a callable: select_proxy(key: str) -> str | None + """ + if not proxy_urls: + return lambda key: None + + def select_proxy(key: str) -> str: + return proxy_urls[int.from_bytes(key.encode("utf-8"), "big") % len(proxy_urls)]  # stable across runs (builtin hash() is salted per process) + + return select_proxy diff --git a/infra/supervisor/padelnomics-supervisor.service b/infra/supervisor/padelnomics-supervisor.service index ac05293..169fff5 100644 --- a/infra/supervisor/padelnomics-supervisor.service +++ b/infra/supervisor/padelnomics-supervisor.service @@ -7,10 +7,11 @@ Wants=network-online.target Type=simple User=root WorkingDirectory=/opt/padelnomics -ExecStart=/opt/padelnomics/infra/supervisor/supervisor.sh +ExecStart=/bin/sh -c 'exec uv run python src/padelnomics/supervisor.py' Restart=always RestartSec=10 EnvironmentFile=/opt/padelnomics/.env +Environment=PATH=/root/.local/bin:/usr/local/bin:/usr/bin:/bin Environment=LANDING_DIR=/data/padelnomics/landing Environment=DUCKDB_PATH=/data/padelnomics/lakehouse.duckdb Environment=SERVING_DUCKDB_PATH=/data/padelnomics/analytics.duckdb diff --git a/infra/supervisor/workflows.toml b/infra/supervisor/workflows.toml new file mode 100644 index 0000000..fc2e9da --- /dev/null +++ b/infra/supervisor/workflows.toml @@ -0,0 +1,33 @@ +# Workflow registry — the supervisor reads this file on every tick. +# To add a new extractor: add a [section] here and create the Python module. +# +# Fields: +# module — Python module path (must have a main() function) +# schedule — named preset ("hourly", "daily", "weekly", "monthly") +# or raw cron expression (e.g. 
"0 6-23 * * *") +# entry — optional: function name if not "main" (default: "main") +# depends_on — optional: list of workflow names that must run first +# proxy_mode — optional: "round-robin" (default) or "sticky" + +[overpass] +module = "padelnomics_extract.overpass" +schedule = "monthly" + +[eurostat] +module = "padelnomics_extract.eurostat" +schedule = "monthly" + +[playtomic_tenants] +module = "padelnomics_extract.playtomic_tenants" +schedule = "weekly" + +[playtomic_availability] +module = "padelnomics_extract.playtomic_availability" +schedule = "daily" +depends_on = ["playtomic_tenants"] + +[playtomic_recheck] +module = "padelnomics_extract.playtomic_availability" +entry = "main_recheck" +schedule = "0 6-23 * * *" +depends_on = ["playtomic_availability"] diff --git a/src/padelnomics/__init__.py b/src/padelnomics/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/padelnomics/supervisor.py b/src/padelnomics/supervisor.py new file mode 100644 index 0000000..e864f05 --- /dev/null +++ b/src/padelnomics/supervisor.py @@ -0,0 +1,416 @@ +"""Padelnomics Supervisor — schedule-aware pipeline orchestration. + +Replaces supervisor.sh with a Python supervisor that reads a TOML workflow +registry, runs extractors on cron-based schedules (with dependency ordering +and parallel execution), then runs SQLMesh transform + export. + +Crash safety: the main loop catches all exceptions and backs off, matching +the TigerBeetle CFO supervisor pattern. Combined with systemd Restart=always, +the supervisor is effectively unkillable. 
+ +Usage: + # Run the supervisor loop (production) + LANDING_DIR=data/landing uv run python src/padelnomics/supervisor.py + + # Show workflow status + LANDING_DIR=data/landing uv run python src/padelnomics/supervisor.py status +""" + +import importlib +import logging +import os +import subprocess +import sys +import time +import tomllib +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import UTC, datetime +from pathlib import Path + +from croniter import croniter + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +TICK_INTERVAL_SECONDS = 60 +BACKOFF_SECONDS = 600 # 10 min on tick failure (matches shell version) +SUBPROCESS_TIMEOUT_SECONDS = 14400 # 4 hours max per subprocess +REPO_DIR = Path(os.getenv("REPO_DIR", "/opt/padelnomics")) +LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) +DUCKDB_PATH = os.getenv("DUCKDB_PATH", "data/lakehouse.duckdb") +SERVING_DUCKDB_PATH = os.getenv("SERVING_DUCKDB_PATH", "analytics.duckdb") +ALERT_WEBHOOK_URL = os.getenv("ALERT_WEBHOOK_URL", "") +WORKFLOWS_PATH = Path(os.getenv("WORKFLOWS_PATH", "infra/supervisor/workflows.toml")) + +NAMED_SCHEDULES = { + "hourly": "0 * * * *", + "daily": "0 5 * * *", + "weekly": "0 3 * * 1", + "monthly": "0 4 1 * *", +} + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("padelnomics.supervisor") + + +# --------------------------------------------------------------------------- +# State DB helpers (reuse extraction state DB) +# --------------------------------------------------------------------------- + +def _open_state_db(): + """Open the extraction state DB. 
Reuses the same .state.sqlite as extractors.""" + # Import here to avoid circular deps at module level + from padelnomics_extract.utils import open_state_db + + return open_state_db(LANDING_DIR) + + +def _get_last_success_time(conn, workflow_name: str) -> datetime | None: + """Return the finish time of the last successful run, or None.""" + row = conn.execute( + "SELECT MAX(finished_at) AS t FROM extraction_runs " + "WHERE extractor = ? AND status = 'success'", + (workflow_name,), + ).fetchone() + if not row or not row["t"]: + return None + return datetime.fromisoformat(row["t"]).replace(tzinfo=UTC) + + +# --------------------------------------------------------------------------- +# Workflow loading + scheduling +# --------------------------------------------------------------------------- + +def load_workflows(path: Path) -> list[dict]: + """Load workflow definitions from TOML file.""" + assert path.exists(), f"Workflows file not found: {path}" + with open(path, "rb") as f: + data = tomllib.load(f) + + workflows = [] + for name, cfg in data.items(): + assert "module" in cfg, f"Workflow '{name}' missing 'module'" + assert "schedule" in cfg, f"Workflow '{name}' missing 'schedule'" + workflows.append({ + "name": name, + "module": cfg["module"], + "entry": cfg.get("entry", "main"), + "schedule": cfg["schedule"], + "depends_on": cfg.get("depends_on", []), + "proxy_mode": cfg.get("proxy_mode", "round-robin"), + }) + return workflows + + +def resolve_schedule(schedule: str) -> str: + """Resolve a named schedule to a cron expression, or pass through raw cron.""" + return NAMED_SCHEDULES.get(schedule, schedule) + + +def is_due(conn, workflow: dict) -> bool: + """Check if the most recent cron trigger hasn't been served yet.""" + cron_expr = resolve_schedule(workflow["schedule"]) + assert croniter.is_valid(cron_expr), f"Invalid cron: {cron_expr} for {workflow['name']}" + + last_success = _get_last_success_time(conn, workflow["name"]) + if last_success is None: + return True 
# never ran + + now_naive = datetime.now(UTC).replace(tzinfo=None) + prev_trigger = croniter(cron_expr, now_naive).get_prev(datetime).replace(tzinfo=UTC) + return last_success < prev_trigger + + +# --------------------------------------------------------------------------- +# Topological ordering +# --------------------------------------------------------------------------- + +def topological_waves(workflows: list[dict]) -> list[list[dict]]: + """Group workflows into dependency waves for parallel execution. + + Wave 0: no deps. Wave 1: depends only on wave 0. Etc. + Workflows whose dependencies aren't in the 'due' set are treated as having no deps. + """ + name_to_wf = {w["name"]: w for w in workflows} + due_names = set(name_to_wf.keys()) + + # Build in-degree map (only count deps that are in the due set) + in_degree: dict[str, int] = {} + dependents: dict[str, list[str]] = defaultdict(list) + for w in workflows: + deps_in_scope = [d for d in w["depends_on"] if d in due_names] + in_degree[w["name"]] = len(deps_in_scope) + for d in deps_in_scope: + dependents[d].append(w["name"]) + + waves = [] + remaining = set(due_names) + max_iterations = len(workflows) + 1 # safety bound + + for _ in range(max_iterations): + if not remaining: + break + # Wave = all workflows with in_degree 0 + wave = [name_to_wf[n] for n in remaining if in_degree[n] == 0] + assert wave, f"Circular dependency detected among: {remaining}" + waves.append(wave) + for w in wave: + remaining.discard(w["name"]) + for dep in dependents[w["name"]]: + if dep in remaining: + in_degree[dep] -= 1 + + return waves + + +# --------------------------------------------------------------------------- +# Workflow execution +# --------------------------------------------------------------------------- + +def run_workflow(conn, workflow: dict) -> None: + """Run a single workflow by importing its module and calling the entry function.""" + module_name = workflow["module"] + entry_name = workflow["entry"] + + 
logger.info("Running workflow: %s (%s.%s)", workflow["name"], module_name, entry_name) + + try: + module = importlib.import_module(module_name) + entry_fn = getattr(module, entry_name) + entry_fn() + logger.info("Workflow %s completed successfully", workflow["name"]) + except Exception: + logger.exception("Workflow %s failed", workflow["name"]) + send_alert(f"Workflow '{workflow['name']}' failed") + raise + + +def run_due_workflows(conn, workflows: list[dict]) -> bool: + """Run all due workflows. Independent ones run in parallel. Returns True if any ran.""" + due = [w for w in workflows if is_due(conn, w)] + if not due: + logger.info("No workflows due") + return False + + logger.info("Due workflows: %s", [w["name"] for w in due]) + waves = topological_waves(due) + + for i, wave in enumerate(waves): + wave_names = [w["name"] for w in wave] + logger.info("Wave %d: %s", i, wave_names) + + if len(wave) == 1: + try: + run_workflow(conn, wave[0]) + except Exception: + pass # already logged in run_workflow + else: + with ThreadPoolExecutor(max_workers=len(wave)) as pool: + futures = {pool.submit(run_workflow, conn, w): w for w in wave} + for future in as_completed(futures): + try: + future.result() + except Exception: + pass # already logged in run_workflow + + return True + + +# --------------------------------------------------------------------------- +# Transform + Export + Deploy +# --------------------------------------------------------------------------- + +def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bool: + """Run a shell command. 
Returns True on success.""" + logger.info("Shell: %s", cmd) + result = subprocess.run( + cmd, shell=True, capture_output=True, text=True, timeout=timeout_seconds + ) + if result.returncode != 0: + logger.error("Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s", + result.returncode, cmd, result.stdout[-500:], result.stderr[-500:]) + return False + return True + + +def run_transform() -> None: + """Run SQLMesh — it evaluates model staleness internally.""" + logger.info("Running SQLMesh transform") + ok = run_shell( + "uv run sqlmesh -p transform/sqlmesh_padelnomics run", + ) + if not ok: + send_alert("SQLMesh transform failed") + + +def run_export() -> None: + """Export serving tables to analytics.duckdb.""" + logger.info("Exporting serving tables") + ok = run_shell( + f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} " + "uv run python src/padelnomics/export_serving.py" + ) + if not ok: + send_alert("Serving export failed") + + +def web_code_changed() -> bool: + """Check if web app code changed since last deploy (after git pull).""" + result = subprocess.run( + ["git", "diff", "--name-only", "HEAD~1", "HEAD", "--", "web/", "Dockerfile"], + capture_output=True, text=True, timeout=30, + ) + return bool(result.stdout.strip()) + + +def git_pull_and_sync() -> None: + """Pull latest code and sync dependencies.""" + run_shell("git fetch origin master") + run_shell("git switch --discard-changes --detach origin/master") + run_shell("uv sync --all-packages") + + +# --------------------------------------------------------------------------- +# Alerting +# --------------------------------------------------------------------------- + +def send_alert(message: str) -> None: + """Send failure alert via webhook (ntfy.sh / Slack / Telegram).""" + if not ALERT_WEBHOOK_URL: + return + timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC") + try: + subprocess.run( + ["curl", "-s", "-d", f"[{timestamp}] {message}", ALERT_WEBHOOK_URL], + timeout=10, 
capture_output=True, + ) + except Exception: + logger.exception("Failed to send alert") + + +# --------------------------------------------------------------------------- +# Main loop +# --------------------------------------------------------------------------- + +def tick() -> None: + """One cycle: check schedules, run what's due, transform, export.""" + workflows = load_workflows(WORKFLOWS_PATH) + conn = _open_state_db() + + try: + # Git pull + sync (production only) + if os.getenv("SUPERVISOR_GIT_PULL"): + git_pull_and_sync() + + # Run due extractors + run_due_workflows(conn, workflows) + + # SQLMesh always runs (evaluates staleness internally) + run_transform() + + # Export serving tables + run_export() + + # Deploy web app if code changed + if os.getenv("SUPERVISOR_GIT_PULL") and web_code_changed(): + logger.info("Web code changed — deploying") + run_shell("./deploy.sh") + finally: + conn.close() + + +def supervisor_loop() -> None: + """Infinite supervisor loop — never exits unless killed.""" + logger.info("Supervisor starting (tick interval: %ds)", TICK_INTERVAL_SECONDS) + logger.info("Workflows: %s", WORKFLOWS_PATH) + logger.info("Landing dir: %s", LANDING_DIR) + + while True: + try: + tick() + except KeyboardInterrupt: + logger.info("Supervisor stopped (KeyboardInterrupt)") + break + except Exception: + logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS) + send_alert("Supervisor tick failed") + time.sleep(BACKOFF_SECONDS) + else: + time.sleep(TICK_INTERVAL_SECONDS) + + +# --------------------------------------------------------------------------- +# Status CLI +# --------------------------------------------------------------------------- + +def print_status() -> None: + """Print workflow status table.""" + workflows = load_workflows(WORKFLOWS_PATH) + conn = _open_state_db() + + now = datetime.now(UTC) + + # Header + print(f"{'Workflow':<28} {'Schedule':<18} {'Last Run':<20} {'Status':<8} {'Next'}") + print(f"{'─' * 28} {'─' * 18} 
{'─' * 20} {'─' * 8} {'─' * 12}") + + for w in workflows: + last_success = _get_last_success_time(conn, w["name"]) + cron_expr = resolve_schedule(w["schedule"]) + + # Last run info + if last_success: + last_str = last_success.strftime("%Y-%m-%d %H:%M") + status = "ok" + else: + last_str = "never" + status = "pending" + + # Last failure check + row = conn.execute( + "SELECT MAX(finished_at) AS t FROM extraction_runs " + "WHERE extractor = ? AND status = 'failed'", + (w["name"],), + ).fetchone() + if row and row["t"]: + last_fail = datetime.fromisoformat(row["t"]).replace(tzinfo=UTC) + if last_success is None or last_fail > last_success: + status = "FAILED" + + # Next trigger (croniter returns naive datetimes — treat as UTC) + now_naive = now.replace(tzinfo=None) + next_trigger = croniter(cron_expr, now_naive).get_next(datetime) + delta = next_trigger - now_naive + if delta.total_seconds() < 3600: + next_str = f"in {int(delta.total_seconds() / 60)}m" + elif delta.total_seconds() < 86400: + next_str = next_trigger.strftime("%H:%M") + else: + next_str = next_trigger.strftime("%b %d") + + schedule_display = w["schedule"] if w["schedule"] in NAMED_SCHEDULES else cron_expr + print(f"{w['name']:<28} {schedule_display:<18} {last_str:<20} {status:<8} {next_str}") + + conn.close() + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def main() -> None: + if len(sys.argv) > 1 and sys.argv[1] == "status": + print_status() + else: + supervisor_loop() + + +if __name__ == "__main__": + main() diff --git a/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql b/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql index 4a27e61..c211290 100644 --- a/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql +++ b/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql @@ -3,6 +3,10 
@@ -- per-day statistics, then calculates occupancy by comparing available hours -- against total capacity from fct_venue_capacity. -- +-- Recheck-aware occupancy: for each (tenant, resource, slot_start_time), +-- prefer the latest snapshot (recheck > morning). NOTE(review): a slot absent +-- from the recheck keeps its morning row here, so it still counts as available. +-- -- Occupancy = 1 - (available_court_hours / capacity_court_hours_per_day) -- Revenue estimate = booked_court_hours × avg_price_of_available_slots -- @@ -15,14 +19,31 @@ MODEL ( grain (snapshot_date, tenant_id) ); -WITH slot_agg AS ( +-- Prefer the latest snapshot row for each slot: +-- If a recheck row exists for a (date, tenant, resource, start_time), use it. +-- Otherwise fall back to the morning row (recheck absence is not detected here). +WITH ranked_slots AS ( + SELECT + a.*, + ROW_NUMBER() OVER ( + PARTITION BY a.snapshot_date, a.tenant_id, a.resource_id, a.slot_start_time + ORDER BY + CASE WHEN a.snapshot_type = 'recheck' THEN 1 ELSE 2 END, + a.captured_at_utc DESC + ) AS rn + FROM staging.stg_playtomic_availability a + WHERE a.price_amount IS NOT NULL + AND a.price_amount > 0 +), +latest_slots AS ( + SELECT * FROM ranked_slots WHERE rn = 1 +), +slot_agg AS ( SELECT a.snapshot_date, a.tenant_id, - -- Slot counts: each row is one 60-min available slot on one court COUNT(*) AS available_slot_count, COUNT(DISTINCT a.resource_id) AS courts_with_availability, - -- Available (unbooked) court-hours: slots are on 30-min increments for 60-min bookings -- Each available start_time represents a 60-min bookable window ROUND(COUNT(*) * 1.0, 2) AS available_court_hours, -- Pricing stats (60-min slots only) @@ -42,9 +63,7 @@ WITH slot_agg AS ( ), 2) AS median_price_offpeak, MAX(a.price_currency) AS price_currency, MAX(a.captured_at_utc) AS captured_at_utc - FROM staging.stg_playtomic_availability a - WHERE a.price_amount IS NOT NULL - AND a.price_amount > 0 + FROM latest_slots a GROUP BY a.snapshot_date, a.tenant_id ) SELECT diff --git 
a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql index f6dd40b..65ecd85 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql @@ -2,28 +2,30 @@ -- One row per available 60-minute booking slot per court per venue per day. -- "Available" = the slot was NOT booked at capture time. Missing slots = booked. -- --- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit). --- The API returns 60/90/120-min variants per start_time — filtering to 60 avoids --- double-counting the same time window. +-- Reads BOTH morning snapshots and recheck files: +-- Morning: availability_{date}.json.gz → snapshot_type = 'morning' +-- Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck' -- +-- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit). -- Price parsed from strings like "14.56 EUR" or "48 GBP". -- -- Requires: at least one availability file in the landing zone. -- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz) -- with empty venues[] ensures this model runs before real data arrives. 
--- --- Source: data/landing/playtomic/{year}/{month}/availability_{date}.json.gz --- Format: {date, captured_at_utc, venues: [{tenant_id, slots: [{resource_id, start_date, slots: [...]}]}]} MODEL ( name staging.stg_playtomic_availability, kind FULL, cron '@daily', - grain (snapshot_date, tenant_id, resource_id, slot_start_time) + grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc) ); -WITH raw_files AS ( - SELECT * +-- Morning snapshots (filename does NOT contain '_recheck_') +WITH morning_files AS ( + SELECT + *, + 'morning' AS snapshot_type, + NULL::INTEGER AS recheck_hour FROM read_json( @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz', format = 'auto', @@ -31,24 +33,57 @@ WITH raw_files AS ( date: 'VARCHAR', captured_at_utc: 'VARCHAR', venues: 'JSON[]' - } + }, + filename = true + ) + WHERE filename NOT LIKE '%_recheck_%' + AND venues IS NOT NULL + AND json_array_length(venues) > 0 +), +-- Recheck snapshots (filename contains '_recheck_') +-- Use TRY_CAST on a regex-extracted hour to get the recheck_hour. +-- If no recheck files exist yet, this CTE produces zero rows (safe). 
+recheck_files AS ( + SELECT + *, + 'recheck' AS snapshot_type, + TRY_CAST( + regexp_extract(filename, '_recheck_(\d+)', 1) AS INTEGER + ) AS recheck_hour + FROM read_json( + @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz', + format = 'auto', + columns = { + date: 'VARCHAR', + captured_at_utc: 'VARCHAR', + venues: 'JSON[]' + }, + filename = true ) WHERE venues IS NOT NULL AND json_array_length(venues) > 0 ), +all_files AS ( + SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM morning_files + UNION ALL + SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM recheck_files +), raw_venues AS ( SELECT - rf.date AS snapshot_date, - rf.captured_at_utc, + af.date AS snapshot_date, + af.captured_at_utc, + af.snapshot_type, + af.recheck_hour, venue_json - FROM raw_files rf, - LATERAL UNNEST(rf.venues) AS t(venue_json) + FROM all_files af, + LATERAL UNNEST(af.venues) AS t(venue_json) ), --- Each venue has: {tenant_id, slots: [{resource_id, start_date, slots: [...]}]} raw_resources AS ( SELECT rv.snapshot_date, rv.captured_at_utc, + rv.snapshot_type, + rv.recheck_hour, rv.venue_json ->> 'tenant_id' AS tenant_id, resource_json FROM raw_venues rv, @@ -56,11 +91,12 @@ raw_resources AS ( from_json(rv.venue_json -> 'slots', '["JSON"]') ) AS t(resource_json) ), --- Each resource has: {resource_id, start_date, slots: [{start_time, duration, price}]} raw_slots AS ( SELECT rr.snapshot_date, rr.captured_at_utc, + rr.snapshot_type, + rr.recheck_hour, rr.tenant_id, rr.resource_json ->> 'resource_id' AS resource_id, slot_json @@ -75,12 +111,12 @@ SELECT resource_id, slot_json ->> 'start_time' AS slot_start_time, TRY_CAST(slot_json ->> 'duration' AS INTEGER) AS duration_minutes, - -- Parse "14.56 EUR" → 14.56 TRY_CAST( SPLIT_PART(slot_json ->> 'price', ' ', 1) AS DOUBLE ) AS price_amount, - -- Parse "14.56 EUR" → EUR SPLIT_PART(slot_json ->> 'price', ' ', 2) AS price_currency, + snapshot_type, + recheck_hour, captured_at_utc FROM 
raw_slots WHERE resource_id IS NOT NULL diff --git a/uv.lock b/uv.lock index dfc7159..663c21c 100644 --- a/uv.lock +++ b/uv.lock @@ -1151,6 +1151,7 @@ version = "0.1.0" source = { editable = "web" } dependencies = [ { name = "aiosqlite" }, + { name = "croniter" }, { name = "duckdb" }, { name = "hypercorn" }, { name = "itsdangerous" }, @@ -1168,6 +1169,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "aiosqlite", specifier = ">=0.19.0" }, + { name = "croniter", specifier = ">=6.0.0" }, { name = "duckdb", specifier = ">=1.0.0" }, { name = "hypercorn", specifier = ">=0.17.0" }, { name = "itsdangerous", specifier = ">=2.1.0" }, diff --git a/web/pyproject.toml b/web/pyproject.toml index 3738fc4..db3a321 100644 --- a/web/pyproject.toml +++ b/web/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "duckdb>=1.0.0", "pyarrow>=23.0.1", "pyyaml>=6.0", + "croniter>=6.0.0", ] [build-system] diff --git a/web/src/padelnomics/admin/routes.py b/web/src/padelnomics/admin/routes.py index 8bcf7c8..7bb8e74 100644 --- a/web/src/padelnomics/admin/routes.py +++ b/web/src/padelnomics/admin/routes.py @@ -807,6 +807,46 @@ async def supplier_tier(supplier_id: int): return redirect(url_for("admin.supplier_detail", supplier_id=supplier_id)) +# ============================================================================= +# Feature Flags +# ============================================================================= + +@bp.route("/flags") +@role_required("admin") +async def flags(): + """Feature flags management.""" + flag_list = await fetch_all("SELECT * FROM feature_flags ORDER BY name") + return await render_template("admin/flags.html", flags=flag_list, admin_page="flags") + + +@bp.route("/flags/toggle", methods=["POST"]) +@role_required("admin") +@csrf_protect +async def flag_toggle(): + """Toggle a feature flag on/off.""" + form = await request.form + flag_name = form.get("name", "").strip() + if not flag_name: + await flash("Flag name required.", "error") + return 
redirect(url_for("admin.flags")) + + # Get current state and flip it + row = await fetch_one("SELECT enabled FROM feature_flags WHERE name = ?", (flag_name,)) + if not row: + await flash(f"Flag '{flag_name}' not found.", "error") + return redirect(url_for("admin.flags")) + + new_enabled = 0 if row["enabled"] else 1 + now = datetime.now(UTC).isoformat()  # timezone-aware; datetime.utcnow() is deprecated + await execute( + "UPDATE feature_flags SET enabled = ?, updated_at = ? WHERE name = ?", + (new_enabled, now, flag_name), + ) + state = "enabled" if new_enabled else "disabled" + await flash(f"Flag '{flag_name}' {state}.", "success") + return redirect(url_for("admin.flags")) + + # ============================================================================= # Feedback Management # ============================================================================= diff --git a/web/src/padelnomics/admin/templates/admin/base_admin.html b/web/src/padelnomics/admin/templates/admin/base_admin.html index d826e97..6c277e9 100644 --- a/web/src/padelnomics/admin/templates/admin/base_admin.html +++ b/web/src/padelnomics/admin/templates/admin/base_admin.html @@ -87,6 +87,10 @@
+ + + Flags + Tasks diff --git a/web/src/padelnomics/admin/templates/admin/flags.html b/web/src/padelnomics/admin/templates/admin/flags.html new file mode 100644 index 0000000..818d051 --- /dev/null +++ b/web/src/padelnomics/admin/templates/admin/flags.html @@ -0,0 +1,72 @@ +{% extends "admin/base_admin.html" %} + +{% block admin_head %} + +{% endblock %} + +{% block admin_content %} ++ Toggle features on/off without redeployment. Changes take effect immediately. +
+ +| Flag | +Description | +Status | +Last Updated | ++ |
|---|---|---|---|---|
| {{ f.name }} | +{{ f.description or '—' }} | ++ {% if f.enabled %} + Enabled + {% else %} + Disabled + {% endif %} + | +{{ f.updated_at or '—' }} | ++ + | +