From 8143db5aea15ac69d2382e7632cc9ba9923929c4 Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Mon, 29 Jun 2026 09:19:52 +0200 Subject: [PATCH] implement ACCR aware authentications --- src/porchlight/authn/acr.py | 19 ++++++ src/porchlight/authn/routes.py | 16 +++-- src/porchlight/oidc/endpoints.py | 7 +- tests/test_oidc/test_e2e_flow.py | 110 ++++++++++++++++++++++++++++++- 4 files changed, 144 insertions(+), 8 deletions(-) create mode 100644 src/porchlight/authn/acr.py diff --git a/src/porchlight/authn/acr.py b/src/porchlight/authn/acr.py new file mode 100644 index 0000000..edf54ae --- /dev/null +++ b/src/porchlight/authn/acr.py @@ -0,0 +1,19 @@ +"""Authentication Context Class Reference (``acr``) values per login method. + +The ``acr`` claim tells relying parties *how* the user authenticated. idpyoidc +only carries ``acr`` through the authentication event (it has no ``amr`` +support), so this single value is our only signal of authentication strength — +it must reflect the method actually used, not a static placeholder. + +The chosen ``acr`` for a login is recorded in the session under +:data:`SESSION_ACR_KEY` and read back when the OIDC authorization is completed. +""" + +# Password over TLS. SAML Authentication Context class — single factor. +ACR_PASSWORD = "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport" + +# WebAuthn / passkey. REFEDS MFA profile — phishing-resistant authentication. +ACR_WEBAUTHN = "https://refeds.org/profile/mfa" + +# Session key holding the acr for the current authenticated login. +SESSION_ACR_KEY = "acr" diff --git a/src/porchlight/authn/routes.py b/src/porchlight/authn/routes.py index 465ebde..979cbc9 100644 --- a/src/porchlight/authn/routes.py +++ b/src/porchlight/authn/routes.py @@ -5,6 +5,7 @@ from fastapi import APIRouter, Form, Request, Response from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from fido2.webauthn import AttestedCredentialData, AuthenticationResponse +from porchlight.authn.acr import ACR_PASSWORD, ACR_WEBAUTHN, SESSION_ACR_KEY from porchlight.models import User from porchlight.rate_limit import limiter from porchlight.userid import generate_unique_userid @@ -36,12 +37,15 @@ def _is_sign_count_rollback(stored: int, presented: int) -> bool: return presented <= stored -def _establish_authenticated_session(request: Request, user: User) -> None: +def _establish_authenticated_session(request: Request, user: User, acr: str) -> None: """Reset the session before recording the authenticated identity. Clearing first defeats session fixation: any values an attacker planted in the pre-auth session are dropped and the session cookie is reissued. A pending OIDC authorization request is the only pre-auth state worth keeping. + + ``acr`` records how the user actually authenticated so the OIDC layer can + surface a truthful Authentication Context Class Reference (see acr.py). """ pending_oidc = request.session.get("oidc_auth_request") request.session.clear() @@ -49,6 +53,7 @@ def _establish_authenticated_session(request: Request, user: User) -> None: request.session["oidc_auth_request"] = pending_oidc request.session["userid"] = user.userid request.session["username"] = user.username + request.session[SESSION_ACR_KEY] = acr @router.get("/login", response_class=HTMLResponse) @@ -84,7 +89,7 @@ async def login_password( if not user.active: return HTMLResponse(error_html) - _establish_authenticated_session(request, user) + _establish_authenticated_session(request, user, ACR_PASSWORD) response = Response() response.headers["HX-Redirect"] = _login_redirect_target(request) @@ -152,7 +157,10 @@ async def register_magic_link(request: Request, token: str) -> Response: user = User(userid=userid, username=link.username, groups=["users"]) await user_repo.create(user) - _establish_authenticated_session(request, user) + # Magic-link registration is single-factor (email possession); mark it as + # such. It normally redirects to credential setup rather than completing an + # OIDC flow, but the session acr governs any later authorization too. + _establish_authenticated_session(request, user, ACR_PASSWORD) return RedirectResponse("/manage/credentials?setup=1", status_code=303) @@ -217,6 +225,6 @@ async def login_webauthn_complete(request: Request) -> Response: # noqa: PLR091 if user is None or not user.active: return JSONResponse({"error": "Authentication failed"}, status_code=400) - _establish_authenticated_session(request, user) + _establish_authenticated_session(request, user, ACR_WEBAUTHN) return JSONResponse({"redirect": _login_redirect_target(request)}) diff --git a/src/porchlight/oidc/endpoints.py b/src/porchlight/oidc/endpoints.py index 2390802..963ec2d 100644 --- a/src/porchlight/oidc/endpoints.py +++ b/src/porchlight/oidc/endpoints.py @@ -12,6 +12,7 @@ from fastapi import APIRouter, Request, Response from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from idpyoidc.time_util import utc_time_sans_frac +from porchlight.authn.acr import ACR_PASSWORD, SESSION_ACR_KEY from porchlight.oidc.claims import PorchlightUserInfo, user_to_claims logger = logging.getLogger(__name__) @@ -169,12 +170,14 @@ async def _complete_authorization( # noqa: PLR0913 claims = user_to_claims(user) userinfo.set_user_claims(username, claims) - # Create idpyoidc session — authn_method needs a kwargs dict + # Create idpyoidc session — authn_method needs a kwargs dict. The acr + # reflects how the user actually authenticated (set at login); fall back to + # the single-factor password acr if absent. authn_method = SimpleNamespace(kwargs={}) session_id = endpoint.create_session( # type: ignore[union-attr] request=parsed, user_id=username, - acr="urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport", + acr=request.session.get(SESSION_ACR_KEY, ACR_PASSWORD), time_stamp=utc_time_sans_frac(), authn_method=authn_method, ) diff --git a/tests/test_oidc/test_e2e_flow.py b/tests/test_oidc/test_e2e_flow.py index b0f367c..df74047 100644 --- a/tests/test_oidc/test_e2e_flow.py +++ b/tests/test_oidc/test_e2e_flow.py @@ -7,15 +7,28 @@ from typing import Any from urllib.parse import parse_qs, urlparse from argon2 import PasswordHasher +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.hashes import SHA256 from cryptojwt.jwk.jwk import key_from_jwk_dict from cryptojwt.jws.jws import JWS from fastapi import FastAPI +from fido2.cose import ES256 +from fido2.utils import sha256, websafe_decode, websafe_encode +from fido2.webauthn import ( + Aaguid, + AttestedCredentialData, + AuthenticatorData, + CollectedClientData, +) from httpx import AsyncClient +from porchlight.authn.acr import ACR_PASSWORD, ACR_WEBAUTHN from porchlight.authn.password import PasswordService -from porchlight.models import PasswordCredential, User +from porchlight.models import PasswordCredential, User, WebAuthnCredential from tests.conftest import get_csrf_token +RP_ID = "localhost" + CLIENT_ID = "e2e-rp" CLIENT_SECRET = "e2e-secret-0123456789abcdef" # 30+ chars REDIRECT_URI = "http://localhost:9000/callback" @@ -161,7 +174,9 @@ async def _exchange_token(client: AsyncClient, code: str, code_verifier: str | N return token_data -async def _validate_id_token(client: AsyncClient, id_token_jwt: str, nonce: str) -> str: +async def _validate_id_token( + client: AsyncClient, id_token_jwt: str, nonce: str, expected_acr: str = ACR_PASSWORD +) -> str: """Validate the ID token via JWKS and return the sub claim.""" jwks_res = await client.get("/jwks") assert jwks_res.status_code == 200 @@ -184,6 +199,7 @@ async def _validate_id_token(client: AsyncClient, id_token_jwt: str, nonce: str) assert id_token_payload["iss"] == "http://localhost:8000" assert CLIENT_ID in id_token_payload["aud"] assert id_token_payload["nonce"] == nonce + assert id_token_payload["acr"] == expected_acr id_token_sub = id_token_payload["sub"] assert isinstance(id_token_sub, str) @@ -329,3 +345,93 @@ async def test_refresh_grant_does_not_mint_id_token(client: AsyncClient) -> None refreshed = res.json() assert "access_token" in refreshed assert "id_token" not in refreshed + + +async def _add_webauthn_credential( + app: FastAPI, userid: str +) -> tuple[ec.EllipticCurvePrivateKey, bytes]: + """Register a WebAuthn credential for an existing user; return (key, cred_id).""" + private_key = ec.generate_private_key(ec.SECP256R1()) + cose_key = ES256.from_cryptography_key(private_key.public_key()) + credential_id = secrets.token_bytes(32) + attested = AttestedCredentialData.create( + aaguid=Aaguid.NONE, credential_id=credential_id, public_key=cose_key + ) + await app.state.credential_repo.create_webauthn( + WebAuthnCredential(user_id=userid, credential_id=credential_id, public_key=bytes(attested)) + ) + return private_key, credential_id + + +async def _login_webauthn_and_authorize( + client: AsyncClient, userid: str, private_key: ec.EllipticCurvePrivateKey, credential_id: bytes, state: str, nonce: str +) -> str: + """Authorize, log in via WebAuthn assertion, consent, return the auth code.""" + auth_res = await client.get( + "/authorization", + params={ + "response_type": "code", + "client_id": CLIENT_ID, + "redirect_uri": REDIRECT_URI, + "scope": "openid profile email", + "state": state, + "nonce": nonce, + }, + follow_redirects=False, + ) + assert "/login" in auth_res.headers["location"] + + # Begin WebAuthn login (usernameless) to seed the challenge into the session. + token = await get_csrf_token(client) + begin = await client.post("/login/webauthn/begin", headers={"X-CSRF-Token": token}) + assert begin.status_code == 200 + challenge = websafe_decode(begin.json()["publicKey"]["challenge"]) + + # Sign a valid assertion with the registered key. + client_data = CollectedClientData.create( + type=CollectedClientData.TYPE.GET, challenge=challenge, origin=f"http://{RP_ID}:8000" + ) + auth_data = AuthenticatorData.create(rp_id_hash=sha256(RP_ID.encode()), flags=AuthenticatorData.FLAG.UP, counter=1) + signature = private_key.sign(bytes(auth_data) + client_data.hash, ec.ECDSA(SHA256())) + complete = await client.post( + "/login/webauthn/complete", + json={ + "id": websafe_encode(credential_id), + "rawId": websafe_encode(credential_id), + "type": "public-key", + "response": { + "clientDataJSON": websafe_encode(bytes(client_data)), + "authenticatorData": websafe_encode(bytes(auth_data)), + "signature": websafe_encode(signature), + "userHandle": websafe_encode(userid.encode()), + }, + }, + headers={"X-CSRF-Token": token}, + ) + assert complete.status_code == 200, f"WebAuthn login failed: {complete.text}" + assert "/authorization/complete" in complete.json()["redirect"] + + complete_res = await client.get("/authorization/complete", follow_redirects=False) + assert "/consent" in complete_res.headers["location"] + + token = await get_csrf_token(client) + consent_res = await client.post( + "/consent", + data={"action": "allow", "scope": ["openid", "profile", "email"]}, + headers={"X-CSRF-Token": token}, + follow_redirects=False, + ) + return parse_qs(urlparse(consent_res.headers["location"]).query)["code"][0] + + +async def test_webauthn_login_emits_mfa_acr(client: AsyncClient) -> None: + """A WebAuthn login must surface the phishing-resistant MFA acr, not the + static password acr.""" + app = client._transport.app # type: ignore[union-attr] + await _setup_rp_and_user(app) + private_key, credential_id = await _add_webauthn_credential(app, "lusab-bansen") + + state, nonce = secrets.token_urlsafe(16), secrets.token_urlsafe(16) + code = await _login_webauthn_and_authorize(client, "lusab-bansen", private_key, credential_id, state, nonce) + token_data = await _exchange_token(client, code) + await _validate_id_token(client, token_data["id_token"], nonce, expected_acr=ACR_WEBAUTHN)