Files
padelnomics/web/tests/conftest.py
Deeman e35e01edb1 test: add 18 e2e tests for billing, checkout, supplier signup/dashboard, export
- Pricing page (EN/DE, plan cards, no-auth access)
- Checkout success (auth required, renders for authed user)
- Supplier signup wizard (step 1, plan cards, DE variant, success page)
- Supplier dashboard (overview stats, boosts/credit packs, listing, leads tabs)
- Business plan export (auth required, form renders)

Also fixes:
- E2e server init_db mock scope — before_serving was calling real init_db
  outside the patch context, overwriting the in-memory DB (fixes 3
  pre-existing failures: markets_hub, markets_results, signup_page)
- Add _seed_billing_data() for supplier + feature flags in e2e server
- Mock RESEND_API_KEY="" in conftest + e2e server to prevent real emails

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 18:34:39 +01:00

240 lines
8.4 KiB
Python

"""
Shared test fixtures for the Padelnomics test suite.
"""
import hashlib
import hmac
import sqlite3
import tempfile
import time
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import AsyncMock, patch
import aiosqlite
import pytest
from padelnomics.app import create_app
from padelnomics.migrations.migrate import migrate
from padelnomics import core
from padelnomics import sitemap as sitemap_mod
_SCHEMA_CACHE = None
def _get_schema_ddl():
"""Run all migrations once against a temp DB and cache the resulting DDL."""
global _SCHEMA_CACHE
if _SCHEMA_CACHE is not None:
return _SCHEMA_CACHE
tmp_db = str(Path(tempfile.mkdtemp()) / "schema.db")
migrate(tmp_db)
tmp_conn = sqlite3.connect(tmp_db)
rows = tmp_conn.execute(
"SELECT sql FROM sqlite_master"
" WHERE sql IS NOT NULL"
" AND name NOT LIKE 'sqlite_%'"
" AND name NOT LIKE '%_fts_%'" # FTS5 shadow tables (created by VIRTUAL TABLE)
" AND name != '_migrations'"
" ORDER BY rowid"
).fetchall()
tmp_conn.close()
_SCHEMA_CACHE = ";\n".join(r[0] for r in rows) + ";"
return _SCHEMA_CACHE
# ── Database ─────────────────────────────────────────────────
@pytest.fixture
async def db():
"""In-memory SQLite with full schema from replaying migrations."""
schema_ddl = _get_schema_ddl()
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA foreign_keys=ON")
await conn.executescript(schema_ddl)
# Seed feature flags so routes that use feature_gate() pass through by default.
# Tests that specifically test gated behaviour set the flag to 0 via _set_flag().
await conn.executescript("""
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('markets', 1, 'Market/SEO content pages');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('payments', 0, 'Paddle billing & checkout');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('planner_export', 0, 'Business plan PDF export');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('supplier_signup', 0, 'Supplier onboarding wizard');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('lead_unlock', 0, 'Lead credit purchase & unlock');
""")
await conn.commit()
original_db = core._db
core._db = conn
# Clear sitemap cache so tests see fresh DB state
sitemap_mod._cache_xml = ""
sitemap_mod._cache_timestamp = 0.0
yield conn
core._db = original_db
await conn.close()
# ── App & client ─────────────────────────────────────────────
@pytest.fixture
async def app(db):
"""Quart app with DB already initialized (init_db/close_db patched to no-op)."""
with patch.object(core, "init_db", new_callable=AsyncMock), \
patch.object(core, "close_db", new_callable=AsyncMock):
application = create_app()
application.config["TESTING"] = True
yield application
@pytest.fixture
async def client(app):
"""Unauthenticated test client."""
async with app.test_client() as c:
yield c
# ── Users ────────────────────────────────────────────────────
@pytest.fixture
async def test_user(db):
"""Create a test user, return dict with id/email/name."""
now = datetime.now(UTC).isoformat()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("test@example.com", "Test User", now),
) as cursor:
user_id = cursor.lastrowid
await db.commit()
return {"id": user_id, "email": "test@example.com", "name": "Test User"}
@pytest.fixture
async def auth_client(app, test_user):
"""Test client with session['user_id'] pre-set."""
async with app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
yield c
# ── Subscriptions ────────────────────────────────────────────
@pytest.fixture
def create_subscription(db):
"""Factory: create a subscription row for a user."""
async def _create(
user_id: int,
plan: str = "pro",
status: str = "active",
provider_customer_id: str = "ctm_test123",
provider_subscription_id: str = "sub_test456",
current_period_end: str = "2025-03-01T00:00:00Z",
) -> int:
now = datetime.now(UTC).isoformat()
# Create billing_customers record if provider_customer_id given
if provider_customer_id:
await db.execute(
"""INSERT OR IGNORE INTO billing_customers
(user_id, provider_customer_id, created_at) VALUES (?, ?, ?)""",
(user_id, provider_customer_id, now),
)
async with db.execute(
"""INSERT INTO subscriptions
(user_id, plan, status,
provider_subscription_id, current_period_end, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(user_id, plan, status, provider_subscription_id,
current_period_end, now, now),
) as cursor:
sub_id = cursor.lastrowid
await db.commit()
return sub_id
return _create
# ── Scenarios ────────────────────────────────────────────────
@pytest.fixture
async def scenario(db, test_user):
"""User scenario with valid planner state for PDF generation."""
import json
from padelnomics.planner.calculator import validate_state
state = validate_state({"dblCourts": 4, "sglCourts": 2})
now = datetime.now(UTC).isoformat()
async with db.execute(
"""INSERT INTO scenarios (user_id, name, state_json, created_at)
VALUES (?, 'Test Scenario', ?, ?)""",
(test_user["id"], json.dumps(state), now),
) as cursor:
scenario_id = cursor.lastrowid
await db.commit()
return {"id": scenario_id, "state": state, "user_id": test_user["id"]}
# ── Config ───────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def patch_config():
"""Set test Paddle config values."""
original_values = {}
test_values = {
"PADDLE_API_KEY": "test_api_key_123",
"PADDLE_WEBHOOK_SECRET": "whsec_test_secret",
"RESEND_API_KEY": "", # never send real emails in tests
"BASE_URL": "http://localhost:5000",
"DEBUG": True,
"WAITLIST_MODE": False,
}
for key, val in test_values.items():
original_values[key] = getattr(core.config, key, None)
setattr(core.config, key, val)
yield
for key, val in original_values.items():
setattr(core.config, key, val)
# ── Webhook helpers ──────────────────────────────────────────
def make_webhook_payload(
event_type: str,
subscription_id: str = "sub_test456",
customer_id: str = "ctm_test123",
user_id: str = "1",
plan: str = "starter",
status: str = "active",
ends_at: str = "2025-03-01T00:00:00.000000Z",
) -> dict:
"""Build a Paddle webhook payload dict."""
return {
"event_type": event_type,
"data": {
"id": subscription_id,
"status": status,
"customer_id": customer_id,
"custom_data": {"user_id": user_id, "plan": plan},
"current_billing_period": {
"starts_at": "2025-02-01T00:00:00.000000Z",
"ends_at": ends_at,
},
},
}
def sign_payload(payload_bytes: bytes, secret: str = "whsec_test_secret") -> str:
"""Build a Paddle-format signature header: ts=<unix>;h1=<hmac_sha256>."""
ts = str(int(time.time()))
data = f"{ts}:{payload_bytes.decode()}".encode()
h1 = hmac.new(secret.encode(), data, hashlib.sha256).hexdigest()
return f"ts={ts};h1={h1}"