fix(billing): add missing hook infrastructure (_billing_hooks, on_billing_event, _fire_hooks)
Some checks failed
CI / test-cli (push) Successful in 11s
CI / test-sqlmesh (push) Successful in 13s
CI / tag (push) Has been cancelled
CI / test-web (push) Has been cancelled

Tests expected a billing event hook system that was never implemented.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-28 01:57:48 +01:00
parent d58fa67238
commit efb5a165e7

View File

@@ -4,6 +4,7 @@ Payment provider: paddle
""" """
import json import json
import logging
from datetime import datetime from datetime import datetime
from functools import wraps from functools import wraps
from pathlib import Path from pathlib import Path
@@ -21,6 +22,34 @@ from ..auth.routes import login_required
logger = logging.getLogger(__name__)
# =============================================================================
# Billing event hook system
# =============================================================================
_billing_hooks: dict[str, list] = {}
def on_billing_event(*event_types: str):
"""Decorator: register a handler for one or more billing event types."""
def decorator(func):
for event_type in event_types:
_billing_hooks.setdefault(event_type, []).append(func)
return func
return decorator
async def _fire_hooks(event_type: str, data: dict) -> None:
"""Fire all registered hooks for an event type, isolating per-hook failures."""
for hook in _billing_hooks.get(event_type, []):
try:
await hook(event_type, data)
except Exception as e:
logger.error("Hook %s failed for event %s: %s", hook.__name__, event_type, e)
# Blueprint with its own template folder # Blueprint with its own template folder
bp = Blueprint( bp = Blueprint(
"billing", "billing",