diff --git a/tests/test_oidc/test_e2e_flow.py b/tests/test_oidc/test_e2e_flow.py new file mode 100644 index 0000000..01d9d8c --- /dev/null +++ b/tests/test_oidc/test_e2e_flow.py @@ -0,0 +1,175 @@ +import json +import secrets +from base64 import b64encode +from datetime import UTC, datetime +from urllib.parse import parse_qs, urlparse + +from argon2 import PasswordHasher +from cryptojwt.jwk.jwk import key_from_jwk_dict +from cryptojwt.jws.jws import JWS +from httpx import AsyncClient + +from fastapi_oidc_op.authn.password import PasswordService +from fastapi_oidc_op.models import PasswordCredential, User + + +async def test_full_authorization_code_flow(client: AsyncClient) -> None: + """End-to-end test of the complete OIDC Authorization Code flow. + + Exercises: discovery, client registration, authorization request, + password login, authorization completion, token exchange, ID token + validation (JWKS + JWT signature), and userinfo. + """ + # -- Setup: register RP client and create user -- + app = client._transport.app # type: ignore[union-attr] + oidc_server = app.state.oidc_server + user_repo = app.state.user_repo + cred_repo = app.state.credential_repo + + client_id = "e2e-rp" + client_secret = "e2e-secret-0123456789abcdef" # 30+ chars + redirect_uri = "http://localhost:9000/callback" + state = secrets.token_urlsafe(16) + nonce = secrets.token_urlsafe(16) + + oidc_server.context.cdb[client_id] = { + "client_id": client_id, + "client_secret": client_secret, + "redirect_uris": [(redirect_uri, {})], + "response_types_supported": ["code"], + "token_endpoint_auth_method": "client_secret_basic", + "scope": ["openid", "profile", "email"], + "allowed_scopes": ["openid", "profile", "email"], + "client_salt": secrets.token_hex(8), + } + oidc_server.keyjar.add_symmetric(client_id, client_secret) + + user = User( + userid="lusab-bansen", + username="alice", + email="alice@example.com", + email_verified=True, + given_name="Alice", + family_name="Wonderland", + created_at=datetime.now(UTC), + updated_at=datetime.now(UTC), + ) + await user_repo.create(user) + + svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) + await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass"))) + + # -- Step 1: Authorization request (unauthenticated) → redirect to /login -- + auth_res = await client.get( + "/authorization", + params={ + "response_type": "code", + "client_id": client_id, + "redirect_uri": redirect_uri, + "scope": "openid profile email", + "state": state, + "nonce": nonce, + }, + follow_redirects=False, + ) + assert auth_res.status_code in (302, 303), f"Expected redirect to /login, got {auth_res.status_code}" + assert "/login" in auth_res.headers["location"] + + # -- Step 2: Password login → HX-Redirect to /authorization/complete -- + login_res = await client.post( + "/login/password", + data={"username": "alice", "password": "testpass"}, + headers={"HX-Request": "true"}, + ) + assert login_res.status_code == 200 + hx_redirect = login_res.headers.get("HX-Redirect", "") + assert "/authorization/complete" in hx_redirect, ( + f"Expected HX-Redirect to /authorization/complete, got '{hx_redirect}'" + ) + + # -- Step 3: Complete authorization → redirect to callback with code + state -- + complete_res = await client.get("/authorization/complete", follow_redirects=False) + assert complete_res.status_code in (302, 303), ( + f"Expected redirect to callback, got {complete_res.status_code}: {complete_res.text}" + ) + + location = complete_res.headers["location"] + parsed = urlparse(location) + assert parsed.netloc == "localhost:9000" + assert parsed.path == "/callback" + + callback_params = parse_qs(parsed.query) + assert "code" in callback_params, f"No code in redirect: {location}" + assert callback_params["state"] == [state], f"State mismatch: expected {state}, got {callback_params.get('state')}" + + code = callback_params["code"][0] + assert len(code) > 0 + + # -- Step 4: Token exchange → access_token, id_token, token_type -- + auth_header = b64encode(f"{client_id}:{client_secret}".encode()).decode() + token_res = await client.post( + "/token", + data={ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": redirect_uri, + }, + headers={ + "Authorization": f"Basic {auth_header}", + "Content-Type": "application/x-www-form-urlencoded", + }, + ) + assert token_res.status_code == 200, f"Token endpoint failed: {token_res.text}" + + token_data = token_res.json() + assert "access_token" in token_data + assert "id_token" in token_data + assert token_data["token_type"].lower() == "bearer" + + access_token = token_data["access_token"] + id_token_jwt = token_data["id_token"] + + # -- Step 5: Validate ID token — fetch JWKS, verify JWT signature -- + jwks_res = await client.get("/jwks") + assert jwks_res.status_code == 200 + jwks = jwks_res.json() + assert "keys" in jwks + assert len(jwks["keys"]) > 0 + + # Decode the ID token header to find the key ID + id_token_header = json.loads(JWS()._decode(id_token_jwt.split(".")[0])) + kid = id_token_header.get("kid") + + # Find the matching key in JWKS + matching_keys = [k for k in jwks["keys"] if k.get("kid") == kid] + assert len(matching_keys) == 1, f"Expected 1 matching key for kid={kid}, found {len(matching_keys)}" + + key = key_from_jwk_dict(matching_keys[0]) + + # Verify JWT signature and decode payload + verifier = JWS() + payload = verifier.verify_compact(id_token_jwt, keys=[key]) + id_token_payload = json.loads(payload) if isinstance(payload, (str, bytes)) else payload + + assert id_token_payload["iss"] == "http://localhost:8000" + assert client_id in id_token_payload["aud"] + assert id_token_payload["nonce"] == nonce + + # sub is a pairwise identifier — just verify it exists and is non-empty + id_token_sub = id_token_payload["sub"] + assert isinstance(id_token_sub, str) + assert len(id_token_sub) > 0 + + # -- Step 6: UserInfo — GET /userinfo with Bearer token -- + userinfo_res = await client.get( + "/userinfo", + headers={"Authorization": f"Bearer {access_token}"}, + ) + assert userinfo_res.status_code == 200, f"UserInfo failed: {userinfo_res.status_code} {userinfo_res.text}" + + userinfo_data = userinfo_res.json() + assert userinfo_data["email"] == "alice@example.com" + assert userinfo_data["email_verified"] is True + + # sub should match between id_token and userinfo + assert userinfo_data["sub"] == id_token_sub