# porchlight/tests/test_auth_routes/test_manage_webauthn_credential.py
#
# 132 lines
# 4.9 KiB
# Python

import os
from base64 import urlsafe_b64encode
from datetime import UTC, datetime
from argon2 import PasswordHasher
from cryptography.hazmat.primitives.asymmetric import ec
from fido2.cose import ES256
from fido2.utils import sha256
from fido2.webauthn import (
Aaguid,
AttestationObject,
AttestedCredentialData,
AuthenticatorAttestationResponse,
AuthenticatorData,
CollectedClientData,
RegistrationResponse,
)
from httpx import AsyncClient
from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User, WebAuthnCredential
# Relying-party identifier; its SHA-256 hash is embedded in the authenticator
# data built by _build_registration_response.
# NOTE(review): presumably has to match the RP id the app under test is
# configured with -- confirm against the test fixtures.
RP_ID = "localhost"
# Origin written into the collected client data of the simulated browser
# response.
# NOTE(review): presumably has to match the origin the WebAuthn service
# accepts -- confirm against the test fixtures.
ORIGIN = "http://localhost:8000"
async def _create_user_and_login(client: AsyncClient) -> str:
    """Seed a password-backed user and log the test client in as that user.

    The user ``alice`` and her password credential are written straight
    through the repositories found on ``app.state``; afterwards the password
    login route is called as an HTMX request using ``client``.

    Args:
        client: Test client whose transport wraps the application under test.

    Returns:
        The userid of the user that was created.
    """
    app_state = client._transport.app.state  # type: ignore[union-attr]
    users = app_state.user_repo
    credentials = app_state.credential_repo

    alice = User(
        userid="lusab-bansen",
        username="alice",
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    await users.create(alice)

    # Low-cost Argon2 parameters (presumably to keep hashing fast in tests).
    password_service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    hashed = password_service.hash("testpass")
    await credentials.create_password(PasswordCredential(user_id=alice.userid, password_hash=hashed))

    login_form = {"username": "alice", "password": "testpass"}
    await client.post("/login/password", data=login_form, headers={"HX-Request": "true"})

    return alice.userid
def _generate_credential() -> tuple[ec.EllipticCurvePrivateKey, bytes, AttestedCredentialData]:
    """Create a throwaway P-256 credential for a simulated authenticator.

    Returns:
        A ``(private_key, credential_id, attested)`` tuple: the freshly
        generated SECP256R1 private key, a random 32-byte credential id, and
        the ``AttestedCredentialData`` built from that id, the ES256 COSE form
        of the public key, and ``Aaguid.NONE``.
    """
    signing_key = ec.generate_private_key(ec.SECP256R1())
    raw_id = os.urandom(32)
    attested_data = AttestedCredentialData.create(
        aaguid=Aaguid.NONE,
        credential_id=raw_id,
        public_key=ES256.from_cryptography_key(signing_key.public_key()),
    )
    return signing_key, raw_id, attested_data
def _build_registration_response(
    credential_id: bytes, attested: AttestedCredentialData, challenge: bytes
) -> RegistrationResponse:
    """Assemble the registration response a browser would send back.

    Args:
        credential_id: Raw credential id, used as the response's ``raw_id``.
        attested: Attested credential data embedded in the authenticator data.
        challenge: Challenge value copied into the collected client data.

    Returns:
        A ``RegistrationResponse`` carrying a ``"none"``-format attestation
        object and ``webauthn.create``-type client data for ``ORIGIN``.
    """
    # Authenticator data: hash of the RP id, user-present + attested-data
    # flags, and a signature counter of zero.
    flags = AuthenticatorData.FLAG.UP | AuthenticatorData.FLAG.AT
    authenticator_data = AuthenticatorData.create(
        rp_id_hash=sha256(RP_ID.encode()),
        flags=flags,
        counter=0,
        credential_data=attested,
    )
    attestation = AttestationObject.create(fmt="none", auth_data=authenticator_data, att_stmt={})
    collected = CollectedClientData.create(
        type=CollectedClientData.TYPE.CREATE,
        challenge=challenge,
        origin=ORIGIN,
    )
    attestation_response = AuthenticatorAttestationResponse(
        client_data=collected,
        attestation_object=attestation,
    )
    return RegistrationResponse(raw_id=credential_id, response=attestation_response)
async def test_webauthn_begin_requires_session(client: AsyncClient) -> None:
    """Starting registration without logging in must redirect or be rejected."""
    rejected_or_redirected = (302, 303, 401)
    response = await client.post(
        "/manage/credentials/webauthn/begin",
        follow_redirects=False,
    )
    assert response.status_code in rejected_or_redirected
async def test_webauthn_begin_returns_options(client: AsyncClient) -> None:
    """A logged-in user gets JSON creation options that include a challenge."""
    await _create_user_and_login(client)

    response = await client.post("/manage/credentials/webauthn/begin")
    assert response.status_code == 200

    payload = response.json()
    assert "publicKey" in payload
    assert "challenge" in payload["publicKey"]
async def test_webauthn_complete_creates_credential(client: AsyncClient) -> None:
    """Complete a registration via the service and persist the resulting credential.

    The ceremony is driven through ``webauthn_service`` rather than the HTTP
    routes, and the credential is then written and read back through the
    credential repository.
    """
    userid = await _create_user_and_login(client)
    app_state = client._transport.app.state  # type: ignore[union-attr]
    credentials = app_state.credential_repo

    # Talk to the service directly so the raw registration state is available;
    # the session-based flow is hard to test end to end because the state
    # cannot be extracted from the session.
    service = app_state.webauthn_service
    _unused_key, credential_id, attested = _generate_credential()
    _unused_options, registration_state = service.begin_registration(
        user_id=userid.encode(),
        username="alice",
    )
    browser_response = _build_registration_response(
        credential_id,
        attested,
        registration_state["challenge"],
    )
    verified = service.complete_registration(registration_state, browser_response)

    # Persist the credential by hand to check that the repository round-trips it.
    credential_data = verified.credential_data
    await credentials.create_webauthn(
        WebAuthnCredential(
            user_id=userid,
            credential_id=credential_data.credential_id,
            public_key=bytes(verified.credential_data),
        )
    )

    stored = await credentials.get_webauthn_by_user(userid)
    assert len(stored) == 1
    assert stored[0].credential_id == credential_id
async def test_delete_webauthn_credential(client: AsyncClient) -> None:
    """Deleting a WebAuthn credential by its unpadded base64url id removes it."""
    userid = await _create_user_and_login(client)
    credentials = client._transport.app.state.credential_repo  # type: ignore[union-attr]

    # The login helper already gave the user a password credential; add a
    # WebAuthn credential on top of it.
    webauthn_credential = WebAuthnCredential(user_id=userid, credential_id=b"cred1", public_key=b"key1")
    await credentials.create_webauthn(webauthn_credential)

    # The route takes the credential id as URL-safe base64 without padding.
    encoded_id = urlsafe_b64encode(b"cred1").decode()
    encoded_id = encoded_id.rstrip("=")
    response = await client.delete(
        f"/manage/credentials/webauthn/{encoded_id}",
        headers={"HX-Request": "true"},
    )
    assert response.status_code == 200

    remaining = await credentials.get_webauthn_by_user(userid)
    assert len(remaining) == 0