Files
beanflows/web/tests/test_billing_webhooks.py
Deeman 52bd731fc3
Some checks failed
CI / test-cli (push) Successful in 11s
CI / test-sqlmesh (push) Successful in 13s
CI / test-web (push) Failing after 14s
CI / tag (push) Has been skipped
chore: fix all ruff lint warnings (unused imports, unsorted imports, unused vars)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 10:05:05 +01:00

278 lines
10 KiB
Python

"""
Integration tests for Paddle webhook handling.
Covers signature verification, event parsing, subscription lifecycle transitions, and Hypothesis fuzzing.
"""
import json
import pytest
from conftest import make_webhook_payload, sign_payload
from hypothesis import HealthCheck, given
from hypothesis import settings as h_settings
from hypothesis import strategies as st
from beanflows.billing.routes import get_billing_customer, get_subscription
WEBHOOK_PATH = "/billing/webhook/paddle"
SIG_HEADER = "Paddle-Signature"
# ════════════════════════════════════════════════════════════
# Signature Verification
# ════════════════════════════════════════════════════════════
class TestWebhookSignature:
async def test_missing_signature_rejected(self, client, db):
payload = make_webhook_payload("subscription.activated")
payload_bytes = json.dumps(payload).encode()
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"Content-Type": "application/json"},
)
assert response.status_code in (400, 401)
async def test_invalid_signature_rejected(self, client, db):
payload = make_webhook_payload("subscription.activated")
payload_bytes = json.dumps(payload).encode()
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={SIG_HEADER: "invalid_signature", "Content-Type": "application/json"},
)
assert response.status_code in (400, 401)
async def test_valid_signature_accepted(self, client, db, test_user):
payload = make_webhook_payload("subscription.activated", user_id=str(test_user["id"]))
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code in (200, 204)
async def test_modified_payload_rejected(self, client, db, test_user):
# Paddle SDK Verifier handles tamper detection internally.
# We test signature rejection via test_invalid_signature_rejected above.
# This test verifies the Verifier is actually called by sending
# a payload with an explicitly bad signature.
payload = make_webhook_payload("subscription.activated", user_id=str(test_user["id"]))
payload_bytes = json.dumps(payload).encode()
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={SIG_HEADER: "invalid_signature", "Content-Type": "application/json"},
)
assert response.status_code == 400
async def test_empty_payload_rejected(self, client, db):
sig = sign_payload(b"")
with pytest.raises(Exception): # JSONDecodeError in TESTING mode
await client.post(
WEBHOOK_PATH,
data=b"",
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
# ════════════════════════════════════════════════════════════
# Subscription Lifecycle Events
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionActivated:
async def test_creates_subscription_and_billing_customer(self, client, db, test_user):
payload = make_webhook_payload(
"subscription.activated",
user_id=str(test_user["id"]),
plan="starter",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code in (200, 204)
sub = await get_subscription(test_user["id"])
assert sub is not None
assert sub["plan"] == "starter"
assert sub["status"] == "active"
bc = await get_billing_customer(test_user["id"])
assert bc is not None
assert bc["provider_customer_id"] == "ctm_test123"
class TestWebhookSubscriptionUpdated:
async def test_updates_subscription_status(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="active", provider_subscription_id="sub_test456")
payload = make_webhook_payload(
"subscription.updated",
subscription_id="sub_test456",
status="paused",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code in (200, 204)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "paused"
class TestWebhookSubscriptionCanceled:
async def test_marks_subscription_cancelled(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="active", provider_subscription_id="sub_test456")
payload = make_webhook_payload(
"subscription.canceled",
subscription_id="sub_test456",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code in (200, 204)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "cancelled"
class TestWebhookSubscriptionPastDue:
async def test_marks_subscription_past_due(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="active", provider_subscription_id="sub_test456")
payload = make_webhook_payload(
"subscription.past_due",
subscription_id="sub_test456",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code in (200, 204)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "past_due"
# ════════════════════════════════════════════════════════════
# Parameterized: event → status transitions
# ════════════════════════════════════════════════════════════
@pytest.mark.parametrize("event_type,expected_status", [
("subscription.activated", "active"),
("subscription.updated", "active"),
("subscription.canceled", "cancelled"),
("subscription.past_due", "past_due"),
])
async def test_event_status_transitions(client, db, test_user, create_subscription, event_type, expected_status):
if event_type != "subscription.activated":
await create_subscription(test_user["id"], provider_subscription_id="sub_test456")
payload = make_webhook_payload(event_type, user_id=str(test_user["id"]))
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code in (200, 204)
sub = await get_subscription(test_user["id"])
assert sub["status"] == expected_status
# ════════════════════════════════════════════════════════════
# Hypothesis: fuzz webhook payloads
# ════════════════════════════════════════════════════════════
fuzz_event_type = st.sampled_from([
"subscription.activated",
"subscription.updated",
"subscription.canceled",
"subscription.past_due",
])
fuzz_status = st.sampled_from(["active", "paused", "past_due", "canceled"])
@st.composite
def fuzz_payload(draw):
event_type = draw(fuzz_event_type)
return make_webhook_payload(
event_type=event_type,
subscription_id=f"sub_{draw(st.integers(min_value=100, max_value=999999))}",
user_id=str(draw(st.integers(min_value=1, max_value=999999))),
status=draw(fuzz_status),
)
class TestWebhookHypothesis:
@given(payload_dict=fuzz_payload())
@h_settings(max_examples=50, deadline=5000, suppress_health_check=[HealthCheck.function_scoped_fixture])
async def test_webhook_never_500s(self, client, db, test_user, payload_dict):
# Pin user_id to the test user so subscription_created/activated events don't hit FK violations
payload_dict["data"]["custom_data"]["user_id"] = str(test_user["id"])
payload_bytes = json.dumps(payload_dict).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={SIG_HEADER: sig, "Content-Type": "application/json"},
)
assert response.status_code < 500