feat(billing): A2+A4 — extract paddle.py + dispatch layer in routes.py

- New billing/paddle.py: Paddle-specific functions (build_checkout_payload,
  cancel_subscription, get_management_url, verify_webhook, parse_webhook)
- routes.py: _provider() dispatch function selects paddle or stripe module
- Checkout/manage/cancel routes now delegate to _provider()
- /webhook/paddle always active (existing subscribers)
- /webhook/stripe endpoint added (returns 404 until Stripe configured)
- Shared _handle_webhook_event() processes normalized events from any provider
- _price_id_to_key() queries payment_products with paddle_products fallback

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-03-03 15:26:47 +01:00
parent 276328af33
commit 7af9b2c82c
3 changed files with 228 additions and 83 deletions

View File

View File

@@ -0,0 +1,116 @@
"""
Paddle payment provider — checkout, webhook verification, subscription management.
Exports the 5 functions that billing/routes.py dispatches to:
- build_checkout_payload()
- build_multi_item_checkout_payload()
- cancel_subscription()
- get_management_url()
- handle_webhook()
"""
import json
from paddle_billing import Client as PaddleClient
from paddle_billing import Environment, Options
from paddle_billing.Notifications import Secret, Verifier
from ..core import config
def _paddle_client() -> PaddleClient:
"""Create a Paddle SDK client."""
env = Environment.SANDBOX if config.PADDLE_ENVIRONMENT == "sandbox" else Environment.PRODUCTION
return PaddleClient(config.PADDLE_API_KEY, options=Options(env))
class _WebhookRequest:
"""Minimal wrapper satisfying paddle_billing's Request Protocol."""
def __init__(self, body: bytes, headers):
self.body = body
self.headers = headers
_verifier = Verifier(maximum_variance=300)
def build_checkout_payload(
price_id: str, custom_data: dict, success_url: str,
) -> dict:
"""Build JSON payload for a single-item Paddle.js overlay checkout."""
return {
"items": [{"priceId": price_id, "quantity": 1}],
"customData": custom_data,
"settings": {"successUrl": success_url},
}
def build_multi_item_checkout_payload(
items: list[dict], custom_data: dict, success_url: str,
) -> dict:
"""Build JSON payload for a multi-item Paddle.js overlay checkout."""
return {
"items": items,
"customData": custom_data,
"settings": {"successUrl": success_url},
}
def cancel_subscription(provider_subscription_id: str) -> None:
"""Cancel a Paddle subscription at end of current billing period."""
from paddle_billing.Resources.Subscriptions.Operations import CancelSubscription
paddle = _paddle_client()
paddle.subscriptions.cancel(
provider_subscription_id,
CancelSubscription(effective_from="next_billing_period"),
)
def get_management_url(provider_subscription_id: str) -> str:
"""Get the Paddle customer portal URL for updating payment method."""
paddle = _paddle_client()
paddle_sub = paddle.subscriptions.get(provider_subscription_id)
return paddle_sub.management_urls.update_payment_method
def verify_webhook(payload: bytes, headers) -> bool:
"""Verify Paddle webhook signature. Returns True if valid or no secret configured."""
if not config.PADDLE_WEBHOOK_SECRET:
return True
try:
return _verifier.verify(
_WebhookRequest(payload, headers),
Secret(config.PADDLE_WEBHOOK_SECRET),
)
except (ConnectionRefusedError, ValueError):
return False
def parse_webhook(payload: bytes) -> dict:
"""Parse a Paddle webhook payload into a normalized event dict.
Returns dict with keys: event_type, subscription_id, customer_id,
user_id, supplier_id, plan, status, current_period_end, data, items.
"""
event = json.loads(payload)
event_type = event.get("event_type", "")
data = event.get("data") or {}
custom_data = data.get("custom_data") or {}
billing_period = data.get("current_billing_period") or {}
return {
"event_type": event_type,
"subscription_id": data.get("id", ""),
"customer_id": str(data.get("customer_id", "")),
"user_id": custom_data.get("user_id"),
"supplier_id": custom_data.get("supplier_id"),
"plan": custom_data.get("plan", ""),
"status": data.get("status", ""),
"current_period_end": billing_period.get("ends_at"),
"data": data,
"items": data.get("items", []),
"custom_data": custom_data,
}

View File

@@ -1,6 +1,9 @@
"""
Billing domain: checkout, webhooks, subscription management.
Payment provider: paddle
Provider dispatch: PAYMENT_PROVIDER env var selects 'paddle' or 'stripe'.
Both webhook endpoints (/webhook/paddle and /webhook/stripe) stay active
regardless of the toggle — existing subscribers keep sending webhooks.
"""
import json
@@ -8,20 +11,21 @@ import secrets
from datetime import timedelta
from pathlib import Path
from paddle_billing import Client as PaddleClient
from paddle_billing import Environment, Options
from paddle_billing.Notifications import Secret, Verifier
from quart import Blueprint, flash, g, jsonify, redirect, render_template, request, session, url_for
from ..auth.routes import login_required
from ..core import config, execute, fetch_one, get_paddle_price, utcnow, utcnow_iso
from ..core import config, execute, fetch_one, get_price_id, utcnow, utcnow_iso
from ..i18n import get_translations
def _paddle_client() -> PaddleClient:
"""Create a Paddle SDK client. Used only for subscription management + webhook verification."""
env = Environment.SANDBOX if config.PADDLE_ENVIRONMENT == "sandbox" else Environment.PRODUCTION
return PaddleClient(config.PADDLE_API_KEY, options=Options(env))
def _provider():
"""Return the active payment provider module."""
if config.PAYMENT_PROVIDER == "stripe":
from . import stripe as mod
else:
from . import paddle as mod
return mod
# Blueprint with its own template folder
bp = Blueprint(
@@ -33,7 +37,7 @@ bp = Blueprint(
# =============================================================================
# SQL Queries
# SQL Queries (provider-agnostic)
# =============================================================================
async def get_subscription(user_id: int) -> dict | None:
@@ -132,7 +136,7 @@ async def is_within_limits(user_id: int, resource: str, current_count: int) -> b
# =============================================================================
# Routes
# Routes (provider-agnostic)
# =============================================================================
@bp.route("/pricing")
@@ -151,129 +155,145 @@ async def success():
return await render_template("success.html")
# =============================================================================
# Paddle Implementation — Paddle.js Overlay Checkout
# Checkout / Manage / Cancel — dispatched to active provider
# =============================================================================
@bp.route("/checkout/<plan>", methods=["POST"])
@login_required
async def checkout(plan: str):
"""Return JSON for Paddle.js overlay checkout."""
price_id = await get_paddle_price(plan)
"""Return JSON for checkout (overlay for Paddle, redirect URL for Stripe)."""
price_id = await get_price_id(plan)
if not price_id:
return jsonify({"error": "Invalid plan selected."}), 400
return jsonify({
"items": [{"priceId": price_id, "quantity": 1}],
"customData": {"user_id": str(g.user["id"]), "plan": plan},
"settings": {
"successUrl": f"{config.BASE_URL}/billing/success",
},
})
payload = _provider().build_checkout_payload(
price_id=price_id,
custom_data={"user_id": str(g.user["id"]), "plan": plan},
success_url=f"{config.BASE_URL}/billing/success",
)
return jsonify(payload)
@bp.route("/manage", methods=["POST"])
@login_required
async def manage():
"""Redirect to Paddle customer portal."""
"""Redirect to payment provider's customer portal."""
sub = await get_subscription(g.user["id"])
if not sub or not sub.get("provider_subscription_id"):
t = get_translations(g.get("lang") or "en")
await flash(t["billing_no_subscription"], "error")
return redirect(url_for("dashboard.settings"))
paddle = _paddle_client()
paddle_sub = paddle.subscriptions.get(sub["provider_subscription_id"])
portal_url = paddle_sub.management_urls.update_payment_method
portal_url = _provider().get_management_url(sub["provider_subscription_id"])
return redirect(portal_url)
@bp.route("/cancel", methods=["POST"])
@login_required
async def cancel():
"""Cancel subscription via Paddle API."""
"""Cancel subscription via active payment provider."""
sub = await get_subscription(g.user["id"])
if sub and sub.get("provider_subscription_id"):
from paddle_billing.Resources.Subscriptions.Operations import CancelSubscription
paddle = _paddle_client()
paddle.subscriptions.cancel(
sub["provider_subscription_id"],
CancelSubscription(effective_from="next_billing_period"),
)
_provider().cancel_subscription(sub["provider_subscription_id"])
return redirect(url_for("dashboard.settings"))
class _WebhookRequest:
"""Minimal wrapper satisfying paddle_billing's Request Protocol."""
def __init__(self, body: bytes, headers):
self.body = body
self.headers = headers
_verifier = Verifier(maximum_variance=300)
# =============================================================================
# Paddle Webhook — always active (existing subscribers keep sending)
# =============================================================================
@bp.route("/webhook/paddle", methods=["POST"])
async def webhook():
"""Handle Paddle webhooks."""
async def webhook_paddle():
"""Handle Paddle webhooks — always active regardless of PAYMENT_PROVIDER toggle."""
from . import paddle as paddle_mod
payload = await request.get_data()
if config.PADDLE_WEBHOOK_SECRET:
try:
ok = _verifier.verify(
_WebhookRequest(payload, request.headers),
Secret(config.PADDLE_WEBHOOK_SECRET),
)
except (ConnectionRefusedError, ValueError):
ok = False
if not ok:
return jsonify({"error": "Invalid signature"}), 400
if not paddle_mod.verify_webhook(payload, request.headers):
return jsonify({"error": "Invalid signature"}), 400
try:
event = json.loads(payload)
ev = paddle_mod.parse_webhook(payload)
except (json.JSONDecodeError, ValueError):
return jsonify({"error": "Invalid JSON payload"}), 400
event_type = event.get("event_type")
data = event.get("data") or {}
custom_data = data.get("custom_data") or {}
user_id = custom_data.get("user_id")
plan = custom_data.get("plan", "")
# Store billing customer for any subscription event with a customer_id
customer_id = str(data.get("customer_id", ""))
await _handle_webhook_event(ev)
return jsonify({"received": True})
# =============================================================================
# Stripe Webhook — always active (once Stripe is configured)
# =============================================================================
@bp.route("/webhook/stripe", methods=["POST"])
async def webhook_stripe():
"""Handle Stripe webhooks — always active regardless of PAYMENT_PROVIDER toggle."""
if not config.STRIPE_WEBHOOK_SECRET:
return jsonify({"error": "Stripe not configured"}), 404
from . import stripe as stripe_mod
payload = await request.get_data()
if not stripe_mod.verify_webhook(payload, request.headers):
return jsonify({"error": "Invalid signature"}), 400
try:
ev = stripe_mod.parse_webhook(payload)
except (json.JSONDecodeError, ValueError):
return jsonify({"error": "Invalid payload"}), 400
await _handle_webhook_event(ev)
return jsonify({"received": True})
# =============================================================================
# Shared Webhook Event Handler (provider-agnostic)
# =============================================================================
async def _handle_webhook_event(ev: dict) -> None:
"""Process a normalized webhook event from any provider.
ev keys: event_type, subscription_id, customer_id, user_id, supplier_id,
plan, status, current_period_end, data, items, custom_data
"""
event_type = ev.get("event_type", "")
user_id = ev.get("user_id")
plan = ev.get("plan", "")
# Store billing customer
customer_id = ev.get("customer_id", "")
if customer_id and user_id:
await upsert_billing_customer(int(user_id), customer_id)
if event_type == "subscription.activated":
if plan.startswith("supplier_"):
await _handle_supplier_subscription_activated(data, custom_data)
await _handle_supplier_subscription_activated(ev)
elif user_id:
await upsert_subscription(
user_id=int(user_id),
plan=plan or "starter",
status="active",
provider_subscription_id=data.get("id", ""),
current_period_end=(data.get("current_billing_period") or {}).get("ends_at"),
provider_subscription_id=ev.get("subscription_id", ""),
current_period_end=ev.get("current_period_end"),
)
elif event_type == "subscription.updated":
await update_subscription_status(
data.get("id", ""),
status=data.get("status", "active"),
current_period_end=(data.get("current_billing_period") or {}).get("ends_at"),
ev.get("subscription_id", ""),
status=ev.get("status", "active"),
current_period_end=ev.get("current_period_end"),
)
elif event_type == "subscription.canceled":
await update_subscription_status(data.get("id", ""), status="cancelled")
await update_subscription_status(ev.get("subscription_id", ""), status="cancelled")
elif event_type == "subscription.past_due":
await update_subscription_status(data.get("id", ""), status="past_due")
await update_subscription_status(ev.get("subscription_id", ""), status="past_due")
elif event_type == "transaction.completed":
await _handle_transaction_completed(data, custom_data)
return jsonify({"received": True})
await _handle_transaction_completed(ev)
# =============================================================================
@@ -301,7 +321,13 @@ BOOST_PRICE_KEYS = {
async def _price_id_to_key(price_id: str) -> str | None:
"""Reverse-lookup a paddle_products key from a Paddle price ID."""
"""Reverse-lookup a product key from a provider price ID."""
row = await fetch_one(
"SELECT key FROM payment_products WHERE provider_price_id = ?", (price_id,)
)
if row:
return row["key"]
# Fallback to old table for pre-migration DBs
row = await fetch_one(
"SELECT key FROM paddle_products WHERE paddle_price_id = ?", (price_id,)
)
@@ -330,13 +356,13 @@ def _derive_tier_from_plan(plan: str) -> tuple[str, str]:
return base, tier
async def _handle_supplier_subscription_activated(data: dict, custom_data: dict) -> None:
async def _handle_supplier_subscription_activated(ev: dict) -> None:
"""Handle supplier plan subscription activation."""
from ..core import transaction as db_transaction
supplier_id = custom_data.get("supplier_id")
plan = custom_data.get("plan", "supplier_growth")
user_id = custom_data.get("user_id")
supplier_id = ev.get("supplier_id")
plan = ev.get("plan", "supplier_growth")
user_id = ev.get("user_id")
if not supplier_id:
return
@@ -365,7 +391,8 @@ async def _handle_supplier_subscription_activated(data: dict, custom_data: dict)
)
# Create boost records for items included in the subscription
items = data.get("items", [])
items = ev.get("items", [])
data = ev.get("data", {})
for item in items:
price_id = item.get("price", {}).get("id", "")
key = await _price_id_to_key(price_id)
@@ -388,13 +415,15 @@ async def _handle_supplier_subscription_activated(data: dict, custom_data: dict)
)
async def _handle_transaction_completed(data: dict, custom_data: dict) -> None:
async def _handle_transaction_completed(ev: dict) -> None:
"""Handle one-time transaction completion (credit packs, sticky boosts, business plan)."""
supplier_id = custom_data.get("supplier_id")
user_id = custom_data.get("user_id")
supplier_id = ev.get("supplier_id")
user_id = ev.get("user_id")
custom_data = ev.get("custom_data", {})
data = ev.get("data", {})
now = utcnow_iso()
items = data.get("items", [])
items = ev.get("items", [])
for item in items:
price_id = item.get("price", {}).get("id", "")
key = await _price_id_to_key(price_id)