diff --git a/src/fastapi_oidc_op/authn/webauthn.py b/src/fastapi_oidc_op/authn/webauthn.py new file mode 100644 index 0000000..fe37447 --- /dev/null +++ b/src/fastapi_oidc_op/authn/webauthn.py @@ -0,0 +1,76 @@ +from collections.abc import Sequence +from typing import Any + +from fido2.server import Fido2Server +from fido2.webauthn import ( + AttestedCredentialData, + AuthenticationResponse, + AuthenticatorData, + PublicKeyCredentialDescriptor, + PublicKeyCredentialRpEntity, + PublicKeyCredentialUserEntity, + RegistrationResponse, +) + + +class WebAuthnService: + """FIDO2 WebAuthn registration and authentication.""" + + def __init__(self, rp_id: str, rp_name: str, origin: str) -> None: + rp = PublicKeyCredentialRpEntity(name=rp_name, id=rp_id) + self._server = Fido2Server(rp, verify_origin=lambda o: o == origin) + + def begin_registration( + self, + user_id: bytes, + username: str, + existing_credentials: Sequence[PublicKeyCredentialDescriptor] | None = None, + ) -> tuple[dict[str, Any], dict[str, Any]]: + """Begin WebAuthn registration. + + Returns (options_dict, state_dict). + options_dict is JSON-serializable for sending to the browser. + state_dict must be stored by the caller and passed to complete_registration. + """ + user = PublicKeyCredentialUserEntity(id=user_id, name=username, display_name=username) + options, state = self._server.register_begin( + user=user, + credentials=existing_credentials, + ) + return dict(options), state + + def complete_registration( + self, + state: dict[str, Any], + response: RegistrationResponse | dict[str, Any], + ) -> AuthenticatorData: + """Complete WebAuthn registration. + + Returns AuthenticatorData with credential_data containing the public key. + """ + return self._server.register_complete(state, response) + + def begin_authentication( + self, + credentials: Sequence[PublicKeyCredentialDescriptor] | None = None, + ) -> tuple[dict[str, Any], dict[str, Any]]: + """Begin WebAuthn authentication. + + Returns (options_dict, state_dict). + """ + options, state = self._server.authenticate_begin(credentials=credentials) + return dict(options), state + + def complete_authentication( + self, + state: dict[str, Any], + credentials: Sequence[AttestedCredentialData], + response: AuthenticationResponse | dict[str, Any], + ) -> AttestedCredentialData: + """Complete WebAuthn authentication. + + Verifies the assertion signature against stored credentials. + Returns the matched AttestedCredentialData. + Raises on verification failure. + """ + return self._server.authenticate_complete(state, credentials, response) diff --git a/tests/test_authn/test_webauthn.py b/tests/test_authn/test_webauthn.py new file mode 100644 index 0000000..64d400c --- /dev/null +++ b/tests/test_authn/test_webauthn.py @@ -0,0 +1,225 @@ +import os + +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.hashes import SHA256 +from fido2.cose import ES256 +from fido2.utils import sha256 +from fido2.webauthn import ( + Aaguid, + AttestationObject, + AttestedCredentialData, + AuthenticationResponse, + AuthenticatorAssertionResponse, + AuthenticatorAttestationResponse, + AuthenticatorData, + CollectedClientData, + PublicKeyCredentialDescriptor, + PublicKeyCredentialType, + RegistrationResponse, +) + +from fastapi_oidc_op.authn.webauthn import WebAuthnService + +RP_ID = "localhost" +RP_NAME = "Test RP" +ORIGIN = "http://localhost:8000" + + +def _make_service() -> WebAuthnService: + return WebAuthnService(rp_id=RP_ID, rp_name=RP_NAME, origin=ORIGIN) + + +def _generate_credential() -> tuple[ec.EllipticCurvePrivateKey, bytes, AttestedCredentialData]: + """Generate a test credential: (private_key, credential_id, attested_credential_data).""" + private_key = ec.generate_private_key(ec.SECP256R1()) + cose_key = ES256.from_cryptography_key(private_key.public_key()) + credential_id = os.urandom(32) + attested = AttestedCredentialData.create( + aaguid=Aaguid.NONE, + credential_id=credential_id, + public_key=cose_key, + ) + return private_key, credential_id, attested + + +def _build_registration_response( + credential_id: bytes, + attested: AttestedCredentialData, + challenge: bytes, +) -> RegistrationResponse: + """Build a valid registration response for the given challenge.""" + auth_data = AuthenticatorData.create( + rp_id_hash=sha256(RP_ID.encode()), + flags=AuthenticatorData.FLAG.UP | AuthenticatorData.FLAG.AT, + counter=0, + credential_data=attested, + ) + attestation_object = AttestationObject.create(fmt="none", auth_data=auth_data, att_stmt={}) + client_data = CollectedClientData.create( + type=CollectedClientData.TYPE.CREATE, + challenge=challenge, + origin=ORIGIN, + ) + return RegistrationResponse( + raw_id=credential_id, + response=AuthenticatorAttestationResponse( + client_data=client_data, + attestation_object=attestation_object, + ), + ) + + +def _build_authentication_response( + private_key: ec.EllipticCurvePrivateKey, + credential_id: bytes, + challenge: bytes, + counter: int = 1, +) -> AuthenticationResponse: + """Build a valid authentication response signed with the private key.""" + client_data = CollectedClientData.create( + type=CollectedClientData.TYPE.GET, + challenge=challenge, + origin=ORIGIN, + ) + auth_data = AuthenticatorData.create( + rp_id_hash=sha256(RP_ID.encode()), + flags=AuthenticatorData.FLAG.UP, + counter=counter, + ) + signature = private_key.sign(auth_data + client_data.hash, ec.ECDSA(SHA256())) + return AuthenticationResponse( + raw_id=credential_id, + response=AuthenticatorAssertionResponse( + client_data=client_data, + authenticator_data=auth_data, + signature=signature, + ), + ) + + +# --- Registration tests --- + + +def test_begin_registration_returns_options_and_state() -> None: + service = _make_service() + options, state = service.begin_registration( + user_id=b"user-123", + username="alice", + ) + # Options should be a dict suitable for JSON serialization + assert "publicKey" in options + pub_key = options["publicKey"] + assert "challenge" in pub_key + assert "rp" in pub_key + assert "user" in pub_key + # State should be a dict with challenge + assert "challenge" in state + + +def test_complete_registration_returns_credential_data() -> None: + service = _make_service() + _private_key, credential_id, attested = _generate_credential() + + _options, state = service.begin_registration( + user_id=b"user-123", + username="alice", + ) + + # Extract challenge from state to build a matching response + challenge = state["challenge"] + response = _build_registration_response(credential_id, attested, challenge) + + result = service.complete_registration(state, response) + assert result.credential_data is not None + assert result.credential_data.credential_id == credential_id + + +def test_begin_registration_with_existing_credentials() -> None: + service = _make_service() + _, cred_id, _attested = _generate_credential() + existing = [ + PublicKeyCredentialDescriptor( + type=PublicKeyCredentialType.PUBLIC_KEY, + id=cred_id, + ) + ] + + options, _state = service.begin_registration( + user_id=b"user-123", + username="alice", + existing_credentials=existing, + ) + pub_key = options["publicKey"] + assert "excludeCredentials" in pub_key + assert len(pub_key["excludeCredentials"]) == 1 + + +# --- Authentication tests --- + + +def test_begin_authentication_returns_options_and_state() -> None: + service = _make_service() + _, cred_id, _attested = _generate_credential() + credentials = [ + PublicKeyCredentialDescriptor( + type=PublicKeyCredentialType.PUBLIC_KEY, + id=cred_id, + ) + ] + + options, state = service.begin_authentication(credentials=credentials) + assert "publicKey" in options + assert "challenge" in state + + +def test_complete_authentication_verifies_signature() -> None: + service = _make_service() + private_key, credential_id, attested = _generate_credential() + + credentials = [ + PublicKeyCredentialDescriptor( + type=PublicKeyCredentialType.PUBLIC_KEY, + id=credential_id, + ) + ] + + _options, state = service.begin_authentication(credentials=credentials) + challenge = state["challenge"] + + response = _build_authentication_response(private_key, credential_id, challenge) + + result = service.complete_authentication( + state=state, + credentials=[attested], + response=response, + ) + assert result.credential_id == credential_id + + +def test_complete_authentication_wrong_signature_raises() -> None: + import pytest + + service = _make_service() + _private_key, credential_id, attested = _generate_credential() + + # Generate a different key to produce a wrong signature + wrong_key = ec.generate_private_key(ec.SECP256R1()) + + credentials = [ + PublicKeyCredentialDescriptor( + type=PublicKeyCredentialType.PUBLIC_KEY, + id=credential_id, + ) + ] + + _options, state = service.begin_authentication(credentials=credentials) + challenge = state["challenge"] + + response = _build_authentication_response(wrong_key, credential_id, challenge) + + with pytest.raises(ValueError, match="Invalid signature"): + service.complete_authentication( + state=state, + credentials=[attested], + response=response, + )