From aedb45112852321913ae5cc3c723abc33e2691b4 Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Fri, 5 Jun 2026 13:31:39 +0200 Subject: [PATCH] fix(security): prevent removing the last active admin Admins could remove the admin group from, deactivate, or delete the last active admin, locking the system out of all administration. Add a count_active_admins() repo method and a _is_last_active_admin() guard, and block all three operations when they would leave zero active admins. Refs: porchlight-yq7 Co-Authored-By: Claude Opus 4.8 (1M context) --- src/porchlight/admin/routes.py | 19 ++++++ src/porchlight/store/protocols.py | 2 + src/porchlight/store/sqlite/repositories.py | 11 ++++ tests/test_admin/test_admin_routes.py | 67 +++++++++++++++++++++ tests/test_store/test_sqlite_user_repo.py | 11 ++++ 5 files changed, 110 insertions(+) diff --git a/src/porchlight/admin/routes.py b/src/porchlight/admin/routes.py index de20d8a..fe7404f 100644 --- a/src/porchlight/admin/routes.py +++ b/src/porchlight/admin/routes.py @@ -11,6 +11,15 @@ from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput, router = APIRouter(prefix="/admin", tags=["admin"]) +ADMIN_GROUP = "admin" + + +async def _is_last_active_admin(request: Request, user: User) -> bool: + """True if removing this user's admin access would leave zero active admins.""" + if not user.active or ADMIN_GROUP not in user.groups: + return False + return await request.app.state.user_repo.count_active_admins() <= 1 + async def _get_admin_user(request: Request) -> User | None: """Return the current user if they are an admin, else None.""" @@ -198,6 +207,9 @@ async def update_user_groups( if user is None: return HTMLResponse("User not found", status_code=404) + if ADMIN_GROUP not in validated.group_list and await _is_last_active_admin(request, user): + return HTMLResponse('
Cannot remove the last active admin
') + updated = user.model_copy(update={"groups": validated.group_list}) await user_repo.update(updated) return HTMLResponse('
Groups updated
') @@ -241,6 +253,9 @@ async def deactivate_user(request: Request, userid: str) -> Response: if user is None: return HTMLResponse("User not found", status_code=404) + if await _is_last_active_admin(request, user): + return HTMLResponse('
Cannot deactivate the last active admin
') + updated = user.model_copy(update={"active": False}) await user_repo.update(updated) return HTMLResponse( @@ -350,6 +365,10 @@ async def delete_user(request: Request, userid: str) -> Response: return HTMLResponse('
Cannot delete your own account
') user_repo = request.app.state.user_repo + target = await user_repo.get_by_userid(userid) + if target is not None and await _is_last_active_admin(request, target): + return HTMLResponse('
Cannot delete the last active admin
') + deleted = await user_repo.delete(userid) if not deleted: return HTMLResponse("User not found", status_code=404) diff --git a/src/porchlight/store/protocols.py b/src/porchlight/store/protocols.py index 16d7c1a..53e7c37 100644 --- a/src/porchlight/store/protocols.py +++ b/src/porchlight/store/protocols.py @@ -25,6 +25,8 @@ class UserRepository(Protocol): async def count_users(self, query: str | None = None) -> int: ... + async def count_active_admins(self) -> int: ... + async def delete(self, userid: str) -> bool: ... diff --git a/src/porchlight/store/sqlite/repositories.py b/src/porchlight/store/sqlite/repositories.py index 41799d2..f9ebd81 100644 --- a/src/porchlight/store/sqlite/repositories.py +++ b/src/porchlight/store/sqlite/repositories.py @@ -163,6 +163,17 @@ class SQLiteUserRepository: row = await cursor.fetchone() return row[0] if row else 0 + async def count_active_admins(self) -> int: + async with self._db.execute( + """ + SELECT COUNT(*) FROM users u + JOIN user_groups g ON g.userid = u.userid + WHERE u.active = 1 AND g.group_name = 'admin' + """ + ) as cursor: + row = await cursor.fetchone() + return row[0] if row else 0 + async def delete(self, userid: str) -> bool: cursor = await self._db.execute("DELETE FROM users WHERE userid = ?", (userid,)) await self._db.commit() diff --git a/tests/test_admin/test_admin_routes.py b/tests/test_admin/test_admin_routes.py index b2138bf..ae87b93 100644 --- a/tests/test_admin/test_admin_routes.py +++ b/tests/test_admin/test_admin_routes.py @@ -463,3 +463,70 @@ async def test_admin_cannot_delete_users_last_credential(client: AsyncClient) -> # The last credential must survive, and the admin is told why. assert await cred_repo.get_password_by_user(target.userid) is not None assert "last credential" in res.text.lower() + + +async def _make_sole_admin_target(client: AsyncClient) -> User: + """Create a second admin, then return it as the target. The logged-in + admin from _login plus this one means 2 admins; callers deactivate the + _login admin first if they need this to be the *last* admin.""" + return await _create_target_user(client, userid="admin2-01", username="admin2", groups=["admin"]) + + +@pytest.mark.asyncio +async def test_cannot_deactivate_last_active_admin(client: AsyncClient) -> None: + await _login(client) # the only admin + app = client._transport.app # type: ignore[union-attr] + admin = await app.state.user_repo.get_by_username("admin") + + token = await get_csrf_token(client) + res = await client.post( + f"/admin/users/{admin.userid}/deactivate", + headers={"X-CSRF-Token": token, "HX-Request": "true"}, + ) + still = await app.state.user_repo.get_by_userid(admin.userid) + assert still.active is True + msg = res.text.lower() + assert "last" in msg + assert "admin" in msg + + +@pytest.mark.asyncio +async def test_cannot_remove_admin_group_from_last_admin(client: AsyncClient) -> None: + await _login(client) + app = client._transport.app # type: ignore[union-attr] + admin = await app.state.user_repo.get_by_username("admin") + + token = await get_csrf_token(client) + res = await client.post( + f"/admin/users/{admin.userid}/groups", + data={"groups": "users"}, # drops "admin" + headers={"X-CSRF-Token": token, "HX-Request": "true"}, + ) + still = await app.state.user_repo.get_by_userid(admin.userid) + assert "admin" in still.groups + msg = res.text.lower() + assert "last" in msg + assert "admin" in msg + + +@pytest.mark.asyncio +async def test_cannot_delete_last_active_admin(client: AsyncClient) -> None: + await _login(client) + app = client._transport.app # type: ignore[union-attr] + # A second, separate admin to delete (not self — self-delete is blocked separately). + target = await _create_target_user(client, userid="otheradmin-01", username="otheradmin", groups=["admin"]) + # Deactivate the logged-in admin's peer? Instead make target the only ACTIVE admin + # by deactivating the _login admin via repo, so deleting target would orphan admins. + login_admin = await app.state.user_repo.get_by_username("admin") + await app.state.user_repo.update(login_admin.model_copy(update={"active": False})) + + token = await get_csrf_token(client) + res = await client.request( + "DELETE", + f"/admin/users/{target.userid}", + headers={"X-CSRF-Token": token, "HX-Request": "true"}, + ) + assert await app.state.user_repo.get_by_userid(target.userid) is not None + msg = res.text.lower() + assert "last" in msg + assert "admin" in msg diff --git a/tests/test_store/test_sqlite_user_repo.py b/tests/test_store/test_sqlite_user_repo.py index bc6255e..aa2e555 100644 --- a/tests/test_store/test_sqlite_user_repo.py +++ b/tests/test_store/test_sqlite_user_repo.py @@ -237,3 +237,14 @@ async def test_roundtrip_preserves_all_fields(user_repo: SQLiteUserRepository) - assert sorted(fetched.groups) == ["admin", "users"] assert fetched.created_at == user.created_at assert fetched.updated_at == user.updated_at + + +async def test_count_active_admins(user_repo: SQLiteUserRepository) -> None: + await user_repo.create(_make_user(userid="a1", username="admin1", groups=["admin", "users"])) + await user_repo.create(_make_user(userid="a2", username="admin2", groups=["admin"])) + # Inactive admin must not count. + await user_repo.create(_make_user(userid="a3", username="admin3", groups=["admin"], active=False)) + # Non-admin must not count. + await user_repo.create(_make_user(userid="u1", username="user1", groups=["users"])) + + assert await user_repo.count_active_admins() == 2