switch payment provider from LemonSqueezy to Paddle

Run copier update with payment_provider=paddle to switch all billing
integration: routes, config, schema columns, webhook handling, tests,
and CI deploy secrets. Add one-time migration script for renaming
lemonsqueezy_* columns to paddle_* in production DB.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-16 10:40:40 +01:00
parent 53ca195a49
commit 25d06a80d5
11 changed files with 406 additions and 771 deletions

View File

@@ -1,10 +1,10 @@
# Changes here will be overwritten by Copier; NEVER EDIT MANUALLY
_commit: c920923
_commit: v0.2.0
_src_path: /home/Deeman/Projects/quart_saas_boilerplate
author_email: ''
author_name: ''
base_url: https://padelnomics.io
description: Plan, finance, and build your padel business
payment_provider: lemonsqueezy
payment_provider: paddle
project_name: Padelnomics
project_slug: padelnomics

View File

@@ -9,7 +9,7 @@ test:
- pip install uv
script:
- cd padelnomics && uv sync
- uv run -m pytest tests/ -x -q
- uv run pytest tests/ -x -q
- uv run ruff check src/ tests/
rules:
- if: $CI_COMMIT_BRANCH == "master"
@@ -29,4 +29,25 @@ deploy:
- chmod 700 ~/.ssh
- echo "$SSH_KNOWN_HOSTS" >> ~/.ssh/known_hosts
script:
- |
ssh "$DEPLOY_USER@$DEPLOY_HOST" "cat > /opt/padelnomics/padelnomics/.env" << ENVEOF
APP_NAME=$APP_NAME
SECRET_KEY=$SECRET_KEY
BASE_URL=$BASE_URL
DEBUG=false
ADMIN_PASSWORD=$ADMIN_PASSWORD
DATABASE_PATH=data/app.db
MAGIC_LINK_EXPIRY_MINUTES=${MAGIC_LINK_EXPIRY_MINUTES:-15}
SESSION_LIFETIME_DAYS=${SESSION_LIFETIME_DAYS:-30}
RESEND_API_KEY=$RESEND_API_KEY
EMAIL_FROM=${EMAIL_FROM:-hello@example.com}
ADMIN_EMAIL=${ADMIN_EMAIL:-}
RATE_LIMIT_REQUESTS=${RATE_LIMIT_REQUESTS:-100}
RATE_LIMIT_WINDOW=${RATE_LIMIT_WINDOW:-60}
PADDLE_API_KEY=$PADDLE_API_KEY
PADDLE_WEBHOOK_SECRET=$PADDLE_WEBHOOK_SECRET
PADDLE_PRICE_STARTER=$PADDLE_PRICE_STARTER
PADDLE_PRICE_PRO=$PADDLE_PRICE_PRO
ENVEOF
- ssh "$DEPLOY_USER@$DEPLOY_HOST" "chmod 600 /opt/padelnomics/padelnomics/.env"
- ssh "$DEPLOY_USER@$DEPLOY_HOST" "cd /opt/padelnomics && git pull origin master && ./deploy.sh"

View File

@@ -17,12 +17,11 @@ RESEND_API_KEY=
EMAIL_FROM=hello@padelnomics.io
ADMIN_EMAIL=leads@padelnomics.io
# LemonSqueezy
LEMONSQUEEZY_API_KEY=
LEMONSQUEEZY_STORE_ID=
LEMONSQUEEZY_WEBHOOK_SECRET=
LEMONSQUEEZY_MONTHLY_VARIANT_ID=
LEMONSQUEEZY_YEARLY_VARIANT_ID=
# Paddle
PADDLE_API_KEY=
PADDLE_WEBHOOK_SECRET=
PADDLE_PRICE_STARTER=
PADDLE_PRICE_PRO=
# Rate limiting
RATE_LIMIT_REQUESTS=100

View File

@@ -0,0 +1,19 @@
"""One-time migration: rename lemonsqueezy columns to paddle."""
import sqlite3
import sys
db_path = sys.argv[1] if len(sys.argv) > 1 else "data/app.db"
conn = sqlite3.connect(db_path)
conn.execute(
"ALTER TABLE subscriptions RENAME COLUMN lemonsqueezy_customer_id TO paddle_customer_id"
)
conn.execute(
"ALTER TABLE subscriptions RENAME COLUMN lemonsqueezy_subscription_id TO paddle_subscription_id"
)
conn.execute("DROP INDEX IF EXISTS idx_subscriptions_provider")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_subscriptions_provider ON subscriptions(paddle_subscription_id)"
)
conn.commit()
conn.close()
print(f"Migration complete: {db_path}")

View File

@@ -1,6 +1,6 @@
"""
Billing domain: checkout, webhooks, subscription management.
Payment provider: lemonsqueezy
Payment provider: paddle
"""
import json
@@ -46,8 +46,8 @@ async def upsert_subscription(
"""Create or update subscription."""
now = datetime.utcnow().isoformat()
customer_col = "lemonsqueezy_customer_id"
subscription_col = "lemonsqueezy_subscription_id"
customer_col = "paddle_customer_id"
subscription_col = "paddle_subscription_id"
existing = await fetch_one("SELECT id FROM subscriptions WHERE user_id = ?", (user_id,))
@@ -76,7 +76,7 @@ async def upsert_subscription(
async def get_subscription_by_provider_id(subscription_id: str) -> dict | None:
return await fetch_one(
"SELECT * FROM subscriptions WHERE lemonsqueezy_subscription_id = ?",
"SELECT * FROM subscriptions WHERE paddle_subscription_id = ?",
(subscription_id,)
)
@@ -90,9 +90,7 @@ async def update_subscription_status(provider_subscription_id: str, status: str,
values = list(extra.values())
values.append(provider_subscription_id)
await execute(
f"UPDATE subscriptions SET {sets} WHERE lemonsqueezy_subscription_id = ?", tuple(values)
)
await execute(f"UPDATE subscriptions SET {sets} WHERE paddle_subscription_id = ?", tuple(values))
@@ -154,196 +152,117 @@ async def success():
# =============================================================================
# LemonSqueezy Implementation
# Paddle Implementation
# =============================================================================
VARIANT_TO_PLAN: dict = {}
def _get_variant_map() -> dict:
if not VARIANT_TO_PLAN:
VARIANT_TO_PLAN[config.LEMONSQUEEZY_MONTHLY_VARIANT_ID] = "pro"
VARIANT_TO_PLAN[config.LEMONSQUEEZY_YEARLY_VARIANT_ID] = "pro"
return VARIANT_TO_PLAN
def determine_plan(variant_id) -> str:
return _get_variant_map().get(str(variant_id), "free")
@bp.route("/checkout/<plan>")
@bp.route("/checkout/<plan>", methods=["POST"])
@login_required
async def checkout(plan: str):
"""Create LemonSqueezy checkout."""
variant_id = {
"monthly": config.LEMONSQUEEZY_MONTHLY_VARIANT_ID,
"yearly": config.LEMONSQUEEZY_YEARLY_VARIANT_ID,
}.get(plan)
if not variant_id:
"""Create Paddle checkout via API."""
price_id = config.PADDLE_PRICES.get(plan)
if not price_id:
await flash("Invalid plan selected.", "error")
return redirect(url_for("billing.pricing"))
async with httpx.AsyncClient() as client:
response = await client.post(
"https://api.lemonsqueezy.com/v1/checkouts",
"https://api.paddle.com/transactions",
headers={
"Authorization": f"Bearer {config.LEMONSQUEEZY_API_KEY}",
"Content-Type": "application/vnd.api+json",
"Accept": "application/vnd.api+json",
"Authorization": f"Bearer {config.PADDLE_API_KEY}",
"Content-Type": "application/json",
},
json={
"data": {
"type": "checkouts",
"attributes": {
"checkout_data": {
"email": g.user["email"],
"custom": {"user_id": str(g.user["id"])},
},
"product_options": {
"redirect_url": f"{config.BASE_URL}/billing/success",
},
},
"relationships": {
"store": {
"data": {"type": "stores", "id": config.LEMONSQUEEZY_STORE_ID}
},
"variant": {
"data": {"type": "variants", "id": variant_id}
},
},
}
"items": [{"price_id": price_id, "quantity": 1}],
"custom_data": {"user_id": str(g.user["id"]), "plan": plan},
"checkout": {
"url": f"{config.BASE_URL}/billing/success",
},
},
)
response.raise_for_status()
checkout_url = response.json()["data"]["attributes"]["url"]
# Return URL for Lemon.js overlay, or redirect for non-JS
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
return jsonify({"checkout_url": checkout_url})
checkout_url = response.json()["data"]["checkout"]["url"]
return redirect(checkout_url)
@bp.route("/manage", methods=["POST"])
@login_required
async def manage():
"""Redirect to LemonSqueezy customer portal."""
"""Redirect to Paddle customer portal."""
sub = await get_subscription(g.user["id"])
if not sub or not sub.get("lemonsqueezy_subscription_id"):
if not sub or not sub.get("paddle_subscription_id"):
await flash("No active subscription found.", "error")
return redirect(url_for("dashboard.settings"))
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.lemonsqueezy.com/v1/subscriptions/{sub['lemonsqueezy_subscription_id']}",
headers={
"Authorization": f"Bearer {config.LEMONSQUEEZY_API_KEY}",
"Accept": "application/vnd.api+json",
},
f"https://api.paddle.com/subscriptions/{sub['paddle_subscription_id']}",
headers={"Authorization": f"Bearer {config.PADDLE_API_KEY}"},
)
response.raise_for_status()
portal_url = response.json()["data"]["attributes"]["urls"]["customer_portal"]
portal_url = response.json()["data"]["management_urls"]["update_payment_method"]
return redirect(portal_url)
@bp.route("/cancel", methods=["POST"])
@login_required
async def cancel():
"""Cancel subscription via LemonSqueezy API."""
"""Cancel subscription via Paddle API."""
sub = await get_subscription(g.user["id"])
if sub and sub.get("lemonsqueezy_subscription_id"):
if sub and sub.get("paddle_subscription_id"):
async with httpx.AsyncClient() as client:
await client.patch(
f"https://api.lemonsqueezy.com/v1/subscriptions/{sub['lemonsqueezy_subscription_id']}",
await client.post(
f"https://api.paddle.com/subscriptions/{sub['paddle_subscription_id']}/cancel",
headers={
"Authorization": f"Bearer {config.LEMONSQUEEZY_API_KEY}",
"Content-Type": "application/vnd.api+json",
"Accept": "application/vnd.api+json",
},
json={
"data": {
"type": "subscriptions",
"id": sub["lemonsqueezy_subscription_id"],
"attributes": {"cancelled": True},
}
"Authorization": f"Bearer {config.PADDLE_API_KEY}",
"Content-Type": "application/json",
},
json={"effective_from": "next_billing_period"},
)
return redirect(url_for("dashboard.settings"))
@bp.route("/resume", methods=["POST"])
@login_required
async def resume():
"""Resume a cancelled subscription before period end."""
sub = await get_subscription(g.user["id"])
if sub and sub.get("lemonsqueezy_subscription_id"):
async with httpx.AsyncClient() as client:
await client.patch(
f"https://api.lemonsqueezy.com/v1/subscriptions/{sub['lemonsqueezy_subscription_id']}",
headers={
"Authorization": f"Bearer {config.LEMONSQUEEZY_API_KEY}",
"Content-Type": "application/vnd.api+json",
"Accept": "application/vnd.api+json",
},
json={
"data": {
"type": "subscriptions",
"id": sub["lemonsqueezy_subscription_id"],
"attributes": {"cancelled": False},
}
},
)
return redirect(url_for("dashboard.settings"))
@bp.route("/webhook/lemonsqueezy", methods=["POST"])
@bp.route("/webhook/paddle", methods=["POST"])
async def webhook():
"""Handle LemonSqueezy webhooks."""
"""Handle Paddle webhooks."""
payload = await request.get_data()
signature = request.headers.get("X-Signature", "")
sig = request.headers.get("Paddle-Signature", "")
if not verify_hmac_signature(payload, signature, config.LEMONSQUEEZY_WEBHOOK_SECRET):
return jsonify({"error": "Invalid signature"}), 401
if not verify_hmac_signature(payload, sig, config.PADDLE_WEBHOOK_SECRET):
return jsonify({"error": "Invalid signature"}), 400
event = json.loads(payload)
event_name = event["meta"]["event_name"]
custom_data = event["meta"].get("custom_data", {})
event_type = event.get("event_type")
data = event.get("data", {})
custom_data = data.get("custom_data", {})
user_id = custom_data.get("user_id")
data = event["data"]
attrs = data["attributes"]
if event_name == "subscription_created":
if event_type == "subscription.activated":
plan = custom_data.get("plan", "starter")
await upsert_subscription(
user_id=int(user_id) if user_id else 0,
plan=determine_plan(attrs.get("variant_id")),
status=attrs["status"],
provider_customer_id=str(attrs["customer_id"]),
provider_subscription_id=data["id"],
current_period_end=attrs.get("renews_at"),
plan=plan,
status="active",
provider_customer_id=str(data.get("customer_id", "")),
provider_subscription_id=data.get("id", ""),
current_period_end=data.get("current_billing_period", {}).get("ends_at"),
)
elif event_name in ("subscription_updated", "subscription_payment_success"):
elif event_type == "subscription.updated":
await update_subscription_status(
data["id"],
status=attrs["status"],
plan=determine_plan(attrs.get("variant_id")),
current_period_end=attrs.get("renews_at"),
data.get("id", ""),
status=data.get("status", "active"),
current_period_end=data.get("current_billing_period", {}).get("ends_at"),
)
elif event_name == "subscription_cancelled":
await update_subscription_status(data["id"], status="cancelled")
elif event_type == "subscription.canceled":
await update_subscription_status(data.get("id", ""), status="cancelled")
elif event_name in ("subscription_expired", "order_refunded"):
await update_subscription_status(data["id"], status="expired")
elif event_type == "subscription.past_due":
await update_subscription_status(data.get("id", ""), status="past_due")
elif event_name == "subscription_payment_failed":
await update_subscription_status(data["id"], status="past_due")
return jsonify({"received": True})
elif event_name == "subscription_paused":
await update_subscription_status(data["id"], status="paused")
elif event_name in ("subscription_unpaused", "subscription_resumed"):
await update_subscription_status(data["id"], status="active")
return jsonify({"received": True}), 200

View File

@@ -32,13 +32,14 @@ class Config:
MAGIC_LINK_EXPIRY_MINUTES: int = int(os.getenv("MAGIC_LINK_EXPIRY_MINUTES", "15"))
SESSION_LIFETIME_DAYS: int = int(os.getenv("SESSION_LIFETIME_DAYS", "30"))
PAYMENT_PROVIDER: str = "lemonsqueezy"
PAYMENT_PROVIDER: str = "paddle"
LEMONSQUEEZY_API_KEY: str = os.getenv("LEMONSQUEEZY_API_KEY", "")
LEMONSQUEEZY_STORE_ID: str = os.getenv("LEMONSQUEEZY_STORE_ID", "")
LEMONSQUEEZY_WEBHOOK_SECRET: str = os.getenv("LEMONSQUEEZY_WEBHOOK_SECRET", "")
LEMONSQUEEZY_MONTHLY_VARIANT_ID: str = os.getenv("LEMONSQUEEZY_MONTHLY_VARIANT_ID", "")
LEMONSQUEEZY_YEARLY_VARIANT_ID: str = os.getenv("LEMONSQUEEZY_YEARLY_VARIANT_ID", "")
PADDLE_API_KEY: str = os.getenv("PADDLE_API_KEY", "")
PADDLE_WEBHOOK_SECRET: str = os.getenv("PADDLE_WEBHOOK_SECRET", "")
PADDLE_PRICES: dict = {
"starter": os.getenv("PADDLE_PRICE_STARTER", ""),
"pro": os.getenv("PADDLE_PRICE_PRO", ""),
}
RESEND_API_KEY: str = os.getenv("RESEND_API_KEY", "")
EMAIL_FROM: str = os.getenv("EMAIL_FROM", "hello@padelnomics.io")

View File

@@ -35,8 +35,8 @@ CREATE TABLE IF NOT EXISTS subscriptions (
plan TEXT NOT NULL DEFAULT 'free',
status TEXT NOT NULL DEFAULT 'free',
lemonsqueezy_customer_id TEXT,
lemonsqueezy_subscription_id TEXT,
paddle_customer_id TEXT,
paddle_subscription_id TEXT,
current_period_end TEXT,
created_at TEXT NOT NULL,
@@ -45,7 +45,7 @@ CREATE TABLE IF NOT EXISTS subscriptions (
CREATE INDEX IF NOT EXISTS idx_subscriptions_user ON subscriptions(user_id);
CREATE INDEX IF NOT EXISTS idx_subscriptions_provider ON subscriptions(lemonsqueezy_subscription_id);
CREATE INDEX IF NOT EXISTS idx_subscriptions_provider ON subscriptions(paddle_subscription_id);
-- API Keys

View File

@@ -1,5 +1,5 @@
"""
Shared test fixtures for the padelnomics test suite.
Shared test fixtures for the Padelnomics test suite.
"""
import hashlib
import hmac
@@ -12,7 +12,6 @@ import pytest
from padelnomics import core
from padelnomics.app import create_app
from padelnomics.billing.routes import VARIANT_TO_PLAN
SCHEMA_PATH = Path(__file__).parent.parent / "src" / "padelnomics" / "migrations" / "schema.sql"
@@ -91,17 +90,17 @@ def create_subscription(db):
user_id: int,
plan: str = "pro",
status: str = "active",
ls_customer_id: str = "cust_123",
ls_subscription_id: str = "sub_456",
paddle_customer_id: str = "ctm_test123",
paddle_subscription_id: str = "sub_test456",
current_period_end: str = "2025-03-01T00:00:00Z",
) -> int:
now = datetime.utcnow().isoformat()
async with db.execute(
"""INSERT INTO subscriptions
(user_id, plan, status, lemonsqueezy_customer_id,
lemonsqueezy_subscription_id, current_period_end, created_at, updated_at)
(user_id, plan, status, paddle_customer_id,
paddle_subscription_id, current_period_end, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(user_id, plan, status, ls_customer_id, ls_subscription_id,
(user_id, plan, status, paddle_customer_id, paddle_subscription_id,
current_period_end, now, now),
) as cursor:
sub_id = cursor.lastrowid
@@ -114,14 +113,12 @@ def create_subscription(db):
@pytest.fixture(autouse=True)
def patch_config():
"""Set test LemonSqueezy config values. Clears variant cache on teardown."""
"""Set test Paddle config values."""
original_values = {}
test_values = {
"LEMONSQUEEZY_API_KEY": "test_api_key_123",
"LEMONSQUEEZY_STORE_ID": "store_999",
"LEMONSQUEEZY_WEBHOOK_SECRET": "whsec_test_secret",
"LEMONSQUEEZY_MONTHLY_VARIANT_ID": "variant_monthly_100",
"LEMONSQUEEZY_YEARLY_VARIANT_ID": "variant_yearly_200",
"PADDLE_API_KEY": "test_api_key_123",
"PADDLE_WEBHOOK_SECRET": "whsec_test_secret",
"PADDLE_PRICES": {"starter": "pri_starter_123", "pro": "pri_pro_456"},
"BASE_URL": "http://localhost:5000",
"DEBUG": True,
}
@@ -129,11 +126,8 @@ def patch_config():
original_values[key] = getattr(core.config, key, None)
setattr(core.config, key, val)
VARIANT_TO_PLAN.clear()
yield
VARIANT_TO_PLAN.clear()
for key, val in original_values.items():
setattr(core.config, key, val)
@@ -141,35 +135,25 @@ def patch_config():
# ── Webhook helpers ──────────────────────────────────────────
def make_webhook_payload(
event_name: str,
subscription_id: str = "sub_456",
event_type: str,
subscription_id: str = "sub_test456",
customer_id: str = "ctm_test123",
user_id: str = "1",
variant_id: str = "variant_monthly_100",
plan: str = "starter",
status: str = "active",
customer_id: int = 67890,
renews_at: str = "2025-03-01T00:00:00.000000Z",
ends_at: str = "2025-03-01T00:00:00.000000Z",
) -> dict:
"""Build a LemonSqueezy webhook payload dict."""
"""Build a Paddle webhook payload dict."""
return {
"meta": {
"event_name": event_name,
"custom_data": {"user_id": user_id},
"webhook_id": "wh_test_123",
},
"event_type": event_type,
"data": {
"type": "subscriptions",
"id": subscription_id,
"attributes": {
"store_id": 12345,
"customer_id": customer_id,
"order_id": 11111,
"product_id": 22222,
"variant_id": variant_id,
"status": status,
"renews_at": renews_at,
"ends_at": None,
"created_at": "2025-02-01T00:00:00.000000Z",
"updated_at": "2025-02-01T00:00:00.000000Z",
"status": status,
"customer_id": customer_id,
"custom_data": {"user_id": user_id, "plan": plan},
"current_billing_period": {
"starts_at": "2025-02-01T00:00:00.000000Z",
"ends_at": ends_at,
},
},
}

View File

@@ -7,9 +7,7 @@ from hypothesis import settings as h_settings
from hypothesis import strategies as st
from padelnomics.billing.routes import (
VARIANT_TO_PLAN,
can_access_feature,
determine_plan,
get_subscription,
get_subscription_by_provider_id,
is_within_limits,
@@ -54,14 +52,14 @@ class TestUpsertSubscription:
row = await get_subscription(test_user["id"])
assert row["plan"] == "pro"
assert row["status"] == "active"
assert row["lemonsqueezy_customer_id"] == "cust_abc"
assert row["lemonsqueezy_subscription_id"] == "sub_xyz"
assert row["paddle_customer_id"] == "cust_abc"
assert row["paddle_subscription_id"] == "sub_xyz"
assert row["current_period_end"] == "2025-06-01T00:00:00Z"
async def test_update_existing_subscription(self, db, test_user, create_subscription):
original_id = await create_subscription(
test_user["id"], plan="starter", status="active",
ls_subscription_id="sub_old",
paddle_subscription_id="sub_old",
)
returned_id = await upsert_subscription(
user_id=test_user["id"],
@@ -73,7 +71,7 @@ class TestUpsertSubscription:
assert returned_id == original_id
row = await get_subscription(test_user["id"])
assert row["plan"] == "pro"
assert row["lemonsqueezy_subscription_id"] == "sub_new"
assert row["paddle_subscription_id"] == "sub_new"
async def test_upsert_with_none_period_end(self, db, test_user):
await upsert_subscription(
@@ -97,8 +95,8 @@ class TestGetSubscriptionByProviderId:
result = await get_subscription_by_provider_id("nonexistent")
assert result is None
async def test_finds_by_lemonsqueezy_subscription_id(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_findme")
async def test_finds_by_paddle_subscription_id(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], paddle_subscription_id="sub_findme")
result = await get_subscription_by_provider_id("sub_findme")
assert result is not None
assert result["user_id"] == test_user["id"]
@@ -110,14 +108,14 @@ class TestGetSubscriptionByProviderId:
class TestUpdateSubscriptionStatus:
async def test_updates_status(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="active", ls_subscription_id="sub_upd")
await create_subscription(test_user["id"], status="active", paddle_subscription_id="sub_upd")
await update_subscription_status("sub_upd", status="cancelled")
row = await get_subscription(test_user["id"])
assert row["status"] == "cancelled"
assert row["updated_at"] is not None
async def test_updates_extra_fields(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_extra")
await create_subscription(test_user["id"], paddle_subscription_id="sub_extra")
await update_subscription_status(
"sub_extra",
status="active",
@@ -130,7 +128,7 @@ class TestUpdateSubscriptionStatus:
assert row["current_period_end"] == "2026-01-01T00:00:00Z"
async def test_noop_for_unknown_provider_id(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_known", status="active")
await create_subscription(test_user["id"], paddle_subscription_id="sub_known", status="active")
await update_subscription_status("sub_unknown", status="expired")
row = await get_subscription(test_user["id"])
assert row["status"] == "active" # unchanged
@@ -218,33 +216,7 @@ class TestIsWithinLimits:
# ════════════════════════════════════════════════════════════
# determine_plan
# ════════════════════════════════════════════════════════════
class TestDeterminePlan:
def test_monthly_variant_returns_pro(self):
VARIANT_TO_PLAN.clear()
assert determine_plan("variant_monthly_100") == "pro"
def test_yearly_variant_returns_pro(self):
VARIANT_TO_PLAN.clear()
assert determine_plan("variant_yearly_200") == "pro"
def test_unknown_variant_returns_free(self):
VARIANT_TO_PLAN.clear()
assert determine_plan("unknown_variant") == "free"
def test_integer_variant_id_coerced(self):
VARIANT_TO_PLAN.clear()
assert determine_plan(12345) == "free"
def test_none_variant_returns_free(self):
VARIANT_TO_PLAN.clear()
assert determine_plan(None) == "free"
# ════════════════════════════════════════════════════════════
# Parameterized: status × feature access matrix
# Parameterized: status x feature access matrix
# ════════════════════════════════════════════════════════════
STATUSES = ["free", "active", "on_trial", "cancelled", "past_due", "paused", "expired"]
@@ -269,7 +241,7 @@ async def test_feature_access_matrix(db, test_user, create_subscription, status,
# ════════════════════════════════════════════════════════════
# Parameterized: plan × feature matrix (active status)
# Parameterized: plan x feature matrix (active status)
# ════════════════════════════════════════════════════════════
PLANS = ["free", "starter", "pro"]
@@ -287,7 +259,7 @@ async def test_plan_feature_matrix(db, test_user, create_subscription, plan, fea
# ════════════════════════════════════════════════════════════
# Parameterized: plan × resource limit boundaries
# Parameterized: plan x resource limit boundaries
# ════════════════════════════════════════════════════════════
@pytest.mark.parametrize("plan", PLANS)

View File

@@ -1,15 +1,17 @@
"""
Route tests for billing pages, checkout, manage, cancel, resume.
External LemonSqueezy API calls mocked with respx.
Route integration tests for Paddle billing endpoints.
External Paddle API calls mocked with respx.
"""
import json
import httpx
import pytest
import respx
CHECKOUT_METHOD = "POST"
CHECKOUT_PLAN = "starter"
# ════════════════════════════════════════════════════════════
# GET /billing/pricing
# Public routes (pricing, success)
# ════════════════════════════════════════════════════════════
class TestPricingPage:
@@ -27,15 +29,10 @@ class TestPricingPage:
assert response.status_code == 200
# ════════════════════════════════════════════════════════════
# GET /billing/success
# ════════════════════════════════════════════════════════════
class TestSuccessPage:
async def test_requires_auth(self, client, db):
response = await client.get("/billing/success")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
response = await client.get("/billing/success", follow_redirects=False)
assert response.status_code in (302, 303, 307)
async def test_accessible_with_auth(self, auth_client, db, test_user):
response = await auth_client.get("/billing/success")
@@ -43,266 +40,196 @@ class TestSuccessPage:
# ════════════════════════════════════════════════════════════
# GET /billing/checkout/<plan>
# Checkout
# ════════════════════════════════════════════════════════════
class TestCheckoutRoute:
async def test_requires_auth(self, client, db):
response = await client.get("/billing/checkout/monthly")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
response = await client.post(f"/billing/checkout/{CHECKOUT_PLAN}", follow_redirects=False)
assert response.status_code in (302, 303, 307)
@respx.mock
async def test_monthly_redirects_to_checkout_url(self, auth_client, db, test_user):
checkout_url = "https://checkout.lemonsqueezy.com/checkout/test123"
respx.post("https://api.lemonsqueezy.com/v1/checkouts").mock(
return_value=httpx.Response(
200, json={"data": {"attributes": {"url": checkout_url}}},
)
async def test_creates_checkout_session(self, auth_client, db, test_user):
respx.post("https://api.paddle.com/transactions").mock(
return_value=httpx.Response(200, json={
"data": {
"checkout": {
"url": "https://checkout.paddle.com/test_123"
}
}
})
)
response = await auth_client.get("/billing/checkout/monthly")
assert response.status_code == 302
assert response.headers["Location"] == checkout_url
response = await auth_client.post(f"/billing/checkout/{CHECKOUT_PLAN}", follow_redirects=False)
assert response.status_code in (302, 303, 307)
@respx.mock
async def test_yearly_redirects_to_checkout_url(self, auth_client, db, test_user):
checkout_url = "https://checkout.lemonsqueezy.com/checkout/yearly456"
respx.post("https://api.lemonsqueezy.com/v1/checkouts").mock(
return_value=httpx.Response(
200, json={"data": {"attributes": {"url": checkout_url}}},
)
)
response = await auth_client.get("/billing/checkout/yearly")
assert response.status_code == 302
assert response.headers["Location"] == checkout_url
async def test_invalid_plan_redirects_to_pricing(self, auth_client, db, test_user):
response = await auth_client.get("/billing/checkout/enterprise")
assert response.status_code == 302
assert "/billing/pricing" in response.headers["Location"]
@respx.mock
async def test_ajax_returns_json(self, auth_client, db, test_user):
checkout_url = "https://checkout.lemonsqueezy.com/checkout/ajax789"
respx.post("https://api.lemonsqueezy.com/v1/checkouts").mock(
return_value=httpx.Response(
200, json={"data": {"attributes": {"url": checkout_url}}},
)
)
response = await auth_client.get(
"/billing/checkout/monthly",
headers={"X-Requested-With": "XMLHttpRequest"},
)
assert response.status_code == 200
data = await response.get_json()
assert data["checkout_url"] == checkout_url
@respx.mock
async def test_sends_correct_api_payload(self, auth_client, db, test_user):
route = respx.post("https://api.lemonsqueezy.com/v1/checkouts").mock(
return_value=httpx.Response(
200, json={"data": {"attributes": {"url": "https://example.com"}}},
)
)
await auth_client.get("/billing/checkout/monthly")
assert route.called
sent_json = json.loads(route.calls.last.request.content)
assert sent_json["data"]["type"] == "checkouts"
assert sent_json["data"]["attributes"]["checkout_data"]["email"] == test_user["email"]
assert sent_json["data"]["attributes"]["checkout_data"]["custom"]["user_id"] == str(test_user["id"])
assert sent_json["data"]["relationships"]["store"]["data"]["id"] == "store_999"
assert sent_json["data"]["relationships"]["variant"]["data"]["id"] == "variant_monthly_100"
async def test_invalid_plan_rejected(self, auth_client, db, test_user):
response = await auth_client.post("/billing/checkout/invalid", follow_redirects=False)
assert response.status_code in (302, 303, 307)
@respx.mock
async def test_api_error_propagates(self, auth_client, db, test_user):
respx.post("https://api.lemonsqueezy.com/v1/checkouts").mock(
respx.post("https://api.paddle.com/transactions").mock(
return_value=httpx.Response(500, json={"error": "server error"})
)
with pytest.raises(httpx.HTTPStatusError):
await auth_client.get("/billing/checkout/monthly")
await auth_client.post(f"/billing/checkout/{CHECKOUT_PLAN}")
# ════════════════════════════════════════════════════════════
# POST /billing/manage
# Manage subscription / Portal
# ════════════════════════════════════════════════════════════
class TestManageRoute:
async def test_requires_auth(self, client, db):
response = await client.post("/billing/manage")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
response = await client.post("/billing/manage", follow_redirects=False)
assert response.status_code in (302, 303, 307)
async def test_no_subscription_redirects(self, auth_client, db, test_user):
response = await auth_client.post("/billing/manage")
assert response.status_code == 302
assert "/dashboard" in response.headers["Location"]
async def test_requires_subscription(self, auth_client, db, test_user):
response = await auth_client.post("/billing/manage", follow_redirects=False)
assert response.status_code in (302, 303, 307)
@respx.mock
async def test_redirects_to_portal(self, auth_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_manage_001")
portal_url = "https://app.lemonsqueezy.com/my-orders/portal"
await create_subscription(test_user["id"], paddle_subscription_id="sub_test")
respx.get("https://api.lemonsqueezy.com/v1/subscriptions/sub_manage_001").mock(
respx.get("https://api.paddle.com/subscriptions/sub_test").mock(
return_value=httpx.Response(200, json={
"data": {"attributes": {"urls": {"customer_portal": portal_url}}}
"data": {
"management_urls": {
"update_payment_method": "https://paddle.com/manage/test_123"
}
}
})
)
response = await auth_client.post("/billing/manage")
assert response.status_code == 302
assert response.headers["Location"] == portal_url
response = await auth_client.post("/billing/manage", follow_redirects=False)
assert response.status_code in (302, 303, 307)
# ════════════════════════════════════════════════════════════
# POST /billing/cancel
# Cancel subscription
# ════════════════════════════════════════════════════════════
class TestCancelRoute:
async def test_requires_auth(self, client, db):
response = await client.post("/billing/cancel")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
response = await client.post("/billing/cancel", follow_redirects=False)
assert response.status_code in (302, 303, 307)
async def test_no_subscription_redirects(self, auth_client, db, test_user):
response = await auth_client.post("/billing/cancel")
assert response.status_code == 302
assert "/dashboard" in response.headers["Location"]
async def test_no_error_without_subscription(self, auth_client, db, test_user):
response = await auth_client.post("/billing/cancel", follow_redirects=False)
assert response.status_code in (302, 303, 307)
@respx.mock
async def test_sends_cancel_patch(self, auth_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_cancel_route")
async def test_cancels_subscription(self, auth_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], paddle_subscription_id="sub_test")
route = respx.patch(
"https://api.lemonsqueezy.com/v1/subscriptions/sub_cancel_route"
).mock(return_value=httpx.Response(200, json={}))
response = await auth_client.post("/billing/cancel")
assert response.status_code == 302
assert "/dashboard" in response.headers["Location"]
assert route.called
sent_json = json.loads(route.calls.last.request.content)
assert sent_json["data"]["attributes"]["cancelled"] is True
# ════════════════════════════════════════════════════════════
# POST /billing/resume
# ════════════════════════════════════════════════════════════
class TestResumeRoute:
async def test_requires_auth(self, client, db):
response = await client.post("/billing/resume")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
@respx.mock
async def test_sends_resume_patch(self, auth_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_resume_route")
route = respx.patch(
"https://api.lemonsqueezy.com/v1/subscriptions/sub_resume_route"
).mock(return_value=httpx.Response(200, json={}))
response = await auth_client.post("/billing/resume")
assert response.status_code == 302
assert "/dashboard" in response.headers["Location"]
assert route.called
sent_json = json.loads(route.calls.last.request.content)
assert sent_json["data"]["attributes"]["cancelled"] is False
respx.post("https://api.paddle.com/subscriptions/sub_test/cancel").mock(
return_value=httpx.Response(200, json={"data": {}})
)
response = await auth_client.post("/billing/cancel", follow_redirects=False)
assert response.status_code in (302, 303, 307)
# ════════════════════════════════════════════════════════════
# subscription_required decorator
# ════════════════════════════════════════════════════════════
class TestSubscriptionRequired:
from quart import Blueprint # noqa: E402
from padelnomics.billing.routes import subscription_required # noqa: E402
test_bp = Blueprint("test", __name__)
@test_bp.route("/protected")
@subscription_required()
async def protected_route():
return "success", 200
@test_bp.route("/custom_allowed")
@subscription_required(allowed=("active", "past_due"))
async def custom_allowed_route():
return "success", 200
class TestSubscriptionRequiredDecorator:
@pytest.fixture
async def gated_app(self, app):
"""Register a test route using subscription_required with restricted allowed."""
from padelnomics.billing.routes import subscription_required
@app.route("/test-gated")
@subscription_required(allowed=("active", "on_trial"))
async def gated():
return "OK", 200
async def test_app(self, app):
app.register_blueprint(test_bp)
return app
async def test_no_session_redirects_to_login(self, gated_app, db):
async with gated_app.test_client() as c:
response = await c.get("/test-gated")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
@pytest.fixture
async def test_client(self, test_app):
async with test_app.test_client() as c:
yield c
async def test_no_subscription_redirects_to_pricing(self, gated_app, db, test_user):
async with gated_app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get("/test-gated")
assert response.status_code == 302
assert "/billing/pricing" in response.headers["Location"]
async def test_redirects_unauthenticated(self, test_client, db):
response = await test_client.get("/protected", follow_redirects=False)
assert response.status_code in (302, 303, 307)
async def test_active_passes(self, gated_app, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="active")
async with gated_app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get("/test-gated")
assert response.status_code == 200
async def test_on_trial_passes(self, gated_app, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="on_trial")
async with gated_app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get("/test-gated")
assert response.status_code == 200
async def test_cancelled_rejected_when_not_in_allowed(self, gated_app, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="cancelled")
async with gated_app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get("/test-gated")
assert response.status_code == 302
assert "/billing/pricing" in response.headers["Location"]
async def test_expired_redirects(self, gated_app, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="expired")
async with gated_app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get("/test-gated")
assert response.status_code == 302
# ════════════════════════════════════════════════════════════
# Parameterized: subscription_required default allowed
# ════════════════════════════════════════════════════════════
ALL_STATUSES = ["free", "active", "on_trial", "cancelled", "past_due", "paused", "expired"]
DEFAULT_ALLOWED = ("active", "on_trial", "cancelled")
@pytest.mark.parametrize("status", ALL_STATUSES)
async def test_subscription_required_default_allowed(app, db, test_user, create_subscription, status):
from padelnomics.billing.routes import subscription_required
@app.route(f"/test-gate-{status}")
@subscription_required()
async def gated():
return "OK", 200
if status != "free":
await create_subscription(test_user["id"], status=status)
async with app.test_client() as c:
async with c.session_transaction() as sess:
async def test_redirects_without_subscription(self, test_client, db, test_user):
async with test_client.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get(f"/test-gate-{status}")
if status in DEFAULT_ALLOWED:
response = await test_client.get("/protected", follow_redirects=False)
assert response.status_code in (302, 303, 307)
async def test_allows_active_subscription(self, test_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="active")
async with test_client.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await test_client.get("/protected")
assert response.status_code == 200
async def test_allows_on_trial(self, test_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="on_trial")
async with test_client.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await test_client.get("/protected")
assert response.status_code == 200
async def test_allows_cancelled(self, test_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="cancelled")
async with test_client.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await test_client.get("/protected")
assert response.status_code == 200
async def test_rejects_expired(self, test_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="expired")
async with test_client.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await test_client.get("/protected", follow_redirects=False)
assert response.status_code in (302, 303, 307)
@pytest.mark.parametrize("status", ["free", "active", "on_trial", "cancelled", "past_due", "paused", "expired"])
async def test_default_allowed_tuple(self, test_client, db, test_user, create_subscription, status):
if status != "free":
await create_subscription(test_user["id"], plan="pro", status=status)
async with test_client.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await test_client.get("/protected", follow_redirects=False)
if status in ("active", "on_trial", "cancelled"):
assert response.status_code == 200
else:
assert response.status_code == 302
assert response.status_code in (302, 303, 307)
async def test_custom_allowed_tuple(self, test_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="past_due")
async with test_client.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await test_client.get("/custom_allowed")
assert response.status_code == 200

View File

@@ -1,8 +1,6 @@
"""
Integration tests for the LemonSqueezy webhook endpoint.
Tests signature verification, all event types, parameterized status
transitions, and Hypothesis fuzzing.
Integration tests for Paddle webhook handling.
Covers signature verification, event parsing, subscription lifecycle transitions, and Hypothesis fuzzing.
"""
import json
@@ -14,150 +12,83 @@ from hypothesis import strategies as st
from padelnomics.billing.routes import get_subscription
WEBHOOK_PATH = "/billing/webhook/lemonsqueezy"
WEBHOOK_PATH = "/billing/webhook/paddle"
SIG_HEADER = "Paddle-Signature"
# ════════════════════════════════════════════════════════════
# Signature verification
# Signature Verification
# ════════════════════════════════════════════════════════════
class TestWebhookSignatureVerification:
async def test_valid_signature_returns_200(self, client, db, test_user):
payload = make_webhook_payload("subscription_created", user_id=str(test_user["id"]))
class TestWebhookSignature:
async def test_missing_signature_rejected(self, client, db):
payload = make_webhook_payload("subscription.activated")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
assert response.status_code == 200
async def test_invalid_signature_returns_401(self, client, db):
payload = make_webhook_payload("subscription_created")
payload_bytes = json.dumps(payload).encode()
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": "bad_signature", "Content-Type": "application/json"},
)
assert response.status_code == 401
data = await response.get_json()
assert data["error"] == "Invalid signature"
async def test_missing_signature_returns_401(self, client, db):
payload_bytes = json.dumps(make_webhook_payload("subscription_created")).encode()
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"Content-Type": "application/json"},
)
assert response.status_code == 401
assert response.status_code in (400, 401)
async def test_empty_signature_returns_401(self, client, db):
payload_bytes = json.dumps(make_webhook_payload("subscription_created")).encode()
async def test_invalid_signature_rejected(self, client, db):
payload = make_webhook_payload("subscription.activated")
payload_bytes = json.dumps(payload).encode()
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": "", "Content-Type": "application/json"},
headers={SIG_HEADER: "invalid_signature", "Content-Type": "application/json"},
)
assert response.status_code == 401
assert response.status_code in (400, 401)
async def test_tampered_payload_returns_401(self, client, db, test_user):
payload = make_webhook_payload("subscription_created", user_id=str(test_user["id"]))
async def test_valid_signature_accepted(self, client, db, test_user):
payload = make_webhook_payload("subscription.activated", user_id=str(test_user["id"]))
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code in (200, 204)
async def test_modified_payload_rejected(self, client, db, test_user):
payload = make_webhook_payload("subscription.activated", user_id=str(test_user["id"]))
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
tampered = payload_bytes + b"extra"
response = await client.post(
WEBHOOK_PATH,
data=tampered,
headers={"X-Signature": sig, "Content-Type": "application/json"},
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code == 401
assert response.status_code in (400, 401)
async def test_empty_payload_rejected(self, client, db):
sig = sign_payload(b"")
# ════════════════════════════════════════════════════════════
# subscription_created
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionCreated:
async def test_creates_new_subscription(self, client, db, test_user):
payload = make_webhook_payload(
"subscription_created",
subscription_id="sub_new_001",
user_id=str(test_user["id"]),
variant_id="variant_monthly_100",
status="active",
customer_id=99999,
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
assert response.status_code == 200
sub = await get_subscription(test_user["id"])
assert sub is not None
assert sub["plan"] == "pro"
assert sub["status"] == "active"
assert sub["lemonsqueezy_subscription_id"] == "sub_new_001"
assert sub["lemonsqueezy_customer_id"] == "99999"
async def test_unknown_variant_gets_free_plan(self, client, db, test_user):
payload = make_webhook_payload(
"subscription_created",
user_id=str(test_user["id"]),
variant_id="unknown_variant",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["plan"] == "free"
async def test_missing_user_id_causes_db_error(self, client, db):
"""When user_id is absent, the handler passes user_id=0 which violates the FK constraint."""
payload = make_webhook_payload("subscription_created")
payload["meta"]["custom_data"] = {}
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
with pytest.raises(Exception):
with pytest.raises(Exception): # JSONDecodeError in TESTING mode
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
data=b"",
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
# ════════════════════════════════════════════════════════════
# subscription_updated
# Subscription Lifecycle Events
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionUpdated:
async def test_updates_existing_subscription(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_upd_001", plan="starter")
class TestWebhookSubscriptionActivated:
async def test_creates_subscription(self, client, db, test_user):
payload = make_webhook_payload(
"subscription_updated",
subscription_id="sub_upd_001",
variant_id="variant_yearly_200",
status="active",
renews_at="2026-01-01T00:00:00Z",
"subscription.activated",
user_id=str(test_user["id"]),
plan="starter",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
@@ -165,233 +96,107 @@ class TestWebhookSubscriptionUpdated:
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code == 200
assert response.status_code in (200, 204)
sub = await get_subscription(test_user["id"])
assert sub["plan"] == "pro"
assert sub["current_period_end"] == "2026-01-01T00:00:00Z"
assert sub is not None
assert sub["plan"] == "starter"
assert sub["status"] == "active"
# ════════════════════════════════════════════════════════════
# subscription_payment_success
# ════════════════════════════════════════════════════════════
class TestWebhookPaymentSuccess:
async def test_updates_status_and_period(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_pay_001")
class TestWebhookSubscriptionUpdated:
async def test_updates_subscription_status(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="active", paddle_subscription_id="sub_test456")
payload = make_webhook_payload(
"subscription_payment_success",
subscription_id="sub_pay_001",
status="active",
renews_at="2026-06-01T00:00:00Z",
"subscription.updated",
subscription_id="sub_test456",
status="paused",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code in (200, 204)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "active"
assert sub["current_period_end"] == "2026-06-01T00:00:00Z"
assert sub["status"] == "paused"
# ════════════════════════════════════════════════════════════
# subscription_cancelled
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionCanceled:
async def test_marks_subscription_cancelled(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="active", paddle_subscription_id="sub_test456")
class TestWebhookSubscriptionCancelled:
async def test_sets_status_to_cancelled(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_cancel_001", status="active")
payload = make_webhook_payload("subscription_cancelled", subscription_id="sub_cancel_001")
payload = make_webhook_payload(
"subscription.canceled",
subscription_id="sub_test456",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code in (200, 204)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "cancelled"
# ════════════════════════════════════════════════════════════
# subscription_expired
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionPastDue:
async def test_marks_subscription_past_due(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="active", paddle_subscription_id="sub_test456")
class TestWebhookSubscriptionExpired:
async def test_sets_status_to_expired(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_exp_001")
payload = make_webhook_payload("subscription_expired", subscription_id="sub_exp_001")
payload = make_webhook_payload(
"subscription.past_due",
subscription_id="sub_test456",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "expired"
assert response.status_code in (200, 204)
# ════════════════════════════════════════════════════════════
# order_refunded
# ════════════════════════════════════════════════════════════
class TestWebhookOrderRefunded:
async def test_sets_status_to_expired(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_refund_001")
payload = make_webhook_payload("order_refunded", subscription_id="sub_refund_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "expired"
# ════════════════════════════════════════════════════════════
# subscription_payment_failed
# ════════════════════════════════════════════════════════════
class TestWebhookPaymentFailed:
async def test_sets_status_to_past_due(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_fail_001")
payload = make_webhook_payload("subscription_payment_failed", subscription_id="sub_fail_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "past_due"
# ════════════════════════════════════════════════════════════
# subscription_paused
# Parameterized: event -> status transitions
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionPaused:
async def test_sets_status_to_paused(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_pause_001")
@pytest.mark.parametrize("event_type,expected_status", [
("subscription.activated", "active"),
("subscription.updated", "active"),
("subscription.canceled", "cancelled"),
("subscription.past_due", "past_due"),
])
async def test_event_status_transitions(client, db, test_user, create_subscription, event_type, expected_status):
if event_type != "subscription.activated":
await create_subscription(test_user["id"], paddle_subscription_id="sub_test456")
payload = make_webhook_payload("subscription_paused", subscription_id="sub_pause_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "paused"
# ════════════════════════════════════════════════════════════
# subscription_unpaused / subscription_resumed
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionUnpaused:
async def test_unpaused_sets_active(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_unpause_001", status="paused")
payload = make_webhook_payload("subscription_unpaused", subscription_id="sub_unpause_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "active"
async def test_resumed_sets_active(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_resume_001", status="paused")
payload = make_webhook_payload("subscription_resumed", subscription_id="sub_resume_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "active"
# ════════════════════════════════════════════════════════════
# Unknown event
# ════════════════════════════════════════════════════════════
class TestWebhookUnknownEvent:
async def test_unknown_event_returns_200(self, client, db, test_user):
payload = make_webhook_payload("some_future_event", user_id=str(test_user["id"]))
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
assert response.status_code == 200
# ════════════════════════════════════════════════════════════
# Parameterized: event → expected status
# ════════════════════════════════════════════════════════════
WEBHOOK_EVENT_STATUS_MAP = [
("subscription_cancelled", "cancelled"),
("subscription_expired", "expired"),
("order_refunded", "expired"),
("subscription_payment_failed", "past_due"),
("subscription_paused", "paused"),
("subscription_unpaused", "active"),
("subscription_resumed", "active"),
]
@pytest.mark.parametrize("event_name,expected_status", WEBHOOK_EVENT_STATUS_MAP)
async def test_webhook_event_status_transition(
client, db, test_user, create_subscription, event_name, expected_status,
):
sub_id = f"sub_param_{event_name}"
await create_subscription(test_user["id"], ls_subscription_id=sub_id, status="active")
payload = make_webhook_payload(event_name, subscription_id=sub_id)
payload = make_webhook_payload(event_type, user_id=str(test_user["id"]))
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code == 200
assert response.status_code in (200, 204)
sub = await get_subscription(test_user["id"])
assert sub["status"] == expected_status
@@ -401,50 +206,38 @@ async def test_webhook_event_status_transition(
# Hypothesis: fuzz webhook payloads
# ════════════════════════════════════════════════════════════
webhook_event_names = st.sampled_from([
"subscription_created", "subscription_updated", "subscription_payment_success",
"subscription_cancelled", "subscription_expired", "order_refunded",
"subscription_payment_failed", "subscription_paused",
"subscription_unpaused", "subscription_resumed",
"unknown_event", "order_created",
fuzz_event_type = st.sampled_from([
"subscription.activated",
"subscription.updated",
"subscription.canceled",
"subscription.past_due",
])
fuzz_status = st.sampled_from(["active", "paused", "past_due", "canceled"])
fuzz_payload = st.fixed_dictionaries({
"meta": st.fixed_dictionaries({
"event_name": webhook_event_names,
"custom_data": st.fixed_dictionaries({
"user_id": st.from_regex(r"[0-9]{1,6}", fullmatch=True),
}),
"webhook_id": st.text(min_size=1, max_size=20),
}),
"data": st.fixed_dictionaries({
"type": st.just("subscriptions"),
"id": st.text(min_size=1, max_size=30).filter(lambda x: x.strip()),
"attributes": st.fixed_dictionaries({
"store_id": st.integers(1, 99999),
"customer_id": st.integers(1, 99999),
"order_id": st.integers(1, 99999),
"product_id": st.integers(1, 99999),
"variant_id": st.text(min_size=1, max_size=30),
"status": st.sampled_from(["active", "on_trial", "cancelled", "past_due", "paused", "expired"]),
"renews_at": st.from_regex(r"2025-\d{2}-\d{2}T00:00:00Z", fullmatch=True),
}),
}),
})
@st.composite
def fuzz_payload(draw):
event_type = draw(fuzz_event_type)
return make_webhook_payload(
event_type=event_type,
subscription_id=f"sub_{draw(st.integers(min_value=100, max_value=999999))}",
user_id=str(draw(st.integers(min_value=1, max_value=999999))),
status=draw(fuzz_status),
)
class TestWebhookHypothesis:
@given(payload_dict=fuzz_payload)
@given(payload_dict=fuzz_payload())
@h_settings(max_examples=50, deadline=5000, suppress_health_check=[HealthCheck.function_scoped_fixture])
async def test_webhook_never_500s(self, client, db, test_user, payload_dict):
# Pin user_id to the test user so subscription_created events don't hit FK violations
payload_dict["meta"]["custom_data"]["user_id"] = str(test_user["id"])
# Pin user_id to the test user so subscription_created/activated events don't hit FK violations
payload_dict["data"]["custom_data"]["user_id"] = str(test_user["id"])
payload_bytes = json.dumps(payload_dict).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code < 500