""" Tests for the billing event hook system. """ import pytest from beanflows.billing.routes import _billing_hooks, _fire_hooks, on_billing_event @pytest.fixture(autouse=True) def clear_hooks(): """Ensure hooks are clean before and after each test.""" _billing_hooks.clear() yield _billing_hooks.clear() # ════════════════════════════════════════════════════════════ # Registration # ════════════════════════════════════════════════════════════ class TestOnBillingEvent: def test_registers_single_event(self): @on_billing_event("subscription.activated") async def my_hook(event_type, data): pass assert "subscription.activated" in _billing_hooks assert my_hook in _billing_hooks["subscription.activated"] def test_registers_multiple_events(self): @on_billing_event("subscription.activated", "subscription.updated") async def my_hook(event_type, data): pass assert my_hook in _billing_hooks["subscription.activated"] assert my_hook in _billing_hooks["subscription.updated"] def test_multiple_hooks_per_event(self): @on_billing_event("subscription.activated") async def hook_a(event_type, data): pass @on_billing_event("subscription.activated") async def hook_b(event_type, data): pass assert len(_billing_hooks["subscription.activated"]) == 2 def test_decorator_returns_original_function(self): @on_billing_event("test_event") async def my_hook(event_type, data): pass assert my_hook.__name__ == "my_hook" # ════════════════════════════════════════════════════════════ # Firing # ════════════════════════════════════════════════════════════ class TestFireHooks: async def test_fires_registered_hook(self): calls = [] @on_billing_event("subscription.activated") async def recorder(event_type, data): calls.append((event_type, data)) await _fire_hooks("subscription.activated", {"id": "sub_123"}) assert len(calls) == 1 assert calls[0] == ("subscription.activated", {"id": "sub_123"}) async def test_no_hooks_registered_is_noop(self): # Should not raise await _fire_hooks("unregistered_event", {"id": "sub_123"}) async def test_fires_all_hooks_for_event(self): calls = [] @on_billing_event("subscription.activated") async def hook_a(event_type, data): calls.append("a") @on_billing_event("subscription.activated") async def hook_b(event_type, data): calls.append("b") await _fire_hooks("subscription.activated", {}) assert calls == ["a", "b"] # ════════════════════════════════════════════════════════════ # Error isolation # ════════════════════════════════════════════════════════════ class TestHookErrorIsolation: async def test_failing_hook_does_not_block_others(self): calls = [] @on_billing_event("subscription.activated") async def failing_hook(event_type, data): raise RuntimeError("boom") @on_billing_event("subscription.activated") async def good_hook(event_type, data): calls.append("ok") # Should not raise despite first hook failing await _fire_hooks("subscription.activated", {}) assert calls == ["ok"] async def test_failing_hook_is_logged(self, caplog): @on_billing_event("subscription.activated") async def bad_hook(event_type, data): raise ValueError("test error") import logging with caplog.at_level(logging.ERROR): await _fire_hooks("subscription.activated", {}) assert "bad_hook" in caplog.text assert "test error" in caplog.text