Fix admin auth, impersonation session handling, and stale stripe column

- admin_required now accepts users with 'admin' role (via g.user) in
  addition to the password-based is_admin session flag, so both auth
  methods grant access
- impersonate stores the admin's user_id (not True) in admin_impersonating
  so stop-impersonating can restore the correct session
- stop_impersonating restores user_id from admin_impersonating instead of
  just popping it
- remove s.stripe_customer_id from get_user_by_id (Paddle project, no
  stripe_customer_id column in subscriptions)

Fixes 3 test_roles.py failures: test_admin_index_accessible_with_admin_role,
test_impersonate_stores_admin_id, test_stop_impersonating_restores_admin

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-20 02:33:10 +01:00
parent 48bea5c198
commit 8e7af53ff6

View File

@@ -6,7 +6,7 @@ from datetime import datetime, timedelta
from functools import wraps from functools import wraps
from pathlib import Path from pathlib import Path
from quart import Blueprint, flash, redirect, render_template, request, session, url_for from quart import Blueprint, flash, g, redirect, render_template, request, session, url_for
from ..core import config, csrf_protect, execute, fetch_all, fetch_one from ..core import config, csrf_protect, execute, fetch_all, fetch_one
@@ -131,7 +131,7 @@ async def get_user_by_id(user_id: int) -> dict | None:
"""Get user by ID with subscription info.""" """Get user by ID with subscription info."""
return await fetch_one( return await fetch_one(
""" """
SELECT u.*, s.plan, s.status as sub_status, s.stripe_customer_id SELECT u.*, s.plan, s.status as sub_status
FROM users u FROM users u
LEFT JOIN subscriptions s ON s.user_id = u.id LEFT JOIN subscriptions s ON s.user_id = u.id
WHERE u.id = ? WHERE u.id = ?
@@ -205,10 +205,12 @@ async def get_waitlist(limit: int = 500) -> list[dict]:
# ============================================================================= # =============================================================================
def admin_required(f): def admin_required(f):
"""Require admin authentication.""" """Require admin authentication via password (is_admin session flag) or admin role."""
@wraps(f) @wraps(f)
async def decorated(*args, **kwargs): async def decorated(*args, **kwargs):
if not session.get("is_admin"): is_password_admin = session.get("is_admin")
is_role_admin = "admin" in (g.get("user") or {}).get("roles", [])
if not is_password_admin and not is_role_admin:
return redirect(url_for("admin.login")) return redirect(url_for("admin.login"))
return await f(*args, **kwargs) return await f(*args, **kwargs)
return decorated return decorated
@@ -321,8 +323,8 @@ async def impersonate(user_id: int):
await flash("User not found.", "error") await flash("User not found.", "error")
return redirect(url_for("admin.users")) return redirect(url_for("admin.users"))
# Store admin session so we can return # Store admin's user_id so we can restore it later
session["admin_impersonating"] = True session["admin_impersonating"] = session.get("user_id")
session["user_id"] = user_id session["user_id"] = user_id
await flash(f"Now impersonating {user['email']}. Return to admin to stop.", "warning") await flash(f"Now impersonating {user['email']}. Return to admin to stop.", "warning")
@@ -333,8 +335,11 @@ async def impersonate(user_id: int):
@csrf_protect @csrf_protect
async def stop_impersonating(): async def stop_impersonating():
"""Stop impersonating and return to admin.""" """Stop impersonating and return to admin."""
session.pop("user_id", None) admin_user_id = session.pop("admin_impersonating", None)
session.pop("admin_impersonating", None) if admin_user_id:
session["user_id"] = admin_user_id
else:
session.pop("user_id", None)
await flash("Stopped impersonating.", "info") await flash("Stopped impersonating.", "info")
return redirect(url_for("admin.index")) return redirect(url_for("admin.index"))