utcnow_iso() now produces 'YYYY-MM-DD HH:MM:SS' (space separator) matching
SQLite's datetime('now') so lexicographic comparisons like
'published_at <= datetime(now)' work correctly.
Also add `id DESC` tiebreaker to get_ledger() ORDER BY to preserve
insertion order when multiple credits are added within the same second.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
205 lines
6.7 KiB
Python
205 lines
6.7 KiB
Python
"""
|
|
Credit system: balance tracking, lead unlocking, and ledger management.
|
|
|
|
All balance mutations go through this module to keep credit_ledger (source of truth)
|
|
and suppliers.credit_balance (denormalized cache) in sync within a single transaction.
|
|
"""
|
|
|
|
from .core import execute, fetch_all, fetch_one, transaction, utcnow_iso
|
|
|
|
# Credit cost per heat tier
|
|
HEAT_CREDIT_COSTS = {"hot": 35, "warm": 20, "cool": 8}
|
|
|
|
# Monthly credits by supplier plan
|
|
PLAN_MONTHLY_CREDITS = {"growth": 30, "pro": 100}
|
|
|
|
# Credit pack prices (amount -> EUR cents, for reference)
|
|
CREDIT_PACKS = {25: 99, 50: 179, 100: 329, 250: 749}
|
|
|
|
|
|
class InsufficientCredits(Exception):
|
|
"""Raised when a supplier doesn't have enough credits."""
|
|
|
|
def __init__(self, balance: int, required: int):
|
|
self.balance = balance
|
|
self.required = required
|
|
super().__init__(f"Need {required} credits, have {balance}")
|
|
|
|
|
|
async def get_balance(supplier_id: int) -> int:
|
|
"""Get current credit balance for a supplier."""
|
|
row = await fetch_one(
|
|
"SELECT credit_balance FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
return row["credit_balance"] if row else 0
|
|
|
|
|
|
async def add_credits(
|
|
supplier_id: int,
|
|
amount: int,
|
|
event_type: str,
|
|
reference_id: int = None,
|
|
note: str = None,
|
|
) -> int:
|
|
"""Add credits to a supplier. Returns new balance."""
|
|
now = utcnow_iso()
|
|
async with transaction() as db:
|
|
row = await db.execute_fetchall(
|
|
"SELECT credit_balance FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
current = row[0][0] if row else 0
|
|
new_balance = current + amount
|
|
|
|
await db.execute(
|
|
"""INSERT INTO credit_ledger
|
|
(supplier_id, delta, balance_after, event_type, reference_id, note, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
|
(supplier_id, amount, new_balance, event_type, reference_id, note, now),
|
|
)
|
|
await db.execute(
|
|
"UPDATE suppliers SET credit_balance = ? WHERE id = ?",
|
|
(new_balance, supplier_id),
|
|
)
|
|
return new_balance
|
|
|
|
|
|
async def spend_credits(
|
|
supplier_id: int,
|
|
amount: int,
|
|
event_type: str,
|
|
reference_id: int = None,
|
|
note: str = None,
|
|
) -> int:
|
|
"""Spend credits from a supplier. Returns new balance. Raises InsufficientCredits."""
|
|
now = utcnow_iso()
|
|
async with transaction() as db:
|
|
row = await db.execute_fetchall(
|
|
"SELECT credit_balance FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
current = row[0][0] if row else 0
|
|
|
|
if current < amount:
|
|
raise InsufficientCredits(current, amount)
|
|
|
|
new_balance = current - amount
|
|
await db.execute(
|
|
"""INSERT INTO credit_ledger
|
|
(supplier_id, delta, balance_after, event_type, reference_id, note, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
|
(supplier_id, -amount, new_balance, event_type, reference_id, note, now),
|
|
)
|
|
await db.execute(
|
|
"UPDATE suppliers SET credit_balance = ? WHERE id = ?",
|
|
(new_balance, supplier_id),
|
|
)
|
|
return new_balance
|
|
|
|
|
|
async def already_unlocked(supplier_id: int, lead_id: int) -> bool:
|
|
"""Check if a supplier has already unlocked a lead."""
|
|
row = await fetch_one(
|
|
"SELECT 1 FROM lead_forwards WHERE supplier_id = ? AND lead_id = ?",
|
|
(supplier_id, lead_id),
|
|
)
|
|
return row is not None
|
|
|
|
|
|
async def unlock_lead(supplier_id: int, lead_id: int) -> dict:
|
|
"""Unlock a lead for a supplier. Atomic: check, spend, insert forward, increment unlock_count."""
|
|
if await already_unlocked(supplier_id, lead_id):
|
|
raise ValueError("Lead already unlocked by this supplier")
|
|
|
|
lead = await fetch_one("SELECT * FROM lead_requests WHERE id = ?", (lead_id,))
|
|
if not lead:
|
|
raise ValueError("Lead not found")
|
|
|
|
cost = lead["credit_cost"] or compute_credit_cost(lead)
|
|
now = utcnow_iso()
|
|
|
|
async with transaction() as db:
|
|
# Check balance
|
|
row = await db.execute_fetchall(
|
|
"SELECT credit_balance FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
current = row[0][0] if row else 0
|
|
if current < cost:
|
|
raise InsufficientCredits(current, cost)
|
|
|
|
new_balance = current - cost
|
|
|
|
# Insert lead forward
|
|
cursor = await db.execute(
|
|
"""INSERT INTO lead_forwards (lead_id, supplier_id, credit_cost, created_at)
|
|
VALUES (?, ?, ?, ?)""",
|
|
(lead_id, supplier_id, cost, now),
|
|
)
|
|
forward_id = cursor.lastrowid
|
|
|
|
# Record in ledger
|
|
await db.execute(
|
|
"""INSERT INTO credit_ledger
|
|
(supplier_id, delta, balance_after, event_type, reference_id, note, created_at)
|
|
VALUES (?, ?, ?, 'lead_unlock', ?, ?, ?)""",
|
|
(supplier_id, -cost, new_balance, forward_id,
|
|
f"Unlocked lead #{lead_id}", now),
|
|
)
|
|
|
|
# Update supplier balance
|
|
await db.execute(
|
|
"UPDATE suppliers SET credit_balance = ? WHERE id = ?",
|
|
(new_balance, supplier_id),
|
|
)
|
|
|
|
# Increment unlock count on lead
|
|
await db.execute(
|
|
"UPDATE lead_requests SET unlock_count = unlock_count + 1 WHERE id = ?",
|
|
(lead_id,),
|
|
)
|
|
|
|
return {
|
|
"forward_id": forward_id,
|
|
"credit_cost": cost,
|
|
"new_balance": new_balance,
|
|
"lead": dict(lead),
|
|
}
|
|
|
|
|
|
def compute_credit_cost(lead: dict) -> int:
|
|
"""Compute credit cost from lead heat score."""
|
|
heat = (lead.get("heat_score") or "cool").lower()
|
|
return HEAT_CREDIT_COSTS.get(heat, HEAT_CREDIT_COSTS["cool"])
|
|
|
|
|
|
async def monthly_credit_refill(supplier_id: int) -> int:
|
|
"""Refill monthly credits for a supplier. Returns new balance."""
|
|
row = await fetch_one(
|
|
"SELECT monthly_credits, tier FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
if not row or not row["monthly_credits"]:
|
|
return 0
|
|
|
|
now = utcnow_iso()
|
|
new_balance = await add_credits(
|
|
supplier_id,
|
|
row["monthly_credits"],
|
|
"monthly_allocation",
|
|
note=f"Monthly refill ({row['tier']} plan)",
|
|
)
|
|
await execute(
|
|
"UPDATE suppliers SET last_credit_refill = ? WHERE id = ?",
|
|
(now, supplier_id),
|
|
)
|
|
return new_balance
|
|
|
|
|
|
async def get_ledger(supplier_id: int, limit: int = 50) -> list[dict]:
|
|
"""Get credit ledger entries for a supplier."""
|
|
return await fetch_all(
|
|
"""SELECT cl.*, lf.lead_id
|
|
FROM credit_ledger cl
|
|
LEFT JOIN lead_forwards lf ON cl.reference_id = lf.id AND cl.event_type = 'lead_unlock'
|
|
WHERE cl.supplier_id = ?
|
|
ORDER BY cl.created_at DESC, cl.id DESC LIMIT ?""",
|
|
(supplier_id, limit),
|
|
)
|