refactor: flatten padelnomics/padelnomics/ → repo root
git mv all tracked files from the nested padelnomics/ workspace directory to the git repo root. Merged .gitignore files. No code changes — pure path rename. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
199
web/tests/conftest.py
Normal file
199
web/tests/conftest.py
Normal file
@@ -0,0 +1,199 @@
|
||||
"""
|
||||
Shared test fixtures for the Padelnomics test suite.
|
||||
"""
|
||||
import hashlib
|
||||
import hmac
|
||||
import sqlite3
|
||||
import tempfile
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import aiosqlite
|
||||
import pytest
|
||||
from padelnomics import core
|
||||
from padelnomics.app import create_app
|
||||
from padelnomics.migrations.migrate import migrate
|
||||
|
||||
_SCHEMA_CACHE = None
|
||||
|
||||
|
||||
def _get_schema_ddl():
|
||||
"""Run all migrations once against a temp DB and cache the resulting DDL."""
|
||||
global _SCHEMA_CACHE
|
||||
if _SCHEMA_CACHE is not None:
|
||||
return _SCHEMA_CACHE
|
||||
|
||||
tmp_db = str(Path(tempfile.mkdtemp()) / "schema.db")
|
||||
migrate(tmp_db)
|
||||
tmp_conn = sqlite3.connect(tmp_db)
|
||||
rows = tmp_conn.execute(
|
||||
"SELECT sql FROM sqlite_master"
|
||||
" WHERE sql IS NOT NULL"
|
||||
" AND name NOT LIKE 'sqlite_%'"
|
||||
" AND name NOT LIKE '%_fts_%'" # FTS5 shadow tables (created by VIRTUAL TABLE)
|
||||
" AND name != '_migrations'"
|
||||
" ORDER BY rowid"
|
||||
).fetchall()
|
||||
tmp_conn.close()
|
||||
_SCHEMA_CACHE = ";\n".join(r[0] for r in rows) + ";"
|
||||
return _SCHEMA_CACHE
|
||||
|
||||
|
||||
# ── Database ─────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture
|
||||
async def db():
|
||||
"""In-memory SQLite with full schema from replaying migrations."""
|
||||
schema_ddl = _get_schema_ddl()
|
||||
|
||||
conn = await aiosqlite.connect(":memory:")
|
||||
conn.row_factory = aiosqlite.Row
|
||||
await conn.execute("PRAGMA foreign_keys=ON")
|
||||
await conn.executescript(schema_ddl)
|
||||
await conn.commit()
|
||||
|
||||
original_db = core._db
|
||||
core._db = conn
|
||||
|
||||
yield conn
|
||||
|
||||
core._db = original_db
|
||||
await conn.close()
|
||||
|
||||
|
||||
# ── App & client ─────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture
|
||||
async def app(db):
|
||||
"""Quart app with DB already initialized (init_db/close_db patched to no-op)."""
|
||||
with patch.object(core, "init_db", new_callable=AsyncMock), \
|
||||
patch.object(core, "close_db", new_callable=AsyncMock):
|
||||
application = create_app()
|
||||
application.config["TESTING"] = True
|
||||
yield application
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client(app):
|
||||
"""Unauthenticated test client."""
|
||||
async with app.test_client() as c:
|
||||
yield c
|
||||
|
||||
|
||||
# ── Users ────────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture
|
||||
async def test_user(db):
|
||||
"""Create a test user, return dict with id/email/name."""
|
||||
now = datetime.utcnow().isoformat()
|
||||
async with db.execute(
|
||||
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
|
||||
("test@example.com", "Test User", now),
|
||||
) as cursor:
|
||||
user_id = cursor.lastrowid
|
||||
await db.commit()
|
||||
return {"id": user_id, "email": "test@example.com", "name": "Test User"}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def auth_client(app, test_user):
|
||||
"""Test client with session['user_id'] pre-set."""
|
||||
async with app.test_client() as c:
|
||||
async with c.session_transaction() as sess:
|
||||
sess["user_id"] = test_user["id"]
|
||||
yield c
|
||||
|
||||
|
||||
# ── Subscriptions ────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture
|
||||
def create_subscription(db):
|
||||
"""Factory: create a subscription row for a user."""
|
||||
async def _create(
|
||||
user_id: int,
|
||||
plan: str = "pro",
|
||||
status: str = "active",
|
||||
provider_customer_id: str = "ctm_test123",
|
||||
provider_subscription_id: str = "sub_test456",
|
||||
current_period_end: str = "2025-03-01T00:00:00Z",
|
||||
) -> int:
|
||||
now = datetime.utcnow().isoformat()
|
||||
# Create billing_customers record if provider_customer_id given
|
||||
if provider_customer_id:
|
||||
await db.execute(
|
||||
"""INSERT OR IGNORE INTO billing_customers
|
||||
(user_id, provider_customer_id, created_at) VALUES (?, ?, ?)""",
|
||||
(user_id, provider_customer_id, now),
|
||||
)
|
||||
async with db.execute(
|
||||
"""INSERT INTO subscriptions
|
||||
(user_id, plan, status,
|
||||
provider_subscription_id, current_period_end, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
(user_id, plan, status, provider_subscription_id,
|
||||
current_period_end, now, now),
|
||||
) as cursor:
|
||||
sub_id = cursor.lastrowid
|
||||
await db.commit()
|
||||
return sub_id
|
||||
return _create
|
||||
|
||||
|
||||
# ── Config ───────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_config():
|
||||
"""Set test Paddle config values."""
|
||||
original_values = {}
|
||||
test_values = {
|
||||
"PADDLE_API_KEY": "test_api_key_123",
|
||||
"PADDLE_WEBHOOK_SECRET": "whsec_test_secret",
|
||||
"BASE_URL": "http://localhost:5000",
|
||||
"DEBUG": True,
|
||||
"WAITLIST_MODE": False,
|
||||
}
|
||||
for key, val in test_values.items():
|
||||
original_values[key] = getattr(core.config, key, None)
|
||||
setattr(core.config, key, val)
|
||||
|
||||
yield
|
||||
|
||||
for key, val in original_values.items():
|
||||
setattr(core.config, key, val)
|
||||
|
||||
|
||||
# ── Webhook helpers ──────────────────────────────────────────
|
||||
|
||||
def make_webhook_payload(
|
||||
event_type: str,
|
||||
subscription_id: str = "sub_test456",
|
||||
customer_id: str = "ctm_test123",
|
||||
user_id: str = "1",
|
||||
plan: str = "starter",
|
||||
status: str = "active",
|
||||
ends_at: str = "2025-03-01T00:00:00.000000Z",
|
||||
) -> dict:
|
||||
"""Build a Paddle webhook payload dict."""
|
||||
return {
|
||||
"event_type": event_type,
|
||||
"data": {
|
||||
"id": subscription_id,
|
||||
"status": status,
|
||||
"customer_id": customer_id,
|
||||
"custom_data": {"user_id": user_id, "plan": plan},
|
||||
"current_billing_period": {
|
||||
"starts_at": "2025-02-01T00:00:00.000000Z",
|
||||
"ends_at": ends_at,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def sign_payload(payload_bytes: bytes, secret: str = "whsec_test_secret") -> str:
|
||||
"""Build a Paddle-format signature header: ts=<unix>;h1=<hmac_sha256>."""
|
||||
ts = str(int(time.time()))
|
||||
data = f"{ts}:{payload_bytes.decode()}".encode()
|
||||
h1 = hmac.new(secret.encode(), data, hashlib.sha256).hexdigest()
|
||||
return f"ts={ts};h1={h1}"
|
||||
Reference in New Issue
Block a user