From 8e7af53ff62ca9300b4c29a36a5c7756a67c074d Mon Sep 17 00:00:00 2001 From: Deeman Date: Fri, 20 Feb 2026 02:33:10 +0100 Subject: [PATCH] Fix admin auth, impersonation session handling, and stale stripe column - admin_required now accepts users with 'admin' role (via g.user) in addition to the password-based is_admin session flag, so both auth methods grant access - impersonate stores the admin's user_id (not True) in admin_impersonating so stop-impersonating can restore the correct session - stop_impersonating restores user_id from admin_impersonating instead of just popping it - remove s.stripe_customer_id from get_user_by_id (Paddle project, no stripe_customer_id column in subscriptions) Fixes 3 test_roles.py failures: test_admin_index_accessible_with_admin_role, test_impersonate_stores_admin_id, test_stop_impersonating_restores_admin Co-Authored-By: Claude Sonnet 4.6 --- web/src/beanflows/admin/routes.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/web/src/beanflows/admin/routes.py b/web/src/beanflows/admin/routes.py index 2b13619..7b5e793 100644 --- a/web/src/beanflows/admin/routes.py +++ b/web/src/beanflows/admin/routes.py @@ -6,7 +6,7 @@ from datetime import datetime, timedelta from functools import wraps from pathlib import Path -from quart import Blueprint, flash, redirect, render_template, request, session, url_for +from quart import Blueprint, flash, g, redirect, render_template, request, session, url_for from ..core import config, csrf_protect, execute, fetch_all, fetch_one @@ -131,7 +131,7 @@ async def get_user_by_id(user_id: int) -> dict | None: """Get user by ID with subscription info.""" return await fetch_one( """ - SELECT u.*, s.plan, s.status as sub_status, s.stripe_customer_id + SELECT u.*, s.plan, s.status as sub_status FROM users u LEFT JOIN subscriptions s ON s.user_id = u.id WHERE u.id = ? @@ -205,10 +205,12 @@ async def get_waitlist(limit: int = 500) -> list[dict]: # ============================================================================= def admin_required(f): - """Require admin authentication.""" + """Require admin authentication via password (is_admin session flag) or admin role.""" @wraps(f) async def decorated(*args, **kwargs): - if not session.get("is_admin"): + is_password_admin = session.get("is_admin") + is_role_admin = "admin" in (g.get("user") or {}).get("roles", []) + if not is_password_admin and not is_role_admin: return redirect(url_for("admin.login")) return await f(*args, **kwargs) return decorated @@ -321,8 +323,8 @@ async def impersonate(user_id: int): await flash("User not found.", "error") return redirect(url_for("admin.users")) - # Store admin session so we can return - session["admin_impersonating"] = True + # Store admin's user_id so we can restore it later + session["admin_impersonating"] = session.get("user_id") session["user_id"] = user_id await flash(f"Now impersonating {user['email']}. Return to admin to stop.", "warning") @@ -333,8 +335,11 @@ async def impersonate(user_id: int): @csrf_protect async def stop_impersonating(): """Stop impersonating and return to admin.""" - session.pop("user_id", None) - session.pop("admin_impersonating", None) + admin_user_id = session.pop("admin_impersonating", None) + if admin_user_id: + session["user_id"] = admin_user_id + else: + session.pop("user_id", None) await flash("Stopped impersonating.", "info") return redirect(url_for("admin.index"))