Files
padelnomics/web/src/padelnomics/admin/routes.py
Deeman ad1da5c335 feat: outreach follow-up scheduling, activity timeline, and pSEO noindex (migration 0025)
Feature A — Outreach follow-up + activity timeline:
- follow_up_at column on suppliers (migration 0025)
- HTMX date picker on outreach rows, POST /admin/outreach/<id>/follow-up
- Amber due-today banner on /admin/outreach with ?follow_up=due filter
- get_follow_up_due_count() for dashboard widget
- Activity timeline on /admin/suppliers/<id>: merges sent + received emails by contact_email

Feature B — pSEO article noindex:
- noindex column on articles (migration 0025)
- NOINDEX_THRESHOLDS per-template lambdas in content/__init__.py
- generate_articles() evaluates threshold and stores noindex=1 for thin-data articles
- <meta name="robots" content="noindex, follow"> in article_detail.html
- Sitemap excludes noindex articles (AND noindex = 0)
- pSEO dashboard noindex count card + article row badge

Tests: 49 new tests (29 outreach, 20 noindex), 1377 total, 0 failures

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 17:51:38 +01:00

2985 lines
104 KiB
Python

"""
Admin domain: role-based admin panel for managing users, tasks, etc.
"""
import csv
import io
import json
import logging
from datetime import date, timedelta
from pathlib import Path
import mistune
import resend
from quart import (
Blueprint,
Response,
flash,
jsonify,
redirect,
render_template,
request,
session,
url_for,
)
from ..auth.routes import role_required
from ..core import (
EMAIL_ADDRESSES,
config,
csrf_protect,
execute,
fetch_all,
fetch_one,
send_email,
slugify,
utcnow,
utcnow_iso,
)
from ..email_templates import EMAIL_TEMPLATE_REGISTRY, render_email_template
logger = logging.getLogger(__name__)
def _build_article_seo_head(
    url_path: str,
    title: str,
    meta_desc: str,
    language: str,
    published_at: str,
    *,
    base_url: str = "https://padelnomics.io",
) -> str:
    """Build SEO head block (canonical, OG, JSON-LD) for a manually created article.

    Args:
        url_path: Article path appended after the language segment.
        title: Article title; used for og:title and, truncated to 110
            characters, as the JSON-LD headline.
        meta_desc: Meta description; used for og:description and, truncated
            to 200 characters, as the JSON-LD description.
        language: Language code used as the first URL segment and as
            ``inLanguage``.
        published_at: Publication timestamp, used for both ``datePublished``
            and ``dateModified``.
        base_url: Site origin prepended to the article URL.

    Returns:
        Newline-joined HTML tags: canonical link, four Open Graph metas and
        one JSON-LD ``<script>`` element.
    """

    def _esc(text: str) -> str:
        # Minimal escaping for text placed inside a double-quoted attribute.
        return text.replace("&", "&amp;").replace('"', "&quot;").replace("<", "&lt;")

    full_url = f"{base_url}/{language}{url_path}"
    jsonld = json.dumps({
        "@context": "https://schema.org",
        "@type": "Article",
        "headline": title[:110],
        "description": meta_desc[:200],
        "url": full_url,
        "inLanguage": language,
        "datePublished": published_at,
        "dateModified": published_at,
        "author": {"@type": "Organization", "name": "Padelnomics", "url": "https://padelnomics.io"},
        "publisher": {"@type": "Organization", "name": "Padelnomics", "url": "https://padelnomics.io"},
    }, ensure_ascii=False)
    # A literal "</script>" inside the JSON (e.g. in a title) would close the
    # script element early. "\/" is a valid JSON escape for "/", so the parsed
    # data is unchanged while the HTML parser no longer sees a closing tag.
    jsonld = jsonld.replace("</", "<\\/")
    # The URL lands in attribute values too, so it gets the same escaping as
    # the title and description.
    attr_url = _esc(full_url)
    return "\n".join([
        f'<link rel="canonical" href="{attr_url}" />',
        f'<meta property="og:title" content="{_esc(title)}" />',
        f'<meta property="og:description" content="{_esc(meta_desc)}" />',
        f'<meta property="og:url" content="{attr_url}" />',
        '<meta property="og:type" content="article" />',
        f'<script type="application/ld+json">{jsonld}</script>',
    ])
# Admin blueprint: mounted under /admin, with templates loaded from the
# "templates" directory that sits next to this module.
bp = Blueprint(
    "admin",
    __name__,
    url_prefix="/admin",
    template_folder=str(Path(__file__).with_name("templates")),
)
@bp.before_request
async def _inject_admin_sidebar_data():
    """Store the unread inbound-email count on ``g`` before each admin request.

    Any failure while counting is logged and the count falls back to 0, so a
    broken query does not take the whole admin page down.
    """
    from quart import g

    unread = 0
    try:
        counted = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0")
        if counted:
            unread = counted["cnt"]
    except Exception:
        logger.exception("Failed to load admin sidebar unread count")
    g.admin_unread_count = unread
@bp.context_processor
def _admin_context():
    """Make ``unread_count`` available in every admin template.

    Reads ``g.admin_unread_count`` and defaults to 0 when it was never set.
    """
    from quart import g

    unread = getattr(g, "admin_unread_count", 0)
    return {"unread_count": unread}
# =============================================================================
# SQL Queries
# =============================================================================
async def get_dashboard_stats() -> dict:
    """Collect the headline numbers shown on the admin dashboard.

    Returns:
        A dict of integer metrics (users, subscriptions, tasks, lead funnel,
        planner users, supplier tiers, credits). Each metric falls back to 0
        when its query yields no row.
    """
    now = utcnow()
    today = now.date().isoformat()
    week_ago = (now - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")

    async def _scalar(sql: str, *args, column: str = "count"):
        # Run a single-row aggregate and pull one column, defaulting to 0.
        row = await fetch_one(sql, *args)
        return row[column] if row else 0

    # Dict entries are evaluated top to bottom, so the queries run in the
    # order they are listed here.
    return {
        "users_total": await _scalar(
            "SELECT COUNT(*) as count FROM users WHERE deleted_at IS NULL"
        ),
        "users_today": await _scalar(
            "SELECT COUNT(*) as count FROM users WHERE created_at >= ? AND deleted_at IS NULL",
            (today,),
        ),
        "users_week": await _scalar(
            "SELECT COUNT(*) as count FROM users WHERE created_at >= ? AND deleted_at IS NULL",
            (week_ago,),
        ),
        "active_subscriptions": await _scalar(
            "SELECT COUNT(*) as count FROM subscriptions WHERE status = 'active'"
        ),
        "tasks_pending": await _scalar(
            "SELECT COUNT(*) as count FROM tasks WHERE status = 'pending'"
        ),
        "tasks_failed": await _scalar(
            "SELECT COUNT(*) as count FROM tasks WHERE status = 'failed'"
        ),
        # Lead funnel
        "leads_total": await _scalar(
            "SELECT COUNT(*) as count FROM lead_requests WHERE lead_type = 'quote'"
        ),
        "leads_new": await _scalar(
            "SELECT COUNT(*) as count FROM lead_requests WHERE status = 'new' AND lead_type = 'quote'"
        ),
        "leads_verified": await _scalar(
            "SELECT COUNT(*) as count FROM lead_requests WHERE verified_at IS NOT NULL AND lead_type = 'quote'"
        ),
        "leads_unlocked": await _scalar(
            "SELECT COUNT(*) as count FROM lead_requests WHERE unlock_count > 0 AND lead_type = 'quote'"
        ),
        # Planner users
        "planner_users": await _scalar(
            "SELECT COUNT(DISTINCT user_id) as count FROM scenarios WHERE deleted_at IS NULL"
        ),
        # Supplier stats
        "suppliers_claimed": await _scalar(
            "SELECT COUNT(*) as count FROM suppliers WHERE claimed_by IS NOT NULL"
        ),
        "suppliers_growth": await _scalar(
            "SELECT COUNT(*) as count FROM suppliers WHERE tier = 'growth'"
        ),
        "suppliers_pro": await _scalar(
            "SELECT COUNT(*) as count FROM suppliers WHERE tier = 'pro'"
        ),
        "total_credits_spent": await _scalar(
            "SELECT COALESCE(SUM(ABS(delta)), 0) as total FROM credit_ledger WHERE delta < 0",
            column="total",
        ),
        "leads_unlocked_by_suppliers": await _scalar(
            "SELECT COUNT(*) as count FROM lead_forwards"
        ),
    }
async def get_users(limit: int = 50, offset: int = 0, search: str = None) -> list[dict]:
    """Return one page of non-deleted users, newest first.

    Each row is joined with the user's active subscription (plan and status),
    if any. When ``search`` is given, only users whose email contains it are
    returned.
    """
    if not search:
        return await fetch_all(
            """
            SELECT u.*, s.plan, s.status as sub_status
            FROM users u
            LEFT JOIN subscriptions s ON s.user_id = u.id AND s.status = 'active'
            WHERE u.deleted_at IS NULL
            ORDER BY u.created_at DESC
            LIMIT ? OFFSET ?
            """,
            (limit, offset)
        )

    email_pattern = f"%{search}%"
    return await fetch_all(
        """
        SELECT u.*, s.plan, s.status as sub_status
        FROM users u
        LEFT JOIN subscriptions s ON s.user_id = u.id AND s.status = 'active'
        WHERE u.deleted_at IS NULL AND u.email LIKE ?
        ORDER BY u.created_at DESC
        LIMIT ? OFFSET ?
        """,
        (email_pattern, limit, offset)
    )
async def get_user_by_id(user_id: int) -> dict | None:
    """Fetch one user together with subscription and billing-customer info.

    The subscription joined is the most recently created one for the user,
    whatever its status. Returns whatever ``fetch_one`` yields for the id.
    """
    query = """
        SELECT u.*, s.plan, s.status as sub_status, bc.provider_customer_id
        FROM users u
        LEFT JOIN subscriptions s ON s.id = (
            SELECT id FROM subscriptions WHERE user_id = u.id
            ORDER BY created_at DESC LIMIT 1
        )
        LEFT JOIN billing_customers bc ON bc.user_id = u.id
        WHERE u.id = ?
        """
    return await fetch_one(query, (user_id,))
async def get_recent_tasks(limit: int = 50) -> list[dict]:
    """Return up to ``limit`` tasks, most recently created first."""
    query = """
        SELECT * FROM tasks
        ORDER BY created_at DESC
        LIMIT ?
        """
    return await fetch_all(query, (limit,))
async def get_failed_tasks() -> list[dict]:
    """Return every task whose status is 'failed', newest first."""
    query = "SELECT * FROM tasks WHERE status = 'failed' ORDER BY created_at DESC"
    return await fetch_all(query)
async def retry_task(task_id: int) -> bool:
    """Put a failed task back into the queue.

    Sets the task to 'pending', schedules it for the current UTC time and
    clears its error. Only tasks currently in 'failed' state are touched.

    Returns:
        True when the value returned by ``execute`` is greater than zero.
    """
    requeue_sql = """
        UPDATE tasks
        SET status = 'pending', run_at = ?, error = NULL
        WHERE id = ? AND status = 'failed'
        """
    affected = await execute(requeue_sql, (utcnow_iso(), task_id))
    return affected > 0
async def delete_task(task_id: int) -> bool:
    """Remove a task row by id; True when ``execute`` reports a value above zero."""
    return (await execute("DELETE FROM tasks WHERE id = ?", (task_id,))) > 0
# =============================================================================
# Routes
# =============================================================================
@bp.route("/")
@role_required("admin")
async def index():
    """Render the admin dashboard: stats, ten newest users, failed tasks."""
    context = {
        "stats": await get_dashboard_stats(),
        "recent_users": await get_users(limit=10),
        "failed_tasks": await get_failed_tasks(),
    }
    return await render_template("admin/index.html", **context)
@bp.route("/users")
@role_required("admin")
async def users():
    """User list.

    Query parameters:
        search: Optional email substring filter.
        page: 1-based page number. Missing, non-numeric or non-positive
            values are treated as page 1.
    """
    search = request.args.get("search", "").strip()
    # type=int makes the lookup return the default instead of raising on
    # non-numeric input; max() keeps the SQL OFFSET from going negative.
    page = max(1, request.args.get("page", 1, type=int))
    per_page = 50
    offset = (page - 1) * per_page
    user_list = await get_users(limit=per_page, offset=offset, search=search or None)
    return await render_template(
        "admin/users.html",
        users=user_list,
        search=search,
        page=page,
    )
@bp.route("/users/results")
@role_required("admin")
async def user_results():
    """HTMX partial for user list (live search).

    Query parameters:
        search: Optional email substring filter.
        page: 1-based page number. Missing, non-numeric or non-positive
            values are treated as page 1.
    """
    search = request.args.get("search", "").strip()
    # type=int makes the lookup return the default instead of raising on
    # non-numeric input; max() keeps the SQL OFFSET from going negative.
    page = max(1, request.args.get("page", 1, type=int))
    per_page = 50
    offset = (page - 1) * per_page
    user_list = await get_users(limit=per_page, offset=offset, search=search or None)
    return await render_template(
        "admin/partials/user_results.html",
        users=user_list,
        search=search,
        page=page,
    )
@bp.route("/users/<int:user_id>")
@role_required("admin")
async def user_detail(user_id: int):
    """Show a single user; unknown ids flash an error and go back to the list."""
    user = await get_user_by_id(user_id)
    if user:
        return await render_template("admin/user_detail.html", user=user)
    await flash("User not found.", "error")
    return redirect(url_for("admin.users"))
@bp.route("/users/<int:user_id>/impersonate", methods=["POST"])
@role_required("admin")
@csrf_protect
async def impersonate(user_id: int):
    """Switch the current session to the given user.

    Marks the session as impersonating, replaces ``session["user_id"]`` with
    the target id, then redirects to the supplier dashboard when the target
    has claimed a supplier and to the generic dashboard otherwise.
    """
    target = await get_user_by_id(user_id)
    if not target:
        await flash("User not found.", "error")
        return redirect(url_for("admin.users"))

    # NOTE(review): the admin's own user id is overwritten here and not kept
    # anywhere in this function — confirm how the admin session is restored.
    session["admin_impersonating"] = True
    session["user_id"] = user_id

    owned_supplier = await fetch_one(
        "SELECT id FROM suppliers WHERE claimed_by = ? LIMIT 1", (user_id,)
    )
    await flash(f"Now impersonating {target['email']}. Return to admin to stop.", "warning")
    landing = "suppliers.dashboard" if owned_supplier else "dashboard.index"
    return redirect(url_for(landing))
@bp.route("/stop-impersonating", methods=["POST"])
@csrf_protect
async def stop_impersonating():
    """End impersonation and head back to the admin dashboard.

    Drops both ``user_id`` and ``admin_impersonating`` from the session.
    """
    for key in ("user_id", "admin_impersonating"):
        session.pop(key, None)
    await flash("Stopped impersonating.", "info")
    return redirect(url_for("admin.index"))
@bp.route("/tasks")
@role_required("admin")
async def tasks():
    """Task queue page: the 100 most recent tasks plus all failed ones."""
    context = {
        "tasks": await get_recent_tasks(limit=100),
        "failed_tasks": await get_failed_tasks(),
    }
    return await render_template("admin/tasks.html", **context)
@bp.route("/tasks/<int:task_id>/retry", methods=["POST"])
@role_required("admin")
@csrf_protect
async def task_retry(task_id: int):
    """Re-queue a failed task, flash the outcome and return to the task list."""
    requeued = await retry_task(task_id)
    message, category = (
        ("Task queued for retry.", "success")
        if requeued
        else ("Could not retry task.", "error")
    )
    await flash(message, category)
    return redirect(url_for("admin.tasks"))
@bp.route("/tasks/<int:task_id>/delete", methods=["POST"])
@role_required("admin")
@csrf_protect
async def task_delete(task_id: int):
    """Delete a task, flash the outcome and return to the task list."""
    removed = await delete_task(task_id)
    message, category = (
        ("Task deleted.", "success")
        if removed
        else ("Could not delete task.", "error")
    )
    await flash(message, category)
    return redirect(url_for("admin.tasks"))
# =============================================================================
# Lead Management
# =============================================================================
# Status values accepted by the lead status-update routes.
LEAD_STATUSES = [
    "new",
    "pending_verification",
    "contacted",
    "forwarded",
    "closed_won",
    "closed_lost",
]
# Heat-score choices offered as a filter on the lead list.
HEAT_OPTIONS = [
    "hot",
    "warm",
    "cool",
]
async def get_leads(
    status: str = None, heat: str = None, country: str = None,
    search: str = None, days: int = None,
    page: int = 1, per_page: int = 50,
) -> tuple[list[dict], int]:
    """Query quote leads with optional filters and pagination.

    Falsy filter values are ignored. ``search`` matches contact name, email
    or company as a substring; ``days`` limits results to leads created
    within that many days.

    Returns:
        ``(rows, total)`` where ``rows`` is the requested page (newest first)
        and ``total`` is the number of leads matching the filters.
    """
    clauses = ["lead_type = 'quote'"]
    bind: list = []

    # Simple equality filters, applied in a fixed order.
    for column, value in (("status", status), ("heat_score", heat), ("country", country)):
        if value:
            clauses.append(f"{column} = ?")
            bind.append(value)

    if search:
        like = f"%{search}%"
        clauses.append("(contact_name LIKE ? OR contact_email LIKE ? OR contact_company LIKE ?)")
        bind += [like, like, like]

    if days:
        clauses.append("created_at >= datetime('now', ?)")
        bind.append(f"-{days} days")

    where_sql = " AND ".join(clauses)

    counted = await fetch_one(
        f"SELECT COUNT(*) as cnt FROM lead_requests WHERE {where_sql}", tuple(bind)
    )
    total = counted["cnt"] if counted else 0

    page_rows = await fetch_all(
        f"""SELECT * FROM lead_requests WHERE {where_sql}
        ORDER BY created_at DESC LIMIT ? OFFSET ?""",
        (*bind, per_page, (page - 1) * per_page),
    )
    return page_rows, total
async def get_lead_detail(lead_id: int) -> dict | None:
    """Load a lead and attach its forward history under ``"forwards"``.

    Each forward row carries the supplier's name and slug; forwards are
    ordered newest first. Returns None when the lead does not exist.
    """
    row = await fetch_one("SELECT * FROM lead_requests WHERE id = ?", (lead_id,))
    if not row:
        return None

    forward_rows = await fetch_all(
        """SELECT lf.*, s.name as supplier_name, s.slug as supplier_slug
        FROM lead_forwards lf
        JOIN suppliers s ON s.id = lf.supplier_id
        WHERE lf.lead_id = ?
        ORDER BY lf.created_at DESC""",
        (lead_id,),
    )
    detail = dict(row)
    detail["forwards"] = forward_rows
    return detail
async def get_lead_stats() -> dict:
    """Summarise quote leads for the lead list header.

    Returns:
        A dict with one entry per lead status (status -> count) plus the
        underscore-prefixed summary keys ``_total``, ``_new_unverified``,
        ``_hot_pipeline`` (sum of ``credit_cost`` over hot leads in status
        'new') and ``_forward_rate`` (rounded percentage of forwarded leads).
    """
    status_rows = await fetch_all(
        "SELECT status, COUNT(*) as cnt FROM lead_requests WHERE lead_type = 'quote' GROUP BY status"
    )
    # Summary card aggregates
    summary_row = await fetch_one(
        """SELECT
        COUNT(*) as total,
        SUM(CASE WHEN status IN ('new', 'pending_verification') THEN 1 ELSE 0 END) as new_unverified,
        SUM(CASE WHEN heat_score = 'hot' AND status = 'new' THEN credit_cost ELSE 0 END) as hot_pipeline,
        SUM(CASE WHEN status = 'forwarded' THEN 1 ELSE 0 END) as forwarded
        FROM lead_requests WHERE lead_type = 'quote'"""
    )

    # SUM() yields NULL over an empty table, hence the "or 0" fallbacks.
    total = summary_row["total"] or 0
    forwarded = summary_row["forwarded"] or 0
    if total > 0:
        forward_rate = round((forwarded / total) * 100)
    else:
        forward_rate = 0

    stats = {entry["status"]: entry["cnt"] for entry in status_rows}
    stats.update({
        "_total": total,
        "_new_unverified": summary_row["new_unverified"] or 0,
        "_hot_pipeline": summary_row["hot_pipeline"] or 0,
        "_forward_rate": forward_rate,
    })
    return stats
@bp.route("/leads")
@role_required("admin")
async def leads():
    """Lead management list.

    Query parameters:
        status, heat, country, search: Optional filters; empty means "any".
        days: Optional look-back window; ignored unless it is all digits.
        page: 1-based page number. Missing, empty, non-numeric or
            non-positive values are treated as page 1.
    """
    status = request.args.get("status", "")
    heat = request.args.get("heat", "")
    country = request.args.get("country", "")
    search = request.args.get("search", "")
    days_str = request.args.get("days", "")
    days = int(days_str) if days_str.isdigit() else None
    # type=int returns the default when conversion fails, so "?page=abc"
    # shows page 1 instead of raising ValueError.
    page = max(1, request.args.get("page", 1, type=int))
    per_page = 50
    lead_list, total = await get_leads(
        status=status or None, heat=heat or None, country=country or None,
        search=search or None, days=days, page=page, per_page=per_page,
    )
    lead_stats = await get_lead_stats()
    countries = await fetch_all(
        "SELECT DISTINCT country FROM lead_requests WHERE country IS NOT NULL AND country != '' ORDER BY country"
    )
    return await render_template(
        "admin/leads.html",
        leads=lead_list,
        lead_stats=lead_stats,
        statuses=LEAD_STATUSES,
        heat_options=HEAT_OPTIONS,
        countries=[c["country"] for c in countries],
        current_status=status,
        current_heat=heat,
        current_country=country,
        current_search=search,
        current_days=days_str,
        page=page,
        per_page=per_page,
        total=total,
    )
@bp.route("/leads/results")
@role_required("admin")
async def lead_results():
    """HTMX partial for filtered lead results.

    Query parameters:
        status, heat, country, search: Optional filters; empty means "any".
        days: Optional look-back window; ignored unless it is all digits.
        page: 1-based page number. Missing, empty, non-numeric or
            non-positive values are treated as page 1.
    """
    status = request.args.get("status", "")
    heat = request.args.get("heat", "")
    country = request.args.get("country", "")
    search = request.args.get("search", "")
    days_str = request.args.get("days", "")
    days = int(days_str) if days_str.isdigit() else None
    # type=int returns the default when conversion fails, so "?page=abc"
    # shows page 1 instead of raising ValueError.
    page = max(1, request.args.get("page", 1, type=int))
    per_page = 50
    lead_list, total = await get_leads(
        status=status or None, heat=heat or None, country=country or None,
        search=search or None, days=days, page=page, per_page=per_page,
    )
    return await render_template(
        "admin/partials/lead_results.html",
        leads=lead_list,
        page=page,
        per_page=per_page,
        total=total,
        current_status=status,
        current_heat=heat,
        current_country=country,
        current_search=search,
        current_days=days_str,
    )
@bp.route("/leads/<int:lead_id>")
@role_required("admin")
async def lead_detail(lead_id: int):
    """Show one lead with its forward history and the supplier picker list."""
    lead = await get_lead_detail(lead_id)
    if not lead:
        await flash("Lead not found.", "error")
        return redirect(url_for("admin.leads"))

    # All suppliers, by name, for the "forward to supplier" selector.
    supplier_choices = await fetch_all(
        "SELECT id, name, slug, country_code, category FROM suppliers ORDER BY name"
    )
    context = {
        "lead": lead,
        "statuses": LEAD_STATUSES,
        "suppliers": supplier_choices,
    }
    return await render_template("admin/lead_detail.html", **context)
@bp.route("/leads/<int:lead_id>/status", methods=["POST"])
@role_required("admin")
@csrf_protect
async def lead_status(lead_id: int):
    """Set a lead's status from the posted form, then return to the lead page.

    Values outside ``LEAD_STATUSES`` are rejected with an error flash and
    leave the lead untouched.
    """
    form = await request.form
    new_status = form.get("status", "")

    if new_status in LEAD_STATUSES:
        await execute(
            "UPDATE lead_requests SET status = ? WHERE id = ?", (new_status, lead_id)
        )
        await flash(f"Lead status updated to {new_status}.", "success")
    else:
        await flash("Invalid status.", "error")

    return redirect(url_for("admin.lead_detail", lead_id=lead_id))
@bp.route("/leads/new", methods=["GET", "POST"])
@role_required("admin")
@csrf_protect
async def lead_new():
    """Create a new lead from admin.

    GET renders an empty form. POST validates the submission and inserts a
    'quote' lead; on any validation failure the form is re-rendered with the
    submitted data and an error flash instead of raising.

    Validation:
        * contact name and email are required,
        * ``status`` must be one of ``LEAD_STATUSES``,
        * ``court_count`` and ``budget_estimate`` must be whole numbers
          (blank values fall back to 6 and 0).
    """
    if request.method != "POST":
        return await render_template("admin/lead_form.html", data={}, statuses=LEAD_STATUSES)

    form = await request.form

    async def _reject(message: str):
        # Re-render the form with what the admin typed plus an error flash.
        await flash(message, "error")
        return await render_template(
            "admin/lead_form.html", data=dict(form), statuses=LEAD_STATUSES,
        )

    contact_name = form.get("contact_name", "").strip()
    contact_email = form.get("contact_email", "").strip()
    if not contact_name or not contact_email:
        return await _reject("Name and email are required.")

    # Same whitelist the status-update routes enforce.
    status = form.get("status", "new")
    if status not in LEAD_STATUSES:
        return await _reject("Invalid status.")

    # Hand-typed numeric fields: a stray letter must not become a 500.
    try:
        court_count = int(form.get("court_count", 6) or 6)
        budget_estimate = int(form.get("budget_estimate", 0) or 0)
    except ValueError:
        return await _reject("Court count and budget must be whole numbers.")

    facility_type = form.get("facility_type", "indoor")
    build_context = form.get("build_context", "")
    glass_type = form.get("glass_type", "")
    lighting_type = form.get("lighting_type", "")
    country = form.get("country", "")
    city = form.get("city", "").strip()
    location_status = form.get("location_status", "")
    timeline = form.get("timeline", "")
    financing_status = form.get("financing_status", "")
    services_needed = form.get("services_needed", "").strip()
    additional_info = form.get("additional_info", "").strip()
    stakeholder_type = form.get("stakeholder_type", "")
    heat_score = form.get("heat_score", "warm")

    from ..credits import HEAT_CREDIT_COSTS
    # Unknown heat scores fall back to a cost of 8.
    credit_cost = HEAT_CREDIT_COSTS.get(heat_score, 8)
    now = utcnow_iso()
    # Admin-created leads count as verified unless explicitly left pending.
    verified_at = now if status != "pending_verification" else None
    lead_id = await execute(
        """INSERT INTO lead_requests
        (lead_type, facility_type, build_context, glass_type, lighting_type,
        court_count, country, location, location_status, timeline,
        budget_estimate, financing_status, services_needed, additional_info,
        stakeholder_type, heat_score, status,
        contact_name, contact_email, contact_phone, contact_company,
        credit_cost, verified_at, created_at)
        VALUES ('quote', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            facility_type, build_context, glass_type, lighting_type,
            court_count, country, city, location_status, timeline,
            budget_estimate, financing_status, services_needed, additional_info,
            stakeholder_type, heat_score, status,
            contact_name, contact_email,
            form.get("contact_phone", ""), form.get("contact_company", ""),
            credit_cost, verified_at, now,
        ),
    )
    await flash(f"Lead #{lead_id} created.", "success")
    return redirect(url_for("admin.lead_detail", lead_id=lead_id))
@bp.route("/leads/<int:lead_id>/forward", methods=["POST"])
@role_required("admin")
@csrf_protect
async def lead_forward(lead_id: int):
    """Manually forward a lead to a supplier (no credit cost).

    Records a zero-cost 'sent' forward, bumps the lead's unlock count, marks
    the lead as 'forwarded' and enqueues the forward email. A missing,
    non-numeric or zero ``supplier_id`` and a duplicate forward are both
    answered with a flash message rather than an error.
    """
    form = await request.form
    # An empty <select> posts "" and int("") raises; mirror the isdigit()
    # guard used by the HTMX variant so bad input yields the flash instead.
    raw_supplier_id = form.get("supplier_id", "")
    supplier_id = int(raw_supplier_id) if raw_supplier_id.isdigit() else 0
    if not supplier_id:
        await flash("Select a supplier.", "error")
        return redirect(url_for("admin.lead_detail", lead_id=lead_id))
    # Check if already forwarded
    existing = await fetch_one(
        "SELECT 1 FROM lead_forwards WHERE lead_id = ? AND supplier_id = ?",
        (lead_id, supplier_id),
    )
    if existing:
        await flash("Already forwarded to this supplier.", "warning")
        return redirect(url_for("admin.lead_detail", lead_id=lead_id))
    now = utcnow_iso()
    await execute(
        """INSERT INTO lead_forwards (lead_id, supplier_id, credit_cost, status, created_at)
        VALUES (?, ?, 0, 'sent', ?)""",
        (lead_id, supplier_id, now),
    )
    await execute(
        "UPDATE lead_requests SET unlock_count = unlock_count + 1, status = 'forwarded' WHERE id = ?",
        (lead_id,),
    )
    # Enqueue forward email
    from ..worker import enqueue
    await enqueue("send_lead_forward_email", {
        "lead_id": lead_id,
        "supplier_id": supplier_id,
    })
    await flash("Lead forwarded to supplier.", "success")
    return redirect(url_for("admin.lead_detail", lead_id=lead_id))
@bp.route("/leads/<int:lead_id>/status-htmx", methods=["POST"])
@role_required("admin")
@csrf_protect
async def lead_status_htmx(lead_id: int):
    """HTMX endpoint: change a lead's status and return the refreshed badge.

    Responds with 422 when the posted status is not in ``LEAD_STATUSES``.
    """
    posted = await request.form
    requested = posted.get("status", "")
    if requested not in LEAD_STATUSES:
        return Response("Invalid status", status=422)

    await execute(
        "UPDATE lead_requests SET status = ? WHERE id = ?", (requested, lead_id)
    )
    return await render_template(
        "admin/partials/lead_status_badge.html", status=requested, lead_id=lead_id,
    )
@bp.route("/leads/<int:lead_id>/forward-htmx", methods=["POST"])
@role_required("admin")
@csrf_protect
async def lead_forward_htmx(lead_id: int):
    """HTMX endpoint: forward a lead to a supplier and return the history partial.

    Responds with 422 when no numeric supplier id was posted or when the lead
    was already forwarded to that supplier. Otherwise records a zero-cost
    'sent' forward, marks the lead 'forwarded' and enqueues the email.
    """
    posted = await request.form
    raw_id = posted.get("supplier_id", "")
    if not raw_id.isdigit():
        return Response("Select a supplier.", status=422)
    supplier_id = int(raw_id)

    duplicate = await fetch_one(
        "SELECT 1 FROM lead_forwards WHERE lead_id = ? AND supplier_id = ?",
        (lead_id, supplier_id),
    )
    if duplicate:
        return Response("Already forwarded to this supplier.", status=422)

    timestamp = utcnow_iso()
    await execute(
        """INSERT INTO lead_forwards (lead_id, supplier_id, credit_cost, status, created_at)
        VALUES (?, ?, 0, 'sent', ?)""",
        (lead_id, supplier_id, timestamp),
    )
    await execute(
        "UPDATE lead_requests SET unlock_count = unlock_count + 1, status = 'forwarded' WHERE id = ?",
        (lead_id,),
    )

    from ..worker import enqueue
    await enqueue("send_lead_forward_email", {"lead_id": lead_id, "supplier_id": supplier_id})

    # Re-read the lead so the partial reflects the forward just inserted.
    refreshed = await get_lead_detail(lead_id)
    history = refreshed["forwards"] if refreshed else []
    return await render_template(
        "admin/partials/lead_forward_history.html",
        forwards=history,
    )
@bp.route("/marketplace")
@role_required("admin")
async def marketplace_dashboard():
    """Render the marketplace health dashboard.

    Gathers four groups of numbers for the template: the quote-lead funnel,
    credit economy totals, supplier engagement and two feature flags.
    """
    # --- Lead funnel -------------------------------------------------------
    funnel_row = await fetch_one(
        """SELECT
        COUNT(*) as total,
        SUM(CASE WHEN status = 'new' AND verified_at IS NOT NULL THEN 1 ELSE 0 END) as verified_new,
        SUM(CASE WHEN status = 'forwarded' THEN 1 ELSE 0 END) as forwarded_count,
        SUM(CASE WHEN status = 'closed_won' THEN 1 ELSE 0 END) as won_count
        FROM lead_requests WHERE lead_type = 'quote'"""
    )
    lead_total = funnel_row["total"] or 0
    won_total = funnel_row["won_count"] or 0
    if lead_total > 0:
        conversion_rate = round((won_total / lead_total) * 100, 1)
    else:
        conversion_rate = 0

    unlocked_row = await fetch_one(
        "SELECT COUNT(DISTINCT lead_id) as cnt FROM lead_forwards"
    )
    unlocked_count = (unlocked_row or {}).get("cnt", 0)

    # --- Credit economy ----------------------------------------------------
    credit_row = await fetch_one(
        """SELECT
        SUM(CASE WHEN delta > 0 THEN delta ELSE 0 END) as total_issued,
        SUM(CASE WHEN event_type = 'lead_unlock' THEN ABS(delta) ELSE 0 END) as total_consumed,
        SUM(CASE WHEN event_type = 'lead_unlock'
        AND created_at >= datetime('now', '-30 days')
        THEN ABS(delta) ELSE 0 END) as monthly_burn
        FROM credit_ledger"""
    )
    balance_row = await fetch_one(
        "SELECT SUM(credit_balance) as bal FROM suppliers WHERE tier != 'free'"
    )
    outstanding = (balance_row or {}).get("bal", 0) or 0

    # --- Supplier engagement -----------------------------------------------
    engagement_row = await fetch_one(
        """SELECT
        COUNT(*) as active_count,
        ROUND(AVG(unlock_count), 1) as avg_unlocks
        FROM (
        SELECT s.id, COUNT(lf.id) as unlock_count
        FROM suppliers s
        LEFT JOIN lead_forwards lf ON lf.supplier_id = s.id
        WHERE s.tier != 'free' AND s.credit_balance > 0
        GROUP BY s.id
        )"""
    )
    response_row = await fetch_one(
        """SELECT
        COUNT(*) as total,
        SUM(CASE WHEN status != 'sent' THEN 1 ELSE 0 END) as responded
        FROM lead_forwards"""
    )
    response_data = response_row or {}
    forwards_total = response_data.get("total", 0) or 0
    forwards_responded = response_data.get("responded", 0) or 0
    if forwards_total > 0:
        response_rate = round((forwards_responded / forwards_total) * 100)
    else:
        response_rate = 0

    # --- Feature flags -----------------------------------------------------
    flag_rows = await fetch_all(
        "SELECT name, enabled FROM feature_flags WHERE name IN ('lead_unlock', 'supplier_signup')"
    )
    flag_map = {row["name"]: bool(row["enabled"]) for row in flag_rows}

    # Aggregates may come back missing or NULL; both collapse to 0 below.
    credit_data = credit_row or {}
    engagement_data = engagement_row or {}
    funnel_ctx = {
        "total": lead_total,
        "verified_new": funnel_row["verified_new"] or 0,
        "unlocked": unlocked_count,
        "won": won_total,
        "conversion_rate": conversion_rate,
    }
    credits_ctx = {
        "issued": credit_data.get("total_issued", 0) or 0,
        "consumed": credit_data.get("total_consumed", 0) or 0,
        "outstanding": outstanding,
        "monthly_burn": credit_data.get("monthly_burn", 0) or 0,
    }
    suppliers_ctx = {
        "active": engagement_data.get("active_count", 0) or 0,
        "avg_unlocks": engagement_data.get("avg_unlocks", 0) or 0,
        "response_rate": response_rate,
    }
    return await render_template(
        "admin/marketplace.html",
        funnel=funnel_ctx,
        credits=credits_ctx,
        suppliers=suppliers_ctx,
        flags=flag_map,
    )
@bp.route("/marketplace/activity")
@role_required("admin")
async def marketplace_activity():
    """HTMX: Recent marketplace activity stream.

    Merges three event sources into one feed of the 50 most recent rows:
    quote leads ('lead'), lead forwards ('unlock') and credit ledger entries
    ('credit'). Every row exposes the same columns: ``event_type``,
    ``ref_id``, ``ref2_id``, ``actor``, ``detail``, ``extra``, ``created_at``.
    """
    rows = await fetch_all(
        # In the credit branch both joined tables are in scope, so the id
        # columns are qualified with "cl." — a bare "id" is ambiguous when
        # suppliers and credit_ledger each have an id column.
        """SELECT 'lead' as event_type, id as ref_id, NULL as ref2_id,
        contact_name as actor, status as detail,
        country as extra, created_at
        FROM lead_requests WHERE lead_type = 'quote'
        UNION ALL
        SELECT 'unlock' as event_type, lf.lead_id as ref_id, lf.supplier_id as ref2_id,
        s.name as actor, lf.status as detail,
        CAST(lf.credit_cost AS TEXT) as extra, lf.created_at
        FROM lead_forwards lf
        JOIN suppliers s ON s.id = lf.supplier_id
        UNION ALL
        SELECT 'credit' as event_type, cl.id as ref_id, cl.supplier_id as ref2_id,
        s.name as actor, cl.event_type as detail,
        CAST(cl.delta AS TEXT) as extra, cl.created_at
        FROM credit_ledger cl
        JOIN suppliers s ON s.id = cl.supplier_id
        ORDER BY created_at DESC LIMIT 50"""
    )
    return await render_template("admin/partials/marketplace_activity.html", events=rows)
# =============================================================================
# Supplier Management
# =============================================================================
# Supplier tier values handed to the supplier admin templates.
SUPPLIER_TIERS = [
    "free",
    "basic",
    "growth",
    "pro",
]
async def get_suppliers_list(
    tier: str = None, country: str = None, search: str = None,
    page: int = 1, per_page: int = 50,
) -> list[dict]:
    """Return one page of suppliers matching the optional filters.

    Falsy filters are skipped. ``search`` is a substring match on the
    supplier name. Rows are ordered by tier (descending), then name.
    """
    # "1=1" keeps the WHERE clause valid when no filter is active.
    conditions = ["1=1"]
    bind: list = []

    name_pattern = f"%{search}%" if search else None
    for clause, value in (
        ("tier = ?", tier),
        ("country_code = ?", country),
        ("name LIKE ?", name_pattern),
    ):
        if value:
            conditions.append(clause)
            bind.append(value)

    where_sql = " AND ".join(conditions)
    bind += [per_page, (page - 1) * per_page]
    return await fetch_all(
        f"""SELECT * FROM suppliers WHERE {where_sql}
        ORDER BY tier DESC, name ASC LIMIT ? OFFSET ?""",
        tuple(bind),
    )
async def get_supplier_stats() -> dict:
    """Count claimed, growth-tier and pro-tier suppliers for the list header.

    Returns:
        ``{"claimed": int, "growth": int, "pro": int}``; a count is 0 when
        its query yields no row.
    """
    async def _count(condition: str) -> int:
        row = await fetch_one(f"SELECT COUNT(*) as cnt FROM suppliers WHERE {condition}")
        return row["cnt"] if row else 0

    return {
        "claimed": await _count("claimed_by IS NOT NULL"),
        "growth": await _count("tier = 'growth'"),
        "pro": await _count("tier = 'pro'"),
    }
@bp.route("/suppliers")
@role_required("admin")
async def suppliers():
    """Supplier management list.

    Query parameters:
        search: Optional name substring filter.
        tier, country: Optional exact-match filters; empty means "any".
        page: 1-based page number. Missing, empty, non-numeric or
            non-positive values are treated as page 1.
    """
    search = request.args.get("search", "").strip()
    tier = request.args.get("tier", "")
    country = request.args.get("country", "")
    # type=int returns the default when conversion fails, so "?page=abc"
    # shows page 1 instead of raising ValueError.
    page = max(1, request.args.get("page", 1, type=int))
    supplier_list = await get_suppliers_list(
        tier=tier or None, country=country or None,
        search=search or None, page=page,
    )
    supplier_stats = await get_supplier_stats()
    countries = await fetch_all(
        "SELECT DISTINCT country_code FROM suppliers WHERE country_code IS NOT NULL AND country_code != '' ORDER BY country_code"
    )
    return await render_template(
        "admin/suppliers.html",
        suppliers=supplier_list,
        supplier_stats=supplier_stats,
        tiers=SUPPLIER_TIERS,
        countries=[c["country_code"] for c in countries],
        current_search=search,
        current_tier=tier,
        current_country=country,
        page=page,
    )
@bp.route("/suppliers/results")
@role_required("admin")
async def supplier_results():
    """HTMX partial for filtered supplier results.

    Query parameters:
        search: Optional name substring filter.
        tier, country: Optional exact-match filters; empty means "any".
        page: 1-based page number. Missing, empty, non-numeric or
            non-positive values are treated as page 1.
    """
    search = request.args.get("search", "").strip()
    tier = request.args.get("tier", "")
    country = request.args.get("country", "")
    # type=int returns the default when conversion fails, so "?page=abc"
    # shows page 1 instead of raising ValueError.
    page = max(1, request.args.get("page", 1, type=int))
    supplier_list = await get_suppliers_list(
        tier=tier or None, country=country or None,
        search=search or None, page=page,
    )
    return await render_template("admin/partials/supplier_results.html", suppliers=supplier_list)
@bp.route("/suppliers/<int:supplier_id>")
@role_required("admin")
async def supplier_detail(supplier_id: int):
    """Render the admin page for one supplier.

    Collects the credit balance and ledger, boosts, the 50 latest lead
    forwards, the enquiry count and an email activity timeline. Unknown ids
    flash an error and redirect to the supplier list.
    """
    supplier = await fetch_one("SELECT * FROM suppliers WHERE id = ?", (supplier_id,))
    if not supplier:
        await flash("Supplier not found.", "error")
        return redirect(url_for("admin.suppliers"))

    # Credits
    from ..credits import get_balance, get_ledger
    credit_balance = await get_balance(supplier_id)
    ledger = await get_ledger(supplier_id, limit=50)

    # Boosts, newest first
    boosts = await fetch_all(
        "SELECT * FROM supplier_boosts WHERE supplier_id = ? ORDER BY created_at DESC",
        (supplier_id,),
    )

    # Leads forwarded to this supplier, with a few lead fields for display
    forwards = await fetch_all(
        """SELECT lf.*, lr.contact_name, lr.country, lr.heat_score
        FROM lead_forwards lf
        LEFT JOIN lead_requests lr ON lr.id = lf.lead_id
        WHERE lf.supplier_id = ?
        ORDER BY lf.created_at DESC LIMIT 50""",
        (supplier_id,),
    )

    enquiry_row = await fetch_one(
        "SELECT COUNT(*) as cnt FROM supplier_enquiries WHERE supplier_id = ?",
        (supplier_id,),
    )
    enquiry_count = enquiry_row["cnt"] if enquiry_row else 0

    # Email timeline: there is no foreign key between emails and suppliers,
    # so outgoing and incoming mail is matched on the contact address.
    timeline = []
    contact_email = supplier["contact_email"]
    if contact_email:
        outgoing = await fetch_all(
            """SELECT created_at, subject, 'sent' AS direction
            FROM email_log
            WHERE to_addr = ? AND email_type = 'outreach'
            ORDER BY created_at DESC LIMIT 50""",
            (contact_email,),
        )
        incoming = await fetch_all(
            """SELECT received_at AS created_at, subject, 'received' AS direction
            FROM inbound_emails
            WHERE from_addr = ?
            ORDER BY received_at DESC LIMIT 50""",
            (contact_email,),
        )
        merged = [*outgoing, *incoming]
        # Missing timestamps sort as "" so they end up last in the
        # newest-first ordering.
        merged.sort(key=lambda entry: entry["created_at"] or "", reverse=True)
        timeline = merged[:50]

    context = {
        "supplier": supplier,
        "tiers": SUPPLIER_TIERS,
        "credit_balance": credit_balance,
        "ledger": ledger,
        "boosts": boosts,
        "forwards": forwards,
        "enquiry_count": enquiry_count,
        "timeline": timeline,
    }
    return await render_template("admin/supplier_detail.html", **context)
@bp.route("/suppliers/new", methods=["GET", "POST"])
@role_required("admin")
@csrf_protect
async def supplier_new():
    """Create a new supplier from admin.

    GET renders an empty form. POST validates the submission, inserts the
    supplier and redirects to its detail page. On a validation error the form
    is re-rendered with the submitted values and an error flash.
    """
    if request.method != "POST":
        return await render_template("admin/supplier_form.html", data={})

    form = await request.form
    name = form.get("name", "").strip()
    slug = form.get("slug", "").strip()
    country_code = form.get("country_code", "").strip().upper()
    city = form.get("city", "").strip()
    region = form.get("region", "Europe")
    category = form.get("category", "manufacturer")
    tier = form.get("tier", "free")
    website = form.get("website", "").strip()
    description = form.get("description", "").strip()
    contact_name = form.get("contact_name", "").strip()
    contact_email = form.get("contact_email", "").strip()

    if not name or not country_code:
        await flash("Name and country code are required.", "error")
        return await render_template("admin/supplier_form.html", data=dict(form))

    if not slug:
        # Derive the slug with the shared helper (also used for scenarios)
        # instead of ad-hoc replace() calls, which left characters such as
        # "/" or "." in the slug. "&" is still spelled out as "and".
        slug = slugify(name.replace("&", " and "))
    if not slug:
        await flash("Could not derive a slug from the name; please enter one.", "error")
        return await render_template("admin/supplier_form.html", data=dict(form))

    # Check slug uniqueness
    existing = await fetch_one("SELECT 1 FROM suppliers WHERE slug = ?", (slug,))
    if existing:
        await flash(f"Slug '{slug}' already exists.", "error")
        return await render_template("admin/supplier_form.html", data=dict(form))

    contact_role = form.get("contact_role", "").strip()
    services_offered = form.get("services_offered", "").strip()
    linkedin_url = form.get("linkedin_url", "").strip()
    instagram_url = form.get("instagram_url", "").strip()
    youtube_url = form.get("youtube_url", "").strip()

    now = utcnow_iso()
    supplier_id = await execute(
        """INSERT INTO suppliers
           (name, slug, country_code, city, region, website, description, category,
            tier, contact_name, contact_email, contact_role, services_offered,
            linkedin_url, instagram_url, youtube_url, created_at)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (name, slug, country_code, city, region, website, description,
         category, tier, contact_name, contact_email, contact_role,
         services_offered, linkedin_url, instagram_url, youtube_url, now),
    )
    await flash(f"Supplier '{name}' created.", "success")
    return redirect(url_for("admin.supplier_detail", supplier_id=supplier_id))
@bp.route("/suppliers/<int:supplier_id>/credits", methods=["POST"])
@role_required("admin")
@csrf_protect
async def supplier_credits(supplier_id: int):
    """Manually adjust supplier credits.

    Reads ``amount``, ``action`` and ``note`` from the form. The action
    ``"subtract"`` spends credits; any other action adds them. Always
    redirects back to the supplier detail page with a flash message.
    """
    detail_url = url_for("admin.supplier_detail", supplier_id=supplier_id)

    form = await request.form
    try:
        amount = int(form.get("amount", 0))
    except ValueError:
        # Non-numeric input is rejected by the positivity check below
        # instead of surfacing as a 500.
        amount = 0
    action = form.get("action", "add")
    note = form.get("note", "").strip() or "Admin adjustment"

    if amount <= 0:
        await flash("Amount must be positive.", "error")
        return redirect(detail_url)

    from ..credits import InsufficientCredits, add_credits, spend_credits

    subtracting = action == "subtract"
    if subtracting:
        try:
            await spend_credits(supplier_id, amount, "admin_adjustment", note=note)
        except InsufficientCredits:
            await flash("Insufficient credits for subtraction.", "error")
            return redirect(detail_url)
    else:
        await add_credits(supplier_id, amount, "admin_adjustment", note=note)

    # Derive the verb from the branch actually taken, so an unrecognised
    # action (which adds credits) is not reported as a subtraction.
    verb = "subtracted" if subtracting else "added"
    await flash(f"Credits {verb}: {amount}.", "success")
    return redirect(detail_url)
@bp.route("/suppliers/<int:supplier_id>/tier", methods=["POST"])
@role_required("admin")
@csrf_protect
async def supplier_tier(supplier_id: int):
    """Set a supplier's tier from the admin form.

    Tiers outside ``SUPPLIER_TIERS`` are rejected with an error flash. Either
    way the response redirects back to the supplier detail page.
    """
    new_tier = (await request.form).get("tier", "")

    if new_tier in SUPPLIER_TIERS:
        await execute(
            "UPDATE suppliers SET tier = ? WHERE id = ?", (new_tier, supplier_id)
        )
        await flash(f"Supplier tier updated to {new_tier}.", "success")
    else:
        await flash("Invalid tier.", "error")

    return redirect(url_for("admin.supplier_detail", supplier_id=supplier_id))
# =============================================================================
# Feature Flags
# =============================================================================
@bp.route("/flags")
@role_required("admin")
async def flags():
    """List all feature flags, ordered by name."""
    rows = await fetch_all("SELECT * FROM feature_flags ORDER BY name")
    return await render_template("admin/flags.html", flags=rows, admin_page="flags")
@bp.route("/flags/toggle", methods=["POST"])
@role_required("admin")
@csrf_protect
async def flag_toggle():
    """Toggle a feature flag on/off.

    Reads the flag ``name`` from the form, flips its ``enabled`` value and
    redirects to the form's ``next`` path, or to the flags page when ``next``
    is missing or is not a same-site path.
    """
    form = await request.form
    flag_name = form.get("name", "").strip()
    if not flag_name:
        await flash("Flag name required.", "error")
        return redirect(url_for("admin.flags"))

    # Get current state and flip it
    row = await fetch_one("SELECT enabled FROM feature_flags WHERE name = ?", (flag_name,))
    if not row:
        await flash(f"Flag '{flag_name}' not found.", "error")
        return redirect(url_for("admin.flags"))

    new_enabled = 0 if row["enabled"] else 1
    now = utcnow_iso()
    await execute(
        "UPDATE feature_flags SET enabled = ?, updated_at = ? WHERE name = ?",
        (new_enabled, now, flag_name),
    )
    state = "enabled" if new_enabled else "disabled"
    await flash(f"Flag '{flag_name}' {state}.", "success")

    # Only follow same-site paths. "//host" and backslash variants are treated
    # by browsers as absolute URLs, so they are rejected to avoid an open
    # redirect through the form-supplied value.
    next_url = form.get("next", "")
    is_local_path = (
        next_url.startswith("/")
        and not next_url.startswith("//")
        and "\\" not in next_url
    )
    if not is_local_path:
        next_url = url_for("admin.flags")
    return redirect(next_url)
# =============================================================================
# Feedback Management
# =============================================================================
@bp.route("/feedback")
@role_required("admin")
async def feedback():
    """Show the 200 newest feedback submissions with the submitter's email."""
    query = """SELECT f.*, u.email
           FROM feedback f
           LEFT JOIN users u ON u.id = f.user_id
           ORDER BY f.created_at DESC
           LIMIT 200"""
    rows = await fetch_all(query)
    return await render_template("admin/feedback.html", feedback_list=rows)
# =============================================================================
# Email Hub
# =============================================================================
# Values accepted for the email_type filter and for the compose form;
# the compose handler falls back to "admin_compose" for anything else.
EMAIL_TYPES = [
    "ad_hoc",
    "magic_link",
    "welcome",
    "quote_verification",
    "waitlist",
    "lead_forward",
    "lead_matched",
    "supplier_enquiry",
    "business_plan",
    "generic",
    "admin_compose",
    "admin_reply",
    "outreach",
]

# Values offered for the last_event filter on the email log.
EVENT_TYPES = [
    "sent",
    "delivered",
    "opened",
    "clicked",
    "bounced",
    "complained",
]
async def get_email_log(
    email_type: str = None, last_event: str = None, search: str = None,
    page: int = 1, per_page: int = 50,
) -> list[dict]:
    """Return one page of ``email_log`` rows, newest first.

    Each filter is applied only when truthy: ``email_type`` and
    ``last_event`` match exactly, ``search`` is a substring match against the
    recipient address or the subject. ``page`` is 1-based.
    """
    clauses = ["1=1"]
    args: list = []

    for column, value in (("email_type", email_type), ("last_event", last_event)):
        if value:
            clauses.append(f"{column} = ?")
            args.append(value)

    if search:
        pattern = f"%{search}%"
        clauses.append("(to_addr LIKE ? OR subject LIKE ?)")
        args += [pattern, pattern]

    where = " AND ".join(clauses)
    args += [per_page, (page - 1) * per_page]

    return await fetch_all(
        f"""SELECT * FROM email_log WHERE {where}
            ORDER BY created_at DESC LIMIT ? OFFSET ?""",
        tuple(args),
    )
async def get_email_stats() -> dict:
    """Return headline counters for the email log list.

    Keys: ``total``, ``delivered``, ``bounced`` (by ``last_event``) and
    ``sent_today`` (rows created on or after today's UTC date).
    """

    async def _count(sql: str, *query_args) -> int:
        # Run a COUNT(*) query and unwrap it, treating a missing row as 0.
        row = await fetch_one(sql, *query_args)
        return row["cnt"] if row else 0

    total = await _count("SELECT COUNT(*) as cnt FROM email_log")
    delivered = await _count("SELECT COUNT(*) as cnt FROM email_log WHERE last_event = 'delivered'")
    bounced = await _count("SELECT COUNT(*) as cnt FROM email_log WHERE last_event = 'bounced'")

    today = utcnow().date().isoformat()
    sent_today = await _count("SELECT COUNT(*) as cnt FROM email_log WHERE created_at >= ?", (today,))

    return {
        "total": total,
        "delivered": delivered,
        "bounced": bounced,
        "sent_today": sent_today,
    }
async def get_unread_count() -> int:
    """Return the number of inbound emails with ``is_read = 0``."""
    row = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0")
    if not row:
        return 0
    return row["cnt"]
@bp.route("/emails")
@role_required("admin")
async def emails():
    """Sent email log.

    Query parameters: ``email_type``, ``last_event``, ``search`` (all
    optional filters) and ``page`` (1-based; invalid or missing values fall
    back to the first page).
    """
    email_type = request.args.get("email_type", "")
    last_event = request.args.get("last_event", "")
    search = request.args.get("search", "").strip()
    try:
        page = max(1, int(request.args.get("page", "1") or "1"))
    except ValueError:
        # A non-numeric ?page= must not turn into a 500.
        page = 1

    log = await get_email_log(
        email_type=email_type or None, last_event=last_event or None,
        search=search or None, page=page,
    )
    stats = await get_email_stats()
    unread = await get_unread_count()

    return await render_template(
        "admin/emails.html",
        emails=log,
        email_stats=stats,
        email_types=EMAIL_TYPES,
        event_types=EVENT_TYPES,
        current_type=email_type,
        current_event=last_event,
        current_search=search,
        page=page,
        unread_count=unread,
    )
@bp.route("/emails/gallery")
@role_required("admin")
async def email_gallery():
    """Render the gallery page listing every registered email template."""
    context = {"registry": EMAIL_TEMPLATE_REGISTRY}
    return await render_template("admin/email_gallery.html", **context)
@bp.route("/emails/gallery/<slug>")
@role_required("admin")
async def email_gallery_preview(slug: str):
    """Preview one registered email template rendered with its sample data.

    Unknown slugs redirect to the gallery with an error flash. ``?lang=``
    selects "en" or "de"; anything else falls back to "en". A render failure
    is logged and replaced by an inline error message.
    """
    entry = EMAIL_TEMPLATE_REGISTRY.get(slug)
    if not entry:
        await flash(f"Unknown email template: {slug!r}", "error")
        return redirect(url_for("admin.email_gallery"))

    requested_lang = request.args.get("lang", "en")
    lang = requested_lang if requested_lang in ("en", "de") else "en"

    try:
        sample = entry["sample_data"](lang)
        rendered_html = render_email_template(entry["template"], lang=lang, **sample)
    except Exception:
        logger.exception("email_gallery_preview: render failed for %s (lang=%s)", slug, lang)
        rendered_html = "<p style='padding:2rem;color:#DC2626;'>Render error — see logs.</p>"

    context = {
        "slug": slug,
        "entry": entry,
        "lang": lang,
        "rendered_html": rendered_html,
    }
    return await render_template("admin/email_gallery_preview.html", **context)
@bp.route("/emails/results")
@role_required("admin")
async def email_results():
    """HTMX partial for filtered email log.

    Accepts the same ``email_type``, ``last_event``, ``search`` and ``page``
    query parameters as the full email log page. An invalid ``page`` falls
    back to the first page.
    """
    email_type = request.args.get("email_type", "")
    last_event = request.args.get("last_event", "")
    search = request.args.get("search", "").strip()
    try:
        page = max(1, int(request.args.get("page", "1") or "1"))
    except ValueError:
        # A non-numeric ?page= must not turn into a 500.
        page = 1

    log = await get_email_log(
        email_type=email_type or None, last_event=last_event or None,
        search=search or None, page=page,
    )
    return await render_template("admin/partials/email_results.html", emails=log)
@bp.route("/emails/<int:email_id>")
@role_required("admin")
async def email_detail(email_id: int):
    """Email detail — enriches with Resend API for HTML body.

    Looks up the logged email, tries to fetch its HTML body from Resend
    (best-effort, bounded by a 5 second timeout) and links a lead and a
    supplier whose contact email equals the recipient address, if any.
    """
    import asyncio

    email = await fetch_one("SELECT * FROM email_log WHERE id = ?", (email_id,))
    if not email:
        await flash("Email not found.", "error")
        return redirect(url_for("admin.emails"))

    # Try to fetch full email from Resend API (5s timeout). The SDK call is
    # synchronous, so it runs in a worker thread to keep the event loop free,
    # and wait_for enforces the timeout.
    enriched_html = None
    resend_id = email["resend_id"]
    if resend_id and resend_id != "dev" and config.RESEND_API_KEY:
        resend.api_key = config.RESEND_API_KEY
        try:
            result = await asyncio.wait_for(
                asyncio.to_thread(resend.Emails.get, resend_id),
                timeout=5,
            )
            if isinstance(result, dict):
                enriched_html = result.get("html", "")
            else:
                enriched_html = getattr(result, "html", "")
        except Exception:
            # Covers API errors and the timeout; the page still renders
            # without the enriched body.
            logger.warning("Failed to fetch email body from Resend for %s", resend_id, exc_info=True)

    related_lead = await fetch_one(
        "SELECT id FROM lead_requests WHERE contact_email = ? LIMIT 1", (email["to_addr"],)
    )
    related_supplier = await fetch_one(
        "SELECT id, name FROM suppliers WHERE contact_email = ? LIMIT 1", (email["to_addr"],)
    )

    return await render_template(
        "admin/email_detail.html",
        email=email,
        enriched_html=enriched_html,
        related_lead=related_lead,
        related_supplier=related_supplier,
    )
# --- Inbox ---
@bp.route("/emails/inbox")
@role_required("admin")
async def inbox():
    """Inbound email list.

    Shows 50 messages per page, newest first. ``?page=`` is 1-based; invalid
    or missing values fall back to the first page.
    """
    try:
        page = max(1, int(request.args.get("page", "1") or "1"))
    except ValueError:
        # A non-numeric ?page= must not turn into a 500.
        page = 1
    per_page = 50
    offset = (page - 1) * per_page

    unread = await get_unread_count()
    messages = await fetch_all(
        "SELECT * FROM inbound_emails ORDER BY received_at DESC LIMIT ? OFFSET ?",
        (per_page, offset),
    )
    return await render_template(
        "admin/inbox.html", messages=messages, unread_count=unread, page=page,
    )
@bp.route("/emails/inbox/<int:msg_id>")
@role_required("admin")
async def inbox_detail(msg_id: int):
    """Show one inbound email and flag it as read if it was unread.

    Redirects to the inbox with an error flash when the message is missing.
    """
    msg = await fetch_one("SELECT * FROM inbound_emails WHERE id = ?", (msg_id,))
    if msg:
        if not msg["is_read"]:
            # The row passed to the template was loaded before this update.
            await execute("UPDATE inbound_emails SET is_read = 1 WHERE id = ?", (msg_id,))
        return await render_template("admin/inbox_detail.html", msg=msg)

    await flash("Message not found.", "error")
    return redirect(url_for("admin.inbox"))
@bp.route("/emails/inbox/<int:msg_id>/reply", methods=["POST"])
@role_required("admin")
@csrf_protect
async def inbox_reply(msg_id: int):
    """Send a reply to the sender of an inbound email.

    The subject gets a "Re: " prefix unless it already starts with "re:"
    (case-insensitive). Newlines in the body become ``<br>`` tags. Redirects
    back to the message, or to the inbox when the message does not exist.
    """
    msg = await fetch_one("SELECT * FROM inbound_emails WHERE id = ?", (msg_id,))
    if not msg:
        await flash("Message not found.", "error")
        return redirect(url_for("admin.inbox"))

    form = await request.form
    body = form.get("body", "").strip()
    from_addr = form.get("from_addr", "") or EMAIL_ADDRESSES["transactional"]
    back_to_message = url_for("admin.inbox_detail", msg_id=msg_id)

    if not body:
        await flash("Reply body is required.", "error")
        return redirect(back_to_message)

    original_subject = msg["subject"] or ""
    already_reply = original_subject.lower().startswith("re:")
    subject = original_subject if already_reply else f"Re: {original_subject}"

    html = "<p>" + "<br>".join(body.split("\n")) + "</p>"

    result = await send_email(
        to=msg["from_addr"],
        subject=subject,
        html=html,
        from_addr=from_addr,
        email_type="admin_reply",
    )

    outcome = ("Reply sent.", "success") if result else ("Failed to send reply.", "error")
    await flash(*outcome)
    return redirect(back_to_message)
# --- Compose ---
@bp.route("/emails/compose", methods=["GET", "POST"])
@role_required("admin")
@csrf_protect
async def email_compose():
    """Compose and send an ad-hoc email.

    Supports outreach pre-fill via query params:
        ?to=<email>&from_key=outreach&email_type=outreach&supplier_id=<id>&subject=<text>

    When email_type=outreach and supplier_id is set, a successful send auto-updates
    the supplier's outreach pipeline state:
      - outreach_status: 'prospect' -> 'contacted' (only advances from prospect)
      - last_contacted_at: set to now
      - outreach_sequence_step: incremented by 1

    On a validation or send failure the form is re-rendered with everything
    the admin entered, including the ``wrap`` choice.
    """

    async def _render_form(data: dict):
        # Single place that renders the compose form for GET and for errors.
        return await render_template(
            "admin/email_compose.html",
            data=data,
            email_addresses=EMAIL_ADDRESSES,
        )

    if request.method == "POST":
        form = await request.form
        to = form.get("to", "").strip()
        subject = form.get("subject", "").strip()
        body = form.get("body", "").strip()
        from_addr = form.get("from_addr", "") or EMAIL_ADDRESSES["transactional"]
        wrap = form.get("wrap", "") == "1"
        email_type = form.get("email_type", "admin_compose").strip()
        supplier_id_raw = form.get("supplier_id", "").strip()
        supplier_id = int(supplier_id_raw) if supplier_id_raw.isdigit() else None

        # Only allow known email_type values — default unknown to admin_compose
        if email_type not in EMAIL_TYPES:
            email_type = "admin_compose"

        # Values echoed back into the form when it has to be shown again.
        # "wrap" is included so the checkbox keeps the admin's choice.
        retry_data = {
            "to": to, "subject": subject, "body": body,
            "from_addr": from_addr, "email_type": email_type,
            "supplier_id": supplier_id,
            "wrap": wrap,
        }

        if not to or not subject or not body:
            await flash("To, subject, and body are required.", "error")
            return await _render_form(retry_data)

        body_html = f"<p>{body.replace(chr(10), '<br>')}</p>"
        if wrap:
            html = render_email_template(
                "emails/admin_compose.html",
                lang="en",
                body_html=body_html,
                preheader="",
            )
        else:
            html = body_html

        result = await send_email(
            to=to, subject=subject, html=html,
            from_addr=from_addr, email_type=email_type,
        )
        if not result:
            await flash("Failed to send email.", "error")
            return await _render_form(retry_data)

        await flash(f"Email sent to {to}.", "success")
        # Auto-update outreach pipeline when sending an outreach email to a known supplier
        if email_type == "outreach" and supplier_id:
            now = utcnow_iso()
            await execute(
                """UPDATE suppliers
                   SET last_contacted_at = ?,
                       outreach_sequence_step = outreach_sequence_step + 1,
                       outreach_status = CASE
                           WHEN outreach_status = 'prospect' THEN 'contacted'
                           ELSE outreach_status
                       END
                   WHERE id = ? AND outreach_status IS NOT NULL""",
                (now, supplier_id),
            )
        return redirect(url_for("admin.emails"))

    # GET: pre-fill from query params
    prefill_to = request.args.get("to", "")
    prefill_subject = request.args.get("subject", "")
    from_key = request.args.get("from_key", "")
    email_type = request.args.get("email_type", "admin_compose")
    supplier_id_raw = request.args.get("supplier_id", "")
    supplier_id = int(supplier_id_raw) if supplier_id_raw.isdigit() else None

    # Pre-select from_addr when from_key provided (e.g. from_key=outreach)
    prefill_from_addr = EMAIL_ADDRESSES.get(from_key, "") if from_key else ""

    return await _render_form({
        "to": prefill_to,
        "subject": prefill_subject,
        "from_addr": prefill_from_addr,
        "email_type": email_type,
        "supplier_id": supplier_id,
        # Default wrap=0 for outreach — plain text best practice
        "wrap": email_type != "outreach",
    })
@bp.route("/emails/compose/preview", methods=["POST"])
@role_required("admin")
async def compose_preview():
    """HTMX endpoint: live preview of the compose textarea (no CSRF — read-only).

    Newlines in the body become ``<br>`` tags. When ``wrap`` is "1" and the
    body is non-empty, the result is wrapped in the admin compose template;
    if that render fails, the unwrapped HTML is shown instead.
    """
    form = await request.form
    body = form.get("body", "").strip()
    wrap = form.get("wrap", "") == "1"

    body_html = ""
    if body:
        body_html = f"<p>{body.replace(chr(10), '<br>')}</p>"

    # Start from the unwrapped HTML and upgrade it only on a successful render.
    rendered_html = body_html
    if wrap and body_html:
        try:
            rendered_html = render_email_template(
                "emails/admin_compose.html",
                lang="en",
                body_html=body_html,
                preheader="",
            )
        except Exception:
            logger.exception("compose_preview: template render failed")

    return await render_template(
        "admin/partials/email_preview_frame.html",
        rendered_html=rendered_html,
    )
# --- Audiences ---
@bp.route("/emails/audiences")
@role_required("admin")
async def audiences():
    """List locally cached Resend audiences, each with a contact count.

    The count comes from one Resend API call per audience and is
    best-effort: it stays ``None`` when no API key is configured, the row
    has no ``audience_id``, or the call fails (the failure is logged).
    """
    # Capped at 20 rows: there is no bulk contacts endpoint, so each audience
    # costs one API call.
    audience_list = await fetch_all("SELECT * FROM resend_audiences ORDER BY name LIMIT 20")

    for a in audience_list:
        a["contact_count"] = None
        if not (config.RESEND_API_KEY and a.get("audience_id")):
            continue

        resend.api_key = config.RESEND_API_KEY
        try:
            listing = resend.Contacts.list(a["audience_id"])
            # The handler accepts a dict with "data", a plain list, or an
            # object exposing a .data attribute.
            if isinstance(listing, dict):
                count = len(listing.get("data", []))
            elif isinstance(listing, list):
                count = len(listing)
            else:
                data = getattr(listing, "data", [])
                count = len(data) if data else 0
            a["contact_count"] = count
        except Exception:
            logger.warning("Failed to fetch contact count for audience %s", a.get("audience_id"), exc_info=True)

    return await render_template("admin/audiences.html", audiences=audience_list)
@bp.route("/emails/audiences/<audience_id>/contacts")
@role_required("admin")
async def audience_contacts(audience_id: str):
    """Show the contacts of one Resend audience.

    The audience must exist in the local ``resend_audiences`` table,
    otherwise the request redirects to the audience list. Contacts are
    fetched from Resend when an API key is configured; a failed fetch is
    logged and flashed, and the page renders with an empty list.
    """
    audience = await fetch_one("SELECT * FROM resend_audiences WHERE audience_id = ?", (audience_id,))
    if not audience:
        await flash("Audience not found.", "error")
        return redirect(url_for("admin.audiences"))

    contacts = []
    if config.RESEND_API_KEY:
        resend.api_key = config.RESEND_API_KEY
        try:
            response = resend.Contacts.list(audience_id)
            # The response may be a dict with "data", a plain list, or an
            # object exposing a .data attribute.
            if isinstance(response, dict):
                contacts = response.get("data", [])
            elif isinstance(response, list):
                contacts = response
            else:
                contacts = getattr(response, "data", []) or []
        except Exception:
            logger.exception("Failed to fetch contacts from Resend for audience %s", audience_id)
            await flash("Failed to fetch contacts from Resend.", "error")

    context = {"audience": audience, "contacts": contacts}
    return await render_template("admin/audience_contacts.html", **context)
@bp.route("/emails/audiences/<audience_id>/contacts/remove", methods=["POST"])
@role_required("admin")
@csrf_protect
async def audience_contact_remove(audience_id: str):
    """Remove a contact from a Resend audience.

    Expects ``contact_id`` in the form. Always redirects back to the
    audience's contact list with a flash describing the outcome.
    """
    contacts_url = url_for("admin.audience_contacts", audience_id=audience_id)

    form = await request.form
    contact_id = form.get("contact_id", "")
    if not contact_id:
        await flash("No contact specified.", "error")
        return redirect(contacts_url)

    if not config.RESEND_API_KEY:
        # Previously this case did nothing and gave no feedback at all.
        await flash("Resend API key is not configured; contact was not removed.", "error")
        return redirect(contacts_url)

    resend.api_key = config.RESEND_API_KEY
    try:
        resend.Contacts.remove(audience_id, contact_id)
    except Exception as e:
        # Keep the failure in the logs as well as in the flash message.
        logger.exception(
            "Failed to remove contact %s from Resend audience %s", contact_id, audience_id
        )
        await flash(f"Failed to remove contact: {e}", "error")
    else:
        await flash("Contact removed.", "success")
    return redirect(contacts_url)
# =============================================================================
# Content Templates (read-only — templates live in git as .md.jinja files)
# =============================================================================
@bp.route("/templates")
@role_required("admin")
async def templates():
    """List content templates found on disk with data and article counts.

    Each template dict gets ``data_count`` (rows in its data table) and
    ``generated_count`` (articles already generated for its slug).
    """
    from ..content import count_template_data, discover_templates

    template_list = discover_templates()

    # One grouped query gives the article count for every template slug.
    counts_raw = await fetch_all(
        "SELECT template_slug, COUNT(*) as cnt FROM articles GROUP BY template_slug"
    )
    article_counts = {}
    for row in counts_raw:
        article_counts[row["template_slug"]] = row["cnt"]

    # The data-table count is one call per template.
    for tpl in template_list:
        tpl["data_count"] = await count_template_data(tpl["data_table"])
        tpl["generated_count"] = article_counts.get(tpl["slug"], 0)

    return await render_template("admin/templates.html", templates=template_list)
@bp.route("/templates/<slug>")
@role_required("admin")
async def template_detail(slug: str):
    """Show one content template: its config, data columns and sample rows.

    Redirects to the template list with an error flash when the template
    cannot be loaded.
    """
    from ..content import fetch_template_data, get_table_columns, load_template

    try:
        # Named template_cfg so it does not shadow the module-level `config`.
        template_cfg = load_template(slug)
    except (AssertionError, FileNotFoundError):
        await flash("Template not found.", "error")
        return redirect(url_for("admin.templates"))

    data_table = template_cfg["data_table"]
    columns = await get_table_columns(data_table)
    sample_rows = await fetch_template_data(data_table, limit=10)

    # Number of articles already generated from this template.
    count_row = await fetch_one(
        "SELECT COUNT(*) as cnt FROM articles WHERE template_slug = ?", (slug,),
    )
    generated_count = 0
    if count_row:
        generated_count = count_row["cnt"]

    return await render_template(
        "admin/template_detail.html",
        config_data=template_cfg,
        columns=columns,
        sample_rows=sample_rows,
        generated_count=generated_count,
    )
@bp.route("/templates/<slug>/preview/<row_key>")
@role_required("admin")
async def template_preview(slug: str, row_key: str):
    """Preview the article a template would produce for one data row.

    ``?lang=`` is passed through to the renderer (default "en"). Any error
    raised while building the preview is flashed and the request redirects
    to the template detail page.
    """
    from ..content import preview_article

    lang = request.args.get("lang", "en")
    try:
        preview = await preview_article(slug, row_key, lang=lang)
    except Exception as exc:  # AssertionError is already an Exception subclass
        await flash(f"Preview error: {exc}", "error")
        return redirect(url_for("admin.template_detail", slug=slug))

    context = {"config": {"slug": slug}, "preview": preview, "lang": lang}
    return await render_template("admin/template_preview.html", **context)
@bp.route("/templates/<slug>/generate", methods=["GET", "POST"])
@role_required("admin")
@csrf_protect
async def template_generate(slug: str):
    """Generate articles from template + DuckDB data.

    GET shows the generation form with the number of available data rows
    (at most 501 are fetched). POST validates ``start_date`` (ISO date,
    defaults to today) and ``articles_per_day`` (whole number >= 1, defaults
    to 3), enqueues a ``generate_articles`` task and redirects to the
    article list. Invalid input flashes an error and returns to the form.
    """
    from ..content import fetch_template_data, load_template

    try:
        # Named template_cfg so it does not shadow the module-level `config`.
        template_cfg = load_template(slug)
    except (AssertionError, FileNotFoundError):
        await flash("Template not found.", "error")
        return redirect(url_for("admin.templates"))

    if request.method == "POST":
        form = await request.form
        start_date_str = form.get("start_date", "").strip()
        try:
            articles_per_day = int(form.get("articles_per_day", 3) or 3)
            start_date = date.fromisoformat(start_date_str) if start_date_str else date.today()
        except ValueError:
            # Malformed input should come back as a form error, not a 500.
            await flash(
                "Start date must be YYYY-MM-DD and articles per day must be a whole number.",
                "error",
            )
            return redirect(url_for("admin.template_generate", slug=slug))

        if articles_per_day < 1:
            await flash("Articles per day must be at least 1.", "error")
            return redirect(url_for("admin.template_generate", slug=slug))

        from ..worker import enqueue

        await enqueue("generate_articles", {
            "template_slug": slug,
            "start_date": start_date.isoformat(),
            "articles_per_day": articles_per_day,
            "limit": 500,
        })
        await flash(
            f"Article generation queued for '{template_cfg['name']}'. "
            "The worker will process it in the background.",
            "success",
        )
        return redirect(url_for("admin.articles"))

    # Only the form needs the row count, so the data fetch is skipped on POST.
    data_rows = await fetch_template_data(template_cfg["data_table"], limit=501)
    row_count = len(data_rows)

    return await render_template(
        "admin/generate_form.html",
        config_data=template_cfg,
        row_count=row_count,
        today=date.today().isoformat(),
    )
@bp.route("/templates/<slug>/regenerate", methods=["POST"])
@role_required("admin")
@csrf_protect
async def template_regenerate(slug: str):
    """Queue a full regeneration of a template's articles.

    The template is loaded only to confirm it exists. The queued
    ``generate_articles`` task starts today with 500 articles per day and a
    limit of 500.
    """
    from ..content import load_template

    try:
        load_template(slug)
    except (AssertionError, FileNotFoundError):
        await flash("Template not found.", "error")
        return redirect(url_for("admin.templates"))

    from ..worker import enqueue

    payload = {
        "template_slug": slug,
        "start_date": date.today().isoformat(),
        "articles_per_day": 500,
        "limit": 500,
    }
    await enqueue("generate_articles", payload)

    await flash("Regeneration queued. The worker will process it in the background.", "success")
    return redirect(url_for("admin.template_detail", slug=slug))
# =============================================================================
# Published Scenario Management
# =============================================================================
# Field names of the scenario form. The group labels below are inferred from
# the field names — confirm against the form template before relying on them.
SCENARIO_FORM_FIELDS = [
    # Scenario metadata
    "title",
    "slug",
    "subtitle",
    "location",
    "country",
    # Venue setup
    "venue",
    "own",
    "dblCourts",
    "sglCourts",
    # Rates and utilisation
    "ratePeak",
    "rateOffPeak",
    "rateSingle",
    "peakPct",
    "hoursPerDay",
    "utilTarget",
    # Running costs
    "rentSqm",
    "electricity",
    "heating",
    "staff",
    "insurance",
    "maintenance",
    "cleaning",
    "marketing",
    # Build / investment costs
    "courtCostDbl",
    "courtCostSgl",
    "hallCostSqm",
    "landPriceSqm",
    "contingencyPct",
    "fitout",
    # Financing
    "loanPct",
    "interestRate",
    "loanTerm",
]
async def _query_scenarios(search: str, country: str, venue_type: str) -> tuple[list, int]:
    """Run the filtered scenario query.

    Returns ``(rows, total_count)``: ``rows`` holds at most 500 matching
    scenarios, newest first; ``total_count`` counts every published
    scenario, ignoring the filters.
    """
    clauses = ["1=1"]
    args: list = []

    if search:
        pattern = f"%{search}%"
        clauses.append("(title LIKE ? OR location LIKE ? OR slug LIKE ?)")
        args += [pattern, pattern, pattern]

    for column, value in (("country", country), ("venue_type", venue_type)):
        if value:
            clauses.append(f"{column} = ?")
            args.append(value)

    where = " AND ".join(clauses)
    rows = await fetch_all(
        f"SELECT * FROM published_scenarios WHERE {where} ORDER BY created_at DESC LIMIT 500",
        tuple(args),
    )

    total_row = await fetch_one("SELECT COUNT(*) as cnt FROM published_scenarios")
    total = total_row["cnt"] if total_row else 0
    return rows, total
@bp.route("/scenarios")
@role_required("admin")
async def scenarios():
    """Render the published scenario list with search and filter controls.

    Query parameters: ``search``, ``country`` and ``venue_type``. The filter
    dropdowns are filled from the distinct non-empty values in the table.
    """
    search = request.args.get("search", "").strip()
    country_filter = request.args.get("country", "")
    venue_filter = request.args.get("venue_type", "")

    scenario_list, total = await _query_scenarios(search, country_filter, venue_filter)

    country_rows = await fetch_all(
        "SELECT DISTINCT country FROM published_scenarios WHERE country != '' ORDER BY country"
    )
    venue_rows = await fetch_all(
        "SELECT DISTINCT venue_type FROM published_scenarios WHERE venue_type != '' ORDER BY venue_type"
    )
    generating = await _is_generating()

    return await render_template(
        "admin/scenarios.html",
        scenarios=scenario_list,
        countries=[row["country"] for row in country_rows],
        venue_types=[row["venue_type"] for row in venue_rows],
        total=total,
        current_search=search,
        current_country=country_filter,
        current_venue_type=venue_filter,
        is_generating=generating,
    )
@bp.route("/scenarios/results")
@role_required("admin")
async def scenario_results():
    """HTMX partial with the filtered scenario rows (used by live polling)."""
    args = request.args
    scenario_list, total = await _query_scenarios(
        args.get("search", "").strip(),
        args.get("country", ""),
        args.get("venue_type", ""),
    )
    generating = await _is_generating()

    return await render_template(
        "admin/partials/scenario_results.html",
        scenarios=scenario_list,
        total=total,
        is_generating=generating,
    )
@bp.route("/scenarios/new", methods=["GET", "POST"])
@role_required("admin")
@csrf_protect
async def scenario_new():
    """Create a published scenario from the admin form.

    GET renders an empty form. POST requires title, location and country,
    builds the calculator state from the submitted fields, runs the
    calculation and stores the state and result as JSON.
    """
    from ..planner.calculator import DEFAULTS, calc, validate_state

    if request.method != "POST":
        return await render_template(
            "admin/scenario_form.html", data={}, editing=False, defaults=DEFAULTS,
        )

    form = await request.form
    title = form.get("title", "").strip()
    scenario_slug = form.get("slug", "").strip() or slugify(title)
    subtitle = form.get("subtitle", "").strip()
    location = form.get("location", "").strip()
    country = form.get("country", "").strip()

    if not (title and location and country):
        await flash("Title, location, and country are required.", "error")
        return await render_template(
            "admin/scenario_form.html", data=dict(form), editing=False, defaults=DEFAULTS,
        )

    # Only non-empty form values for keys present in DEFAULTS are passed on.
    calc_overrides = {
        key: raw for key in DEFAULTS if (raw := form.get(key, "")) != ""
    }
    state = validate_state(calc_overrides)
    d = calc(state)

    court_config = f"{state.get('dblCourts', 0)} double + {state.get('sglCourts', 0)} single"

    row_values = (
        scenario_slug, title, subtitle, location, country,
        state.get("venue", "indoor"), state.get("own", "rent"),
        court_config, json.dumps(state), json.dumps(d),
    )
    await execute(
        """INSERT INTO published_scenarios
           (slug, title, subtitle, location, country, venue_type, ownership,
            court_config, state_json, calc_json)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        row_values,
    )
    await flash(f"Scenario '{title}' created.", "success")
    return redirect(url_for("admin.scenarios"))
@bp.route("/scenarios/<int:scenario_id>/edit", methods=["GET", "POST"])
@role_required("admin")
@csrf_protect
async def scenario_edit(scenario_id: int):
    """Edit a published scenario.

    GET renders the form pre-filled from the stored metadata and state.
    POST requires title, location and country (as on creation), rebuilds the
    calculator state from the form, recalculates and updates the row. The
    slug is not changed by an edit.
    """
    from ..planner.calculator import DEFAULTS, calc, validate_state

    scenario = await fetch_one("SELECT * FROM published_scenarios WHERE id = ?", (scenario_id,))
    if not scenario:
        await flash("Scenario not found.", "error")
        return redirect(url_for("admin.scenarios"))

    if request.method == "POST":
        form = await request.form
        title = form.get("title", "").strip()
        subtitle = form.get("subtitle", "").strip()
        location = form.get("location", "").strip()
        country = form.get("country", "").strip()

        # Same required-field rule as scenario creation, so an edit cannot
        # blank out the title, location or country.
        if not title or not location or not country:
            await flash("Title, location, and country are required.", "error")
            return await render_template(
                "admin/scenario_form.html",
                # The stored slug wins because edits never change it.
                data={**dict(form), "slug": scenario["slug"]},
                editing=True,
                scenario_id=scenario_id,
                defaults=DEFAULTS,
            )

        # Only non-empty form values for keys present in DEFAULTS are passed on.
        calc_overrides = {}
        for key in DEFAULTS:
            val = form.get(key, "")
            if val != "":
                calc_overrides[key] = val

        state = validate_state(calc_overrides)
        d = calc(state)

        dbl = state.get("dblCourts", 0)
        sgl = state.get("sglCourts", 0)
        court_config = f"{dbl} double + {sgl} single"

        now = utcnow_iso()
        await execute(
            """UPDATE published_scenarios
               SET title = ?, subtitle = ?, location = ?, country = ?,
                   venue_type = ?, ownership = ?, court_config = ?,
                   state_json = ?, calc_json = ?, updated_at = ?
               WHERE id = ?""",
            (
                title, subtitle, location, country,
                state.get("venue", "indoor"), state.get("own", "rent"),
                court_config, json.dumps(state), json.dumps(d), now, scenario_id,
            ),
        )
        await flash("Scenario updated and recalculated.", "success")
        return redirect(url_for("admin.scenarios"))

    # Merge scenario metadata + state for the form
    state = json.loads(scenario["state_json"])
    data = {
        "title": scenario["title"],
        "slug": scenario["slug"],
        "subtitle": scenario["subtitle"] or "",
        "location": scenario["location"],
        "country": scenario["country"],
        **state,
    }
    return await render_template(
        "admin/scenario_form.html", data=data, editing=True, scenario_id=scenario_id, defaults=DEFAULTS,
    )
@bp.route("/scenarios/<int:scenario_id>/delete", methods=["POST"])
@role_required("admin")
@csrf_protect
async def scenario_delete(scenario_id: int):
    """Delete the published scenario with the given id, then return to the list."""
    delete_sql = "DELETE FROM published_scenarios WHERE id = ?"
    await execute(delete_sql, (scenario_id,))
    await flash("Scenario deleted.", "success")
    return redirect(url_for("admin.scenarios"))
@bp.route("/scenarios/<int:scenario_id>/preview")
@role_required("admin")
async def scenario_preview(scenario_id: int):
    """Preview a scenario card rendered from its stored state and results.

    Redirects to the scenario list with an error flash when the scenario
    does not exist.
    """
    scenario = await fetch_one("SELECT * FROM published_scenarios WHERE id = ?", (scenario_id,))
    if scenario:
        # d = stored calculation output, s = stored calculator state.
        d = json.loads(scenario["calc_json"])
        s = json.loads(scenario["state_json"])
        return await render_template(
            "admin/scenario_preview.html", scenario=scenario, d=d, s=s,
        )

    await flash("Scenario not found.", "error")
    return redirect(url_for("admin.scenarios"))
@bp.route("/scenarios/<int:scenario_id>/pdf")
@role_required("admin")
async def scenario_pdf(scenario_id: int):
    """Generate and immediately download a business plan PDF for a published scenario.

    ``?lang=`` selects "en" or "de" (anything else falls back to "en").
    Returns a JSON 404 when the scenario does not exist, otherwise the PDF
    as an attachment.
    """
    from ..businessplan import get_plan_sections
    from ..planner.calculator import validate_state

    scenario = await fetch_one("SELECT * FROM published_scenarios WHERE id = ?", (scenario_id,))
    if not scenario:
        return jsonify({"error": "Scenario not found."}), 404

    lang = request.args.get("lang", "en")
    if lang not in ("en", "de"):
        lang = "en"

    state = validate_state(json.loads(scenario["state_json"]))
    d = json.loads(scenario["calc_json"])
    sections = get_plan_sections(state, d, lang)
    sections["scenario_name"] = scenario["title"]
    sections["location"] = scenario.get("location", "")

    # Path is imported at module level. The template files are read as UTF-8
    # explicitly so the result does not depend on the platform's default
    # encoding.
    template_dir = Path(__file__).parent.parent / "templates" / "businessplan"
    html_template = (template_dir / "plan.html").read_text(encoding="utf-8")
    css = (template_dir / "plan.css").read_text(encoding="utf-8")

    from jinja2 import Template

    rendered_html = Template(html_template).render(s=sections, css=css)

    from weasyprint import HTML

    pdf_bytes = HTML(string=rendered_html).write_pdf()

    slug = scenario["slug"] or f"scenario-{scenario_id}"
    filename = f"padel-business-plan-{slug}-{lang}.pdf"
    return Response(
        pdf_bytes,
        mimetype="application/pdf",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
# =============================================================================
# Article Management
# =============================================================================
async def _get_article_list(
    status: str = None,
    template_slug: str = None,
    language: str = None,
    search: str = None,
    page: int = 1,
    per_page: int = 50,
) -> list[dict]:
    """Fetch one page of articles, newest first, narrowed by optional filters.

    ``status`` understands ``"live"``, ``"scheduled"`` and ``"draft"``; other
    values add no status condition. ``template_slug`` and ``language`` are
    exact matches, ``search`` is a substring match on the title. Each row
    carries an extra ``display_status`` column computed in SQL.
    """
    status_clauses = {
        "live": "status = 'published' AND published_at <= datetime('now')",
        "scheduled": "status = 'published' AND published_at > datetime('now')",
        "draft": "status = 'draft'",
    }

    conditions = ["1=1"]
    bind_values: list = []

    if status in status_clauses:
        conditions.append(status_clauses[status])

    # Exact-match filters share the same shape: one clause, one bound value.
    for clause, value in (
        ("template_slug = ?", template_slug),
        ("language = ?", language),
    ):
        if value:
            conditions.append(clause)
            bind_values.append(value)

    if search:
        conditions.append("title LIKE ?")
        bind_values.append(f"%{search}%")

    # LIMIT / OFFSET placeholders come last in the statement.
    bind_values += [per_page, (page - 1) * per_page]

    where = " AND ".join(conditions)
    query = f"""SELECT *,
                  CASE WHEN status = 'published' AND published_at > datetime('now')
                       THEN 'scheduled'
                       WHEN status = 'published' THEN 'live'
                       ELSE status END AS display_status
           FROM articles WHERE {where}
           ORDER BY created_at DESC LIMIT ? OFFSET ?"""
    return await fetch_all(query, tuple(bind_values))
async def _get_article_stats() -> dict:
    """Return total / live / scheduled / draft article counts for the list header.

    Falls back to all-zero counts when the aggregate query yields no row.
    """
    stats_sql = """SELECT
            COUNT(*) AS total,
            COALESCE(SUM(CASE WHEN status='published' AND published_at <= datetime('now') THEN 1 ELSE 0 END), 0) AS live,
            COALESCE(SUM(CASE WHEN status='published' AND published_at > datetime('now') THEN 1 ELSE 0 END), 0) AS scheduled,
            COALESCE(SUM(CASE WHEN status='draft' THEN 1 ELSE 0 END), 0) AS draft
        FROM articles"""
    counts = await fetch_one(stats_sql)
    if counts:
        return dict(counts)
    return dict.fromkeys(("total", "live", "scheduled", "draft"), 0)
async def _is_generating() -> bool:
    """Tell whether at least one ``generate_articles`` task is still pending."""
    pending = await fetch_one(
        "SELECT COUNT(*) AS cnt FROM tasks WHERE task_name = 'generate_articles' AND status = 'pending'"
    )
    if not pending:
        return False
    return bool(pending["cnt"] > 0)
@bp.route("/articles")
@role_required("admin")
async def articles():
    """List all articles with filters.

    Query args: ``search`` (title substring), ``status``, ``template``,
    ``language`` and ``page`` (1-based). Empty filter values mean "no filter".
    """
    search = request.args.get("search", "").strip()
    status_filter = request.args.get("status", "")
    template_filter = request.args.get("template", "")
    language_filter = request.args.get("language", "")
    # ``type=int`` makes the lookup return the default when the value is
    # missing, empty or not an integer, instead of raising ValueError (which
    # surfaced as a 500 for e.g. ``?page=abc``).
    page = max(1, request.args.get("page", 1, type=int))

    article_list = await _get_article_list(
        status=status_filter or None,
        template_slug=template_filter or None,
        language=language_filter or None,
        search=search or None,
        page=page,
    )
    stats = await _get_article_stats()
    templates = await fetch_all(
        "SELECT DISTINCT template_slug FROM articles WHERE template_slug IS NOT NULL ORDER BY template_slug"
    )

    return await render_template(
        "admin/articles.html",
        articles=article_list,
        stats=stats,
        template_slugs=[t["template_slug"] for t in templates],
        current_search=search,
        current_status=status_filter,
        current_template=template_filter,
        current_language=language_filter,
        page=page,
        is_generating=await _is_generating(),
    )
@bp.route("/articles/results")
@role_required("admin")
async def article_results():
    """HTMX partial for filtered article results.

    Accepts the same ``search`` / ``status`` / ``template`` / ``language`` /
    ``page`` query args as the full article list and renders only the
    results partial.
    """
    search = request.args.get("search", "").strip()
    status_filter = request.args.get("status", "")
    template_filter = request.args.get("template", "")
    language_filter = request.args.get("language", "")
    # Fall back to page 1 for missing, empty or non-numeric values rather
    # than letting int() raise ValueError.
    page = max(1, request.args.get("page", 1, type=int))

    article_list = await _get_article_list(
        status=status_filter or None,
        template_slug=template_filter or None,
        language=language_filter or None,
        search=search or None,
        page=page,
    )
    return await render_template(
        "admin/partials/article_results.html",
        articles=article_list,
        page=page,
        is_generating=await _is_generating(),
    )
@bp.route("/articles/new", methods=["GET", "POST"])
@role_required("admin")
@csrf_protect
async def article_new():
    """Create a manual article.

    GET renders an empty form. POST validates the input, renders the
    markdown body to HTML (with scenario cards baked in), writes the HTML to
    ``BUILD_DIR/<language>/<slug>.html`` and the markdown source to
    ``data/content/articles/<slug>.md``, inserts the article row and
    invalidates the sitemap cache.

    Validation failures re-render the form with the submitted data and an
    error flash.
    """
    from ..content.routes import BUILD_DIR, bake_scenario_cards, is_reserved_path

    if request.method == "POST":
        form = await request.form
        title = form.get("title", "").strip()
        article_slug = form.get("slug", "").strip() or slugify(title)
        url_path = form.get("url_path", "").strip() or ("/" + article_slug)
        meta_description = form.get("meta_description", "").strip()
        og_image_url = form.get("og_image_url", "").strip()
        country = form.get("country", "").strip()
        region = form.get("region", "").strip()
        body = form.get("body", "").strip()
        language = form.get("language", "en").strip() or "en"
        status = form.get("status", "draft")
        published_at = form.get("published_at", "").strip()

        if not title or not body:
            await flash("Title and body are required.", "error")
            return await render_template("admin/article_form.html", data=dict(form), editing=False)

        # The slug becomes a file name below, so it must be a single,
        # non-empty path segment; otherwise the write could land outside the
        # build / content directories.
        if not article_slug or "/" in article_slug or "\\" in article_slug:
            await flash("Slug must be non-empty and must not contain path separators.", "error")
            return await render_template("admin/article_form.html", data=dict(form), editing=False)

        if is_reserved_path(url_path):
            await flash(f"URL path '{url_path}' conflicts with a reserved route.", "error")
            return await render_template("admin/article_form.html", data=dict(form), editing=False)

        # Render markdown → HTML with scenario cards baked in. The language
        # is passed along, as the single-article rebuild does, so the cards
        # are baked for the article's language.
        body_html = mistune.html(body)
        body_html = await bake_scenario_cards(body_html, lang=language)

        build_dir = BUILD_DIR / language
        build_dir.mkdir(parents=True, exist_ok=True)
        (build_dir / f"{article_slug}.html").write_text(body_html)

        # Save markdown source
        md_dir = Path("data/content/articles")
        md_dir.mkdir(parents=True, exist_ok=True)
        (md_dir / f"{article_slug}.md").write_text(body)

        # An empty publish date means "publish now".
        pub_dt = published_at or utcnow_iso()
        seo_head = _build_article_seo_head(url_path, title, meta_description, language, pub_dt)

        await execute(
            """INSERT INTO articles
               (url_path, slug, title, meta_description, og_image_url,
                country, region, language, status, published_at, seo_head)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (url_path, article_slug, title, meta_description, og_image_url,
             country, region, language, status, pub_dt, seo_head),
        )

        from ..sitemap import invalidate_sitemap_cache

        invalidate_sitemap_cache()

        await flash(f"Article '{title}' created.", "success")
        return redirect(url_for("admin.articles"))

    return await render_template("admin/article_form.html", data={}, editing=False)
@bp.route("/articles/<int:article_id>/edit", methods=["GET", "POST"])
@role_required("admin")
@csrf_protect
async def article_edit(article_id: int):
    """Edit an article's metadata and, optionally, its markdown body.

    GET loads the markdown source (manual location first, then the
    per-language build directory) and renders the form. POST updates the
    row; when a body is submitted the HTML and markdown files are rewritten
    as well. Unknown ids redirect to the article list with an error flash.
    """
    from ..content.routes import BUILD_DIR, bake_scenario_cards, is_reserved_path

    article = await fetch_one("SELECT * FROM articles WHERE id = ?", (article_id,))
    if not article:
        await flash("Article not found.", "error")
        return redirect(url_for("admin.articles"))

    if request.method == "POST":
        form = await request.form
        # A blank title or URL path keeps the stored value instead of
        # overwriting it with an empty string.
        title = form.get("title", "").strip() or article["title"]
        url_path = form.get("url_path", "").strip() or article["url_path"]
        meta_description = form.get("meta_description", "").strip()
        og_image_url = form.get("og_image_url", "").strip()
        country = form.get("country", "").strip()
        region = form.get("region", "").strip()
        body = form.get("body", "").strip()
        # Tolerate a NULL stored language as well as a missing form field.
        language = (form.get("language") or article["language"] or "en").strip() or "en"
        status = form.get("status", article["status"])
        published_at = form.get("published_at", "").strip()

        if is_reserved_path(url_path):
            await flash(f"URL path '{url_path}' conflicts with a reserved route.", "error")
            return await render_template(
                "admin/article_form.html", data=dict(form), editing=True, article_id=article_id,
            )

        # Re-render if body provided
        if body:
            body_html = mistune.html(body)
            body_html = await bake_scenario_cards(body_html, lang=language)
            build_dir = BUILD_DIR / language
            build_dir.mkdir(parents=True, exist_ok=True)
            (build_dir / f"{article['slug']}.html").write_text(body_html)

            md_dir = Path("data/content/articles")
            md_dir.mkdir(parents=True, exist_ok=True)
            (md_dir / f"{article['slug']}.md").write_text(body)

        now = utcnow_iso()
        pub_dt = published_at or article["published_at"]
        seo_head = _build_article_seo_head(url_path, title, meta_description, language, pub_dt)

        await execute(
            """UPDATE articles
               SET title = ?, url_path = ?, meta_description = ?, og_image_url = ?,
                   country = ?, region = ?, language = ?, status = ?, published_at = ?,
                   seo_head = ?, updated_at = ?
               WHERE id = ?""",
            (title, url_path, meta_description, og_image_url,
             country, region, language, status, pub_dt, seo_head, now, article_id),
        )

        # URL path, status and publish date may all have changed, so the
        # sitemap cache is dropped here too.
        from ..sitemap import invalidate_sitemap_cache

        invalidate_sitemap_cache()

        await flash("Article updated.", "success")
        return redirect(url_for("admin.articles"))

    # Load markdown source if available (manual or generated)
    md_path = Path("data/content/articles") / f"{article['slug']}.md"
    if not md_path.exists():
        lang = article["language"] or "en"
        md_path = BUILD_DIR / lang / "md" / f"{article['slug']}.md"
    body = md_path.read_text() if md_path.exists() else ""

    data = {**dict(article), "body": body}
    return await render_template(
        "admin/article_form.html", data=data, editing=True, article_id=article_id,
    )
@bp.route("/articles/<int:article_id>/delete", methods=["POST"])
@role_required("admin")
@csrf_protect
async def article_delete(article_id: int):
    """Delete an article row together with its built HTML and markdown source.

    HTMX requests get an empty body (the row is removed client-side);
    ordinary requests are redirected to the article list with a flash.
    """
    article = await fetch_one(
        "SELECT slug, language FROM articles WHERE id = ?", (article_id,)
    )
    if article:
        # Clean up files
        from ..content.routes import BUILD_DIR

        slug = article["slug"]
        lang = article["language"] or "en"
        stale_paths = (
            # Article HTML is written below the per-language build directory.
            BUILD_DIR / lang / f"{slug}.html",
            # Flat location checked previously; files may still exist there.
            BUILD_DIR / f"{slug}.html",
            Path("data/content/articles") / f"{slug}.md",
        )
        for path in stale_paths:
            path.unlink(missing_ok=True)

    await execute("DELETE FROM articles WHERE id = ?", (article_id,))

    from ..sitemap import invalidate_sitemap_cache

    invalidate_sitemap_cache()

    if request.headers.get("HX-Request"):
        return ""  # row removed via hx-swap="outerHTML"

    await flash("Article deleted.", "success")
    return redirect(url_for("admin.articles"))
@bp.route("/articles/<int:article_id>/publish", methods=["POST"])
@role_required("admin")
@csrf_protect
async def article_publish(article_id: int):
    """Flip an article between ``draft`` and ``published``.

    A draft becomes published; any other current status becomes draft. The
    sitemap cache is invalidated afterwards. HTMX requests receive the
    re-rendered table row, ordinary requests a flash plus redirect.
    """
    current = await fetch_one("SELECT status FROM articles WHERE id = ?", (article_id,))
    if not current:
        await flash("Article not found.", "error")
        return redirect(url_for("admin.articles"))

    if current["status"] == "draft":
        new_status = "published"
    else:
        new_status = "draft"

    timestamp = utcnow_iso()
    await execute(
        "UPDATE articles SET status = ?, updated_at = ? WHERE id = ?",
        (new_status, timestamp, article_id),
    )

    from ..sitemap import invalidate_sitemap_cache

    invalidate_sitemap_cache()

    is_htmx = request.headers.get("HX-Request")
    if not is_htmx:
        await flash(f"Article status changed to {new_status}.", "success")
        return redirect(url_for("admin.articles"))

    # Re-read the row with the computed display_status the row partial uses.
    row_sql = """SELECT *,
                  CASE WHEN status = 'published' AND published_at > datetime('now')
                       THEN 'scheduled'
                       WHEN status = 'published' THEN 'live'
                       ELSE status END AS display_status
           FROM articles WHERE id = ?"""
    refreshed = await fetch_one(row_sql, (article_id,))
    return await render_template("admin/partials/article_row.html", a=refreshed)
@bp.route("/articles/<int:article_id>/rebuild", methods=["POST"])
@role_required("admin")
@csrf_protect
async def article_rebuild(article_id: int):
    """Rebuild one article's HTML, then return to the article list.

    The success flash is shown regardless of what the rebuild helper did;
    the helper returns nothing to inspect.
    """
    await _rebuild_article(article_id)
    await flash("Article rebuilt.", "success")
    listing_url = url_for("admin.articles")
    return redirect(listing_url)
@bp.route("/rebuild-all", methods=["POST"])
@role_required("admin")
@csrf_protect
async def rebuild_all():
    """Rebuild every article.

    One ``generate_articles`` task is enqueued per discovered template;
    articles without a ``template_slug`` are rebuilt inline in this request.
    """
    from ..content import discover_templates
    from ..worker import enqueue

    templates = discover_templates()
    for template in templates:
        payload = {
            "template_slug": template["slug"],
            "start_date": date.today().isoformat(),
            "articles_per_day": 500,
            "limit": 500,
        }
        await enqueue("generate_articles", payload)

    # Manual articles still need inline rebuild
    manual = await fetch_all("SELECT id FROM articles WHERE template_slug IS NULL")
    for row in manual:
        await _rebuild_article(row["id"])

    summary = (
        f"Queued rebuild for {len(templates)} templates"
        f" + rebuilt {len(manual)} manual articles."
    )
    await flash(summary, "success")
    return redirect(url_for("admin.articles"))
async def _rebuild_article(article_id: int):
    """Re-render a single article from its source.

    Template-backed articles trigger a regeneration of every article of that
    template. Manual articles are re-rendered from their markdown file under
    ``data/content/articles``. The function returns silently when the
    article, its template or its markdown file cannot be found.
    """
    from ..content.routes import BUILD_DIR, bake_scenario_cards

    article = await fetch_one("SELECT * FROM articles WHERE id = ?", (article_id,))
    if not article:
        return

    if article["template_slug"]:
        # SSG-generated article: regenerate via the content module
        from ..content import generate_articles, load_template

        try:
            load_template(article["template_slug"])
        except (AssertionError, FileNotFoundError):
            return
        # Regenerate all articles for this template (upserts, so safe)
        await generate_articles(
            article["template_slug"], date.today(), articles_per_day=500,
        )
        return

    # Manual article: re-render from markdown file
    md_path = Path("data/content/articles") / f"{article['slug']}.md"
    if not md_path.exists():
        return

    # Subscript access works for both dict and row objects; a NULL language
    # falls back to English.
    lang = article["language"] or "en"
    body_html = mistune.html(md_path.read_text())
    body_html = await bake_scenario_cards(body_html, lang=lang)

    # Write next to where create/edit put the file: BUILD_DIR/<lang>/<slug>.html.
    # Writing to the flat BUILD_DIR left the per-language file stale.
    build_dir = BUILD_DIR / lang
    build_dir.mkdir(parents=True, exist_ok=True)
    (build_dir / f"{article['slug']}.html").write_text(body_html)
# =============================================================================
# SEO Hub
# =============================================================================
@bp.route("/seo")
@role_required("admin")
async def seo():
    """SEO metrics hub — overview + tabs for search, funnel, scorecard.

    Query args:
        days: look-back window, clamped to 1..730; defaults to 28 when
            missing, empty or not an integer.
    """
    from ..seo import get_search_performance, get_sync_status

    # ``type=int`` returns the default instead of raising ValueError on
    # input such as ``?days=abc``.
    date_range_days = request.args.get("days", 28, type=int)
    date_range_days = max(1, min(date_range_days, 730))

    overview = await get_search_performance(date_range_days=date_range_days)
    sync_status = await get_sync_status()

    return await render_template(
        "admin/seo.html",
        overview=overview,
        sync_status=sync_status,
        date_range_days=date_range_days,
    )
@bp.route("/seo/search")
@role_required("admin")
async def seo_search():
    """HTMX partial: search performance tab.

    Query args:
        days: look-back window, clamped to 1..730; defaults to 28 when
            missing, empty or not an integer.
        source: optional source filter applied to the query and page lists;
            empty means all sources.
    """
    from ..seo import (
        get_country_breakdown,
        get_device_breakdown,
        get_top_pages,
        get_top_queries,
    )

    # Fall back to the default instead of raising ValueError on bad input.
    days = request.args.get("days", 28, type=int)
    days = max(1, min(days, 730))
    source = request.args.get("source", "") or None

    queries = await get_top_queries(date_range_days=days, source=source)
    pages = await get_top_pages(date_range_days=days, source=source)
    countries = await get_country_breakdown(date_range_days=days)
    devices = await get_device_breakdown(date_range_days=days)

    return await render_template(
        "admin/partials/seo_search.html",
        queries=queries,
        pages=pages,
        countries=countries,
        devices=devices,
        date_range_days=days,
        current_source=source,
    )
@bp.route("/seo/funnel")
@role_required("admin")
async def seo_funnel():
    """HTMX partial: full funnel view.

    Query args:
        days: look-back window, clamped to 1..730; defaults to 28 when
            missing, empty or not an integer.
    """
    from ..seo import get_funnel_metrics

    # Fall back to the default instead of raising ValueError on bad input.
    days = request.args.get("days", 28, type=int)
    days = max(1, min(days, 730))

    funnel = await get_funnel_metrics(date_range_days=days)
    return await render_template(
        "admin/partials/seo_funnel.html",
        funnel=funnel,
        date_range_days=days,
    )
@bp.route("/seo/scorecard")
@role_required("admin")
async def seo_scorecard():
    """HTMX partial: article scorecard.

    Query args:
        days: look-back window, clamped to 1..730; defaults to 28 when
            missing, empty or not an integer.
        template_slug, country, language: optional filters (empty = none).
        sort, dir: sort column and direction, forwarded unchanged to
            ``get_article_scorecard``.
    """
    from ..seo import get_article_scorecard

    # Fall back to the default instead of raising ValueError on bad input.
    days = request.args.get("days", 28, type=int)
    days = max(1, min(days, 730))
    template_slug = request.args.get("template_slug", "") or None
    country_filter = request.args.get("country", "") or None
    language = request.args.get("language", "") or None
    # NOTE(review): sort/dir are passed through as-is; presumably validated
    # inside get_article_scorecard — confirm there.
    sort_by = request.args.get("sort", "impressions")
    sort_dir = request.args.get("dir", "desc")

    scorecard = await get_article_scorecard(
        date_range_days=days,
        template_slug=template_slug,
        country=country_filter,
        language=language,
        sort_by=sort_by,
        sort_dir=sort_dir,
    )

    return await render_template(
        "admin/partials/seo_scorecard.html",
        scorecard=scorecard,
        date_range_days=days,
        current_template=template_slug,
        current_country=country_filter,
        current_language=language,
        current_sort=sort_by,
        current_dir=sort_dir,
    )
@bp.route("/seo/sync", methods=["POST"])
@role_required("admin")
@csrf_protect
async def seo_sync_now():
    """Queue SEO sync tasks on demand.

    The ``source`` form field selects ``gsc``, ``bing``, ``umami`` or
    ``all`` (the default). Unknown values only produce an error flash.
    """
    from ..worker import enqueue

    known_sources = ("gsc", "bing", "umami")
    form = await request.form
    source = form.get("source", "all")

    if source == "all":
        for name in known_sources:
            await enqueue(f"sync_{name}")
        await flash("All SEO syncs queued.", "success")
    elif source in known_sources:
        await enqueue(f"sync_{source}")
        await flash(f"{source.upper()} sync queued.", "success")
    else:
        await flash("Unknown source.", "error")

    return redirect(url_for("admin.seo"))
# =============================================================================
# Outreach Pipeline
# =============================================================================
# Allowed values of suppliers.outreach_status. The status endpoint validates
# submitted values against this list, and it is passed to the outreach
# template as `statuses`.
OUTREACH_STATUSES = [
    "prospect", "contacted", "replied", "signed_up", "declined", "not_interested",
]

# Advancing from prospect to contacted happens automatically on first email send.
# Other status changes are manual via the status dropdown HTMX endpoint.
_PROSPECT_ADVANCE_TARGET = "contacted"

# CSV import: columns accepted (minimum: name + contact_email).
# The import rejects files whose header lacks any required column.
_CSV_REQUIRED = {"name", "contact_email"}
# Optional columns; the import reads them per row when present.
_CSV_OPTIONAL = {"country_code", "category", "website"}
_CSV_IMPORT_LIMIT = 500  # guard against huge uploads
async def get_follow_up_due_count() -> int:
    """Return how many pipeline suppliers have a follow-up date of today or earlier.

    Only suppliers with a non-NULL ``outreach_status`` are counted.
    """
    due_sql = """SELECT COUNT(*) as cnt FROM suppliers
           WHERE outreach_status IS NOT NULL AND follow_up_at <= date('now')"""
    result = await fetch_one(due_sql)
    if not result:
        return 0
    return result["cnt"]
async def get_outreach_pipeline() -> dict:
    """Summarise the outreach pipeline for the summary cards.

    Returns a dict with ``counts`` (suppliers per outreach status) and
    ``total`` (the sum of those counts).
    """
    grouped = await fetch_all(
        """SELECT outreach_status, COUNT(*) as cnt
           FROM suppliers
           WHERE outreach_status IS NOT NULL
           GROUP BY outreach_status"""
    )
    per_status = {}
    for entry in grouped:
        per_status[entry["outreach_status"]] = entry["cnt"]

    summary = {"total": sum(per_status.values()), "counts": per_status}
    return summary
async def get_outreach_suppliers(
    status: str = None,
    country: str = None,
    search: str = None,
    follow_up: str = None,
    page: int = 1,
    per_page: int = 50,
) -> list[dict]:
    """Return one page of suppliers that are in the outreach pipeline.

    ``status`` and ``country`` are exact matches, ``search`` is a substring
    match on name or contact email, and ``follow_up`` accepts ``"due"``
    (date is today or earlier) or ``"set"`` (any date present). Rows are
    ordered replied → contacted → prospect → everything else, then by most
    recent contact, then by name.
    """
    follow_up_clauses = {
        "due": "follow_up_at <= date('now')",
        "set": "follow_up_at IS NOT NULL",
    }

    conditions = ["outreach_status IS NOT NULL"]
    bind_values: list = []

    # Exact-match filters: one clause and one bound value each.
    for clause, value in (
        ("outreach_status = ?", status),
        ("country_code = ?", country),
    ):
        if value:
            conditions.append(clause)
            bind_values.append(value)

    if search:
        pattern = f"%{search}%"
        conditions.append("(name LIKE ? OR contact_email LIKE ?)")
        bind_values += [pattern, pattern]

    if follow_up in follow_up_clauses:
        conditions.append(follow_up_clauses[follow_up])

    # LIMIT / OFFSET placeholders come last in the statement.
    bind_values += [per_page, (page - 1) * per_page]

    where = " AND ".join(conditions)
    query = f"""SELECT id, name, country_code, category, contact_email,
                  outreach_status, outreach_notes, last_contacted_at,
                  outreach_sequence_step, follow_up_at
           FROM suppliers
           WHERE {where}
           ORDER BY
               CASE outreach_status
                   WHEN 'replied' THEN 1
                   WHEN 'contacted' THEN 2
                   WHEN 'prospect' THEN 3
                   ELSE 4
               END,
               last_contacted_at DESC NULLS LAST,
               name ASC
           LIMIT ? OFFSET ?"""
    return await fetch_all(query, tuple(bind_values))
@bp.route("/outreach")
@role_required("admin")
async def outreach():
    """Outreach pipeline dashboard.

    Query args: ``status``, ``country``, ``search``, ``follow_up`` and
    ``page`` (1-based). Empty filter values mean "no filter".
    """
    status = request.args.get("status", "")
    country = request.args.get("country", "")
    search = request.args.get("search", "").strip()
    follow_up = request.args.get("follow_up", "")
    # ``type=int`` returns the default for missing, empty or non-numeric
    # values instead of raising ValueError (a 500 for ``?page=abc``).
    page = max(1, request.args.get("page", 1, type=int))

    pipeline = await get_outreach_pipeline()
    follow_up_due = await get_follow_up_due_count()
    supplier_list = await get_outreach_suppliers(
        status=status or None,
        country=country or None,
        search=search or None,
        follow_up=follow_up or None,
        page=page,
    )
    countries = await fetch_all(
        """SELECT DISTINCT country_code FROM suppliers
           WHERE outreach_status IS NOT NULL AND country_code IS NOT NULL
           ORDER BY country_code"""
    )

    return await render_template(
        "admin/outreach.html",
        pipeline=pipeline,
        follow_up_due=follow_up_due,
        suppliers=supplier_list,
        statuses=OUTREACH_STATUSES,
        countries=[c["country_code"] for c in countries],
        current_status=status,
        current_country=country,
        current_search=search,
        current_follow_up=follow_up,
        page=page,
    )
@bp.route("/outreach/results")
@role_required("admin")
async def outreach_results():
    """HTMX partial: filtered outreach supplier rows.

    Accepts the same ``status`` / ``country`` / ``search`` / ``follow_up`` /
    ``page`` query args as the outreach dashboard.
    """
    status = request.args.get("status", "")
    country = request.args.get("country", "")
    search = request.args.get("search", "").strip()
    follow_up = request.args.get("follow_up", "")
    # Fall back to page 1 for missing, empty or non-numeric values rather
    # than letting int() raise ValueError.
    page = max(1, request.args.get("page", 1, type=int))

    supplier_list = await get_outreach_suppliers(
        status=status or None,
        country=country or None,
        search=search or None,
        follow_up=follow_up or None,
        page=page,
    )
    return await render_template(
        "admin/partials/outreach_results.html", suppliers=supplier_list,
    )
@bp.route("/outreach/<int:supplier_id>/status", methods=["POST"])
@role_required("admin")
@csrf_protect
async def outreach_status(supplier_id: int):
    """HTMX: update outreach_status for a supplier, return the updated row.

    Responds with 404 when the supplier does not exist or is not in the
    outreach pipeline, and with 400 when the submitted status is not one of
    ``OUTREACH_STATUSES``.
    """
    supplier = await fetch_one(
        "SELECT id FROM suppliers WHERE id = ? AND outreach_status IS NOT NULL",
        (supplier_id,),
    )
    if not supplier:
        return Response("Not found", status=404)

    form = await request.form
    new_status = form.get("outreach_status", "").strip()
    # Explicit check instead of ``assert``: asserts are stripped under
    # ``python -O`` (which would let arbitrary values into the column) and
    # otherwise turn bad client input into a 500.
    if new_status not in OUTREACH_STATUSES:
        return Response("Invalid status", status=400)

    await execute(
        "UPDATE suppliers SET outreach_status = ? WHERE id = ?",
        (new_status, supplier_id),
    )

    updated = await fetch_one("SELECT * FROM suppliers WHERE id = ?", (supplier_id,))
    # Template uses `s` to match the loop variable in outreach_results.html
    return await render_template(
        "admin/partials/outreach_row.html", s=updated,
    )
@bp.route("/outreach/<int:supplier_id>/note", methods=["POST"])
@role_required("admin")
@csrf_protect
async def outreach_note(supplier_id: int):
    """HTMX: update outreach_notes for a supplier, return the note cell.

    An empty note clears the column (stored as NULL). The response body is
    the first 80 characters of the note, followed by an ellipsis when the
    note was truncated, HTML-escaped. Responds with 404 when the supplier is
    not in the outreach pipeline.
    """
    from html import escape

    supplier = await fetch_one(
        "SELECT id FROM suppliers WHERE id = ? AND outreach_status IS NOT NULL",
        (supplier_id,),
    )
    if not supplier:
        return Response("Not found", status=404)

    form = await request.form
    note = form.get("note", "").strip()

    await execute(
        "UPDATE suppliers SET outreach_notes = ? WHERE id = ?",
        (note or None, supplier_id),
    )

    if not note:
        return ""

    # The previous conditional appended "" in both branches, so truncated
    # notes were indistinguishable from short ones.
    preview = note[:80] + ("…" if len(note) > 80 else "")
    # The returned string is sent as the response body and swapped into the
    # page, so user-entered text must be escaped.
    return escape(preview)
@bp.route("/outreach/<int:supplier_id>/follow-up", methods=["POST"])
@role_required("admin")
@csrf_protect
async def outreach_follow_up(supplier_id: int):
    """HTMX: set or clear the follow_up_at date for a supplier, return the updated row.

    The ``follow_up_at`` form field must be an ISO date (YYYY-MM-DD) or
    empty; empty clears the date. Responds with 404 when the supplier is not
    in the outreach pipeline and with 400 when the date cannot be parsed.
    """
    supplier = await fetch_one(
        "SELECT id FROM suppliers WHERE id = ? AND outreach_status IS NOT NULL",
        (supplier_id,),
    )
    if not supplier:
        return Response("Not found", status=404)

    form = await request.form
    follow_up_at_raw = form.get("follow_up_at", "").strip()

    # Accept YYYY-MM-DD or empty (to clear). The value is validated and
    # re-serialised so only canonical ISO dates reach the column; the
    # "due" filters compare it against date('now').
    follow_up_at = None
    if follow_up_at_raw:
        try:
            follow_up_at = date.fromisoformat(follow_up_at_raw).isoformat()
        except ValueError:
            return Response("Invalid date", status=400)

    await execute(
        "UPDATE suppliers SET follow_up_at = ? WHERE id = ?",
        (follow_up_at, supplier_id),
    )

    updated = await fetch_one("SELECT * FROM suppliers WHERE id = ?", (supplier_id,))
    return await render_template("admin/partials/outreach_row.html", s=updated)
@bp.route("/outreach/add-prospects", methods=["POST"])
@role_required("admin")
@csrf_protect
async def outreach_add_prospects():
    """Bulk-set existing suppliers to 'prospect' status.

    Accepts comma-separated supplier_ids from the suppliers list page.
    Only updates suppliers where outreach_status IS NULL (not already in pipeline).
    Requests with no valid ids, or with more than 500 ids, are rejected with
    an error flash and a redirect back to the suppliers list.
    """
    max_ids = 500

    form = await request.form
    ids_raw = form.get("supplier_ids", "").strip()

    if not ids_raw:
        await flash("No suppliers selected.", "error")
        return redirect(url_for("admin.suppliers"))

    # Parse and validate — ignore non-integer tokens. ``isdecimal`` (rather
    # than ``isdigit``) only accepts characters that ``int()`` can convert,
    # so tokens such as "²" cannot raise ValueError here.
    supplier_ids = [
        int(token) for token in ids_raw.split(",") if token.strip().isdecimal()
    ]

    if not supplier_ids:
        await flash("No valid supplier IDs.", "error")
        return redirect(url_for("admin.suppliers"))

    # Explicit check instead of ``assert``: asserts vanish under
    # ``python -O`` and otherwise surface as a 500.
    if len(supplier_ids) > max_ids:
        await flash(f"Too many suppliers selected (max {max_ids}).", "error")
        return redirect(url_for("admin.suppliers"))

    # Build parameterized query — no string formatting of IDs
    placeholders = ",".join("?" for _ in supplier_ids)
    await execute(
        f"""UPDATE suppliers
            SET outreach_status = 'prospect'
            WHERE id IN ({placeholders}) AND outreach_status IS NULL""",
        tuple(supplier_ids),
    )

    await flash(f"Added up to {len(supplier_ids)} suppliers to the outreach pipeline.", "success")
    return redirect(url_for("admin.outreach"))
@bp.route("/outreach/import", methods=["GET", "POST"])
@role_required("admin")
@csrf_protect
async def outreach_import():
    """CSV import: create supplier rows as prospects.

    CSV columns: name, contact_email (required), country_code, category, website (optional).
    Deduplicates by contact_email — skips rows where email already exists
    (in the database or earlier in the same file). Rows missing a name or
    email are skipped as well. At most ``_CSV_IMPORT_LIMIT`` data rows are
    processed.
    """
    if request.method == "GET":
        return await render_template("admin/outreach_import.html")

    files = await request.files
    csv_file = files.get("csv_file")
    if not csv_file or not csv_file.filename:
        await flash("No file uploaded.", "error")
        return await render_template("admin/outreach_import.html")

    # "utf-8-sig" strips a leading byte-order mark when present. Without
    # this the first header is read as "\ufeffname" and the file is rejected
    # as missing the "name" column.
    raw = csv_file.read().decode("utf-8-sig", errors="replace")
    reader = csv.DictReader(io.StringIO(raw))

    # Validate headers
    fieldnames = set(reader.fieldnames or [])
    missing = _CSV_REQUIRED - fieldnames
    if missing:
        await flash(f"CSV missing required columns: {', '.join(sorted(missing))}", "error")
        return await render_template("admin/outreach_import.html")

    # Collect existing emails to dedup in one query
    existing_rows = await fetch_all(
        "SELECT contact_email FROM suppliers WHERE contact_email IS NOT NULL"
    )
    existing_emails = {r["contact_email"].lower() for r in existing_rows}

    imported = 0
    skipped = 0
    now = utcnow_iso()

    # The header is line 1, so data rows are numbered from 2.
    for row_num, row in enumerate(reader, start=2):
        if row_num > _CSV_IMPORT_LIMIT + 1:
            await flash(f"Import capped at {_CSV_IMPORT_LIMIT} rows.", "warning")
            break

        name = (row.get("name") or "").strip()
        contact_email = (row.get("contact_email") or "").strip().lower()

        if not name or not contact_email:
            skipped += 1
            continue

        if contact_email in existing_emails:
            skipped += 1
            continue

        country_code = (row.get("country_code") or "").strip().upper() or None
        category = (row.get("category") or "").strip() or None
        website = (row.get("website") or "").strip() or None

        slug_base = slugify(name)
        # Ensure unique slug by appending a counter if needed
        slug = slug_base
        counter = 1
        while await fetch_one("SELECT id FROM suppliers WHERE slug = ?", (slug,)):
            slug = f"{slug_base}-{counter}"
            counter += 1
            # Guard against a runaway collision loop.
            assert counter <= 100, f"slug collision loop for {name!r}"

        # contact_email must be stored: it is the required column and the
        # dedup key, and it was previously dropped from the INSERT.
        await execute(
            """INSERT INTO suppliers
                  (name, slug, country_code, region, category, website,
                   contact_email, tier, outreach_status, created_at)
               VALUES (?, ?, ?, 'Europe', ?, ?, ?, 'free', 'prospect', ?)""",
            (name, slug, country_code, category, website, contact_email, now),
        )
        # Remember the email so later rows in the same file are deduplicated.
        existing_emails.add(contact_email)
        imported += 1

    await flash(f"Imported {imported} suppliers. Skipped {skipped} (duplicates or missing data).", "success")
    return redirect(url_for("admin.outreach"))