feat(marketplace): lead matching notifications + weekly digest + CTA tracking

- notify_matching_suppliers task: on lead verification, finds growth/pro
  suppliers whose service_area matches the lead country and sends an
  instant alert email (LIMIT 20 suppliers per lead)
- send_weekly_lead_digest task: every Monday 08:00 UTC, sends paid
  suppliers a table of new matching leads from the past 7 days they
  haven't seen yet (LIMIT 5 per supplier)
- One-click CTA token: forward emails now include a "Mark as contacted"
  footer link; clicking sets forward status to 'contacted' immediately
- cta_token stored on lead_forwards after email send
- Supplier lead_respond endpoint: HTMX status update for forwarded leads
  (sent / viewed / contacted / quoted / won / lost / no_response)
- Supplier lead_cta_contacted endpoint: handles one-click email CTA,
  redirects to dashboard leads tab
- leads/routes.py: enqueue notify_matching_suppliers on quote verification

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-25 09:31:23 +01:00
parent c84a5ffdd1
commit 7af612504b
3 changed files with 225 additions and 3 deletions

View File

@@ -556,6 +556,7 @@ async def verify_quote():
from ..worker import enqueue from ..worker import enqueue
await enqueue("send_welcome", {"email": contact_email, "lang": g.get("lang", "en")}) await enqueue("send_welcome", {"email": contact_email, "lang": g.get("lang", "en")})
await enqueue("notify_matching_suppliers", {"lead_id": lead["id"], "lang": g.get("lang", "en")})
return await render_template( return await render_template(
"quote_submitted.html", "quote_submitted.html",

View File

@@ -646,6 +646,57 @@ async def unlock_lead(token: str):
) )
FORWARD_STATUSES = ["sent", "viewed", "contacted", "quoted", "won", "lost", "no_response"]
@bp.route("/leads/<token>/respond", methods=["POST"])
@_lead_tier_required
@csrf_protect
async def lead_respond(token: str):
"""Update response status on a forwarded lead. HTMX or full-page."""
supplier = g.supplier
form = await request.form
new_status = form.get("status", "")
note = form.get("note", "").strip()
if new_status not in FORWARD_STATUSES:
return "Invalid status", 422
lead_row = await fetch_one(
"SELECT id FROM lead_requests WHERE token = ?", (token,)
)
if not lead_row:
return "Lead not found", 404
from ..core import utcnow_iso
await execute(
"""UPDATE lead_forwards
SET status = ?, supplier_note = ?, status_updated_at = ?
WHERE lead_id = ? AND supplier_id = ?""",
(new_status, note or None, utcnow_iso(), lead_row["id"], supplier["id"]),
)
return "", 204
@bp.route("/leads/cta/<cta_token>")
async def lead_cta_contacted(cta_token: str):
"""One-click CTA from forward email: mark as contacted, redirect to dashboard."""
row = await fetch_one(
"SELECT id, lead_id, supplier_id, status FROM lead_forwards WHERE cta_token = ?",
(cta_token,),
)
if not row:
return redirect(url_for("suppliers.dashboard"))
if row["status"] == "sent":
from ..core import utcnow_iso
await execute(
"UPDATE lead_forwards SET status = 'contacted', status_updated_at = ? WHERE id = ?",
(utcnow_iso(), row["id"]),
)
return redirect(url_for("suppliers.dashboard") + "?tab=leads")
# ============================================================================= # =============================================================================
# Supplier Dashboard # Supplier Dashboard
# ============================================================================= # =============================================================================

View File

@@ -5,6 +5,7 @@ Background task worker - SQLite-based queue (no Redis needed).
import asyncio import asyncio
import json import json
import logging import logging
import secrets
import traceback import traceback
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -498,6 +499,15 @@ async def handle_send_lead_forward_email(payload: dict) -> None:
logger.warning("No email for supplier %s, skipping lead forward", supplier_id) logger.warning("No email for supplier %s, skipping lead forward", supplier_id)
return return
# Generate one-click "I've contacted this lead" CTA token
cta_token = secrets.token_urlsafe(24)
cta_url = f"{config.BASE_URL}/suppliers/leads/cta/{cta_token}"
body += (
f'<p style="font-size:12px;color:#94A3B8;text-align:center;margin:16px 0 0;">'
f'<a href="{cta_url}" style="color:#94A3B8;">'
f'&#10003; Mark as contacted</a></p>'
)
await send_email( await send_email(
to=to_email, to=to_email,
subject=subject, subject=subject,
@@ -506,11 +516,11 @@ async def handle_send_lead_forward_email(payload: dict) -> None:
email_type="lead_forward", email_type="lead_forward",
) )
# Update email_sent_at on lead_forward # Update email_sent_at and store cta_token on lead_forward
now = utcnow_iso() now = utcnow_iso()
await execute( await execute(
"UPDATE lead_forwards SET email_sent_at = ? WHERE lead_id = ? AND supplier_id = ?", "UPDATE lead_forwards SET email_sent_at = ?, cta_token = ? WHERE lead_id = ? AND supplier_id = ?",
(now, lead_id, supplier_id), (now, cta_token, lead_id, supplier_id),
) )
@@ -550,6 +560,159 @@ async def handle_send_lead_matched_notification(payload: dict) -> None:
) )
@task("notify_matching_suppliers")
async def handle_notify_matching_suppliers(payload: dict) -> None:
"""Notify growth/pro suppliers whose service_area matches a newly verified lead."""
lead_id = payload["lead_id"]
lang = payload.get("lang", "en")
lead = await fetch_one(
"SELECT * FROM lead_requests WHERE id = ? AND status = 'new' AND verified_at IS NOT NULL",
(lead_id,),
)
if not lead or not lead.get("country"):
return
country = lead["country"]
heat = (lead["heat_score"] or "cool").upper()
# Find matching suppliers: paid tier, have credits, service_area includes lead country
# service_area is comma-separated country codes (e.g. "DE,AT,CH")
matching = await fetch_all(
"""SELECT id, name, contact_email, contact, tier
FROM suppliers
WHERE tier IN ('growth', 'pro')
AND credit_balance > 0
AND (service_area = ? OR service_area LIKE ? OR service_area LIKE ? OR service_area LIKE ?)
LIMIT 20""",
(country, f"{country},%", f"%,{country}", f"%,{country},%"),
)
if not matching:
return
courts = lead["court_count"] or "?"
timeline = lead["timeline"] or ""
facility_type = lead["facility_type"] or "padel"
for supplier in matching:
to_email = supplier.get("contact_email") or supplier.get("contact") or ""
if not to_email:
continue
body = (
f'<h2 style="margin:0 0 8px;color:#0F172A;font-size:18px;">'
f'New [{heat}] lead in {country}</h2>'
f'<hr style="border:none;border-top:1px solid #E2E8F0;margin:0 0 16px;">'
f'<p style="font-size:14px;color:#334155;">A new project brief has been submitted that matches your service area.</p>'
f'<table cellpadding="0" cellspacing="0" style="margin-bottom:20px">'
f'<tr><td style="padding:4px 12px 4px 0;color:#94A3B8;font-size:13px">Facility</td>'
f'<td style="font-size:13px;color:#1E293B">{facility_type}</td></tr>'
f'<tr><td style="padding:4px 12px 4px 0;color:#94A3B8;font-size:13px">Courts</td>'
f'<td style="font-size:13px;color:#1E293B">{courts}</td></tr>'
f'<tr><td style="padding:4px 12px 4px 0;color:#94A3B8;font-size:13px">Country</td>'
f'<td style="font-size:13px;color:#1E293B">{country}</td></tr>'
f'<tr><td style="padding:4px 12px 4px 0;color:#94A3B8;font-size:13px">Timeline</td>'
f'<td style="font-size:13px;color:#1E293B">{timeline or "-"}</td></tr>'
f'</table>'
f'<p style="font-size:13px;color:#64748B;">'
f'Contact details are available after unlocking. Credits required: {lead.get("credit_cost", "?")}.</p>'
f'{_email_button(f"{config.BASE_URL}/suppliers/leads", "View lead feed")}'
)
await send_email(
to=to_email,
subject=f"[{heat}] New {facility_type} project in {country}{courts} courts",
html=_email_wrap(body, lang, preheader=f"New matching lead in {country}"),
from_addr=EMAIL_ADDRESSES["leads"],
email_type="lead_match_notify",
)
@task("send_weekly_lead_digest")
async def handle_send_weekly_lead_digest(payload: dict) -> None:
"""Weekly digest to active suppliers: new matching leads in their area."""
# Find paid suppliers with credits
active_suppliers = await fetch_all(
"SELECT id, name, service_area, contact_email, contact FROM suppliers WHERE tier IN ('growth','pro') AND credit_balance > 0"
)
for supplier in active_suppliers:
to_email = supplier.get("contact_email") or supplier.get("contact") or ""
if not to_email:
continue
service_area_raw = (supplier.get("service_area") or "").strip()
if not service_area_raw:
continue
countries = [c.strip() for c in service_area_raw.split(",") if c.strip()]
if not countries:
continue
placeholders = ",".join("?" * len(countries))
new_leads = await fetch_all(
f"""SELECT id, heat_score, country, court_count, facility_type, timeline, credit_cost, created_at
FROM lead_requests
WHERE lead_type = 'quote' AND status = 'new' AND verified_at IS NOT NULL
AND country IN ({placeholders})
AND created_at >= datetime('now', '-7 days')
AND NOT EXISTS (
SELECT 1 FROM lead_forwards WHERE lead_id = lead_requests.id AND supplier_id = ?
)
ORDER BY
CASE heat_score WHEN 'hot' THEN 0 WHEN 'warm' THEN 1 ELSE 2 END,
created_at DESC
LIMIT 5""",
tuple(countries) + (supplier["id"],),
)
if not new_leads:
continue
lead_rows_html = ""
for ld in new_leads:
heat = (ld["heat_score"] or "cool").upper()
heat_colors = {"HOT": "#DC2626", "WARM": "#EA580C", "COOL": "#2563EB"}
hc = heat_colors.get(heat, "#2563EB")
badge = (
f'<span style="display:inline-block;padding:1px 6px;border-radius:4px;'
f'background-color:{hc};color:#fff;font-size:10px;font-weight:700">{heat}</span>'
)
lead_rows_html += (
f'<tr>'
f'<td style="padding:6px 12px 6px 0;font-size:13px;color:#1E293B">'
f'{badge} {ld["facility_type"] or "Padel"}, {ld["court_count"] or "?"} courts</td>'
f'<td style="padding:6px 12px 6px 0;font-size:13px;color:#64748B">{ld["country"] or "-"}</td>'
f'<td style="padding:6px 0;font-size:13px;color:#64748B">{ld["timeline"] or "-"}</td>'
f'</tr>'
)
body = (
f'<h2 style="margin:0 0 8px;color:#0F172A;font-size:18px;">'
f'Your weekly lead digest — {len(new_leads)} new {"lead" if len(new_leads) == 1 else "leads"}</h2>'
f'<hr style="border:none;border-top:1px solid #E2E8F0;margin:0 0 16px;">'
f'<p style="font-size:14px;color:#334155;">New matching leads in your service area this week:</p>'
f'<table cellpadding="0" cellspacing="0" style="margin-bottom:20px;width:100%">'
f'<thead><tr>'
f'<th style="text-align:left;font-size:11px;color:#94A3B8;padding:0 12px 6px 0;text-transform:uppercase">Project</th>'
f'<th style="text-align:left;font-size:11px;color:#94A3B8;padding:0 12px 6px 0;text-transform:uppercase">Country</th>'
f'<th style="text-align:left;font-size:11px;color:#94A3B8;text-transform:uppercase">Timeline</th>'
f'</tr></thead>'
f'<tbody>{lead_rows_html}</tbody>'
f'</table>'
f'{_email_button(f"{config.BASE_URL}/suppliers/leads", "Unlock leads →")}'
)
area_summary = ", ".join(countries[:3])
if len(countries) > 3:
area_summary += f" +{len(countries) - 3}"
await send_email(
to=to_email,
subject=f"{len(new_leads)} new padel {'lead' if len(new_leads) == 1 else 'leads'} in {area_summary}",
html=_email_wrap(body, "en", preheader=f"{len(new_leads)} new leads matching your service area"),
from_addr=EMAIL_ADDRESSES["leads"],
email_type="weekly_digest",
)
@task("send_supplier_enquiry_email") @task("send_supplier_enquiry_email")
async def handle_send_supplier_enquiry_email(payload: dict) -> None: async def handle_send_supplier_enquiry_email(payload: dict) -> None:
"""Relay a directory enquiry form submission to the supplier's contact email.""" """Relay a directory enquiry form submission to the supplier's contact email."""
@@ -823,6 +986,7 @@ async def run_scheduler() -> None:
last_credit_refill = None last_credit_refill = None
last_seo_sync_date = None last_seo_sync_date = None
last_weekly_digest = None
while True: while True:
try: try:
@@ -850,6 +1014,12 @@ async def run_scheduler() -> None:
last_seo_sync_date = today_date last_seo_sync_date = today_date
scheduler_logger.info("Queued SEO metric syncs for %s", today_date) scheduler_logger.info("Queued SEO metric syncs for %s", today_date)
# Weekly lead digest — every Monday after 8am UTC
if today.weekday() == 0 and today.hour >= 8 and last_weekly_digest != today_date:
await enqueue("send_weekly_lead_digest", {})
last_weekly_digest = today_date
scheduler_logger.info("Queued weekly lead digest for %s", today_date)
await asyncio.sleep(3600) # 1 hour await asyncio.sleep(3600) # 1 hour
except Exception as e: except Exception as e: