""" Tests for role-based access control: role_required decorator, grant/revoke/ensure_admin_role, and admin route protection. """ import pytest from beanflows import core from beanflows.auth.routes import ( ensure_admin_role, grant_role, revoke_role, role_required, ) from quart import Blueprint # ════════════════════════════════════════════════════════════ # grant_role / revoke_role # ════════════════════════════════════════════════════════════ class TestGrantRole: async def test_grants_role(self, db, test_user): await grant_role(test_user["id"], "admin") row = await core.fetch_one( "SELECT role FROM user_roles WHERE user_id = ?", (test_user["id"],), ) assert row is not None assert row["role"] == "admin" async def test_idempotent(self, db, test_user): await grant_role(test_user["id"], "admin") await grant_role(test_user["id"], "admin") rows = await core.fetch_all( "SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'", (test_user["id"],), ) assert len(rows) == 1 class TestRevokeRole: async def test_revokes_existing_role(self, db, test_user): await grant_role(test_user["id"], "admin") await revoke_role(test_user["id"], "admin") row = await core.fetch_one( "SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'", (test_user["id"],), ) assert row is None async def test_noop_for_missing_role(self, db, test_user): # Should not raise await revoke_role(test_user["id"], "nonexistent") # ════════════════════════════════════════════════════════════ # ensure_admin_role # ════════════════════════════════════════════════════════════ class TestEnsureAdminRole: async def test_grants_admin_for_listed_email(self, db, test_user): core.config.ADMIN_EMAILS = ["test@example.com"] try: await ensure_admin_role(test_user["id"], "test@example.com") row = await core.fetch_one( "SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'", (test_user["id"],), ) assert row is not None finally: core.config.ADMIN_EMAILS = [] async def test_skips_for_unlisted_email(self, db, test_user): core.config.ADMIN_EMAILS = ["boss@example.com"] try: await ensure_admin_role(test_user["id"], "test@example.com") row = await core.fetch_one( "SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'", (test_user["id"],), ) assert row is None finally: core.config.ADMIN_EMAILS = [] async def test_empty_admin_emails_grants_nothing(self, db, test_user): core.config.ADMIN_EMAILS = [] await ensure_admin_role(test_user["id"], "test@example.com") row = await core.fetch_one( "SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'", (test_user["id"],), ) assert row is None async def test_case_insensitive_matching(self, db, test_user): core.config.ADMIN_EMAILS = ["test@example.com"] try: await ensure_admin_role(test_user["id"], "Test@Example.COM") row = await core.fetch_one( "SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'", (test_user["id"],), ) assert row is not None finally: core.config.ADMIN_EMAILS = [] # ════════════════════════════════════════════════════════════ # role_required decorator # ════════════════════════════════════════════════════════════ role_test_bp = Blueprint("role_test", __name__) @role_test_bp.route("/admin-only") @role_required("admin") async def admin_only_route(): return "admin-ok", 200 @role_test_bp.route("/multi-role") @role_required("admin", "editor") async def multi_role_route(): return "multi-ok", 200 class TestRoleRequired: @pytest.fixture async def role_app(self, app): app.register_blueprint(role_test_bp) return app @pytest.fixture async def role_client(self, role_app): async with role_app.test_client() as c: yield c async def test_redirects_unauthenticated(self, role_client, db): response = await role_client.get("/admin-only", follow_redirects=False) assert response.status_code in (302, 303, 307) async def test_rejects_user_without_role(self, role_client, db, test_user): async with role_client.session_transaction() as sess: sess["user_id"] = test_user["id"] response = await role_client.get("/admin-only", follow_redirects=False) assert response.status_code in (302, 303, 307) async def test_allows_user_with_matching_role(self, role_client, db, test_user): await grant_role(test_user["id"], "admin") async with role_client.session_transaction() as sess: sess["user_id"] = test_user["id"] response = await role_client.get("/admin-only") assert response.status_code == 200 async def test_multi_role_allows_any_match(self, role_client, db, test_user): await grant_role(test_user["id"], "editor") async with role_client.session_transaction() as sess: sess["user_id"] = test_user["id"] response = await role_client.get("/multi-role") assert response.status_code == 200 async def test_multi_role_rejects_none(self, role_client, db, test_user): await grant_role(test_user["id"], "viewer") async with role_client.session_transaction() as sess: sess["user_id"] = test_user["id"] response = await role_client.get("/multi-role", follow_redirects=False) assert response.status_code in (302, 303, 307) # ════════════════════════════════════════════════════════════ # Admin route protection # ════════════════════════════════════════════════════════════ class TestAdminRouteProtection: async def test_admin_index_requires_admin_role(self, auth_client, db): response = await auth_client.get("/admin/", follow_redirects=False) assert response.status_code in (302, 303, 307) async def test_admin_index_accessible_with_admin_role(self, admin_client, db): response = await admin_client.get("/admin/") assert response.status_code == 200 async def test_admin_users_requires_admin_role(self, auth_client, db): response = await auth_client.get("/admin/users", follow_redirects=False) assert response.status_code in (302, 303, 307) async def test_admin_tasks_requires_admin_role(self, auth_client, db): response = await auth_client.get("/admin/tasks", follow_redirects=False) assert response.status_code in (302, 303, 307) # ════════════════════════════════════════════════════════════ # Impersonation # ════════════════════════════════════════════════════════════ class TestImpersonation: async def test_impersonate_stores_admin_id(self, admin_client, db, test_user): """Impersonating stores admin's user_id in session['admin_impersonating'].""" # Create a second user to impersonate now = "2025-01-01T00:00:00" other_id = await core.execute( "INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)", ("other@example.com", "Other", now), ) async with admin_client.session_transaction() as sess: sess["csrf_token"] = "test_csrf" response = await admin_client.post( f"/admin/users/{other_id}/impersonate", form={"csrf_token": "test_csrf"}, follow_redirects=False, ) assert response.status_code in (302, 303, 307) async with admin_client.session_transaction() as sess: assert sess["user_id"] == other_id assert sess["admin_impersonating"] == test_user["id"] async def test_stop_impersonating_restores_admin(self, app, db, test_user, grant_role): """Stopping impersonation restores the admin's user_id.""" await grant_role(test_user["id"], "admin") async with app.test_client() as c: async with c.session_transaction() as sess: sess["user_id"] = 999 # impersonated user sess["admin_impersonating"] = test_user["id"] sess["csrf_token"] = "test_csrf" response = await c.post( "/admin/stop-impersonating", form={"csrf_token": "test_csrf"}, follow_redirects=False, ) assert response.status_code in (302, 303, 307) async with c.session_transaction() as sess: assert sess["user_id"] == test_user["id"] assert "admin_impersonating" not in sess # ════════════════════════════════════════════════════════════ # Admin auth flow regressions # ════════════════════════════════════════════════════════════ class TestAdminAuthFlow: """Regression tests for the admin authentication flow. Previously the admin panel used a password-based login (/admin/login). That has been removed; the only way in is via the 'admin' role, granted through ADMIN_EMAILS + auth/dev-login (dev) or the user_roles table (prod). """ async def test_admin_login_route_removed(self, client): """ Regression: /admin/login existed as a password-based route. It has been removed; any request should 404. """ response = await client.get("/admin/login") assert response.status_code == 404 async def test_admin_dev_login_route_removed(self, client): """ Regression: /admin/dev-login set session['is_admin']=True without a user_id, making the user invisible to load_user and breaking everything that checked g.user. It has been removed. """ response = await client.get("/admin/dev-login") assert response.status_code == 404 async def test_admin_required_redirects_to_auth_login(self, auth_client, db): """ Regression: admin_required used to redirect to admin.login (the password page). Now it redirects to auth.login. """ response = await auth_client.get("/admin/", follow_redirects=False) assert response.status_code in (302, 303, 307) location = response.headers.get("Location", "") assert "/auth/login" in location, ( f"Expected redirect to /auth/login, got: {location}" ) async def test_dev_login_grants_admin_role_for_admin_email( self, app, db, monkeypatch ): """ Regression: auth/dev-login was not granting the admin role because ADMIN_EMAILS was empty (missing from .env). The role must be granted when the email matches ADMIN_EMAILS. """ from beanflows import core as _core monkeypatch.setattr(_core.config, "ADMIN_EMAILS", ["admin@example.com"]) async with app.test_client() as c: response = await c.get( "/auth/dev-login?email=admin@example.com", follow_redirects=False, ) assert response.status_code in (302, 303, 307) # user_id must be set in session async with c.session_transaction() as sess: user_id = sess.get("user_id") assert user_id is not None # admin role must exist in the DB row = await core.fetch_one( "SELECT role FROM user_roles WHERE user_id = ? AND role = 'admin'", (user_id,), ) assert row is not None, "admin role was not granted via dev-login" async def test_dev_login_admin_redirects_to_admin_panel( self, app, db, monkeypatch ): """ Regression: auth/dev-login redirected everyone to dashboard.index. Admin-email logins should land directly on /admin/. """ from beanflows import core as _core monkeypatch.setattr(_core.config, "ADMIN_EMAILS", ["admin@example.com"]) async with app.test_client() as c: response = await c.get( "/auth/dev-login?email=admin@example.com", follow_redirects=False, ) location = response.headers.get("Location", "") assert "/admin" in location, ( f"Admin dev-login should redirect to /admin, got: {location}" )