Files
padelnomics/web/tests/conftest.py
Deeman 777333e918 refactor(tests): overhaul visual tests — single server, mock emails, fix init_db
- Consolidate 3 duplicate server processes into 1 session-scoped
  live_server fixture in conftest.py (port 5111, shared across all
  visual test modules). Reduces startup overhead from ~3× to 1×.

- Fix init_db mock: patch padelnomics.app.init_db (where it's used)
  instead of core.init_db (where it's defined). The before_serving
  hook imported init_db locally — patching core alone didn't prevent
  the real init_db from replacing the in-memory test DB.

- Keep patches active through app.run_task() so before_serving hooks
  can't replace the test DB during the server's lifetime.

- Force RESEND_API_KEY="" in the visual test server subprocess to
  prevent real email sends (dev mode: prints to stdout, returns "dev").

- Remove 4 screenshot-only no-op tests, replace with single
  test_capture_screenshots that grabs all pages in one pass.

- Fix test_planner_tab_switching: remove nonexistent "metrics" tab.

- Delete ~200 lines of duplicated boilerplate from 3 test files.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 18:40:11 +01:00

304 lines
10 KiB
Python

"""
Shared test fixtures for the Padelnomics test suite.
"""
import hashlib
import hmac
import sqlite3
import tempfile
import time
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import AsyncMock, patch
import aiosqlite
import pytest
from padelnomics.app import create_app
from padelnomics.migrations.migrate import migrate
from padelnomics import core
_SCHEMA_CACHE = None


def _get_schema_ddl():
    """Return the schema DDL, replaying the migrations at most once per process.

    On the first call this runs ``migrate()`` against a scratch SQLite file,
    reads every stored ``CREATE ...`` statement back from ``sqlite_master`` and
    joins them into one ``;``-terminated script. The script is memoised in the
    module-level ``_SCHEMA_CACHE``, so later calls return it immediately.

    Returns:
        str: A DDL script that can be passed to ``executescript()``.
    """
    global _SCHEMA_CACHE
    if _SCHEMA_CACHE is not None:
        return _SCHEMA_CACHE

    from contextlib import closing

    # TemporaryDirectory (unlike a bare mkdtemp) removes the scratch database
    # once the DDL has been read; cleanup errors are ignored so a lingering
    # file handle cannot fail the test run.
    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmp_dir:
        tmp_db = str(Path(tmp_dir) / "schema.db")
        migrate(tmp_db)
        # closing() guarantees the connection is released even if the query
        # raises (sqlite3's own context manager only manages transactions).
        with closing(sqlite3.connect(tmp_db)) as tmp_conn:
            # NOTE: "_" is a single-character wildcard in LIKE, so the
            # '%_fts_%' filter skips any name with "fts" between two other
            # characters, not only names containing a literal "_fts_".
            rows = tmp_conn.execute(
                "SELECT sql FROM sqlite_master"
                " WHERE sql IS NOT NULL"
                " AND name NOT LIKE 'sqlite_%'"
                " AND name NOT LIKE '%_fts_%'"  # FTS5 shadow tables (created by VIRTUAL TABLE)
                " AND name != '_migrations'"
                " ORDER BY rowid"
            ).fetchall()

    _SCHEMA_CACHE = ";\n".join(r[0] for r in rows) + ";"
    return _SCHEMA_CACHE
# ── Database ─────────────────────────────────────────────────
@pytest.fixture
async def db():
    """Yield a fresh in-memory SQLite connection carrying the migrated schema.

    While the fixture is active the connection is installed as ``core._db``;
    on teardown the previous value is put back and the connection is closed.
    """
    ddl = _get_schema_ddl()

    connection = await aiosqlite.connect(":memory:")
    connection.row_factory = aiosqlite.Row
    await connection.execute("PRAGMA foreign_keys=ON")
    await connection.executescript(ddl)
    await connection.commit()

    # Swap the module-level handle in one step, remembering what was there.
    previous_db, core._db = core._db, connection
    yield connection

    # Teardown: restore the earlier handle before closing ours.
    core._db = previous_db
    await connection.close()
# ── App & client ─────────────────────────────────────────────
@pytest.fixture
async def app(db):
    """Quart app with DB already initialized (init_db/close_db patched to no-op).

    ``init_db``/``close_db`` are replaced with ``AsyncMock`` both on ``core``
    (where they are defined) and on ``padelnomics.app`` (where the app module
    holds its own reference to them). Patching ``core`` alone leaves the
    ``padelnomics.app`` bindings untouched, so a startup hook could still call
    the real ``init_db`` and replace the in-memory DB provided by ``db``.

    The patches stay active for as long as the fixture is in use.
    """
    with patch.object(core, "init_db", new_callable=AsyncMock), \
         patch.object(core, "close_db", new_callable=AsyncMock), \
         patch("padelnomics.app.init_db", new_callable=AsyncMock), \
         patch("padelnomics.app.close_db", new_callable=AsyncMock):
        application = create_app()
        application.config["TESTING"] = True
        yield application
@pytest.fixture
async def client(app):
    """Yield a test client for ``app`` with no session data set."""
    async with app.test_client() as anonymous:
        yield anonymous
# ── Users ────────────────────────────────────────────────────
@pytest.fixture
async def test_user(db):
    """Insert one user row and return a dict with its ``id``, ``email`` and ``name``."""
    profile = {"email": "test@example.com", "name": "Test User"}
    created_at = datetime.now(UTC).isoformat()

    async with db.execute(
        "INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
        (profile["email"], profile["name"], created_at),
    ) as cur:
        new_id = cur.lastrowid
    await db.commit()

    # "id" first, then the profile fields, matching the documented shape.
    return {"id": new_id, **profile}
@pytest.fixture
async def auth_client(app, test_user):
    """Yield a test client whose session already carries the test user's id."""
    async with app.test_client() as logged_in:
        uid = test_user["id"]
        # Write the session first; the transaction is closed before the
        # client is handed to the test.
        async with logged_in.session_transaction() as session:
            session["user_id"] = uid
        yield logged_in
# ── Subscriptions ────────────────────────────────────────────
@pytest.fixture
def create_subscription(db):
    """Provide an async factory that inserts a subscription row for a user.

    The returned coroutine function also inserts a ``billing_customers`` row
    (ignored if it conflicts) when ``provider_customer_id`` is truthy, commits,
    and returns the new subscription's row id.
    """

    async def _create(
        user_id: int,
        plan: str = "pro",
        status: str = "active",
        provider_customer_id: str = "ctm_test123",
        provider_subscription_id: str = "sub_test456",
        current_period_end: str = "2025-03-01T00:00:00Z",
    ) -> int:
        timestamp = datetime.now(UTC).isoformat()

        # The billing customer is optional: skip it for a falsy customer id.
        if provider_customer_id:
            customer_row = (user_id, provider_customer_id, timestamp)
            await db.execute(
                """INSERT OR IGNORE INTO billing_customers
                (user_id, provider_customer_id, created_at) VALUES (?, ?, ?)""",
                customer_row,
            )

        subscription_row = (
            user_id,
            plan,
            status,
            provider_subscription_id,
            current_period_end,
            timestamp,  # created_at
            timestamp,  # updated_at
        )
        async with db.execute(
            """INSERT INTO subscriptions
            (user_id, plan, status,
            provider_subscription_id, current_period_end, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)""",
            subscription_row,
        ) as cur:
            new_id = cur.lastrowid
        await db.commit()
        return new_id

    return _create
# ── Scenarios ────────────────────────────────────────────────
@pytest.fixture
async def scenario(db, test_user):
    """Insert a scenario row for the test user and describe it.

    The planner state is produced by ``validate_state`` from a small court
    configuration and stored as JSON. Returns a dict with the scenario ``id``,
    the validated ``state`` and the owning ``user_id``.
    """
    import json

    from padelnomics.planner.calculator import validate_state

    owner_id = test_user["id"]
    planner_state = validate_state({"dblCourts": 4, "sglCourts": 2})
    created_at = datetime.now(UTC).isoformat()
    params = (owner_id, json.dumps(planner_state), created_at)

    async with db.execute(
        """INSERT INTO scenarios (user_id, name, state_json, created_at)
        VALUES (?, 'Test Scenario', ?, ?)""",
        params,
    ) as cur:
        new_id = cur.lastrowid
    await db.commit()

    return {"id": new_id, "state": planner_state, "user_id": owner_id}
# ── Config ───────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def patch_config():
    """Set test config values on ``core.config`` for every test.

    Overrides the Paddle credentials, ``BASE_URL``, ``DEBUG`` and
    ``WAITLIST_MODE`` before the test and puts the previous state back
    afterwards. Attributes that did not exist before the test are removed
    again rather than being left behind with a value of ``None``.
    """
    test_values = {
        "PADDLE_API_KEY": "test_api_key_123",
        "PADDLE_WEBHOOK_SECRET": "whsec_test_secret",
        "BASE_URL": "http://localhost:5000",
        "DEBUG": True,
        "WAITLIST_MODE": False,
    }

    # A private sentinel distinguishes "attribute was absent" from
    # "attribute was present with value None".
    missing = object()
    original_values = {
        key: getattr(core.config, key, missing) for key in test_values
    }
    for key, val in test_values.items():
        setattr(core.config, key, val)

    yield

    for key, val in original_values.items():
        if val is not missing:
            setattr(core.config, key, val)
        elif hasattr(core.config, key):
            # Only this fixture created the attribute, so drop it again.
            delattr(core.config, key)
# ── Webhook helpers ──────────────────────────────────────────
def make_webhook_payload(
    event_type: str,
    subscription_id: str = "sub_test456",
    customer_id: str = "ctm_test123",
    user_id: str = "1",
    plan: str = "starter",
    status: str = "active",
    ends_at: str = "2025-03-01T00:00:00.000000Z",
) -> dict:
    """Assemble a Paddle-style webhook payload as a plain dict.

    Args:
        event_type: Value for the top-level ``event_type`` key.
        subscription_id: Stored as ``data.id``.
        customer_id: Stored as ``data.customer_id``.
        user_id: Stored in ``data.custom_data.user_id``.
        plan: Stored in ``data.custom_data.plan``.
        status: Stored as ``data.status``.
        ends_at: End of the billing period; the start is fixed at
            ``2025-02-01T00:00:00.000000Z``.

    Returns:
        dict with the keys ``event_type`` and ``data``.
    """
    custom_data = {"user_id": user_id, "plan": plan}
    billing_period = {
        "starts_at": "2025-02-01T00:00:00.000000Z",
        "ends_at": ends_at,
    }
    data = {
        "id": subscription_id,
        "status": status,
        "customer_id": customer_id,
        "custom_data": custom_data,
        "current_billing_period": billing_period,
    }
    return {"event_type": event_type, "data": data}
def sign_payload(payload_bytes: bytes, secret: str = "whsec_test_secret") -> str:
    """Build a Paddle-format signature header: ts=<unix>;h1=<hmac_sha256>.

    The HMAC-SHA256 is computed over ``<ts>:<payload>`` using the raw payload
    bytes, so bodies that are not valid UTF-8 can be signed as well. For
    UTF-8 payloads the result equals signing the decoded-then-re-encoded text.

    Args:
        payload_bytes: The exact request body that will be sent.
        secret: Webhook secret used as the HMAC key.

    Returns:
        Header value of the form ``ts=<unix seconds>;h1=<hex digest>``.
    """
    ts = str(int(time.time()))
    # Concatenate bytes directly instead of a decode()/encode() round-trip,
    # which would raise UnicodeDecodeError on non-UTF-8 input.
    signed = ts.encode() + b":" + payload_bytes
    h1 = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()
    return f"ts={ts};h1={h1}"
# ── Visual test fixtures (Playwright) ────────────────────────
# Session-scoped: one server + one browser for all visual tests.
_VISUAL_PORT = 5111


def _run_visual_server(ready_event):
    """Run a Quart dev server in a subprocess for visual/E2E tests.

    Forces RESEND_API_KEY="" in the process environment so no real emails are
    sent, loads the migrated schema into an in-memory SQLite database,
    installs it as ``core._db`` and serves the app on ``_VISUAL_PORT``.

    Args:
        ready_event: Event that is set right before ``run_task()`` is awaited.
            It signals that the app object is built, not that the port is
            already accepting connections.
    """
    import asyncio
    import os

    # NOTE(review): this only affects code that reads the environment after
    # this point; confirm the config does not cache RESEND_API_KEY at import.
    os.environ["RESEND_API_KEY"] = ""

    async def _serve():
        # Reuse the shared helper rather than duplicating the migrate +
        # sqlite_master extraction; it replays the migrations in this process
        # when no DDL has been cached yet.
        schema_ddl = _get_schema_ddl()

        conn = await aiosqlite.connect(":memory:")
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA foreign_keys=ON")
        await conn.executescript(schema_ddl)
        await conn.commit()
        core._db = conn

        # Patch init_db/close_db where they're USED (app.py imports them
        # locally via `from .core import init_db` — patching core.init_db
        # alone doesn't affect the local binding in app.py).
        # Patches must stay active through run_task() because before_serving
        # hooks call init_db() which would replace our in-memory DB.
        with patch("padelnomics.app.init_db", new_callable=AsyncMock), \
             patch("padelnomics.app.close_db", new_callable=AsyncMock):
            app = create_app()
            app.config["TESTING"] = True
            ready_event.set()
            await app.run_task(host="127.0.0.1", port=_VISUAL_PORT)

    asyncio.run(_serve())
@pytest.fixture(scope="session")
def live_server():
    """Start a live Quart server on port 5111 for all visual/E2E tests.

    Spawns ``_run_visual_server`` in a daemon subprocess, waits until it
    signals readiness and the port accepts TCP connections, then yields the
    base URL. The subprocess is terminated (and killed if it does not exit)
    on teardown.

    Raises:
        RuntimeError: If the server does not signal readiness, exits early,
            or never starts accepting connections.
    """
    import multiprocessing
    import socket

    ready = multiprocessing.Event()
    proc = multiprocessing.Process(
        target=_run_visual_server, args=(ready,), daemon=True
    )
    proc.start()
    try:
        # Fail fast with a clear message instead of letting every visual test
        # die later with a connection error.
        if not ready.wait(timeout=10):
            raise RuntimeError(
                "visual test server did not signal readiness within 10 seconds"
            )

        # The ready event fires just before the server binds its socket, so
        # poll the port rather than sleeping for a fixed interval.
        deadline = time.monotonic() + 10
        while True:
            if not proc.is_alive():
                raise RuntimeError(
                    f"visual test server exited early (exit code {proc.exitcode})"
                )
            try:
                with socket.create_connection(
                    ("127.0.0.1", _VISUAL_PORT), timeout=0.5
                ):
                    break
            except OSError:
                if time.monotonic() >= deadline:
                    raise RuntimeError(
                        f"visual test server is not accepting connections "
                        f"on port {_VISUAL_PORT}"
                    ) from None
                time.sleep(0.1)

        yield f"http://127.0.0.1:{_VISUAL_PORT}"
    finally:
        proc.terminate()
        proc.join(timeout=5)
        if proc.is_alive():
            # terminate() was ignored; make sure no server is left behind
            # holding the port.
            proc.kill()
            proc.join(timeout=5)
@pytest.fixture(scope="session")
def browser():
    """Provide one headless Chromium instance shared by the whole test session."""
    from playwright.sync_api import sync_playwright

    with sync_playwright() as playwright:
        chromium = playwright.chromium.launch(headless=True)
        yield chromium
        # Close the browser while the Playwright driver is still running.
        chromium.close()