Merge branch 'worktree-supervisor-flags'

Python supervisor + DB-backed feature flags

- supervisor.py replaces supervisor.sh (topological wave scheduling, croniter)
- workflows.toml workflow registry (5 extractors, cron presets, depends_on)
- proxy.py round-robin + sticky proxy rotation via PROXY_URLS
- Feature flags: migration 0019, is_flag_enabled(), feature_gate() decorator
- Admin /admin/flags UI with toggle (admin-only)
- lead_unlock gate on unlock_lead route
- 59 new tests (test_supervisor.py + test_feature_flags.py)
- Fix is_flag_enabled bug (fetch_one instead of execute_fetchone)

# Conflicts:
#	CHANGELOG.md
#	web/pyproject.toml
This commit is contained in:
Deeman
2026-02-23 15:29:43 +01:00
29 changed files with 1923 additions and 163 deletions

View File

@@ -7,6 +7,26 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]
### Added
- **Python supervisor** (`src/padelnomics/supervisor.py`) — replaces `supervisor.sh`;
reads `infra/supervisor/workflows.toml` (module, schedule, entry, depends_on,
proxy_mode); runs due workflows in topological waves (parallel within each wave);
croniter-based `is_due()` check; systemd service updated to use `uv run python`
- **`workflows.toml` workflow registry** — 5 extractors registered: overpass,
eurostat, playtomic_tenants, playtomic_availability, playtomic_prices; cron
presets: hourly/daily/weekly/monthly; `playtomic_availability` depends on
`playtomic_tenants`
- **`proxy.py` proxy rotation** (`extract/padelnomics_extract/proxy.py`) — reads
`PROXY_URLS` env var; `make_round_robin_cycler()` for thread-safe round-robin;
`make_sticky_selector()` for consistent per-tenant proxy assignment (hash-based)
- **DB-backed feature flags** — `feature_flags` table (migration 0019);
`is_flag_enabled(name, default)` helper; `feature_gate(flag, template)` decorator
replaces `WAITLIST_MODE`/`waitlist_gate`; 5 flags seeded: `markets` (on),
`payments`, `planner_export`, `supplier_signup`, `lead_unlock` (all off)
- **Admin feature flags UI** — `/admin/flags` lists all flags with toggle;
`POST /admin/flags/toggle` flips enabled bit; requires admin role; flash message
on unknown flag
- **`lead_unlock` gate** — `unlock_lead` route returns HTTP 403 when `lead_unlock`
flag is disabled
- **SEO/GEO admin hub** — syncs search performance data from Google Search Console (service
account auth), Bing Webmaster Tools (API key), and Umami (bearer token) into 3 new SQLite
tables (`seo_search_metrics`, `seo_analytics_metrics`, `seo_sync_log`); daily background
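The flag helpers named in this changelog entry can be sketched end to end. This is a minimal, framework-agnostic sketch based only on the descriptions above — the table columns, the `("render", template)` stand-in for a template response, and the `unlock_lead` handler shape are assumptions, not the commit's actual code (the real `lead_unlock` gate returns HTTP 403):

```python
import sqlite3
from functools import wraps

# In-memory stand-in for the feature_flags table from migration 0019 (assumed shape).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE feature_flags (name TEXT PRIMARY KEY, enabled INTEGER NOT NULL)")
conn.executemany(
    "INSERT INTO feature_flags VALUES (?, ?)",
    [("markets", 1), ("payments", 0), ("planner_export", 0),
     ("supplier_signup", 0), ("lead_unlock", 0)],
)

def is_flag_enabled(name: str, default: bool = False) -> bool:
    """Look up a flag; fall back to `default` for unknown names."""
    row = conn.execute(
        "SELECT enabled FROM feature_flags WHERE name = ?", (name,)
    ).fetchone()
    return bool(row[0]) if row else default

def feature_gate(flag: str, template: str = "coming_soon.html"):
    """Route decorator: run the handler only when `flag` is on."""
    def decorator(handler):
        @wraps(handler)
        def wrapper(*args, **kwargs):
            if not is_flag_enabled(flag):
                return ("render", template)  # stand-in for rendering a gate page
            return handler(*args, **kwargs)
        return wrapper
    return decorator

@feature_gate("lead_unlock")
def unlock_lead(lead_id: int):
    return ("ok", lead_id)

print(unlock_lead(7))  # gated while lead_unlock is off -> ('render', 'coming_soon.html')
```

Toggling the row in the DB changes behavior on the next request with no redeploy, which is the point of moving from the `WAITLIST_MODE` env toggle to DB-backed flags.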

View File

@@ -59,7 +59,9 @@
- [x] Boost purchases (logo, highlight, verified, card color, sticky week/month)
- [x] Credit pack purchases (25/50/100/250)
- [x] Supplier subscription tiers (Basic free / Growth €149 / Pro €399, monthly + annual)
- [x] **Feature flags** (DB-backed, migration 0019) — `is_flag_enabled()` + `feature_gate()` decorator replace `WAITLIST_MODE`; 5 flags (markets, payments, planner_export, supplier_signup, lead_unlock); admin UI at `/admin/flags` with toggle
- [x] **Python supervisor** (`src/padelnomics/supervisor.py`) + `workflows.toml` — replaces `supervisor.sh`; topological wave scheduling; croniter-based `is_due()`; systemd service updated
- [x] **Proxy rotation** (`extract/padelnomics_extract/proxy.py`) — round-robin + sticky hash-based selector via `PROXY_URLS` env var
- [x] Resend email integration (transactional: magic link, welcome, quote verify, lead forward, enquiry)
- [x] Auto-create Resend audiences per blueprint (waitlist, planner nurture)

View File

@@ -14,6 +14,7 @@ extract-overpass = "padelnomics_extract.overpass:main"
extract-eurostat = "padelnomics_extract.eurostat:main"
extract-playtomic-tenants = "padelnomics_extract.playtomic_tenants:main"
extract-playtomic-availability = "padelnomics_extract.playtomic_availability:main"
extract-playtomic-recheck = "padelnomics_extract.playtomic_availability:main_recheck"
[build-system]
requires = ["hatchling"]

View File

@@ -19,6 +19,13 @@ LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing"))
HTTP_TIMEOUT_SECONDS = 30
OVERPASS_TIMEOUT_SECONDS = 90  # Overpass can be slow on global queries
# Realistic browser User-Agent — avoids bot detection on all extractors
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
def setup_logging(name: str) -> logging.Logger:
"""Configure and return a logger for the given extractor module."""
@@ -50,6 +57,7 @@ def run_extractor(
try:
with niquests.Session() as session:
session.headers["User-Agent"] = USER_AGENT
result = func(LANDING_DIR, year_month, conn, session)
assert isinstance(result, dict), f"extractor must return a dict, got {type(result)}"

View File

@@ -5,33 +5,51 @@ unauthenticated /v1/availability endpoint for each venue's next-day slots.
This is the highest-value source: daily snapshots enable occupancy rate
estimation, pricing benchmarking, and demand signal detection.
Parallel mode: set EXTRACT_WORKERS=N and PROXY_URLS=... to fetch N venues
concurrently (one proxy per worker). Without proxies, runs single-threaded.
Recheck mode: re-queries venues with slots starting within the next 90 minutes.
Writes a separate recheck file for more accurate occupancy measurement.
Landing: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}.json.gz
Recheck: {LANDING_DIR}/playtomic/{year}/{month}/availability_{date}_recheck_{HH}.json.gz
"""
import gzip
import json
import os
import sqlite3
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import UTC, datetime, timedelta
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
from .proxy import load_proxy_urls, make_round_robin_cycler
from .utils import get_last_cursor, landing_path, write_gzip_atomic
logger = setup_logging("padelnomics.extract.playtomic_availability")
EXTRACTOR_NAME = "playtomic_availability"
RECHECK_EXTRACTOR_NAME = "playtomic_recheck"
AVAILABILITY_URL = "https://api.playtomic.io/v1/availability"
THROTTLE_SECONDS = 1
MAX_VENUES_PER_RUN = 20_000
MAX_RETRIES_PER_VENUE = 2
MAX_WORKERS = int(os.environ.get("EXTRACT_WORKERS", "1"))
RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "90"))
# Thread-local storage for per-worker sessions
_thread_local = threading.local()
# ---------------------------------------------------------------------------
# Tenant ID loading
# ---------------------------------------------------------------------------
def _load_tenant_ids(landing_dir: Path) -> list[str]:
"""Read tenant IDs from the most recent tenants.json.gz file."""
@@ -39,7 +57,6 @@ def _load_tenant_ids(landing_dir: Path) -> list[str]:
if not playtomic_dir.exists():
return []
tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True)
if not tenant_files:
return []
@@ -65,12 +82,10 @@ def _parse_resume_cursor(cursor: str | None, target_date: str) -> int:
"""Parse cursor_value to find resume index. Returns 0 if no valid cursor."""
if not cursor:
return 0
parts = cursor.split(":", 1)
if len(parts) != 2:
return 0
cursor_date, cursor_index = parts
if cursor_date != target_date:
return 0
try:
@@ -79,6 +94,125 @@ def _parse_resume_cursor(cursor: str | None, target_date: str) -> int:
return 0
# ---------------------------------------------------------------------------
# Per-venue fetch (used by both serial and parallel modes)
# ---------------------------------------------------------------------------
def _get_thread_session(proxy_url: str | None) -> niquests.Session:
"""Get or create a thread-local niquests.Session with optional proxy."""
if not hasattr(_thread_local, "session") or _thread_local.session is None:
session = niquests.Session()
session.headers["User-Agent"] = USER_AGENT
if proxy_url:
session.proxies = {"http": proxy_url, "https": proxy_url}
_thread_local.session = session
return _thread_local.session
def _fetch_venue_availability(
tenant_id: str,
start_min_str: str,
start_max_str: str,
proxy_url: str | None,
) -> dict | None:
"""Fetch availability for a single venue. Returns payload dict or None on failure."""
session = _get_thread_session(proxy_url)
params = {
"sport_id": "PADEL",
"tenant_id": tenant_id,
"start_min": start_min_str,
"start_max": start_max_str,
}
for attempt in range(MAX_RETRIES_PER_VENUE + 1):
try:
resp = session.get(AVAILABILITY_URL, params=params, timeout=HTTP_TIMEOUT_SECONDS)
if resp.status_code == 429:
wait_seconds = THROTTLE_SECONDS * (attempt + 2)
logger.warning("Rate limited on %s, waiting %ds", tenant_id, wait_seconds)
time.sleep(wait_seconds)
continue
if resp.status_code >= 500:
logger.warning(
"Server error %d for %s (attempt %d)",
resp.status_code, tenant_id, attempt + 1,
)
time.sleep(THROTTLE_SECONDS)
continue
resp.raise_for_status()
time.sleep(THROTTLE_SECONDS)
return {"tenant_id": tenant_id, "slots": resp.json()}
except niquests.exceptions.RequestException as e:
if attempt < MAX_RETRIES_PER_VENUE:
logger.warning("Request failed for %s (attempt %d): %s", tenant_id, attempt + 1, e)
time.sleep(THROTTLE_SECONDS)
else:
logger.error(
"Giving up on %s after %d attempts: %s",
tenant_id, MAX_RETRIES_PER_VENUE + 1, e,
)
return None
return None # all retries exhausted via 429/5xx
# ---------------------------------------------------------------------------
# Parallel fetch orchestrator
# ---------------------------------------------------------------------------
def _fetch_venues_parallel(
tenant_ids: list[str],
start_min_str: str,
start_max_str: str,
worker_count: int,
proxy_cycler,
) -> tuple[list[dict], int]:
"""Fetch availability for multiple venues in parallel.
Returns (venues_data, venues_errored).
"""
venues_data: list[dict] = []
venues_errored = 0
completed_count = 0
lock = threading.Lock()
def _worker(tenant_id: str) -> dict | None:
proxy_url = proxy_cycler()
return _fetch_venue_availability(tenant_id, start_min_str, start_max_str, proxy_url)
with ThreadPoolExecutor(max_workers=worker_count) as pool:
futures = {pool.submit(_worker, tid): tid for tid in tenant_ids}
for future in as_completed(futures):
result = future.result()
with lock:
completed_count += 1
if result is not None:
venues_data.append(result)
else:
venues_errored += 1
if completed_count % 500 == 0:
logger.info(
"Progress: %d/%d venues (%d errors, %d workers)",
completed_count, len(tenant_ids), venues_errored, worker_count,
)
logger.info(
"Parallel fetch complete: %d/%d venues (%d errors, %d workers)",
len(venues_data), len(tenant_ids), venues_errored, worker_count,
)
return venues_data, venues_errored
# ---------------------------------------------------------------------------
# Main extraction function
# ---------------------------------------------------------------------------
def extract(
landing_dir: Path,
year_month: str,
@@ -91,7 +225,7 @@ def extract(
logger.warning("No tenant IDs found — run extract-playtomic-tenants first")
return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}
# Query tomorrow's slots
tomorrow = datetime.now(UTC) + timedelta(days=1)
target_date = tomorrow.strftime("%Y-%m-%d")
start_min = tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)
@@ -101,117 +235,223 @@ def extract(
dest_dir = landing_path(landing_dir, "playtomic", year, month)
dest = dest_dir / f"availability_{target_date}.json.gz"
if dest.exists():
logger.info("Already have %s — skipping", dest)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
# Resume from cursor if crashed mid-run
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
resume_index = _parse_resume_cursor(last_cursor, target_date)
if resume_index > 0:
logger.info("Resuming from index %d (cursor: %s)", resume_index, last_cursor)
venues_to_process = tenant_ids[:MAX_VENUES_PER_RUN]
if resume_index > 0:
venues_to_process = venues_to_process[resume_index:]
# Determine parallelism
proxy_urls = load_proxy_urls()
worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1
proxy_cycler = make_round_robin_cycler(proxy_urls)
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
if worker_count > 1:
logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls))
venues_data, venues_errored = _fetch_venues_parallel(
venues_to_process, start_min_str, start_max_str, worker_count, proxy_cycler,
)
else:
# Serial mode — same as before but uses shared fetch function
logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process))
venues_data = []
venues_errored = 0
for i, tenant_id in enumerate(venues_to_process):
result = _fetch_venue_availability(
tenant_id, start_min_str, start_max_str, proxy_cycler(),
)
if result is not None:
venues_data.append(result)
else:
venues_errored += 1
if (i + 1) % 100 == 0:
logger.info(
"Progress: %d/%d venues queried, %d errors",
i + 1, len(venues_to_process), venues_errored,
)
# Write consolidated file
captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
payload = json.dumps({
"date": target_date,
"captured_at_utc": captured_at,
"venue_count": len(venues_data),
"venues_errored": venues_errored,
"venues": venues_data,
}).encode()
bytes_written = write_gzip_atomic(dest, payload)
logger.info(
"%d venues scraped (%d errors) -> %s (%s bytes)",
len(venues_data), venues_errored, dest, f"{bytes_written:,}",
)
return {
"files_written": 1,
"files_skipped": 0,
"bytes_written": bytes_written,
"cursor_value": f"{target_date}:{len(tenant_ids[:MAX_VENUES_PER_RUN])}",
}
# ---------------------------------------------------------------------------
# Recheck mode — re-query venues with upcoming slots for accurate occupancy
# ---------------------------------------------------------------------------
def _load_morning_availability(landing_dir: Path, target_date: str) -> dict | None:
"""Load today's morning availability file. Returns parsed JSON or None."""
playtomic_dir = landing_dir / "playtomic"
# Search across year/month dirs for the target date
matches = list(playtomic_dir.glob(f"*/*/availability_{target_date}.json.gz"))
if not matches:
return None
with gzip.open(matches[0], "rb") as f:
return json.loads(f.read())
def _find_venues_with_upcoming_slots(
morning_data: dict, window_start: datetime, window_end: datetime
) -> list[str]:
"""Find tenant_ids that have available slots starting within the recheck window."""
tenant_ids = set()
for venue in morning_data.get("venues", []):
tid = venue.get("tenant_id")
if not tid:
continue
for resource in venue.get("slots", []):
for slot in resource.get("slots", []):
start_time_str = slot.get("start_time")
if not start_time_str:
continue
try:
# Parse "2026-02-24T17:00:00" format
slot_start = datetime.fromisoformat(start_time_str).replace(tzinfo=UTC)
if window_start <= slot_start < window_end:
tenant_ids.add(tid)
break # found one upcoming slot, no need to check more
except ValueError:
continue
if tid in tenant_ids:
break # already found upcoming slot for this venue
return sorted(tenant_ids)
def extract_recheck(
landing_dir: Path,
year_month: str,
conn: sqlite3.Connection,
session: niquests.Session,
) -> dict:
"""Re-query venues with slots starting soon for accurate occupancy data."""
now = datetime.now(UTC)
target_date = now.strftime("%Y-%m-%d")
# Also check tomorrow if it's late evening
tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d")
morning_data = _load_morning_availability(landing_dir, target_date)
if morning_data is None:
# Try tomorrow's file (morning extraction creates it for tomorrow)
morning_data = _load_morning_availability(landing_dir, tomorrow)
if morning_data is None:
logger.info("No morning availability file found — skipping recheck")
return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}
target_date = tomorrow
# Find venues with slots in the upcoming window
window_start = now
window_end = now + timedelta(minutes=RECHECK_WINDOW_MINUTES)
venues_to_recheck = _find_venues_with_upcoming_slots(morning_data, window_start, window_end)
if not venues_to_recheck:
logger.info("No venues with upcoming slots in next %d min — skipping", RECHECK_WINDOW_MINUTES)
return {"files_written": 0, "files_skipped": 0, "bytes_written": 0}
logger.info(
"Rechecking %d venues with slots starting in next %d min",
len(venues_to_recheck), RECHECK_WINDOW_MINUTES,
)
# Fetch availability for the recheck window
start_min_str = window_start.strftime("%Y-%m-%dT%H:%M:%S")
start_max_str = window_end.strftime("%Y-%m-%dT%H:%M:%S")
# Determine parallelism
proxy_urls = load_proxy_urls()
worker_count = min(MAX_WORKERS, len(proxy_urls)) if proxy_urls else 1
proxy_cycler = make_round_robin_cycler(proxy_urls)
if worker_count > 1 and len(venues_to_recheck) > 10:
venues_data, venues_errored = _fetch_venues_parallel(
venues_to_recheck, start_min_str, start_max_str, worker_count, proxy_cycler,
)
else:
venues_data = []
venues_errored = 0
for tid in venues_to_recheck:
result = _fetch_venue_availability(tid, start_min_str, start_max_str, proxy_cycler())
if result is not None:
venues_data.append(result)
else:
venues_errored += 1
# Write recheck file
recheck_hour = now.hour
year, month = year_month.split("/")
dest_dir = landing_path(landing_dir, "playtomic", year, month)
dest = dest_dir / f"availability_{target_date}_recheck_{recheck_hour:02d}.json.gz"
captured_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
payload = json.dumps({
"date": target_date,
"captured_at_utc": captured_at,
"recheck_hour": recheck_hour,
"recheck_window_minutes": RECHECK_WINDOW_MINUTES,
"rechecked_tenant_ids": venues_to_recheck,
"venue_count": len(venues_data),
"venues_errored": venues_errored,
"venues": venues_data,
}).encode()
bytes_written = write_gzip_atomic(dest, payload)
logger.info(
"Recheck: %d/%d venues (%d errors) -> %s (%s bytes)",
len(venues_data), len(venues_to_recheck), venues_errored, dest, f"{bytes_written:,}",
)
return {
"files_written": 1,
"files_skipped": 0,
"bytes_written": bytes_written,
"cursor_value": f"{target_date}:recheck:{recheck_hour}",
}
# ---------------------------------------------------------------------------
# Entry points
# ---------------------------------------------------------------------------
def main() -> None:
run_extractor(EXTRACTOR_NAME, extract)
def main_recheck() -> None:
run_extractor(RECHECK_EXTRACTOR_NAME, extract_recheck)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,57 @@
"""Optional proxy rotation for parallel HTTP fetching.
Proxies are configured via the PROXY_URLS environment variable (comma-separated).
When unset, all functions return None/no-op — extractors fall back to direct requests.
Two routing modes:
round-robin — distribute requests evenly across proxies (default)
sticky — same key always maps to same proxy (for session-tracked sites)
"""
import itertools
import os
import threading
import zlib
def load_proxy_urls() -> list[str]:
"""Read PROXY_URLS env var (comma-separated). Returns [] if unset.
Format: http://user:pass@host:port or socks5://host:port
"""
raw = os.environ.get("PROXY_URLS", "")
return [u.strip() for u in raw.split(",") if u.strip()]
def make_round_robin_cycler(proxy_urls: list[str]):
"""Thread-safe round-robin proxy cycler.
Returns a callable: next_proxy() -> str | None
Returns a None-returning callable if no proxies are configured.
"""
if not proxy_urls:
return lambda: None
cycle = itertools.cycle(proxy_urls)
lock = threading.Lock()
def next_proxy() -> str:
with lock:
return next(cycle)
return next_proxy
def make_sticky_selector(proxy_urls: list[str]):
"""Consistent-hash proxy selector — same key always maps to same proxy.
Use when the target site tracks sessions by IP (e.g. Cloudflare).
Uses zlib.crc32 instead of hash(): str hashing is salted per process
(PYTHONHASHSEED), so hash()-based assignment would reshuffle on restart.
Returns a callable: select_proxy(key: str) -> str | None
"""
if not proxy_urls:
return lambda key: None
def select_proxy(key: str) -> str:
return proxy_urls[zlib.crc32(key.encode()) % len(proxy_urls)]
return select_proxy
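A quick usage sketch of the two routing modes. The helpers are inlined here (with made-up proxy URLs) so the example is self-contained rather than importing the module; crc32 is used for the sticky mode because, unlike `hash()` on strings, it is stable across process restarts:

```python
import itertools
import threading
import zlib

proxy_urls = ["http://p1:8080", "http://p2:8080", "http://p3:8080"]  # made-up URLs

# Round-robin: each call hands out the next proxy, guarded by a lock so
# multiple worker threads can share one cycler.
cycle = itertools.cycle(proxy_urls)
lock = threading.Lock()

def next_proxy() -> str:
    with lock:
        return next(cycle)

print([next_proxy() for _ in range(4)])
# ['http://p1:8080', 'http://p2:8080', 'http://p3:8080', 'http://p1:8080']

# Sticky: the same key (e.g. a tenant_id) always lands on the same proxy.
def select_proxy(key: str) -> str:
    return proxy_urls[zlib.crc32(key.encode()) % len(proxy_urls)]

assert select_proxy("tenant-42") == select_proxy("tenant-42")
```

Round-robin maximizes throughput across proxies; sticky trades even distribution for per-key IP consistency on session-tracked targets.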

View File

@@ -7,10 +7,11 @@ Wants=network-online.target
Type=simple
User=root
WorkingDirectory=/opt/padelnomics
ExecStart=/bin/sh -c 'exec uv run python src/padelnomics/supervisor.py'
Restart=always
RestartSec=10
EnvironmentFile=/opt/padelnomics/.env
Environment=PATH=/root/.local/bin:/usr/local/bin:/usr/bin:/bin
Environment=LANDING_DIR=/data/padelnomics/landing
Environment=DUCKDB_PATH=/data/padelnomics/lakehouse.duckdb
Environment=SERVING_DUCKDB_PATH=/data/padelnomics/analytics.duckdb

View File

@@ -0,0 +1,33 @@
# Workflow registry — the supervisor reads this file on every tick.
# To add a new extractor: add a [section] here and create the Python module.
#
# Fields:
# module — Python module path (must have a main() function)
# schedule — named preset ("hourly", "daily", "weekly", "monthly")
# or raw cron expression (e.g. "0 6-23 * * *")
# entry — optional: function name if not "main" (default: "main")
# depends_on — optional: list of workflow names that must run first
# proxy_mode — optional: "round-robin" (default) or "sticky"
[overpass]
module = "padelnomics_extract.overpass"
schedule = "monthly"
[eurostat]
module = "padelnomics_extract.eurostat"
schedule = "monthly"
[playtomic_tenants]
module = "padelnomics_extract.playtomic_tenants"
schedule = "weekly"
[playtomic_availability]
module = "padelnomics_extract.playtomic_availability"
schedule = "daily"
depends_on = ["playtomic_tenants"]
[playtomic_recheck]
module = "padelnomics_extract.playtomic_availability"
entry = "main_recheck"
schedule = "0 6-23 * * *"
depends_on = ["playtomic_availability"]

View File

@@ -0,0 +1,416 @@
"""Padelnomics Supervisor — schedule-aware pipeline orchestration.
Replaces supervisor.sh with a Python supervisor that reads a TOML workflow
registry, runs extractors on cron-based schedules (with dependency ordering
and parallel execution), then runs SQLMesh transform + export.
Crash safety: the main loop catches all exceptions and backs off, matching
the TigerBeetle CFO supervisor pattern. Combined with systemd Restart=always,
the supervisor is effectively unkillable.
Usage:
# Run the supervisor loop (production)
LANDING_DIR=data/landing uv run python src/padelnomics/supervisor.py
# Show workflow status
LANDING_DIR=data/landing uv run python src/padelnomics/supervisor.py status
"""
import importlib
import logging
import os
import subprocess
import sys
import time
import tomllib
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import UTC, datetime
from pathlib import Path
from croniter import croniter
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
TICK_INTERVAL_SECONDS = 60
BACKOFF_SECONDS = 600 # 10 min on tick failure (matches shell version)
SUBPROCESS_TIMEOUT_SECONDS = 14400 # 4 hours max per subprocess
REPO_DIR = Path(os.getenv("REPO_DIR", "/opt/padelnomics"))
LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing"))
DUCKDB_PATH = os.getenv("DUCKDB_PATH", "data/lakehouse.duckdb")
SERVING_DUCKDB_PATH = os.getenv("SERVING_DUCKDB_PATH", "analytics.duckdb")
ALERT_WEBHOOK_URL = os.getenv("ALERT_WEBHOOK_URL", "")
WORKFLOWS_PATH = Path(os.getenv("WORKFLOWS_PATH", "infra/supervisor/workflows.toml"))
NAMED_SCHEDULES = {
"hourly": "0 * * * *",
"daily": "0 5 * * *",
"weekly": "0 3 * * 1",
"monthly": "0 4 1 * *",
}
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("padelnomics.supervisor")
# ---------------------------------------------------------------------------
# State DB helpers (reuse extraction state DB)
# ---------------------------------------------------------------------------
def _open_state_db():
"""Open the extraction state DB. Reuses the same .state.sqlite as extractors."""
# Import here to avoid circular deps at module level
from padelnomics_extract.utils import open_state_db
return open_state_db(LANDING_DIR)
def _get_last_success_time(conn, workflow_name: str) -> datetime | None:
"""Return the finish time of the last successful run, or None."""
row = conn.execute(
"SELECT MAX(finished_at) AS t FROM extraction_runs "
"WHERE extractor = ? AND status = 'success'",
(workflow_name,),
).fetchone()
if not row or not row["t"]:
return None
return datetime.fromisoformat(row["t"]).replace(tzinfo=UTC)
# ---------------------------------------------------------------------------
# Workflow loading + scheduling
# ---------------------------------------------------------------------------
def load_workflows(path: Path) -> list[dict]:
"""Load workflow definitions from TOML file."""
assert path.exists(), f"Workflows file not found: {path}"
with open(path, "rb") as f:
data = tomllib.load(f)
workflows = []
for name, cfg in data.items():
assert "module" in cfg, f"Workflow '{name}' missing 'module'"
assert "schedule" in cfg, f"Workflow '{name}' missing 'schedule'"
workflows.append({
"name": name,
"module": cfg["module"],
"entry": cfg.get("entry", "main"),
"schedule": cfg["schedule"],
"depends_on": cfg.get("depends_on", []),
"proxy_mode": cfg.get("proxy_mode", "round-robin"),
})
return workflows
def resolve_schedule(schedule: str) -> str:
"""Resolve a named schedule to a cron expression, or pass through raw cron."""
return NAMED_SCHEDULES.get(schedule, schedule)
def is_due(conn, workflow: dict) -> bool:
"""Check if the most recent cron trigger hasn't been served yet."""
cron_expr = resolve_schedule(workflow["schedule"])
assert croniter.is_valid(cron_expr), f"Invalid cron: {cron_expr} for {workflow['name']}"
last_success = _get_last_success_time(conn, workflow["name"])
if last_success is None:
return True # never ran
now_naive = datetime.now(UTC).replace(tzinfo=None)
prev_trigger = croniter(cron_expr, now_naive).get_prev(datetime).replace(tzinfo=UTC)
return last_success < prev_trigger
# ---------------------------------------------------------------------------
# Topological ordering
# ---------------------------------------------------------------------------
def topological_waves(workflows: list[dict]) -> list[list[dict]]:
"""Group workflows into dependency waves for parallel execution.
Wave 0: no deps. Wave 1: depends only on wave 0. Etc.
Workflows whose dependencies aren't in the 'due' set are treated as having no deps.
"""
name_to_wf = {w["name"]: w for w in workflows}
due_names = set(name_to_wf.keys())
# Build in-degree map (only count deps that are in the due set)
in_degree: dict[str, int] = {}
dependents: dict[str, list[str]] = defaultdict(list)
for w in workflows:
deps_in_scope = [d for d in w["depends_on"] if d in due_names]
in_degree[w["name"]] = len(deps_in_scope)
for d in deps_in_scope:
dependents[d].append(w["name"])
waves = []
remaining = set(due_names)
max_iterations = len(workflows) + 1 # safety bound
for _ in range(max_iterations):
if not remaining:
break
# Wave = all workflows with in_degree 0
wave = [name_to_wf[n] for n in remaining if in_degree[n] == 0]
assert wave, f"Circular dependency detected among: {remaining}"
waves.append(wave)
for w in wave:
remaining.discard(w["name"])
for dep in dependents[w["name"]]:
if dep in remaining:
in_degree[dep] -= 1
return waves
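The wave grouping can be restated compactly; this sketch returns workflow names instead of dicts and uses three of the registered extractors as a toy dependency set:

```python
from collections import defaultdict

def topological_waves(workflows):
    """Same wave-grouping algorithm as above, over {'name', 'depends_on'} dicts."""
    name_to_wf = {w["name"]: w for w in workflows}
    due = set(name_to_wf)
    # Only count dependencies that are themselves in the due set
    in_degree = {w["name"]: len([d for d in w["depends_on"] if d in due]) for w in workflows}
    dependents = defaultdict(list)
    for w in workflows:
        for d in w["depends_on"]:
            if d in due:
                dependents[d].append(w["name"])
    waves, remaining = [], set(due)
    while remaining:
        wave = sorted(n for n in remaining if in_degree[n] == 0)
        assert wave, f"Circular dependency detected among: {remaining}"
        waves.append(wave)
        for n in wave:
            remaining.discard(n)
            for dep in dependents[n]:
                in_degree[dep] -= 1
    return waves

wfs = [
    {"name": "overpass", "depends_on": []},
    {"name": "playtomic_tenants", "depends_on": []},
    {"name": "playtomic_availability", "depends_on": ["playtomic_tenants"]},
]
print(topological_waves(wfs))
# [['overpass', 'playtomic_tenants'], ['playtomic_availability']]
```

The two independent extractors share wave 0 and run in parallel; `playtomic_availability` waits for wave 1.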
# ---------------------------------------------------------------------------
# Workflow execution
# ---------------------------------------------------------------------------
def run_workflow(conn, workflow: dict) -> None:
"""Run a single workflow by importing its module and calling the entry function."""
module_name = workflow["module"]
entry_name = workflow["entry"]
logger.info("Running workflow: %s (%s.%s)", workflow["name"], module_name, entry_name)
try:
module = importlib.import_module(module_name)
entry_fn = getattr(module, entry_name)
entry_fn()
logger.info("Workflow %s completed successfully", workflow["name"])
except Exception:
logger.exception("Workflow %s failed", workflow["name"])
send_alert(f"Workflow '{workflow['name']}' failed")
raise
def run_due_workflows(conn, workflows: list[dict]) -> bool:
"""Run all due workflows. Independent ones run in parallel. Returns True if any ran."""
due = [w for w in workflows if is_due(conn, w)]
if not due:
logger.info("No workflows due")
return False
logger.info("Due workflows: %s", [w["name"] for w in due])
waves = topological_waves(due)
for i, wave in enumerate(waves):
wave_names = [w["name"] for w in wave]
logger.info("Wave %d: %s", i, wave_names)
if len(wave) == 1:
try:
run_workflow(conn, wave[0])
except Exception:
pass # already logged in run_workflow
else:
with ThreadPoolExecutor(max_workers=len(wave)) as pool:
futures = {pool.submit(run_workflow, conn, w): w for w in wave}
for future in as_completed(futures):
try:
future.result()
except Exception:
pass # already logged in run_workflow
return True
# ---------------------------------------------------------------------------
# Transform + Export + Deploy
# ---------------------------------------------------------------------------
def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bool:
"""Run a shell command. Returns True on success."""
logger.info("Shell: %s", cmd)
result = subprocess.run(
cmd, shell=True, capture_output=True, text=True, timeout=timeout_seconds
)
if result.returncode != 0:
logger.error("Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s",
result.returncode, cmd, result.stdout[-500:], result.stderr[-500:])
return False
return True
def run_transform() -> None:
"""Run SQLMesh — it evaluates model staleness internally."""
logger.info("Running SQLMesh transform")
ok = run_shell(
        "uv run sqlmesh -p transform/sqlmesh_padelnomics run",
)
if not ok:
send_alert("SQLMesh transform failed")
def run_export() -> None:
"""Export serving tables to analytics.duckdb."""
logger.info("Exporting serving tables")
ok = run_shell(
f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} "
f"uv run python src/padelnomics/export_serving.py"
)
if not ok:
send_alert("Serving export failed")
def web_code_changed() -> bool:
"""Check if web app code changed since last deploy (after git pull)."""
result = subprocess.run(
["git", "diff", "--name-only", "HEAD~1", "HEAD", "--", "web/", "Dockerfile"],
capture_output=True, text=True, timeout=30,
)
return bool(result.stdout.strip())
def git_pull_and_sync() -> None:
"""Pull latest code and sync dependencies."""
run_shell("git fetch origin master")
run_shell("git switch --discard-changes --detach origin/master")
run_shell("uv sync --all-packages")
# ---------------------------------------------------------------------------
# Alerting
# ---------------------------------------------------------------------------
def send_alert(message: str) -> None:
"""Send failure alert via webhook (ntfy.sh / Slack / Telegram)."""
if not ALERT_WEBHOOK_URL:
return
timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
try:
subprocess.run(
["curl", "-s", "-d", f"[{timestamp}] {message}", ALERT_WEBHOOK_URL],
timeout=10, capture_output=True,
)
except Exception:
logger.exception("Failed to send alert")
# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------
def tick() -> None:
"""One cycle: check schedules, run what's due, transform, export."""
workflows = load_workflows(WORKFLOWS_PATH)
conn = _open_state_db()
try:
# Git pull + sync (production only)
if os.getenv("SUPERVISOR_GIT_PULL"):
git_pull_and_sync()
# Run due extractors
run_due_workflows(conn, workflows)
# SQLMesh always runs (evaluates staleness internally)
run_transform()
# Export serving tables
run_export()
# Deploy web app if code changed
if os.getenv("SUPERVISOR_GIT_PULL") and web_code_changed():
logger.info("Web code changed — deploying")
run_shell("./deploy.sh")
finally:
conn.close()
def supervisor_loop() -> None:
"""Infinite supervisor loop — never exits unless killed."""
logger.info("Supervisor starting (tick interval: %ds)", TICK_INTERVAL_SECONDS)
logger.info("Workflows: %s", WORKFLOWS_PATH)
logger.info("Landing dir: %s", LANDING_DIR)
while True:
try:
tick()
except KeyboardInterrupt:
logger.info("Supervisor stopped (KeyboardInterrupt)")
break
except Exception:
logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS)
send_alert("Supervisor tick failed")
time.sleep(BACKOFF_SECONDS)
else:
time.sleep(TICK_INTERVAL_SECONDS)
# ---------------------------------------------------------------------------
# Status CLI
# ---------------------------------------------------------------------------
def print_status() -> None:
"""Print workflow status table."""
workflows = load_workflows(WORKFLOWS_PATH)
conn = _open_state_db()
now = datetime.now(UTC)
# Header
print(f"{'Workflow':<28} {'Schedule':<18} {'Last Run':<20} {'Status':<8} {'Next'}")
    print(f"{'-' * 28} {'-' * 18} {'-' * 20} {'-' * 8} {'-' * 12}")
for w in workflows:
last_success = _get_last_success_time(conn, w["name"])
cron_expr = resolve_schedule(w["schedule"])
# Last run info
if last_success:
last_str = last_success.strftime("%Y-%m-%d %H:%M")
status = "ok"
else:
last_str = "never"
status = "pending"
# Last failure check
row = conn.execute(
"SELECT MAX(finished_at) AS t FROM extraction_runs "
"WHERE extractor = ? AND status = 'failed'",
(w["name"],),
).fetchone()
if row and row["t"]:
last_fail = datetime.fromisoformat(row["t"]).replace(tzinfo=UTC)
if last_success is None or last_fail > last_success:
status = "FAILED"
# Next trigger (croniter returns naive datetimes — treat as UTC)
now_naive = now.replace(tzinfo=None)
next_trigger = croniter(cron_expr, now_naive).get_next(datetime)
delta = next_trigger - now_naive
if delta.total_seconds() < 3600:
next_str = f"in {int(delta.total_seconds() / 60)}m"
elif delta.total_seconds() < 86400:
next_str = next_trigger.strftime("%H:%M")
else:
next_str = next_trigger.strftime("%b %d")
schedule_display = w["schedule"] if w["schedule"] in NAMED_SCHEDULES else cron_expr
print(f"{w['name']:<28} {schedule_display:<18} {last_str:<20} {status:<8} {next_str}")
conn.close()
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
if len(sys.argv) > 1 and sys.argv[1] == "status":
print_status()
else:
supervisor_loop()
if __name__ == "__main__":
main()

View File

@@ -3,6 +3,10 @@
 -- per-day statistics, then calculates occupancy by comparing available hours
 -- against total capacity from fct_venue_capacity.
 --
+-- Recheck-aware occupancy: for each (tenant, resource, slot_start_time),
+-- prefer the latest snapshot (recheck > morning). A slot present in morning
+-- but absent in the recheck = booked between snapshots → more accurate occupancy.
+--
 -- Occupancy = 1 - (available_court_hours / capacity_court_hours_per_day)
 -- Revenue estimate = booked_court_hours × avg_price_of_available_slots
 --
@@ -15,14 +19,31 @@ MODEL (
   grain (snapshot_date, tenant_id)
 );
 
-WITH slot_agg AS (
+-- Prefer the latest snapshot for each slot:
+-- If a recheck exists for a (date, tenant, resource, start_time), use it.
+-- Otherwise fall back to the morning snapshot.
+WITH ranked_slots AS (
+  SELECT
+    a.*,
+    ROW_NUMBER() OVER (
+      PARTITION BY a.snapshot_date, a.tenant_id, a.resource_id, a.slot_start_time
+      ORDER BY
+        CASE WHEN a.snapshot_type = 'recheck' THEN 1 ELSE 2 END,
+        a.captured_at_utc DESC
+    ) AS rn
+  FROM staging.stg_playtomic_availability a
+  WHERE a.price_amount IS NOT NULL
+    AND a.price_amount > 0
+),
+latest_slots AS (
+  SELECT * FROM ranked_slots WHERE rn = 1
+),
+slot_agg AS (
   SELECT
     a.snapshot_date,
     a.tenant_id,
+    -- Slot counts: each row is one 60-min available slot on one court
     COUNT(*) AS available_slot_count,
     COUNT(DISTINCT a.resource_id) AS courts_with_availability,
+    -- Available (unbooked) court-hours: slots are on 30-min increments for 60-min bookings
     -- Each available start_time represents a 60-min bookable window
     ROUND(COUNT(*) * 1.0, 2) AS available_court_hours,
     -- Pricing stats (60-min slots only)
@@ -42,9 +63,7 @@ WITH slot_agg AS (
     ), 2) AS median_price_offpeak,
     MAX(a.price_currency) AS price_currency,
     MAX(a.captured_at_utc) AS captured_at_utc
-  FROM staging.stg_playtomic_availability a
-  WHERE a.price_amount IS NOT NULL
-    AND a.price_amount > 0
+  FROM latest_slots a
   GROUP BY a.snapshot_date, a.tenant_id
 )
 SELECT
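The recheck-preference dedup — recheck beats morning, then latest capture wins — can be mirrored in plain Python to make the ordering explicit (field names follow the model; the rows here are made up):

```python
from itertools import groupby

def pick_latest(rows):
    """Keep one row per slot key, preferring recheck over morning snapshots,
    then the newest captured_at_utc — the same ordering as the SQL window."""
    keyfn = lambda r: (r["tenant_id"], r["resource_id"], r["slot_start_time"])
    out = []
    for _, group in groupby(sorted(rows, key=keyfn), key=keyfn):
        # max() on (is_recheck, capture time) == ROW_NUMBER ... ORDER BY recheck first, newest first
        out.append(max(group, key=lambda r: (r["snapshot_type"] == "recheck", r["captured_at_utc"])))
    return out

rows = [
    {"tenant_id": "t1", "resource_id": "c1", "slot_start_time": "09:00",
     "snapshot_type": "morning", "captured_at_utc": "2026-02-23T06:00:00Z"},
    {"tenant_id": "t1", "resource_id": "c1", "slot_start_time": "09:00",
     "snapshot_type": "recheck", "captured_at_utc": "2026-02-23T18:00:00Z"},
    {"tenant_id": "t1", "resource_id": "c1", "slot_start_time": "10:00",
     "snapshot_type": "morning", "captured_at_utc": "2026-02-23T06:00:00Z"},
]
kept = pick_latest(rows)
print(sorted((r["slot_start_time"], r["snapshot_type"]) for r in kept))
# [('09:00', 'recheck'), ('10:00', 'morning')]
```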

View File

@@ -2,28 +2,30 @@
 -- One row per available 60-minute booking slot per court per venue per day.
 -- "Available" = the slot was NOT booked at capture time. Missing slots = booked.
 --
--- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit).
--- The API returns 60/90/120-min variants per start_time — filtering to 60 avoids
--- double-counting the same time window.
+-- Reads BOTH morning snapshots and recheck files:
+--   Morning: availability_{date}.json.gz → snapshot_type = 'morning'
+--   Recheck: availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
 --
+-- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit).
 -- Price parsed from strings like "14.56 EUR" or "48 GBP".
 --
 -- Requires: at least one availability file in the landing zone.
 -- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz)
 -- with empty venues[] ensures this model runs before real data arrives.
+--
+-- Source: data/landing/playtomic/{year}/{month}/availability_{date}.json.gz
+-- Format: {date, captured_at_utc, venues: [{tenant_id, slots: [{resource_id, start_date, slots: [...]}]}]}
 
 MODEL (
   name staging.stg_playtomic_availability,
   kind FULL,
   cron '@daily',
-  grain (snapshot_date, tenant_id, resource_id, slot_start_time)
+  grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc)
 );
 
-WITH raw_files AS (
-  SELECT *
+-- Morning snapshots (filename does NOT contain '_recheck_')
+WITH morning_files AS (
+  SELECT
+    *,
+    'morning' AS snapshot_type,
+    NULL::INTEGER AS recheck_hour
   FROM read_json(
     @LANDING_DIR || '/playtomic/*/*/availability_*.json.gz',
     format = 'auto',
@@ -31,24 +33,57 @@ WITH raw_files AS (
       date: 'VARCHAR',
       captured_at_utc: 'VARCHAR',
       venues: 'JSON[]'
-    }
+    },
+    filename = true
+  )
+  WHERE filename NOT LIKE '%_recheck_%'
+    AND venues IS NOT NULL
+    AND json_array_length(venues) > 0
+),
+-- Recheck snapshots (filename contains '_recheck_')
+-- Use TRY_CAST on a regex-extracted hour to get the recheck_hour.
+-- If no recheck files exist yet, this CTE produces zero rows (safe).
+recheck_files AS (
+  SELECT
+    *,
+    'recheck' AS snapshot_type,
+    TRY_CAST(
+      regexp_extract(filename, '_recheck_(\d+)', 1) AS INTEGER
+    ) AS recheck_hour
+  FROM read_json(
+    @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz',
+    format = 'auto',
+    columns = {
+      date: 'VARCHAR',
+      captured_at_utc: 'VARCHAR',
+      venues: 'JSON[]'
+    },
+    filename = true
   )
   WHERE venues IS NOT NULL
     AND json_array_length(venues) > 0
 ),
+all_files AS (
+  SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM morning_files
+  UNION ALL
+  SELECT date, captured_at_utc, venues, snapshot_type, recheck_hour FROM recheck_files
+),
 raw_venues AS (
   SELECT
-    rf.date AS snapshot_date,
-    rf.captured_at_utc,
+    af.date AS snapshot_date,
+    af.captured_at_utc,
+    af.snapshot_type,
+    af.recheck_hour,
     venue_json
-  FROM raw_files rf,
-    LATERAL UNNEST(rf.venues) AS t(venue_json)
+  FROM all_files af,
+    LATERAL UNNEST(af.venues) AS t(venue_json)
 ),
+-- Each venue has: {tenant_id, slots: [{resource_id, start_date, slots: [...]}]}
 raw_resources AS (
   SELECT
     rv.snapshot_date,
     rv.captured_at_utc,
+    rv.snapshot_type,
+    rv.recheck_hour,
     rv.venue_json ->> 'tenant_id' AS tenant_id,
     resource_json
   FROM raw_venues rv,
@@ -56,11 +91,12 @@ raw_resources AS (
     from_json(rv.venue_json -> 'slots', '["JSON"]')
   ) AS t(resource_json)
 ),
+-- Each resource has: {resource_id, start_date, slots: [{start_time, duration, price}]}
 raw_slots AS (
   SELECT
     rr.snapshot_date,
     rr.captured_at_utc,
+    rr.snapshot_type,
+    rr.recheck_hour,
     rr.tenant_id,
     rr.resource_json ->> 'resource_id' AS resource_id,
     slot_json
@@ -75,12 +111,12 @@ SELECT
   resource_id,
   slot_json ->> 'start_time' AS slot_start_time,
   TRY_CAST(slot_json ->> 'duration' AS INTEGER) AS duration_minutes,
-  -- Parse "14.56 EUR" → 14.56
   TRY_CAST(
     SPLIT_PART(slot_json ->> 'price', ' ', 1) AS DOUBLE
   ) AS price_amount,
-  -- Parse "14.56 EUR" → EUR
   SPLIT_PART(slot_json ->> 'price', ' ', 2) AS price_currency,
+  snapshot_type,
+  recheck_hour,
   captured_at_utc
 FROM raw_slots
 WHERE resource_id IS NOT NULL
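The `SPLIT_PART` + `TRY_CAST` pair parses strings like `"14.56 EUR"` leniently: unparseable amounts become NULL rather than failing the model. A rough Python equivalent of that behavior:

```python
def parse_price(s: str):
    """'14.56 EUR' -> (14.56, 'EUR'); malformed input -> (None, '')
    — mirroring TRY_CAST(SPLIT_PART(..., 1)) and SPLIT_PART(..., 2)."""
    parts = s.split(" ")
    try:
        amount = float(parts[0])
    except (ValueError, IndexError):
        amount = None  # TRY_CAST returns NULL instead of raising
    currency = parts[1] if len(parts) > 1 else ""  # SPLIT_PART returns '' when missing
    return amount, currency

print(parse_price("14.56 EUR"))  # (14.56, 'EUR')
print(parse_price("48 GBP"))     # (48.0, 'GBP')
print(parse_price("free"))       # (None, '')
```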

uv.lock generated
View File

@@ -1293,6 +1293,7 @@ version = "0.1.0"
 source = { editable = "web" }
 dependencies = [
     { name = "aiosqlite" },
+    { name = "croniter" },
     { name = "duckdb" },
     { name = "google-api-python-client" },
     { name = "google-auth" },
@@ -1313,6 +1314,7 @@ dependencies = [
 [package.metadata]
 requires-dist = [
     { name = "aiosqlite", specifier = ">=0.19.0" },
+    { name = "croniter", specifier = ">=6.0.0" },
     { name = "duckdb", specifier = ">=1.0.0" },
     { name = "google-api-python-client", specifier = ">=2.100.0" },
     { name = "google-auth", specifier = ">=2.23.0" },

View File

@@ -18,6 +18,7 @@ dependencies = [
     "duckdb>=1.0.0",
     "pyarrow>=23.0.1",
     "pyyaml>=6.0",
+    "croniter>=6.0.0",
     "httpx>=0.27.0",
     "google-api-python-client>=2.100.0",
     "google-auth>=2.23.0",

View File

@@ -835,6 +835,46 @@ async def supplier_tier(supplier_id: int):
     return redirect(url_for("admin.supplier_detail", supplier_id=supplier_id))
 
+
+# =============================================================================
+# Feature Flags
+# =============================================================================
+
+
+@bp.route("/flags")
+@role_required("admin")
+async def flags():
+    """Feature flags management."""
+    flag_list = await fetch_all("SELECT * FROM feature_flags ORDER BY name")
+    return await render_template("admin/flags.html", flags=flag_list, admin_page="flags")
+
+
+@bp.route("/flags/toggle", methods=["POST"])
+@role_required("admin")
+@csrf_protect
+async def flag_toggle():
+    """Toggle a feature flag on/off."""
+    form = await request.form
+    flag_name = form.get("name", "").strip()
+    if not flag_name:
+        await flash("Flag name required.", "error")
+        return redirect(url_for("admin.flags"))
+
+    # Get current state and flip it
+    row = await fetch_one("SELECT enabled FROM feature_flags WHERE name = ?", (flag_name,))
+    if not row:
+        await flash(f"Flag '{flag_name}' not found.", "error")
+        return redirect(url_for("admin.flags"))
+
+    new_enabled = 0 if row["enabled"] else 1
+    now = datetime.utcnow().isoformat()
+    await execute(
+        "UPDATE feature_flags SET enabled = ?, updated_at = ? WHERE name = ?",
+        (new_enabled, now, flag_name),
+    )
+    state = "enabled" if new_enabled else "disabled"
+    await flash(f"Flag '{flag_name}' {state}.", "success")
+    return redirect(url_for("admin.flags"))
+
+
 # =============================================================================
 # Feedback Management
 # =============================================================================

View File

@@ -111,6 +111,10 @@
                 </a>
 
                 <div class="admin-sidebar__section">System</div>
+                <a href="{{ url_for('admin.flags') }}" class="{% if admin_page == 'flags' %}active{% endif %}">
+                    <svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke-width="1.5" stroke="currentColor"><path stroke-linecap="round" stroke-linejoin="round" d="M3 3v1.5M3 21v-6m0 0 2.77-.693a9 9 0 0 1 6.208.682l.108.054a9 9 0 0 0 6.086.71l3.114-.732a48.524 48.524 0 0 1-.005-10.499l-3.11.732a9 9 0 0 1-6.085-.711l-.108-.054a9 9 0 0 0-6.208-.682L3 4.5M3 15V4.5"/></svg>
+                    Flags
+                </a>
                 <a href="{{ url_for('admin.tasks') }}" class="{% if admin_page == 'tasks' %}active{% endif %}">
                     <svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke-width="1.5" stroke="currentColor"><path stroke-linecap="round" stroke-linejoin="round" d="M10.5 6h9.75M10.5 6a1.5 1.5 0 1 1-3 0m3 0a1.5 1.5 0 1 0-3 0M3.75 6H7.5m3 12h9.75m-9.75 0a1.5 1.5 0 0 1-3 0m3 0a1.5 1.5 0 0 0-3 0m-3.75 0H7.5m9-6h3.75m-3.75 0a1.5 1.5 0 0 1-3 0m3 0a1.5 1.5 0 0 0-3 0m-9.75 0h9.75"/></svg>
                     Tasks

View File

@@ -0,0 +1,72 @@
{% extends "admin/base_admin.html" %}
{% block admin_head %}
<style>
.flags-table { width: 100%; border-collapse: collapse; }
.flags-table th, .flags-table td { padding: 12px 16px; text-align: left; border-bottom: 1px solid #E2E8F0; }
.flags-table th { font-size: 0.75rem; font-weight: 600; color: #64748B; text-transform: uppercase; letter-spacing: 0.05em; }
.flag-badge { display: inline-flex; align-items: center; gap: 6px; padding: 4px 10px; border-radius: 9999px; font-size: 0.75rem; font-weight: 600; }
.flag-badge--on { background: #DCFCE7; color: #166534; }
.flag-badge--off { background: #FEE2E2; color: #991B1B; }
.flag-dot { width: 8px; height: 8px; border-radius: 50%; }
.flag-dot--on { background: #22C55E; }
.flag-dot--off { background: #EF4444; }
.flag-toggle-btn {
padding: 6px 14px; border-radius: 6px; font-size: 0.8125rem; font-weight: 500;
border: 1px solid #E2E8F0; cursor: pointer; transition: all 0.15s;
}
.flag-toggle-btn--enable { background: #F0FDF4; color: #166534; border-color: #BBF7D0; }
.flag-toggle-btn--enable:hover { background: #DCFCE7; }
.flag-toggle-btn--disable { background: #FEF2F2; color: #991B1B; border-color: #FECACA; }
.flag-toggle-btn--disable:hover { background: #FEE2E2; }
.flag-name { font-weight: 600; color: #0F172A; }
.flag-desc { font-size: 0.8125rem; color: #64748B; }
.flag-updated { font-size: 0.75rem; color: #94A3B8; }
</style>
{% endblock %}
{% block admin_content %}
<h1 style="font-size:1.5rem; font-weight:700; margin-bottom:0.5rem;">Feature Flags</h1>
<p style="color:#64748B; margin-bottom:1.5rem; font-size:0.875rem;">
Toggle features on/off without redeployment. Changes take effect immediately.
</p>
<table class="flags-table">
<thead>
<tr>
<th>Flag</th>
<th>Description</th>
<th>Status</th>
<th>Last Updated</th>
<th></th>
</tr>
</thead>
<tbody>
{% for f in flags %}
<tr>
<td><span class="flag-name">{{ f.name }}</span></td>
<td><span class="flag-desc">{{ f.description or '—' }}</span></td>
<td>
{% if f.enabled %}
<span class="flag-badge flag-badge--on"><span class="flag-dot flag-dot--on"></span> Enabled</span>
{% else %}
<span class="flag-badge flag-badge--off"><span class="flag-dot flag-dot--off"></span> Disabled</span>
{% endif %}
</td>
<td><span class="flag-updated">{{ f.updated_at or '—' }}</span></td>
<td>
<form method="POST" action="{{ url_for('admin.flag_toggle') }}" style="display:inline;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="name" value="{{ f.name }}">
{% if f.enabled %}
<button type="submit" class="flag-toggle-btn flag-toggle-btn--disable">Disable</button>
{% else %}
<button type="submit" class="flag-toggle-btn flag-toggle-btn--enable">Enable</button>
{% endif %}
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endblock %}

View File

@@ -7,7 +7,7 @@ from pathlib import Path
 from quart import Quart, Response, abort, g, redirect, request, session, url_for
 
 from .analytics import close_analytics_db, open_analytics_db
-from .core import close_db, config, get_csrf_token, init_db, setup_request_id
+from .core import close_db, config, get_csrf_token, init_db, is_flag_enabled, setup_request_id
 from .i18n import LANG_BLUEPRINTS, SUPPORTED_LANGS, get_translations
 
 _ASSET_VERSION = str(int(time.time()))
@@ -224,6 +224,7 @@ def create_app() -> Quart:
             "lang": effective_lang,
             "t": get_translations(effective_lang),
             "v": _ASSET_VERSION,
+            "flag": is_flag_enabled,
         }
 
     # -------------------------------------------------------------------------

View File

@@ -14,9 +14,10 @@ from ..core import (
     config,
     csrf_protect,
     execute,
+    feature_gate,
     fetch_one,
     is_disposable_email,
-    waitlist_gate,
+    is_flag_enabled,
 )
 from ..i18n import SUPPORTED_LANGS, get_translations
@@ -248,14 +249,14 @@ async def login():
 @bp.route("/signup", methods=["GET", "POST"])
 @csrf_protect
-@waitlist_gate("waitlist.html", plan=lambda: request.args.get("plan", "free"))
+@feature_gate("payments", "waitlist.html", plan=lambda: request.args.get("plan", "free"))
 async def signup():
     """Signup page - same as login but with different messaging."""
     if g.get("user"):
         return redirect(url_for("dashboard.index"))
 
-    # Waitlist POST handling
-    if config.WAITLIST_MODE and request.method == "POST":
+    # Waitlist POST handling (when payments flag is disabled)
+    if not await is_flag_enabled("payments") and request.method == "POST":
         _t = get_translations(g.lang)
         form = await request.form
         email = form.get("email", "").strip().lower()

View File

@@ -9,7 +9,7 @@ from jinja2 import Environment, FileSystemLoader
 from markupsafe import Markup
 from quart import Blueprint, abort, render_template, request
 
-from ..core import capture_waitlist_email, config, csrf_protect, fetch_all, fetch_one, waitlist_gate
+from ..core import capture_waitlist_email, csrf_protect, feature_gate, fetch_all, fetch_one
 from ..i18n import get_translations
 
 bp = Blueprint(
@@ -106,10 +106,11 @@ async def bake_scenario_cards(html: str, lang: str = "en") -> str:
 @bp.route("/markets", methods=["GET", "POST"])
 @csrf_protect
-@waitlist_gate("markets_waitlist.html")
+@feature_gate("markets", "markets_waitlist.html")
 async def markets():
     """Hub page: search + country/region filter for articles."""
-    if config.WAITLIST_MODE and request.method == "POST":
+    from ..core import is_flag_enabled
+    if not await is_flag_enabled("markets") and request.method == "POST":
         form = await request.form
         email = form.get("email", "").strip().lower()
         if email and "@" in email:
@@ -147,7 +148,7 @@ async def markets():
 @bp.route("/markets/results")
-@waitlist_gate("markets_waitlist.html")
+@feature_gate("markets", "markets_waitlist.html")
 async def market_results():
     """HTMX partial: filtered article cards."""
     q = request.args.get("q", "").strip()

View File

@@ -724,27 +724,60 @@ def ab_test(experiment: str, variants: tuple = ("control", "treatment")):
     return decorator
 
 
-def waitlist_gate(template: str, **extra_context):
-    """Parameterized decorator that intercepts GET requests when WAITLIST_MODE is enabled.
+async def is_flag_enabled(name: str, default: bool = False) -> bool:
+    """Check if a feature flag is enabled. Falls back to default if flag doesn't exist.
 
-    If WAITLIST_MODE is true and the request is a GET, renders the given template
-    instead of calling the wrapped function. POST requests and non-waitlist mode
-    always pass through.
+    Reads from the feature_flags table. Flags are toggled via the admin UI
+    and take effect immediately — no restart needed.
+    """
+    row = await fetch_one(
+        "SELECT enabled FROM feature_flags WHERE name = ?", (name,)
+    )
+    if row is None:
+        return default
+    return bool(row["enabled"])
+
+
+def feature_gate(flag_name: str, waitlist_template: str, **extra_context):
+    """Gate a route behind a feature flag. Shows waitlist template if flag is disabled.
+
+    Replaces the old waitlist_gate() which used a global WAITLIST_MODE env var.
+    This checks per-feature flags from the database instead.
 
     Args:
-        template: Template path to render in waitlist mode (e.g., "waitlist.html")
-        **extra_context: Additional context variables to pass to template.
-            Values can be callables (evaluated at request time) or static.
+        flag_name: Name of the feature flag (e.g., "payments", "supplier_signup")
+        waitlist_template: Template to render when the flag is OFF and method is GET
+        **extra_context: Additional context. Values can be callables (evaluated at request time).
 
     Usage:
         @bp.route("/signup", methods=["GET", "POST"])
         @csrf_protect
-        @waitlist_gate("waitlist.html", plan=lambda: request.args.get("plan", "free"))
+        @feature_gate("payments", "waitlist.html", plan=lambda: request.args.get("plan", "free"))
         async def signup():
+            # POST handling and normal signup code here
             ...
     """
+    def decorator(f):
+        @wraps(f)
+        async def decorated(*args, **kwargs):
+            if not await is_flag_enabled(flag_name) and request.method == "GET":
+                ctx = {}
+                for key, val in extra_context.items():
+                    ctx[key] = val() if callable(val) else val
+                return await render_template(waitlist_template, **ctx)
+            return await f(*args, **kwargs)
+        return decorated
+    return decorator
+
+
+def waitlist_gate(template: str, **extra_context):
+    """DEPRECATED: Use feature_gate() instead. Kept for backwards compatibility.
+
+    Intercepts GET requests when WAITLIST_MODE is enabled.
+    """
     def decorator(f):
         @wraps(f)
         async def decorated(*args, **kwargs):
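The gate only intercepts GET requests while the flag is off; POSTs (e.g. waitlist email capture) always reach the handler. A framework-free sketch of that control flow, with a module-level `FLAGS` dict and a `REQUEST_METHOD` constant standing in for the database lookup and Quart's `request.method`:

```python
import asyncio
from functools import wraps

FLAGS = {"payments": False}   # stand-in for the feature_flags table
REQUEST_METHOD = "GET"        # stand-in for quart's request.method

async def is_flag_enabled(name, default=False):
    return FLAGS.get(name, default)

def feature_gate(flag_name, waitlist_template, **extra_context):
    def decorator(f):
        @wraps(f)
        async def decorated(*args, **kwargs):
            # Flag off + GET -> render the waitlist template instead of the view
            if not await is_flag_enabled(flag_name) and REQUEST_METHOD == "GET":
                ctx = {k: (v() if callable(v) else v) for k, v in extra_context.items()}
                return ("render", waitlist_template, ctx)
            return await f(*args, **kwargs)
        return decorated
    return decorator

@feature_gate("payments", "waitlist.html", plan=lambda: "free")
async def signup():
    return "signup page"

print(asyncio.run(signup()))  # ('render', 'waitlist.html', {'plan': 'free'})
FLAGS["payments"] = True      # admin toggle — next request passes through
print(asyncio.run(signup()))  # 'signup page'
```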

View File

@@ -0,0 +1,28 @@
"""Add feature_flags table for per-feature gating.
Replaces the global WAITLIST_MODE env var with granular per-feature flags
that can be toggled at runtime via the admin UI.
"""
def up(conn):
conn.execute("""
CREATE TABLE IF NOT EXISTS feature_flags (
name TEXT PRIMARY KEY,
enabled INTEGER NOT NULL DEFAULT 0,
description TEXT,
updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
)
""")
# Seed initial flags — markets live, everything else behind waitlist
conn.executemany(
"INSERT OR IGNORE INTO feature_flags (name, enabled, description) VALUES (?, ?, ?)",
[
("markets", 1, "Market/SEO content pages"),
("payments", 0, "Paddle billing & checkout"),
("planner_export", 0, "Business plan PDF export"),
("supplier_signup", 0, "Supplier onboarding wizard"),
("lead_unlock", 0, "Lead credit purchase & unlock"),
],
)

View File

@@ -14,10 +14,10 @@ from ..core import (
     config,
     csrf_protect,
     execute,
+    feature_gate,
     fetch_all,
     fetch_one,
     get_paddle_price,
-    waitlist_gate,
 )
 from ..i18n import get_translations
 from .calculator import COUNTRY_CURRENCY, CURRENCY_DEFAULT, calc, validate_state
@@ -596,7 +596,7 @@ async def set_default(scenario_id: int):
 @bp.route("/export")
 @login_required
-@waitlist_gate("export_waitlist.html")
+@feature_gate("planner_export", "export_waitlist.html")
 async def export():
     """Export options page — language, scenario picker, pricing."""
     scenarios = await get_scenarios(g.user["id"])

View File

@@ -15,8 +15,9 @@ from ..core import (
     execute,
     fetch_all,
     fetch_one,
+    feature_gate,
     get_paddle_price,
-    waitlist_gate,
+    is_flag_enabled,
 )
 from ..i18n import get_translations
@@ -243,8 +244,8 @@ def _lead_tier_required(f):
 @bp.route("/signup")
-@waitlist_gate(
-    "suppliers/waitlist.html",
+@feature_gate(
+    "supplier_signup", "suppliers/waitlist.html",
     plan=lambda: request.args.get("plan", "supplier_growth"),
     plans=lambda: PLAN_FEATURES,
 )
@@ -574,6 +575,9 @@ async def lead_feed():
 @csrf_protect
 async def unlock_lead(token: str):
     """Spend credits to unlock a lead. Returns full-details card via HTMX."""
+    if not await is_flag_enabled("lead_unlock"):
+        return jsonify({"error": "Lead unlock is not available yet."}), 403
+
     from ..credits import InsufficientCredits
     from ..credits import unlock_lead as do_unlock

View File

@@ -54,6 +54,20 @@ async def db():
     conn.row_factory = aiosqlite.Row
     await conn.execute("PRAGMA foreign_keys=ON")
     await conn.executescript(schema_ddl)
+    # Seed feature flags so routes that use feature_gate() pass through by default.
+    # Tests that specifically test gated behaviour set the flag to 0 via _set_flag().
+    await conn.executescript("""
+        INSERT OR IGNORE INTO feature_flags (name, enabled, description)
+            VALUES ('markets', 1, 'Market/SEO content pages');
+        INSERT OR IGNORE INTO feature_flags (name, enabled, description)
+            VALUES ('payments', 0, 'Paddle billing & checkout');
+        INSERT OR IGNORE INTO feature_flags (name, enabled, description)
+            VALUES ('planner_export', 0, 'Business plan PDF export');
+        INSERT OR IGNORE INTO feature_flags (name, enabled, description)
+            VALUES ('supplier_signup', 0, 'Supplier onboarding wizard');
+        INSERT OR IGNORE INTO feature_flags (name, enabled, description)
+            VALUES ('lead_unlock', 0, 'Lead credit purchase & unlock');
+    """)
     await conn.commit()
     original_db = core._db

View File

@@ -255,6 +255,10 @@ class TestExportRoutes:
         assert resp.status_code == 302

     async def test_export_page_shows_scenarios(self, auth_client, db, scenario):
+        await db.execute(
+            "INSERT OR REPLACE INTO feature_flags (name, enabled, description) VALUES ('planner_export', 1, '')"
+        )
+        await db.commit()
         resp = await auth_client.get("/en/planner/export")
         assert resp.status_code == 200
         html = (await resp.data).decode()

View File

@@ -0,0 +1,431 @@
"""
Tests for feature flags — migration, is_flag_enabled helper, feature_gate
decorator, admin toggle routes, and per-feature gating across all routes.
Unit tests cover is_flag_enabled and feature_gate in isolation.
Integration tests exercise full request/response flows via Quart test client.
"""
import sqlite3
from datetime import datetime
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
from padelnomics import core
from padelnomics.migrations.migrate import migrate
# ── Fixtures & helpers ────────────────────────────────────────────
@pytest.fixture(autouse=True)
def mock_csrf_validation():
"""Mock CSRF validation to always pass in all tests in this file."""
with patch("padelnomics.core.validate_csrf_token", return_value=True):
yield
@pytest.fixture
async def admin_client(app, db):
"""Test client with an admin-role user session (module-level, follows test_content.py)."""
now = datetime.utcnow().isoformat()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("flags_admin@test.com", "Flags Admin", now),
) as cursor:
admin_id = cursor.lastrowid
await db.execute(
"INSERT INTO user_roles (user_id, role) VALUES (?, 'admin')", (admin_id,)
)
await db.commit()
async with app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = admin_id
yield c
async def _set_flag(db, name: str, enabled: bool, description: str = ""):
"""Insert or replace a flag in the test DB."""
await db.execute(
"""INSERT OR REPLACE INTO feature_flags (name, enabled, description)
VALUES (?, ?, ?)""",
(name, 1 if enabled else 0, description),
)
await db.commit()
async def _flag_value(db, name: str) -> int | None:
async with db.execute(
"SELECT enabled FROM feature_flags WHERE name = ?", (name,)
) as cursor:
row = await cursor.fetchone()
return row[0] if row else None
async def _seed_all_flags(db):
"""Seed the five default flags matching migration 0019 defaults."""
flags = [
("markets", 1),
("payments", 0),
("planner_export", 0),
("supplier_signup", 0),
("lead_unlock", 0),
]
for name, enabled in flags:
await db.execute(
"INSERT OR REPLACE INTO feature_flags (name, enabled, description) VALUES (?, ?, ?)",
(name, enabled, ""),
)
await db.commit()
def _column_names(conn, table):
return [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
def _table_names(conn):
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
" AND name NOT LIKE 'sqlite_%' ORDER BY name"
).fetchall()
return [r[0] for r in rows]
# ── Migration 0019 ────────────────────────────────────────────────
class TestMigration0019:
"""Migration 0019 creates feature_flags table and seeds initial flags."""
def test_creates_feature_flags_table(self, tmp_path):
db_path = str(tmp_path / "test.db")
migrate(db_path)
conn = sqlite3.connect(db_path)
assert "feature_flags" in _table_names(conn)
conn.close()
def test_table_has_correct_columns(self, tmp_path):
db_path = str(tmp_path / "test.db")
migrate(db_path)
conn = sqlite3.connect(db_path)
cols = _column_names(conn, "feature_flags")
conn.close()
assert "name" in cols
assert "enabled" in cols
assert "description" in cols
assert "updated_at" in cols
def test_seeds_markets_enabled(self, tmp_path):
db_path = str(tmp_path / "test.db")
migrate(db_path)
conn = sqlite3.connect(db_path)
row = conn.execute(
"SELECT enabled FROM feature_flags WHERE name = 'markets'"
).fetchone()
conn.close()
assert row is not None and row[0] == 1
def test_seeds_payments_disabled(self, tmp_path):
db_path = str(tmp_path / "test.db")
migrate(db_path)
conn = sqlite3.connect(db_path)
row = conn.execute(
"SELECT enabled FROM feature_flags WHERE name = 'payments'"
).fetchone()
conn.close()
assert row is not None and row[0] == 0
def test_seeds_all_five_flags(self, tmp_path):
db_path = str(tmp_path / "test.db")
migrate(db_path)
conn = sqlite3.connect(db_path)
names = {r[0] for r in conn.execute("SELECT name FROM feature_flags").fetchall()}
conn.close()
assert names == {"markets", "payments", "planner_export", "supplier_signup", "lead_unlock"}
def test_idempotent_on_rerun(self, tmp_path):
"""Running migrate twice does not duplicate seed rows."""
db_path = str(tmp_path / "test.db")
migrate(db_path)
migrate(db_path)
conn = sqlite3.connect(db_path)
count = conn.execute("SELECT COUNT(*) FROM feature_flags").fetchone()[0]
conn.close()
assert count == 5
# ── is_flag_enabled ───────────────────────────────────────────────
class TestIsFlagEnabled:
"""Unit tests for is_flag_enabled() helper."""
@pytest.mark.asyncio
async def test_returns_true_for_enabled_flag(self, db):
await _set_flag(db, "markets", True)
assert await core.is_flag_enabled("markets") is True
@pytest.mark.asyncio
async def test_returns_false_for_disabled_flag(self, db):
await _set_flag(db, "payments", False)
assert await core.is_flag_enabled("payments") is False
@pytest.mark.asyncio
async def test_returns_default_false_for_unknown_flag(self, db):
assert await core.is_flag_enabled("nonexistent_flag") is False
@pytest.mark.asyncio
async def test_returns_custom_default_for_unknown_flag(self, db):
assert await core.is_flag_enabled("nonexistent_flag", default=True) is True
@pytest.mark.asyncio
async def test_reflects_live_db_change(self, db):
"""Change takes effect without restart — reads live DB every call."""
await _set_flag(db, "payments", False)
assert await core.is_flag_enabled("payments") is False
await _set_flag(db, "payments", True)
assert await core.is_flag_enabled("payments") is True
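For context, a minimal sketch of the behaviour these tests pin down. The dict-backed `fetch_one` here is a stand-in for the app's DB helper (per the commit message, the real `is_flag_enabled` reads the `feature_flags` table via `fetch_one`); everything except the signature is an assumption.

```python
import asyncio

# Stand-in for the feature_flags table; the real helper queries SQLite.
_FLAGS = {"markets": 1, "payments": 0}

async def fetch_one(query: str, params: tuple):
    # Simulates: SELECT enabled FROM feature_flags WHERE name = ?
    name = params[0]
    return (_FLAGS[name],) if name in _FLAGS else None

async def is_flag_enabled(name: str, default: bool = False) -> bool:
    """Read the flag on every call, so admin toggles apply without a restart."""
    row = await fetch_one(
        "SELECT enabled FROM feature_flags WHERE name = ?", (name,)
    )
    if row is None:
        return default  # unknown flag falls back to the caller's default
    return bool(row[0])

print(asyncio.run(is_flag_enabled("markets")))   # True
print(asyncio.run(is_flag_enabled("payments")))  # False
```

Reading the row on every call is what makes `test_reflects_live_db_change` pass without any cache invalidation.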
# ── feature_gate decorator ────────────────────────────────────────
class TestFeatureGateDecorator:
"""feature_gate blocks GET when flag is disabled, passes through when enabled."""
@pytest.mark.asyncio
async def test_shows_waitlist_on_get_when_flag_disabled(self, client, db):
"""GET /auth/signup shows waitlist page when payments flag is off."""
await _set_flag(db, "payments", False)
response = await client.get("/auth/signup")
assert response.status_code == 200
html = await response.get_data(as_text=True)
# payments waitlist.html shows "join" or "launching soon" messaging
assert "launching soon" in html.lower() or "join" in html.lower() or "waitlist" in html.lower()
@pytest.mark.asyncio
async def test_shows_normal_form_on_get_when_flag_enabled(self, client, db):
"""GET /auth/signup shows normal signup form when payments flag is on."""
await _set_flag(db, "payments", True)
response = await client.get("/auth/signup")
assert response.status_code == 200
html = await response.get_data(as_text=True)
# Normal signup shows create account form
assert "create" in html.lower() or "sign up" in html.lower() or "email" in html.lower()
@pytest.mark.asyncio
async def test_post_passes_through_when_flag_disabled(self, client, db):
"""POST is never blocked by feature_gate (gates GET only)."""
await _set_flag(db, "payments", False)
with patch("padelnomics.worker.enqueue", new_callable=AsyncMock):
response = await client.post("/auth/signup", form={
"csrf_token": "test_token",
"email": "test@example.com",
})
assert response.status_code in (200, 302)
@pytest.mark.asyncio
async def test_markets_route_gated_by_markets_flag(self, client, db):
"""markets flag controls /en/markets access."""
# Disabled → shows gated page
await _set_flag(db, "markets", False)
response = await client.get("/en/markets")
assert response.status_code == 200
html = await response.get_data(as_text=True)
assert "coming soon" in html.lower() or "intelligence" in html.lower()
# Enabled → passes through
await _set_flag(db, "markets", True)
response = await client.get("/en/markets")
assert response.status_code == 200
html = await response.get_data(as_text=True)
# Normal markets page doesn't show the "coming soon" waitlist title
assert "coming soon" not in html.lower()
@pytest.mark.asyncio
async def test_supplier_signup_gated_by_flag(self, client, db):
"""supplier_signup flag controls /en/suppliers/signup."""
await _set_flag(db, "supplier_signup", False)
response = await client.get("/en/suppliers/signup")
assert response.status_code == 200
html = await response.get_data(as_text=True)
assert "waitlist" in html.lower() or "supplier" in html.lower()
@pytest.mark.asyncio
async def test_planner_export_gated_for_authenticated_user(self, auth_client, db):
"""planner_export flag controls /en/planner/export."""
await _set_flag(db, "planner_export", False)
response = await auth_client.get("/en/planner/export")
assert response.status_code == 200
html = await response.get_data(as_text=True)
assert "coming soon" in html.lower() or "waitlist" in html.lower()
@pytest.mark.asyncio
async def test_planner_export_accessible_when_enabled(self, auth_client, db):
"""Normal planner export page shown when planner_export flag is on."""
await _set_flag(db, "planner_export", True)
response = await auth_client.get("/en/planner/export")
assert response.status_code == 200
html = await response.get_data(as_text=True)
assert "coming soon" not in html.lower()
# ── lead_unlock gate ──────────────────────────────────────────────
class TestLeadUnlockGate:
"""lead_unlock flag controls whether the unlock endpoint is reachable."""
@pytest.mark.asyncio
async def test_is_flag_disabled_by_default(self, db):
"""lead_unlock flag seeded as disabled — is_flag_enabled returns False."""
await _set_flag(db, "lead_unlock", False)
assert await core.is_flag_enabled("lead_unlock") is False
@pytest.mark.asyncio
async def test_is_flag_enabled_after_toggle(self, db):
"""is_flag_enabled returns True after flag is enabled."""
await _set_flag(db, "lead_unlock", True)
assert await core.is_flag_enabled("lead_unlock") is True
@pytest.mark.asyncio
async def test_route_imports_is_flag_enabled(self):
"""suppliers/routes.py imports is_flag_enabled (gate is wired up)."""
from padelnomics.suppliers.routes import unlock_lead
import inspect
src = inspect.getsource(unlock_lead)
assert "is_flag_enabled" in src
assert "lead_unlock" in src
assert "403" in src
# ── Admin flag toggle routes ──────────────────────────────────────
class TestAdminFlagRoutes:
"""Admin /admin/flags endpoints require admin role and toggle flags correctly."""
@pytest.mark.asyncio
async def test_flags_page_requires_admin(self, client, db):
"""Anonymous request to /admin/flags redirects to login."""
response = await client.get("/admin/flags", follow_redirects=False)
assert response.status_code == 302
@pytest.mark.asyncio
async def test_flags_page_accessible_to_admin(self, admin_client, db):
await _seed_all_flags(db)
response = await admin_client.get("/admin/flags")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_flags_page_lists_all_seeded_flags(self, admin_client, db):
await _seed_all_flags(db)
response = await admin_client.get("/admin/flags")
assert response.status_code == 200
html = await response.get_data(as_text=True)
for flag in ("markets", "payments", "planner_export", "supplier_signup", "lead_unlock"):
assert flag in html
@pytest.mark.asyncio
async def test_toggle_enables_disabled_flag(self, admin_client, db):
await _set_flag(db, "payments", False)
await admin_client.post("/admin/flags/toggle", form={
"csrf_token": "test_token",
"name": "payments",
})
assert await _flag_value(db, "payments") == 1
@pytest.mark.asyncio
async def test_toggle_disables_enabled_flag(self, admin_client, db):
await _set_flag(db, "markets", True)
await admin_client.post("/admin/flags/toggle", form={
"csrf_token": "test_token",
"name": "markets",
})
assert await _flag_value(db, "markets") == 0
@pytest.mark.asyncio
async def test_toggle_unknown_flag_redirects_with_flash(self, admin_client, db):
response = await admin_client.post("/admin/flags/toggle", form={
"csrf_token": "test_token",
"name": "nonexistent_flag",
}, follow_redirects=True)
assert response.status_code == 200
html = await response.get_data(as_text=True)
assert "not found" in html.lower()
@pytest.mark.asyncio
async def test_toggle_requires_admin(self, client, db):
"""Anonymous POST to toggle is rejected."""
response = await client.post("/admin/flags/toggle", form={
"csrf_token": "test_token",
"name": "markets",
}, follow_redirects=False)
assert response.status_code == 302
# ── Full toggle flow (e2e) ────────────────────────────────────────
class TestFlagToggleFlow:
"""End-to-end: admin toggles flag → route behaviour changes immediately."""
@pytest.mark.asyncio
async def test_disable_markets_shows_gated_page(self, admin_client, client, db):
"""Disable markets → /en/markets shows coming soon. Enable → shows content."""
# Seed markets as enabled
await _set_flag(db, "markets", True)
response = await client.get("/en/markets")
html = await response.get_data(as_text=True)
assert "coming soon" not in html.lower()
# Admin disables via toggle
await admin_client.post("/admin/flags/toggle", form={
"csrf_token": "test_token",
"name": "markets",
})
# Now shows gated page
response = await client.get("/en/markets")
html = await response.get_data(as_text=True)
assert "coming soon" in html.lower()
@pytest.mark.asyncio
async def test_enable_supplier_signup_passes_through(self, admin_client, client, db):
"""Enable supplier_signup → normal wizard shown (no waitlist page)."""
# Start with flag disabled
await _set_flag(db, "supplier_signup", False)
response = await client.get("/en/suppliers/signup")
html = await response.get_data(as_text=True)
# Gated: shows waitlist/supplier promo content
assert "waitlist" in html.lower() or "supplier" in html.lower()
# Admin enables
await admin_client.post("/admin/flags/toggle", form={
"csrf_token": "test_token",
"name": "supplier_signup",
})
# Now passes through (wizard is shown)
response = await client.get("/en/suppliers/signup")
assert response.status_code == 200
html = await response.get_data(as_text=True)
assert "waitlist" not in html.lower()
@pytest.mark.asyncio
async def test_double_toggle_restores_original_state(self, admin_client, db):
"""Toggling a flag twice returns it to its original value."""
await _set_flag(db, "payments", False)
assert await _flag_value(db, "payments") == 0
await admin_client.post("/admin/flags/toggle", form={
"csrf_token": "test_token", "name": "payments",
})
assert await _flag_value(db, "payments") == 1
await admin_client.post("/admin/flags/toggle", form={
"csrf_token": "test_token", "name": "payments",
})
assert await _flag_value(db, "payments") == 0

View File

@@ -0,0 +1,285 @@
"""
Unit tests for supervisor.py and proxy.py.
Tests cover pure-Python logic only — no DB, no subprocesses, no network.
DB-dependent functions (is_due, _get_last_success_time) are tested via mocks.
supervisor.py lives in src/padelnomics/ (not a uv workspace package), so we
load it directly from its file path with importlib rather than importing by name.
"""
import sys
import textwrap
import tomllib
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Load supervisor.py directly by path — avoids clashing with the web app's
# 'padelnomics' namespace (which is the installed web package).
import importlib.util as _ilu
_SUP_PATH = Path(__file__).parent.parent.parent / "src" / "padelnomics" / "supervisor.py"
_spec = _ilu.spec_from_file_location("padelnomics_supervisor", _SUP_PATH)
sup = _ilu.module_from_spec(_spec)
_spec.loader.exec_module(sup)
from padelnomics_extract.proxy import (
load_proxy_urls,
make_round_robin_cycler,
make_sticky_selector,
)
# ── load_workflows ────────────────────────────────────────────────
class TestLoadWorkflows:
def test_loads_all_fields(self, tmp_path):
toml = tmp_path / "workflows.toml"
toml.write_text(textwrap.dedent("""\
[extract_a]
module = "mypkg.extract_a"
schedule = "daily"
[extract_b]
module = "mypkg.extract_b"
schedule = "weekly"
entry = "run"
depends_on = ["extract_a"]
proxy_mode = "sticky"
"""))
wfs = sup.load_workflows(toml)
assert len(wfs) == 2
a = next(w for w in wfs if w["name"] == "extract_a")
assert a["module"] == "mypkg.extract_a"
assert a["schedule"] == "daily"
assert a["entry"] == "main" # default
assert a["depends_on"] == [] # default
assert a["proxy_mode"] == "round-robin" # default
b = next(w for w in wfs if w["name"] == "extract_b")
assert b["entry"] == "run"
assert b["depends_on"] == ["extract_a"]
assert b["proxy_mode"] == "sticky"
def test_raises_on_missing_module(self, tmp_path):
toml = tmp_path / "bad.toml"
toml.write_text("[wf]\nschedule = 'daily'\n")
with pytest.raises(AssertionError, match="missing 'module'"):
sup.load_workflows(toml)
def test_raises_on_missing_schedule(self, tmp_path):
toml = tmp_path / "bad.toml"
toml.write_text("[wf]\nmodule = 'mypkg.wf'\n")
with pytest.raises(AssertionError, match="missing 'schedule'"):
sup.load_workflows(toml)
def test_raises_if_file_missing(self, tmp_path):
with pytest.raises(AssertionError, match="not found"):
sup.load_workflows(tmp_path / "nonexistent.toml")
# ── resolve_schedule ──────────────────────────────────────────────
class TestResolveSchedule:
def test_maps_named_presets(self):
assert sup.resolve_schedule("hourly") == "0 * * * *"
assert sup.resolve_schedule("daily") == "0 5 * * *"
assert sup.resolve_schedule("weekly") == "0 3 * * 1"
assert sup.resolve_schedule("monthly") == "0 4 1 * *"
def test_passes_through_raw_cron(self):
expr = "0 6-23 * * *"
assert sup.resolve_schedule(expr) == expr
def test_unknown_name_passes_through(self):
assert sup.resolve_schedule("quarterly") == "quarterly"
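A minimal sketch of `resolve_schedule` consistent with these assertions (preset cron values copied from the tests; unknown names pass through unchanged):

```python
# Named presets map to cron expressions; anything else (raw cron, or an
# unrecognised name) is returned as-is and left for croniter to validate.
_CRON_PRESETS = {
    "hourly": "0 * * * *",   # every hour on the hour
    "daily": "0 5 * * *",    # 05:00 UTC daily
    "weekly": "0 3 * * 1",   # Mondays 03:00 UTC
    "monthly": "0 4 1 * *",  # 1st of the month, 04:00 UTC
}

def resolve_schedule(schedule: str) -> str:
    return _CRON_PRESETS.get(schedule, schedule)
```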
# ── is_due ────────────────────────────────────────────────────────
class TestIsDue:
def _wf(self, schedule="daily", name="test_wf"):
return {"name": name, "schedule": schedule}
def test_never_ran_is_due(self):
conn = MagicMock()
with patch.object(sup, "_get_last_success_time", return_value=None):
assert sup.is_due(conn, self._wf()) is True
def test_ran_after_last_trigger_is_not_due(self):
"""Last run was AFTER the most recent trigger — not due."""
conn = MagicMock()
# Use a daily schedule — trigger fires at 05:00 UTC
# Simulate last success = today at 06:00, so trigger already covered
now = datetime.now(UTC)
last_success = now.replace(hour=6, minute=0, second=0, microsecond=0)
with patch.object(sup, "_get_last_success_time", return_value=last_success):
assert sup.is_due(conn, self._wf(schedule="daily")) is False
def test_ran_before_last_trigger_is_due(self):
"""Last run was BEFORE the most recent trigger — due again."""
conn = MagicMock()
# Monthly fires on the 1st at 04:00 — simulate running last month
last_success = datetime.now(UTC) - timedelta(days=35)
with patch.object(sup, "_get_last_success_time", return_value=last_success):
assert sup.is_due(conn, self._wf(schedule="monthly")) is True
# ── topological_waves ─────────────────────────────────────────────
class TestTopologicalWaves:
def _wf(self, name, depends_on=None):
return {"name": name, "depends_on": depends_on or []}
def test_no_deps_single_wave(self):
wfs = [self._wf("a"), self._wf("b"), self._wf("c")]
waves = sup.topological_waves(wfs)
assert len(waves) == 1
assert {w["name"] for w in waves[0]} == {"a", "b", "c"}
def test_simple_chain_two_waves(self):
wfs = [self._wf("a"), self._wf("b", depends_on=["a"])]
waves = sup.topological_waves(wfs)
assert len(waves) == 2
assert waves[0][0]["name"] == "a"
assert waves[1][0]["name"] == "b"
def test_diamond_three_waves(self):
"""a → b,c → d"""
wfs = [
self._wf("a"),
self._wf("b", depends_on=["a"]),
self._wf("c", depends_on=["a"]),
self._wf("d", depends_on=["b", "c"]),
]
waves = sup.topological_waves(wfs)
assert len(waves) == 3
assert waves[0][0]["name"] == "a"
assert {w["name"] for w in waves[1]} == {"b", "c"}
assert waves[2][0]["name"] == "d"
def test_dep_outside_due_set_ignored(self):
"""Dependency not in the due set is treated as satisfied."""
wfs = [self._wf("b", depends_on=["a"])] # "a" not in due set
waves = sup.topological_waves(wfs)
assert len(waves) == 1
assert waves[0][0]["name"] == "b"
def test_circular_dep_raises(self):
wfs = [
self._wf("a", depends_on=["b"]),
self._wf("b", depends_on=["a"]),
]
with pytest.raises(AssertionError, match="Circular dependency"):
sup.topological_waves(wfs)
def test_empty_list_returns_empty(self):
assert sup.topological_waves([]) == []
def test_real_workflows_toml(self):
"""The actual workflows.toml in the repo parses and produces valid waves."""
repo_root = Path(__file__).parent.parent.parent
wf_path = repo_root / "infra" / "supervisor" / "workflows.toml"
if not wf_path.exists():
pytest.skip("workflows.toml not found")
wfs = sup.load_workflows(wf_path)
waves = sup.topological_waves(wfs)
# playtomic_availability must come after playtomic_tenants
all_names = [w["name"] for wave in waves for w in wave]
tenants_idx = all_names.index("playtomic_tenants")
avail_idx = all_names.index("playtomic_availability")
assert tenants_idx < avail_idx
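The wave behaviour asserted above amounts to Kahn's algorithm grouped by level; a hedged sketch (the real `topological_waves` in supervisor.py may differ in details):

```python
# Repeatedly collect every workflow whose dependencies are satisfied into
# one wave. A dependency outside the due set is treated as satisfied,
# matching test_dep_outside_due_set_ignored above.
def topological_waves(workflows: list[dict]) -> list[list[dict]]:
    remaining = {w["name"]: w for w in workflows}
    done: set[str] = set()
    waves: list[list[dict]] = []
    while remaining:
        ready = [
            w for w in remaining.values()
            if all(d in done or d not in remaining for d in w["depends_on"])
        ]
        # No progress means a cycle among the remaining workflows.
        assert ready, f"Circular dependency among: {sorted(remaining)}"
        waves.append(ready)
        for w in ready:
            done.add(w["name"])
            del remaining[w["name"]]
    return waves
```

Workflows within a wave have no mutual dependencies, so the supervisor can run each wave's members in parallel and only wait between waves.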
# ── proxy.py ─────────────────────────────────────────────────────
class TestLoadProxyUrls:
def test_returns_empty_when_unset(self, monkeypatch):
monkeypatch.delenv("PROXY_URLS", raising=False)
assert load_proxy_urls() == []
def test_parses_comma_separated_urls(self, monkeypatch):
monkeypatch.setenv(
"PROXY_URLS",
"http://p1:8080,http://p2:8080,http://p3:8080",
)
urls = load_proxy_urls()
assert urls == ["http://p1:8080", "http://p2:8080", "http://p3:8080"]
def test_strips_whitespace(self, monkeypatch):
monkeypatch.setenv("PROXY_URLS", " http://p1:8080 , http://p2:8080 ")
urls = load_proxy_urls()
assert urls == ["http://p1:8080", "http://p2:8080"]
def test_ignores_empty_segments(self, monkeypatch):
monkeypatch.setenv("PROXY_URLS", "http://p1:8080,,http://p2:8080,")
urls = load_proxy_urls()
assert urls == ["http://p1:8080", "http://p2:8080"]
class TestRoundRobinCycler:
def test_returns_none_callable_when_no_proxies(self):
fn = make_round_robin_cycler([])
assert fn() is None
def test_cycles_through_proxies(self):
urls = ["http://p1", "http://p2", "http://p3"]
fn = make_round_robin_cycler(urls)
results = [fn() for _ in range(6)]
assert results == ["http://p1", "http://p2", "http://p3"] * 2
def test_thread_safe_independent_calls(self):
"""Concurrent calls each get a proxy — no exceptions."""
import threading
urls = ["http://p1", "http://p2"]
fn = make_round_robin_cycler(urls)
results = []
lock = threading.Lock()
def worker():
proxy = fn()
with lock:
results.append(proxy)
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(results) == 10
assert all(r in urls for r in results)
class TestStickySelectorProxy:
def test_returns_none_callable_when_no_proxies(self):
fn = make_sticky_selector([])
assert fn("any_key") is None
def test_same_key_always_same_proxy(self):
urls = ["http://p1", "http://p2", "http://p3"]
fn = make_sticky_selector(urls)
proxy = fn("tenant_abc")
for _ in range(10):
assert fn("tenant_abc") == proxy
def test_different_keys_can_map_to_different_proxies(self):
urls = ["http://p1", "http://p2", "http://p3"]
fn = make_sticky_selector(urls)
results = {fn(f"key_{i}") for i in range(30)}
assert len(results) > 1 # distribution across proxies
def test_all_results_are_valid_proxies(self):
urls = ["http://p1", "http://p2"]
fn = make_sticky_selector(urls)
for i in range(20):
assert fn(f"key_{i}") in urls
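The proxy behaviours covered by these tests can be sketched as below. Only the three function names come from `proxy.py`; the internals (`itertools.cycle` behind a lock, SHA-256 key hashing for stickiness) are assumptions consistent with the assertions above.

```python
import hashlib
import itertools
import os
import threading

def load_proxy_urls() -> list[str]:
    """Parse comma-separated PROXY_URLS, dropping blanks and whitespace."""
    raw = os.environ.get("PROXY_URLS", "")
    return [u.strip() for u in raw.split(",") if u.strip()]

def make_round_robin_cycler(urls: list[str]):
    if not urls:
        return lambda: None  # no proxies configured → direct connections
    cycler = itertools.cycle(urls)
    lock = threading.Lock()
    def next_proxy() -> str:
        with lock:  # itertools.cycle is not thread-safe on its own
            return next(cycler)
    return next_proxy

def make_sticky_selector(urls: list[str]):
    if not urls:
        return lambda key: None
    def select(key: str) -> str:
        # Stable digest (unlike builtin hash()) so the same key maps to the
        # same proxy across processes and restarts.
        digest = hashlib.sha256(key.encode()).digest()
        return urls[int.from_bytes(digest[:8], "big") % len(urls)]
    return select
```

Sticky selection suits per-tenant scraping (one tenant always exits through one proxy), while round-robin spreads unrelated requests evenly.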

View File

@@ -251,13 +251,16 @@ class TestAuthRoutes:
     @pytest.mark.asyncio
     async def test_normal_signup_when_waitlist_disabled(self, client, db):
-        """Normal signup flow when WAITLIST_MODE is false."""
-        with patch.object(core.config, "WAITLIST_MODE", False):
-            response = await client.get("/auth/signup")
-            assert response.status_code == 200
-            html = await response.get_data(as_text=True)
-            # Should see normal signup form, not waitlist form
-            assert "Create Free Account" in html or "Sign Up" in html
+        """Normal signup flow when payments flag is enabled (WAITLIST_MODE replaced by feature flag)."""
+        await db.execute(
+            "INSERT OR REPLACE INTO feature_flags (name, enabled, description) VALUES ('payments', 1, '')"
+        )
+        await db.commit()
+        response = await client.get("/auth/signup")
+        assert response.status_code == 200
+        html = await response.get_data(as_text=True)
+        # Should see normal signup form, not waitlist form
+        assert "Create Free Account" in html or "Sign Up" in html

     @pytest.mark.asyncio
     async def test_shows_waitlist_form_when_enabled(self, client, db):
@@ -713,13 +716,16 @@ class TestWaitlistGateDecorator:
     """Test waitlist_gate decorator via integration tests."""

     @pytest.mark.asyncio
-    async def test_passes_through_when_waitlist_disabled(self, client):
-        """Decorator passes through to normal flow when WAITLIST_MODE=false."""
-        with patch.object(core.config, "WAITLIST_MODE", False):
-            response = await client.get("/auth/signup")
-            html = await response.get_data(as_text=True)
-            # Should see normal signup, not waitlist
-            assert "waitlist" not in html.lower() or "create" in html.lower()
+    async def test_passes_through_when_waitlist_disabled(self, client, db):
+        """feature_gate passes through to normal form when payments flag is enabled."""
+        await db.execute(
+            "INSERT OR REPLACE INTO feature_flags (name, enabled, description) VALUES ('payments', 1, '')"
+        )
+        await db.commit()
+        response = await client.get("/auth/signup")
+        html = await response.get_data(as_text=True)
+        # Should see normal signup, not waitlist
+        assert "waitlist" not in html.lower() or "create" in html.lower()

     @pytest.mark.asyncio
     async def test_intercepts_get_when_waitlist_enabled(self, client):
@@ -940,21 +946,21 @@ class TestIntegration:
     @pytest.mark.asyncio
     async def test_toggle_off_reverts_to_normal_signup(self, client, db):
-        """Setting WAITLIST_MODE=false reverts to normal signup flow."""
-        # First, enable waitlist mode
-        with patch.object(core.config, "WAITLIST_MODE", True):
-            response = await client.get("/auth/signup")
-            html = await response.get_data(as_text=True)
-            assert "waitlist" in html.lower()
-        # Then, disable waitlist mode
-        with patch.object(core.config, "WAITLIST_MODE", False):
-            response = await client.get("/auth/signup")
-            html = await response.get_data(as_text=True)
-            # Should see normal signup, not waitlist
-            assert "create" in html.lower() or "sign up" in html.lower()
-            # Should NOT see waitlist messaging
-            assert "join the waitlist" not in html.lower()
+        """Disabling payments flag shows waitlist; enabling it shows normal signup."""
+        # payments=0 (default) → shows waitlist page
+        response = await client.get("/auth/signup")
+        html = await response.get_data(as_text=True)
+        assert "waitlist" in html.lower()
+        # Enable payments flag → shows normal signup
+        await db.execute(
+            "INSERT OR REPLACE INTO feature_flags (name, enabled, description) VALUES ('payments', 1, '')"
+        )
+        await db.commit()
+        response = await client.get("/auth/signup")
+        html = await response.get_data(as_text=True)
+        assert "create" in html.lower() or "sign up" in html.lower()
+        assert "join the waitlist" not in html.lower()

     @pytest.mark.asyncio
     async def test_same_email_different_intents_both_captured(self, client, db):