- Add 18 new E2E tests from master: pricing, checkout, supplier signup, supplier dashboard, and business plan export (sections J-N) - Force WAITLIST_MODE=false in visual server subprocess — the root .env sets WAITLIST_MODE=true, and since Config class attributes evaluate at import time (before fork), the subprocess inherits the parent's value. Patching both os.environ and core.config directly ensures feature pages render instead of waitlist templates. - All 77 visual tests now pass in ~59 seconds. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
344 lines
12 KiB
Python
344 lines
12 KiB
Python
"""
|
|
Shared test fixtures for the Padelnomics test suite.
|
|
"""
|
|
import hashlib
|
|
import hmac
|
|
import sqlite3
|
|
import tempfile
|
|
import time
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import aiosqlite
|
|
import pytest
|
|
from padelnomics.app import create_app
|
|
from padelnomics.migrations.migrate import migrate
|
|
|
|
from padelnomics import core
|
|
|
|
_SCHEMA_CACHE = None
|
|
|
|
|
|
def _get_schema_ddl():
|
|
"""Run all migrations once against a temp DB and cache the resulting DDL."""
|
|
global _SCHEMA_CACHE
|
|
if _SCHEMA_CACHE is not None:
|
|
return _SCHEMA_CACHE
|
|
|
|
tmp_db = str(Path(tempfile.mkdtemp()) / "schema.db")
|
|
migrate(tmp_db)
|
|
tmp_conn = sqlite3.connect(tmp_db)
|
|
rows = tmp_conn.execute(
|
|
"SELECT sql FROM sqlite_master"
|
|
" WHERE sql IS NOT NULL"
|
|
" AND name NOT LIKE 'sqlite_%'"
|
|
" AND name NOT LIKE '%_fts_%'" # FTS5 shadow tables (created by VIRTUAL TABLE)
|
|
" AND name != '_migrations'"
|
|
" ORDER BY rowid"
|
|
).fetchall()
|
|
tmp_conn.close()
|
|
_SCHEMA_CACHE = ";\n".join(r[0] for r in rows) + ";"
|
|
return _SCHEMA_CACHE
|
|
|
|
|
|
# ── Database ─────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
|
|
async def db():
|
|
"""In-memory SQLite with full schema from replaying migrations."""
|
|
schema_ddl = _get_schema_ddl()
|
|
|
|
conn = await aiosqlite.connect(":memory:")
|
|
conn.row_factory = aiosqlite.Row
|
|
await conn.execute("PRAGMA foreign_keys=ON")
|
|
await conn.executescript(schema_ddl)
|
|
await conn.commit()
|
|
|
|
original_db = core._db
|
|
core._db = conn
|
|
|
|
yield conn
|
|
|
|
core._db = original_db
|
|
await conn.close()
|
|
|
|
|
|
# ── App & client ─────────────────────────────────────────────
|
|
|
|
@pytest.fixture
|
|
async def app(db):
|
|
"""Quart app with DB already initialized (init_db/close_db patched to no-op)."""
|
|
with patch.object(core, "init_db", new_callable=AsyncMock), \
|
|
patch.object(core, "close_db", new_callable=AsyncMock):
|
|
application = create_app()
|
|
application.config["TESTING"] = True
|
|
yield application
|
|
|
|
|
|
@pytest.fixture
|
|
async def client(app):
|
|
"""Unauthenticated test client."""
|
|
async with app.test_client() as c:
|
|
yield c
|
|
|
|
|
|
# ── Users ────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
|
|
async def test_user(db):
|
|
"""Create a test user, return dict with id/email/name."""
|
|
now = datetime.now(UTC).isoformat()
|
|
async with db.execute(
|
|
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
|
|
("test@example.com", "Test User", now),
|
|
) as cursor:
|
|
user_id = cursor.lastrowid
|
|
await db.commit()
|
|
return {"id": user_id, "email": "test@example.com", "name": "Test User"}
|
|
|
|
|
|
@pytest.fixture
|
|
async def auth_client(app, test_user):
|
|
"""Test client with session['user_id'] pre-set."""
|
|
async with app.test_client() as c:
|
|
async with c.session_transaction() as sess:
|
|
sess["user_id"] = test_user["id"]
|
|
yield c
|
|
|
|
|
|
# ── Subscriptions ────────────────────────────────────────────
|
|
|
|
@pytest.fixture
|
|
def create_subscription(db):
|
|
"""Factory: create a subscription row for a user."""
|
|
async def _create(
|
|
user_id: int,
|
|
plan: str = "pro",
|
|
status: str = "active",
|
|
provider_customer_id: str = "ctm_test123",
|
|
provider_subscription_id: str = "sub_test456",
|
|
current_period_end: str = "2025-03-01T00:00:00Z",
|
|
) -> int:
|
|
now = datetime.now(UTC).isoformat()
|
|
# Create billing_customers record if provider_customer_id given
|
|
if provider_customer_id:
|
|
await db.execute(
|
|
"""INSERT OR IGNORE INTO billing_customers
|
|
(user_id, provider_customer_id, created_at) VALUES (?, ?, ?)""",
|
|
(user_id, provider_customer_id, now),
|
|
)
|
|
async with db.execute(
|
|
"""INSERT INTO subscriptions
|
|
(user_id, plan, status,
|
|
provider_subscription_id, current_period_end, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
|
(user_id, plan, status, provider_subscription_id,
|
|
current_period_end, now, now),
|
|
) as cursor:
|
|
sub_id = cursor.lastrowid
|
|
await db.commit()
|
|
return sub_id
|
|
return _create
|
|
|
|
|
|
# ── Scenarios ────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
|
|
async def scenario(db, test_user):
|
|
"""User scenario with valid planner state for PDF generation."""
|
|
import json
|
|
|
|
from padelnomics.planner.calculator import validate_state
|
|
state = validate_state({"dblCourts": 4, "sglCourts": 2})
|
|
now = datetime.now(UTC).isoformat()
|
|
async with db.execute(
|
|
"""INSERT INTO scenarios (user_id, name, state_json, created_at)
|
|
VALUES (?, 'Test Scenario', ?, ?)""",
|
|
(test_user["id"], json.dumps(state), now),
|
|
) as cursor:
|
|
scenario_id = cursor.lastrowid
|
|
await db.commit()
|
|
return {"id": scenario_id, "state": state, "user_id": test_user["id"]}
|
|
|
|
|
|
# ── Config ───────────────────────────────────────────────────
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def patch_config():
|
|
"""Set test Paddle config values."""
|
|
original_values = {}
|
|
test_values = {
|
|
"PADDLE_API_KEY": "test_api_key_123",
|
|
"PADDLE_WEBHOOK_SECRET": "whsec_test_secret",
|
|
"BASE_URL": "http://localhost:5000",
|
|
"DEBUG": True,
|
|
"WAITLIST_MODE": False,
|
|
}
|
|
for key, val in test_values.items():
|
|
original_values[key] = getattr(core.config, key, None)
|
|
setattr(core.config, key, val)
|
|
|
|
yield
|
|
|
|
for key, val in original_values.items():
|
|
setattr(core.config, key, val)
|
|
|
|
|
|
# ── Webhook helpers ──────────────────────────────────────────
|
|
|
|
def make_webhook_payload(
|
|
event_type: str,
|
|
subscription_id: str = "sub_test456",
|
|
customer_id: str = "ctm_test123",
|
|
user_id: str = "1",
|
|
plan: str = "starter",
|
|
status: str = "active",
|
|
ends_at: str = "2025-03-01T00:00:00.000000Z",
|
|
) -> dict:
|
|
"""Build a Paddle webhook payload dict."""
|
|
return {
|
|
"event_type": event_type,
|
|
"data": {
|
|
"id": subscription_id,
|
|
"status": status,
|
|
"customer_id": customer_id,
|
|
"custom_data": {"user_id": user_id, "plan": plan},
|
|
"current_billing_period": {
|
|
"starts_at": "2025-02-01T00:00:00.000000Z",
|
|
"ends_at": ends_at,
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
def sign_payload(payload_bytes: bytes, secret: str = "whsec_test_secret") -> str:
|
|
"""Build a Paddle-format signature header: ts=<unix>;h1=<hmac_sha256>."""
|
|
ts = str(int(time.time()))
|
|
data = f"{ts}:{payload_bytes.decode()}".encode()
|
|
h1 = hmac.new(secret.encode(), data, hashlib.sha256).hexdigest()
|
|
return f"ts={ts};h1={h1}"
|
|
|
|
|
|
# ── Visual test fixtures (Playwright) ────────────────────────
|
|
# Session-scoped: one server + one browser for all visual tests.
|
|
|
|
_VISUAL_PORT = 5111
|
|
|
|
|
|
async def _seed_visual_data(conn):
|
|
"""Seed a supplier + feature flags for E2E billing/dashboard tests."""
|
|
await conn.execute(
|
|
"INSERT INTO users (id, email, created_at)"
|
|
" VALUES (999, 'supplier@test.com', datetime('now'))"
|
|
)
|
|
await conn.execute(
|
|
"INSERT INTO suppliers"
|
|
" (id, name, slug, tier, claimed_by, claimed_at,"
|
|
" country_code, region, category, credit_balance,"
|
|
" monthly_credits, contact_name, contact_email, created_at)"
|
|
" VALUES (1, 'Test Supplier GmbH', 'test-supplier', 'pro', 999,"
|
|
" datetime('now'), 'DE', 'Europe', 'construction', 50,"
|
|
" 10, 'Test User', 'supplier@test.com', datetime('now'))"
|
|
)
|
|
for flag in ("supplier_signup", "markets", "payments",
|
|
"planner_export", "lead_unlock"):
|
|
await conn.execute(
|
|
"INSERT OR REPLACE INTO feature_flags (name, enabled)"
|
|
" VALUES (?, 1)", (flag,)
|
|
)
|
|
await conn.commit()
|
|
|
|
|
|
def _run_visual_server(ready_event):
|
|
"""Run a Quart dev server in a subprocess for visual/E2E tests.
|
|
|
|
Forces RESEND_API_KEY="" so no real emails are sent.
|
|
Forces WAITLIST_MODE=false so feature pages render (not waitlist templates).
|
|
Runs migrations in-process to build the full schema (including FTS tables).
|
|
"""
|
|
import asyncio
|
|
import os
|
|
|
|
os.environ["RESEND_API_KEY"] = ""
|
|
os.environ["WAITLIST_MODE"] = "false"
|
|
# Config class attributes are evaluated at import time (before fork),
|
|
# so we must also patch the live config object directly.
|
|
core.config.WAITLIST_MODE = False
|
|
|
|
async def _serve():
|
|
# Build schema DDL in-process (FTS5 virtual tables need this)
|
|
tmp_db = str(Path(tempfile.mkdtemp()) / "schema.db")
|
|
migrate(tmp_db)
|
|
tmp_conn = sqlite3.connect(tmp_db)
|
|
rows = tmp_conn.execute(
|
|
"SELECT sql FROM sqlite_master"
|
|
" WHERE sql IS NOT NULL"
|
|
" AND name NOT LIKE 'sqlite_%'"
|
|
" AND name NOT LIKE '%_fts_%'"
|
|
" AND name != '_migrations'"
|
|
" ORDER BY rowid"
|
|
).fetchall()
|
|
tmp_conn.close()
|
|
schema_ddl = ";\n".join(r[0] for r in rows) + ";"
|
|
|
|
conn = await aiosqlite.connect(":memory:")
|
|
conn.row_factory = aiosqlite.Row
|
|
await conn.execute("PRAGMA foreign_keys=ON")
|
|
await conn.executescript(schema_ddl)
|
|
# Ensure feature_flags table exists (may be missed if an FTS5
|
|
# CREATE VIRTUAL TABLE causes executescript to stop early)
|
|
await conn.execute(
|
|
"CREATE TABLE IF NOT EXISTS feature_flags"
|
|
" (name TEXT PRIMARY KEY, enabled INTEGER NOT NULL DEFAULT 0,"
|
|
" description TEXT,"
|
|
" updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now')))"
|
|
)
|
|
# Seed data needed by E2E tests (supplier dashboard, billing, etc.)
|
|
await _seed_visual_data(conn)
|
|
core._db = conn
|
|
|
|
# Patch init_db/close_db where they're USED (app.py imports them
|
|
# locally via `from .core import init_db` — patching core.init_db
|
|
# alone doesn't affect the local binding in app.py).
|
|
# Patches must stay active through run_task() because before_serving
|
|
# hooks call init_db() which would replace our in-memory DB.
|
|
with patch("padelnomics.app.init_db", new_callable=AsyncMock), \
|
|
patch("padelnomics.app.close_db", new_callable=AsyncMock), \
|
|
patch("padelnomics.app.open_analytics_db"), \
|
|
patch("padelnomics.app.close_analytics_db"):
|
|
app = create_app()
|
|
app.config["TESTING"] = True
|
|
ready_event.set()
|
|
await app.run_task(host="127.0.0.1", port=_VISUAL_PORT)
|
|
|
|
asyncio.run(_serve())
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def live_server():
|
|
"""Start a live Quart server on port 5111 for all visual/E2E tests."""
|
|
import multiprocessing
|
|
|
|
ready = multiprocessing.Event()
|
|
proc = multiprocessing.Process(
|
|
target=_run_visual_server, args=(ready,), daemon=True
|
|
)
|
|
proc.start()
|
|
ready.wait(timeout=10)
|
|
time.sleep(1)
|
|
yield f"http://127.0.0.1:{_VISUAL_PORT}"
|
|
proc.terminate()
|
|
proc.join(timeout=5)
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def browser():
|
|
"""Launch a headless Chromium browser (once per test session)."""
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
with sync_playwright() as p:
|
|
b = p.chromium.launch(headless=True)
|
|
yield b
|
|
b.close()
|