fix(security): verify S256 PKCE when relying parties use it

idpyoidc advertised PKCE support but did not actually verify the code
challenge at the token endpoint, so a sent code_challenge provided no
protection. Enable the PKCE add-on restricted to S256.

Configured as non-essential: relying parties that do not send a
code_challenge continue to work (no breaking change), but any RP that uses
PKCE must use S256, and the code_verifier is verified at token exchange.
Flip essential=True (or per-client pkce_essential) to require PKCE once all
clients have migrated.

Refs: porchlight-s48

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Johan Lundberg 2026-06-04 11:00:30 +02:00
parent faeecaed59
commit 71a7c23bdd
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
2 changed files with 95 additions and 16 deletions

View file

@ -3,6 +3,7 @@
from pathlib import Path
from idpyoidc.server import Server
from idpyoidc.server.oauth2.add_on.pkce import CC_METHOD
from porchlight.config import Settings
from porchlight.oidc.claims import PorchlightUserInfo
@ -126,6 +127,20 @@ def _build_server_config(settings: Settings) -> dict:
"phone": ["phone_number", "phone_number_verified"],
},
"authentication": {},
# PKCE: advertise and verify S256 (only). Not "essential", so existing
# relying parties that do not yet send a code_challenge keep working;
# but any RP that does use PKCE must use S256 and is verified at the
# token endpoint. Set essential=True (or per-client pkce_essential) to
# require it once all clients have migrated.
"add_on": {
"pkce": {
"function": "idpyoidc.server.oauth2.add_on.pkce.add_support",
"kwargs": {
"essential": False,
"code_challenge_methods": {"S256": CC_METHOD["S256"]},
},
},
},
}

View file

@ -1,6 +1,7 @@
import hashlib
import json
import secrets
from base64 import b64encode
from base64 import b64encode, urlsafe_b64encode
from datetime import UTC, datetime
from typing import Any
from urllib.parse import parse_qs, urlparse
@ -54,19 +55,36 @@ async def _setup_rp_and_user(app: FastAPI) -> None:
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))
async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> str:
def _pkce_pair() -> tuple[str, str]:
"""Return a (code_verifier, S256 code_challenge) pair."""
verifier = secrets.token_urlsafe(64)
challenge = urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()).rstrip(b"=").decode()
return verifier, challenge
async def _login_and_authorize(
client: AsyncClient,
state: str,
nonce: str,
code_challenge: str | None = None,
code_challenge_method: str | None = None,
) -> str:
"""Perform authorization request, login, consent, and return the auth code."""
params = {
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"scope": "openid profile email",
"state": state,
"nonce": nonce,
}
if code_challenge is not None:
params["code_challenge"] = code_challenge
params["code_challenge_method"] = code_challenge_method or "S256"
# Step 1: Authorization request (unauthenticated) -> redirect to /login
auth_res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"scope": "openid profile email",
"state": state,
"nonce": nonce,
},
params=params,
follow_redirects=False,
)
assert auth_res.status_code in (302, 303), f"Expected redirect to /login, got {auth_res.status_code}"
@ -116,16 +134,19 @@ async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> s
return code
async def _exchange_token(client: AsyncClient, code: str) -> dict[str, Any]:
async def _exchange_token(client: AsyncClient, code: str, code_verifier: str | None = None) -> dict[str, Any]:
"""Exchange authorization code for tokens."""
auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
}
if code_verifier is not None:
data["code_verifier"] = code_verifier
token_res = await client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
},
data=data,
headers={
"Authorization": f"Basic {auth_header}",
"Content-Type": "application/x-www-form-urlencoded",
@ -199,3 +220,46 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None:
# sub should match between id_token and userinfo
assert userinfo_data["sub"] == id_token_sub
async def test_discovery_advertises_s256_pkce(client: AsyncClient) -> None:
res = await client.get("/.well-known/openid-configuration")
assert res.status_code == 200
assert res.json().get("code_challenge_methods_supported") == ["S256"]
async def test_pkce_s256_flow_succeeds(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
await _setup_rp_and_user(app)
state, nonce = secrets.token_urlsafe(16), secrets.token_urlsafe(16)
verifier, challenge = _pkce_pair()
code = await _login_and_authorize(client, state, nonce, code_challenge=challenge)
token_data = await _exchange_token(client, code, code_verifier=verifier)
assert "access_token" in token_data
async def test_pkce_wrong_verifier_rejected(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
await _setup_rp_and_user(app)
state, nonce = secrets.token_urlsafe(16), secrets.token_urlsafe(16)
_verifier, challenge = _pkce_pair()
code = await _login_and_authorize(client, state, nonce, code_challenge=challenge)
# Exchange with a verifier that does not match the challenge -> rejected.
auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
res = await client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"code_verifier": "wrong-" + secrets.token_urlsafe(48),
},
headers={
"Authorization": f"Basic {auth_header}",
"Content-Type": "application/x-www-form-urlencoded",
},
)
assert res.status_code != 200, f"PKCE verifier mismatch should be rejected, got {res.status_code}: {res.text}"