""" Integration tests for supplier webhook handlers. POST real webhook payloads to /billing/webhook/paddle and verify DB state. Uses the existing client, db, sign_payload from conftest. """ import json from datetime import UTC, datetime from unittest.mock import AsyncMock, patch import pytest from conftest import sign_payload from padelnomics.core import utcnow_iso WEBHOOK_PATH = "/billing/webhook/paddle" SIG_HEADER = "Paddle-Signature" # ── Fixtures ───────────────────────────────────────────────── @pytest.fixture async def supplier(db): """Supplier with tier=free, credit_balance=0.""" now = utcnow_iso() async with db.execute( """INSERT INTO suppliers (name, slug, country_code, region, category, tier, credit_balance, monthly_credits, created_at) VALUES ('Webhook Test Supplier', 'webhook-test', 'DE', 'Europe', 'Courts', 'free', 0, 0, ?)""", (now,), ) as cursor: supplier_id = cursor.lastrowid await db.commit() return {"id": supplier_id} @pytest.fixture async def paddle_products(db): """Insert paddle_products rows for all keys the handlers need.""" now = utcnow_iso() products = [ ("credits_25", "pri_credits25", "Credit Pack 25", 999, "one_time"), ("credits_100", "pri_credits100", "Credit Pack 100", 3290, "one_time"), ("boost_sticky_week", "pri_sticky_week", "Sticky Week", 4900, "one_time"), ("boost_sticky_month", "pri_sticky_month", "Sticky Month", 14900, "one_time"), ("boost_highlight", "pri_highlight", "Highlight", 2900, "recurring"), ("boost_verified", "pri_verified", "Verified Badge", 1900, "recurring"), ("boost_card_color", "pri_card_color", "Card Color", 1900, "recurring"), ("supplier_growth", "pri_growth", "Growth Plan", 14900, "recurring"), ("supplier_pro", "pri_pro", "Pro Plan", 39900, "recurring"), ("business_plan", "pri_bplan", "Business Plan PDF", 9900, "one_time"), ] for key, price_id, name, price_cents, billing_type in products: await db.execute( """INSERT INTO paddle_products (key, paddle_product_id, paddle_price_id, name, price_cents, billing_type, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)""", (key, f"pro_{key}", price_id, name, price_cents, billing_type, now), ) await db.commit() # ── Helpers ────────────────────────────────────────────────── def make_transaction_payload(items, supplier_id, **custom_data_extra): """Build a transaction.completed event payload.""" custom_data = {"supplier_id": str(supplier_id), **custom_data_extra} return { "event_type": "transaction.completed", "data": { "id": "txn_test_123", "status": "completed", "customer_id": "ctm_supplier_test", "custom_data": custom_data, "items": [ {"price": {"id": price_id}, "quantity": qty} for price_id, qty in items ], }, } def make_supplier_activation_payload(items, supplier_id, plan, user_id): """Build a subscription.activated event for supplier plans.""" return { "event_type": "subscription.activated", "data": { "id": "sub_supplier_test_456", "status": "active", "customer_id": "ctm_supplier_test", "custom_data": { "supplier_id": str(supplier_id), "plan": plan, "user_id": str(user_id), }, "current_billing_period": { "starts_at": "2026-02-01T00:00:00.000000Z", "ends_at": "2026-03-01T00:00:00.000000Z", }, "items": [ {"price": {"id": price_id}, "quantity": qty} for price_id, qty in items ], }, } def _post_webhook(client, payload): """Sign and POST a webhook payload, return awaitable response.""" payload_bytes = json.dumps(payload).encode() sig = sign_payload(payload_bytes) return client.post( WEBHOOK_PATH, data=payload_bytes, headers={SIG_HEADER: sig, "Content-Type": "application/json"}, ) # ── Credit Pack Purchase ───────────────────────────────────── class TestCreditPackPurchase: async def test_credits_25_adds_25_credits(self, client, db, supplier, paddle_products): payload = make_transaction_payload([("pri_credits25", 1)], supplier["id"]) resp = await _post_webhook(client, payload) assert resp.status_code == 200 row = await db.execute_fetchall( "SELECT credit_balance FROM suppliers WHERE id = ?", (supplier["id"],), ) assert row[0][0] == 25 ledger = await db.execute_fetchall( "SELECT delta, event_type, note FROM credit_ledger WHERE supplier_id = ?", (supplier["id"],), ) assert len(ledger) == 1 assert ledger[0][0] == 25 assert ledger[0][1] == "pack_purchase" async def test_credits_100_adds_100_credits(self, client, db, supplier, paddle_products): payload = make_transaction_payload([("pri_credits100", 1)], supplier["id"]) resp = await _post_webhook(client, payload) assert resp.status_code == 200 row = await db.execute_fetchall( "SELECT credit_balance FROM suppliers WHERE id = ?", (supplier["id"],), ) assert row[0][0] == 100 # ── Sticky Boost Purchase ──────────────────────────────────── class TestStickyBoostPurchase: async def test_sticky_week_creates_boost_and_updates_supplier( self, client, db, supplier, paddle_products, ): payload = make_transaction_payload( [("pri_sticky_week", 1)], supplier["id"], sticky_country="DE", ) resp = await _post_webhook(client, payload) assert resp.status_code == 200 # Verify supplier_boosts row boosts = await db.execute_fetchall( "SELECT boost_type, status, expires_at FROM supplier_boosts WHERE supplier_id = ?", (supplier["id"],), ) assert len(boosts) == 1 assert boosts[0][0] == "sticky_week" assert boosts[0][1] == "active" # expires_at should be ~7 days from now expires = datetime.fromisoformat(boosts[0][2]) assert abs((expires - datetime.now(UTC).replace(tzinfo=None)).days - 7) <= 1 # Verify sticky_until set on supplier sup = await db.execute_fetchall( "SELECT sticky_until, sticky_country FROM suppliers WHERE id = ?", (supplier["id"],), ) assert sup[0][0] is not None # sticky_until set assert sup[0][1] == "DE" async def test_sticky_month_creates_boost_and_updates_supplier( self, client, db, supplier, paddle_products, ): payload = make_transaction_payload( [("pri_sticky_month", 1)], supplier["id"], sticky_country="ES", ) resp = await _post_webhook(client, payload) assert resp.status_code == 200 boosts = await db.execute_fetchall( "SELECT boost_type, expires_at FROM supplier_boosts WHERE supplier_id = ?", (supplier["id"],), ) assert len(boosts) == 1 assert boosts[0][0] == "sticky_month" expires = datetime.fromisoformat(boosts[0][1]) assert abs((expires - datetime.now(UTC).replace(tzinfo=None)).days - 30) <= 1 async def test_sticky_boost_sets_country(self, client, db, supplier, paddle_products): payload = make_transaction_payload( [("pri_sticky_week", 1)], supplier["id"], sticky_country="FR", ) resp = await _post_webhook(client, payload) assert resp.status_code == 200 sup = await db.execute_fetchall( "SELECT sticky_country FROM suppliers WHERE id = ?", (supplier["id"],), ) assert sup[0][0] == "FR" # ── Supplier Subscription Activated ────────────────────────── class TestSupplierSubscriptionActivated: async def test_growth_plan_sets_tier_and_credits( self, client, db, supplier, paddle_products, test_user, ): payload = make_supplier_activation_payload( items=[("pri_growth", 1)], supplier_id=supplier["id"], plan="supplier_growth", user_id=test_user["id"], ) resp = await _post_webhook(client, payload) assert resp.status_code == 200 row = await db.execute_fetchall( "SELECT tier, credit_balance, monthly_credits, claimed_by FROM suppliers WHERE id = ?", (supplier["id"],), ) assert row[0][0] == "growth" assert row[0][1] == 30 assert row[0][2] == 30 assert row[0][3] == test_user["id"] async def test_pro_plan_sets_tier_and_credits( self, client, db, supplier, paddle_products, test_user, ): payload = make_supplier_activation_payload( items=[("pri_pro", 1)], supplier_id=supplier["id"], plan="supplier_pro", user_id=test_user["id"], ) resp = await _post_webhook(client, payload) assert resp.status_code == 200 row = await db.execute_fetchall( "SELECT tier, credit_balance, monthly_credits FROM suppliers WHERE id = ?", (supplier["id"],), ) assert row[0][0] == "pro" assert row[0][1] == 100 assert row[0][2] == 100 async def test_boost_items_create_boost_records( self, client, db, supplier, paddle_products, test_user, ): payload = make_supplier_activation_payload( items=[ ("pri_growth", 1), ("pri_highlight", 1), ("pri_verified", 1), ], supplier_id=supplier["id"], plan="supplier_growth", user_id=test_user["id"], ) resp = await _post_webhook(client, payload) assert resp.status_code == 200 # Verify boost rows boosts = await db.execute_fetchall( "SELECT boost_type FROM supplier_boosts WHERE supplier_id = ? ORDER BY boost_type", (supplier["id"],), ) boost_types = [b[0] for b in boosts] assert "highlight" in boost_types assert "verified" in boost_types # Verify denormalized columns sup = await db.execute_fetchall( "SELECT highlight, is_verified FROM suppliers WHERE id = ?", (supplier["id"],), ) assert sup[0][0] == 1 # highlight assert sup[0][1] == 1 # is_verified async def test_basic_plan_sets_tier_zero_credits_and_verified( self, client, db, supplier, paddle_products, test_user, ): """Basic plan: tier='basic', 0 credits, is_verified=1, no ledger entry.""" payload = make_supplier_activation_payload( items=[], supplier_id=supplier["id"], plan="supplier_basic", user_id=test_user["id"], ) resp = await _post_webhook(client, payload) assert resp.status_code == 200 row = await db.execute_fetchall( "SELECT tier, credit_balance, monthly_credits, is_verified FROM suppliers WHERE id = ?", (supplier["id"],), ) assert row[0][0] == "basic" assert row[0][1] == 0 assert row[0][2] == 0 assert row[0][3] == 1 # No credit ledger entry for Basic (0 credits) ledger = await db.execute_fetchall( "SELECT id FROM credit_ledger WHERE supplier_id = ?", (supplier["id"],), ) assert len(ledger) == 0 async def test_yearly_plan_derives_correct_tier( self, client, db, supplier, paddle_products, test_user, ): """Yearly plan key suffix is stripped; tier derives correctly.""" payload = make_supplier_activation_payload( items=[("pri_growth", 1)], supplier_id=supplier["id"], plan="supplier_growth_yearly", # yearly variant user_id=test_user["id"], ) resp = await _post_webhook(client, payload) assert resp.status_code == 200 row = await db.execute_fetchall( "SELECT tier, credit_balance FROM suppliers WHERE id = ?", (supplier["id"],), ) assert row[0][0] == "growth" assert row[0][1] == 30 async def test_no_supplier_id_is_noop( self, client, db, supplier, paddle_products, test_user, ): """Activation without supplier_id in custom_data does nothing.""" payload = { "event_type": "subscription.activated", "data": { "id": "sub_no_supplier", "status": "active", "customer_id": "ctm_test", "custom_data": { "plan": "supplier_growth", "user_id": str(test_user["id"]), # no supplier_id }, "current_billing_period": { "starts_at": "2026-02-01T00:00:00.000000Z", "ends_at": "2026-03-01T00:00:00.000000Z", }, "items": [], }, } resp = await _post_webhook(client, payload) assert resp.status_code == 200 # Supplier unchanged row = await db.execute_fetchall( "SELECT tier, credit_balance FROM suppliers WHERE id = ?", (supplier["id"],), ) assert row[0][0] == "free" assert row[0][1] == 0 # ── Business Plan Purchase ─────────────────────────────────── class TestBusinessPlanPurchase: async def test_creates_export_record( self, client, db, supplier, paddle_products, test_user, ): # Need a scenario for the export now = utcnow_iso() async with db.execute( """INSERT INTO scenarios (user_id, name, state_json, created_at) VALUES (?, 'Test Scenario', '{}', ?)""", (test_user["id"], now), ) as cursor: scenario_id = cursor.lastrowid await db.commit() payload = { "event_type": "transaction.completed", "data": { "id": "txn_bplan_123", "status": "completed", "customer_id": "ctm_test", "custom_data": { "user_id": str(test_user["id"]), "scenario_id": str(scenario_id), "language": "de", }, "items": [{"price": {"id": "pri_bplan"}, "quantity": 1}], }, } # enqueue is lazily imported inside the handler via `from ..worker import enqueue` # Patching at the module level ensures the lazy import picks up the mock with patch("padelnomics.worker.enqueue", new_callable=AsyncMock): resp = await _post_webhook(client, payload) assert resp.status_code == 200 # Verify business_plan_exports row exports = await db.execute_fetchall( "SELECT user_id, scenario_id, language, status FROM business_plan_exports", ) assert len(exports) == 1 assert exports[0][0] == test_user["id"] assert exports[0][1] == scenario_id assert exports[0][2] == "de" assert exports[0][3] == "pending"