feat: add WebAuthnService with fido2 registration and authentication
This commit is contained in:
parent
e6f5ea7f0c
commit
872001c6de
2 changed files with 301 additions and 0 deletions
76
src/fastapi_oidc_op/authn/webauthn.py
Normal file
76
src/fastapi_oidc_op/authn/webauthn.py
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
from collections.abc import Sequence
|
||||
from typing import Any
|
||||
|
||||
from fido2.server import Fido2Server
|
||||
from fido2.webauthn import (
|
||||
AttestedCredentialData,
|
||||
AuthenticationResponse,
|
||||
AuthenticatorData,
|
||||
PublicKeyCredentialDescriptor,
|
||||
PublicKeyCredentialRpEntity,
|
||||
PublicKeyCredentialUserEntity,
|
||||
RegistrationResponse,
|
||||
)
|
||||
|
||||
|
||||
class WebAuthnService:
|
||||
"""FIDO2 WebAuthn registration and authentication."""
|
||||
|
||||
def __init__(self, rp_id: str, rp_name: str, origin: str) -> None:
|
||||
rp = PublicKeyCredentialRpEntity(name=rp_name, id=rp_id)
|
||||
self._server = Fido2Server(rp, verify_origin=lambda o: o == origin)
|
||||
|
||||
def begin_registration(
|
||||
self,
|
||||
user_id: bytes,
|
||||
username: str,
|
||||
existing_credentials: Sequence[PublicKeyCredentialDescriptor] | None = None,
|
||||
) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
"""Begin WebAuthn registration.
|
||||
|
||||
Returns (options_dict, state_dict).
|
||||
options_dict is JSON-serializable for sending to the browser.
|
||||
state_dict must be stored by the caller and passed to complete_registration.
|
||||
"""
|
||||
user = PublicKeyCredentialUserEntity(id=user_id, name=username, display_name=username)
|
||||
options, state = self._server.register_begin(
|
||||
user=user,
|
||||
credentials=existing_credentials,
|
||||
)
|
||||
return dict(options), state
|
||||
|
||||
def complete_registration(
|
||||
self,
|
||||
state: dict[str, Any],
|
||||
response: RegistrationResponse | dict[str, Any],
|
||||
) -> AuthenticatorData:
|
||||
"""Complete WebAuthn registration.
|
||||
|
||||
Returns AuthenticatorData with credential_data containing the public key.
|
||||
"""
|
||||
return self._server.register_complete(state, response)
|
||||
|
||||
def begin_authentication(
|
||||
self,
|
||||
credentials: Sequence[PublicKeyCredentialDescriptor] | None = None,
|
||||
) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
"""Begin WebAuthn authentication.
|
||||
|
||||
Returns (options_dict, state_dict).
|
||||
"""
|
||||
options, state = self._server.authenticate_begin(credentials=credentials)
|
||||
return dict(options), state
|
||||
|
||||
def complete_authentication(
|
||||
self,
|
||||
state: dict[str, Any],
|
||||
credentials: Sequence[AttestedCredentialData],
|
||||
response: AuthenticationResponse | dict[str, Any],
|
||||
) -> AttestedCredentialData:
|
||||
"""Complete WebAuthn authentication.
|
||||
|
||||
Verifies the assertion signature against stored credentials.
|
||||
Returns the matched AttestedCredentialData.
|
||||
Raises on verification failure.
|
||||
"""
|
||||
return self._server.authenticate_complete(state, credentials, response)
|
||||
225
tests/test_authn/test_webauthn.py
Normal file
225
tests/test_authn/test_webauthn.py
Normal file
|
|
@ -0,0 +1,225 @@
|
|||
import os
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.hashes import SHA256
|
||||
from fido2.cose import ES256
|
||||
from fido2.utils import sha256
|
||||
from fido2.webauthn import (
|
||||
Aaguid,
|
||||
AttestationObject,
|
||||
AttestedCredentialData,
|
||||
AuthenticationResponse,
|
||||
AuthenticatorAssertionResponse,
|
||||
AuthenticatorAttestationResponse,
|
||||
AuthenticatorData,
|
||||
CollectedClientData,
|
||||
PublicKeyCredentialDescriptor,
|
||||
PublicKeyCredentialType,
|
||||
RegistrationResponse,
|
||||
)
|
||||
|
||||
from fastapi_oidc_op.authn.webauthn import WebAuthnService
|
||||
|
||||
RP_ID = "localhost"
|
||||
RP_NAME = "Test RP"
|
||||
ORIGIN = "http://localhost:8000"
|
||||
|
||||
|
||||
def _make_service() -> WebAuthnService:
|
||||
return WebAuthnService(rp_id=RP_ID, rp_name=RP_NAME, origin=ORIGIN)
|
||||
|
||||
|
||||
def _generate_credential() -> tuple[ec.EllipticCurvePrivateKey, bytes, AttestedCredentialData]:
|
||||
"""Generate a test credential: (private_key, credential_id, attested_credential_data)."""
|
||||
private_key = ec.generate_private_key(ec.SECP256R1())
|
||||
cose_key = ES256.from_cryptography_key(private_key.public_key())
|
||||
credential_id = os.urandom(32)
|
||||
attested = AttestedCredentialData.create(
|
||||
aaguid=Aaguid.NONE,
|
||||
credential_id=credential_id,
|
||||
public_key=cose_key,
|
||||
)
|
||||
return private_key, credential_id, attested
|
||||
|
||||
|
||||
def _build_registration_response(
|
||||
credential_id: bytes,
|
||||
attested: AttestedCredentialData,
|
||||
challenge: bytes,
|
||||
) -> RegistrationResponse:
|
||||
"""Build a valid registration response for the given challenge."""
|
||||
auth_data = AuthenticatorData.create(
|
||||
rp_id_hash=sha256(RP_ID.encode()),
|
||||
flags=AuthenticatorData.FLAG.UP | AuthenticatorData.FLAG.AT,
|
||||
counter=0,
|
||||
credential_data=attested,
|
||||
)
|
||||
attestation_object = AttestationObject.create(fmt="none", auth_data=auth_data, att_stmt={})
|
||||
client_data = CollectedClientData.create(
|
||||
type=CollectedClientData.TYPE.CREATE,
|
||||
challenge=challenge,
|
||||
origin=ORIGIN,
|
||||
)
|
||||
return RegistrationResponse(
|
||||
raw_id=credential_id,
|
||||
response=AuthenticatorAttestationResponse(
|
||||
client_data=client_data,
|
||||
attestation_object=attestation_object,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _build_authentication_response(
|
||||
private_key: ec.EllipticCurvePrivateKey,
|
||||
credential_id: bytes,
|
||||
challenge: bytes,
|
||||
counter: int = 1,
|
||||
) -> AuthenticationResponse:
|
||||
"""Build a valid authentication response signed with the private key."""
|
||||
client_data = CollectedClientData.create(
|
||||
type=CollectedClientData.TYPE.GET,
|
||||
challenge=challenge,
|
||||
origin=ORIGIN,
|
||||
)
|
||||
auth_data = AuthenticatorData.create(
|
||||
rp_id_hash=sha256(RP_ID.encode()),
|
||||
flags=AuthenticatorData.FLAG.UP,
|
||||
counter=counter,
|
||||
)
|
||||
signature = private_key.sign(auth_data + client_data.hash, ec.ECDSA(SHA256()))
|
||||
return AuthenticationResponse(
|
||||
raw_id=credential_id,
|
||||
response=AuthenticatorAssertionResponse(
|
||||
client_data=client_data,
|
||||
authenticator_data=auth_data,
|
||||
signature=signature,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# --- Registration tests ---
|
||||
|
||||
|
||||
def test_begin_registration_returns_options_and_state() -> None:
|
||||
service = _make_service()
|
||||
options, state = service.begin_registration(
|
||||
user_id=b"user-123",
|
||||
username="alice",
|
||||
)
|
||||
# Options should be a dict suitable for JSON serialization
|
||||
assert "publicKey" in options
|
||||
pub_key = options["publicKey"]
|
||||
assert "challenge" in pub_key
|
||||
assert "rp" in pub_key
|
||||
assert "user" in pub_key
|
||||
# State should be a dict with challenge
|
||||
assert "challenge" in state
|
||||
|
||||
|
||||
def test_complete_registration_returns_credential_data() -> None:
|
||||
service = _make_service()
|
||||
_private_key, credential_id, attested = _generate_credential()
|
||||
|
||||
_options, state = service.begin_registration(
|
||||
user_id=b"user-123",
|
||||
username="alice",
|
||||
)
|
||||
|
||||
# Extract challenge from state to build a matching response
|
||||
challenge = state["challenge"]
|
||||
response = _build_registration_response(credential_id, attested, challenge)
|
||||
|
||||
result = service.complete_registration(state, response)
|
||||
assert result.credential_data is not None
|
||||
assert result.credential_data.credential_id == credential_id
|
||||
|
||||
|
||||
def test_begin_registration_with_existing_credentials() -> None:
|
||||
service = _make_service()
|
||||
_, cred_id, _attested = _generate_credential()
|
||||
existing = [
|
||||
PublicKeyCredentialDescriptor(
|
||||
type=PublicKeyCredentialType.PUBLIC_KEY,
|
||||
id=cred_id,
|
||||
)
|
||||
]
|
||||
|
||||
options, _state = service.begin_registration(
|
||||
user_id=b"user-123",
|
||||
username="alice",
|
||||
existing_credentials=existing,
|
||||
)
|
||||
pub_key = options["publicKey"]
|
||||
assert "excludeCredentials" in pub_key
|
||||
assert len(pub_key["excludeCredentials"]) == 1
|
||||
|
||||
|
||||
# --- Authentication tests ---
|
||||
|
||||
|
||||
def test_begin_authentication_returns_options_and_state() -> None:
|
||||
service = _make_service()
|
||||
_, cred_id, _attested = _generate_credential()
|
||||
credentials = [
|
||||
PublicKeyCredentialDescriptor(
|
||||
type=PublicKeyCredentialType.PUBLIC_KEY,
|
||||
id=cred_id,
|
||||
)
|
||||
]
|
||||
|
||||
options, state = service.begin_authentication(credentials=credentials)
|
||||
assert "publicKey" in options
|
||||
assert "challenge" in state
|
||||
|
||||
|
||||
def test_complete_authentication_verifies_signature() -> None:
|
||||
service = _make_service()
|
||||
private_key, credential_id, attested = _generate_credential()
|
||||
|
||||
credentials = [
|
||||
PublicKeyCredentialDescriptor(
|
||||
type=PublicKeyCredentialType.PUBLIC_KEY,
|
||||
id=credential_id,
|
||||
)
|
||||
]
|
||||
|
||||
_options, state = service.begin_authentication(credentials=credentials)
|
||||
challenge = state["challenge"]
|
||||
|
||||
response = _build_authentication_response(private_key, credential_id, challenge)
|
||||
|
||||
result = service.complete_authentication(
|
||||
state=state,
|
||||
credentials=[attested],
|
||||
response=response,
|
||||
)
|
||||
assert result.credential_id == credential_id
|
||||
|
||||
|
||||
def test_complete_authentication_wrong_signature_raises() -> None:
|
||||
import pytest
|
||||
|
||||
service = _make_service()
|
||||
_private_key, credential_id, attested = _generate_credential()
|
||||
|
||||
# Generate a different key to produce a wrong signature
|
||||
wrong_key = ec.generate_private_key(ec.SECP256R1())
|
||||
|
||||
credentials = [
|
||||
PublicKeyCredentialDescriptor(
|
||||
type=PublicKeyCredentialType.PUBLIC_KEY,
|
||||
id=credential_id,
|
||||
)
|
||||
]
|
||||
|
||||
_options, state = service.begin_authentication(credentials=credentials)
|
||||
challenge = state["challenge"]
|
||||
|
||||
response = _build_authentication_response(wrong_key, credential_id, challenge)
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid signature"):
|
||||
service.complete_authentication(
|
||||
state=state,
|
||||
credentials=[attested],
|
||||
response=response,
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue