Files
padelnomics/web/src/padelnomics/webhooks.py
Deeman 5644a1ebf8 fix: replace datetime.utcnow() with utcnow()/utcnow_iso() across all source files
Migrates 15 source files from the deprecated datetime.utcnow() API.
Uses utcnow() for in-memory math and utcnow_iso() (strftime format)
for SQLite TEXT column writes to preserve lexicographic sort order.
Also fixes datetime.utcfromtimestamp() in seo/_bing.py.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 10:22:42 +01:00

106 lines
3.3 KiB
Python

"""
Resend webhook handler — receives delivery events and inbound emails.
NOT behind @role_required: Resend posts here unauthenticated.
Verification uses RESEND_WEBHOOK_SECRET via the Resend SDK.
"""
import resend
from quart import Blueprint, jsonify, request
from .core import config, execute, utcnow_iso
bp = Blueprint("webhooks", __name__, url_prefix="/webhooks")
# Maps Resend event types to (column_to_update, timestamp_column) pairs.
_EVENT_UPDATES: dict[str, tuple[str, str | None]] = {
"email.delivered": ("delivered", "delivered_at"),
"email.bounced": ("bounced", "bounced_at"),
"email.opened": ("opened", "opened_at"),
"email.clicked": ("clicked", "clicked_at"),
"email.complained": ("complained", None),
}
@bp.route("/resend", methods=["POST"])
async def resend_webhook():
"""Handle Resend webhook events (delivery tracking + inbound email)."""
body = await request.get_data()
# Verify signature when secret is configured
if config.RESEND_WEBHOOK_SECRET:
svix_id = request.headers.get("svix-id", "")
svix_timestamp = request.headers.get("svix-timestamp", "")
svix_signature = request.headers.get("svix-signature", "")
try:
wh = resend.Webhooks(config.RESEND_WEBHOOK_SECRET)
wh.verify(body, {
"svix-id": svix_id,
"svix-timestamp": svix_timestamp,
"svix-signature": svix_signature,
})
except Exception:
return jsonify({"error": "invalid signature"}), 401
payload = await request.get_json()
if not payload:
return jsonify({"error": "empty payload"}), 400
event_type = payload.get("type", "")
data = payload.get("data", {})
if event_type in _EVENT_UPDATES:
_handle_delivery_event(event_type, data)
elif event_type == "email.received":
await _handle_inbound(data)
return jsonify({"ok": True})
async def _handle_delivery_event(event_type: str, data: dict) -> None:
"""Update email_log with delivery event (idempotent)."""
email_id = data.get("email_id", "")
if not email_id:
return
last_event, ts_col = _EVENT_UPDATES[event_type]
now = utcnow_iso()
if ts_col:
await execute(
f"UPDATE email_log SET last_event = ?, {ts_col} = ? WHERE resend_id = ?",
(last_event, now, email_id),
)
else:
await execute(
"UPDATE email_log SET last_event = ? WHERE resend_id = ?",
(last_event, email_id),
)
async def _handle_inbound(data: dict) -> None:
"""Store an inbound email (INSERT OR IGNORE on resend_id)."""
resend_id = data.get("email_id", "")
if not resend_id:
return
now = utcnow_iso()
await execute(
"""INSERT OR IGNORE INTO inbound_emails
(resend_id, message_id, in_reply_to, from_addr, to_addr,
subject, text_body, html_body, received_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
resend_id,
data.get("message_id", ""),
data.get("in_reply_to", ""),
data.get("from", ""),
data.get("to", [""])[0] if isinstance(data.get("to"), list) else data.get("to", ""),
data.get("subject", ""),
data.get("text", ""),
data.get("html", ""),
now,
),
)