Update from Copier template v0.4.0
- Accept RBAC system: user_roles table, role_required decorator, grant_role/revoke_role/ensure_admin_role functions - Accept improved billing architecture: billing_customers table separation, provider-agnostic naming - Accept enhanced user loading with subscription/roles eager loading in app.py - Accept improved email templates with branded styling - Accept new infrastructure: migration tracking, transaction logging, A/B testing - Accept template improvements: Resend SDK, Tailwind build stage, UMAMI analytics config - Keep beanflows-specific configs: BASE_URL 5001, coffee PLAN_FEATURES/PLAN_LIMITS - Keep beanflows analytics integration and DuckDB health check - Add new test files and utility scripts from template Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
This commit is contained in:
242
web/tests/test_roles.py
Normal file
242
web/tests/test_roles.py
Normal file
@@ -0,0 +1,242 @@
|
||||
"""
|
||||
Tests for role-based access control: role_required decorator, grant/revoke/ensure_admin_role,
|
||||
and admin route protection.
|
||||
"""
|
||||
import pytest
|
||||
from quart import Blueprint
|
||||
|
||||
from beanflows.auth.routes import (
|
||||
ensure_admin_role,
|
||||
grant_role,
|
||||
revoke_role,
|
||||
role_required,
|
||||
)
|
||||
from beanflows import core
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════
|
||||
# grant_role / revoke_role
|
||||
# ════════════════════════════════════════════════════════════
|
||||
|
||||
class TestGrantRole:
|
||||
async def test_grants_role(self, db, test_user):
|
||||
await grant_role(test_user["id"], "admin")
|
||||
row = await core.fetch_one(
|
||||
"SELECT role FROM user_roles WHERE user_id = ?",
|
||||
(test_user["id"],),
|
||||
)
|
||||
assert row is not None
|
||||
assert row["role"] == "admin"
|
||||
|
||||
async def test_idempotent(self, db, test_user):
|
||||
await grant_role(test_user["id"], "admin")
|
||||
await grant_role(test_user["id"], "admin")
|
||||
rows = await core.fetch_all(
|
||||
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
||||
(test_user["id"],),
|
||||
)
|
||||
assert len(rows) == 1
|
||||
|
||||
|
||||
class TestRevokeRole:
|
||||
async def test_revokes_existing_role(self, db, test_user):
|
||||
await grant_role(test_user["id"], "admin")
|
||||
await revoke_role(test_user["id"], "admin")
|
||||
row = await core.fetch_one(
|
||||
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
||||
(test_user["id"],),
|
||||
)
|
||||
assert row is None
|
||||
|
||||
async def test_noop_for_missing_role(self, db, test_user):
|
||||
# Should not raise
|
||||
await revoke_role(test_user["id"], "nonexistent")
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════
|
||||
# ensure_admin_role
|
||||
# ════════════════════════════════════════════════════════════
|
||||
|
||||
class TestEnsureAdminRole:
|
||||
async def test_grants_admin_for_listed_email(self, db, test_user):
|
||||
core.config.ADMIN_EMAILS = ["test@example.com"]
|
||||
try:
|
||||
await ensure_admin_role(test_user["id"], "test@example.com")
|
||||
row = await core.fetch_one(
|
||||
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
||||
(test_user["id"],),
|
||||
)
|
||||
assert row is not None
|
||||
finally:
|
||||
core.config.ADMIN_EMAILS = []
|
||||
|
||||
async def test_skips_for_unlisted_email(self, db, test_user):
|
||||
core.config.ADMIN_EMAILS = ["boss@example.com"]
|
||||
try:
|
||||
await ensure_admin_role(test_user["id"], "test@example.com")
|
||||
row = await core.fetch_one(
|
||||
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
||||
(test_user["id"],),
|
||||
)
|
||||
assert row is None
|
||||
finally:
|
||||
core.config.ADMIN_EMAILS = []
|
||||
|
||||
async def test_empty_admin_emails_grants_nothing(self, db, test_user):
|
||||
core.config.ADMIN_EMAILS = []
|
||||
await ensure_admin_role(test_user["id"], "test@example.com")
|
||||
row = await core.fetch_one(
|
||||
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
||||
(test_user["id"],),
|
||||
)
|
||||
assert row is None
|
||||
|
||||
async def test_case_insensitive_matching(self, db, test_user):
|
||||
core.config.ADMIN_EMAILS = ["test@example.com"]
|
||||
try:
|
||||
await ensure_admin_role(test_user["id"], "Test@Example.COM")
|
||||
row = await core.fetch_one(
|
||||
"SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'",
|
||||
(test_user["id"],),
|
||||
)
|
||||
assert row is not None
|
||||
finally:
|
||||
core.config.ADMIN_EMAILS = []
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════
|
||||
# role_required decorator
|
||||
# ════════════════════════════════════════════════════════════
|
||||
|
||||
role_test_bp = Blueprint("role_test", __name__)
|
||||
|
||||
|
||||
@role_test_bp.route("/admin-only")
|
||||
@role_required("admin")
|
||||
async def admin_only_route():
|
||||
return "admin-ok", 200
|
||||
|
||||
|
||||
@role_test_bp.route("/multi-role")
|
||||
@role_required("admin", "editor")
|
||||
async def multi_role_route():
|
||||
return "multi-ok", 200
|
||||
|
||||
|
||||
class TestRoleRequired:
|
||||
@pytest.fixture
|
||||
async def role_app(self, app):
|
||||
app.register_blueprint(role_test_bp)
|
||||
return app
|
||||
|
||||
@pytest.fixture
|
||||
async def role_client(self, role_app):
|
||||
async with role_app.test_client() as c:
|
||||
yield c
|
||||
|
||||
async def test_redirects_unauthenticated(self, role_client, db):
|
||||
response = await role_client.get("/admin-only", follow_redirects=False)
|
||||
assert response.status_code in (302, 303, 307)
|
||||
|
||||
async def test_rejects_user_without_role(self, role_client, db, test_user):
|
||||
async with role_client.session_transaction() as sess:
|
||||
sess["user_id"] = test_user["id"]
|
||||
|
||||
response = await role_client.get("/admin-only", follow_redirects=False)
|
||||
assert response.status_code in (302, 303, 307)
|
||||
|
||||
async def test_allows_user_with_matching_role(self, role_client, db, test_user):
|
||||
await grant_role(test_user["id"], "admin")
|
||||
async with role_client.session_transaction() as sess:
|
||||
sess["user_id"] = test_user["id"]
|
||||
|
||||
response = await role_client.get("/admin-only")
|
||||
assert response.status_code == 200
|
||||
|
||||
async def test_multi_role_allows_any_match(self, role_client, db, test_user):
|
||||
await grant_role(test_user["id"], "editor")
|
||||
async with role_client.session_transaction() as sess:
|
||||
sess["user_id"] = test_user["id"]
|
||||
|
||||
response = await role_client.get("/multi-role")
|
||||
assert response.status_code == 200
|
||||
|
||||
async def test_multi_role_rejects_none(self, role_client, db, test_user):
|
||||
await grant_role(test_user["id"], "viewer")
|
||||
async with role_client.session_transaction() as sess:
|
||||
sess["user_id"] = test_user["id"]
|
||||
|
||||
response = await role_client.get("/multi-role", follow_redirects=False)
|
||||
assert response.status_code in (302, 303, 307)
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════
|
||||
# Admin route protection
|
||||
# ════════════════════════════════════════════════════════════
|
||||
|
||||
class TestAdminRouteProtection:
|
||||
async def test_admin_index_requires_admin_role(self, auth_client, db):
|
||||
response = await auth_client.get("/admin/", follow_redirects=False)
|
||||
assert response.status_code in (302, 303, 307)
|
||||
|
||||
async def test_admin_index_accessible_with_admin_role(self, admin_client, db):
|
||||
response = await admin_client.get("/admin/")
|
||||
assert response.status_code == 200
|
||||
|
||||
async def test_admin_users_requires_admin_role(self, auth_client, db):
|
||||
response = await auth_client.get("/admin/users", follow_redirects=False)
|
||||
assert response.status_code in (302, 303, 307)
|
||||
|
||||
async def test_admin_tasks_requires_admin_role(self, auth_client, db):
|
||||
response = await auth_client.get("/admin/tasks", follow_redirects=False)
|
||||
assert response.status_code in (302, 303, 307)
|
||||
|
||||
|
||||
# ════════════════════════════════════════════════════════════
|
||||
# Impersonation
|
||||
# ════════════════════════════════════════════════════════════
|
||||
|
||||
class TestImpersonation:
|
||||
async def test_impersonate_stores_admin_id(self, admin_client, db, test_user):
|
||||
"""Impersonating stores admin's user_id in session['admin_impersonating']."""
|
||||
# Create a second user to impersonate
|
||||
now = "2025-01-01T00:00:00"
|
||||
other_id = await core.execute(
|
||||
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
|
||||
("other@example.com", "Other", now),
|
||||
)
|
||||
|
||||
async with admin_client.session_transaction() as sess:
|
||||
sess["csrf_token"] = "test_csrf"
|
||||
|
||||
response = await admin_client.post(
|
||||
f"/admin/users/{other_id}/impersonate",
|
||||
form={"csrf_token": "test_csrf"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert response.status_code in (302, 303, 307)
|
||||
|
||||
async with admin_client.session_transaction() as sess:
|
||||
assert sess["user_id"] == other_id
|
||||
assert sess["admin_impersonating"] == test_user["id"]
|
||||
|
||||
async def test_stop_impersonating_restores_admin(self, app, db, test_user, grant_role):
|
||||
"""Stopping impersonation restores the admin's user_id."""
|
||||
await grant_role(test_user["id"], "admin")
|
||||
|
||||
async with app.test_client() as c:
|
||||
async with c.session_transaction() as sess:
|
||||
sess["user_id"] = 999 # impersonated user
|
||||
sess["admin_impersonating"] = test_user["id"]
|
||||
sess["csrf_token"] = "test_csrf"
|
||||
|
||||
response = await c.post(
|
||||
"/admin/stop-impersonating",
|
||||
form={"csrf_token": "test_csrf"},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert response.status_code in (302, 303, 307)
|
||||
|
||||
async with c.session_transaction() as sess:
|
||||
assert sess["user_id"] == test_user["id"]
|
||||
assert "admin_impersonating" not in sess
|
||||
Reference in New Issue
Block a user