- Accept RBAC system: user_roles table, role_required decorator, grant_role/revoke_role/ensure_admin_role functions - Accept improved billing architecture: billing_customers table separation, provider-agnostic naming - Accept enhanced user loading with subscription/roles eager loading in app.py - Accept improved email templates with branded styling - Accept new infrastructure: migration tracking, transaction logging, A/B testing - Accept template improvements: Resend SDK, Tailwind build stage, UMAMI analytics config - Keep beanflows-specific configs: BASE_URL 5001, coffee PLAN_FEATURES/PLAN_LIMITS - Keep beanflows analytics integration and DuckDB health check - Add new test files and utility scripts from template Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
123 lines
4.5 KiB
Python
123 lines
4.5 KiB
Python
"""
|
|
Tests for the billing event hook system.
|
|
"""
|
|
import pytest
|
|
|
|
from beanflows.billing.routes import _billing_hooks, _fire_hooks, on_billing_event
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def clear_hooks():
|
|
"""Ensure hooks are clean before and after each test."""
|
|
_billing_hooks.clear()
|
|
yield
|
|
_billing_hooks.clear()
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# Registration
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestOnBillingEvent:
|
|
def test_registers_single_event(self):
|
|
@on_billing_event("subscription.activated")
|
|
async def my_hook(event_type, data):
|
|
pass
|
|
|
|
assert "subscription.activated" in _billing_hooks
|
|
assert my_hook in _billing_hooks["subscription.activated"]
|
|
|
|
def test_registers_multiple_events(self):
|
|
@on_billing_event("subscription.activated", "subscription.updated")
|
|
async def my_hook(event_type, data):
|
|
pass
|
|
|
|
assert my_hook in _billing_hooks["subscription.activated"]
|
|
assert my_hook in _billing_hooks["subscription.updated"]
|
|
|
|
def test_multiple_hooks_per_event(self):
|
|
@on_billing_event("subscription.activated")
|
|
async def hook_a(event_type, data):
|
|
pass
|
|
|
|
@on_billing_event("subscription.activated")
|
|
async def hook_b(event_type, data):
|
|
pass
|
|
|
|
assert len(_billing_hooks["subscription.activated"]) == 2
|
|
|
|
def test_decorator_returns_original_function(self):
|
|
@on_billing_event("test_event")
|
|
async def my_hook(event_type, data):
|
|
pass
|
|
|
|
assert my_hook.__name__ == "my_hook"
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# Firing
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestFireHooks:
|
|
async def test_fires_registered_hook(self):
|
|
calls = []
|
|
|
|
@on_billing_event("subscription.activated")
|
|
async def recorder(event_type, data):
|
|
calls.append((event_type, data))
|
|
|
|
await _fire_hooks("subscription.activated", {"id": "sub_123"})
|
|
assert len(calls) == 1
|
|
assert calls[0] == ("subscription.activated", {"id": "sub_123"})
|
|
|
|
async def test_no_hooks_registered_is_noop(self):
|
|
# Should not raise
|
|
await _fire_hooks("unregistered_event", {"id": "sub_123"})
|
|
|
|
async def test_fires_all_hooks_for_event(self):
|
|
calls = []
|
|
|
|
@on_billing_event("subscription.activated")
|
|
async def hook_a(event_type, data):
|
|
calls.append("a")
|
|
|
|
@on_billing_event("subscription.activated")
|
|
async def hook_b(event_type, data):
|
|
calls.append("b")
|
|
|
|
await _fire_hooks("subscription.activated", {})
|
|
assert calls == ["a", "b"]
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# Error isolation
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestHookErrorIsolation:
|
|
async def test_failing_hook_does_not_block_others(self):
|
|
calls = []
|
|
|
|
@on_billing_event("subscription.activated")
|
|
async def failing_hook(event_type, data):
|
|
raise RuntimeError("boom")
|
|
|
|
@on_billing_event("subscription.activated")
|
|
async def good_hook(event_type, data):
|
|
calls.append("ok")
|
|
|
|
# Should not raise despite first hook failing
|
|
await _fire_hooks("subscription.activated", {})
|
|
assert calls == ["ok"]
|
|
|
|
async def test_failing_hook_is_logged(self, caplog):
|
|
@on_billing_event("subscription.activated")
|
|
async def bad_hook(event_type, data):
|
|
raise ValueError("test error")
|
|
|
|
import logging
|
|
with caplog.at_level(logging.ERROR):
|
|
await _fire_hooks("subscription.activated", {})
|
|
|
|
assert "bad_hook" in caplog.text
|
|
assert "test error" in caplog.text
|