Admin flow: - Remove /admin/login (password-based) and /admin/dev-login routes entirely - admin_required now checks only the 'admin' role; redirects to auth.login - auth/dev-login with an ADMIN_EMAILS address redirects directly to /admin/ - .env.example: replace ADMIN_PASSWORD with ADMIN_EMAILS=admin@beanflows.coffee Dev seeding: - Add dev_seed.py: idempotent upsert of 4 fixed accounts (admin, free, starter, pro) so every access tier is testable after dev_run.sh - dev_run.sh: seed after migrations, show all 4 login shortcuts Regression tests (37 passing): - test_analytics.py: concurrent fetch_analytics calls return correct row counts (cursor thread-safety regression), column names are lowercase - test_roles.py TestAdminAuthFlow: password login routes return 404, admin_required redirects to auth.login, dev-login grants admin role and redirects to admin panel when email is in ADMIN_EMAILS - conftest.py: add mock_analytics fixture (fixes 7 pre-existing dashboard test errors); fix assertion text and lowercase metric param in tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
337 lines
14 KiB
Python
337 lines
14 KiB
Python
"""
|
|
Tests for role-based access control: role_required decorator, grant/revoke/ensure_admin_role,
|
|
and admin route protection.
|
|
"""
|
|
import pytest
|
|
from quart import Blueprint
|
|
|
|
from beanflows.auth.routes import (
|
|
ensure_admin_role,
|
|
grant_role,
|
|
revoke_role,
|
|
role_required,
|
|
)
|
|
from beanflows import core
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# grant_role / revoke_role
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestGrantRole:
|
|
async def test_grants_role(self, db, test_user):
|
|
await grant_role(test_user["id"], "admin")
|
|
row = await core.fetch_one(
|
|
"SELECT role FROM user_roles WHERE user_id = ?",
|
|
(test_user["id"],),
|
|
)
|
|
assert row is not None
|
|
assert row["role"] == "admin"
|
|
|
|
async def test_idempotent(self, db, test_user):
|
|
await grant_role(test_user["id"], "admin")
|
|
await grant_role(test_user["id"], "admin")
|
|
rows = await core.fetch_all(
|
|
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
|
(test_user["id"],),
|
|
)
|
|
assert len(rows) == 1
|
|
|
|
|
|
class TestRevokeRole:
|
|
async def test_revokes_existing_role(self, db, test_user):
|
|
await grant_role(test_user["id"], "admin")
|
|
await revoke_role(test_user["id"], "admin")
|
|
row = await core.fetch_one(
|
|
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
|
(test_user["id"],),
|
|
)
|
|
assert row is None
|
|
|
|
async def test_noop_for_missing_role(self, db, test_user):
|
|
# Should not raise
|
|
await revoke_role(test_user["id"], "nonexistent")
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# ensure_admin_role
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestEnsureAdminRole:
|
|
async def test_grants_admin_for_listed_email(self, db, test_user):
|
|
core.config.ADMIN_EMAILS = ["test@example.com"]
|
|
try:
|
|
await ensure_admin_role(test_user["id"], "test@example.com")
|
|
row = await core.fetch_one(
|
|
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
|
(test_user["id"],),
|
|
)
|
|
assert row is not None
|
|
finally:
|
|
core.config.ADMIN_EMAILS = []
|
|
|
|
async def test_skips_for_unlisted_email(self, db, test_user):
|
|
core.config.ADMIN_EMAILS = ["boss@example.com"]
|
|
try:
|
|
await ensure_admin_role(test_user["id"], "test@example.com")
|
|
row = await core.fetch_one(
|
|
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
|
(test_user["id"],),
|
|
)
|
|
assert row is None
|
|
finally:
|
|
core.config.ADMIN_EMAILS = []
|
|
|
|
async def test_empty_admin_emails_grants_nothing(self, db, test_user):
|
|
core.config.ADMIN_EMAILS = []
|
|
await ensure_admin_role(test_user["id"], "test@example.com")
|
|
row = await core.fetch_one(
|
|
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
|
(test_user["id"],),
|
|
)
|
|
assert row is None
|
|
|
|
async def test_case_insensitive_matching(self, db, test_user):
|
|
core.config.ADMIN_EMAILS = ["test@example.com"]
|
|
try:
|
|
await ensure_admin_role(test_user["id"], "Test@Example.COM")
|
|
row = await core.fetch_one(
|
|
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
|
(test_user["id"],),
|
|
)
|
|
assert row is not None
|
|
finally:
|
|
core.config.ADMIN_EMAILS = []
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# role_required decorator
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
role_test_bp = Blueprint("role_test", __name__)
|
|
|
|
|
|
@role_test_bp.route("/admin-only")
|
|
@role_required("admin")
|
|
async def admin_only_route():
|
|
return "admin-ok", 200
|
|
|
|
|
|
@role_test_bp.route("/multi-role")
|
|
@role_required("admin", "editor")
|
|
async def multi_role_route():
|
|
return "multi-ok", 200
|
|
|
|
|
|
class TestRoleRequired:
|
|
@pytest.fixture
|
|
async def role_app(self, app):
|
|
app.register_blueprint(role_test_bp)
|
|
return app
|
|
|
|
@pytest.fixture
|
|
async def role_client(self, role_app):
|
|
async with role_app.test_client() as c:
|
|
yield c
|
|
|
|
async def test_redirects_unauthenticated(self, role_client, db):
|
|
response = await role_client.get("/admin-only", follow_redirects=False)
|
|
assert response.status_code in (302, 303, 307)
|
|
|
|
async def test_rejects_user_without_role(self, role_client, db, test_user):
|
|
async with role_client.session_transaction() as sess:
|
|
sess["user_id"] = test_user["id"]
|
|
|
|
response = await role_client.get("/admin-only", follow_redirects=False)
|
|
assert response.status_code in (302, 303, 307)
|
|
|
|
async def test_allows_user_with_matching_role(self, role_client, db, test_user):
|
|
await grant_role(test_user["id"], "admin")
|
|
async with role_client.session_transaction() as sess:
|
|
sess["user_id"] = test_user["id"]
|
|
|
|
response = await role_client.get("/admin-only")
|
|
assert response.status_code == 200
|
|
|
|
async def test_multi_role_allows_any_match(self, role_client, db, test_user):
|
|
await grant_role(test_user["id"], "editor")
|
|
async with role_client.session_transaction() as sess:
|
|
sess["user_id"] = test_user["id"]
|
|
|
|
response = await role_client.get("/multi-role")
|
|
assert response.status_code == 200
|
|
|
|
async def test_multi_role_rejects_none(self, role_client, db, test_user):
|
|
await grant_role(test_user["id"], "viewer")
|
|
async with role_client.session_transaction() as sess:
|
|
sess["user_id"] = test_user["id"]
|
|
|
|
response = await role_client.get("/multi-role", follow_redirects=False)
|
|
assert response.status_code in (302, 303, 307)
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# Admin route protection
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestAdminRouteProtection:
|
|
async def test_admin_index_requires_admin_role(self, auth_client, db):
|
|
response = await auth_client.get("/admin/", follow_redirects=False)
|
|
assert response.status_code in (302, 303, 307)
|
|
|
|
async def test_admin_index_accessible_with_admin_role(self, admin_client, db):
|
|
response = await admin_client.get("/admin/")
|
|
assert response.status_code == 200
|
|
|
|
async def test_admin_users_requires_admin_role(self, auth_client, db):
|
|
response = await auth_client.get("/admin/users", follow_redirects=False)
|
|
assert response.status_code in (302, 303, 307)
|
|
|
|
async def test_admin_tasks_requires_admin_role(self, auth_client, db):
|
|
response = await auth_client.get("/admin/tasks", follow_redirects=False)
|
|
assert response.status_code in (302, 303, 307)
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# Impersonation
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestImpersonation:
|
|
async def test_impersonate_stores_admin_id(self, admin_client, db, test_user):
|
|
"""Impersonating stores admin's user_id in session['admin_impersonating']."""
|
|
# Create a second user to impersonate
|
|
now = "2025-01-01T00:00:00"
|
|
other_id = await core.execute(
|
|
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
|
|
("other@example.com", "Other", now),
|
|
)
|
|
|
|
async with admin_client.session_transaction() as sess:
|
|
sess["csrf_token"] = "test_csrf"
|
|
|
|
response = await admin_client.post(
|
|
f"/admin/users/{other_id}/impersonate",
|
|
form={"csrf_token": "test_csrf"},
|
|
follow_redirects=False,
|
|
)
|
|
assert response.status_code in (302, 303, 307)
|
|
|
|
async with admin_client.session_transaction() as sess:
|
|
assert sess["user_id"] == other_id
|
|
assert sess["admin_impersonating"] == test_user["id"]
|
|
|
|
async def test_stop_impersonating_restores_admin(self, app, db, test_user, grant_role):
|
|
"""Stopping impersonation restores the admin's user_id."""
|
|
await grant_role(test_user["id"], "admin")
|
|
|
|
async with app.test_client() as c:
|
|
async with c.session_transaction() as sess:
|
|
sess["user_id"] = 999 # impersonated user
|
|
sess["admin_impersonating"] = test_user["id"]
|
|
sess["csrf_token"] = "test_csrf"
|
|
|
|
response = await c.post(
|
|
"/admin/stop-impersonating",
|
|
form={"csrf_token": "test_csrf"},
|
|
follow_redirects=False,
|
|
)
|
|
assert response.status_code in (302, 303, 307)
|
|
|
|
async with c.session_transaction() as sess:
|
|
assert sess["user_id"] == test_user["id"]
|
|
assert "admin_impersonating" not in sess
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# Admin auth flow regressions
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestAdminAuthFlow:
|
|
"""Regression tests for the admin authentication flow.
|
|
|
|
Previously the admin panel used a password-based login (/admin/login).
|
|
That has been removed; the only way in is via the 'admin' role, granted
|
|
through ADMIN_EMAILS + auth/dev-login (dev) or the user_roles table (prod).
|
|
"""
|
|
|
|
async def test_admin_login_route_removed(self, client):
|
|
"""
|
|
Regression: /admin/login existed as a password-based route.
|
|
It has been removed; any request should 404.
|
|
"""
|
|
response = await client.get("/admin/login")
|
|
assert response.status_code == 404
|
|
|
|
async def test_admin_dev_login_route_removed(self, client):
|
|
"""
|
|
Regression: /admin/dev-login set session['is_admin']=True without
|
|
a user_id, making the user invisible to load_user and breaking
|
|
everything that checked g.user. It has been removed.
|
|
"""
|
|
response = await client.get("/admin/dev-login")
|
|
assert response.status_code == 404
|
|
|
|
async def test_admin_required_redirects_to_auth_login(self, auth_client, db):
|
|
"""
|
|
Regression: admin_required used to redirect to admin.login (the
|
|
password page). Now it redirects to auth.login.
|
|
"""
|
|
response = await auth_client.get("/admin/", follow_redirects=False)
|
|
assert response.status_code in (302, 303, 307)
|
|
location = response.headers.get("Location", "")
|
|
assert "/auth/login" in location, (
|
|
f"Expected redirect to /auth/login, got: {location}"
|
|
)
|
|
|
|
async def test_dev_login_grants_admin_role_for_admin_email(
|
|
self, app, db, monkeypatch
|
|
):
|
|
"""
|
|
Regression: auth/dev-login was not granting the admin role because
|
|
ADMIN_EMAILS was empty (missing from .env). The role must be granted
|
|
when the email matches ADMIN_EMAILS.
|
|
"""
|
|
from beanflows import core as _core
|
|
|
|
monkeypatch.setattr(_core.config, "ADMIN_EMAILS", ["admin@example.com"])
|
|
|
|
async with app.test_client() as c:
|
|
response = await c.get(
|
|
"/auth/dev-login?email=admin@example.com",
|
|
follow_redirects=False,
|
|
)
|
|
assert response.status_code in (302, 303, 307)
|
|
|
|
# user_id must be set in session
|
|
async with c.session_transaction() as sess:
|
|
user_id = sess.get("user_id")
|
|
assert user_id is not None
|
|
|
|
# admin role must exist in the DB
|
|
row = await core.fetch_one(
|
|
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
|
(user_id,),
|
|
)
|
|
assert row is not None, "admin role was not granted via dev-login"
|
|
|
|
async def test_dev_login_admin_redirects_to_admin_panel(
|
|
self, app, db, monkeypatch
|
|
):
|
|
"""
|
|
Regression: auth/dev-login redirected everyone to dashboard.index.
|
|
Admin-email logins should land directly on /admin/.
|
|
"""
|
|
from beanflows import core as _core
|
|
|
|
monkeypatch.setattr(_core.config, "ADMIN_EMAILS", ["admin@example.com"])
|
|
|
|
async with app.test_client() as c:
|
|
response = await c.get(
|
|
"/auth/dev-login?email=admin@example.com",
|
|
follow_redirects=False,
|
|
)
|
|
location = response.headers.get("Location", "")
|
|
assert "/admin" in location, (
|
|
f"Admin dev-login should redirect to /admin, got: {location}"
|
|
)
|