Files
padelnomics/web/tests/conftest.py

363 lines
13 KiB
Python

"""
Shared test fixtures for the Padelnomics test suite.
"""
import hashlib
import hmac
import sqlite3
import tempfile
import time
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import AsyncMock, patch
import aiosqlite
import pytest
from padelnomics.app import create_app
from padelnomics.migrations.migrate import migrate
from padelnomics import core
from padelnomics import sitemap as sitemap_mod
_SCHEMA_CACHE = None
def _get_schema_ddl():
"""Run all migrations once against a temp DB and cache the resulting DDL."""
global _SCHEMA_CACHE
if _SCHEMA_CACHE is not None:
return _SCHEMA_CACHE
tmp_db = str(Path(tempfile.mkdtemp()) / "schema.db")
migrate(tmp_db)
tmp_conn = sqlite3.connect(tmp_db)
rows = tmp_conn.execute(
"SELECT sql FROM sqlite_master"
" WHERE sql IS NOT NULL"
" AND name NOT LIKE 'sqlite_%'"
" AND name NOT LIKE '%_fts_%'" # FTS5 shadow tables (created by VIRTUAL TABLE)
" AND name != '_migrations'"
" ORDER BY rowid"
).fetchall()
tmp_conn.close()
_SCHEMA_CACHE = ";\n".join(r[0] for r in rows) + ";"
return _SCHEMA_CACHE
# ── Database ─────────────────────────────────────────────────
@pytest.fixture
async def db():
"""In-memory SQLite with full schema from replaying migrations."""
schema_ddl = _get_schema_ddl()
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA foreign_keys=ON")
await conn.executescript(schema_ddl)
# Seed feature flags so routes that use feature_gate() pass through by default.
# Tests that specifically test gated behaviour set the flag to 0 via _set_flag().
await conn.executescript("""
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('markets', 1, 'Market/SEO content pages');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('payments', 0, 'Paddle billing & checkout');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('planner_export', 0, 'Business plan PDF export');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('supplier_signup', 0, 'Supplier onboarding wizard');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('lead_unlock', 0, 'Lead credit purchase & unlock');
""")
await conn.commit()
original_db = core._db
core._db = conn
# Clear sitemap cache so tests see fresh DB state
sitemap_mod._cache_xml = ""
sitemap_mod._cache_timestamp = 0.0
yield conn
core._db = original_db
await conn.close()
# ── App & client ─────────────────────────────────────────────
@pytest.fixture
async def app(db):
"""Quart app with DB already initialized (init_db/close_db patched to no-op)."""
with patch.object(core, "init_db", new_callable=AsyncMock), \
patch.object(core, "close_db", new_callable=AsyncMock):
application = create_app()
application.config["TESTING"] = True
yield application
@pytest.fixture
async def client(app):
"""Unauthenticated test client."""
async with app.test_client() as c:
yield c
# ── Users ────────────────────────────────────────────────────
@pytest.fixture
async def test_user(db):
"""Create a test user, return dict with id/email/name."""
now = datetime.now(UTC).isoformat()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("test@example.com", "Test User", now),
) as cursor:
user_id = cursor.lastrowid
await db.commit()
return {"id": user_id, "email": "test@example.com", "name": "Test User"}
@pytest.fixture
async def auth_client(app, test_user):
"""Test client with session['user_id'] pre-set."""
async with app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
yield c
# ── Subscriptions ────────────────────────────────────────────
@pytest.fixture
def create_subscription(db):
"""Factory: create a subscription row for a user."""
async def _create(
user_id: int,
plan: str = "pro",
status: str = "active",
provider_customer_id: str = "ctm_test123",
provider_subscription_id: str = "sub_test456",
current_period_end: str = "2025-03-01T00:00:00Z",
) -> int:
now = datetime.now(UTC).isoformat()
# Create billing_customers record if provider_customer_id given
if provider_customer_id:
await db.execute(
"""INSERT OR IGNORE INTO billing_customers
(user_id, provider_customer_id, created_at) VALUES (?, ?, ?)""",
(user_id, provider_customer_id, now),
)
async with db.execute(
"""INSERT INTO subscriptions
(user_id, plan, status,
provider_subscription_id, current_period_end, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(user_id, plan, status, provider_subscription_id,
current_period_end, now, now),
) as cursor:
sub_id = cursor.lastrowid
await db.commit()
return sub_id
return _create
# ── Scenarios ────────────────────────────────────────────────
@pytest.fixture
async def scenario(db, test_user):
"""User scenario with valid planner state for PDF generation."""
import json
from padelnomics.planner.calculator import validate_state
state = validate_state({"dblCourts": 4, "sglCourts": 2})
now = datetime.now(UTC).isoformat()
async with db.execute(
"""INSERT INTO scenarios (user_id, name, state_json, created_at)
VALUES (?, 'Test Scenario', ?, ?)""",
(test_user["id"], json.dumps(state), now),
) as cursor:
scenario_id = cursor.lastrowid
await db.commit()
return {"id": scenario_id, "state": state, "user_id": test_user["id"]}
# ── Config ───────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def patch_config():
"""Set test Paddle config values."""
original_values = {}
test_values = {
"PADDLE_API_KEY": "test_api_key_123",
"PADDLE_WEBHOOK_SECRET": "whsec_test_secret",
"RESEND_API_KEY": "", # never send real emails in tests
"BASE_URL": "http://localhost:5000",
"DEBUG": True,
"WAITLIST_MODE": False,
}
for key, val in test_values.items():
original_values[key] = getattr(core.config, key, None)
setattr(core.config, key, val)
yield
for key, val in original_values.items():
setattr(core.config, key, val)
# ── Webhook helpers ──────────────────────────────────────────
def make_webhook_payload(
event_type: str,
subscription_id: str = "sub_test456",
customer_id: str = "ctm_test123",
user_id: str = "1",
plan: str = "starter",
status: str = "active",
ends_at: str = "2025-03-01T00:00:00.000000Z",
) -> dict:
"""Build a Paddle webhook payload dict."""
return {
"event_type": event_type,
"data": {
"id": subscription_id,
"status": status,
"customer_id": customer_id,
"custom_data": {"user_id": user_id, "plan": plan},
"current_billing_period": {
"starts_at": "2025-02-01T00:00:00.000000Z",
"ends_at": ends_at,
},
},
}
def sign_payload(payload_bytes: bytes, secret: str = "whsec_test_secret") -> str:
"""Build a Paddle-format signature header: ts=<unix>;h1=<hmac_sha256>."""
ts = str(int(time.time()))
data = f"{ts}:{payload_bytes.decode()}".encode()
h1 = hmac.new(secret.encode(), data, hashlib.sha256).hexdigest()
return f"ts={ts};h1={h1}"
# ── Visual test fixtures (Playwright) ────────────────────────
# Session-scoped: one server + one browser for all visual tests.
_VISUAL_PORT = 5111
async def _seed_visual_data(conn):
"""Seed a supplier + feature flags for E2E billing/dashboard tests."""
await conn.execute(
"INSERT INTO users (id, email, created_at)"
" VALUES (999, 'supplier@test.com', datetime('now'))"
)
await conn.execute(
"INSERT INTO suppliers"
" (id, name, slug, tier, claimed_by, claimed_at,"
" country_code, region, category, credit_balance,"
" monthly_credits, contact_name, contact_email, created_at)"
" VALUES (1, 'Test Supplier GmbH', 'test-supplier', 'pro', 999,"
" datetime('now'), 'DE', 'Europe', 'construction', 50,"
" 10, 'Test User', 'supplier@test.com', datetime('now'))"
)
for flag in ("supplier_signup", "markets", "payments",
"planner_export", "lead_unlock"):
await conn.execute(
"INSERT OR REPLACE INTO feature_flags (name, enabled)"
" VALUES (?, 1)", (flag,)
)
await conn.commit()
def _run_visual_server(ready_event):
"""Run a Quart dev server in a subprocess for visual/E2E tests.
Forces RESEND_API_KEY="" so no real emails are sent.
Forces WAITLIST_MODE=false so feature pages render (not waitlist templates).
Runs migrations in-process to build the full schema (including FTS tables).
"""
import asyncio
import os
os.environ["RESEND_API_KEY"] = ""
os.environ["WAITLIST_MODE"] = "false"
# Config class attributes are evaluated at import time (before fork),
# so we must also patch the live config object directly.
core.config.WAITLIST_MODE = False
async def _serve():
# Build schema DDL in-process (FTS5 virtual tables need this)
tmp_db = str(Path(tempfile.mkdtemp()) / "schema.db")
migrate(tmp_db)
tmp_conn = sqlite3.connect(tmp_db)
rows = tmp_conn.execute(
"SELECT sql FROM sqlite_master"
" WHERE sql IS NOT NULL"
" AND name NOT LIKE 'sqlite_%'"
" AND name NOT LIKE '%_fts_%'"
" AND name != '_migrations'"
" ORDER BY rowid"
).fetchall()
tmp_conn.close()
schema_ddl = ";\n".join(r[0] for r in rows) + ";"
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA foreign_keys=ON")
await conn.executescript(schema_ddl)
# Ensure feature_flags table exists (may be missed if an FTS5
# CREATE VIRTUAL TABLE causes executescript to stop early)
await conn.execute(
"CREATE TABLE IF NOT EXISTS feature_flags"
" (name TEXT PRIMARY KEY, enabled INTEGER NOT NULL DEFAULT 0,"
" description TEXT,"
" updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now')))"
)
# Seed data needed by E2E tests (supplier dashboard, billing, etc.)
await _seed_visual_data(conn)
core._db = conn
# Patch init_db/close_db where they're USED (app.py imports them
# locally via `from .core import init_db` — patching core.init_db
# alone doesn't affect the local binding in app.py).
# Patches must stay active through run_task() because before_serving
# hooks call init_db() which would replace our in-memory DB.
with patch("padelnomics.app.init_db", new_callable=AsyncMock), \
patch("padelnomics.app.close_db", new_callable=AsyncMock), \
patch("padelnomics.app.open_analytics_db"), \
patch("padelnomics.app.close_analytics_db"):
app = create_app()
app.config["TESTING"] = True
ready_event.set()
await app.run_task(host="127.0.0.1", port=_VISUAL_PORT)
asyncio.run(_serve())
@pytest.fixture(scope="session")
def live_server():
"""Start a live Quart server on port 5111 for all visual/E2E tests."""
import multiprocessing
ready = multiprocessing.Event()
proc = multiprocessing.Process(
target=_run_visual_server, args=(ready,), daemon=True
)
proc.start()
ready.wait(timeout=10)
time.sleep(1)
yield f"http://127.0.0.1:{_VISUAL_PORT}"
proc.terminate()
proc.join(timeout=5)
@pytest.fixture(scope="session")
def browser():
"""Launch a headless Chromium browser (once per test session)."""
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
b = p.chromium.launch(headless=True)
yield b
b.close()