fix(security): make self-service last-credential guard atomic

The self-service credential delete handlers counted credentials and then
deleted in separate steps, so concurrent deletes could each see >1 and both
proceed, removing the user's last credential and locking them out.

Add atomic delete_password_if_not_last / delete_webauthn_if_not_last repo
methods (count + delete in one conditional statement) and use them in the
manage delete handlers. Removes the now-unused _count_credentials helper.

Refs: porchlight-2nv

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Johan Lundberg 2026-06-04 15:00:08 +02:00
parent 407db57279
commit 1bb76899a5
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
4 changed files with 88 additions and 15 deletions

View file

@ -13,13 +13,6 @@ from porchlight.validation import PasswordChange, PasswordSet, ProfileUpdate, fo
router = APIRouter(prefix="/manage", tags=["manage"])
async def _count_credentials(cred_repo: object, userid: str) -> int:
"""Count total credentials (password + webauthn) for a user."""
webauthn = await cred_repo.get_webauthn_by_user(userid) # type: ignore[union-attr]
password = await cred_repo.get_password_by_user(userid) # type: ignore[union-attr]
return len(webauthn) + (1 if password else 0)
@router.get("/credentials", response_class=HTMLResponse)
async def credentials_page(request: Request) -> Response:
session_user = get_session_user(request)
@ -107,11 +100,9 @@ async def delete_password(request: Request) -> Response:
userid, _username = session_user
cred_repo = request.app.state.credential_repo
count = await _count_credentials(cred_repo, userid)
if count <= 1:
# Atomic: refuses if this is the user's last credential (not raceable).
if not await cred_repo.delete_password_if_not_last(userid):
return HTMLResponse('<div role="alert">Cannot remove your last credential</div>')
await cred_repo.delete_password(userid)
return HTMLResponse('<div role="status">Password removed</div>')
@ -182,11 +173,9 @@ async def delete_webauthn(request: Request, credential_id_b64: str) -> Response:
padded = credential_id_b64 + "=" * (-len(credential_id_b64) % 4)
credential_id = urlsafe_b64decode(padded)
count = await _count_credentials(cred_repo, userid)
if count <= 1:
# Atomic: refuses if this is the user's last credential (not raceable).
if not await cred_repo.delete_webauthn_if_not_last(userid, credential_id):
return HTMLResponse('<div role="alert">Cannot remove your last credential</div>')
await cred_repo.delete_webauthn(userid, credential_id)
return HTMLResponse('<div role="status">Security key removed</div>')

View file

@ -46,6 +46,10 @@ class CredentialRepository(Protocol):
async def delete_password(self, user_id: str) -> bool: ...
async def delete_password_if_not_last(self, user_id: str) -> bool: ...
async def delete_webauthn_if_not_last(self, user_id: str, credential_id: bytes) -> bool: ...
@runtime_checkable
class MagicLinkRepository(Protocol):

View file

@ -266,6 +266,41 @@ class SQLiteCredentialRepository:
await self._db.commit()
return cursor.rowcount > 0
async def delete_password_if_not_last(self, user_id: str) -> bool:
"""Delete the password credential only if it is not the user's last
credential. The count and delete happen in one atomic statement, so it
is not raceable. Returns True if a row was deleted."""
cursor = await self._db.execute(
"""
DELETE FROM password_credentials
WHERE user_id = ?
AND (
(SELECT COUNT(*) FROM password_credentials WHERE user_id = ?)
+ (SELECT COUNT(*) FROM webauthn_credentials WHERE user_id = ?)
) > 1
""",
(user_id, user_id, user_id),
)
await self._db.commit()
return cursor.rowcount > 0
async def delete_webauthn_if_not_last(self, user_id: str, credential_id: bytes) -> bool:
"""Delete a WebAuthn credential only if it is not the user's last
credential, atomically. Returns True if a row was deleted."""
cursor = await self._db.execute(
"""
DELETE FROM webauthn_credentials
WHERE user_id = ? AND credential_id = ?
AND (
(SELECT COUNT(*) FROM password_credentials WHERE user_id = ?)
+ (SELECT COUNT(*) FROM webauthn_credentials WHERE user_id = ?)
) > 1
""",
(user_id, credential_id, user_id, user_id),
)
await self._db.commit()
return cursor.rowcount > 0
class SQLiteMagicLinkRepository:
def __init__(self, db: aiosqlite.Connection) -> None:

View file

@ -182,3 +182,48 @@ async def test_create_duplicate_webauthn(credential_repo: SQLiteCredentialReposi
with pytest.raises(DuplicateError):
await credential_repo.create_webauthn(cred)
async def test_delete_password_if_not_last_refuses_last(
credential_repo: SQLiteCredentialRepository, alice: User
) -> None:
await credential_repo.create_password(PasswordCredential(user_id=alice.userid, password_hash="h"))
# Only credential -> must refuse and leave it in place.
assert await credential_repo.delete_password_if_not_last(alice.userid) is False
assert await credential_repo.get_password_by_user(alice.userid) is not None
async def test_delete_password_if_not_last_allows_when_others_exist(
credential_repo: SQLiteCredentialRepository, alice: User
) -> None:
await credential_repo.create_password(PasswordCredential(user_id=alice.userid, password_hash="h"))
await credential_repo.create_webauthn(
WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02")
)
assert await credential_repo.delete_password_if_not_last(alice.userid) is True
assert await credential_repo.get_password_by_user(alice.userid) is None
async def test_delete_webauthn_if_not_last_refuses_last(
credential_repo: SQLiteCredentialRepository, alice: User
) -> None:
await credential_repo.create_webauthn(
WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02")
)
assert await credential_repo.delete_webauthn_if_not_last(alice.userid, b"\x01") is False
assert len(await credential_repo.get_webauthn_by_user(alice.userid)) == 1
async def test_delete_webauthn_if_not_last_allows_when_others_exist(
credential_repo: SQLiteCredentialRepository, alice: User
) -> None:
await credential_repo.create_webauthn(
WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02")
)
await credential_repo.create_password(PasswordCredential(user_id=alice.userid, password_hash="h"))
assert await credential_repo.delete_webauthn_if_not_last(alice.userid, b"\x01") is True
assert len(await credential_repo.get_webauthn_by_user(alice.userid)) == 0