Files
padelnomics/web/tests/conftest.py
Deeman b5c9a4e573 test: e2e + unit tests for supervisor, proxy, and feature flags
- test_supervisor.py: 28 tests covering load_workflows, resolve_schedule,
  is_due, topological_waves, and proxy round-robin / sticky selection
- test_feature_flags.py: 31 tests covering migration 0019, is_flag_enabled,
  feature_gate decorator, admin toggle routes, and full toggle e2e flows
- conftest.py: seed feature flags with production defaults (markets=1,
  others=0) so all routes behave consistently in tests
- Fix is_flag_enabled bug: replace non-existent db.execute_fetchone()
  with fetch_one() helper
- Update 4 test_waitlist / test_businessplan tests that relied on
  WAITLIST_MODE patches — now enable the relevant DB flag instead

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 15:26:40 +01:00

239 lines
8.4 KiB
Python

"""
Shared test fixtures for the Padelnomics test suite.
"""
import hashlib
import hmac
import sqlite3
import tempfile
import time
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import AsyncMock, patch
import aiosqlite
import pytest
from padelnomics.app import create_app
from padelnomics.migrations.migrate import migrate
from padelnomics import core
from padelnomics import sitemap as sitemap_mod
_SCHEMA_CACHE = None
def _get_schema_ddl():
"""Run all migrations once against a temp DB and cache the resulting DDL."""
global _SCHEMA_CACHE
if _SCHEMA_CACHE is not None:
return _SCHEMA_CACHE
tmp_db = str(Path(tempfile.mkdtemp()) / "schema.db")
migrate(tmp_db)
tmp_conn = sqlite3.connect(tmp_db)
rows = tmp_conn.execute(
"SELECT sql FROM sqlite_master"
" WHERE sql IS NOT NULL"
" AND name NOT LIKE 'sqlite_%'"
" AND name NOT LIKE '%_fts_%'" # FTS5 shadow tables (created by VIRTUAL TABLE)
" AND name != '_migrations'"
" ORDER BY rowid"
).fetchall()
tmp_conn.close()
_SCHEMA_CACHE = ";\n".join(r[0] for r in rows) + ";"
return _SCHEMA_CACHE
# ── Database ─────────────────────────────────────────────────
@pytest.fixture
async def db():
"""In-memory SQLite with full schema from replaying migrations."""
schema_ddl = _get_schema_ddl()
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA foreign_keys=ON")
await conn.executescript(schema_ddl)
# Seed feature flags so routes that use feature_gate() pass through by default.
# Tests that specifically test gated behaviour set the flag to 0 via _set_flag().
await conn.executescript("""
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('markets', 1, 'Market/SEO content pages');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('payments', 0, 'Paddle billing & checkout');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('planner_export', 0, 'Business plan PDF export');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('supplier_signup', 0, 'Supplier onboarding wizard');
INSERT OR IGNORE INTO feature_flags (name, enabled, description)
VALUES ('lead_unlock', 0, 'Lead credit purchase & unlock');
""")
await conn.commit()
original_db = core._db
core._db = conn
# Clear sitemap cache so tests see fresh DB state
sitemap_mod._cache_xml = ""
sitemap_mod._cache_timestamp = 0.0
yield conn
core._db = original_db
await conn.close()
# ── App & client ─────────────────────────────────────────────
@pytest.fixture
async def app(db):
"""Quart app with DB already initialized (init_db/close_db patched to no-op)."""
with patch.object(core, "init_db", new_callable=AsyncMock), \
patch.object(core, "close_db", new_callable=AsyncMock):
application = create_app()
application.config["TESTING"] = True
yield application
@pytest.fixture
async def client(app):
"""Unauthenticated test client."""
async with app.test_client() as c:
yield c
# ── Users ────────────────────────────────────────────────────
@pytest.fixture
async def test_user(db):
"""Create a test user, return dict with id/email/name."""
now = datetime.now(UTC).isoformat()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("test@example.com", "Test User", now),
) as cursor:
user_id = cursor.lastrowid
await db.commit()
return {"id": user_id, "email": "test@example.com", "name": "Test User"}
@pytest.fixture
async def auth_client(app, test_user):
"""Test client with session['user_id'] pre-set."""
async with app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
yield c
# ── Subscriptions ────────────────────────────────────────────
@pytest.fixture
def create_subscription(db):
"""Factory: create a subscription row for a user."""
async def _create(
user_id: int,
plan: str = "pro",
status: str = "active",
provider_customer_id: str = "ctm_test123",
provider_subscription_id: str = "sub_test456",
current_period_end: str = "2025-03-01T00:00:00Z",
) -> int:
now = datetime.now(UTC).isoformat()
# Create billing_customers record if provider_customer_id given
if provider_customer_id:
await db.execute(
"""INSERT OR IGNORE INTO billing_customers
(user_id, provider_customer_id, created_at) VALUES (?, ?, ?)""",
(user_id, provider_customer_id, now),
)
async with db.execute(
"""INSERT INTO subscriptions
(user_id, plan, status,
provider_subscription_id, current_period_end, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(user_id, plan, status, provider_subscription_id,
current_period_end, now, now),
) as cursor:
sub_id = cursor.lastrowid
await db.commit()
return sub_id
return _create
# ── Scenarios ────────────────────────────────────────────────
@pytest.fixture
async def scenario(db, test_user):
"""User scenario with valid planner state for PDF generation."""
import json
from padelnomics.planner.calculator import validate_state
state = validate_state({"dblCourts": 4, "sglCourts": 2})
now = datetime.now(UTC).isoformat()
async with db.execute(
"""INSERT INTO scenarios (user_id, name, state_json, created_at)
VALUES (?, 'Test Scenario', ?, ?)""",
(test_user["id"], json.dumps(state), now),
) as cursor:
scenario_id = cursor.lastrowid
await db.commit()
return {"id": scenario_id, "state": state, "user_id": test_user["id"]}
# ── Config ───────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def patch_config():
"""Set test Paddle config values."""
original_values = {}
test_values = {
"PADDLE_API_KEY": "test_api_key_123",
"PADDLE_WEBHOOK_SECRET": "whsec_test_secret",
"BASE_URL": "http://localhost:5000",
"DEBUG": True,
"WAITLIST_MODE": False,
}
for key, val in test_values.items():
original_values[key] = getattr(core.config, key, None)
setattr(core.config, key, val)
yield
for key, val in original_values.items():
setattr(core.config, key, val)
# ── Webhook helpers ──────────────────────────────────────────
def make_webhook_payload(
event_type: str,
subscription_id: str = "sub_test456",
customer_id: str = "ctm_test123",
user_id: str = "1",
plan: str = "starter",
status: str = "active",
ends_at: str = "2025-03-01T00:00:00.000000Z",
) -> dict:
"""Build a Paddle webhook payload dict."""
return {
"event_type": event_type,
"data": {
"id": subscription_id,
"status": status,
"customer_id": customer_id,
"custom_data": {"user_id": user_id, "plan": plan},
"current_billing_period": {
"starts_at": "2025-02-01T00:00:00.000000Z",
"ends_at": ends_at,
},
},
}
def sign_payload(payload_bytes: bytes, secret: str = "whsec_test_secret") -> str:
"""Build a Paddle-format signature header: ts=<unix>;h1=<hmac_sha256>."""
ts = str(int(time.time()))
data = f"{ts}:{payload_bytes.decode()}".encode()
h1 = hmac.new(secret.encode(), data, hashlib.sha256).hexdigest()
return f"ts={ts};h1={h1}"