fix(security): prevent removing the last active admin
Admins could remove the admin group from, deactivate, or delete the last active admin, locking the system out of all administration. Add a count_active_admins() repo method and a _is_last_active_admin() guard, and block all three operations when they would leave zero active admins. Refs: porchlight-yq7 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
e54764cda9
commit
aedb451128
5 changed files with 110 additions and 0 deletions
|
|
@ -11,6 +11,15 @@ from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput,
|
||||||
|
|
||||||
router = APIRouter(prefix="/admin", tags=["admin"])
|
router = APIRouter(prefix="/admin", tags=["admin"])
|
||||||
|
|
||||||
|
ADMIN_GROUP = "admin"
|
||||||
|
|
||||||
|
|
||||||
|
async def _is_last_active_admin(request: Request, user: User) -> bool:
|
||||||
|
"""True if removing this user's admin access would leave zero active admins."""
|
||||||
|
if not user.active or ADMIN_GROUP not in user.groups:
|
||||||
|
return False
|
||||||
|
return await request.app.state.user_repo.count_active_admins() <= 1
|
||||||
|
|
||||||
|
|
||||||
async def _get_admin_user(request: Request) -> User | None:
|
async def _get_admin_user(request: Request) -> User | None:
|
||||||
"""Return the current user if they are an admin, else None."""
|
"""Return the current user if they are an admin, else None."""
|
||||||
|
|
@ -198,6 +207,9 @@ async def update_user_groups(
|
||||||
if user is None:
|
if user is None:
|
||||||
return HTMLResponse("User not found", status_code=404)
|
return HTMLResponse("User not found", status_code=404)
|
||||||
|
|
||||||
|
if ADMIN_GROUP not in validated.group_list and await _is_last_active_admin(request, user):
|
||||||
|
return HTMLResponse('<div role="alert">Cannot remove the last active admin</div>')
|
||||||
|
|
||||||
updated = user.model_copy(update={"groups": validated.group_list})
|
updated = user.model_copy(update={"groups": validated.group_list})
|
||||||
await user_repo.update(updated)
|
await user_repo.update(updated)
|
||||||
return HTMLResponse('<div role="status">Groups updated</div>')
|
return HTMLResponse('<div role="status">Groups updated</div>')
|
||||||
|
|
@ -241,6 +253,9 @@ async def deactivate_user(request: Request, userid: str) -> Response:
|
||||||
if user is None:
|
if user is None:
|
||||||
return HTMLResponse("User not found", status_code=404)
|
return HTMLResponse("User not found", status_code=404)
|
||||||
|
|
||||||
|
if await _is_last_active_admin(request, user):
|
||||||
|
return HTMLResponse('<div role="alert">Cannot deactivate the last active admin</div>')
|
||||||
|
|
||||||
updated = user.model_copy(update={"active": False})
|
updated = user.model_copy(update={"active": False})
|
||||||
await user_repo.update(updated)
|
await user_repo.update(updated)
|
||||||
return HTMLResponse(
|
return HTMLResponse(
|
||||||
|
|
@ -350,6 +365,10 @@ async def delete_user(request: Request, userid: str) -> Response:
|
||||||
return HTMLResponse('<div role="alert">Cannot delete your own account</div>')
|
return HTMLResponse('<div role="alert">Cannot delete your own account</div>')
|
||||||
|
|
||||||
user_repo = request.app.state.user_repo
|
user_repo = request.app.state.user_repo
|
||||||
|
target = await user_repo.get_by_userid(userid)
|
||||||
|
if target is not None and await _is_last_active_admin(request, target):
|
||||||
|
return HTMLResponse('<div role="alert">Cannot delete the last active admin</div>')
|
||||||
|
|
||||||
deleted = await user_repo.delete(userid)
|
deleted = await user_repo.delete(userid)
|
||||||
if not deleted:
|
if not deleted:
|
||||||
return HTMLResponse("User not found", status_code=404)
|
return HTMLResponse("User not found", status_code=404)
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,8 @@ class UserRepository(Protocol):
|
||||||
|
|
||||||
async def count_users(self, query: str | None = None) -> int: ...
|
async def count_users(self, query: str | None = None) -> int: ...
|
||||||
|
|
||||||
|
async def count_active_admins(self) -> int: ...
|
||||||
|
|
||||||
async def delete(self, userid: str) -> bool: ...
|
async def delete(self, userid: str) -> bool: ...
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -163,6 +163,17 @@ class SQLiteUserRepository:
|
||||||
row = await cursor.fetchone()
|
row = await cursor.fetchone()
|
||||||
return row[0] if row else 0
|
return row[0] if row else 0
|
||||||
|
|
||||||
|
async def count_active_admins(self) -> int:
|
||||||
|
async with self._db.execute(
|
||||||
|
"""
|
||||||
|
SELECT COUNT(*) FROM users u
|
||||||
|
JOIN user_groups g ON g.userid = u.userid
|
||||||
|
WHERE u.active = 1 AND g.group_name = 'admin'
|
||||||
|
"""
|
||||||
|
) as cursor:
|
||||||
|
row = await cursor.fetchone()
|
||||||
|
return row[0] if row else 0
|
||||||
|
|
||||||
async def delete(self, userid: str) -> bool:
|
async def delete(self, userid: str) -> bool:
|
||||||
cursor = await self._db.execute("DELETE FROM users WHERE userid = ?", (userid,))
|
cursor = await self._db.execute("DELETE FROM users WHERE userid = ?", (userid,))
|
||||||
await self._db.commit()
|
await self._db.commit()
|
||||||
|
|
|
||||||
|
|
@ -463,3 +463,70 @@ async def test_admin_cannot_delete_users_last_credential(client: AsyncClient) ->
|
||||||
# The last credential must survive, and the admin is told why.
|
# The last credential must survive, and the admin is told why.
|
||||||
assert await cred_repo.get_password_by_user(target.userid) is not None
|
assert await cred_repo.get_password_by_user(target.userid) is not None
|
||||||
assert "last credential" in res.text.lower()
|
assert "last credential" in res.text.lower()
|
||||||
|
|
||||||
|
|
||||||
|
async def _make_sole_admin_target(client: AsyncClient) -> User:
|
||||||
|
"""Create a second admin, then return it as the target. The logged-in
|
||||||
|
admin from _login plus this one means 2 admins; callers deactivate the
|
||||||
|
_login admin first if they need this to be the *last* admin."""
|
||||||
|
return await _create_target_user(client, userid="admin2-01", username="admin2", groups=["admin"])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cannot_deactivate_last_active_admin(client: AsyncClient) -> None:
|
||||||
|
await _login(client) # the only admin
|
||||||
|
app = client._transport.app # type: ignore[union-attr]
|
||||||
|
admin = await app.state.user_repo.get_by_username("admin")
|
||||||
|
|
||||||
|
token = await get_csrf_token(client)
|
||||||
|
res = await client.post(
|
||||||
|
f"/admin/users/{admin.userid}/deactivate",
|
||||||
|
headers={"X-CSRF-Token": token, "HX-Request": "true"},
|
||||||
|
)
|
||||||
|
still = await app.state.user_repo.get_by_userid(admin.userid)
|
||||||
|
assert still.active is True
|
||||||
|
msg = res.text.lower()
|
||||||
|
assert "last" in msg
|
||||||
|
assert "admin" in msg
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cannot_remove_admin_group_from_last_admin(client: AsyncClient) -> None:
|
||||||
|
await _login(client)
|
||||||
|
app = client._transport.app # type: ignore[union-attr]
|
||||||
|
admin = await app.state.user_repo.get_by_username("admin")
|
||||||
|
|
||||||
|
token = await get_csrf_token(client)
|
||||||
|
res = await client.post(
|
||||||
|
f"/admin/users/{admin.userid}/groups",
|
||||||
|
data={"groups": "users"}, # drops "admin"
|
||||||
|
headers={"X-CSRF-Token": token, "HX-Request": "true"},
|
||||||
|
)
|
||||||
|
still = await app.state.user_repo.get_by_userid(admin.userid)
|
||||||
|
assert "admin" in still.groups
|
||||||
|
msg = res.text.lower()
|
||||||
|
assert "last" in msg
|
||||||
|
assert "admin" in msg
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cannot_delete_last_active_admin(client: AsyncClient) -> None:
|
||||||
|
await _login(client)
|
||||||
|
app = client._transport.app # type: ignore[union-attr]
|
||||||
|
# A second, separate admin to delete (not self — self-delete is blocked separately).
|
||||||
|
target = await _create_target_user(client, userid="otheradmin-01", username="otheradmin", groups=["admin"])
|
||||||
|
# Deactivate the logged-in admin's peer? Instead make target the only ACTIVE admin
|
||||||
|
# by deactivating the _login admin via repo, so deleting target would orphan admins.
|
||||||
|
login_admin = await app.state.user_repo.get_by_username("admin")
|
||||||
|
await app.state.user_repo.update(login_admin.model_copy(update={"active": False}))
|
||||||
|
|
||||||
|
token = await get_csrf_token(client)
|
||||||
|
res = await client.request(
|
||||||
|
"DELETE",
|
||||||
|
f"/admin/users/{target.userid}",
|
||||||
|
headers={"X-CSRF-Token": token, "HX-Request": "true"},
|
||||||
|
)
|
||||||
|
assert await app.state.user_repo.get_by_userid(target.userid) is not None
|
||||||
|
msg = res.text.lower()
|
||||||
|
assert "last" in msg
|
||||||
|
assert "admin" in msg
|
||||||
|
|
|
||||||
|
|
@ -237,3 +237,14 @@ async def test_roundtrip_preserves_all_fields(user_repo: SQLiteUserRepository) -
|
||||||
assert sorted(fetched.groups) == ["admin", "users"]
|
assert sorted(fetched.groups) == ["admin", "users"]
|
||||||
assert fetched.created_at == user.created_at
|
assert fetched.created_at == user.created_at
|
||||||
assert fetched.updated_at == user.updated_at
|
assert fetched.updated_at == user.updated_at
|
||||||
|
|
||||||
|
|
||||||
|
async def test_count_active_admins(user_repo: SQLiteUserRepository) -> None:
|
||||||
|
await user_repo.create(_make_user(userid="a1", username="admin1", groups=["admin", "users"]))
|
||||||
|
await user_repo.create(_make_user(userid="a2", username="admin2", groups=["admin"]))
|
||||||
|
# Inactive admin must not count.
|
||||||
|
await user_repo.create(_make_user(userid="a3", username="admin3", groups=["admin"], active=False))
|
||||||
|
# Non-admin must not count.
|
||||||
|
await user_repo.create(_make_user(userid="u1", username="user1", groups=["users"]))
|
||||||
|
|
||||||
|
assert await user_repo.count_active_admins() == 2
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue