- test_supervisor.py: 28 tests covering load_workflows, resolve_schedule, is_due, topological_waves, and proxy round-robin / sticky selection - test_feature_flags.py: 31 tests covering migration 0019, is_flag_enabled, feature_gate decorator, admin toggle routes, and full toggle e2e flows - conftest.py: seed feature flags with production defaults (markets=1, others=0) so all routes behave consistently in tests - Fix is_flag_enabled bug: replace non-existent db.execute_fetchone() with fetch_one() helper - Update 4 test_waitlist / test_businessplan tests that relied on WAITLIST_MODE patches — now enable the relevant DB flag instead Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
239 lines
8.4 KiB
Python
239 lines
8.4 KiB
Python
"""
|
|
Shared test fixtures for the Padelnomics test suite.
|
|
"""
|
|
import hashlib
|
|
import hmac
|
|
import sqlite3
|
|
import tempfile
|
|
import time
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import aiosqlite
|
|
import pytest
|
|
from padelnomics.app import create_app
|
|
from padelnomics.migrations.migrate import migrate
|
|
|
|
from padelnomics import core
|
|
from padelnomics import sitemap as sitemap_mod
|
|
|
|
# Schema DDL string cached by _get_schema_ddl(); stays None until the first call.
_SCHEMA_CACHE = None
|
|
|
|
|
|
def _get_schema_ddl():
    """Return the schema DDL obtained by replaying all migrations once.

    The migrations are run against a throwaway on-disk database, the stored
    ``CREATE`` statements are read back from ``sqlite_master``, and the result
    is cached in ``_SCHEMA_CACHE`` so later calls return immediately.

    Returns:
        One string of ``;``-terminated statements in ``rowid`` order.
    """
    global _SCHEMA_CACHE
    if _SCHEMA_CACHE is not None:
        return _SCHEMA_CACHE

    # TemporaryDirectory removes the scratch database once the DDL has been
    # read, instead of leaking a mkdtemp() directory on every test session.
    # Cleanup is best-effort so a lingering file handle cannot fail the run.
    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmp_dir:
        tmp_db = str(Path(tmp_dir) / "schema.db")
        migrate(tmp_db)
        tmp_conn = sqlite3.connect(tmp_db)
        try:
            rows = tmp_conn.execute(
                "SELECT sql FROM sqlite_master"
                " WHERE sql IS NOT NULL"
                " AND name NOT LIKE 'sqlite_%'"
                " AND name NOT LIKE '%_fts_%'"  # FTS5 shadow tables (created by VIRTUAL TABLE)
                " AND name != '_migrations'"
                " ORDER BY rowid"
            ).fetchall()
        finally:
            # Close even if the query raises so the file handle is released.
            tmp_conn.close()

    _SCHEMA_CACHE = ";\n".join(r[0] for r in rows) + ";"
    return _SCHEMA_CACHE
|
|
|
|
|
|
# ── Database ─────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
async def db():
    """Yield an in-memory SQLite connection carrying the full migrated schema.

    The connection is installed as ``core._db`` for the duration of the test
    and the sitemap module's cache variables are reset. On teardown the
    previous ``core._db`` is restored and the connection is closed; this also
    happens when schema or seed setup fails part-way.
    """
    schema_ddl = _get_schema_ddl()

    conn = await aiosqlite.connect(":memory:")
    original_db = core._db
    try:
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA foreign_keys=ON")
        await conn.executescript(schema_ddl)
        # Seed feature flags: 'markets' is enabled, every other flag is
        # disabled. Tests that exercise a gated route must flip the relevant
        # flag themselves.
        await conn.executescript("""
            INSERT OR IGNORE INTO feature_flags (name, enabled, description)
            VALUES ('markets', 1, 'Market/SEO content pages');
            INSERT OR IGNORE INTO feature_flags (name, enabled, description)
            VALUES ('payments', 0, 'Paddle billing & checkout');
            INSERT OR IGNORE INTO feature_flags (name, enabled, description)
            VALUES ('planner_export', 0, 'Business plan PDF export');
            INSERT OR IGNORE INTO feature_flags (name, enabled, description)
            VALUES ('supplier_signup', 0, 'Supplier onboarding wizard');
            INSERT OR IGNORE INTO feature_flags (name, enabled, description)
            VALUES ('lead_unlock', 0, 'Lead credit purchase & unlock');
        """)
        await conn.commit()

        core._db = conn
        # Clear sitemap cache so tests see fresh DB state
        sitemap_mod._cache_xml = ""
        sitemap_mod._cache_timestamp = 0.0

        yield conn
    finally:
        # Runs for normal teardown and for setup failures alike, so the
        # connection is never left open and core._db never points at a
        # closed connection.
        core._db = original_db
        await conn.close()
|
|
|
|
|
|
# ── App & client ─────────────────────────────────────────────
|
|
|
|
@pytest.fixture
async def app(db):
    """Yield a Quart application created while DB lifecycle hooks are stubbed.

    ``core.init_db`` and ``core.close_db`` are replaced with ``AsyncMock``
    objects for as long as the fixture is active. Requesting ``db`` ensures
    the test database fixture has run first.
    """
    init_stub = patch.object(core, "init_db", new_callable=AsyncMock)
    close_stub = patch.object(core, "close_db", new_callable=AsyncMock)
    with init_stub, close_stub:
        quart_app = create_app()
        quart_app.config["TESTING"] = True
        yield quart_app
|
|
|
|
|
|
@pytest.fixture
async def client(app):
    """Yield a test client for ``app`` with no session values pre-set."""
    async with app.test_client() as anonymous:
        yield anonymous
|
|
|
|
|
|
# ── Users ────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
async def test_user(db):
    """Insert one user row and return its ``id``, ``email`` and ``name`` as a dict."""
    email, name = "test@example.com", "Test User"
    created_at = datetime.now(UTC).isoformat()
    insert_sql = "INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)"

    async with db.execute(insert_sql, (email, name, created_at)) as cur:
        new_id = cur.lastrowid
    await db.commit()

    return {"id": new_id, "email": email, "name": name}
|
|
|
|
|
|
@pytest.fixture
async def auth_client(app, test_user):
    """Yield a test client whose session already holds the test user's id."""
    logged_in_id = test_user["id"]
    async with app.test_client() as authed:
        # The session transaction is closed before the client is handed out.
        async with authed.session_transaction() as session:
            session["user_id"] = logged_in_id
        yield authed
|
|
|
|
|
|
# ── Subscriptions ────────────────────────────────────────────
|
|
|
|
@pytest.fixture
def create_subscription(db):
    """Return an async factory that inserts a subscription row for a user.

    The factory first inserts a ``billing_customers`` row (``INSERT OR
    IGNORE``) when ``provider_customer_id`` is truthy, then inserts the
    subscription, commits, and returns the new subscription's row id.
    """
    customer_sql = """INSERT OR IGNORE INTO billing_customers
                   (user_id, provider_customer_id, created_at) VALUES (?, ?, ?)"""
    subscription_sql = """INSERT INTO subscriptions
                   (user_id, plan, status,
                    provider_subscription_id, current_period_end, created_at, updated_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?)"""

    async def _create(
        user_id: int,
        plan: str = "pro",
        status: str = "active",
        provider_customer_id: str = "ctm_test123",
        provider_subscription_id: str = "sub_test456",
        current_period_end: str = "2025-03-01T00:00:00Z",
    ) -> int:
        timestamp = datetime.now(UTC).isoformat()

        # The billing customer row is only written when an id was supplied.
        if provider_customer_id:
            await db.execute(
                customer_sql, (user_id, provider_customer_id, timestamp)
            )

        subscription_params = (
            user_id,
            plan,
            status,
            provider_subscription_id,
            current_period_end,
            timestamp,  # created_at
            timestamp,  # updated_at
        )
        async with db.execute(subscription_sql, subscription_params) as cur:
            new_id = cur.lastrowid
        await db.commit()
        return new_id

    return _create
|
|
|
|
|
|
# ── Scenarios ────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
async def scenario(db, test_user):
    """Insert a 'Test Scenario' row for the test user and describe it.

    The planner state is built by passing a 4-double / 2-single court input
    through ``validate_state``; it is stored as JSON in the row. Returns a
    dict with the scenario ``id``, the validated ``state`` and the ``user_id``.
    """
    import json

    from padelnomics.planner.calculator import validate_state

    owner_id = test_user["id"]
    planner_state = validate_state({"dblCourts": 4, "sglCourts": 2})
    created_at = datetime.now(UTC).isoformat()
    insert_sql = """INSERT INTO scenarios (user_id, name, state_json, created_at)
           VALUES (?, 'Test Scenario', ?, ?)"""
    insert_params = (owner_id, json.dumps(planner_state), created_at)

    async with db.execute(insert_sql, insert_params) as cur:
        new_id = cur.lastrowid
    await db.commit()

    return {"id": new_id, "state": planner_state, "user_id": owner_id}
|
|
|
|
|
|
# ── Config ───────────────────────────────────────────────────
|
|
|
|
@pytest.fixture(autouse=True)
def patch_config():
    """Override selected ``core.config`` attributes for every test.

    Sets the Paddle credentials, ``BASE_URL``, ``DEBUG`` and ``WAITLIST_MODE``
    to fixed test values, then restores the previous state on teardown.
    Attributes that did not exist beforehand are removed again rather than
    being left behind with a ``None`` value.
    """
    # Sentinel distinguishes "attribute was absent" from "attribute was None".
    missing = object()
    test_values = {
        "PADDLE_API_KEY": "test_api_key_123",
        "PADDLE_WEBHOOK_SECRET": "whsec_test_secret",
        "BASE_URL": "http://localhost:5000",
        "DEBUG": True,
        "WAITLIST_MODE": False,
    }
    original_values = {}
    for key, val in test_values.items():
        original_values[key] = getattr(core.config, key, missing)
        setattr(core.config, key, val)

    yield

    for key, val in original_values.items():
        if val is missing:
            # The attribute was introduced by this fixture; drop it again.
            try:
                delattr(core.config, key)
            except AttributeError:
                # Already removed (e.g. by the test itself) - nothing to undo.
                pass
        else:
            setattr(core.config, key, val)
|
|
|
|
|
|
# ── Webhook helpers ──────────────────────────────────────────
|
|
|
|
def make_webhook_payload(
    event_type: str,
    subscription_id: str = "sub_test456",
    customer_id: str = "ctm_test123",
    user_id: str = "1",
    plan: str = "starter",
    status: str = "active",
    ends_at: str = "2025-03-01T00:00:00.000000Z",
) -> dict:
    """Assemble a Paddle webhook payload as a plain dict.

    Args:
        event_type: Value for the top-level ``event_type`` key.
        subscription_id: Stored as ``data.id``.
        customer_id: Stored as ``data.customer_id``.
        user_id: Stored in ``data.custom_data.user_id``.
        plan: Stored in ``data.custom_data.plan``.
        status: Stored as ``data.status``.
        ends_at: End of the billing period; the start is a fixed timestamp.

    Returns:
        A dict with the keys ``event_type`` and ``data``.
    """
    custom_data = {"user_id": user_id, "plan": plan}
    billing_period = {
        "starts_at": "2025-02-01T00:00:00.000000Z",
        "ends_at": ends_at,
    }
    # Key order mirrors the serialized payload layout.
    data = {
        "id": subscription_id,
        "status": status,
        "customer_id": customer_id,
        "custom_data": custom_data,
        "current_billing_period": billing_period,
    }
    return {"event_type": event_type, "data": data}
|
|
|
|
|
|
def sign_payload(payload_bytes: bytes, secret: str = "whsec_test_secret") -> str:
    """Build a Paddle-format signature header: ts=<unix>;h1=<hmac_sha256>.

    Args:
        payload_bytes: Raw request body to sign.
        secret: Webhook secret used as the HMAC key.

    Returns:
        ``"ts=<unix seconds>;h1=<hex digest>"`` where the digest is the
        HMAC-SHA256 of ``b"<ts>:" + payload_bytes``.
    """
    ts = str(int(time.time()))
    # Sign the raw bytes directly: a decode()/encode() round trip yields the
    # same bytes for UTF-8 input but raises UnicodeDecodeError for any body
    # that is not valid UTF-8.
    signed = ts.encode() + b":" + payload_bytes
    h1 = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()
    return f"ts={ts};h1={h1}"
|