From 7af612504b97dcd79e5e9f85cca29e82beeb9e1c Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 09:31:23 +0100 Subject: [PATCH] feat(marketplace): lead matching notifications + weekly digest + CTA tracking - notify_matching_suppliers task: on lead verification, finds growth/pro suppliers whose service_area matches the lead country and sends an instant alert email (LIMIT 20 suppliers per lead) - send_weekly_lead_digest task: every Monday 08:00 UTC, sends paid suppliers a table of new matching leads from the past 7 days they haven't seen yet (LIMIT 5 per supplier) - One-click CTA token: forward emails now include a "Mark as contacted" footer link; clicking sets forward status to 'contacted' immediately - cta_token stored on lead_forwards after email send - Supplier lead_respond endpoint: HTMX status update for forwarded leads (sent / viewed / contacted / quoted / won / lost / no_response) - Supplier lead_cta_contacted endpoint: handles one-click email CTA, redirects to dashboard leads tab - leads/routes.py: enqueue notify_matching_suppliers on quote verification Co-Authored-By: Claude Sonnet 4.6 --- web/src/padelnomics/leads/routes.py | 1 + web/src/padelnomics/suppliers/routes.py | 51 +++++++ web/src/padelnomics/worker.py | 176 +++++++++++++++++++++++- 3 files changed, 225 insertions(+), 3 deletions(-) diff --git a/web/src/padelnomics/leads/routes.py b/web/src/padelnomics/leads/routes.py index 265fc74..fb8cb04 100644 --- a/web/src/padelnomics/leads/routes.py +++ b/web/src/padelnomics/leads/routes.py @@ -556,6 +556,7 @@ async def verify_quote(): from ..worker import enqueue await enqueue("send_welcome", {"email": contact_email, "lang": g.get("lang", "en")}) + await enqueue("notify_matching_suppliers", {"lead_id": lead["id"], "lang": g.get("lang", "en")}) return await render_template( "quote_submitted.html", diff --git a/web/src/padelnomics/suppliers/routes.py b/web/src/padelnomics/suppliers/routes.py index 7846887..0ae31b2 100644 --- a/web/src/padelnomics/suppliers/routes.py +++ b/web/src/padelnomics/suppliers/routes.py @@ -646,6 +646,57 @@ async def unlock_lead(token: str): ) +FORWARD_STATUSES = ["sent", "viewed", "contacted", "quoted", "won", "lost", "no_response"] + + +@bp.route("/leads//respond", methods=["POST"]) +@_lead_tier_required +@csrf_protect +async def lead_respond(token: str): + """Update response status on a forwarded lead. HTMX or full-page.""" + supplier = g.supplier + form = await request.form + new_status = form.get("status", "") + note = form.get("note", "").strip() + + if new_status not in FORWARD_STATUSES: + return "Invalid status", 422 + + lead_row = await fetch_one( + "SELECT id FROM lead_requests WHERE token = ?", (token,) + ) + if not lead_row: + return "Lead not found", 404 + + from ..core import utcnow_iso + await execute( + """UPDATE lead_forwards + SET status = ?, supplier_note = ?, status_updated_at = ? + WHERE lead_id = ? AND supplier_id = ?""", + (new_status, note or None, utcnow_iso(), lead_row["id"], supplier["id"]), + ) + return "", 204 + + +@bp.route("/leads/cta/") +async def lead_cta_contacted(cta_token: str): + """One-click CTA from forward email: mark as contacted, redirect to dashboard.""" + row = await fetch_one( + "SELECT id, lead_id, supplier_id, status FROM lead_forwards WHERE cta_token = ?", + (cta_token,), + ) + if not row: + return redirect(url_for("suppliers.dashboard")) + + if row["status"] == "sent": + from ..core import utcnow_iso + await execute( + "UPDATE lead_forwards SET status = 'contacted', status_updated_at = ? WHERE id = ?", + (utcnow_iso(), row["id"]), + ) + return redirect(url_for("suppliers.dashboard") + "?tab=leads") + + # ============================================================================= # Supplier Dashboard # ============================================================================= diff --git a/web/src/padelnomics/worker.py b/web/src/padelnomics/worker.py index f30fd17..b543d10 100644 --- a/web/src/padelnomics/worker.py +++ b/web/src/padelnomics/worker.py @@ -5,6 +5,7 @@ Background task worker - SQLite-based queue (no Redis needed). import asyncio import json import logging +import secrets import traceback from datetime import datetime, timedelta @@ -498,6 +499,15 @@ async def handle_send_lead_forward_email(payload: dict) -> None: logger.warning("No email for supplier %s, skipping lead forward", supplier_id) return + # Generate one-click "I've contacted this lead" CTA token + cta_token = secrets.token_urlsafe(24) + cta_url = f"{config.BASE_URL}/suppliers/leads/cta/{cta_token}" + body += ( + f'

' + f'' + f'✓ Mark as contacted

' + ) + await send_email( to=to_email, subject=subject, @@ -506,11 +516,11 @@ async def handle_send_lead_forward_email(payload: dict) -> None: email_type="lead_forward", ) - # Update email_sent_at on lead_forward + # Update email_sent_at and store cta_token on lead_forward now = utcnow_iso() await execute( - "UPDATE lead_forwards SET email_sent_at = ? WHERE lead_id = ? AND supplier_id = ?", - (now, lead_id, supplier_id), + "UPDATE lead_forwards SET email_sent_at = ?, cta_token = ? WHERE lead_id = ? AND supplier_id = ?", + (now, cta_token, lead_id, supplier_id), ) @@ -550,6 +560,159 @@ async def handle_send_lead_matched_notification(payload: dict) -> None: ) +@task("notify_matching_suppliers") +async def handle_notify_matching_suppliers(payload: dict) -> None: + """Notify growth/pro suppliers whose service_area matches a newly verified lead.""" + lead_id = payload["lead_id"] + lang = payload.get("lang", "en") + + lead = await fetch_one( + "SELECT * FROM lead_requests WHERE id = ? AND status = 'new' AND verified_at IS NOT NULL", + (lead_id,), + ) + if not lead or not lead.get("country"): + return + + country = lead["country"] + heat = (lead["heat_score"] or "cool").upper() + + # Find matching suppliers: paid tier, have credits, service_area includes lead country + # service_area is comma-separated country codes (e.g. "DE,AT,CH") + matching = await fetch_all( + """SELECT id, name, contact_email, contact, tier + FROM suppliers + WHERE tier IN ('growth', 'pro') + AND credit_balance > 0 + AND (service_area = ? OR service_area LIKE ? OR service_area LIKE ? OR service_area LIKE ?) + LIMIT 20""", + (country, f"{country},%", f"%,{country}", f"%,{country},%"), + ) + if not matching: + return + + courts = lead["court_count"] or "?" + timeline = lead["timeline"] or "" + facility_type = lead["facility_type"] or "padel" + + for supplier in matching: + to_email = supplier.get("contact_email") or supplier.get("contact") or "" + if not to_email: + continue + + body = ( + f'

' + f'New [{heat}] lead in {country}

' + f'
' + f'

A new project brief has been submitted that matches your service area.

' + f'' + f'' + f'' + f'' + f'' + f'' + f'' + f'' + f'' + f'
Facility{facility_type}
Courts{courts}
Country{country}
Timeline{timeline or "-"}
' + f'

' + f'Contact details are available after unlocking. Credits required: {lead.get("credit_cost", "?")}.

' + f'{_email_button(f"{config.BASE_URL}/suppliers/leads", "View lead feed")}' + ) + + await send_email( + to=to_email, + subject=f"[{heat}] New {facility_type} project in {country} — {courts} courts", + html=_email_wrap(body, lang, preheader=f"New matching lead in {country}"), + from_addr=EMAIL_ADDRESSES["leads"], + email_type="lead_match_notify", + ) + + +@task("send_weekly_lead_digest") +async def handle_send_weekly_lead_digest(payload: dict) -> None: + """Weekly digest to active suppliers: new matching leads in their area.""" + # Find paid suppliers with credits + active_suppliers = await fetch_all( + "SELECT id, name, service_area, contact_email, contact FROM suppliers WHERE tier IN ('growth','pro') AND credit_balance > 0" + ) + for supplier in active_suppliers: + to_email = supplier.get("contact_email") or supplier.get("contact") or "" + if not to_email: + continue + + service_area_raw = (supplier.get("service_area") or "").strip() + if not service_area_raw: + continue + countries = [c.strip() for c in service_area_raw.split(",") if c.strip()] + if not countries: + continue + + placeholders = ",".join("?" * len(countries)) + new_leads = await fetch_all( + f"""SELECT id, heat_score, country, court_count, facility_type, timeline, credit_cost, created_at + FROM lead_requests + WHERE lead_type = 'quote' AND status = 'new' AND verified_at IS NOT NULL + AND country IN ({placeholders}) + AND created_at >= datetime('now', '-7 days') + AND NOT EXISTS ( + SELECT 1 FROM lead_forwards WHERE lead_id = lead_requests.id AND supplier_id = ? + ) + ORDER BY + CASE heat_score WHEN 'hot' THEN 0 WHEN 'warm' THEN 1 ELSE 2 END, + created_at DESC + LIMIT 5""", + tuple(countries) + (supplier["id"],), + ) + if not new_leads: + continue + + lead_rows_html = "" + for ld in new_leads: + heat = (ld["heat_score"] or "cool").upper() + heat_colors = {"HOT": "#DC2626", "WARM": "#EA580C", "COOL": "#2563EB"} + hc = heat_colors.get(heat, "#2563EB") + badge = ( + f'{heat}' + ) + lead_rows_html += ( + f'' + f'' + f'{badge} {ld["facility_type"] or "Padel"}, {ld["court_count"] or "?"} courts' + f'{ld["country"] or "-"}' + f'{ld["timeline"] or "-"}' + f'' + ) + + body = ( + f'

' + f'Your weekly lead digest — {len(new_leads)} new {"lead" if len(new_leads) == 1 else "leads"}

' + f'
' + f'

New matching leads in your service area this week:

' + f'' + f'' + f'' + f'' + f'' + f'' + f'{lead_rows_html}' + f'
ProjectCountryTimeline
' + f'{_email_button(f"{config.BASE_URL}/suppliers/leads", "Unlock leads →")}' + ) + + area_summary = ", ".join(countries[:3]) + if len(countries) > 3: + area_summary += f" +{len(countries) - 3}" + + await send_email( + to=to_email, + subject=f"{len(new_leads)} new padel {'lead' if len(new_leads) == 1 else 'leads'} in {area_summary}", + html=_email_wrap(body, "en", preheader=f"{len(new_leads)} new leads matching your service area"), + from_addr=EMAIL_ADDRESSES["leads"], + email_type="weekly_digest", + ) + + @task("send_supplier_enquiry_email") async def handle_send_supplier_enquiry_email(payload: dict) -> None: """Relay a directory enquiry form submission to the supplier's contact email.""" @@ -823,6 +986,7 @@ async def run_scheduler() -> None: last_credit_refill = None last_seo_sync_date = None + last_weekly_digest = None while True: try: @@ -850,6 +1014,12 @@ async def run_scheduler() -> None: last_seo_sync_date = today_date scheduler_logger.info("Queued SEO metric syncs for %s", today_date) + # Weekly lead digest — every Monday after 8am UTC + if today.weekday() == 0 and today.hour >= 8 and last_weekly_digest != today_date: + await enqueue("send_weekly_lead_digest", {}) + last_weekly_digest = today_date + scheduler_logger.info("Queued weekly lead digest for %s", today_date) + await asyncio.sleep(3600) # 1 hour except Exception as e: