merge(outreach): admin outreach pipeline + separate sending domain

# Conflicts:
#	CHANGELOG.md
This commit is contained in:
Deeman
2026-02-25 15:29:20 +01:00
15 changed files with 1325 additions and 7 deletions

View File

@@ -15,6 +15,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- **`analytics.execute_user_query()`** — new function returning `(columns, rows, error, elapsed_ms)` for admin query editor
- **`worker.run_extraction` task** — background handler shells out to `uv run extract` from repo root (2h timeout)
- 29 new tests covering all routes, data access helpers, security checks, and `execute_user_query()`
- **Outreach pipeline** — cold B2B supplier outreach isolated from transactional emails:
- **Separate sending domain** (`hello.padelnomics.io`) — added `"outreach"` key to `EMAIL_ADDRESSES`; reputation isolated from `notifications.padelnomics.io` magic-link/lead-forward traffic (manual DNS step: add domain in Resend dashboard)
- **Migration 0024** — 4 new columns on `suppliers`: `outreach_status`, `outreach_notes`, `last_contacted_at`, `outreach_sequence_step`; `NULL` status = not in pipeline (no backfill needed for existing suppliers)
- **Admin outreach pipeline tab** (`/admin/outreach`) — 6 pipeline cards (prospect → contacted → replied → signed_up → declined → not_interested) with click-to-filter; HTMX-powered supplier table with inline status dropdown + note editing; sidebar link added
- **HTMX endpoints** — `POST /admin/outreach/<id>/status` returns updated row; `POST /admin/outreach/<id>/note` returns truncated note text
- **Bulk add-to-pipeline** — checkbox column on `/admin/suppliers`, "Add to Outreach Pipeline" form action → `POST /admin/outreach/add-prospects`; skips suppliers already in pipeline
- **CSV import** (`GET/POST /admin/outreach/import`) — uploads CSV (`name`, `contact_email` required; `country_code`, `category`, `website` optional); creates new supplier rows as `prospect`; auto-generates slug; deduplicates by `contact_email`; capped at 500 rows
- **Compose integration** — `GET /admin/emails/compose` now accepts `?from_key=outreach&email_type=outreach&supplier_id=<id>` query params; pre-selects outreach from-address and unchecks HTML wrap (plain text best practice for cold email); on successful send with `email_type=outreach` + `supplier_id`, auto-updates supplier: `prospect→contacted`, `last_contacted_at=now`, `outreach_sequence_step+1`
- **Supplier detail outreach card** — shown when supplier is in the outreach pipeline; displays status, step, last contact date, notes, and "Send Outreach Email" compose link
- 44 new tests in `web/tests/test_outreach.py`
- **Email template system** — all 11 transactional emails migrated from inline f-string HTML in `worker.py` to Jinja2 templates:
- **Standalone renderer** (`email_templates.py`) — `render_email_template()` uses a module-level `jinja2.Environment` with `autoescape=True`, works outside Quart request context (worker process); `tformat` filter mirrors the one in `app.py`
- **`_base.html`** — branded shell (dark header, 3px blue accent, white card body, footer with tagline + copyright); replaces the old `_email_wrap()` helper

View File

@@ -117,6 +117,7 @@
- [x] **Pipeline Console** (`/admin/pipeline`) — 4-tab operational dashboard: extraction status grid per source, filterable run history with stale-run management ("Mark Failed"), data catalog with column schema + 10-row sample, SQL query editor with dark-themed textarea + schema sidebar + read-only security sandboxing (keyword blocklist, 10s timeout, 1,000-row cap)
- [x] **Lead matching notifications**`notify_matching_suppliers` task on quote verification + `send_weekly_lead_digest` every Monday; one-click CTA token in forward emails
- [x] **Migration 0022**`status_updated_at`, `supplier_note`, `cta_token` on `lead_forwards`; supplier respond endpoint; inline HTMX lead detail actions; extended quote form fields
- [x] **Outreach pipeline** (`/admin/outreach`) — cold B2B supplier outreach with separate sending domain (`hello.padelnomics.io`), 6-stage pipeline cards, HTMX inline status + note editing, CSV import, bulk add-to-pipeline from supplier list, compose integration (auto-updates pipeline on send); migration 0024 adds 4 outreach columns to suppliers; 44 tests
### SEO & Legal
- [x] Sitemap (both language variants, `<lastmod>` on all entries)

View File

@@ -1,6 +1,8 @@
"""
Admin domain: role-based admin panel for managing users, tasks, etc.
"""
import csv
import io
import json
import logging
from datetime import date, timedelta
@@ -1199,7 +1201,7 @@ async def feedback():
EMAIL_TYPES = [
"ad_hoc", "magic_link", "welcome", "quote_verification", "waitlist",
"lead_forward", "lead_matched", "supplier_enquiry", "business_plan",
"generic", "admin_compose", "admin_reply",
"generic", "admin_compose", "admin_reply", "outreach",
]
EVENT_TYPES = ["sent", "delivered", "opened", "clicked", "bounced", "complained"]
@@ -1457,7 +1459,17 @@ async def inbox_reply(msg_id: int):
@role_required("admin")
@csrf_protect
async def email_compose():
"""Compose and send an ad-hoc email."""
"""Compose and send an ad-hoc email.
Supports outreach pre-fill via query params:
?to=<email>&from_key=outreach&email_type=outreach&supplier_id=<id>&subject=<text>
When email_type=outreach and supplier_id is set, a successful send auto-updates
the supplier's outreach pipeline state:
- outreach_status: 'prospect''contacted' (only advances from prospect)
- last_contacted_at: set to now
- outreach_sequence_step: incremented by 1
"""
if request.method == "POST":
form = await request.form
to = form.get("to", "").strip()
@@ -1465,12 +1477,23 @@ async def email_compose():
body = form.get("body", "").strip()
from_addr = form.get("from_addr", "") or EMAIL_ADDRESSES["transactional"]
wrap = form.get("wrap", "") == "1"
email_type = form.get("email_type", "admin_compose").strip()
supplier_id_raw = form.get("supplier_id", "").strip()
supplier_id = int(supplier_id_raw) if supplier_id_raw.isdigit() else None
# Only allow known email_type values — default unknown to admin_compose
if email_type not in EMAIL_TYPES:
email_type = "admin_compose"
if not to or not subject or not body:
await flash("To, subject, and body are required.", "error")
return await render_template(
"admin/email_compose.html",
data={"to": to, "subject": subject, "body": body, "from_addr": from_addr},
data={
"to": to, "subject": subject, "body": body,
"from_addr": from_addr, "email_type": email_type,
"supplier_id": supplier_id,
},
email_addresses=EMAIL_ADDRESSES,
)
@@ -1487,22 +1510,60 @@ async def email_compose():
result = await send_email(
to=to, subject=subject, html=html,
from_addr=from_addr, email_type="admin_compose",
from_addr=from_addr, email_type=email_type,
)
if result:
await flash(f"Email sent to {to}.", "success")
# Auto-update outreach pipeline when sending an outreach email to a known supplier
if email_type == "outreach" and supplier_id:
now = utcnow_iso()
await execute(
"""UPDATE suppliers
SET last_contacted_at = ?,
outreach_sequence_step = outreach_sequence_step + 1,
outreach_status = CASE
WHEN outreach_status = 'prospect' THEN 'contacted'
ELSE outreach_status
END
WHERE id = ? AND outreach_status IS NOT NULL""",
(now, supplier_id),
)
return redirect(url_for("admin.emails"))
else:
await flash("Failed to send email.", "error")
return await render_template(
"admin/email_compose.html",
data={"to": to, "subject": subject, "body": body, "from_addr": from_addr},
data={
"to": to, "subject": subject, "body": body,
"from_addr": from_addr, "email_type": email_type,
"supplier_id": supplier_id,
},
email_addresses=EMAIL_ADDRESSES,
)
# GET: pre-fill from query params
prefill_to = request.args.get("to", "")
prefill_subject = request.args.get("subject", "")
from_key = request.args.get("from_key", "")
email_type = request.args.get("email_type", "admin_compose")
supplier_id_raw = request.args.get("supplier_id", "")
supplier_id = int(supplier_id_raw) if supplier_id_raw.isdigit() else None
# Pre-select from_addr when from_key provided (e.g. from_key=outreach)
prefill_from_addr = EMAIL_ADDRESSES.get(from_key, "") if from_key else ""
return await render_template(
"admin/email_compose.html", data={"to": prefill_to}, email_addresses=EMAIL_ADDRESSES,
"admin/email_compose.html",
data={
"to": prefill_to,
"subject": prefill_subject,
"from_addr": prefill_from_addr,
"email_type": email_type,
"supplier_id": supplier_id,
# Default wrap=0 for outreach — plain text best practice
"wrap": email_type != "outreach",
},
email_addresses=EMAIL_ADDRESSES,
)
@@ -2556,3 +2617,297 @@ async def seo_sync_now():
await flash("Unknown source.", "error")
return redirect(url_for("admin.seo"))
# =============================================================================
# Outreach Pipeline
# =============================================================================
OUTREACH_STATUSES = [
"prospect", "contacted", "replied", "signed_up", "declined", "not_interested",
]
# Advancing from prospect to contacted happens automatically on first email send.
# Other status changes are manual via the status dropdown HTMX endpoint.
_PROSPECT_ADVANCE_TARGET = "contacted"
# CSV import: columns accepted (minimum: name + contact_email)
_CSV_REQUIRED = {"name", "contact_email"}
_CSV_OPTIONAL = {"country_code", "category", "website"}
_CSV_IMPORT_LIMIT = 500 # guard against huge uploads
async def get_outreach_pipeline() -> dict:
"""Count suppliers per outreach status for the pipeline summary cards."""
rows = await fetch_all(
"""SELECT outreach_status, COUNT(*) as cnt
FROM suppliers
WHERE outreach_status IS NOT NULL
GROUP BY outreach_status"""
)
counts = {r["outreach_status"]: r["cnt"] for r in rows}
return {
"total": sum(counts.values()),
"counts": counts,
}
async def get_outreach_suppliers(
status: str = None,
country: str = None,
search: str = None,
page: int = 1,
per_page: int = 50,
) -> list[dict]:
"""Filtered list of suppliers that are in the outreach pipeline."""
wheres = ["outreach_status IS NOT NULL"]
params: list = []
if status:
wheres.append("outreach_status = ?")
params.append(status)
if country:
wheres.append("country_code = ?")
params.append(country)
if search:
wheres.append("(name LIKE ? OR contact_email LIKE ?)")
params.extend([f"%{search}%", f"%{search}%"])
where = " AND ".join(wheres)
offset = (page - 1) * per_page
params.extend([per_page, offset])
return await fetch_all(
f"""SELECT id, name, country_code, category, contact_email,
outreach_status, outreach_notes, last_contacted_at,
outreach_sequence_step
FROM suppliers
WHERE {where}
ORDER BY
CASE outreach_status
WHEN 'replied' THEN 1
WHEN 'contacted' THEN 2
WHEN 'prospect' THEN 3
ELSE 4
END,
last_contacted_at DESC NULLS LAST,
name ASC
LIMIT ? OFFSET ?""",
tuple(params),
)
@bp.route("/outreach")
@role_required("admin")
async def outreach():
"""Outreach pipeline dashboard."""
status = request.args.get("status", "")
country = request.args.get("country", "")
search = request.args.get("search", "").strip()
page = max(1, int(request.args.get("page", "1") or "1"))
pipeline = await get_outreach_pipeline()
supplier_list = await get_outreach_suppliers(
status=status or None, country=country or None,
search=search or None, page=page,
)
countries = await fetch_all(
"""SELECT DISTINCT country_code FROM suppliers
WHERE outreach_status IS NOT NULL AND country_code IS NOT NULL
ORDER BY country_code"""
)
return await render_template(
"admin/outreach.html",
pipeline=pipeline,
suppliers=supplier_list,
statuses=OUTREACH_STATUSES,
countries=[c["country_code"] for c in countries],
current_status=status,
current_country=country,
current_search=search,
page=page,
)
@bp.route("/outreach/results")
@role_required("admin")
async def outreach_results():
"""HTMX partial: filtered outreach supplier rows."""
status = request.args.get("status", "")
country = request.args.get("country", "")
search = request.args.get("search", "").strip()
page = max(1, int(request.args.get("page", "1") or "1"))
supplier_list = await get_outreach_suppliers(
status=status or None, country=country or None,
search=search or None, page=page,
)
return await render_template(
"admin/partials/outreach_results.html", suppliers=supplier_list,
)
@bp.route("/outreach/<int:supplier_id>/status", methods=["POST"])
@role_required("admin")
@csrf_protect
async def outreach_status(supplier_id: int):
"""HTMX: update outreach_status for a supplier, return the updated row."""
supplier = await fetch_one(
"SELECT * FROM suppliers WHERE id = ? AND outreach_status IS NOT NULL",
(supplier_id,),
)
if not supplier:
return Response("Not found", status=404)
form = await request.form
new_status = form.get("outreach_status", "").strip()
assert new_status in OUTREACH_STATUSES, f"invalid status: {new_status!r}"
await execute(
"UPDATE suppliers SET outreach_status = ? WHERE id = ?",
(new_status, supplier_id),
)
updated = await fetch_one("SELECT * FROM suppliers WHERE id = ?", (supplier_id,))
# Template uses `s` to match the loop variable in outreach_results.html
return await render_template(
"admin/partials/outreach_row.html", s=updated,
)
@bp.route("/outreach/<int:supplier_id>/note", methods=["POST"])
@role_required("admin")
@csrf_protect
async def outreach_note(supplier_id: int):
"""HTMX: update outreach_notes for a supplier, return the note cell."""
supplier = await fetch_one(
"SELECT id FROM suppliers WHERE id = ? AND outreach_status IS NOT NULL",
(supplier_id,),
)
if not supplier:
return Response("Not found", status=404)
form = await request.form
note = form.get("note", "").strip()
await execute(
"UPDATE suppliers SET outreach_notes = ? WHERE id = ?",
(note or None, supplier_id),
)
return note[:80] + ("" if len(note) > 80 else "") if note else ""
@bp.route("/outreach/add-prospects", methods=["POST"])
@role_required("admin")
@csrf_protect
async def outreach_add_prospects():
"""Bulk-set existing suppliers to 'prospect' status.
Accepts comma-separated supplier_ids from the suppliers list page.
Only updates suppliers where outreach_status IS NULL (not already in pipeline).
"""
form = await request.form
ids_raw = form.get("supplier_ids", "").strip()
if not ids_raw:
await flash("No suppliers selected.", "error")
return redirect(url_for("admin.suppliers"))
# Parse and validate — ignore non-integer tokens
supplier_ids = [int(i) for i in ids_raw.split(",") if i.strip().isdigit()]
assert len(supplier_ids) <= 500, "too many supplier IDs in bulk action"
if not supplier_ids:
await flash("No valid supplier IDs.", "error")
return redirect(url_for("admin.suppliers"))
# Build parameterized query — no string formatting of IDs
placeholders = ",".join("?" for _ in supplier_ids)
await execute(
f"""UPDATE suppliers
SET outreach_status = 'prospect'
WHERE id IN ({placeholders}) AND outreach_status IS NULL""",
tuple(supplier_ids),
)
await flash(f"Added up to {len(supplier_ids)} suppliers to the outreach pipeline.", "success")
return redirect(url_for("admin.outreach"))
@bp.route("/outreach/import", methods=["GET", "POST"])
@role_required("admin")
@csrf_protect
async def outreach_import():
"""CSV import: create supplier rows as prospects.
CSV columns: name, contact_email (required), country_code, category, website (optional).
Deduplicates by contact_email — skips rows where email already exists.
"""
if request.method == "GET":
return await render_template("admin/outreach_import.html")
files = await request.files
csv_file = files.get("csv_file")
if not csv_file or not csv_file.filename:
await flash("No file uploaded.", "error")
return await render_template("admin/outreach_import.html")
raw = csv_file.read().decode("utf-8", errors="replace")
reader = csv.DictReader(io.StringIO(raw))
# Validate headers
fieldnames = set(reader.fieldnames or [])
missing = _CSV_REQUIRED - fieldnames
if missing:
await flash(f"CSV missing required columns: {', '.join(sorted(missing))}", "error")
return await render_template("admin/outreach_import.html")
# Collect existing emails to dedup in one query
existing_rows = await fetch_all(
"SELECT contact_email FROM suppliers WHERE contact_email IS NOT NULL"
)
existing_emails = {r["contact_email"].lower() for r in existing_rows}
imported = 0
skipped = 0
now = utcnow_iso()
for row_num, row in enumerate(reader, start=2):
if row_num > _CSV_IMPORT_LIMIT + 1:
await flash(f"Import capped at {_CSV_IMPORT_LIMIT} rows.", "warning")
break
name = (row.get("name") or "").strip()
contact_email = (row.get("contact_email") or "").strip().lower()
if not name or not contact_email:
skipped += 1
continue
if contact_email in existing_emails:
skipped += 1
continue
country_code = (row.get("country_code") or "").strip().upper() or None
category = (row.get("category") or "").strip() or None
website = (row.get("website") or "").strip() or None
slug_base = slugify(name)
# Ensure unique slug by appending a counter if needed
slug = slug_base
counter = 1
while await fetch_one("SELECT id FROM suppliers WHERE slug = ?", (slug,)):
slug = f"{slug_base}-{counter}"
counter += 1
assert counter <= 100, f"slug collision loop for {name!r}"
await execute(
"""INSERT INTO suppliers
(name, slug, country_code, region, category, website,
tier, outreach_status, created_at)
VALUES (?, ?, ?, 'Europe', ?, ?, 'free', 'prospect', ?)""",
(name, slug, country_code, category, website, now),
)
existing_emails.add(contact_email)
imported += 1
await flash(f"Imported {imported} suppliers. Skipped {skipped} (duplicates or missing data).", "success")
return redirect(url_for("admin.outreach"))

View File

@@ -126,6 +126,10 @@
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke-width="1.5" stroke="currentColor"><path stroke-linecap="round" stroke-linejoin="round" d="M18 18.72a9.094 9.094 0 0 0 3.741-.479 3 3 0 0 0-4.682-2.72m.94 3.198.001.031c0 .225-.012.447-.037.666A11.944 11.944 0 0 1 12 21c-2.17 0-4.207-.576-5.963-1.584A6.062 6.062 0 0 1 6 18.719m12 0a5.971 5.971 0 0 0-.941-3.197m0 0A5.995 5.995 0 0 0 12 12.75a5.995 5.995 0 0 0-5.058 2.772m0 0a3 3 0 0 0-4.681 2.72 8.986 8.986 0 0 0 3.74.477m.94-3.197a5.971 5.971 0 0 0-.94 3.197M15 6.75a3 3 0 1 1-6 0 3 3 0 0 1 6 0Zm6 3a2.25 2.25 0 1 1-4.5 0 2.25 2.25 0 0 1 4.5 0Zm-13.5 0a2.25 2.25 0 1 1-4.5 0 2.25 2.25 0 0 1 4.5 0Z"/></svg>
Audiences
</a>
<a href="{{ url_for('admin.outreach') }}" class="{% if admin_page == 'outreach' %}active{% endif %}">
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke-width="1.5" stroke="currentColor"><path stroke-linecap="round" stroke-linejoin="round" d="M6 12 3.269 3.125A59.769 59.769 0 0 1 21.485 12 59.768 59.768 0 0 1 3.27 20.875L5.999 12Zm0 0h7.5"/></svg>
Outreach
</a>
<div class="admin-sidebar__section">Analytics</div>
<a href="{{ url_for('admin.seo') }}" class="{% if admin_page == 'seo' %}active{% endif %}">

View File

@@ -25,6 +25,17 @@
<div class="card" style="padding:1.5rem;">
<form id="compose-form" method="post" action="{{ url_for('admin.email_compose') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="email_type" value="{{ data.get('email_type', 'admin_compose') }}">
{% if data.get('supplier_id') %}
<input type="hidden" name="supplier_id" value="{{ data.get('supplier_id') }}">
{% endif %}
{% if data.get('email_type') == 'outreach' %}
<div class="mb-3" style="background:#EFF6FF;border:1px solid #BFDBFE;border-radius:6px;padding:0.5rem 0.875rem;font-size:0.8125rem;color:#1E40AF">
Outreach email — sending via <strong>hello.padelnomics.io</strong> (separate domain).
{% if data.get('supplier_id') %} Supplier #{{ data.get('supplier_id') }} pipeline will update on send.{% endif %}
</div>
{% endif %}
<div class="mb-4">
<label class="text-xs font-semibold text-slate block mb-1">From</label>
@@ -63,7 +74,7 @@
<label class="flex items-center gap-2 text-sm">
<input
type="checkbox" name="wrap" value="1"
{% if data.get('wrap', True) %}checked{% endif %}
{% if data.get('wrap') is not defined %}checked{% elif data.get('wrap') %}checked{% endif %}
hx-post="{{ url_for('admin.compose_preview') }}"
hx-trigger="change"
hx-target="#preview-pane"

View File

@@ -0,0 +1,86 @@
{% extends "admin/base_admin.html" %}
{% set admin_page = "outreach" %}
{% block title %}Outreach Pipeline - Admin - {{ config.APP_NAME }}{% endblock %}
{% block admin_content %}
<header class="flex justify-between items-center mb-6">
<div>
<h1 class="text-2xl">Outreach</h1>
<p class="text-sm text-slate mt-1">
{{ pipeline.total }} supplier{{ 's' if pipeline.total != 1 else '' }} in pipeline
&middot; Sending domain: <span class="mono text-xs">hello.padelnomics.io</span>
</p>
</div>
<div class="flex gap-2">
<a href="{{ url_for('admin.outreach_import') }}" class="btn-outline btn-sm">Import CSV</a>
<a href="{{ url_for('admin.suppliers') }}" class="btn-outline btn-sm">+ Add from Suppliers</a>
</div>
</header>
<!-- Pipeline cards -->
<div style="display:grid;grid-template-columns:repeat(6,1fr);gap:0.75rem;margin-bottom:1.5rem">
{% set status_colors = {
'prospect': '#E2E8F0',
'contacted': '#DBEAFE',
'replied': '#D1FAE5',
'signed_up': '#A7F3D0',
'declined': '#FEE2E2',
'not_interested': '#F1F5F9',
} %}
{% for s in statuses %}
<a href="{{ url_for('admin.outreach') }}?status={{ s }}"
class="card text-center"
style="padding:1rem;cursor:pointer;background:{{ status_colors.get(s,'#F8FAFC') }};{{ 'outline:2px solid #1D4ED8;' if current_status == s else '' }}">
<div class="text-2xl font-bold">{{ pipeline.counts.get(s, 0) }}</div>
<div class="text-xs text-slate mt-1">{{ s | replace('_', ' ') | title }}</div>
</a>
{% endfor %}
</div>
<!-- Filters -->
<div class="card mb-4" style="padding:1rem 1.25rem;">
<form class="flex flex-wrap gap-3 items-end"
hx-get="{{ url_for('admin.outreach_results') }}"
hx-target="#outreach-results"
hx-trigger="change, input delay:300ms"
hx-indicator="#outreach-loading">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div>
<label class="text-xs font-semibold text-slate block mb-1">Status</label>
<select name="status" class="form-input" style="min-width:140px">
<option value="">All statuses</option>
{% for s in statuses %}
<option value="{{ s }}" {% if s == current_status %}selected{% endif %}>{{ s | replace('_', ' ') | title }}</option>
{% endfor %}
</select>
</div>
<div>
<label class="text-xs font-semibold text-slate block mb-1">Country</label>
<select name="country" class="form-input" style="min-width:100px">
<option value="">All</option>
{% for c in countries %}
<option value="{{ c }}" {% if c == current_country %}selected{% endif %}>{{ c }}</option>
{% endfor %}
</select>
</div>
<div>
<label class="text-xs font-semibold text-slate block mb-1">Search</label>
<input type="text" name="search" value="{{ current_search }}" placeholder="Name or email…"
class="form-input" style="min-width:200px">
</div>
<svg id="outreach-loading" class="htmx-indicator search-spinner" width="14" height="14" viewBox="0 0 24 24" fill="none" aria-hidden="true">
<circle cx="12" cy="12" r="10" stroke="#CBD5E1" stroke-width="3"/>
<path d="M12 2a10 10 0 0 1 10 10" stroke="#0EA5E9" stroke-width="3" stroke-linecap="round"/>
</svg>
</form>
</div>
<!-- Results -->
<div id="outreach-results">
{% include "admin/partials/outreach_results.html" %}
</div>
{% endblock %}

View File

@@ -0,0 +1,41 @@
{% extends "admin/base_admin.html" %}
{% set admin_page = "outreach" %}
{% block title %}Import Suppliers - Outreach - Admin - {{ config.APP_NAME }}{% endblock %}
{% block admin_content %}
<header class="mb-6">
<a href="{{ url_for('admin.outreach') }}" class="text-sm text-slate">&larr; Outreach Pipeline</a>
<h1 class="text-2xl mt-1">Import Suppliers from CSV</h1>
</header>
<div style="max-width:600px">
<div class="card" style="padding:1.5rem">
<p class="text-sm text-slate mb-4">
Upload a CSV file to create supplier prospects in bulk.
Rows with a <code>contact_email</code> that already exists in the database are skipped (dedup).
Max {{ 500 }} rows per upload.
</p>
<div class="mb-4" style="background:#F0F9FF;border:1px solid #BAE6FD;border-radius:6px;padding:0.875rem 1rem;">
<p class="text-xs font-semibold mb-1">Required columns</p>
<code class="text-xs">name, contact_email</code>
<p class="text-xs font-semibold mt-2 mb-1">Optional columns</p>
<code class="text-xs">country_code, category, website</code>
</div>
<form method="post" enctype="multipart/form-data" action="{{ url_for('admin.outreach_import') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div class="mb-4">
<label class="text-xs font-semibold text-slate block mb-1">CSV file</label>
<input type="file" name="csv_file" accept=".csv,text/csv" class="form-input" required>
</div>
<div class="flex gap-2">
<button type="submit" class="btn">Import</button>
<a href="{{ url_for('admin.outreach') }}" class="btn-outline">Cancel</a>
</div>
</form>
</div>
</div>
{% endblock %}

View File

@@ -0,0 +1,30 @@
{% if suppliers %}
<div class="card">
<table class="table">
<thead>
<tr>
<th>Name</th>
<th>Country</th>
<th>Status</th>
<th>Step</th>
<th>Last Contact</th>
<th>Notes</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{% for s in suppliers %}
{% include "admin/partials/outreach_row.html" %}
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<div class="card text-center" style="padding:2rem">
<p class="text-slate">No suppliers match the current filters.</p>
<div class="mt-4 flex gap-2 justify-center">
<a href="{{ url_for('admin.outreach_import') }}" class="btn-outline btn-sm">Import CSV</a>
<a href="{{ url_for('admin.suppliers') }}" class="btn-outline btn-sm">Add from Supplier List</a>
</div>
</div>
{% endif %}

View File

@@ -0,0 +1,66 @@
{% set status_styles = {
'prospect': 'background:#E2E8F0;color:#475569',
'contacted': 'background:#DBEAFE;color:#1E40AF',
'replied': 'background:#D1FAE5;color:#065F46',
'signed_up': 'background:#A7F3D0;color:#065F46',
'declined': 'background:#FEE2E2;color:#991B1B',
'not_interested': 'background:#F1F5F9;color:#64748B',
} %}
<tr id="outreach-row-{{ s.id }}">
<td>
<a href="{{ url_for('admin.supplier_detail', supplier_id=s.id) }}" class="text-sm font-semibold">{{ s.name }}</a>
{% if s.contact_email %}<br><span class="text-xs text-slate">{{ s.contact_email }}</span>{% endif %}
</td>
<td class="text-sm">{{ s.country_code or '-' }}</td>
<td>
{# Inline status dropdown via HTMX #}
<form hx-post="{{ url_for('admin.outreach_status', supplier_id=s.id) }}"
hx-target="#outreach-row-{{ s.id }}"
hx-swap="outerHTML"
class="m-0">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<select name="outreach_status" class="form-input" style="font-size:0.75rem;padding:2px 6px;min-width:120px"
onchange="this.form.requestSubmit()">
{% for st in ['prospect','contacted','replied','signed_up','declined','not_interested'] %}
<option value="{{ st }}" {% if st == s.outreach_status %}selected{% endif %}>{{ st | replace('_',' ') | title }}</option>
{% endfor %}
</select>
</form>
</td>
<td class="text-sm text-center">{{ s.outreach_sequence_step or 0 }}</td>
<td class="text-sm mono">
{% if s.last_contacted_at %}
{{ s.last_contacted_at[:10] }}
{% else %}
{% endif %}
</td>
<td style="max-width:160px">
{# Inline note edit #}
<form hx-post="{{ url_for('admin.outreach_note', supplier_id=s.id) }}"
hx-target="#note-{{ s.id }}"
hx-swap="innerHTML"
class="m-0">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="text" name="note"
value="{{ s.outreach_notes or '' }}"
placeholder="Add note…"
class="form-input"
style="font-size:0.75rem;padding:2px 6px;width:100%"
hx-trigger="change"
hx-post="{{ url_for('admin.outreach_note', supplier_id=s.id) }}"
hx-target="#note-{{ s.id }}"
hx-swap="innerHTML"
hx-include="this">
<span id="note-{{ s.id }}" class="text-xs text-slate">
{% if s.outreach_notes %}{{ s.outreach_notes[:60] }}{% if s.outreach_notes|length > 60 %}…{% endif %}{% endif %}
</span>
</form>
</td>
<td>
{% if s.contact_email %}
{% set compose_url = url_for('admin.email_compose') ~ '?to=' ~ s.contact_email ~ '&from_key=outreach&email_type=outreach&supplier_id=' ~ s.id %}
<a href="{{ compose_url }}" class="btn-outline btn-sm" title="Compose outreach email" style="font-size:0.75rem;padding:2px 8px">✉ Send</a>
{% endif %}
</td>
</tr>

View File

@@ -3,6 +3,7 @@
<table class="table">
<thead>
<tr>
<th style="width:32px"><input type="checkbox" id="select-all" onchange="document.querySelectorAll('.supplier-checkbox').forEach(cb => { cb.checked = this.checked; toggleSelect(Number(cb.dataset.id), this.checked); })"></th>
<th>ID</th>
<th>Name</th>
<th>Country</th>
@@ -16,6 +17,7 @@
<tbody>
{% for s in suppliers %}
<tr data-href="{{ url_for('admin.supplier_detail', supplier_id=s.id) }}">
<td onclick="event.stopPropagation()"><input type="checkbox" class="supplier-checkbox" data-id="{{ s.id }}" onchange="toggleSelect({{ s.id }}, this.checked)"></td>
<td><a href="{{ url_for('admin.supplier_detail', supplier_id=s.id) }}">#{{ s.id }}</a></td>
<td>
<span class="text-sm font-semibold">{{ s.name }}</span>

View File

@@ -119,6 +119,30 @@
</form>
</div>
<!-- Outreach pipeline (shown when in pipeline) -->
{% if supplier.outreach_status %}
<div class="card mb-4" style="padding:1.5rem;border-left:3px solid #1D4ED8">
<h2 class="text-lg mb-3">Outreach Pipeline</h2>
<dl style="display:grid;grid-template-columns:130px 1fr;gap:4px 12px;font-size:0.8125rem;margin-bottom:1rem">
<dt class="text-slate">Status</dt>
<dd><span class="badge">{{ supplier.outreach_status | replace('_', ' ') | title }}</span></dd>
<dt class="text-slate">Step</dt>
<dd>{{ supplier.outreach_sequence_step or 0 }}</dd>
<dt class="text-slate">Last contacted</dt>
<dd class="mono">{{ supplier.last_contacted_at[:10] if supplier.last_contacted_at else '—' }}</dd>
{% if supplier.outreach_notes %}
<dt class="text-slate">Notes</dt>
<dd class="text-xs">{{ supplier.outreach_notes }}</dd>
{% endif %}
</dl>
{% if supplier.contact_email %}
{% set compose_url = url_for('admin.email_compose') ~ '?to=' ~ supplier.contact_email ~ '&from_key=outreach&email_type=outreach&supplier_id=' ~ supplier.id %}
<a href="{{ compose_url }}" class="btn-outline btn-sm">Send Outreach Email</a>
{% endif %}
<a href="{{ url_for('admin.outreach') }}" class="btn-outline btn-sm" style="margin-left:0.5rem">View Pipeline</a>
</div>
{% endif %}
<!-- Active boosts -->
<div class="card" style="padding:1.5rem">
<h2 class="text-lg mb-3">Active Boosts</h2>

View File

@@ -61,8 +61,51 @@
</form>
</div>
<!-- Bulk action bar -->
<form id="bulk-outreach-form" method="post" action="{{ url_for('admin.outreach_add_prospects') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="supplier_ids" id="bulk-supplier-ids" value="">
<div id="bulk-bar" class="card mb-4" style="padding:0.75rem 1.25rem;display:none;align-items:center;gap:1rem;background:#EFF6FF;border:1px solid #BFDBFE;">
<span id="bulk-count" class="text-sm font-semibold text-navy">0 selected</span>
<button type="submit" class="btn btn-sm">Add to Outreach Pipeline</button>
<button type="button" class="btn-outline btn-sm" onclick="clearSelection()">Clear</button>
</div>
</form>
<!-- Results -->
<div id="supplier-results">
{% include "admin/partials/supplier_results.html" %}
</div>
<script>
const selectedIds = new Set();
function toggleSelect(id, checked) {
if (checked) selectedIds.add(id);
else selectedIds.delete(id);
updateBulkBar();
}
function clearSelection() {
selectedIds.clear();
document.querySelectorAll('.supplier-checkbox').forEach(cb => cb.checked = false);
updateBulkBar();
}
function updateBulkBar() {
const bar = document.getElementById('bulk-bar');
const count = document.getElementById('bulk-count');
const ids = document.getElementById('bulk-supplier-ids');
bar.style.display = selectedIds.size > 0 ? 'flex' : 'none';
count.textContent = selectedIds.size + ' selected';
ids.value = Array.from(selectedIds).join(',');
}
// Re-attach after HTMX swap
document.body.addEventListener('htmx:afterSwap', () => {
document.querySelectorAll('.supplier-checkbox').forEach(cb => {
if (selectedIds.has(Number(cb.dataset.id))) cb.checked = true;
});
});
</script>
{% endblock %}

View File

@@ -230,6 +230,8 @@ EMAIL_ADDRESSES = {
"transactional": "Padelnomics <hello@notifications.padelnomics.io>",
"leads": "Padelnomics Leads <leads@notifications.padelnomics.io>",
"nurture": "Padelnomics <coach@notifications.padelnomics.io>",
# Separate sending domain isolates outreach reputation from transactional emails.
"outreach": "Padelnomics <hello@hello.padelnomics.io>",
}

View File

@@ -0,0 +1,17 @@
"""Migration 0024: Add outreach pipeline fields to suppliers table.
Four columns track cold B2B outreach state. NULL outreach_status means the
supplier is not in the pipeline — this keeps existing claimed/paid suppliers
unaffected without requiring a backfill.
"""
def up(conn) -> None:
# Status in the outreach pipeline. NULL = not in pipeline.
conn.execute("ALTER TABLE suppliers ADD COLUMN outreach_status TEXT DEFAULT NULL")
# Freeform admin notes about this contact.
conn.execute("ALTER TABLE suppliers ADD COLUMN outreach_notes TEXT DEFAULT NULL")
# Timestamp of last outreach email sent.
conn.execute("ALTER TABLE suppliers ADD COLUMN last_contacted_at TEXT DEFAULT NULL")
# 0 = not started, 1 = initial contact, 2-5 = follow-ups.
conn.execute("ALTER TABLE suppliers ADD COLUMN outreach_sequence_step INTEGER DEFAULT 0")

626
web/tests/test_outreach.py Normal file
View File

@@ -0,0 +1,626 @@
"""
Tests for the outreach pipeline feature.
Covers:
- Access control (unauthenticated + non-admin → redirect)
- Pipeline dashboard (GET /admin/outreach)
- HTMX filtered results (GET /admin/outreach/results)
- Status update (POST /admin/outreach/<id>/status)
- Note update (POST /admin/outreach/<id>/note)
- Add-prospects bulk action (POST /admin/outreach/add-prospects)
- CSV import (GET + POST /admin/outreach/import)
- Compose pre-fill (GET /admin/emails/compose with outreach params)
- Compose send pipeline update (POST /admin/emails/compose with outreach type)
"""
import io
from datetime import UTC, datetime
from unittest.mock import AsyncMock, patch
import pytest
from quart.datastructures import FileStorage
from padelnomics import core
from padelnomics.admin.routes import (
OUTREACH_STATUSES,
get_outreach_pipeline,
get_outreach_suppliers,
)
# ── Fixtures ──────────────────────────────────────────────────────────────────
_TEST_CSRF = "test-csrf-token-fixed"
@pytest.fixture(autouse=True)
def _bypass_csrf():
"""Disable CSRF validation for all outreach tests.
The core module's validate_csrf_token checks session["csrf_token"] against
the submitted token. In tests we skip the session-cookie round-trip by
patching it to always return True.
"""
with patch("padelnomics.core.validate_csrf_token", return_value=True):
yield
@pytest.fixture
async def admin_client(app, db):
"""Test client with an admin user pre-loaded in session."""
now = datetime.now(UTC).isoformat()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("admin@example.com", "Admin User", now),
) as cursor:
user_id = cursor.lastrowid
await db.execute(
"INSERT INTO user_roles (user_id, role) VALUES (?, 'admin')", (user_id,)
)
await db.commit()
async with app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = user_id
yield c
async def _insert_supplier(
db,
name: str = "Test Supplier",
contact_email: str = "supplier@example.com",
country_code: str = "DE",
outreach_status: str = None,
outreach_notes: str = None,
outreach_sequence_step: int = 0,
last_contacted_at: str = None,
) -> int:
"""Insert a supplier row and return its id."""
now = datetime.now(UTC).isoformat()
slug = name.lower().replace(" ", "-")
async with db.execute(
"""INSERT INTO suppliers
(name, slug, country_code, region, category, tier,
contact_email, outreach_status, outreach_notes, outreach_sequence_step,
last_contacted_at, created_at)
VALUES (?, ?, ?, 'Europe', 'construction', 'free',
?, ?, ?, ?, ?, ?)""",
(name, slug, country_code, contact_email,
outreach_status, outreach_notes, outreach_sequence_step,
last_contacted_at, now),
) as cursor:
supplier_id = cursor.lastrowid
await db.commit()
return supplier_id
# ── Constants ─────────────────────────────────────────────────────────────────
class TestConstants:
def test_outreach_statuses_complete(self):
expected = {"prospect", "contacted", "replied", "signed_up", "declined", "not_interested"}
assert set(OUTREACH_STATUSES) == expected
def test_outreach_in_email_addresses(self):
assert "outreach" in core.EMAIL_ADDRESSES
assert "hello.padelnomics.io" in core.EMAIL_ADDRESSES["outreach"]
def test_outreach_in_email_types(self):
from padelnomics.admin.routes import EMAIL_TYPES
assert "outreach" in EMAIL_TYPES
# ── Access Control ────────────────────────────────────────────────────────────
class TestAccessControl:
async def test_unauthenticated_redirects(self, client):
resp = await client.get("/admin/outreach")
assert resp.status_code == 302
async def test_non_admin_redirects(self, auth_client):
resp = await auth_client.get("/admin/outreach")
assert resp.status_code == 302
async def test_admin_gets_200(self, admin_client):
resp = await admin_client.get("/admin/outreach")
assert resp.status_code == 200
async def test_import_unauthenticated_redirects(self, client):
resp = await client.get("/admin/outreach/import")
assert resp.status_code == 302
async def test_import_admin_gets_200(self, admin_client):
resp = await admin_client.get("/admin/outreach/import")
assert resp.status_code == 200
# ── Query Functions ───────────────────────────────────────────────────────────
class TestQueryFunctions:
async def test_pipeline_empty_when_no_outreach_suppliers(self, db):
pipeline = await get_outreach_pipeline()
assert pipeline["total"] == 0
assert pipeline["counts"] == {}
async def test_pipeline_counts_by_status(self, db):
await _insert_supplier(db, name="A", outreach_status="prospect")
await _insert_supplier(db, name="B", outreach_status="prospect")
await _insert_supplier(db, name="C", outreach_status="contacted")
# supplier without outreach_status should NOT appear
await _insert_supplier(db, name="D", outreach_status=None)
pipeline = await get_outreach_pipeline()
assert pipeline["total"] == 3
assert pipeline["counts"]["prospect"] == 2
assert pipeline["counts"]["contacted"] == 1
assert "D" not in str(pipeline)
async def test_get_outreach_suppliers_excludes_null_status(self, db):
await _insert_supplier(db, name="In Pipeline", outreach_status="prospect")
await _insert_supplier(db, name="Not In Pipeline", outreach_status=None)
results = await get_outreach_suppliers()
names = [s["name"] for s in results]
assert "In Pipeline" in names
assert "Not In Pipeline" not in names
async def test_get_outreach_suppliers_filter_by_status(self, db):
await _insert_supplier(db, name="Prospect Co", outreach_status="prospect")
await _insert_supplier(db, name="Contacted Co", outreach_status="contacted")
results = await get_outreach_suppliers(status="prospect")
assert len(results) == 1
assert results[0]["name"] == "Prospect Co"
async def test_get_outreach_suppliers_filter_by_country(self, db):
await _insert_supplier(db, name="DE Corp", country_code="DE", outreach_status="prospect")
await _insert_supplier(db, name="ES Corp", country_code="ES", outreach_status="prospect")
results = await get_outreach_suppliers(country="DE")
assert len(results) == 1
assert results[0]["name"] == "DE Corp"
async def test_get_outreach_suppliers_search(self, db):
await _insert_supplier(db, name="PadelBuild GmbH", contact_email="info@padelbuild.de", outreach_status="prospect")
await _insert_supplier(db, name="CourtTech SL", contact_email="hi@courttech.es", outreach_status="prospect")
results = await get_outreach_suppliers(search="padelb")
assert len(results) == 1
assert results[0]["name"] == "PadelBuild GmbH"
# ── Dashboard Route ───────────────────────────────────────────────────────────
class TestDashboard:
async def test_shows_pipeline_cards(self, admin_client, db):
await _insert_supplier(db, name="P1", outreach_status="prospect")
resp = await admin_client.get("/admin/outreach")
html = (await resp.data).decode()
assert resp.status_code == 200
assert "Prospect" in html
async def test_shows_supplier_in_results(self, admin_client, db):
await _insert_supplier(db, name="Outreach Supplier", outreach_status="prospect")
resp = await admin_client.get("/admin/outreach")
html = (await resp.data).decode()
assert "Outreach Supplier" in html
async def test_does_not_show_non_pipeline_supplier(self, admin_client, db):
await _insert_supplier(db, name="Regular Supplier", outreach_status=None)
resp = await admin_client.get("/admin/outreach")
html = (await resp.data).decode()
assert "Regular Supplier" not in html
async def test_filter_by_status_via_querystring(self, admin_client, db):
await _insert_supplier(db, name="ProspectA", outreach_status="prospect")
await _insert_supplier(db, name="ContactedB", outreach_status="contacted")
resp = await admin_client.get("/admin/outreach?status=prospect")
html = (await resp.data).decode()
assert "ProspectA" in html
assert "ContactedB" not in html
# ── HTMX Results Partial ──────────────────────────────────────────────────────
class TestOutreachResults:
async def test_returns_partial_html(self, admin_client, db):
await _insert_supplier(db, name="Partial Test", outreach_status="contacted")
resp = await admin_client.get("/admin/outreach/results?status=contacted")
assert resp.status_code == 200
html = (await resp.data).decode()
assert "Partial Test" in html
async def test_empty_state_when_no_match(self, admin_client, db):
resp = await admin_client.get("/admin/outreach/results?status=replied")
assert resp.status_code == 200
html = (await resp.data).decode()
assert "No suppliers" in html
# ── Status Update ─────────────────────────────────────────────────────────────
class TestStatusUpdate:
async def test_updates_status(self, admin_client, db):
supplier_id = await _insert_supplier(db, outreach_status="prospect")
resp = await admin_client.post(
f"/admin/outreach/{supplier_id}/status",
form={"outreach_status": "contacted", "csrf_token": _TEST_CSRF},
)
assert resp.status_code == 200
updated = await core.fetch_one(
"SELECT outreach_status FROM suppliers WHERE id = ?", (supplier_id,)
)
assert updated["outreach_status"] == "contacted"
async def test_returns_404_for_non_pipeline_supplier(self, admin_client, db):
supplier_id = await _insert_supplier(db, outreach_status=None)
resp = await admin_client.post(
f"/admin/outreach/{supplier_id}/status",
form={"outreach_status": "contacted", "csrf_token": _TEST_CSRF},
)
assert resp.status_code == 404
async def test_returns_row_html(self, admin_client, db):
supplier_id = await _insert_supplier(db, outreach_status="prospect")
resp = await admin_client.post(
f"/admin/outreach/{supplier_id}/status",
form={"outreach_status": "replied", "csrf_token": _TEST_CSRF},
)
html = (await resp.data).decode()
# The updated row should contain the supplier ID
assert str(supplier_id) in html
# ── Note Update ───────────────────────────────────────────────────────────────
class TestNoteUpdate:
async def test_saves_note(self, admin_client, db):
supplier_id = await _insert_supplier(db, outreach_status="prospect")
resp = await admin_client.post(
f"/admin/outreach/{supplier_id}/note",
form={"note": "Spoke to CEO interested", "csrf_token": _TEST_CSRF},
)
assert resp.status_code == 200
row = await core.fetch_one(
"SELECT outreach_notes FROM suppliers WHERE id = ?", (supplier_id,)
)
assert row["outreach_notes"] == "Spoke to CEO interested"
async def test_clears_note_when_empty(self, admin_client, db):
supplier_id = await _insert_supplier(
db, outreach_status="prospect", outreach_notes="Old note"
)
await admin_client.post(
f"/admin/outreach/{supplier_id}/note",
form={"note": "", "csrf_token": _TEST_CSRF},
)
row = await core.fetch_one(
"SELECT outreach_notes FROM suppliers WHERE id = ?", (supplier_id,)
)
assert row["outreach_notes"] is None
async def test_returns_404_for_non_pipeline_supplier(self, admin_client, db):
supplier_id = await _insert_supplier(db, outreach_status=None)
resp = await admin_client.post(
f"/admin/outreach/{supplier_id}/note",
form={"note": "test", "csrf_token": _TEST_CSRF},
)
assert resp.status_code == 404
async def test_returns_truncated_note_in_response(self, admin_client, db):
supplier_id = await _insert_supplier(db, outreach_status="prospect")
long_note = "A" * 100
resp = await admin_client.post(
f"/admin/outreach/{supplier_id}/note",
form={"note": long_note, "csrf_token": _TEST_CSRF},
)
body = (await resp.data).decode()
assert "\u2026" in body # truncation marker (…)
assert len(body) < 110 # truncated to 80 chars + marker
# ── Add Prospects ─────────────────────────────────────────────────────────────
class TestAddProspects:
async def test_sets_prospect_status(self, admin_client, db):
s1 = await _insert_supplier(db, name="Alpha", outreach_status=None)
s2 = await _insert_supplier(db, name="Beta", outreach_status=None)
resp = await admin_client.post(
"/admin/outreach/add-prospects",
form={"supplier_ids": f"{s1},{s2}", "csrf_token": _TEST_CSRF},
)
assert resp.status_code == 302 # redirect after success
rows = await core.fetch_all(
"SELECT outreach_status FROM suppliers WHERE id IN (?, ?)", (s1, s2)
)
assert all(r["outreach_status"] == "prospect" for r in rows)
async def test_does_not_override_existing_status(self, admin_client, db):
supplier_id = await _insert_supplier(
db, name="Already Contacted", outreach_status="contacted"
)
await admin_client.post(
"/admin/outreach/add-prospects",
form={"supplier_ids": str(supplier_id), "csrf_token": _TEST_CSRF},
)
row = await core.fetch_one(
"SELECT outreach_status FROM suppliers WHERE id = ?", (supplier_id,)
)
# Should NOT have been changed to prospect
assert row["outreach_status"] == "contacted"
async def test_no_ids_redirects_with_error(self, admin_client):
resp = await admin_client.post(
"/admin/outreach/add-prospects",
form={"supplier_ids": "", "csrf_token": _TEST_CSRF},
)
assert resp.status_code == 302
# ── CSV Import ────────────────────────────────────────────────────────────────
def _make_csv_file(csv_bytes: bytes, filename: str = "test.csv") -> FileStorage:
"""Wrap CSV bytes in a FileStorage for Quart's files= parameter."""
return FileStorage(stream=io.BytesIO(csv_bytes), filename=filename, name="csv_file")
class TestCSVImport:
async def test_import_creates_prospects(self, admin_client, db):
csv_bytes = (
b"name,contact_email,country_code,category\n"
b"BuildCo,build@example.com,DE,construction\n"
b"CourtTech,court@example.com,ES,equipment\n"
)
resp = await admin_client.post(
"/admin/outreach/import",
form={"csrf_token": _TEST_CSRF},
files={"csv_file": _make_csv_file(csv_bytes, "prospects.csv")},
)
assert resp.status_code == 302
rows = await core.fetch_all(
"SELECT name, outreach_status FROM suppliers WHERE outreach_status = 'prospect'"
)
names = {r["name"] for r in rows}
assert "BuildCo" in names
assert "CourtTech" in names
async def test_import_deduplicates_by_email(self, admin_client, db):
await _insert_supplier(db, name="Existing", contact_email="dupe@example.com", outreach_status=None)
csv_bytes = (
b"name,contact_email\n"
b"New Name,dupe@example.com\n"
)
await admin_client.post(
"/admin/outreach/import",
form={"csrf_token": _TEST_CSRF},
files={"csv_file": _make_csv_file(csv_bytes)},
)
rows = await core.fetch_all(
"SELECT * FROM suppliers WHERE contact_email = 'dupe@example.com'"
)
assert len(rows) == 1
async def test_import_skips_missing_required_fields(self, admin_client, db):
csv_bytes = (
b"name,contact_email\n"
b",missing@example.com\n"
b"No Email,\n"
)
await admin_client.post(
"/admin/outreach/import",
form={"csrf_token": _TEST_CSRF},
files={"csv_file": _make_csv_file(csv_bytes)},
)
rows = await core.fetch_all(
"SELECT * FROM suppliers WHERE outreach_status = 'prospect'"
)
assert len(rows) == 0
async def test_import_missing_required_columns_returns_error(self, admin_client):
csv_bytes = b"company_name,email_address\nFoo,foo@bar.com\n"
resp = await admin_client.post(
"/admin/outreach/import",
form={"csrf_token": _TEST_CSRF},
files={"csv_file": _make_csv_file(csv_bytes, "bad.csv")},
)
assert resp.status_code == 200 # renders form again with error
html = (await resp.data).decode()
assert "missing" in html.lower() or "required" in html.lower() or "contact_email" in html
async def test_import_no_file_returns_error(self, admin_client):
resp = await admin_client.post(
"/admin/outreach/import",
form={"csrf_token": _TEST_CSRF},
)
assert resp.status_code == 200
html = (await resp.data).decode()
assert "No file" in html or "file" in html.lower()
async def test_get_import_page(self, admin_client):
resp = await admin_client.get("/admin/outreach/import")
assert resp.status_code == 200
html = (await resp.data).decode()
assert "csv" in html.lower() or "CSV" in html
# ── Compose Pre-fill ──────────────────────────────────────────────────────────
class TestComposePrefill:
async def test_compose_prefills_to_field(self, admin_client):
resp = await admin_client.get(
"/admin/emails/compose?to=supplier@example.com&from_key=outreach&email_type=outreach&supplier_id=42"
)
assert resp.status_code == 200
html = (await resp.data).decode()
assert "supplier@example.com" in html
async def test_compose_outreach_shows_banner(self, admin_client):
resp = await admin_client.get(
"/admin/emails/compose?email_type=outreach&supplier_id=42"
)
html = (await resp.data).decode()
assert "hello.padelnomics.io" in html
async def test_compose_outreach_wrap_unchecked_by_default(self, admin_client):
"""Outreach emails default to plain text (wrap=0)."""
resp = await admin_client.get(
"/admin/emails/compose?email_type=outreach"
)
html = (await resp.data).decode()
# The wrap checkbox should NOT be checked
assert 'name="wrap"' in html
# checked should not appear on the wrap input when email_type=outreach
import re
wrap_input = re.search(r'<input[^>]+name="wrap"[^>]*>', html)
assert wrap_input, "wrap input not found"
assert "checked" not in wrap_input.group(0)
async def test_compose_preselects_outreach_from_addr(self, admin_client):
"""from_key=outreach should pre-select the outreach from address."""
resp = await admin_client.get(
"/admin/emails/compose?from_key=outreach"
)
html = (await resp.data).decode()
assert "hello.padelnomics.io" in html
# ── Compose → Pipeline Update ─────────────────────────────────────────────────
class TestComposePipelineUpdate:
async def test_sends_outreach_email_and_updates_supplier(self, admin_client, db):
"""Sending an outreach email should advance prospect → contacted."""
supplier_id = await _insert_supplier(
db, name="Pipeline Test", contact_email="pt@example.com",
outreach_status="prospect", outreach_sequence_step=0,
)
with patch("padelnomics.admin.routes.send_email", new_callable=AsyncMock) as mock_send:
mock_send.return_value = "test-resend-id"
resp = await admin_client.post(
"/admin/emails/compose",
form={
"to": "pt@example.com",
"subject": "Hello from Padelnomics",
"body": "Would you like to join our platform?",
"from_addr": core.EMAIL_ADDRESSES["outreach"],
"email_type": "outreach",
"supplier_id": str(supplier_id),
"csrf_token": _TEST_CSRF,
},
)
assert resp.status_code == 302 # redirect on success
updated = await core.fetch_one(
"SELECT outreach_status, outreach_sequence_step, last_contacted_at FROM suppliers WHERE id = ?",
(supplier_id,),
)
assert updated["outreach_status"] == "contacted"
assert updated["outreach_sequence_step"] == 1
assert updated["last_contacted_at"] is not None
async def test_sends_outreach_increments_step_when_already_contacted(self, admin_client, db):
"""A follow-up email increments step without changing status."""
supplier_id = await _insert_supplier(
db, name="Follow-up Test", contact_email="fu@example.com",
outreach_status="contacted", outreach_sequence_step=1,
)
with patch("padelnomics.admin.routes.send_email", new_callable=AsyncMock) as mock_send:
mock_send.return_value = "test-resend-id-2"
await admin_client.post(
"/admin/emails/compose",
form={
"to": "fu@example.com",
"subject": "Following up",
"body": "Just checking in.",
"from_addr": core.EMAIL_ADDRESSES["outreach"],
"email_type": "outreach",
"supplier_id": str(supplier_id),
"csrf_token": _TEST_CSRF,
},
)
updated = await core.fetch_one(
"SELECT outreach_status, outreach_sequence_step FROM suppliers WHERE id = ?",
(supplier_id,),
)
assert updated["outreach_status"] == "contacted" # unchanged
assert updated["outreach_sequence_step"] == 2 # incremented
async def test_non_outreach_compose_does_not_update_supplier(self, admin_client, db):
"""admin_compose emails should NOT touch outreach pipeline state."""
supplier_id = await _insert_supplier(
db, name="No Update", contact_email="noupdate@example.com",
outreach_status="prospect", outreach_sequence_step=0,
)
with patch("padelnomics.admin.routes.send_email", new_callable=AsyncMock) as mock_send:
mock_send.return_value = "resend-id-adhoc"
await admin_client.post(
"/admin/emails/compose",
form={
"to": "noupdate@example.com",
"subject": "General message",
"body": "Hi there.",
"from_addr": core.EMAIL_ADDRESSES["transactional"],
"email_type": "admin_compose",
"csrf_token": _TEST_CSRF,
},
)
row = await core.fetch_one(
"SELECT outreach_status, outreach_sequence_step FROM suppliers WHERE id = ?",
(supplier_id,),
)
assert row["outreach_status"] == "prospect" # unchanged
assert row["outreach_sequence_step"] == 0 # unchanged
async def test_failed_send_does_not_update_supplier(self, admin_client, db):
"""Failed send (returns None) must NOT update the pipeline."""
supplier_id = await _insert_supplier(
db, name="Fail Send", contact_email="fail@example.com",
outreach_status="prospect", outreach_sequence_step=0,
)
with patch("padelnomics.admin.routes.send_email", new_callable=AsyncMock) as mock_send:
mock_send.return_value = None # simulate send failure
await admin_client.post(
"/admin/emails/compose",
form={
"to": "fail@example.com",
"subject": "Test",
"body": "Hello.",
"from_addr": core.EMAIL_ADDRESSES["outreach"],
"email_type": "outreach",
"supplier_id": str(supplier_id),
"csrf_token": _TEST_CSRF,
},
)
row = await core.fetch_one(
"SELECT outreach_status, outreach_sequence_step FROM suppliers WHERE id = ?",
(supplier_id,),
)
assert row["outreach_status"] == "prospect" # unchanged
assert row["outreach_sequence_step"] == 0 # unchanged
# ── CSV writer helper (avoids importing DictWriter at module level) ────────────
import csv as _csv_module
def _csv_writer(buf, fieldnames):
return _csv_module.DictWriter(buf, fieldnames=fieldnames)