- Move logger= after imports in planner/routes.py and setup_paddle.py - Add # noqa: E402 to intentional post-setup imports (app.py, core.py, migrate.py, test_supervisor.py) - Fix unused cursor variables (test_noindex.py) → _ - Move stray csv import to top of test_outreach.py - Auto-sort import blocks (test_email_templates, test_noindex, test_outreach) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
839 lines
36 KiB
Python
839 lines
36 KiB
Python
"""
|
|
Tests for the outreach pipeline feature.
|
|
|
|
Covers:
|
|
- Access control (unauthenticated + non-admin → redirect)
|
|
- Pipeline dashboard (GET /admin/outreach)
|
|
- HTMX filtered results (GET /admin/outreach/results)
|
|
- Status update (POST /admin/outreach/<id>/status)
|
|
- Note update (POST /admin/outreach/<id>/note)
|
|
- Add-prospects bulk action (POST /admin/outreach/add-prospects)
|
|
- CSV import (GET + POST /admin/outreach/import)
|
|
- Compose pre-fill (GET /admin/emails/compose with outreach params)
|
|
- Compose send pipeline update (POST /admin/emails/compose with outreach type)
|
|
"""
|
|
import csv as _csv_module
|
|
import io
|
|
from datetime import UTC, datetime
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
from padelnomics.admin.routes import (
|
|
OUTREACH_STATUSES,
|
|
get_follow_up_due_count,
|
|
get_outreach_pipeline,
|
|
get_outreach_suppliers,
|
|
)
|
|
from quart.datastructures import FileStorage
|
|
|
|
from padelnomics import core
|
|
|
|
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
_TEST_CSRF = "test-csrf-token-fixed"
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _bypass_csrf():
|
|
"""Disable CSRF validation for all outreach tests.
|
|
|
|
The core module's validate_csrf_token checks session["csrf_token"] against
|
|
the submitted token. In tests we skip the session-cookie round-trip by
|
|
patching it to always return True.
|
|
"""
|
|
with patch("padelnomics.core.validate_csrf_token", return_value=True):
|
|
yield
|
|
|
|
|
|
@pytest.fixture
|
|
async def admin_client(app, db):
|
|
"""Test client with an admin user pre-loaded in session."""
|
|
now = datetime.now(UTC).isoformat()
|
|
async with db.execute(
|
|
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
|
|
("admin@example.com", "Admin User", now),
|
|
) as cursor:
|
|
user_id = cursor.lastrowid
|
|
await db.execute(
|
|
"INSERT INTO user_roles (user_id, role) VALUES (?, 'admin')", (user_id,)
|
|
)
|
|
await db.commit()
|
|
|
|
async with app.test_client() as c:
|
|
async with c.session_transaction() as sess:
|
|
sess["user_id"] = user_id
|
|
yield c
|
|
|
|
|
|
async def _insert_supplier(
|
|
db,
|
|
name: str = "Test Supplier",
|
|
contact_email: str = "supplier@example.com",
|
|
country_code: str = "DE",
|
|
outreach_status: str = None,
|
|
outreach_notes: str = None,
|
|
outreach_sequence_step: int = 0,
|
|
last_contacted_at: str = None,
|
|
follow_up_at: str = None,
|
|
) -> int:
|
|
"""Insert a supplier row and return its id."""
|
|
now = datetime.now(UTC).isoformat()
|
|
slug = name.lower().replace(" ", "-")
|
|
async with db.execute(
|
|
"""INSERT INTO suppliers
|
|
(name, slug, country_code, region, category, tier,
|
|
contact_email, outreach_status, outreach_notes, outreach_sequence_step,
|
|
last_contacted_at, follow_up_at, created_at)
|
|
VALUES (?, ?, ?, 'Europe', 'construction', 'free',
|
|
?, ?, ?, ?, ?, ?, ?)""",
|
|
(name, slug, country_code, contact_email,
|
|
outreach_status, outreach_notes, outreach_sequence_step,
|
|
last_contacted_at, follow_up_at, now),
|
|
) as cursor:
|
|
supplier_id = cursor.lastrowid
|
|
await db.commit()
|
|
return supplier_id
|
|
|
|
|
|
# ── Constants ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestConstants:
|
|
def test_outreach_statuses_complete(self):
|
|
expected = {"prospect", "contacted", "replied", "signed_up", "declined", "not_interested"}
|
|
assert set(OUTREACH_STATUSES) == expected
|
|
|
|
def test_outreach_in_email_addresses(self):
|
|
assert "outreach" in core.EMAIL_ADDRESSES
|
|
assert "hello.padelnomics.io" in core.EMAIL_ADDRESSES["outreach"]
|
|
|
|
def test_outreach_in_email_types(self):
|
|
from padelnomics.admin.routes import EMAIL_TYPES
|
|
assert "outreach" in EMAIL_TYPES
|
|
|
|
|
|
# ── Access Control ────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestAccessControl:
|
|
async def test_unauthenticated_redirects(self, client):
|
|
resp = await client.get("/admin/outreach")
|
|
assert resp.status_code == 302
|
|
|
|
async def test_non_admin_redirects(self, auth_client):
|
|
resp = await auth_client.get("/admin/outreach")
|
|
assert resp.status_code == 302
|
|
|
|
async def test_admin_gets_200(self, admin_client):
|
|
resp = await admin_client.get("/admin/outreach")
|
|
assert resp.status_code == 200
|
|
|
|
async def test_import_unauthenticated_redirects(self, client):
|
|
resp = await client.get("/admin/outreach/import")
|
|
assert resp.status_code == 302
|
|
|
|
async def test_import_admin_gets_200(self, admin_client):
|
|
resp = await admin_client.get("/admin/outreach/import")
|
|
assert resp.status_code == 200
|
|
|
|
|
|
# ── Query Functions ───────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestQueryFunctions:
|
|
async def test_pipeline_empty_when_no_outreach_suppliers(self, db):
|
|
pipeline = await get_outreach_pipeline()
|
|
assert pipeline["total"] == 0
|
|
assert pipeline["counts"] == {}
|
|
|
|
async def test_pipeline_counts_by_status(self, db):
|
|
await _insert_supplier(db, name="A", outreach_status="prospect")
|
|
await _insert_supplier(db, name="B", outreach_status="prospect")
|
|
await _insert_supplier(db, name="C", outreach_status="contacted")
|
|
# supplier without outreach_status should NOT appear
|
|
await _insert_supplier(db, name="D", outreach_status=None)
|
|
|
|
pipeline = await get_outreach_pipeline()
|
|
assert pipeline["total"] == 3
|
|
assert pipeline["counts"]["prospect"] == 2
|
|
assert pipeline["counts"]["contacted"] == 1
|
|
assert "D" not in str(pipeline)
|
|
|
|
async def test_get_outreach_suppliers_excludes_null_status(self, db):
|
|
await _insert_supplier(db, name="In Pipeline", outreach_status="prospect")
|
|
await _insert_supplier(db, name="Not In Pipeline", outreach_status=None)
|
|
|
|
results = await get_outreach_suppliers()
|
|
names = [s["name"] for s in results]
|
|
assert "In Pipeline" in names
|
|
assert "Not In Pipeline" not in names
|
|
|
|
async def test_get_outreach_suppliers_filter_by_status(self, db):
|
|
await _insert_supplier(db, name="Prospect Co", outreach_status="prospect")
|
|
await _insert_supplier(db, name="Contacted Co", outreach_status="contacted")
|
|
|
|
results = await get_outreach_suppliers(status="prospect")
|
|
assert len(results) == 1
|
|
assert results[0]["name"] == "Prospect Co"
|
|
|
|
async def test_get_outreach_suppliers_filter_by_country(self, db):
|
|
await _insert_supplier(db, name="DE Corp", country_code="DE", outreach_status="prospect")
|
|
await _insert_supplier(db, name="ES Corp", country_code="ES", outreach_status="prospect")
|
|
|
|
results = await get_outreach_suppliers(country="DE")
|
|
assert len(results) == 1
|
|
assert results[0]["name"] == "DE Corp"
|
|
|
|
async def test_get_outreach_suppliers_search(self, db):
|
|
await _insert_supplier(db, name="PadelBuild GmbH", contact_email="info@padelbuild.de", outreach_status="prospect")
|
|
await _insert_supplier(db, name="CourtTech SL", contact_email="hi@courttech.es", outreach_status="prospect")
|
|
|
|
results = await get_outreach_suppliers(search="padelb")
|
|
assert len(results) == 1
|
|
assert results[0]["name"] == "PadelBuild GmbH"
|
|
|
|
|
|
# ── Dashboard Route ───────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestDashboard:
|
|
async def test_shows_pipeline_cards(self, admin_client, db):
|
|
await _insert_supplier(db, name="P1", outreach_status="prospect")
|
|
resp = await admin_client.get("/admin/outreach")
|
|
html = (await resp.data).decode()
|
|
assert resp.status_code == 200
|
|
assert "Prospect" in html
|
|
|
|
async def test_shows_supplier_in_results(self, admin_client, db):
|
|
await _insert_supplier(db, name="Outreach Supplier", outreach_status="prospect")
|
|
resp = await admin_client.get("/admin/outreach")
|
|
html = (await resp.data).decode()
|
|
assert "Outreach Supplier" in html
|
|
|
|
async def test_does_not_show_non_pipeline_supplier(self, admin_client, db):
|
|
await _insert_supplier(db, name="Regular Supplier", outreach_status=None)
|
|
resp = await admin_client.get("/admin/outreach")
|
|
html = (await resp.data).decode()
|
|
assert "Regular Supplier" not in html
|
|
|
|
async def test_filter_by_status_via_querystring(self, admin_client, db):
|
|
await _insert_supplier(db, name="ProspectA", outreach_status="prospect")
|
|
await _insert_supplier(db, name="ContactedB", outreach_status="contacted")
|
|
resp = await admin_client.get("/admin/outreach?status=prospect")
|
|
html = (await resp.data).decode()
|
|
assert "ProspectA" in html
|
|
assert "ContactedB" not in html
|
|
|
|
|
|
# ── HTMX Results Partial ──────────────────────────────────────────────────────
|
|
|
|
|
|
class TestOutreachResults:
|
|
async def test_returns_partial_html(self, admin_client, db):
|
|
await _insert_supplier(db, name="Partial Test", outreach_status="contacted")
|
|
resp = await admin_client.get("/admin/outreach/results?status=contacted")
|
|
assert resp.status_code == 200
|
|
html = (await resp.data).decode()
|
|
assert "Partial Test" in html
|
|
|
|
async def test_empty_state_when_no_match(self, admin_client, db):
|
|
resp = await admin_client.get("/admin/outreach/results?status=replied")
|
|
assert resp.status_code == 200
|
|
html = (await resp.data).decode()
|
|
assert "No suppliers" in html
|
|
|
|
|
|
# ── Status Update ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestStatusUpdate:
|
|
async def test_updates_status(self, admin_client, db):
|
|
supplier_id = await _insert_supplier(db, outreach_status="prospect")
|
|
resp = await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/status",
|
|
form={"outreach_status": "contacted", "csrf_token": _TEST_CSRF},
|
|
)
|
|
assert resp.status_code == 200
|
|
|
|
updated = await core.fetch_one(
|
|
"SELECT outreach_status FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
assert updated["outreach_status"] == "contacted"
|
|
|
|
async def test_returns_404_for_non_pipeline_supplier(self, admin_client, db):
|
|
supplier_id = await _insert_supplier(db, outreach_status=None)
|
|
resp = await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/status",
|
|
form={"outreach_status": "contacted", "csrf_token": _TEST_CSRF},
|
|
)
|
|
assert resp.status_code == 404
|
|
|
|
async def test_returns_row_html(self, admin_client, db):
|
|
supplier_id = await _insert_supplier(db, outreach_status="prospect")
|
|
resp = await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/status",
|
|
form={"outreach_status": "replied", "csrf_token": _TEST_CSRF},
|
|
)
|
|
html = (await resp.data).decode()
|
|
# The updated row should contain the supplier ID
|
|
assert str(supplier_id) in html
|
|
|
|
|
|
# ── Note Update ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestNoteUpdate:
|
|
async def test_saves_note(self, admin_client, db):
|
|
supplier_id = await _insert_supplier(db, outreach_status="prospect")
|
|
resp = await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/note",
|
|
form={"note": "Spoke to CEO interested", "csrf_token": _TEST_CSRF},
|
|
)
|
|
assert resp.status_code == 200
|
|
|
|
row = await core.fetch_one(
|
|
"SELECT outreach_notes FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
assert row["outreach_notes"] == "Spoke to CEO interested"
|
|
|
|
async def test_clears_note_when_empty(self, admin_client, db):
|
|
supplier_id = await _insert_supplier(
|
|
db, outreach_status="prospect", outreach_notes="Old note"
|
|
)
|
|
await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/note",
|
|
form={"note": "", "csrf_token": _TEST_CSRF},
|
|
)
|
|
row = await core.fetch_one(
|
|
"SELECT outreach_notes FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
assert row["outreach_notes"] is None
|
|
|
|
async def test_returns_404_for_non_pipeline_supplier(self, admin_client, db):
|
|
supplier_id = await _insert_supplier(db, outreach_status=None)
|
|
resp = await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/note",
|
|
form={"note": "test", "csrf_token": _TEST_CSRF},
|
|
)
|
|
assert resp.status_code == 404
|
|
|
|
async def test_returns_truncated_note_in_response(self, admin_client, db):
|
|
supplier_id = await _insert_supplier(db, outreach_status="prospect")
|
|
long_note = "A" * 100
|
|
resp = await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/note",
|
|
form={"note": long_note, "csrf_token": _TEST_CSRF},
|
|
)
|
|
body = (await resp.data).decode()
|
|
assert "\u2026" in body # truncation marker (…)
|
|
assert len(body) < 110 # truncated to 80 chars + marker
|
|
|
|
|
|
# ── Add Prospects ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestAddProspects:
|
|
async def test_sets_prospect_status(self, admin_client, db):
|
|
s1 = await _insert_supplier(db, name="Alpha", outreach_status=None)
|
|
s2 = await _insert_supplier(db, name="Beta", outreach_status=None)
|
|
resp = await admin_client.post(
|
|
"/admin/outreach/add-prospects",
|
|
form={"supplier_ids": f"{s1},{s2}", "csrf_token": _TEST_CSRF},
|
|
)
|
|
assert resp.status_code == 302 # redirect after success
|
|
|
|
rows = await core.fetch_all(
|
|
"SELECT outreach_status FROM suppliers WHERE id IN (?, ?)", (s1, s2)
|
|
)
|
|
assert all(r["outreach_status"] == "prospect" for r in rows)
|
|
|
|
async def test_does_not_override_existing_status(self, admin_client, db):
|
|
supplier_id = await _insert_supplier(
|
|
db, name="Already Contacted", outreach_status="contacted"
|
|
)
|
|
await admin_client.post(
|
|
"/admin/outreach/add-prospects",
|
|
form={"supplier_ids": str(supplier_id), "csrf_token": _TEST_CSRF},
|
|
)
|
|
row = await core.fetch_one(
|
|
"SELECT outreach_status FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
# Should NOT have been changed to prospect
|
|
assert row["outreach_status"] == "contacted"
|
|
|
|
async def test_no_ids_redirects_with_error(self, admin_client):
|
|
resp = await admin_client.post(
|
|
"/admin/outreach/add-prospects",
|
|
form={"supplier_ids": "", "csrf_token": _TEST_CSRF},
|
|
)
|
|
assert resp.status_code == 302
|
|
|
|
|
|
# ── CSV Import ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _make_csv_file(csv_bytes: bytes, filename: str = "test.csv") -> FileStorage:
|
|
"""Wrap CSV bytes in a FileStorage for Quart's files= parameter."""
|
|
return FileStorage(stream=io.BytesIO(csv_bytes), filename=filename, name="csv_file")
|
|
|
|
|
|
class TestCSVImport:
|
|
async def test_import_creates_prospects(self, admin_client, db):
|
|
csv_bytes = (
|
|
b"name,contact_email,country_code,category\n"
|
|
b"BuildCo,build@example.com,DE,construction\n"
|
|
b"CourtTech,court@example.com,ES,equipment\n"
|
|
)
|
|
resp = await admin_client.post(
|
|
"/admin/outreach/import",
|
|
form={"csrf_token": _TEST_CSRF},
|
|
files={"csv_file": _make_csv_file(csv_bytes, "prospects.csv")},
|
|
)
|
|
assert resp.status_code == 302
|
|
|
|
rows = await core.fetch_all(
|
|
"SELECT name, contact_email, outreach_status FROM suppliers WHERE outreach_status = 'prospect'"
|
|
)
|
|
by_name = {r["name"]: r for r in rows}
|
|
assert "BuildCo" in by_name
|
|
assert "CourtTech" in by_name
|
|
# contact_email must be persisted (regression: was silently dropped)
|
|
assert by_name["BuildCo"]["contact_email"] == "build@example.com"
|
|
assert by_name["CourtTech"]["contact_email"] == "court@example.com"
|
|
|
|
async def test_import_deduplicates_by_email(self, admin_client, db):
|
|
await _insert_supplier(db, name="Existing", contact_email="dupe@example.com", outreach_status=None)
|
|
|
|
csv_bytes = (
|
|
b"name,contact_email\n"
|
|
b"New Name,dupe@example.com\n"
|
|
)
|
|
await admin_client.post(
|
|
"/admin/outreach/import",
|
|
form={"csrf_token": _TEST_CSRF},
|
|
files={"csv_file": _make_csv_file(csv_bytes)},
|
|
)
|
|
rows = await core.fetch_all(
|
|
"SELECT * FROM suppliers WHERE contact_email = 'dupe@example.com'"
|
|
)
|
|
assert len(rows) == 1
|
|
|
|
async def test_import_skips_missing_required_fields(self, admin_client, db):
|
|
csv_bytes = (
|
|
b"name,contact_email\n"
|
|
b",missing@example.com\n"
|
|
b"No Email,\n"
|
|
)
|
|
await admin_client.post(
|
|
"/admin/outreach/import",
|
|
form={"csrf_token": _TEST_CSRF},
|
|
files={"csv_file": _make_csv_file(csv_bytes)},
|
|
)
|
|
rows = await core.fetch_all(
|
|
"SELECT * FROM suppliers WHERE outreach_status = 'prospect'"
|
|
)
|
|
assert len(rows) == 0
|
|
|
|
async def test_import_missing_required_columns_returns_error(self, admin_client):
|
|
csv_bytes = b"company_name,email_address\nFoo,foo@bar.com\n"
|
|
resp = await admin_client.post(
|
|
"/admin/outreach/import",
|
|
form={"csrf_token": _TEST_CSRF},
|
|
files={"csv_file": _make_csv_file(csv_bytes, "bad.csv")},
|
|
)
|
|
assert resp.status_code == 200 # renders form again with error
|
|
html = (await resp.data).decode()
|
|
assert "missing" in html.lower() or "required" in html.lower() or "contact_email" in html
|
|
|
|
async def test_import_no_file_returns_error(self, admin_client):
|
|
resp = await admin_client.post(
|
|
"/admin/outreach/import",
|
|
form={"csrf_token": _TEST_CSRF},
|
|
)
|
|
assert resp.status_code == 200
|
|
html = (await resp.data).decode()
|
|
assert "No file" in html or "file" in html.lower()
|
|
|
|
async def test_get_import_page(self, admin_client):
|
|
resp = await admin_client.get("/admin/outreach/import")
|
|
assert resp.status_code == 200
|
|
html = (await resp.data).decode()
|
|
assert "csv" in html.lower() or "CSV" in html
|
|
|
|
|
|
# ── Compose Pre-fill ──────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestComposePrefill:
|
|
async def test_compose_prefills_to_field(self, admin_client):
|
|
resp = await admin_client.get(
|
|
"/admin/emails/compose?to=supplier@example.com&from_key=outreach&email_type=outreach&supplier_id=42"
|
|
)
|
|
assert resp.status_code == 200
|
|
html = (await resp.data).decode()
|
|
assert "supplier@example.com" in html
|
|
|
|
async def test_compose_outreach_shows_banner(self, admin_client):
|
|
resp = await admin_client.get(
|
|
"/admin/emails/compose?email_type=outreach&supplier_id=42"
|
|
)
|
|
html = (await resp.data).decode()
|
|
assert "hello.padelnomics.io" in html
|
|
|
|
async def test_compose_outreach_wrap_unchecked_by_default(self, admin_client):
|
|
"""Outreach emails default to plain text (wrap=0)."""
|
|
resp = await admin_client.get(
|
|
"/admin/emails/compose?email_type=outreach"
|
|
)
|
|
html = (await resp.data).decode()
|
|
# The wrap checkbox should NOT be checked
|
|
assert 'name="wrap"' in html
|
|
# checked should not appear on the wrap input when email_type=outreach
|
|
import re
|
|
wrap_input = re.search(r'<input[^>]+name="wrap"[^>]*>', html)
|
|
assert wrap_input, "wrap input not found"
|
|
assert "checked" not in wrap_input.group(0)
|
|
|
|
async def test_compose_preselects_outreach_from_addr(self, admin_client):
|
|
"""from_key=outreach should pre-select the outreach from address."""
|
|
resp = await admin_client.get(
|
|
"/admin/emails/compose?from_key=outreach"
|
|
)
|
|
html = (await resp.data).decode()
|
|
assert "hello.padelnomics.io" in html
|
|
|
|
|
|
# ── Compose → Pipeline Update ─────────────────────────────────────────────────
|
|
|
|
|
|
class TestComposePipelineUpdate:
|
|
async def test_sends_outreach_email_and_updates_supplier(self, admin_client, db):
|
|
"""Sending an outreach email should advance prospect → contacted."""
|
|
supplier_id = await _insert_supplier(
|
|
db, name="Pipeline Test", contact_email="pt@example.com",
|
|
outreach_status="prospect", outreach_sequence_step=0,
|
|
)
|
|
with patch("padelnomics.admin.routes.send_email", new_callable=AsyncMock) as mock_send:
|
|
mock_send.return_value = "test-resend-id"
|
|
resp = await admin_client.post(
|
|
"/admin/emails/compose",
|
|
form={
|
|
"to": "pt@example.com",
|
|
"subject": "Hello from Padelnomics",
|
|
"body": "Would you like to join our platform?",
|
|
"from_addr": core.EMAIL_ADDRESSES["outreach"],
|
|
"email_type": "outreach",
|
|
"supplier_id": str(supplier_id),
|
|
"csrf_token": _TEST_CSRF,
|
|
},
|
|
)
|
|
assert resp.status_code == 302 # redirect on success
|
|
|
|
updated = await core.fetch_one(
|
|
"SELECT outreach_status, outreach_sequence_step, last_contacted_at FROM suppliers WHERE id = ?",
|
|
(supplier_id,),
|
|
)
|
|
assert updated["outreach_status"] == "contacted"
|
|
assert updated["outreach_sequence_step"] == 1
|
|
assert updated["last_contacted_at"] is not None
|
|
|
|
async def test_sends_outreach_increments_step_when_already_contacted(self, admin_client, db):
|
|
"""A follow-up email increments step without changing status."""
|
|
supplier_id = await _insert_supplier(
|
|
db, name="Follow-up Test", contact_email="fu@example.com",
|
|
outreach_status="contacted", outreach_sequence_step=1,
|
|
)
|
|
with patch("padelnomics.admin.routes.send_email", new_callable=AsyncMock) as mock_send:
|
|
mock_send.return_value = "test-resend-id-2"
|
|
await admin_client.post(
|
|
"/admin/emails/compose",
|
|
form={
|
|
"to": "fu@example.com",
|
|
"subject": "Following up",
|
|
"body": "Just checking in.",
|
|
"from_addr": core.EMAIL_ADDRESSES["outreach"],
|
|
"email_type": "outreach",
|
|
"supplier_id": str(supplier_id),
|
|
"csrf_token": _TEST_CSRF,
|
|
},
|
|
)
|
|
|
|
updated = await core.fetch_one(
|
|
"SELECT outreach_status, outreach_sequence_step FROM suppliers WHERE id = ?",
|
|
(supplier_id,),
|
|
)
|
|
assert updated["outreach_status"] == "contacted" # unchanged
|
|
assert updated["outreach_sequence_step"] == 2 # incremented
|
|
|
|
async def test_non_outreach_compose_does_not_update_supplier(self, admin_client, db):
|
|
"""admin_compose emails should NOT touch outreach pipeline state."""
|
|
supplier_id = await _insert_supplier(
|
|
db, name="No Update", contact_email="noupdate@example.com",
|
|
outreach_status="prospect", outreach_sequence_step=0,
|
|
)
|
|
with patch("padelnomics.admin.routes.send_email", new_callable=AsyncMock) as mock_send:
|
|
mock_send.return_value = "resend-id-adhoc"
|
|
await admin_client.post(
|
|
"/admin/emails/compose",
|
|
form={
|
|
"to": "noupdate@example.com",
|
|
"subject": "General message",
|
|
"body": "Hi there.",
|
|
"from_addr": core.EMAIL_ADDRESSES["transactional"],
|
|
"email_type": "admin_compose",
|
|
"csrf_token": _TEST_CSRF,
|
|
},
|
|
)
|
|
|
|
row = await core.fetch_one(
|
|
"SELECT outreach_status, outreach_sequence_step FROM suppliers WHERE id = ?",
|
|
(supplier_id,),
|
|
)
|
|
assert row["outreach_status"] == "prospect" # unchanged
|
|
assert row["outreach_sequence_step"] == 0 # unchanged
|
|
|
|
async def test_failed_send_does_not_update_supplier(self, admin_client, db):
|
|
"""Failed send (returns None) must NOT update the pipeline."""
|
|
supplier_id = await _insert_supplier(
|
|
db, name="Fail Send", contact_email="fail@example.com",
|
|
outreach_status="prospect", outreach_sequence_step=0,
|
|
)
|
|
with patch("padelnomics.admin.routes.send_email", new_callable=AsyncMock) as mock_send:
|
|
mock_send.return_value = None # simulate send failure
|
|
await admin_client.post(
|
|
"/admin/emails/compose",
|
|
form={
|
|
"to": "fail@example.com",
|
|
"subject": "Test",
|
|
"body": "Hello.",
|
|
"from_addr": core.EMAIL_ADDRESSES["outreach"],
|
|
"email_type": "outreach",
|
|
"supplier_id": str(supplier_id),
|
|
"csrf_token": _TEST_CSRF,
|
|
},
|
|
)
|
|
|
|
row = await core.fetch_one(
|
|
"SELECT outreach_status, outreach_sequence_step FROM suppliers WHERE id = ?",
|
|
(supplier_id,),
|
|
)
|
|
assert row["outreach_status"] == "prospect" # unchanged
|
|
assert row["outreach_sequence_step"] == 0 # unchanged
|
|
|
|
|
|
# ── Follow-up Scheduling ─────────────────────────────────────────────────────
|
|
|
|
|
|
class TestFollowUpRoute:
|
|
async def test_follow_up_set(self, admin_client, db):
|
|
"""POST a date → follow_up_at saved, updated row returned."""
|
|
supplier_id = await _insert_supplier(db, outreach_status="prospect")
|
|
resp = await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/follow-up",
|
|
form={"follow_up_at": "2026-03-01", "csrf_token": _TEST_CSRF},
|
|
)
|
|
assert resp.status_code == 200
|
|
|
|
row = await core.fetch_one(
|
|
"SELECT follow_up_at FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
assert row["follow_up_at"] == "2026-03-01"
|
|
|
|
async def test_follow_up_set_returns_row_html(self, admin_client, db):
|
|
"""Response should contain the supplier row (for HTMX outerHTML swap)."""
|
|
supplier_id = await _insert_supplier(db, outreach_status="contacted")
|
|
resp = await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/follow-up",
|
|
form={"follow_up_at": "2026-04-15", "csrf_token": _TEST_CSRF},
|
|
)
|
|
html = (await resp.data).decode()
|
|
assert str(supplier_id) in html
|
|
|
|
async def test_follow_up_clear(self, admin_client, db):
|
|
"""POST empty date → follow_up_at set to NULL."""
|
|
supplier_id = await _insert_supplier(
|
|
db, outreach_status="contacted", follow_up_at="2026-03-01"
|
|
)
|
|
await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/follow-up",
|
|
form={"follow_up_at": "", "csrf_token": _TEST_CSRF},
|
|
)
|
|
row = await core.fetch_one(
|
|
"SELECT follow_up_at FROM suppliers WHERE id = ?", (supplier_id,)
|
|
)
|
|
assert row["follow_up_at"] is None
|
|
|
|
async def test_follow_up_404_for_non_pipeline_supplier(self, admin_client, db):
|
|
"""Supplier not in outreach pipeline → 404."""
|
|
supplier_id = await _insert_supplier(db, outreach_status=None)
|
|
resp = await admin_client.post(
|
|
f"/admin/outreach/{supplier_id}/follow-up",
|
|
form={"follow_up_at": "2026-03-01", "csrf_token": _TEST_CSRF},
|
|
)
|
|
assert resp.status_code == 404
|
|
|
|
|
|
class TestFollowUpDueCount:
|
|
async def test_counts_only_due_and_overdue(self, db):
|
|
"""Only suppliers with follow_up_at <= today are counted."""
|
|
# Past (overdue)
|
|
await _insert_supplier(db, name="Past", outreach_status="contacted",
|
|
follow_up_at="2020-01-01")
|
|
# Future (not yet due)
|
|
await _insert_supplier(db, name="Future", outreach_status="contacted",
|
|
follow_up_at="2099-12-31")
|
|
# No follow-up scheduled
|
|
await _insert_supplier(db, name="None", outreach_status="prospect",
|
|
follow_up_at=None)
|
|
count = await get_follow_up_due_count()
|
|
assert count == 1
|
|
|
|
async def test_zero_when_no_follow_ups(self, db):
|
|
await _insert_supplier(db, outreach_status="prospect")
|
|
count = await get_follow_up_due_count()
|
|
assert count == 0
|
|
|
|
async def test_follow_up_due_filter(self, db):
|
|
"""?follow_up=due filter returns only overdue suppliers."""
|
|
await _insert_supplier(db, name="Overdue", outreach_status="contacted",
|
|
follow_up_at="2020-06-01")
|
|
await _insert_supplier(db, name="NotYet", outreach_status="contacted",
|
|
follow_up_at="2099-01-01")
|
|
|
|
due = await get_outreach_suppliers(follow_up="due")
|
|
names = [s["name"] for s in due]
|
|
assert "Overdue" in names
|
|
assert "NotYet" not in names
|
|
|
|
async def test_follow_up_set_filter(self, db):
|
|
"""?follow_up=set returns suppliers with any follow-up date."""
|
|
await _insert_supplier(db, name="HasDate", outreach_status="contacted",
|
|
follow_up_at="2099-01-01")
|
|
await _insert_supplier(db, name="NoDate", outreach_status="contacted",
|
|
follow_up_at=None)
|
|
|
|
with_date = await get_outreach_suppliers(follow_up="set")
|
|
names = [s["name"] for s in with_date]
|
|
assert "HasDate" in names
|
|
assert "NoDate" not in names
|
|
|
|
async def test_outreach_dashboard_shows_follow_up_banner(self, admin_client, db):
|
|
"""Banner visible when there's at least one due follow-up."""
|
|
await _insert_supplier(db, name="DueNow", outreach_status="prospect",
|
|
follow_up_at="2020-01-01")
|
|
resp = await admin_client.get("/admin/outreach")
|
|
html = (await resp.data).decode()
|
|
assert "follow-up" in html.lower() or "Follow-up" in html
|
|
|
|
async def test_outreach_results_follow_up_filter(self, admin_client, db):
|
|
"""follow_up=due querystring filters results partial."""
|
|
await _insert_supplier(db, name="DueSupplier", outreach_status="contacted",
|
|
follow_up_at="2020-01-01")
|
|
await _insert_supplier(db, name="FutureSupplier", outreach_status="contacted",
|
|
follow_up_at="2099-01-01")
|
|
resp = await admin_client.get("/admin/outreach/results?follow_up=due")
|
|
html = (await resp.data).decode()
|
|
assert "DueSupplier" in html
|
|
assert "FutureSupplier" not in html
|
|
|
|
|
|
# ── Activity Timeline ─────────────────────────────────────────────────────────
|
|
|
|
|
|
async def _insert_email_log(db, to_addr: str, subject: str, email_type: str = "outreach") -> int:
|
|
"""Insert a sent email entry into email_log."""
|
|
now = datetime.now(UTC).isoformat()
|
|
async with db.execute(
|
|
"""INSERT INTO email_log (from_addr, to_addr, subject, email_type, created_at)
|
|
VALUES (?, ?, ?, ?, ?)""",
|
|
("hello@hello.padelnomics.io", to_addr, subject, email_type, now),
|
|
) as cursor:
|
|
row_id = cursor.lastrowid
|
|
await db.commit()
|
|
return row_id
|
|
|
|
|
|
async def _insert_inbound_email(db, from_addr: str, subject: str) -> int:
|
|
"""Insert a received email entry into inbound_emails."""
|
|
now = datetime.now(UTC).isoformat()
|
|
async with db.execute(
|
|
"""INSERT INTO inbound_emails
|
|
(resend_id, from_addr, to_addr, subject, received_at, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)""",
|
|
(f"test-{now}", from_addr, "inbox@hello.padelnomics.io", subject, now, now),
|
|
) as cursor:
|
|
row_id = cursor.lastrowid
|
|
await db.commit()
|
|
return row_id
|
|
|
|
|
|
class TestActivityTimeline:
|
|
async def test_timeline_shows_sent_emails(self, admin_client, db):
|
|
"""Sent outreach emails visible in supplier detail timeline."""
|
|
supplier_id = await _insert_supplier(
|
|
db, name="Timeline Test", contact_email="timeline@example.com",
|
|
outreach_status="contacted",
|
|
)
|
|
await _insert_email_log(db, "timeline@example.com", "Introduction email")
|
|
|
|
resp = await admin_client.get(f"/admin/suppliers/{supplier_id}")
|
|
assert resp.status_code == 200
|
|
html = (await resp.data).decode()
|
|
assert "Introduction email" in html
|
|
|
|
async def test_timeline_shows_received_emails(self, admin_client, db):
|
|
"""Received emails visible in supplier detail timeline."""
|
|
supplier_id = await _insert_supplier(
|
|
db, name="Inbound Test", contact_email="inbound@example.com",
|
|
outreach_status="replied",
|
|
)
|
|
await _insert_inbound_email(db, "inbound@example.com", "Re: Partnership")
|
|
|
|
resp = await admin_client.get(f"/admin/suppliers/{supplier_id}")
|
|
assert resp.status_code == 200
|
|
html = (await resp.data).decode()
|
|
assert "Re: Partnership" in html
|
|
|
|
async def test_timeline_empty_state(self, admin_client, db):
|
|
"""Supplier with no emails shows empty state message."""
|
|
supplier_id = await _insert_supplier(
|
|
db, name="No Emails", contact_email="noemail@example.com",
|
|
)
|
|
resp = await admin_client.get(f"/admin/suppliers/{supplier_id}")
|
|
assert resp.status_code == 200
|
|
html = (await resp.data).decode()
|
|
assert "No email history" in html
|
|
|
|
async def test_timeline_excludes_non_outreach_sent_emails(self, admin_client, db):
|
|
"""Transactional sent emails (magic links etc.) not shown in timeline."""
|
|
supplier_id = await _insert_supplier(
|
|
db, name="Transact Test", contact_email="trans@example.com",
|
|
)
|
|
await _insert_email_log(
|
|
db, "trans@example.com", "Magic link", email_type="magic_link"
|
|
)
|
|
resp = await admin_client.get(f"/admin/suppliers/{supplier_id}")
|
|
html = (await resp.data).decode()
|
|
assert "Magic link" not in html
|
|
|
|
async def test_timeline_not_shown_when_no_contact_email(self, admin_client, db):
|
|
"""Timeline section should still load (empty) when contact_email is NULL."""
|
|
now = datetime.now(UTC).isoformat()
|
|
async with db.execute(
|
|
"""INSERT INTO suppliers (name, slug, tier, country_code, region, category, created_at)
|
|
VALUES ('No Contact', 'no-contact', 'free', 'XX', 'Europe', 'construction', ?)""",
|
|
(now,),
|
|
) as cursor:
|
|
supplier_id = cursor.lastrowid
|
|
await db.commit()
|
|
|
|
resp = await admin_client.get(f"/admin/suppliers/{supplier_id}")
|
|
assert resp.status_code == 200
|
|
|
|
|
|
|
|
|
|
def _csv_writer(buf, fieldnames):
|
|
return _csv_module.DictWriter(buf, fieldnames=fieldnames)
|