Remove password admin login, seed dev accounts, add regression tests

Admin flow:
- Remove /admin/login (password-based) and /admin/dev-login routes entirely
- admin_required now checks only the 'admin' role; redirects to auth.login
- auth/dev-login with an ADMIN_EMAILS address redirects directly to /admin/
- .env.example: replace ADMIN_PASSWORD with ADMIN_EMAILS=admin@beanflows.coffee

Dev seeding:
- Add dev_seed.py: idempotent upsert of 4 fixed accounts (admin, free,
  starter, pro) so every access tier is testable after dev_run.sh
- dev_run.sh: seed after migrations, show all 4 login shortcuts

Regression tests (37 passing):
- test_analytics.py: concurrent fetch_analytics calls return correct row
  counts (cursor thread-safety regression), column names are lowercase
- test_roles.py TestAdminAuthFlow: password login routes return 404,
  admin_required redirects to auth.login, dev-login grants admin role
  and redirects to admin panel when email is in ADMIN_EMAILS
- conftest.py: add mock_analytics fixture (fixes 7 pre-existing dashboard
  test errors); fix assertion text and lowercase metric param in tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-20 20:10:45 +01:00
parent fef9f3d705
commit d09ba91023
9 changed files with 425 additions and 69 deletions

View File

@@ -1,7 +1,6 @@
"""
Admin domain: password-protected admin panel for managing users, tasks, etc.
Admin domain: role-gated admin panel for managing users, tasks, etc.
"""
import secrets
from datetime import datetime, timedelta
from functools import wraps
from pathlib import Path
@@ -19,20 +18,6 @@ bp = Blueprint(
)
# =============================================================================
# Config
# =============================================================================
def get_admin_password() -> str:
"""Get admin password from env. Generate one if not set (dev only)."""
import os
password = os.getenv("ADMIN_PASSWORD", "")
if not password and config.DEBUG:
# In dev, use a default password
return "admin"
return password
# =============================================================================
# SQL Queries
# =============================================================================
@@ -205,13 +190,12 @@ async def get_waitlist(limit: int = 500) -> list[dict]:
# =============================================================================
def admin_required(f):
"""Require admin authentication via password (is_admin session flag) or admin role."""
"""Require the user to have the 'admin' role (granted via ADMIN_EMAILS or user_roles table)."""
@wraps(f)
async def decorated(*args, **kwargs):
is_password_admin = session.get("is_admin")
is_role_admin = "admin" in (g.get("user") or {}).get("roles", [])
if not is_password_admin and not is_role_admin:
return redirect(url_for("admin.login"))
if "admin" not in (g.get("user") or {}).get("roles", []):
await flash("Admin access required.", "error")
return redirect(url_for("auth.login"))
return await f(*args, **kwargs)
return decorated
@@ -220,50 +204,13 @@ def admin_required(f):
# Routes
# =============================================================================
@bp.route("/dev-login")
async def dev_login():
"""Instant admin login for development. Only works in DEBUG mode."""
if not config.DEBUG:
return "Not available", 404
session["is_admin"] = True
await flash("Dev admin login.", "success")
return redirect(url_for("admin.index"))
@bp.route("/login", methods=["GET", "POST"])
@csrf_protect
async def login():
"""Admin login page."""
admin_password = get_admin_password()
if not admin_password:
await flash("Admin access not configured. Set ADMIN_PASSWORD env var.", "error")
return redirect(url_for("public.landing"))
if session.get("is_admin"):
return redirect(url_for("admin.index"))
if request.method == "POST":
form = await request.form
password = form.get("password", "")
if secrets.compare_digest(password, admin_password):
session["is_admin"] = True
await flash("Welcome, admin!", "success")
return redirect(url_for("admin.index"))
else:
await flash("Invalid password.", "error")
return await render_template("admin/login.html")
@bp.route("/logout", methods=["POST"])
@csrf_protect
async def logout():
"""Admin logout."""
session.pop("is_admin", None)
await flash("Logged out of admin.", "info")
return redirect(url_for("admin.login"))
"""Log out from admin (clears full session)."""
session.clear()
await flash("Logged out.", "info")
return redirect(url_for("auth.login"))
@bp.route("/")

View File

@@ -345,6 +345,9 @@ async def dev_login():
await ensure_admin_role(user_id, email)
await flash(f"Dev login as {email}", "success")
# Drop admins straight into the panel
if email.lower() in config.ADMIN_EMAILS:
return redirect(url_for("admin.index"))
return redirect(url_for("dashboard.index"))

View File

@@ -0,0 +1,74 @@
"""
Seed dev accounts for local development. Idempotent — safe to run multiple times.
Creates four fixed accounts so you can test every access tier immediately:
admin@beanflows.coffee — admin role, free plan
trader@beanflows.coffee — free plan (no subscription)
starter@beanflows.coffee — active starter subscription
pro@beanflows.coffee — active pro subscription
Login via: http://localhost:5001/auth/dev-login?email=<email>
"""
import asyncio
from datetime import datetime, timedelta
from .core import close_db, execute, fetch_one, init_db
# email, role (or None), plan (or None for free)
ACCOUNTS = [
("admin@beanflows.coffee", "admin", None),
("trader@beanflows.coffee", None, None),
("starter@beanflows.coffee", None, "starter"),
("pro@beanflows.coffee", None, "pro"),
]
async def _upsert_user(email: str) -> int:
row = await fetch_one("SELECT id FROM users WHERE email = ?", (email,))
if row:
return row["id"]
now = datetime.utcnow().isoformat()
return await execute(
"INSERT INTO users (email, created_at) VALUES (?, ?)",
(email, now),
)
async def _upsert_subscription(user_id: int, plan: str) -> None:
now = datetime.utcnow().isoformat()
period_end = (datetime.utcnow() + timedelta(days=365)).isoformat()
existing = await fetch_one(
"SELECT id FROM subscriptions WHERE user_id = ?", (user_id,)
)
if existing:
await execute(
"UPDATE subscriptions SET plan = ?, status = 'active', current_period_end = ? WHERE user_id = ?",
(plan, period_end, user_id),
)
else:
await execute(
"""INSERT INTO subscriptions (user_id, plan, status, current_period_end, created_at)
VALUES (?, ?, 'active', ?, ?)""",
(user_id, plan, period_end, now),
)
async def seed() -> None:
await init_db()
for email, role, plan in ACCOUNTS:
user_id = await _upsert_user(email)
if role:
await execute(
"INSERT OR IGNORE INTO user_roles (user_id, role) VALUES (?, ?)",
(user_id, role),
)
if plan:
await _upsert_subscription(user_id, plan)
tag = f"[{role or plan or 'free'}]"
print(f" {email} {tag}")
await close_db()
if __name__ == "__main__":
asyncio.run(seed())