merge: fix datetime.utcnow() deprecation warnings across all files

Replaces 94 occurrences of deprecated datetime.utcnow() and
datetime.utcfromtimestamp() across 22 files with utcnow()/utcnow_iso()
helpers. Zero DeprecationWarnings remain. All 1201 tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-24 10:36:26 +01:00
27 changed files with 199 additions and 166 deletions

View File

@@ -2,7 +2,7 @@
Admin domain: role-based admin panel for managing users, tasks, etc.
"""
import json
from datetime import date, datetime, timedelta
from datetime import date, timedelta
from pathlib import Path
import mistune
@@ -29,6 +29,8 @@ from ..core import (
fetch_one,
send_email,
slugify,
utcnow,
utcnow_iso,
)
# Blueprint with its own template folder
@@ -64,9 +66,9 @@ def _admin_context():
async def get_dashboard_stats() -> dict:
"""Get admin dashboard statistics."""
now = datetime.utcnow()
now = utcnow()
today = now.date().isoformat()
week_ago = (now - timedelta(days=7)).isoformat()
week_ago = (now - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
users_total = await fetch_one("SELECT COUNT(*) as count FROM users WHERE deleted_at IS NULL")
users_today = await fetch_one(
"SELECT COUNT(*) as count FROM users WHERE created_at >= ? AND deleted_at IS NULL",
@@ -211,7 +213,7 @@ async def retry_task(task_id: int) -> bool:
SET status = 'pending', run_at = ?, error = NULL
WHERE id = ? AND status = 'failed'
""",
(datetime.utcnow().isoformat(), task_id)
(utcnow_iso(), task_id)
)
return result > 0
@@ -522,7 +524,7 @@ async def lead_new():
from ..credits import HEAT_CREDIT_COSTS
credit_cost = HEAT_CREDIT_COSTS.get(heat_score, 8)
now = datetime.utcnow().isoformat()
now = utcnow_iso()
verified_at = now if status != "pending_verification" else None
lead_id = await execute(
@@ -567,7 +569,7 @@ async def lead_forward(lead_id: int):
await flash("Already forwarded to this supplier.", "warning")
return redirect(url_for("admin.lead_detail", lead_id=lead_id))
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await execute(
"""INSERT INTO lead_forwards (lead_id, supplier_id, credit_cost, status, created_at)
VALUES (?, ?, 0, 'sent', ?)""",
@@ -771,7 +773,7 @@ async def supplier_new():
instagram_url = form.get("instagram_url", "").strip()
youtube_url = form.get("youtube_url", "").strip()
now = datetime.utcnow().isoformat()
now = utcnow_iso()
supplier_id = await execute(
"""INSERT INTO suppliers
(name, slug, country_code, city, region, website, description, category,
@@ -865,7 +867,7 @@ async def flag_toggle():
return redirect(url_for("admin.flags"))
new_enabled = 0 if row["enabled"] else 1
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await execute(
"UPDATE feature_flags SET enabled = ?, updated_at = ? WHERE name = ?",
(new_enabled, now, flag_name),
@@ -940,7 +942,7 @@ async def get_email_stats() -> dict:
total = await fetch_one("SELECT COUNT(*) as cnt FROM email_log")
delivered = await fetch_one("SELECT COUNT(*) as cnt FROM email_log WHERE last_event = 'delivered'")
bounced = await fetch_one("SELECT COUNT(*) as cnt FROM email_log WHERE last_event = 'bounced'")
today = datetime.utcnow().date().isoformat()
today = utcnow().date().isoformat()
sent_today = await fetch_one("SELECT COUNT(*) as cnt FROM email_log WHERE created_at >= ?", (today,))
return {
"total": total["cnt"] if total else 0,
@@ -1558,7 +1560,7 @@ async def scenario_edit(scenario_id: int):
dbl = state.get("dblCourts", 0)
sgl = state.get("sglCourts", 0)
court_config = f"{dbl} double + {sgl} single"
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await execute(
"""UPDATE published_scenarios
@@ -1823,7 +1825,7 @@ async def article_new():
md_dir.mkdir(parents=True, exist_ok=True)
(md_dir / f"{article_slug}.md").write_text(body)
pub_dt = published_at or datetime.utcnow().isoformat()
pub_dt = published_at or utcnow_iso()
await execute(
"""INSERT INTO articles
@@ -1883,7 +1885,7 @@ async def article_edit(article_id: int):
md_dir.mkdir(parents=True, exist_ok=True)
(md_dir / f"{article['slug']}.md").write_text(body)
now = datetime.utcnow().isoformat()
now = utcnow_iso()
pub_dt = published_at or article["published_at"]
await execute(
@@ -1950,7 +1952,7 @@ async def article_publish(article_id: int):
return redirect(url_for("admin.articles"))
new_status = "published" if article["status"] == "draft" else "draft"
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await execute(
"UPDATE articles SET status = ?, updated_at = ? WHERE id = ?",
(new_status, now, article_id),

View File

@@ -208,7 +208,7 @@ def create_app() -> Quart:
@app.context_processor
def inject_globals():
from datetime import datetime
from .core import utcnow as _utcnow
lang = g.get("lang") or _detect_lang()
g.lang = lang # ensure g.lang is always set (e.g. for dashboard/billing routes)
effective_lang = lang if lang in SUPPORTED_LANGS else "en"
@@ -217,7 +217,7 @@ def create_app() -> Quart:
"user": g.get("user"),
"subscription": g.get("subscription"),
"is_admin": "admin" in (g.get("user") or {}).get("roles", []),
"now": datetime.utcnow(),
"now": _utcnow(),
"csrf_token": get_csrf_token,
"ab_variant": getattr(g, "ab_variant", None),
"ab_tag": getattr(g, "ab_tag", None),

View File

@@ -3,7 +3,7 @@ Auth domain: magic link authentication, user management, decorators.
"""
import secrets
from datetime import datetime, timedelta
from datetime import timedelta
from functools import wraps
from pathlib import Path
@@ -18,6 +18,8 @@ from ..core import (
fetch_one,
is_disposable_email,
is_flag_enabled,
utcnow,
utcnow_iso,
)
from ..i18n import SUPPORTED_LANGS, get_translations
@@ -64,7 +66,7 @@ async def get_user_by_email(email: str) -> dict | None:
async def create_user(email: str) -> int:
"""Create new user, return ID."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
return await execute(
"INSERT INTO users (email, created_at) VALUES (?, ?)", (email.lower(), now)
)
@@ -82,10 +84,10 @@ async def update_user(user_id: int, **fields) -> None:
async def create_auth_token(user_id: int, token: str, minutes: int = None) -> int:
"""Create auth token for user."""
minutes = minutes or config.MAGIC_LINK_EXPIRY_MINUTES
expires = datetime.utcnow() + timedelta(minutes=minutes)
expires = utcnow() + timedelta(minutes=minutes)
return await execute(
"INSERT INTO auth_tokens (user_id, token, expires_at) VALUES (?, ?, ?)",
(user_id, token, expires.isoformat()),
(user_id, token, expires.strftime("%Y-%m-%d %H:%M:%S")),
)
@@ -98,14 +100,14 @@ async def get_valid_token(token: str) -> dict | None:
JOIN users u ON u.id = at.user_id
WHERE at.token = ? AND at.expires_at > ? AND at.used_at IS NULL
""",
(token, datetime.utcnow().isoformat()),
(token, utcnow_iso()),
)
async def mark_token_used(token_id: int) -> None:
"""Mark token as used."""
await execute(
"UPDATE auth_tokens SET used_at = ? WHERE id = ?", (datetime.utcnow().isoformat(), token_id)
"UPDATE auth_tokens SET used_at = ? WHERE id = ?", (utcnow_iso(), token_id)
)
@@ -331,7 +333,7 @@ async def verify():
await mark_token_used(token_data["id"])
# Update last login
await update_user(token_data["user_id"], last_login_at=datetime.utcnow().isoformat())
await update_user(token_data["user_id"], last_login_at=utcnow_iso())
# Set session
session.permanent = True

View File

@@ -5,7 +5,7 @@ Payment provider: paddle
import json
import secrets
from datetime import datetime
from datetime import timedelta
from pathlib import Path
from paddle_billing import Client as PaddleClient
@@ -14,7 +14,7 @@ from paddle_billing.Notifications import Secret, Verifier
from quart import Blueprint, flash, g, jsonify, redirect, render_template, request, session, url_for
from ..auth.routes import login_required
from ..core import config, execute, fetch_one, get_paddle_price
from ..core import config, execute, fetch_one, get_paddle_price, utcnow, utcnow_iso
from ..i18n import get_translations
@@ -69,7 +69,7 @@ async def upsert_subscription(
current_period_end: str = None,
) -> int:
"""Create or update subscription. Finds existing by provider_subscription_id."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
existing = await fetch_one(
"SELECT id FROM subscriptions WHERE provider_subscription_id = ?",
@@ -104,7 +104,7 @@ async def get_subscription_by_provider_id(subscription_id: str) -> dict | None:
async def update_subscription_status(provider_subscription_id: str, status: str, **extra) -> None:
"""Update subscription status by provider subscription ID."""
extra["updated_at"] = datetime.utcnow().isoformat()
extra["updated_at"] = utcnow_iso()
extra["status"] = status
sets = ", ".join(f"{k} = ?" for k in extra)
values = list(extra.values())
@@ -343,7 +343,7 @@ async def _handle_supplier_subscription_activated(data: dict, custom_data: dict)
base_plan, tier = _derive_tier_from_plan(plan)
monthly_credits = PLAN_MONTHLY_CREDITS.get(base_plan, 0)
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with db_transaction() as db:
# Update supplier record — Basic tier also gets is_verified = 1
@@ -392,7 +392,7 @@ async def _handle_transaction_completed(data: dict, custom_data: dict) -> None:
"""Handle one-time transaction completion (credit packs, sticky boosts, business plan)."""
supplier_id = custom_data.get("supplier_id")
user_id = custom_data.get("user_id")
now = datetime.utcnow().isoformat()
now = utcnow_iso()
items = data.get("items", [])
for item in items:
@@ -412,10 +412,8 @@ async def _handle_transaction_completed(data: dict, custom_data: dict) -> None:
# Sticky boost purchases
elif key == "boost_sticky_week" and supplier_id:
from datetime import timedelta
from ..core import transaction as db_transaction
expires = (datetime.utcnow() + timedelta(weeks=1)).isoformat()
expires = (utcnow() + timedelta(weeks=1)).strftime("%Y-%m-%d %H:%M:%S")
country = custom_data.get("sticky_country", "")
async with db_transaction() as db:
await db.execute(
@@ -430,10 +428,8 @@ async def _handle_transaction_completed(data: dict, custom_data: dict) -> None:
)
elif key == "boost_sticky_month" and supplier_id:
from datetime import timedelta
from ..core import transaction as db_transaction
expires = (datetime.utcnow() + timedelta(days=30)).isoformat()
expires = (utcnow() + timedelta(days=30)).strftime("%Y-%m-%d %H:%M:%S")
country = custom_data.get("sticky_country", "")
async with db_transaction() as db:
await db.execute(

View File

@@ -15,7 +15,7 @@ import yaml
from jinja2 import ChainableUndefined, Environment
from ..analytics import fetch_analytics
from ..core import execute, fetch_one, slugify
from ..core import execute, fetch_one, slugify, utcnow_iso
# ── Constants ────────────────────────────────────────────────────────────────
@@ -135,7 +135,7 @@ def _validate_table_name(data_table: str) -> None:
def _datetimeformat(value: str, fmt: str = "%Y-%m-%d") -> str:
"""Jinja2 filter: format a date string (or 'now') with strftime."""
from datetime import UTC, datetime
from datetime import datetime
if value == "now":
dt = datetime.now(UTC)
@@ -301,7 +301,7 @@ async def generate_articles(
publish_date = start_date
published_today = 0
generated = 0
now_iso = datetime.now(UTC).isoformat()
now_iso = utcnow_iso()
for row in rows:
for lang in config["languages"]:

View File

@@ -10,7 +10,7 @@ import re
import secrets
import unicodedata
from contextvars import ContextVar
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from functools import wraps
from pathlib import Path
@@ -88,6 +88,26 @@ class Config:
config = Config()
# =============================================================================
# Datetime helpers
# =============================================================================
def utcnow() -> datetime:
"""Timezone-aware UTC now (replaces deprecated datetime.utcnow())."""
return datetime.now(UTC)
def utcnow_iso() -> str:
"""UTC now as naive ISO string for SQLite TEXT columns.
Produces YYYY-MM-DD HH:MM:SS (space separator, no +00:00 suffix) to match
SQLite's native datetime('now') format so lexicographic SQL comparisons work.
"""
return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S")
# =============================================================================
# Database
# =============================================================================
@@ -528,17 +548,18 @@ async def check_rate_limit(key: str, limit: int = None, window: int = None) -> t
"""
limit = limit or config.RATE_LIMIT_REQUESTS
window = window or config.RATE_LIMIT_WINDOW
now = datetime.utcnow()
now = utcnow()
window_start = now - timedelta(seconds=window)
# Clean old entries and count recent
await execute(
"DELETE FROM rate_limits WHERE key = ? AND timestamp < ?", (key, window_start.isoformat())
"DELETE FROM rate_limits WHERE key = ? AND timestamp < ?",
(key, window_start.strftime("%Y-%m-%d %H:%M:%S")),
)
result = await fetch_one(
"SELECT COUNT(*) as count FROM rate_limits WHERE key = ? AND timestamp > ?",
(key, window_start.isoformat()),
(key, window_start.strftime("%Y-%m-%d %H:%M:%S")),
)
count = result["count"] if result else 0
@@ -552,7 +573,10 @@ async def check_rate_limit(key: str, limit: int = None, window: int = None) -> t
return False, info
# Record this request
await execute("INSERT INTO rate_limits (key, timestamp) VALUES (?, ?)", (key, now.isoformat()))
await execute(
"INSERT INTO rate_limits (key, timestamp) VALUES (?, ?)",
(key, now.strftime("%Y-%m-%d %H:%M:%S")),
)
return True, info
@@ -628,7 +652,7 @@ async def soft_delete(table: str, id: int) -> bool:
"""Mark record as deleted."""
result = await execute(
f"UPDATE {table} SET deleted_at = ? WHERE id = ? AND deleted_at IS NULL",
(datetime.utcnow().isoformat(), id),
(utcnow_iso(), id),
)
return result > 0
@@ -647,7 +671,7 @@ async def hard_delete(table: str, id: int) -> bool:
async def purge_deleted(table: str, days: int = 30) -> int:
"""Purge records deleted more than X days ago."""
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
cutoff = (utcnow() - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")
return await execute(
f"DELETE FROM {table} WHERE deleted_at IS NOT NULL AND deleted_at < ?", (cutoff,)
)

View File

@@ -5,9 +5,7 @@ All balance mutations go through this module to keep credit_ledger (source of tr
and suppliers.credit_balance (denormalized cache) in sync within a single transaction.
"""
from datetime import datetime
from .core import execute, fetch_all, fetch_one, transaction
from .core import execute, fetch_all, fetch_one, transaction, utcnow_iso
# Credit cost per heat tier
HEAT_CREDIT_COSTS = {"hot": 35, "warm": 20, "cool": 8}
@@ -44,7 +42,7 @@ async def add_credits(
note: str = None,
) -> int:
"""Add credits to a supplier. Returns new balance."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with transaction() as db:
row = await db.execute_fetchall(
"SELECT credit_balance FROM suppliers WHERE id = ?", (supplier_id,)
@@ -73,7 +71,7 @@ async def spend_credits(
note: str = None,
) -> int:
"""Spend credits from a supplier. Returns new balance. Raises InsufficientCredits."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with transaction() as db:
row = await db.execute_fetchall(
"SELECT credit_balance FROM suppliers WHERE id = ?", (supplier_id,)
@@ -116,7 +114,7 @@ async def unlock_lead(supplier_id: int, lead_id: int) -> dict:
raise ValueError("Lead not found")
cost = lead["credit_cost"] or compute_credit_cost(lead)
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with transaction() as db:
# Check balance
@@ -180,7 +178,7 @@ async def monthly_credit_refill(supplier_id: int) -> int:
if not row or not row["monthly_credits"]:
return 0
now = datetime.utcnow().isoformat()
now = utcnow_iso()
new_balance = await add_credits(
supplier_id,
row["monthly_credits"],
@@ -201,6 +199,6 @@ async def get_ledger(supplier_id: int, limit: int = 50) -> list[dict]:
FROM credit_ledger cl
LEFT JOIN lead_forwards lf ON cl.reference_id = lf.id AND cl.event_type = 'lead_unlock'
WHERE cl.supplier_id = ?
ORDER BY cl.created_at DESC LIMIT ?""",
ORDER BY cl.created_at DESC, cl.id DESC LIMIT ?""",
(supplier_id, limit),
)

View File

@@ -1,13 +1,12 @@
"""
Dashboard domain: user dashboard and settings.
"""
from datetime import datetime
from pathlib import Path
from quart import Blueprint, flash, g, redirect, render_template, request, url_for
from ..auth.routes import login_required, update_user
from ..core import csrf_protect, fetch_one, soft_delete
from ..core import csrf_protect, fetch_one, soft_delete, utcnow_iso
from ..i18n import get_translations
bp = Blueprint(
@@ -57,7 +56,7 @@ async def settings():
await update_user(
g.user["id"],
name=form.get("name", "").strip() or None,
updated_at=datetime.utcnow().isoformat(),
updated_at=utcnow_iso(),
)
t = get_translations(g.get("lang") or "en")
await flash(t["dash_settings_saved"], "success")

View File

@@ -2,12 +2,11 @@
Supplier directory: public, searchable listing of padel court suppliers.
"""
from datetime import UTC, datetime
from pathlib import Path
from quart import Blueprint, g, make_response, redirect, render_template, request, url_for
from ..core import csrf_protect, execute, fetch_all, fetch_one
from ..core import csrf_protect, execute, fetch_all, fetch_one, utcnow_iso
from ..i18n import get_translations
bp = Blueprint(
@@ -89,7 +88,7 @@ async def _build_directory_query(q, country, category, region, page, per_page=24
lang = g.get("lang", "en")
cat_labels, country_labels, region_labels = get_directory_labels(lang)
now = datetime.now(UTC).isoformat()
now = utcnow_iso()
params: list = []
wheres: list[str] = []

View File

@@ -4,7 +4,6 @@ Leads domain: capture interest in court suppliers and financing.
import json
import secrets
from datetime import datetime
from pathlib import Path
from quart import Blueprint, flash, g, jsonify, redirect, render_template, request, session, url_for
@@ -27,6 +26,7 @@ from ..core import (
is_disposable_email,
is_plausible_phone,
send_email,
utcnow_iso,
)
from ..i18n import get_translations
@@ -102,7 +102,7 @@ async def suppliers():
form.get("court_count", 0),
form.get("budget", 0),
form.get("message", ""),
datetime.utcnow().isoformat(),
utcnow_iso(),
),
)
# Notify admin
@@ -147,7 +147,7 @@ async def financing():
form.get("court_count", 0),
form.get("budget", 0),
form.get("message", ""),
datetime.utcnow().isoformat(),
utcnow_iso(),
),
)
await send_email(
@@ -375,7 +375,7 @@ async def quote_request():
status,
credit_cost,
secrets.token_urlsafe(16),
datetime.utcnow().isoformat(),
utcnow_iso(),
),
)
@@ -520,7 +520,7 @@ async def verify_quote():
from ..credits import compute_credit_cost
credit_cost = compute_credit_cost(dict(lead))
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await execute(
"UPDATE lead_requests SET status = 'new', verified_at = ?, credit_cost = ? WHERE id = ?",
(now, credit_cost, lead["id"]),

View File

@@ -4,7 +4,6 @@ Planner domain: padel court financial planner + scenario management.
import json
import math
from datetime import datetime
from pathlib import Path
from quart import Blueprint, Response, g, jsonify, render_template, request
@@ -18,6 +17,7 @@ from ..core import (
fetch_all,
fetch_one,
get_paddle_price,
utcnow_iso,
)
from ..i18n import get_translations
from .calculator import COUNTRY_CURRENCY, CURRENCY_DEFAULT, calc, validate_state
@@ -502,7 +502,7 @@ async def save_scenario():
location = form.get("location", "")
scenario_id = form.get("scenario_id")
now = datetime.utcnow().isoformat()
now = utcnow_iso()
is_first_save = not scenario_id and (await count_scenarios(g.user["id"])) == 0
@@ -563,7 +563,7 @@ async def get_scenario(scenario_id: int):
@login_required
@csrf_protect
async def delete_scenario(scenario_id: int):
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await execute(
"UPDATE scenarios SET deleted_at = ? WHERE id = ? AND user_id = ? AND deleted_at IS NULL",
(now, scenario_id, g.user["id"]),

View File

@@ -18,7 +18,7 @@ import json
import os
import sqlite3
import sys
from datetime import date, timedelta
from datetime import UTC, date, datetime, timedelta
from pathlib import Path
from dotenv import load_dotenv
@@ -1390,7 +1390,7 @@ def seed_templates(conn: sqlite3.Connection) -> dict[str, int]:
def seed_data_rows(conn: sqlite3.Connection, template_ids: dict[str, int]) -> int:
"""Insert template_data rows for all cities × languages. Returns count inserted."""
now = __import__("datetime").datetime.utcnow().isoformat()
now = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S")
inserted = 0
en_id = template_ids.get("city-padel-cost-en")

View File

@@ -10,7 +10,7 @@ Usage:
import os
import sqlite3
import sys
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from pathlib import Path
from dotenv import load_dotenv
@@ -292,7 +292,7 @@ def main():
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
now = datetime.utcnow()
now = datetime.now(UTC)
# 1. Create dev user
print("Creating dev user (dev@localhost)...")
@@ -303,7 +303,7 @@ def main():
else:
cursor = conn.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("dev@localhost", "Dev User", now.isoformat()),
("dev@localhost", "Dev User", now.strftime("%Y-%m-%d %H:%M:%S")),
)
dev_user_id = cursor.lastrowid
print(f" Created (id={dev_user_id})")
@@ -336,7 +336,7 @@ def main():
s["website"], s["description"], s["category"], s["tier"],
s["credit_balance"], s["monthly_credits"], s["contact_name"],
s["contact_email"], s["years_in_business"], s["project_count"],
s["service_area"], now.isoformat(),
s["service_area"], now.strftime("%Y-%m-%d %H:%M:%S"),
),
)
supplier_ids[s["slug"]] = cursor.lastrowid
@@ -349,7 +349,7 @@ def main():
("courtbuild-spain", "supplier_growth", "maria@courtbuild.example.com", "Maria Garcia"),
("desert-padel-fze", "supplier_pro", "ahmed@desertpadel.example.com", "Ahmed Al-Rashid"),
]
period_end = (now + timedelta(days=30)).isoformat()
period_end = (now + timedelta(days=30)).strftime("%Y-%m-%d %H:%M:%S")
for slug, plan, email, name in claimed_suppliers:
sid = supplier_ids.get(slug)
if not sid:
@@ -364,14 +364,14 @@ def main():
else:
cursor = conn.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
(email, name, now.isoformat()),
(email, name, now.strftime("%Y-%m-%d %H:%M:%S")),
)
owner_id = cursor.lastrowid
# Claim the supplier
conn.execute(
"UPDATE suppliers SET claimed_by = ?, claimed_at = ? WHERE id = ? AND claimed_by IS NULL",
(owner_id, now.isoformat(), sid),
(owner_id, now.strftime("%Y-%m-%d %H:%M:%S"), sid),
)
# Create billing customer record
@@ -382,7 +382,7 @@ def main():
conn.execute(
"""INSERT INTO billing_customers (user_id, provider_customer_id, created_at)
VALUES (?, ?, ?)""",
(owner_id, f"ctm_dev_{slug}", now.isoformat()),
(owner_id, f"ctm_dev_{slug}", now.strftime("%Y-%m-%d %H:%M:%S")),
)
# Create active subscription
@@ -396,7 +396,7 @@ def main():
current_period_end, created_at)
VALUES (?, ?, 'active', ?, ?, ?)""",
(owner_id, plan, f"sub_dev_{slug}",
period_end, now.isoformat()),
period_end, now.strftime("%Y-%m-%d %H:%M:%S")),
)
print(f" {slug} -> owner {email} ({plan})")

View File

@@ -3,12 +3,12 @@
Uses an API key for auth. Fetches query stats and page stats.
"""
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from urllib.parse import urlparse
import httpx
from ..core import config, execute
from ..core import config, execute, utcnow, utcnow_iso
_TIMEOUT_SECONDS = 30
@@ -27,7 +27,7 @@ async def sync_bing(days_back: int = 3, timeout_seconds: int = _TIMEOUT_SECONDS)
if not config.BING_WEBMASTER_API_KEY or not config.BING_SITE_URL:
return 0 # Bing not configured — skip silently
started_at = datetime.utcnow()
started_at = utcnow()
try:
rows_synced = 0
@@ -48,14 +48,14 @@ async def sync_bing(days_back: int = 3, timeout_seconds: int = _TIMEOUT_SECONDS)
if not isinstance(entries, list):
entries = []
cutoff = datetime.utcnow() - timedelta(days=days_back)
cutoff = utcnow() - timedelta(days=days_back)
for entry in entries:
# Bing date format: "/Date(1708905600000)/" (ms since epoch)
date_str = entry.get("Date", "")
if "/Date(" in date_str:
ms = int(date_str.split("(")[1].split(")")[0])
entry_date = datetime.utcfromtimestamp(ms / 1000)
entry_date = datetime.fromtimestamp(ms / 1000, tz=UTC)
else:
continue
@@ -99,7 +99,7 @@ async def sync_bing(days_back: int = 3, timeout_seconds: int = _TIMEOUT_SECONDS)
date_str = entry.get("Date", "")
if "/Date(" in date_str:
ms = int(date_str.split("(")[1].split(")")[0])
entry_date = datetime.utcfromtimestamp(ms / 1000)
entry_date = datetime.fromtimestamp(ms / 1000, tz=UTC)
else:
continue
@@ -122,21 +122,21 @@ async def sync_bing(days_back: int = 3, timeout_seconds: int = _TIMEOUT_SECONDS)
)
rows_synced += 1
duration_ms = int((datetime.utcnow() - started_at).total_seconds() * 1000)
duration_ms = int((utcnow() - started_at).total_seconds() * 1000)
await execute(
"""INSERT INTO seo_sync_log
(source, status, rows_synced, started_at, completed_at, duration_ms)
VALUES ('bing', 'success', ?, ?, ?, ?)""",
(rows_synced, started_at.isoformat(), datetime.utcnow().isoformat(), duration_ms),
(rows_synced, started_at.strftime("%Y-%m-%d %H:%M:%S"), utcnow_iso(), duration_ms),
)
return rows_synced
except Exception as exc:
duration_ms = int((datetime.utcnow() - started_at).total_seconds() * 1000)
duration_ms = int((utcnow() - started_at).total_seconds() * 1000)
await execute(
"""INSERT INTO seo_sync_log
(source, status, rows_synced, error, started_at, completed_at, duration_ms)
VALUES ('bing', 'failed', 0, ?, ?, ?, ?)""",
(str(exc), started_at.isoformat(), datetime.utcnow().isoformat(), duration_ms),
(str(exc), started_at.strftime("%Y-%m-%d %H:%M:%S"), utcnow_iso(), duration_ms),
)
raise

View File

@@ -5,11 +5,11 @@ is synchronous, so sync runs in asyncio.to_thread().
"""
import asyncio
from datetime import datetime, timedelta
from datetime import timedelta
from pathlib import Path
from urllib.parse import urlparse
from ..core import config, execute
from ..core import config, execute, utcnow, utcnow_iso
# GSC returns max 25K rows per request
_ROWS_PER_PAGE = 25_000
@@ -95,11 +95,11 @@ async def sync_gsc(days_back: int = 3, max_pages: int = 10) -> int:
if not config.GSC_SERVICE_ACCOUNT_PATH or not config.GSC_SITE_URL:
return 0 # GSC not configured — skip silently
started_at = datetime.utcnow()
started_at = utcnow()
# GSC has ~2 day delay; fetch from days_back ago to 2 days ago
end_date = (datetime.utcnow() - timedelta(days=2)).strftime("%Y-%m-%d")
start_date = (datetime.utcnow() - timedelta(days=days_back + 2)).strftime("%Y-%m-%d")
end_date = (utcnow() - timedelta(days=2)).strftime("%Y-%m-%d")
start_date = (utcnow() - timedelta(days=days_back + 2)).strftime("%Y-%m-%d")
try:
rows = await asyncio.to_thread(
@@ -122,21 +122,21 @@ async def sync_gsc(days_back: int = 3, max_pages: int = 10) -> int:
)
rows_synced += 1
duration_ms = int((datetime.utcnow() - started_at).total_seconds() * 1000)
duration_ms = int((utcnow() - started_at).total_seconds() * 1000)
await execute(
"""INSERT INTO seo_sync_log
(source, status, rows_synced, started_at, completed_at, duration_ms)
VALUES ('gsc', 'success', ?, ?, ?, ?)""",
(rows_synced, started_at.isoformat(), datetime.utcnow().isoformat(), duration_ms),
(rows_synced, started_at.strftime("%Y-%m-%d %H:%M:%S"), utcnow_iso(), duration_ms),
)
return rows_synced
except Exception as exc:
duration_ms = int((datetime.utcnow() - started_at).total_seconds() * 1000)
duration_ms = int((utcnow() - started_at).total_seconds() * 1000)
await execute(
"""INSERT INTO seo_sync_log
(source, status, rows_synced, error, started_at, completed_at, duration_ms)
VALUES ('gsc', 'failed', 0, ?, ?, ?, ?)""",
(str(exc), started_at.isoformat(), datetime.utcnow().isoformat(), duration_ms),
(str(exc), started_at.strftime("%Y-%m-%d %H:%M:%S"), utcnow_iso(), duration_ms),
)
raise

View File

@@ -4,14 +4,14 @@ All heavy lifting happens in SQL. Functions accept filter parameters
and return plain dicts/lists.
"""
from datetime import datetime, timedelta
from datetime import timedelta
from ..core import execute, fetch_all, fetch_one
from ..core import execute, fetch_all, fetch_one, utcnow
def _date_cutoff(date_range_days: int) -> str:
"""Return ISO date string for N days ago."""
return (datetime.utcnow() - timedelta(days=date_range_days)).strftime("%Y-%m-%d")
return (utcnow() - timedelta(days=date_range_days)).strftime("%Y-%m-%d")
async def get_search_performance(

View File

@@ -4,11 +4,11 @@ Uses bearer token auth. Self-hosted instance, no rate limits.
Config already exists: UMAMI_API_URL, UMAMI_API_TOKEN, UMAMI_WEBSITE_ID.
"""
from datetime import datetime, timedelta
from datetime import timedelta
import httpx
from ..core import config, execute
from ..core import config, execute, utcnow, utcnow_iso
_TIMEOUT_SECONDS = 15
@@ -21,7 +21,7 @@ async def sync_umami(days_back: int = 3, timeout_seconds: int = _TIMEOUT_SECONDS
if not config.UMAMI_API_TOKEN or not config.UMAMI_API_URL:
return 0 # Umami not configured — skip silently
started_at = datetime.utcnow()
started_at = utcnow()
try:
rows_synced = 0
@@ -34,7 +34,7 @@ async def sync_umami(days_back: int = 3, timeout_seconds: int = _TIMEOUT_SECONDS
# (Umami's metrics endpoint returns totals for the period,
# so we query one day at a time for daily granularity)
for day_offset in range(days_back):
day = datetime.utcnow() - timedelta(days=day_offset + 1)
day = utcnow() - timedelta(days=day_offset + 1)
metric_date = day.strftime("%Y-%m-%d")
start_ms = int(day.replace(hour=0, minute=0, second=0).timestamp() * 1000)
end_ms = int(day.replace(hour=23, minute=59, second=59).timestamp() * 1000)
@@ -96,21 +96,21 @@ async def sync_umami(days_back: int = 3, timeout_seconds: int = _TIMEOUT_SECONDS
(metric_date, page_count, visitors, br, avg_time),
)
duration_ms = int((datetime.utcnow() - started_at).total_seconds() * 1000)
duration_ms = int((utcnow() - started_at).total_seconds() * 1000)
await execute(
"""INSERT INTO seo_sync_log
(source, status, rows_synced, started_at, completed_at, duration_ms)
VALUES ('umami', 'success', ?, ?, ?, ?)""",
(rows_synced, started_at.isoformat(), datetime.utcnow().isoformat(), duration_ms),
(rows_synced, started_at.strftime("%Y-%m-%d %H:%M:%S"), utcnow_iso(), duration_ms),
)
return rows_synced
except Exception as exc:
duration_ms = int((datetime.utcnow() - started_at).total_seconds() * 1000)
duration_ms = int((utcnow() - started_at).total_seconds() * 1000)
await execute(
"""INSERT INTO seo_sync_log
(source, status, rows_synced, error, started_at, completed_at, duration_ms)
VALUES ('umami', 'failed', 0, ?, ?, ?, ?)""",
(str(exc), started_at.isoformat(), datetime.utcnow().isoformat(), duration_ms),
(str(exc), started_at.strftime("%Y-%m-%d %H:%M:%S"), utcnow_iso(), duration_ms),
)
raise

View File

@@ -13,9 +13,9 @@ from ..core import (
config,
csrf_protect,
execute,
feature_gate,
fetch_all,
fetch_one,
feature_gate,
get_paddle_price,
is_flag_enabled,
)

View File

@@ -5,12 +5,10 @@ NOT behind @role_required: Resend posts here unauthenticated.
Verification uses RESEND_WEBHOOK_SECRET via the Resend SDK.
"""
from datetime import datetime
import resend
from quart import Blueprint, jsonify, request
from .core import config, execute
from .core import config, execute, utcnow_iso
bp = Blueprint("webhooks", __name__, url_prefix="/webhooks")
@@ -67,7 +65,7 @@ async def _handle_delivery_event(event_type: str, data: dict) -> None:
return
last_event, ts_col = _EVENT_UPDATES[event_type]
now = datetime.utcnow().isoformat()
now = utcnow_iso()
if ts_col:
await execute(
@@ -87,7 +85,7 @@ async def _handle_inbound(data: dict) -> None:
if not resend_id:
return
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await execute(
"""INSERT OR IGNORE INTO inbound_emails
(resend_id, message_id, in_reply_to, from_addr, to_addr,

View File

@@ -7,7 +7,17 @@ import json
import traceback
from datetime import datetime, timedelta
from .core import EMAIL_ADDRESSES, config, execute, fetch_all, fetch_one, init_db, send_email
from .core import (
EMAIL_ADDRESSES,
config,
execute,
fetch_all,
fetch_one,
init_db,
send_email,
utcnow,
utcnow_iso,
)
from .i18n import get_translations
# Task handlers registry
@@ -29,7 +39,7 @@ def _email_wrap(body: str, lang: str = "en", preheader: str = "") -> str:
preheader: hidden preview text shown in email client list views.
"""
year = datetime.utcnow().year
year = utcnow().year
tagline = _t("email_footer_tagline", lang)
copyright_text = _t("email_footer_copyright", lang, year=year, app_name=config.APP_NAME)
# Hidden preheader trick: visible text + invisible padding to prevent
@@ -132,15 +142,15 @@ async def enqueue(task_name: str, payload: dict = None, run_at: datetime = None)
(
task_name,
json.dumps(payload or {}),
(run_at or datetime.utcnow()).isoformat(),
datetime.utcnow().isoformat(),
(run_at or utcnow()).strftime("%Y-%m-%d %H:%M:%S"),
utcnow_iso(),
),
)
async def get_pending_tasks(limit: int = 10) -> list[dict]:
"""Get pending tasks ready to run."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
return await fetch_all(
"""
SELECT * FROM tasks
@@ -156,7 +166,7 @@ async def mark_complete(task_id: int) -> None:
"""Mark task as completed."""
await execute(
"UPDATE tasks SET status = 'complete', completed_at = ? WHERE id = ?",
(datetime.utcnow().isoformat(), task_id),
(utcnow_iso(), task_id),
)
@@ -167,7 +177,7 @@ async def mark_failed(task_id: int, error: str, retries: int) -> None:
if retries < max_retries:
# Exponential backoff: 1min, 5min, 25min
delay = timedelta(minutes=5**retries)
run_at = datetime.utcnow() + delay
run_at = utcnow() + delay
await execute(
"""
@@ -385,13 +395,13 @@ async def handle_send_waitlist_confirmation(payload: dict) -> None:
@task("cleanup_expired_tokens")
async def handle_cleanup_tokens(payload: dict) -> None:
"""Clean up expired auth tokens."""
await execute("DELETE FROM auth_tokens WHERE expires_at < ?", (datetime.utcnow().isoformat(),))
await execute("DELETE FROM auth_tokens WHERE expires_at < ?", (utcnow_iso(),))
@task("cleanup_rate_limits")
async def handle_cleanup_rate_limits(payload: dict) -> None:
"""Clean up old rate limit entries."""
cutoff = (datetime.utcnow() - timedelta(hours=1)).isoformat()
cutoff = (utcnow() - timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")
await execute("DELETE FROM rate_limits WHERE timestamp < ?", (cutoff,))
@@ -497,7 +507,7 @@ async def handle_send_lead_forward_email(payload: dict) -> None:
)
# Update email_sent_at on lead_forward
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await execute(
"UPDATE lead_forwards SET email_sent_at = ? WHERE lead_id = ? AND supplier_id = ?",
(now, lead_id, supplier_id),
@@ -621,7 +631,7 @@ async def handle_generate_business_plan(payload: dict) -> None:
file_path.write_bytes(pdf_bytes)
# Update record
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await execute(
"UPDATE business_plan_exports SET status = 'ready', file_path = ?, completed_at = ? WHERE id = ?",
(str(file_path), now, export_id),
@@ -664,7 +674,7 @@ async def handle_generate_business_plan(payload: dict) -> None:
@task("cleanup_old_tasks")
async def handle_cleanup_tasks(payload: dict) -> None:
"""Clean up completed/failed tasks older than 7 days."""
cutoff = (datetime.utcnow() - timedelta(days=7)).isoformat()
cutoff = (utcnow() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
await execute(
"DELETE FROM tasks WHERE status IN ('complete', 'failed') AND created_at < ?", (cutoff,)
)
@@ -791,9 +801,7 @@ async def run_scheduler() -> None:
await enqueue("cleanup_old_tasks")
# Monthly credit refill — run on the 1st of each month
from datetime import datetime
today = datetime.utcnow()
today = utcnow()
this_month = f"{today.year}-{today.month:02d}"
if today.day == 1 and last_credit_refill != this_month:
await enqueue("refill_monthly_credits")

View File

@@ -8,7 +8,7 @@ sitemap integration, admin CRUD routes, and path collision prevention.
import importlib
import json
import sqlite3
from datetime import date, datetime
from datetime import date
from pathlib import Path
import pytest
@@ -19,7 +19,7 @@ from padelnomics.content.routes import (
bake_scenario_cards,
is_reserved_path,
)
from padelnomics.core import execute, fetch_all, fetch_one, slugify
from padelnomics.core import execute, fetch_all, fetch_one, slugify, utcnow_iso
from padelnomics.planner.calculator import calc, validate_state
SCHEMA_PATH = Path(__file__).parent.parent / "src" / "padelnomics" / "migrations" / "schema.sql"
@@ -70,7 +70,7 @@ async def _create_published_scenario(slug="test-scenario", city="TestCity", coun
async def _create_article(slug="test-article", url_path="/test-article",
status="published", published_at=None):
"""Insert an article row, return its id."""
pub = published_at or datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
pub = published_at or utcnow_iso()
return await execute(
"""INSERT INTO articles
(url_path, slug, title, meta_description, country, region,
@@ -936,8 +936,7 @@ class TestRouteRegistration:
@pytest.fixture
async def admin_client(app, db):
"""Test client with admin user (has admin role)."""
from datetime import datetime
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("admin@test.com", "Admin", now),

View File

@@ -3,9 +3,8 @@ Tests for the credit system (credits.py).
Pure SQL operations against real in-memory SQLite — no mocking needed.
"""
from datetime import datetime
import pytest
from padelnomics.core import utcnow_iso
from padelnomics.credits import (
InsufficientCredits,
add_credits,
@@ -24,7 +23,7 @@ from padelnomics.credits import (
@pytest.fixture
async def supplier(db):
"""Supplier with credit_balance=100, monthly_credits=30, tier=growth."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with db.execute(
"""INSERT INTO suppliers
(name, slug, country_code, region, category, tier,
@@ -41,7 +40,7 @@ async def supplier(db):
@pytest.fixture
async def lead(db):
"""Lead request with heat_score=warm, credit_cost=20."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with db.execute(
"""INSERT INTO lead_requests
(lead_type, heat_score, credit_cost, status, created_at)
@@ -154,7 +153,7 @@ class TestAlreadyUnlocked:
assert await already_unlocked(supplier["id"], lead["id"]) is False
async def test_returns_true_after_unlock(self, db, supplier, lead):
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await db.execute(
"""INSERT INTO lead_forwards (lead_id, supplier_id, credit_cost, created_at)
VALUES (?, ?, 20, ?)""",
@@ -210,7 +209,7 @@ class TestUnlockLead:
async def test_raises_insufficient_credits(self, db, lead):
"""Supplier with only 5 credits tries to unlock a 20-credit lead."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with db.execute(
"""INSERT INTO suppliers
(name, slug, country_code, region, category, tier,
@@ -247,7 +246,7 @@ class TestMonthlyRefill:
async def test_noop_when_no_monthly_credits(self, db):
"""Supplier with monthly_credits=0 gets no refill."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with db.execute(
"""INSERT INTO suppliers
(name, slug, country_code, region, category, tier,

View File

@@ -7,15 +7,13 @@ Integration tests exercise full request/response flows via Quart test client.
"""
import sqlite3
from datetime import datetime
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
from padelnomics import core
from padelnomics.core import utcnow_iso
from padelnomics.migrations.migrate import migrate
from padelnomics import core
# ── Fixtures & helpers ────────────────────────────────────────────
@@ -30,7 +28,7 @@ def mock_csrf_validation():
@pytest.fixture
async def admin_client(app, db):
"""Test client with an admin-role user session (module-level, follows test_content.py)."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("flags_admin@test.com", "Flags Admin", now),
@@ -293,8 +291,9 @@ class TestLeadUnlockGate:
@pytest.mark.asyncio
async def test_route_imports_is_flag_enabled(self):
"""suppliers/routes.py imports is_flag_enabled (gate is wired up)."""
from padelnomics.suppliers.routes import unlock_lead
import inspect
from padelnomics.suppliers.routes import unlock_lead
src = inspect.getsource(unlock_lead)
assert "is_flag_enabled" in src
assert "lead_unlock" in src

View File

@@ -1,9 +1,10 @@
"""Tests for the SEO metrics module: queries, sync functions, admin routes."""
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from padelnomics.core import utcnow_iso
from padelnomics.seo._queries import (
cleanup_old_metrics,
get_article_scorecard,
@@ -21,11 +22,11 @@ from padelnomics import core
# ── Fixtures ──────────────────────────────────────────────────
def _today():
return datetime.utcnow().strftime("%Y-%m-%d")
return datetime.now(UTC).strftime("%Y-%m-%d")
def _days_ago(n: int) -> str:
return (datetime.utcnow() - timedelta(days=n)).strftime("%Y-%m-%d")
return (datetime.now(UTC) - timedelta(days=n)).strftime("%Y-%m-%d")
@pytest.fixture
@@ -72,7 +73,7 @@ async def seo_data(db):
@pytest.fixture
async def articles_data(db, seo_data):
"""Create articles that match the SEO data URLs."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
pub = _days_ago(10)
for title, url, tpl, lang in [
@@ -91,7 +92,7 @@ async def articles_data(db, seo_data):
@pytest.fixture
async def admin_client(app, db):
"""Authenticated admin client."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("admin@test.com", "Admin", now),
@@ -258,7 +259,7 @@ class TestSyncStatus:
"""Tests for get_sync_status()."""
async def test_returns_last_sync_per_source(self, db):
now = datetime.utcnow().isoformat()
now = utcnow_iso()
await db.execute(
"""INSERT INTO seo_sync_log (source, status, rows_synced, started_at, completed_at, duration_ms)
VALUES ('gsc', 'success', 100, ?, ?, 500)""",
@@ -286,7 +287,7 @@ class TestCleanupOldMetrics:
"""Tests for cleanup_old_metrics()."""
async def test_deletes_old_data(self, db):
old_date = (datetime.utcnow() - timedelta(days=400)).strftime("%Y-%m-%d")
old_date = (datetime.now(UTC) - timedelta(days=400)).strftime("%Y-%m-%d")
recent_date = _today()
await db.execute(

View File

@@ -8,19 +8,16 @@ supervisor.py lives in src/padelnomics/ (not a uv workspace package), so we
add src/ to sys.path before importing.
"""
import sys
# Load supervisor.py directly by path — avoids clashing with the web app's
# 'padelnomics' namespace (which is the installed web package).
import importlib.util as _ilu
import textwrap
import tomllib
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Load supervisor.py directly by path — avoids clashing with the web app's
# 'padelnomics' namespace (which is the installed web package).
import importlib.util as _ilu
_SUP_PATH = Path(__file__).parent.parent.parent / "src" / "padelnomics" / "supervisor.py"
_spec = _ilu.spec_from_file_location("padelnomics_supervisor", _SUP_PATH)
sup = _ilu.module_from_spec(_spec)
@@ -32,7 +29,6 @@ from padelnomics_extract.proxy import (
make_sticky_selector,
)
# ── load_workflows ────────────────────────────────────────────────

View File

@@ -5,11 +5,12 @@ POST real webhook payloads to /billing/webhook/paddle and verify DB state.
Uses the existing client, db, sign_payload from conftest.
"""
import json
from datetime import datetime
from datetime import UTC, datetime
from unittest.mock import AsyncMock, patch
import pytest
from conftest import sign_payload
from padelnomics.core import utcnow_iso
WEBHOOK_PATH = "/billing/webhook/paddle"
SIG_HEADER = "Paddle-Signature"
@@ -21,7 +22,7 @@ SIG_HEADER = "Paddle-Signature"
@pytest.fixture
async def supplier(db):
"""Supplier with tier=free, credit_balance=0."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with db.execute(
"""INSERT INTO suppliers
(name, slug, country_code, region, category, tier,
@@ -38,7 +39,7 @@ async def supplier(db):
@pytest.fixture
async def paddle_products(db):
"""Insert paddle_products rows for all keys the handlers need."""
now = datetime.utcnow().isoformat()
now = utcnow_iso()
products = [
("credits_25", "pri_credits25", "Credit Pack 25", 999, "one_time"),
("credits_100", "pri_credits100", "Credit Pack 100", 3290, "one_time"),
@@ -175,7 +176,7 @@ class TestStickyBoostPurchase:
assert boosts[0][1] == "active"
# expires_at should be ~7 days from now
expires = datetime.fromisoformat(boosts[0][2])
assert abs((expires - datetime.utcnow()).days - 7) <= 1
assert abs((expires - datetime.now(UTC).replace(tzinfo=None)).days - 7) <= 1
# Verify sticky_until set on supplier
sup = await db.execute_fetchall(
@@ -202,7 +203,7 @@ class TestStickyBoostPurchase:
assert len(boosts) == 1
assert boosts[0][0] == "sticky_month"
expires = datetime.fromisoformat(boosts[0][1])
assert abs((expires - datetime.utcnow()).days - 30) <= 1
assert abs((expires - datetime.now(UTC).replace(tzinfo=None)).days - 30) <= 1
async def test_sticky_boost_sets_country(self, client, db, supplier, paddle_products):
payload = make_transaction_payload(
@@ -387,7 +388,7 @@ class TestBusinessPlanPurchase:
self, client, db, supplier, paddle_products, test_user,
):
# Need a scenario for the export
now = datetime.utcnow().isoformat()
now = utcnow_iso()
async with db.execute(
"""INSERT INTO scenarios (user_id, name, state_json, created_at)
VALUES (?, 'Test Scenario', '{}', ?)""",