Files
beanflows/web/tests/test_billing_hooks.py
Deeman 42c1309b20
Some checks failed
CI / test-cli (push) Successful in 11s
CI / test-sqlmesh (push) Successful in 12s
CI / test-web (push) Failing after 14s
CI / tag (push) Has been skipped
chore: add pre-commit ruff hook with auto-fix
- scripts/hooks/pre-commit: runs ruff --fix for root and web/ (matching CI)
  and re-stages any auto-fixed files so they land in the commit
- Makefile: add install-hooks target (run once after clone)
- pyproject.toml: exclude web/ from root ruff (web has its own config)
- Fix remaining import sort warnings caught by the new hook

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 10:19:29 +01:00

122 lines
4.4 KiB
Python

"""
Tests for the billing event hook system.
"""
import pytest
from beanflows.billing.routes import _billing_hooks, _fire_hooks, on_billing_event
@pytest.fixture(autouse=True)
def clear_hooks():
    """Reset the shared hook registry around every test.

    ``_billing_hooks`` is module-level state, so hooks registered by one
    test would otherwise still be present in the next. The registry is
    emptied once before the test body runs and once more after it.
    """
    # Setup: start from an empty registry.
    _billing_hooks.clear()

    yield

    # Teardown: drop whatever the test registered.
    _billing_hooks.clear()
# ════════════════════════════════════════════════════════════
# Registration
# ════════════════════════════════════════════════════════════
class TestOnBillingEvent:
    """Registration behaviour of the ``on_billing_event`` decorator."""

    def test_registers_single_event(self):
        """A hook decorated with one event type is filed under that key."""

        @on_billing_event("subscription.activated")
        async def my_hook(event_type, data):
            pass

        assert "subscription.activated" in _billing_hooks
        assert my_hook in _billing_hooks["subscription.activated"]

    def test_registers_multiple_events(self):
        """A hook decorated with several event types is filed under each."""

        @on_billing_event("subscription.activated", "subscription.updated")
        async def my_hook(event_type, data):
            pass

        assert my_hook in _billing_hooks["subscription.activated"]
        assert my_hook in _billing_hooks["subscription.updated"]

    def test_multiple_hooks_per_event(self):
        """Two distinct hooks can be registered for the same event type."""

        @on_billing_event("subscription.activated")
        async def hook_a(event_type, data):
            pass

        @on_billing_event("subscription.activated")
        async def hook_b(event_type, data):
            pass

        registered = _billing_hooks["subscription.activated"]
        assert len(registered) == 2
        # A bare count would also pass if one hook were stored twice, so
        # check that each hook is individually present.
        assert hook_a in registered
        assert hook_b in registered

    def test_decorator_returns_original_function(self):
        """The decorator hands back the very function it was given."""

        async def my_hook(event_type, data):
            pass

        decorated = on_billing_event("test_event")(my_hook)

        # Identity is the real contract here: a functools.wraps-style wrapper
        # would keep ``__name__`` intact while still being a different object.
        assert decorated is my_hook
        assert decorated.__name__ == "my_hook"
# ════════════════════════════════════════════════════════════
# Firing
# ════════════════════════════════════════════════════════════
class TestFireHooks:
    """Dispatch behaviour of ``_fire_hooks``."""

    async def test_fires_registered_hook(self):
        """A registered hook is called once with the event type and payload."""
        received = []

        @on_billing_event("subscription.activated")
        async def recorder(event_type, data):
            received.append((event_type, data))

        await _fire_hooks("subscription.activated", {"id": "sub_123"})

        # Exactly one call, carrying the arguments passed to _fire_hooks.
        assert received == [("subscription.activated", {"id": "sub_123"})]

    async def test_no_hooks_registered_is_noop(self):
        """Firing an event nobody registered for completes without raising."""
        await _fire_hooks("unregistered_event", {"id": "sub_123"})

    async def test_fires_all_hooks_for_event(self):
        """Every hook registered for an event runs, in registration order."""
        order = []

        @on_billing_event("subscription.activated")
        async def first_hook(event_type, data):
            order.append("a")

        @on_billing_event("subscription.activated")
        async def second_hook(event_type, data):
            order.append("b")

        await _fire_hooks("subscription.activated", {})

        assert order == ["a", "b"]
# ════════════════════════════════════════════════════════════
# Error isolation
# ════════════════════════════════════════════════════════════
class TestHookErrorIsolation:
    """A raising hook must not disturb its siblings and must be logged."""

    async def test_failing_hook_does_not_block_others(self):
        """A hook registered after a raising hook still gets to run."""
        completed = []

        @on_billing_event("subscription.activated")
        async def failing_hook(event_type, data):
            raise RuntimeError("boom")

        @on_billing_event("subscription.activated")
        async def good_hook(event_type, data):
            completed.append("ok")

        # The RuntimeError from the first hook must not propagate out of
        # _fire_hooks, otherwise this await would raise and fail the test.
        await _fire_hooks("subscription.activated", {})

        assert completed == ["ok"]

    async def test_failing_hook_is_logged(self, caplog):
        """The hook's name and the error message appear in the ERROR log."""
        import logging

        @on_billing_event("subscription.activated")
        async def bad_hook(event_type, data):
            raise ValueError("test error")

        with caplog.at_level(logging.ERROR):
            await _fire_hooks("subscription.activated", {})

        captured = caplog.text
        assert "bad_hook" in captured
        assert "test error" in captured