diff --git a/src/porchlight/oidc/provider.py b/src/porchlight/oidc/provider.py index d1ac38b..aae64d3 100644 --- a/src/porchlight/oidc/provider.py +++ b/src/porchlight/oidc/provider.py @@ -3,6 +3,7 @@ from pathlib import Path from idpyoidc.server import Server +from idpyoidc.server.oauth2.add_on.pkce import CC_METHOD from porchlight.config import Settings from porchlight.oidc.claims import PorchlightUserInfo @@ -126,6 +127,20 @@ def _build_server_config(settings: Settings) -> dict: "phone": ["phone_number", "phone_number_verified"], }, "authentication": {}, + # PKCE: advertise and verify S256 (only). Not "essential", so existing + # relying parties that do not yet send a code_challenge keep working; + # but any RP that does use PKCE must use S256 and is verified at the + # token endpoint. Set essential=True (or per-client pkce_essential) to + # require it once all clients have migrated. + "add_on": { + "pkce": { + "function": "idpyoidc.server.oauth2.add_on.pkce.add_support", + "kwargs": { + "essential": False, + "code_challenge_methods": {"S256": CC_METHOD["S256"]}, + }, + }, + }, } diff --git a/tests/test_oidc/test_e2e_flow.py b/tests/test_oidc/test_e2e_flow.py index b1b9545..cd3f547 100644 --- a/tests/test_oidc/test_e2e_flow.py +++ b/tests/test_oidc/test_e2e_flow.py @@ -1,6 +1,7 @@ +import hashlib import json import secrets -from base64 import b64encode +from base64 import b64encode, urlsafe_b64encode from datetime import UTC, datetime from typing import Any from urllib.parse import parse_qs, urlparse @@ -54,19 +55,36 @@ async def _setup_rp_and_user(app: FastAPI) -> None: await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass"))) -async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> str: +def _pkce_pair() -> tuple[str, str]: + """Return a (code_verifier, S256 code_challenge) pair.""" + verifier = secrets.token_urlsafe(64) + challenge = urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()).rstrip(b"=").decode() + return verifier, challenge + + +async def _login_and_authorize( + client: AsyncClient, + state: str, + nonce: str, + code_challenge: str | None = None, + code_challenge_method: str | None = None, +) -> str: """Perform authorization request, login, consent, and return the auth code.""" + params = { + "response_type": "code", + "client_id": CLIENT_ID, + "redirect_uri": REDIRECT_URI, + "scope": "openid profile email", + "state": state, + "nonce": nonce, + } + if code_challenge is not None: + params["code_challenge"] = code_challenge + params["code_challenge_method"] = code_challenge_method or "S256" # Step 1: Authorization request (unauthenticated) -> redirect to /login auth_res = await client.get( "/authorization", - params={ - "response_type": "code", - "client_id": CLIENT_ID, - "redirect_uri": REDIRECT_URI, - "scope": "openid profile email", - "state": state, - "nonce": nonce, - }, + params=params, follow_redirects=False, ) assert auth_res.status_code in (302, 303), f"Expected redirect to /login, got {auth_res.status_code}" @@ -116,16 +134,19 @@ async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> s return code -async def _exchange_token(client: AsyncClient, code: str) -> dict[str, Any]: +async def _exchange_token(client: AsyncClient, code: str, code_verifier: str | None = None) -> dict[str, Any]: """Exchange authorization code for tokens.""" auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode() + data = { + "grant_type": "authorization_code", + "code": code, + "redirect_uri": REDIRECT_URI, + } + if code_verifier is not None: + data["code_verifier"] = code_verifier token_res = await client.post( "/token", - data={ - "grant_type": "authorization_code", - "code": code, - "redirect_uri": REDIRECT_URI, - }, + data=data, headers={ "Authorization": f"Basic {auth_header}", "Content-Type": "application/x-www-form-urlencoded", @@ -199,3 +220,46 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None: # sub should match between id_token and userinfo assert userinfo_data["sub"] == id_token_sub + + +async def test_discovery_advertises_s256_pkce(client: AsyncClient) -> None: + res = await client.get("/.well-known/openid-configuration") + assert res.status_code == 200 + assert res.json().get("code_challenge_methods_supported") == ["S256"] + + +async def test_pkce_s256_flow_succeeds(client: AsyncClient) -> None: + app = client._transport.app # type: ignore[union-attr] + await _setup_rp_and_user(app) + state, nonce = secrets.token_urlsafe(16), secrets.token_urlsafe(16) + verifier, challenge = _pkce_pair() + + code = await _login_and_authorize(client, state, nonce, code_challenge=challenge) + token_data = await _exchange_token(client, code, code_verifier=verifier) + assert "access_token" in token_data + + +async def test_pkce_wrong_verifier_rejected(client: AsyncClient) -> None: + app = client._transport.app # type: ignore[union-attr] + await _setup_rp_and_user(app) + state, nonce = secrets.token_urlsafe(16), secrets.token_urlsafe(16) + _verifier, challenge = _pkce_pair() + + code = await _login_and_authorize(client, state, nonce, code_challenge=challenge) + + # Exchange with a verifier that does not match the challenge -> rejected. + auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode() + res = await client.post( + "/token", + data={ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": REDIRECT_URI, + "code_verifier": "wrong-" + secrets.token_urlsafe(48), + }, + headers={ + "Authorization": f"Basic {auth_header}", + "Content-Type": "application/x-www-form-urlencoded", + }, + ) + assert res.status_code != 200, f"PKCE verifier mismatch should be rejected, got {res.status_code}: {res.text}"