""" E2E test for checkout.session.completed webhook → transaction.completed handler. Tests credit packs, sticky boosts, and business plan PDF purchases by: 1. Constructing realistic checkout.session.completed payloads with our real price IDs 2. Signing them with the active webhook secret 3. POSTing to the running dev server 4. Verifying DB state changes (credit_balance, supplier_boosts, business_plan_exports) Prerequisites: - ngrok + webhook endpoint registered (stripe_e2e_setup.py) - Dev server running with webhook secret loaded - Stripe products synced (setup_stripe --sync) Run: uv run python scripts/stripe_e2e_checkout_test.py """ import hashlib import hmac import json import os import sqlite3 import subprocess import sys import time from dotenv import load_dotenv load_dotenv(override=True) DATABASE_PATH = os.getenv("DATABASE_PATH", "data/app.db") WEBHOOK_SECRET = os.getenv("STRIPE_WEBHOOK_SECRET", "") SERVER_URL = "http://localhost:5000" WEBHOOK_URL = f"{SERVER_URL}/billing/webhook/stripe" assert WEBHOOK_SECRET, "STRIPE_WEBHOOK_SECRET not set — run stripe_e2e_setup.py" passed = 0 failed = 0 errors = [] def ok(msg): global passed passed += 1 print(f" \u2713 {msg}") def fail(msg): global failed failed += 1 errors.append(msg) print(f" \u2717 {msg}") def section(title): print(f"\n{'─' * 60}") print(f" {title}") print(f"{'─' * 60}") def query_db(sql, params=()): conn = sqlite3.connect(f"file:{DATABASE_PATH}?mode=ro", uri=True) conn.row_factory = sqlite3.Row try: return [dict(r) for r in conn.execute(sql, params).fetchall()] finally: conn.close() def sign_stripe_payload(payload_bytes: bytes, secret: str) -> str: """Create a valid Stripe-Signature header.""" timestamp = str(int(time.time())) signed_payload = f"{timestamp}.{payload_bytes.decode()}" sig = hmac.new( secret.encode(), signed_payload.encode(), hashlib.sha256 ).hexdigest() return f"t={timestamp},v1={sig}" def post_webhook(event_type: str, obj: dict) -> int: """Post a signed webhook to the server. Returns HTTP status code.""" payload = json.dumps({ "id": f"evt_test_{int(time.time()*1000)}", "type": event_type, "data": {"object": obj}, }).encode() sig = sign_stripe_payload(payload, WEBHOOK_SECRET) result = subprocess.run( ["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", "-X", "POST", "-H", "Content-Type: application/json", "-H", f"Stripe-Signature: {sig}", "--data-binary", "@-", WEBHOOK_URL], input=payload.decode(), capture_output=True, text=True, timeout=10, ) return int(result.stdout.strip()) # ─── Preflight ──────────────────────────────────────────── section("Preflight") # Server up result = subprocess.run( ["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", f"{SERVER_URL}/"], capture_output=True, text=True, timeout=5, ) assert result.stdout.strip() in ("200", "301"), f"Server down ({result.stdout})" ok("Dev server running") # Webhook active result = subprocess.run( ["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", "-X", "POST", "-H", "Content-Type: application/json", "-d", "{}", WEBHOOK_URL], capture_output=True, text=True, timeout=5, ) assert result.stdout.strip() == "400", f"Webhook returns {result.stdout} (expected 400)" ok("Webhook signature check active") # Load price IDs products = query_db("SELECT key, provider_price_id FROM payment_products WHERE provider = 'stripe'") price_map = {p["key"]: p["provider_price_id"] for p in products} ok(f"Loaded {len(price_map)} products") # Test data users = query_db("SELECT id, email FROM users LIMIT 5") test_user = users[0] ok(f"User: {test_user['email']} (id={test_user['id']})") suppliers = query_db("SELECT id, name, credit_balance FROM suppliers WHERE claimed_by IS NOT NULL LIMIT 1") assert suppliers, "No claimed supplier found" test_supplier = suppliers[0] initial_balance = test_supplier["credit_balance"] ok(f"Supplier: {test_supplier['name']} (id={test_supplier['id']}, balance={initial_balance})") # ═══════════════════════════════════════════════════════════ # Test 1: Credit Pack purchases (all 4 sizes) # ═══════════════════════════════════════════════════════════ section("1. Credit Pack purchases via checkout.session.completed") credit_packs = [ ("credits_25", 25), ("credits_50", 50), ("credits_100", 100), ("credits_250", 250), ] running_balance = initial_balance for key, amount in credit_packs: price_id = price_map.get(key) if not price_id: fail(f"{key}: price not found") continue status = post_webhook("checkout.session.completed", { "id": f"cs_test_{key}_{int(time.time())}", "mode": "payment", "customer": "cus_test_credits", "metadata": { "user_id": str(test_user["id"]), "supplier_id": str(test_supplier["id"]), "plan": key, }, "line_items": {"data": [{"price": {"id": price_id}, "quantity": 1}]}, }) if status == 200: ok(f"{key}: webhook accepted (HTTP 200)") else: fail(f"{key}: webhook returned HTTP {status}") continue # Wait and check balance time.sleep(2) rows = query_db("SELECT credit_balance FROM suppliers WHERE id = ?", (test_supplier["id"],)) new_balance = rows[0]["credit_balance"] if rows else -1 expected = running_balance + amount if new_balance == expected: ok(f"{key}: balance {running_balance} → {new_balance} (+{amount})") running_balance = new_balance else: fail(f"{key}: balance {new_balance}, expected {expected}") running_balance = new_balance # update anyway for next test # Check ledger entries ledger = query_db( "SELECT * FROM credit_ledger WHERE supplier_id = ? AND event_type = 'pack_purchase' ORDER BY id DESC LIMIT 4", (test_supplier["id"],), ) if len(ledger) >= 4: ok(f"Credit ledger: {len(ledger)} pack_purchase entries") else: fail(f"Credit ledger: only {len(ledger)} entries (expected 4)") # ═══════════════════════════════════════════════════════════ # Test 2: Sticky Boost purchases # ═══════════════════════════════════════════════════════════ section("2. Sticky boost purchases") # 2a. Sticky Week price_id = price_map.get("boost_sticky_week") if price_id: status = post_webhook("checkout.session.completed", { "id": f"cs_test_sticky_week_{int(time.time())}", "mode": "payment", "customer": "cus_test_sticky", "metadata": { "user_id": str(test_user["id"]), "supplier_id": str(test_supplier["id"]), "plan": "boost_sticky_week", "sticky_country": "DE", }, "line_items": {"data": [{"price": {"id": price_id}, "quantity": 1}]}, }) if status == 200: ok("boost_sticky_week: webhook accepted") else: fail(f"boost_sticky_week: HTTP {status}") time.sleep(2) # Check supplier_boosts boosts = query_db( "SELECT * FROM supplier_boosts WHERE supplier_id = ? AND boost_type = 'sticky_week' ORDER BY id DESC LIMIT 1", (test_supplier["id"],), ) if boosts: b = boosts[0] ok(f"supplier_boosts row: type=sticky_week, status={b['status']}") if b.get("expires_at"): ok(f"expires_at set: {b['expires_at']}") else: fail("expires_at is NULL") else: fail("No supplier_boosts row for sticky_week") # Check suppliers.sticky_until sup = query_db("SELECT sticky_until, sticky_country FROM suppliers WHERE id = ?", (test_supplier["id"],)) if sup and sup[0]["sticky_until"]: ok(f"sticky_until set: {sup[0]['sticky_until']}") else: fail("sticky_until not set") if sup and sup[0]["sticky_country"] == "DE": ok("sticky_country=DE") else: fail(f"sticky_country={sup[0]['sticky_country'] if sup else '?'}") else: fail("boost_sticky_week price not found") # 2b. Sticky Month price_id = price_map.get("boost_sticky_month") if price_id: # Reset sticky fields conn = sqlite3.connect(DATABASE_PATH) conn.execute("UPDATE suppliers SET sticky_until=NULL, sticky_country=NULL WHERE id=?", (test_supplier["id"],)) conn.commit() conn.close() status = post_webhook("checkout.session.completed", { "id": f"cs_test_sticky_month_{int(time.time())}", "mode": "payment", "customer": "cus_test_sticky", "metadata": { "user_id": str(test_user["id"]), "supplier_id": str(test_supplier["id"]), "plan": "boost_sticky_month", "sticky_country": "ES", }, "line_items": {"data": [{"price": {"id": price_id}, "quantity": 1}]}, }) if status == 200: ok("boost_sticky_month: webhook accepted") else: fail(f"boost_sticky_month: HTTP {status}") time.sleep(2) boosts = query_db( "SELECT * FROM supplier_boosts WHERE supplier_id = ? AND boost_type = 'sticky_month' ORDER BY id DESC LIMIT 1", (test_supplier["id"],), ) if boosts: ok(f"supplier_boosts row: type=sticky_month, expires_at={boosts[0].get('expires_at', '?')[:10]}") else: fail("No supplier_boosts row for sticky_month") sup = query_db("SELECT sticky_until, sticky_country FROM suppliers WHERE id = ?", (test_supplier["id"],)) if sup and sup[0]["sticky_country"] == "ES": ok("sticky_country=ES (month)") else: fail(f"sticky_country wrong: {sup[0] if sup else '?'}") else: fail("boost_sticky_month price not found") # ═══════════════════════════════════════════════════════════ # Test 3: Business Plan PDF purchase # ═══════════════════════════════════════════════════════════ section("3. Business Plan PDF purchase") price_id = price_map.get("business_plan") if price_id: # Create a scenario for the user first conn = sqlite3.connect(DATABASE_PATH) conn.execute( "INSERT INTO scenarios (user_id, name, state_json, created_at) VALUES (?, 'Test', '{}', datetime('now'))", (test_user["id"],), ) conn.commit() scenario_row = conn.execute("SELECT id FROM scenarios WHERE user_id = ? ORDER BY id DESC LIMIT 1", (test_user["id"],)).fetchone() scenario_id = scenario_row[0] if scenario_row else 0 conn.close() ok(f"Created test scenario: id={scenario_id}") status = post_webhook("checkout.session.completed", { "id": f"cs_test_bp_{int(time.time())}", "mode": "payment", "customer": "cus_test_bp", "metadata": { "user_id": str(test_user["id"]), "plan": "business_plan", "scenario_id": str(scenario_id), "language": "de", }, "line_items": {"data": [{"price": {"id": price_id}, "quantity": 1}]}, }) if status == 200: ok("business_plan: webhook accepted") else: fail(f"business_plan: HTTP {status}") time.sleep(2) # Check business_plan_exports exports = query_db( "SELECT * FROM business_plan_exports WHERE user_id = ? ORDER BY id DESC LIMIT 1", (test_user["id"],), ) if exports: e = exports[0] ok(f"Export row: status={e['status']}, language={e['language']}") if e["status"] == "pending": ok("Status: pending (waiting for worker)") else: print(f" ? Status: {e['status']} (expected pending)") if e["language"] == "de": ok("Language: de") else: fail(f"Language: {e['language']} (expected de)") if e.get("token"): ok(f"Download token generated: {e['token'][:10]}...") else: fail("No download token") if e.get("scenario_id") == scenario_id: ok(f"Scenario ID matches: {scenario_id}") else: fail(f"Scenario ID: {e.get('scenario_id')} (expected {scenario_id})") else: fail("No business_plan_exports row created") else: fail("business_plan price not found") # ═══════════════════════════════════════════════════════════ # Test 4: Edge cases # ═══════════════════════════════════════════════════════════ section("4a. Edge: checkout.session.completed with unknown price_id") status = post_webhook("checkout.session.completed", { "id": "cs_test_unknown", "mode": "payment", "customer": "cus_test_unknown", "metadata": { "user_id": str(test_user["id"]), "supplier_id": str(test_supplier["id"]), "plan": "nonexistent_product", }, "line_items": {"data": [{"price": {"id": "price_nonexistent"}, "quantity": 1}]}, }) ok(f"Unknown price: HTTP {status} (no crash)") if status == 200 else fail(f"Unknown price: HTTP {status}") # Server alive? result = subprocess.run( ["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", f"{SERVER_URL}/"], capture_output=True, text=True, timeout=5, ) ok("Server alive after unknown price") if result.stdout.strip() in ("200", "301") else fail("Server crashed!") section("4b. Edge: checkout.session.completed with missing supplier_id (credit pack)") balance_before = query_db("SELECT credit_balance FROM suppliers WHERE id = ?", (test_supplier["id"],))[0]["credit_balance"] status = post_webhook("checkout.session.completed", { "id": "cs_test_no_supplier", "mode": "payment", "customer": "cus_test_nosup", "metadata": { "user_id": str(test_user["id"]), # NO supplier_id "plan": "credits_25", }, "line_items": {"data": [{"price": {"id": price_map["credits_25"]}, "quantity": 1}]}, }) ok(f"Missing supplier_id: HTTP {status} (no crash)") if status == 200 else fail(f"HTTP {status}") time.sleep(1) balance_after = query_db("SELECT credit_balance FROM suppliers WHERE id = ?", (test_supplier["id"],))[0]["credit_balance"] if balance_after == balance_before: ok("Balance unchanged (correctly skipped — no supplier_id)") else: fail(f"Balance changed: {balance_before} → {balance_after}") section("4c. Edge: checkout.session.completed with missing metadata") status = post_webhook("checkout.session.completed", { "id": "cs_test_no_meta", "mode": "payment", "customer": "cus_test_nometa", "metadata": {}, }) ok(f"Empty metadata: HTTP {status}") if status == 200 else fail(f"HTTP {status}") result = subprocess.run( ["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", f"{SERVER_URL}/"], capture_output=True, text=True, timeout=5, ) ok("Server alive after empty metadata") if result.stdout.strip() in ("200", "301") else fail("Server crashed!") section("4d. Edge: subscription mode checkout (not payment)") # checkout.session.completed with mode=subscription should create a subscription status = post_webhook("checkout.session.completed", { "id": "cs_test_sub_mode", "mode": "subscription", "customer": "cus_test_submode", "subscription": "sub_from_checkout_123", "metadata": { "user_id": str(test_user["id"]), "plan": "starter", }, }) ok(f"Subscription-mode checkout: HTTP {status}") if status == 200 else fail(f"HTTP {status}") # Note: this fires subscription.activated, but since we can't mock the Stripe API call # to fetch the subscription, it will log a warning and continue. That's fine. section("4e. Edge: sticky boost without sticky_country in metadata") price_id = price_map.get("boost_sticky_week") if price_id: # Reset sticky fields conn = sqlite3.connect(DATABASE_PATH) conn.execute("UPDATE suppliers SET sticky_until=NULL, sticky_country=NULL WHERE id=?", (test_supplier["id"],)) conn.commit() conn.close() status = post_webhook("checkout.session.completed", { "id": f"cs_test_no_country_{int(time.time())}", "mode": "payment", "customer": "cus_test_nocountry", "metadata": { "user_id": str(test_user["id"]), "supplier_id": str(test_supplier["id"]), "plan": "boost_sticky_week", # NO sticky_country }, "line_items": {"data": [{"price": {"id": price_id}, "quantity": 1}]}, }) ok(f"Missing sticky_country: HTTP {status}") if status == 200 else fail(f"HTTP {status}") time.sleep(2) sup = query_db("SELECT sticky_until, sticky_country FROM suppliers WHERE id = ?", (test_supplier["id"],)) if sup and sup[0]["sticky_until"]: ok(f"sticky_until still set (country defaults to empty: '{sup[0]['sticky_country']}')") else: fail("sticky boost not created without country") # ═══════════════════════════════════════════════════════════ # Test 5: Use stripe trigger for a real checkout.session.completed # ═══════════════════════════════════════════════════════════ section("5. stripe trigger checkout.session.completed (real Stripe event)") print(" Triggering real checkout.session.completed via Stripe CLI...") result = subprocess.run( ["stripe", "trigger", "checkout.session.completed"], capture_output=True, text=True, timeout=30, ) if result.returncode == 0: ok("stripe trigger succeeded") # Wait for webhook delivery via ngrok time.sleep(5) # Check ngrok for the delivery import urllib.request try: resp = urllib.request.urlopen("http://localhost:4040/api/requests/http?limit=5", timeout=5) reqs = json.loads(resp.read()) recent_webhooks = [ r for r in reqs.get("requests", []) if r.get("request", {}).get("uri") == "/billing/webhook/stripe" ] if recent_webhooks: latest = recent_webhooks[0] http_status = latest.get("response", {}).get("status_code") ok(f"Webhook delivered via ngrok: HTTP {http_status}") else: print(" (no webhook seen in ngrok — may have been delivered before log window)") ok("stripe trigger completed (webhook delivery not verified)") except Exception: ok("stripe trigger completed (ngrok API unavailable for verification)") else: fail(f"stripe trigger failed: {result.stderr[:100]}") # ═══════════════════════════════════════════════════════════ # Summary # ═══════════════════════════════════════════════════════════ section("RESULTS") total = passed + failed print(f"\n {passed}/{total} passed, {failed} failed\n") if errors: print(" Failures:") for err in errors: print(f" - {err}") print() sys.exit(1 if failed else 0)