implement ACCR aware authentications

This commit is contained in:
Johan Lundberg 2026-06-29 09:19:52 +02:00
parent 7d06d747d6
commit 8143db5aea
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
4 changed files with 144 additions and 8 deletions

View file

@ -7,15 +7,28 @@ from typing import Any
from urllib.parse import parse_qs, urlparse
from argon2 import PasswordHasher
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.hashes import SHA256
from cryptojwt.jwk.jwk import key_from_jwk_dict
from cryptojwt.jws.jws import JWS
from fastapi import FastAPI
from fido2.cose import ES256
from fido2.utils import sha256, websafe_decode, websafe_encode
from fido2.webauthn import (
Aaguid,
AttestedCredentialData,
AuthenticatorData,
CollectedClientData,
)
from httpx import AsyncClient
from porchlight.authn.acr import ACR_PASSWORD, ACR_WEBAUTHN
from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User
from porchlight.models import PasswordCredential, User, WebAuthnCredential
from tests.conftest import get_csrf_token
RP_ID = "localhost"
CLIENT_ID = "e2e-rp"
CLIENT_SECRET = "e2e-secret-0123456789abcdef" # 30+ chars
REDIRECT_URI = "http://localhost:9000/callback"
@ -161,7 +174,9 @@ async def _exchange_token(client: AsyncClient, code: str, code_verifier: str | N
return token_data
async def _validate_id_token(client: AsyncClient, id_token_jwt: str, nonce: str) -> str:
async def _validate_id_token(
client: AsyncClient, id_token_jwt: str, nonce: str, expected_acr: str = ACR_PASSWORD
) -> str:
"""Validate the ID token via JWKS and return the sub claim."""
jwks_res = await client.get("/jwks")
assert jwks_res.status_code == 200
@ -184,6 +199,7 @@ async def _validate_id_token(client: AsyncClient, id_token_jwt: str, nonce: str)
assert id_token_payload["iss"] == "http://localhost:8000"
assert CLIENT_ID in id_token_payload["aud"]
assert id_token_payload["nonce"] == nonce
assert id_token_payload["acr"] == expected_acr
id_token_sub = id_token_payload["sub"]
assert isinstance(id_token_sub, str)
@ -329,3 +345,93 @@ async def test_refresh_grant_does_not_mint_id_token(client: AsyncClient) -> None
refreshed = res.json()
assert "access_token" in refreshed
assert "id_token" not in refreshed
async def _add_webauthn_credential(
app: FastAPI, userid: str
) -> tuple[ec.EllipticCurvePrivateKey, bytes]:
"""Register a WebAuthn credential for an existing user; return (key, cred_id)."""
private_key = ec.generate_private_key(ec.SECP256R1())
cose_key = ES256.from_cryptography_key(private_key.public_key())
credential_id = secrets.token_bytes(32)
attested = AttestedCredentialData.create(
aaguid=Aaguid.NONE, credential_id=credential_id, public_key=cose_key
)
await app.state.credential_repo.create_webauthn(
WebAuthnCredential(user_id=userid, credential_id=credential_id, public_key=bytes(attested))
)
return private_key, credential_id
async def _login_webauthn_and_authorize(
client: AsyncClient, userid: str, private_key: ec.EllipticCurvePrivateKey, credential_id: bytes, state: str, nonce: str
) -> str:
"""Authorize, log in via WebAuthn assertion, consent, return the auth code."""
auth_res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"scope": "openid profile email",
"state": state,
"nonce": nonce,
},
follow_redirects=False,
)
assert "/login" in auth_res.headers["location"]
# Begin WebAuthn login (usernameless) to seed the challenge into the session.
token = await get_csrf_token(client)
begin = await client.post("/login/webauthn/begin", headers={"X-CSRF-Token": token})
assert begin.status_code == 200
challenge = websafe_decode(begin.json()["publicKey"]["challenge"])
# Sign a valid assertion with the registered key.
client_data = CollectedClientData.create(
type=CollectedClientData.TYPE.GET, challenge=challenge, origin=f"http://{RP_ID}:8000"
)
auth_data = AuthenticatorData.create(rp_id_hash=sha256(RP_ID.encode()), flags=AuthenticatorData.FLAG.UP, counter=1)
signature = private_key.sign(bytes(auth_data) + client_data.hash, ec.ECDSA(SHA256()))
complete = await client.post(
"/login/webauthn/complete",
json={
"id": websafe_encode(credential_id),
"rawId": websafe_encode(credential_id),
"type": "public-key",
"response": {
"clientDataJSON": websafe_encode(bytes(client_data)),
"authenticatorData": websafe_encode(bytes(auth_data)),
"signature": websafe_encode(signature),
"userHandle": websafe_encode(userid.encode()),
},
},
headers={"X-CSRF-Token": token},
)
assert complete.status_code == 200, f"WebAuthn login failed: {complete.text}"
assert "/authorization/complete" in complete.json()["redirect"]
complete_res = await client.get("/authorization/complete", follow_redirects=False)
assert "/consent" in complete_res.headers["location"]
token = await get_csrf_token(client)
consent_res = await client.post(
"/consent",
data={"action": "allow", "scope": ["openid", "profile", "email"]},
headers={"X-CSRF-Token": token},
follow_redirects=False,
)
return parse_qs(urlparse(consent_res.headers["location"]).query)["code"][0]
async def test_webauthn_login_emits_mfa_acr(client: AsyncClient) -> None:
"""A WebAuthn login must surface the phishing-resistant MFA acr, not the
static password acr."""
app = client._transport.app # type: ignore[union-attr]
await _setup_rp_and_user(app)
private_key, credential_id = await _add_webauthn_credential(app, "lusab-bansen")
state, nonce = secrets.token_urlsafe(16), secrets.token_urlsafe(16)
code = await _login_webauthn_and_authorize(client, "lusab-bansen", private_key, credential_id, state, nonce)
token_data = await _exchange_token(client, code)
await _validate_id_token(client, token_data["id_token"], nonce, expected_acr=ACR_WEBAUTHN)