porchlight/tests/test_oidc/test_e2e_flow.py
Johan Lundberg 27763d19ea
fix(security): don't mint new ID tokens on refresh; confirm offline_access gating
idpyoidc already gates refresh-token issuance on the offline_access scope
(verified by test), but the refresh-token grant was configured to also mint
fresh ID tokens. Drop id_token from the refresh_token grant's supports_minting
so refreshing yields only access (and a rotated refresh) token; ID tokens come
from authentication. Refresh-token rotation is retained.

Refs: porchlight-553

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-10 09:25:49 +02:00

331 lines
12 KiB
Python

import hashlib
import json
import secrets
from base64 import b64encode, urlsafe_b64encode
from datetime import UTC, datetime
from typing import Any
from urllib.parse import parse_qs, urlparse
from argon2 import PasswordHasher
from cryptojwt.jwk.jwk import key_from_jwk_dict
from cryptojwt.jws.jws import JWS
from fastapi import FastAPI
from httpx import AsyncClient
from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
# Identifier of the relying party (RP) registered for these end-to-end tests.
CLIENT_ID = "e2e-rp"
# Shared secret used for HTTP Basic client authentication at /token.
# NOTE(review): this literal is 27 characters long, not the "30+ chars" an
# earlier comment claimed — confirm whether a minimum secret length applies.
CLIENT_SECRET = "e2e-secret-0123456789abcdef"
# Callback URL registered for the RP; the final consent redirect must target it.
REDIRECT_URI = "http://localhost:9000/callback"
async def _setup_rp_and_user(app: FastAPI) -> None:
    """Seed the application with the e2e relying party and a password user.

    Writes a client entry for ``CLIENT_ID`` directly into the OIDC server's
    client database, registers ``CLIENT_SECRET`` as a symmetric key in the
    server keyjar, then creates the user "alice" together with a password
    credential ("testpass") through the repositories found on ``app.state``.
    """
    app_state = app.state
    server = app_state.oidc_server
    users = app_state.user_repo
    credentials = app_state.credential_repo

    rp_scopes = ("openid", "profile", "email", "offline_access")
    client_entry = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "redirect_uris": [(REDIRECT_URI, {})],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": list(rp_scopes),
        "allowed_scopes": list(rp_scopes),
        "client_salt": secrets.token_hex(8),
    }
    server.context.cdb[CLIENT_ID] = client_entry
    server.keyjar.add_symmetric(CLIENT_ID, CLIENT_SECRET)

    alice = User(
        userid="lusab-bansen",
        username="alice",
        email="alice@example.com",
        email_verified=True,
        given_name="Alice",
        family_name="Wonderland",
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    await users.create(alice)

    # Cheap Argon2 parameters keep password hashing fast inside tests.
    cheap_hasher = PasswordHasher(time_cost=1, memory_cost=8192)
    password_service = PasswordService(hasher=cheap_hasher)
    password_hash = password_service.hash("testpass")
    credential = PasswordCredential(user_id=alice.userid, password_hash=password_hash)
    await credentials.create_password(credential)
def _pkce_pair() -> tuple[str, str]:
    """Generate a fresh PKCE ``(code_verifier, code_challenge)`` pair.

    The challenge is the unpadded base64url encoding of the SHA-256 digest of
    the verifier, i.e. the ``S256`` transformation.
    """
    code_verifier = secrets.token_urlsafe(64)
    digest = hashlib.sha256(code_verifier.encode()).digest()
    # Padding is stripped: the challenge is sent unpadded.
    code_challenge = urlsafe_b64encode(digest).decode().rstrip("=")
    return code_verifier, code_challenge
async def _login_and_authorize(
    client: AsyncClient,
    state: str,
    nonce: str,
    code_challenge: str | None = None,
    code_challenge_method: str | None = None,
) -> str:
    """Drive authorization request, password login and consent; return the code.

    When ``code_challenge`` is supplied the PKCE parameters are added to the
    authorization request, with the method defaulting to ``S256``. Each step
    asserts the expected redirect, and the callback redirect is checked for
    the registered host/path, a ``code`` parameter and the echoed ``state``.
    """
    redirect_statuses = (302, 303)

    query = {
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "scope": "openid profile email",
        "state": state,
        "nonce": nonce,
    }
    if code_challenge is not None:
        query["code_challenge"] = code_challenge
        query["code_challenge_method"] = code_challenge_method or "S256"

    # Step 1: unauthenticated authorization request is bounced to /login.
    authorization = await client.get("/authorization", params=query, follow_redirects=False)
    assert authorization.status_code in redirect_statuses, (
        f"Expected redirect to /login, got {authorization.status_code}"
    )
    assert "/login" in authorization.headers["location"]

    # Step 2: password login answers with an HX-Redirect to /authorization/complete.
    csrf = await get_csrf_token(client)
    login = await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true", "X-CSRF-Token": csrf},
    )
    assert login.status_code == 200
    hx_target = login.headers.get("HX-Redirect", "")
    assert "/authorization/complete" in hx_target, (
        f"Expected HX-Redirect to /authorization/complete, got '{hx_target}'"
    )

    # Step 3: completing the authorization leads to the consent page.
    completion = await client.get("/authorization/complete", follow_redirects=False)
    assert completion.status_code in redirect_statuses, (
        f"Expected redirect to /consent, got {completion.status_code}: {completion.text}"
    )
    assert "/consent" in completion.headers["location"]

    # Step 3b: approving consent redirects to the RP callback with code + state.
    csrf = await get_csrf_token(client)
    consent = await client.post(
        "/consent",
        data={"action": "allow", "scope": ["openid", "profile", "email"]},
        headers={"X-CSRF-Token": csrf},
        follow_redirects=False,
    )
    assert consent.status_code in redirect_statuses

    callback_url = consent.headers["location"]
    callback = urlparse(callback_url)
    assert callback.netloc == "localhost:9000"
    assert callback.path == "/callback"

    returned = parse_qs(callback.query)
    assert "code" in returned, f"No code in redirect: {callback_url}"
    assert returned["state"] == [state], f"State mismatch: expected {state}, got {returned.get('state')}"

    auth_code = returned["code"][0]
    assert auth_code  # must be a non-empty string
    return auth_code
async def _exchange_token(client: AsyncClient, code: str, code_verifier: str | None = None) -> dict[str, Any]:
    """Redeem an authorization code at ``/token`` and return the JSON response.

    Authenticates with HTTP Basic using the RP credentials and includes
    ``code_verifier`` only when one is given. Asserts a 200 response that
    carries an access token, an ID token and a bearer token type.
    """
    basic_credentials = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()

    form = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": REDIRECT_URI,
    }
    if code_verifier is not None:
        form["code_verifier"] = code_verifier

    request_headers = {
        "Authorization": f"Basic {basic_credentials}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    response = await client.post("/token", data=form, headers=request_headers)
    assert response.status_code == 200, f"Token endpoint failed: {response.text}"

    tokens = response.json()
    for required_key in ("access_token", "id_token"):
        assert required_key in tokens
    assert tokens["token_type"].lower() == "bearer"
    return tokens
async def _validate_id_token(client: AsyncClient, id_token_jwt: str, nonce: str) -> str:
    """Verify the ID token against the published JWKS and return its ``sub``.

    Fetches ``/jwks``, selects the single key whose ``kid`` matches the token
    header, verifies the compact JWS signature with it and then checks the
    ``iss``, ``aud`` and ``nonce`` claims.

    Args:
        client: Test HTTP client bound to the application.
        id_token_jwt: The ID token in JWS compact serialization.
        nonce: The nonce sent with the authorization request.

    Returns:
        The non-empty ``sub`` claim of the verified ID token.
    """
    # Local import keeps this helper self-contained; only it needs the decoder.
    from base64 import urlsafe_b64decode

    jwks_res = await client.get("/jwks")
    assert jwks_res.status_code == 200
    jwks = jwks_res.json()
    assert "keys" in jwks
    assert len(jwks["keys"]) > 0

    # Decode the JOSE header with the standard library instead of relying on a
    # private cryptojwt method. Compact JWS segments are unpadded base64url,
    # so the padding is restored before decoding.
    header_segment = id_token_jwt.split(".", maxsplit=1)[0]
    padded_header = header_segment + "=" * (-len(header_segment) % 4)
    id_token_header = json.loads(urlsafe_b64decode(padded_header))
    kid = id_token_header.get("kid")

    matching_keys = [k for k in jwks["keys"] if k.get("kid") == kid]
    assert len(matching_keys) == 1, f"Expected 1 matching key for kid={kid}, found {len(matching_keys)}"
    key = key_from_jwk_dict(matching_keys[0])

    verifier = JWS()
    payload = verifier.verify_compact(id_token_jwt, keys=[key])
    # The verified payload may come back serialized or already parsed.
    id_token_payload = json.loads(payload) if isinstance(payload, (str, bytes)) else payload

    assert id_token_payload["iss"] == "http://localhost:8000"
    # "aud" may be a single string or a list; normalise so the membership test
    # is an exact match rather than a substring match on a string.
    audience = id_token_payload["aud"]
    if isinstance(audience, str):
        audience = [audience]
    assert CLIENT_ID in audience
    assert id_token_payload["nonce"] == nonce

    id_token_sub = id_token_payload["sub"]
    assert isinstance(id_token_sub, str)
    assert len(id_token_sub) > 0
    return id_token_sub
async def test_full_authorization_code_flow(client: AsyncClient) -> None:
    """End-to-end run of the OIDC Authorization Code flow.

    Covers: seeding the RP client and user, authorization request, password
    login, consent, token exchange, ID token validation (JWKS lookup plus
    signature check) and the userinfo endpoint.
    """
    app = client._transport.app  # type: ignore[union-attr]
    state = secrets.token_urlsafe(16)
    nonce = secrets.token_urlsafe(16)
    await _setup_rp_and_user(app)

    auth_code = await _login_and_authorize(client, state, nonce)
    tokens = await _exchange_token(client, auth_code)
    subject = await _validate_id_token(client, tokens["id_token"], nonce)

    # UserInfo: GET /userinfo authenticated with the access token.
    bearer_headers = {"Authorization": f"Bearer {tokens['access_token']}"}
    userinfo = await client.get("/userinfo", headers=bearer_headers)
    assert userinfo.status_code == 200, f"UserInfo failed: {userinfo.status_code} {userinfo.text}"

    claims = userinfo.json()
    assert claims["email"] == "alice@example.com"
    assert claims["email_verified"] is True
    # The subject must be the same in the ID token and in userinfo.
    assert claims["sub"] == subject
async def test_discovery_advertises_s256_pkce(client: AsyncClient) -> None:
    """Discovery metadata must list exactly ``S256`` as PKCE challenge method."""
    discovery = await client.get("/.well-known/openid-configuration")
    assert discovery.status_code == 200
    metadata = discovery.json()
    assert metadata.get("code_challenge_methods_supported") == ["S256"]
async def test_pkce_s256_flow_succeeds(client: AsyncClient) -> None:
    """A code bound to an S256 challenge is redeemable with its verifier."""
    app = client._transport.app  # type: ignore[union-attr]
    await _setup_rp_and_user(app)

    state = secrets.token_urlsafe(16)
    nonce = secrets.token_urlsafe(16)
    code_verifier, code_challenge = _pkce_pair()

    auth_code = await _login_and_authorize(client, state, nonce, code_challenge=code_challenge)
    tokens = await _exchange_token(client, auth_code, code_verifier=code_verifier)
    assert "access_token" in tokens
async def test_pkce_wrong_verifier_rejected(client: AsyncClient) -> None:
    """Redeeming a PKCE-bound code with a non-matching verifier must fail."""
    app = client._transport.app  # type: ignore[union-attr]
    await _setup_rp_and_user(app)

    state = secrets.token_urlsafe(16)
    nonce = secrets.token_urlsafe(16)
    _, code_challenge = _pkce_pair()
    auth_code = await _login_and_authorize(client, state, nonce, code_challenge=code_challenge)

    # The token request is posted by hand because _exchange_token asserts
    # success; here the verifier deliberately does not match the challenge.
    basic_credentials = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
    form = {
        "grant_type": "authorization_code",
        "code": auth_code,
        "redirect_uri": REDIRECT_URI,
        "code_verifier": f"wrong-{secrets.token_urlsafe(48)}",
    }
    request_headers = {
        "Authorization": f"Basic {basic_credentials}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    res = await client.post("/token", data=form, headers=request_headers)
    assert res.status_code != 200, f"PKCE verifier mismatch should be rejected, got {res.status_code}: {res.text}"
async def _authorize_with_offline_access(client: AsyncClient, state: str, nonce: str) -> str:
    """Run the auth-code flow requesting offline_access, return the auth code.

    Mirrors ``_login_and_authorize`` but requests the ``offline_access`` scope
    with ``prompt=consent``. Every intermediate response is checked so that a
    broken step fails where it happens with a descriptive message, instead of
    surfacing later as a ``KeyError`` on a missing header or ``code`` value.

    Args:
        client: Test HTTP client bound to the application.
        state: Opaque ``state`` value; must be echoed back on the callback.
        nonce: ``nonce`` value sent with the authorization request.

    Returns:
        The authorization code taken from the callback redirect.
    """
    scopes = ["openid", "profile", "offline_access"]

    # offline_access requires prompt=consent per the OIDC spec.
    auth_res = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": CLIENT_ID,
            "redirect_uri": REDIRECT_URI,
            "scope": " ".join(scopes),
            "state": state,
            "nonce": nonce,
            "prompt": "consent",
        },
        follow_redirects=False,
    )
    assert auth_res.status_code in (302, 303), f"Expected redirect to /login, got {auth_res.status_code}"
    assert "/login" in auth_res.headers["location"]

    token = await get_csrf_token(client)
    login_res = await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true", "X-CSRF-Token": token},
    )
    assert login_res.status_code == 200, f"Password login failed: {login_res.status_code} {login_res.text}"
    hx_redirect = login_res.headers.get("HX-Redirect", "")
    assert "/authorization/complete" in hx_redirect, (
        f"Expected HX-Redirect to /authorization/complete, got '{hx_redirect}'"
    )

    complete = await client.get("/authorization/complete", follow_redirects=False)
    assert complete.status_code in (302, 303), (
        f"Expected redirect to /consent, got {complete.status_code}: {complete.text}"
    )
    assert "/consent" in complete.headers["location"]

    token = await get_csrf_token(client)
    consent = await client.post(
        "/consent",
        data={"action": "allow", "scope": scopes},
        headers={"X-CSRF-Token": token},
        follow_redirects=False,
    )
    assert consent.status_code in (302, 303), (
        f"Expected redirect to callback, got {consent.status_code}: {consent.text}"
    )

    location = consent.headers["location"]
    callback_params = parse_qs(urlparse(location).query)
    assert "code" in callback_params, f"No code in redirect: {location}"
    # The state must round-trip unchanged, exactly as in the non-offline flow.
    assert callback_params.get("state") == [state], (
        f"State mismatch: expected {state}, got {callback_params.get('state')}"
    )
    return callback_params["code"][0]
async def test_refresh_token_issued_only_with_offline_access(client: AsyncClient) -> None:
    """A flow that does not request offline_access must not yield a refresh token.

    Only the negative case is checked here; the presence of a refresh token
    when offline_access is requested is asserted by
    ``test_refresh_grant_does_not_mint_id_token``.
    """
    app = client._transport.app  # type: ignore[union-attr]
    await _setup_rp_and_user(app)

    # Without offline_access: no refresh token.
    state = secrets.token_urlsafe(8)
    nonce = secrets.token_urlsafe(8)
    auth_code = await _login_and_authorize(client, state, nonce)
    tokens = await _exchange_token(client, auth_code)
    assert "refresh_token" not in tokens
async def test_refresh_grant_does_not_mint_id_token(client: AsyncClient) -> None:
    """Refreshing returns a new access token but never a new ID token."""
    app = client._transport.app  # type: ignore[union-attr]
    await _setup_rp_and_user(app)

    state = secrets.token_urlsafe(8)
    nonce = secrets.token_urlsafe(8)
    auth_code = await _authorize_with_offline_access(client, state, nonce)
    tokens = await _exchange_token(client, auth_code)
    assert "refresh_token" in tokens, "offline_access should yield a refresh token"

    # Redeem the refresh token; the response must not contain a new ID token.
    basic_credentials = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
    refresh_form = {"grant_type": "refresh_token", "refresh_token": tokens["refresh_token"]}
    request_headers = {
        "Authorization": f"Basic {basic_credentials}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    res = await client.post("/token", data=refresh_form, headers=request_headers)
    assert res.status_code == 200, res.text

    refreshed = res.json()
    assert "access_token" in refreshed
    assert "id_token" not in refreshed