"""Tests for the WebAuthn credential management endpoints under /manage/credentials/webauthn."""

import os
from base64 import urlsafe_b64encode
from datetime import UTC, datetime

from argon2 import PasswordHasher
from cryptography.hazmat.primitives.asymmetric import ec
from fido2.cose import ES256
from fido2.utils import sha256
from fido2.webauthn import (
    Aaguid,
    AttestationObject,
    AttestedCredentialData,
    AuthenticatorAttestationResponse,
    AuthenticatorData,
    CollectedClientData,
    RegistrationResponse,
)
from httpx import AsyncClient

from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User, WebAuthnCredential
from tests.conftest import get_csrf_token

RP_ID = "localhost"
ORIGIN = "http://localhost:8000"

# Username of the account created by _create_user_and_login; tests that talk to
# the WebAuthn service directly must pass the same value.
USERNAME = "alice"


async def _create_user_and_login(client: AsyncClient) -> str:
    """Create a user with a password credential, log in through the client, and return the userid.

    The user and credential are written straight to the repositories exposed on
    ``app.state``; the login itself goes through ``POST /login/password`` so the
    client ends up holding whatever session state the app issues.

    Raises:
        AssertionError: if the login request comes back with an error status,
            so a broken login surfaces here rather than in the calling test.
    """
    app = client._transport.app  # type: ignore[union-attr]
    user_repo = app.state.user_repo
    cred_repo = app.state.credential_repo

    # One timestamp for both fields: a freshly created user has not been updated yet.
    now = datetime.now(UTC)
    user = User(userid="lusab-bansen", username=USERNAME, created_at=now, updated_at=now)
    await user_repo.create(user)

    # Low argon2 cost parameters — presumably chosen to keep hashing fast in tests.
    svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))

    token = await get_csrf_token(client)
    res = await client.post(
        "/login/password",
        data={"username": USERNAME, "password": "testpass"},
        headers={"HX-Request": "true", "X-CSRF-Token": token},
    )
    # Fail fast: without this, a rejected login only shows up later as an
    # unrelated-looking failure in the test that needed the session.
    assert res.status_code < 400, f"login failed with status {res.status_code}"
    return user.userid


def _generate_credential() -> tuple[ec.EllipticCurvePrivateKey, bytes, AttestedCredentialData]:
    """Generate a fresh P-256 key pair and wrap its public key as attested credential data.

    Returns:
        A ``(private_key, credential_id, attested)`` tuple, where ``credential_id``
        is 32 random bytes and ``attested`` carries that id together with the
        ES256 COSE form of the public key.
    """
    private_key = ec.generate_private_key(ec.SECP256R1())
    cose_key = ES256.from_cryptography_key(private_key.public_key())
    credential_id = os.urandom(32)
    attested = AttestedCredentialData.create(aaguid=Aaguid.NONE, credential_id=credential_id, public_key=cose_key)
    return private_key, credential_id, attested


def _build_registration_response(
    credential_id: bytes, attested: AttestedCredentialData, challenge: bytes
) -> RegistrationResponse:
    """Build the registration response a browser would send for the given credential.

    Args:
        credential_id: Raw id reported as the response's ``raw_id``.
        attested: Attested credential data embedded in the authenticator data.
        challenge: Challenge echoed back in the collected client data.

    Returns:
        A ``RegistrationResponse`` using ``"none"`` attestation (empty attestation
        statement), bound to ``RP_ID`` and ``ORIGIN``.
    """
    auth_data = AuthenticatorData.create(
        rp_id_hash=sha256(RP_ID.encode()),
        # UP = user present, AT = attested credential data included.
        flags=AuthenticatorData.FLAG.UP | AuthenticatorData.FLAG.AT,
        counter=0,
        credential_data=attested,
    )
    attestation_object = AttestationObject.create(fmt="none", auth_data=auth_data, att_stmt={})
    client_data = CollectedClientData.create(type=CollectedClientData.TYPE.CREATE, challenge=challenge, origin=ORIGIN)
    return RegistrationResponse(
        raw_id=credential_id,
        response=AuthenticatorAttestationResponse(client_data=client_data, attestation_object=attestation_object),
    )


async def test_webauthn_begin_requires_session(client: AsyncClient) -> None:
    """Starting registration without logging in is answered with a redirect or 401."""
    token = await get_csrf_token(client)
    res = await client.post(
        "/manage/credentials/webauthn/begin",
        headers={"X-CSRF-Token": token},
        follow_redirects=False,
    )
    assert res.status_code in (302, 303, 401)


async def test_webauthn_begin_returns_options(client: AsyncClient) -> None:
    """A logged-in user gets JSON options containing ``publicKey.challenge``."""
    await _create_user_and_login(client)
    token = await get_csrf_token(client)
    res = await client.post(
        "/manage/credentials/webauthn/begin",
        headers={"X-CSRF-Token": token},
    )
    assert res.status_code == 200
    data = res.json()
    assert "publicKey" in data
    assert "challenge" in data["publicKey"]


async def test_webauthn_complete_creates_credential(client: AsyncClient) -> None:
    """Completing a registration via the service yields a credential the repo can store and return."""
    userid = await _create_user_and_login(client)
    app = client._transport.app  # type: ignore[union-attr]
    cred_repo = app.state.credential_repo

    # Begin registration via service directly to get raw state
    # (the session-based flow is hard to test e2e since we can't extract the state)
    webauthn_service = app.state.webauthn_service
    _private_key, credential_id, attested = _generate_credential()

    _options, state = webauthn_service.begin_registration(user_id=userid.encode(), username=USERNAME)
    response = _build_registration_response(credential_id, attested, state["challenge"])
    result = webauthn_service.complete_registration(state, response)

    # Store credential directly to verify the repo works
    cred = WebAuthnCredential(
        user_id=userid,
        credential_id=result.credential_data.credential_id,
        public_key=bytes(result.credential_data),
    )
    await cred_repo.create_webauthn(cred)

    creds = await cred_repo.get_webauthn_by_user(userid)
    assert len(creds) == 1
    assert creds[0].credential_id == credential_id


async def test_delete_webauthn_credential(client: AsyncClient) -> None:
    """DELETE on a stored credential's URL returns 200 and removes it from the repo."""
    userid = await _create_user_and_login(client)
    app = client._transport.app  # type: ignore[union-attr]
    cred_repo = app.state.credential_repo

    # User already has password credential from login. Add a webauthn credential.
    await cred_repo.create_webauthn(WebAuthnCredential(user_id=userid, credential_id=b"cred1", public_key=b"key1"))

    # The URL carries the credential id as unpadded URL-safe base64.
    cred_id_b64 = urlsafe_b64encode(b"cred1").decode().rstrip("=")
    token = await get_csrf_token(client)
    res = await client.delete(
        f"/manage/credentials/webauthn/{cred_id_b64}",
        headers={"HX-Request": "true", "X-CSRF-Token": token},
    )
    assert res.status_code == 200

    creds = await cred_repo.get_webauthn_by_user(userid)
    assert len(creds) == 0