# OIDC Provider Implementation Plan (Phase 5)

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Wire up `idpyoidc` to serve standard OIDC endpoints (discovery, authorization code flow, token exchange, userinfo, JWKS) using the existing authentication and user management infrastructure.

**Architecture:** FastAPI routes wrap idpyoidc's endpoint pipeline (`parse_request` → `process_request` → `do_response`). A custom `PorchlightUserInfo` subclass provides claims from our `UserRepository`. The authorization flow stores pending auth requests in the Starlette session; after login, a resume route creates an idpyoidc session and completes the authorization.

**Tech Stack:** FastAPI, idpyoidc >=5.0, Starlette SessionMiddleware, aiosqlite.

**Quality gate:** `./scripts/check.sh`

**Known constraints:**

- idpyoidc's `UserInfo.__call__` is synchronous. We populate an in-memory claims cache when creating the idpyoidc session.
- idpyoidc stores sessions/grants in memory. Restarting the server invalidates active tokens.
- The `Server` constructor is synchronous and does file I/O for key generation. Call it outside the async lifespan context if needed, or ensure key directories exist beforehand.
- idpyoidc's `OPConfiguration` auto-wraps a plain dict. Provide the config as a dict for simplicity.
- `redirect_uris` in `cdb` must be a list of `(base_url, query_dict)` tuples, not plain strings.
- Client secrets must also be added to the keyjar via `server.keyjar.add_symmetric(client_id, secret)`.
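
The `redirect_uris` constraint is easy to get wrong when registering clients by hand. The following is a minimal sketch of a helper that turns a plain URI string into the tuple form. It assumes the query part should be shaped like the output of `urllib.parse.parse_qs`; that is an assumption, since the examples in this plan only ever use an empty dict. `to_cdb_redirect_uri` is a hypothetical name, not an idpyoidc API.

```python
from urllib.parse import parse_qs, urlsplit, urlunsplit


def to_cdb_redirect_uri(uri: str) -> tuple[str, dict[str, list[str]]]:
    """Split a redirect URI into (base_url, query_dict).

    Hypothetical helper: the query_dict shape (parse_qs output) is an
    assumption about what the client database expects.
    """
    parts = urlsplit(uri)
    # Rebuild the URI without query or fragment to get the base URL.
    base = urlunsplit((parts.scheme, parts.netloc, parts.path, "", ""))
    query = parse_qs(parts.query) if parts.query else {}
    return base, query


if __name__ == "__main__":
    print(to_cdb_redirect_uri("http://localhost:9000/callback"))
```

A client entry could then use `"redirect_uris": [to_cdb_redirect_uri(u) for u in uris]` instead of spelling the tuples out.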
--- ### Task 1: Config + Claims Mapping + UserInfo Source **Files:** - Modify: `src/fastapi_oidc_op/config.py` - Create: `src/fastapi_oidc_op/oidc/claims.py` - Create: `tests/test_oidc/test_claims.py` **Step 1: Write the failing tests** Create `tests/test_oidc/test_claims.py`: ```python from datetime import UTC, datetime from fastapi_oidc_op.models import User from fastapi_oidc_op.oidc.claims import PorchlightUserInfo, user_to_claims def test_user_to_claims_minimal() -> None: user = User(userid="lusab-bansen", username="alice") claims = user_to_claims(user) assert claims["sub"] == "lusab-bansen" assert claims["preferred_username"] == "alice" assert "email" not in claims # None fields excluded def test_user_to_claims_full() -> None: user = User( userid="lusab-bansen", username="alice", preferred_username="Alice W.", given_name="Alice", family_name="Wonderland", nickname="ali", email="alice@example.com", email_verified=True, phone_number="+1234567890", phone_number_verified=False, picture="https://example.com/alice.jpg", locale="en", updated_at=datetime(2025, 1, 1, tzinfo=UTC), ) claims = user_to_claims(user) assert claims["sub"] == "lusab-bansen" assert claims["preferred_username"] == "Alice W." 
assert claims["given_name"] == "Alice" assert claims["family_name"] == "Wonderland" assert claims["nickname"] == "ali" assert claims["email"] == "alice@example.com" assert claims["email_verified"] is True assert claims["phone_number"] == "+1234567890" assert claims["phone_number_verified"] is False assert claims["picture"] == "https://example.com/alice.jpg" assert claims["locale"] == "en" assert claims["updated_at"] == int(datetime(2025, 1, 1, tzinfo=UTC).timestamp()) def test_porchlight_userinfo_returns_claims() -> None: userinfo = PorchlightUserInfo() userinfo.set_user_claims("lusab-bansen", {"sub": "lusab-bansen", "email": "a@b.com"}) result = userinfo("lusab-bansen", "client1") assert result["sub"] == "lusab-bansen" assert result["email"] == "a@b.com" def test_porchlight_userinfo_filters_claims() -> None: userinfo = PorchlightUserInfo() userinfo.set_user_claims("lusab-bansen", {"sub": "lusab-bansen", "email": "a@b.com", "name": "Alice"}) result = userinfo("lusab-bansen", "client1", user_info_claims={"email": None}) assert "email" in result assert "name" not in result def test_porchlight_userinfo_unknown_user() -> None: userinfo = PorchlightUserInfo() result = userinfo("unknown", "client1") assert result == {} ``` **Step 2: Run tests to verify they fail** Run: `uv run pytest tests/test_oidc/test_claims.py -v` Expected: FAIL (`ImportError` — module doesn't exist) **Step 3: Implement** Add to `src/fastapi_oidc_op/config.py`: ```python # Signing keys signing_key_path: str = "data/keys" ``` Create `src/fastapi_oidc_op/oidc/__init__.py` (empty file). Create `src/fastapi_oidc_op/oidc/claims.py`: ```python """OIDC claims mapping and UserInfo source.""" from idpyoidc.server.user_info import UserInfo from fastapi_oidc_op.models import User def user_to_claims(user: User) -> dict: """Convert a User model to an OIDC claims dict. Only includes claims that have non-None values. The 'sub' claim always uses the userid (proquint). 
""" claims: dict = {"sub": user.userid} # preferred_username: use explicit field, fall back to username claims["preferred_username"] = user.preferred_username or user.username optional_fields = { "given_name": user.given_name, "family_name": user.family_name, "nickname": user.nickname, "email": user.email, "email_verified": user.email_verified if user.email else None, "phone_number": user.phone_number, "phone_number_verified": user.phone_number_verified if user.phone_number else None, "picture": user.picture, "locale": user.locale, } for claim_name, value in optional_fields.items(): if value is not None: claims[claim_name] = value # updated_at as Unix timestamp (OIDC spec requires number) if user.updated_at: claims["updated_at"] = int(user.updated_at.timestamp()) return claims class PorchlightUserInfo(UserInfo): """UserInfo source backed by an in-memory claims cache. Claims are populated via set_user_claims() after authentication. idpyoidc calls __call__() synchronously to look up claims. 
""" def __init__(self, **kwargs): # type: ignore[no-untyped-def] super().__init__(db={}, **kwargs) def set_user_claims(self, user_id: str, claims: dict) -> None: """Populate claims cache for a user.""" self.db[user_id] = claims ``` **Step 4: Run tests to verify they pass** Run: `uv run pytest tests/test_oidc/test_claims.py -v` Expected: PASS **Step 5: Run quality gate** Run: `./scripts/check.sh` Expected: All green **Step 6: Commit** ```bash git add src/fastapi_oidc_op/config.py src/fastapi_oidc_op/oidc/ tests/test_oidc/test_claims.py git commit -m "feat: add OIDC claims mapping and PorchlightUserInfo source" ``` --- ### Task 2: idpyoidc Server Initialization **Files:** - Create: `src/fastapi_oidc_op/oidc/provider.py` - Create: `tests/test_oidc/test_provider.py` **Step 1: Write the failing tests** Create `tests/test_oidc/test_provider.py`: ```python import shutil from pathlib import Path from fastapi_oidc_op.config import Settings from fastapi_oidc_op.oidc.provider import create_oidc_server def test_create_server_has_endpoints() -> None: key_path = Path("test_keys_provider") key_path.mkdir(exist_ok=True) try: settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path)) server = create_oidc_server(settings) assert "authorization" in server.endpoint assert "token" in server.endpoint assert "userinfo" in server.endpoint assert "provider_config" in server.endpoint finally: shutil.rmtree(key_path, ignore_errors=True) def test_create_server_has_issuer() -> None: key_path = Path("test_keys_issuer") key_path.mkdir(exist_ok=True) try: settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path)) server = create_oidc_server(settings) assert server.context.issuer == "http://localhost:8000" finally: shutil.rmtree(key_path, ignore_errors=True) def test_create_server_jwks_available() -> None: key_path = Path("test_keys_jwks") key_path.mkdir(exist_ok=True) try: settings = 
Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path)) server = create_oidc_server(settings) keys = server.keyjar.export_jwks() assert "keys" in keys assert len(keys["keys"]) > 0 finally: shutil.rmtree(key_path, ignore_errors=True) def test_create_server_userinfo_is_porchlight() -> None: key_path = Path("test_keys_userinfo") key_path.mkdir(exist_ok=True) try: settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path)) server = create_oidc_server(settings) from fastapi_oidc_op.oidc.claims import PorchlightUserInfo assert isinstance(server.context.userinfo, PorchlightUserInfo) finally: shutil.rmtree(key_path, ignore_errors=True) ``` **Step 2: Run tests to verify they fail** Run: `uv run pytest tests/test_oidc/test_provider.py -v` Expected: FAIL (`ImportError`) **Step 3: Implement** Create `src/fastapi_oidc_op/oidc/provider.py`: ```python """Initialize the idpyoidc OIDC Server.""" from pathlib import Path from idpyoidc.server import Server from fastapi_oidc_op.config import Settings from fastapi_oidc_op.oidc.claims import PorchlightUserInfo def _build_server_config(settings: Settings) -> dict: """Build the idpyoidc server configuration dict.""" key_path = Path(settings.signing_key_path) key_path.mkdir(parents=True, exist_ok=True) return { "issuer": settings.issuer, "key_conf": { "key_defs": [ {"type": "RSA", "use": ["sig"]}, {"type": "EC", "crv": "P-256", "use": ["sig"]}, ], "uri_path": "jwks", "private_path": str(key_path / "private_jwks.json"), "read_only": False, }, "endpoint": { "provider_config": { "path": ".well-known/openid-configuration", "class": "idpyoidc.server.oidc.provider_config.ProviderConfiguration", "kwargs": {"client_authn_method": None}, }, "authorization": { "path": "authorization", "class": "idpyoidc.server.oidc.authorization.Authorization", "kwargs": { "client_authn_method": None, "response_types_supported": ["code"], "response_modes_supported": ["query"], 
}, }, "token": { "path": "token", "class": "idpyoidc.server.oidc.token.Token", "kwargs": { "client_authn_method": ["client_secret_post", "client_secret_basic"], }, }, "userinfo": { "path": "userinfo", "class": "idpyoidc.server.oidc.userinfo.UserInfo", "kwargs": {}, }, }, "userinfo": { "class": PorchlightUserInfo, "kwargs": {}, }, "authentication": {}, "authz": { "class": "idpyoidc.server.authz.AuthzHandling", "kwargs": { "grant_config": { "usage_rules": { "authorization_code": { "supports_minting": ["access_token", "refresh_token", "id_token"], "max_usage": 1, "expires_in": 120, }, "access_token": {"expires_in": 3600}, "refresh_token": { "supports_minting": ["access_token", "refresh_token", "id_token"], "expires_in": 86400, }, }, "expires_in": 2592000, }, }, }, "token_handler_args": { "jwks_file": str(key_path / "token_jwks.json"), "code": {"kwargs": {"lifetime": 600}}, "token": { "class": "idpyoidc.server.token.jwt_token.JWTToken", "kwargs": { "lifetime": 3600, "add_claims_by_scope": True, "aud": [settings.issuer], }, }, "refresh": { "class": "idpyoidc.server.token.jwt_token.JWTToken", "kwargs": { "lifetime": 86400, "aud": [settings.issuer], }, }, "id_token": { "class": "idpyoidc.server.token.id_token.IDToken", "kwargs": { "lifetime": 3600, "add_claims_by_scope": True, }, }, }, "scopes_to_claims": { "openid": ["sub"], "profile": [ "preferred_username", "given_name", "family_name", "nickname", "picture", "locale", "updated_at", ], "email": ["email", "email_verified"], "phone": ["phone_number", "phone_number_verified"], }, } def create_oidc_server(settings: Settings) -> Server: """Create and configure the idpyoidc Server instance.""" config = _build_server_config(settings) server = Server(conf=config) return server ``` Note: The exact config structure may need adjustments based on what idpyoidc accepts. 
If passing the class object in the `userinfo` config doesn't work, we may need to set `server.context.userinfo` directly after creation:

```python
server = Server(conf=config)
server.context.userinfo = PorchlightUserInfo()
```

Run the tests and adjust accordingly.

**Step 4: Run tests to verify they pass**

Run: `uv run pytest tests/test_oidc/test_provider.py -v`
Expected: PASS

If `server.context.userinfo` is not a `PorchlightUserInfo` instance, the last test will guide us to fix the initialization.

**Step 5: Run quality gate**

Run: `./scripts/check.sh`
Expected: All green

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/oidc/provider.py tests/test_oidc/test_provider.py
git commit -m "feat: add idpyoidc server initialization"
```

---

### Task 3: App Integration (Lifespan + Client Registration)

**Files:**

- Modify: `src/fastapi_oidc_op/app.py`
- Create: `tests/test_oidc/test_app_integration.py`

**Step 1: Write the failing tests**

Create `tests/test_oidc/test_app_integration.py` (the run and commit commands below use this path):

```python
from httpx import AsyncClient


async def test_oidc_server_on_app_state(client: AsyncClient) -> None:
    app = client._transport.app  # type: ignore[union-attr]
    assert hasattr(app.state, "oidc_server")
    assert app.state.oidc_server is not None


async def test_manage_client_registered(client: AsyncClient) -> None:
    app = client._transport.app  # type: ignore[union-attr]
    oidc_server = app.state.oidc_server
    settings = app.state.settings
    assert settings.manage_client_id in oidc_server.context.cdb
```

**Step 2: Run tests to verify they fail**

Run: `uv run pytest tests/test_oidc/test_app_integration.py -v`
Expected: FAIL (`app.state.oidc_server` doesn't exist)

**Step 3: Implement**

Modify `src/fastapi_oidc_op/app.py` to:

1. Import `create_oidc_server` from `oidc.provider`
2. In the lifespan, after setting up repos and services, create the OIDC server
3. Register the management client in `server.context.cdb`
4. Store the server on `app.state.oidc_server`

In the lifespan, after the existing service setup:

```python
# Imports (place at module top)
import secrets as _secrets

from fastapi_oidc_op.oidc.provider import create_oidc_server

# OIDC Server
oidc_server = create_oidc_server(settings)
app.state.oidc_server = oidc_server

# Register management client
manage_secret = settings.session_secret or _secrets.token_hex(32)
oidc_server.context.cdb[settings.manage_client_id] = {
    "client_id": settings.manage_client_id,
    "client_secret": manage_secret,
    # redirect_uris must be (base_url, query_dict) tuples, not plain strings
    "redirect_uris": [(f"{settings.issuer}/manage/callback", {})],
    "response_types_supported": ["code"],
    "token_endpoint_auth_method": "client_secret_basic",
    "scope": ["openid", "profile", "email"],
    "allowed_scopes": ["openid", "profile", "email"],
    "client_salt": _secrets.token_hex(8),
}
# The secret must also be registered in the keyjar
oidc_server.keyjar.add_symmetric(settings.manage_client_id, manage_secret)
```

**Step 4: Run tests to verify they pass**

Run: `uv run pytest tests/test_oidc/test_app_integration.py -v`
Expected: PASS

**Step 5: Run quality gate**

Run: `./scripts/check.sh`
Expected: All green (including existing 120 tests)

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/app.py tests/test_oidc/test_app_integration.py
git commit -m "feat: integrate idpyoidc server into app lifespan"
```

---

### Task 4: Discovery + JWKS Endpoints

**Files:**

- Create: `src/fastapi_oidc_op/oidc/endpoints.py`
- Modify: `src/fastapi_oidc_op/app.py`
- Create: `tests/test_oidc/test_discovery.py`

**Step 1: Write the failing tests**

Create `tests/test_oidc/test_discovery.py`:

```python
from httpx import AsyncClient


async def test_discovery_endpoint_returns_metadata(client: AsyncClient) -> None:
    res = await client.get("/.well-known/openid-configuration")
    assert res.status_code == 200
    data = res.json()
    assert data["issuer"] == "http://localhost:8000"
    assert "authorization_endpoint" in data
    assert "token_endpoint" in data
    assert "userinfo_endpoint" in data
    assert "jwks_uri" in data


async def test_discovery_response_types_supported(client: AsyncClient) -> None:
    res = await client.get("/.well-known/openid-configuration")
    data = res.json()
    assert "code" in data["response_types_supported"]


async def test_discovery_scopes_supported(client: AsyncClient) -> None:
    res = await client.get("/.well-known/openid-configuration")
    data = res.json()
    assert "openid" in data["scopes_supported"]


async def test_jwks_endpoint_returns_keys(client: AsyncClient) -> None:
    res = await client.get("/jwks")
    assert res.status_code == 200
    data = res.json()
    assert "keys" in data
    assert len(data["keys"]) > 0
    # Each key should declare its key type
    for key in data["keys"]:
        assert "kty" in key
```

**Step 2: Run tests to verify they fail**

Run: `uv run pytest tests/test_oidc/test_discovery.py -v`
Expected: FAIL (404, routes don't exist)

**Step 3: Implement**

Create `src/fastapi_oidc_op/oidc/endpoints.py`:

```python
"""FastAPI routes wrapping idpyoidc endpoint processing."""

from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse

router = APIRouter(tags=["oidc"])


@router.get("/.well-known/openid-configuration")
async def provider_configuration(request: Request) -> JSONResponse:
    """OIDC Discovery endpoint."""
    oidc_server = request.app.state.oidc_server
    endpoint = oidc_server.get_endpoint("provider_config")
    parsed = endpoint.parse_request({})
    result = endpoint.process_request(parsed)
    # Pass `result` only via **result: if it contains `response_args`, also
    # passing response_args= explicitly raises TypeError (duplicate keyword).
    response_info = endpoint.do_response(request=parsed, **result)
    return JSONResponse(content=response_info["response"].to_dict())


@router.get("/jwks")
async def jwks(request: Request) -> JSONResponse:
    """Public signing keys (JWK Set)."""
    oidc_server = request.app.state.oidc_server
    keys = oidc_server.keyjar.export_jwks()
    return JSONResponse(content=keys)
```

Modify `src/fastapi_oidc_op/app.py` to include the OIDC router:

```python
from fastapi_oidc_op.oidc.endpoints import router as oidc_router

# ...
app.include_router(oidc_router)
```

Note: The discovery endpoint implementation depends on how `do_response` returns data. If `response_info["response"]` is a string (JSON), parse it. If it's a `Message` object, call `.to_dict()`. Adjust based on test output.

**Step 4: Run tests to verify they pass**

Run: `uv run pytest tests/test_oidc/test_discovery.py -v`
Expected: PASS

**Step 5: Run quality gate**

Run: `./scripts/check.sh`
Expected: All green

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/oidc/endpoints.py src/fastapi_oidc_op/app.py tests/test_oidc/test_discovery.py
git commit -m "feat: add OIDC discovery and JWKS endpoints"
```

---

### Task 5: Authorization Endpoint (Redirect to Login)

**Files:**

- Modify: `src/fastapi_oidc_op/oidc/endpoints.py`
- Create: `tests/test_oidc/test_authorization.py`

This task implements the first half of the authorization flow: receiving the request from an RP and redirecting to the login page when the user is not authenticated.

**Step 1: Write the failing tests**

Create `tests/test_oidc/test_authorization.py`:

```python
import secrets

from httpx import AsyncClient


def _register_test_client(
    client: AsyncClient,
    client_id: str = "test-rp",
    redirect_uri: str = "http://localhost:9000/callback",
) -> str:
    """Register a test client in the OIDC server. Returns client_secret."""
    app = client._transport.app  # type: ignore[union-attr]
    oidc_server = app.state.oidc_server
    client_secret = secrets.token_hex(16)
    oidc_server.context.cdb[client_id] = {
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uris": [(redirect_uri, {})],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": ["openid", "profile", "email"],
        "allowed_scopes": ["openid", "profile", "email"],
        "client_salt": secrets.token_hex(8),
    }
    oidc_server.keyjar.add_symmetric(client_id, client_secret)
    return client_secret


async def test_authorization_redirects_to_login_when_unauthenticated(client: AsyncClient) -> None:
    _register_test_client(client)
    res = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "test-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid",
            "state": "test-state",
            "nonce": "test-nonce",
        },
        follow_redirects=False,
    )
    assert res.status_code in (302, 303)
    assert "/login" in res.headers["location"]


async def test_authorization_stores_auth_request_in_session(client: AsyncClient) -> None:
    _register_test_client(client)
    # After the redirect, the auth request should be stored in the session.
    # We verify this indirectly: make the auth request, then check that
    # GET /login works (the session cookie is set by the redirect).
    res = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "test-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid",
            "state": "test-state",
            "nonce": "test-nonce",
        },
        follow_redirects=False,
    )
    assert res.status_code in (302, 303)
    # Follow up: login page should be accessible
    login_res = await client.get("/login")
    assert login_res.status_code == 200


async def test_authorization_invalid_client_returns_error(client: AsyncClient) -> None:
    res = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "nonexistent",
            "redirect_uri": "http://evil.com/callback",
            "scope": "openid",
            "state": "test-state",
        },
        follow_redirects=False,
    )
    # Should return an error (not redirect to evil.com)
    assert res.status_code >= 400 or "error" in res.text.lower()
```

**Step 2: Run tests to verify they fail**

Run: `uv run pytest tests/test_oidc/test_authorization.py -v`
Expected: FAIL (404, `/authorization` doesn't exist)

**Step 3: Implement**

Add to `src/fastapi_oidc_op/oidc/endpoints.py`:

```python
from html import escape

from fastapi import Response
from fastapi.responses import HTMLResponse, RedirectResponse


@router.get("/authorization")
async def authorization(request: Request) -> Response:
    """OIDC Authorization endpoint.

    Validates the authorization request, then:
    - If user is authenticated (session): creates grant + redirects to RP
    - If not authenticated: stores request in session, redirects to /login
    """
    oidc_server = request.app.state.oidc_server
    endpoint = oidc_server.get_endpoint("authorization")

    # Collect the query parameters as a plain, serializable dict
    query_params = dict(request.query_params)

    try:
        parsed = endpoint.parse_request(query_params)
    except Exception as exc:
        # Escape the message: it may echo attacker-controlled request values
        return HTMLResponse(f"<h1>Invalid Request</h1><p>{escape(str(exc))}</p>", status_code=400)

    # Check for parse errors
    if "error" in parsed:
        error_desc = parsed.get("error_description", parsed["error"])
        return HTMLResponse(f"<h1>Error</h1><p>{escape(str(error_desc))}</p>", status_code=400)

    # Check if user is authenticated via Starlette session
    userid = request.session.get("userid")
    username = request.session.get("username")

    if userid and username:
        # User is authenticated: complete the authorization
        return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)

    # Not authenticated: store the auth request in the session and redirect to login
    request.session["oidc_auth_request"] = query_params
    return RedirectResponse("/login", status_code=303)
```

The `_complete_authorization` helper will be implemented in Task 7 (authorization complete). For now, make it return a placeholder:

```python
async def _complete_authorization(request, oidc_server, endpoint, parsed, userid, username):
    """Complete the authorization after authentication. Implemented in Task 7."""
    return HTMLResponse("Authorization completion not yet implemented", status_code=501)
```

**Step 4: Run tests to verify they pass**

Run: `uv run pytest tests/test_oidc/test_authorization.py -v`
Expected: PASS

**Step 5: Run quality gate**

Run: `./scripts/check.sh`
Expected: All green

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/oidc/endpoints.py tests/test_oidc/test_authorization.py
git commit -m "feat: add authorization endpoint with login redirect"
```

---

### Task 6: Login Route Integration (Resume OIDC Flow)

**Files:**

- Modify: `src/fastapi_oidc_op/authn/routes.py`
- Create: `tests/test_oidc/test_login_oidc_redirect.py`

After a successful login, if there's a pending OIDC authorization request in the session, the login routes should redirect to `/authorization/complete` instead of `/manage/credentials`.
**Step 1: Write the failing tests** Create `tests/test_oidc/test_login_oidc_redirect.py`: ```python import secrets from datetime import UTC, datetime from argon2 import PasswordHasher from httpx import AsyncClient from fastapi_oidc_op.authn.password import PasswordService from fastapi_oidc_op.models import PasswordCredential, User def _register_test_client(client: AsyncClient) -> None: app = client._transport.app # type: ignore[union-attr] oidc_server = app.state.oidc_server oidc_server.context.cdb["test-rp"] = { "client_id": "test-rp", "client_secret": "test-secret", "redirect_uris": [("http://localhost:9000/callback", {})], "response_types_supported": ["code"], "token_endpoint_auth_method": "client_secret_basic", "scope": ["openid", "profile", "email"], "allowed_scopes": ["openid", "profile", "email"], "client_salt": secrets.token_hex(8), } oidc_server.keyjar.add_symmetric("test-rp", "test-secret") async def _create_user(client: AsyncClient) -> None: app = client._transport.app # type: ignore[union-attr] user_repo = app.state.user_repo cred_repo = app.state.credential_repo user = User(userid="lusab-bansen", username="alice", created_at=datetime.now(UTC), updated_at=datetime.now(UTC)) await user_repo.create(user) svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass"))) async def test_login_with_pending_oidc_redirects_to_authorization_complete(client: AsyncClient) -> None: _register_test_client(client) await _create_user(client) # Step 1: Start OIDC authorization (stores request in session, redirects to /login) auth_res = await client.get( "/authorization", params={ "response_type": "code", "client_id": "test-rp", "redirect_uri": "http://localhost:9000/callback", "scope": "openid", "state": "test-state", "nonce": "test-nonce", }, follow_redirects=False, ) assert auth_res.status_code in (302, 303) # Step 2: Login via password login_res = 
await client.post( "/login/password", data={"username": "alice", "password": "testpass"}, headers={"HX-Request": "true"}, ) assert login_res.status_code == 200 # Should redirect to /authorization/complete, not /manage/credentials redirect_target = login_res.headers.get("HX-Redirect", "") assert "/authorization/complete" in redirect_target async def test_login_without_pending_oidc_redirects_to_manage(client: AsyncClient) -> None: await _create_user(client) login_res = await client.post( "/login/password", data={"username": "alice", "password": "testpass"}, headers={"HX-Request": "true"}, ) assert login_res.status_code == 200 redirect_target = login_res.headers.get("HX-Redirect", "") assert redirect_target == "/manage/credentials" ``` **Step 2: Run tests to verify they fail** Run: `uv run pytest tests/test_oidc/test_login_oidc_redirect.py -v` Expected: FAIL (login always redirects to `/manage/credentials`) **Step 3: Implement** Modify `src/fastapi_oidc_op/authn/routes.py`: Add a helper function: ```python def _login_redirect_target(request: Request) -> str: """Determine where to redirect after successful login. If there's a pending OIDC authorization request, redirect to complete it. Otherwise, redirect to credential management. 
""" if "oidc_auth_request" in request.session: return "/authorization/complete" return "/manage/credentials" ``` In `login_password`, change: ```python response.headers["HX-Redirect"] = "/manage/credentials" ``` to: ```python response.headers["HX-Redirect"] = _login_redirect_target(request) ``` In `login_webauthn_complete`, change the same line: ```python response.headers["HX-Redirect"] = "/manage/credentials" ``` to: ```python response.headers["HX-Redirect"] = _login_redirect_target(request) ``` **Step 4: Run tests to verify they pass** Run: `uv run pytest tests/test_oidc/test_login_oidc_redirect.py -v` Expected: PASS Also run existing login tests to make sure nothing broke: Run: `uv run pytest tests/test_auth_routes/ -v` Expected: All PASS **Step 5: Run quality gate** Run: `./scripts/check.sh` Expected: All green **Step 6: Commit** ```bash git add src/fastapi_oidc_op/authn/routes.py tests/test_oidc/test_login_oidc_redirect.py git commit -m "feat: redirect to OIDC authorization after login when pending" ``` --- ### Task 7: Authorization Complete + Token Endpoint **Files:** - Modify: `src/fastapi_oidc_op/oidc/endpoints.py` - Create: `tests/test_oidc/test_token.py` This is the core task: completing the authorization flow and implementing the token endpoint. The `/authorization/complete` route creates an idpyoidc session, mints an authorization code, and redirects to the RP. The token endpoint exchanges the code for tokens. 
**Step 1: Write the failing tests**

Create `tests/test_oidc/test_token.py`:

```python
import secrets
from base64 import b64encode
from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse

from argon2 import PasswordHasher
from httpx import AsyncClient

from fastapi_oidc_op.authn.password import PasswordService
from fastapi_oidc_op.models import PasswordCredential, User


def _register_test_client(client: AsyncClient) -> str:
    app = client._transport.app  # type: ignore[union-attr]
    oidc_server = app.state.oidc_server
    client_secret = "test-secret-123"
    oidc_server.context.cdb["test-rp"] = {
        "client_id": "test-rp",
        "client_secret": client_secret,
        "redirect_uris": [("http://localhost:9000/callback", {})],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": ["openid", "profile", "email"],
        "allowed_scopes": ["openid", "profile", "email"],
        "client_salt": secrets.token_hex(8),
    }
    oidc_server.keyjar.add_symmetric("test-rp", client_secret)
    return client_secret


async def _create_user_and_login(client: AsyncClient) -> str:
    """Create user, log in, return userid."""
    app = client._transport.app  # type: ignore[union-attr]
    user_repo = app.state.user_repo
    cred_repo = app.state.credential_repo

    user = User(
        userid="lusab-bansen",
        username="alice",
        email="alice@example.com",
        email_verified=True,
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    await user_repo.create(user)
    svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))

    await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true"},
    )
    return user.userid


async def _get_authorization_code(client: AsyncClient) -> str:
    """Run full auth flow and extract the authorization code."""
    client_secret = _register_test_client(client)

    # Start authorization
    auth_res = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "test-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid profile email",
            "state": "test-state",
            "nonce": "test-nonce",
        },
        follow_redirects=False,
    )
    assert auth_res.status_code in (302, 303)

    # Create user and log in
    await _create_user_and_login(client)

    # Start authorization again (now authenticated)
    auth_res2 = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "test-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid profile email",
            "state": "test-state",
            "nonce": "test-nonce",
        },
        follow_redirects=False,
    )
    assert auth_res2.status_code in (302, 303)

    # Parse the redirect URL to extract the code
    location = auth_res2.headers["location"]
    parsed = urlparse(location)
    params = parse_qs(parsed.query)
    assert "code" in params, f"No code in redirect: {location}"
    return params["code"][0]


async def test_authorization_complete_redirects_with_code(client: AsyncClient) -> None:
    code = await _get_authorization_code(client)
    assert len(code) > 0


async def test_token_endpoint_exchanges_code(client: AsyncClient) -> None:
    code = await _get_authorization_code(client)
    client_secret = "test-secret-123"

    # Exchange code for tokens
    auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode()
    token_res = await client.post(
        "/token",
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": "http://localhost:9000/callback",
        },
        headers={
            "Authorization": f"Basic {auth_header}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
    assert token_res.status_code == 200
    data = token_res.json()
    assert "access_token" in data
    assert "id_token" in data
    assert data["token_type"].lower() == "bearer"


async def test_token_endpoint_invalid_code_returns_error(client: AsyncClient) -> None:
    _register_test_client(client)
    client_secret = "test-secret-123"
    auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode()
    token_res = await client.post(
        "/token",
        data={
            "grant_type": "authorization_code",
            "code": "invalid-code",
            "redirect_uri": "http://localhost:9000/callback",
        },
        headers={
            "Authorization": f"Basic {auth_header}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
    assert token_res.status_code == 400 or "error" in token_res.json()
```

**Step 2: Run tests to verify they fail**

Run: `uv run pytest tests/test_oidc/test_token.py -v`
Expected: FAIL

**Step 3: Implement**

Modify `src/fastapi_oidc_op/oidc/endpoints.py`:

Replace the placeholder `_complete_authorization` with the real implementation, and add the token and authorization complete endpoints:

```python
import json
from html import escape
from urllib.parse import urlencode

from fastapi import APIRouter, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from idpyoidc.server.authn_event import create_authn_event
from idpyoidc.time_util import utc_time_sans_frac

from fastapi_oidc_op.oidc.claims import PorchlightUserInfo, user_to_claims

router = APIRouter(tags=["oidc"])


async def _complete_authorization(
    request: Request,
    oidc_server,
    endpoint,
    parsed_request,
    userid: str,
    username: str,
) -> Response:
    """Complete OIDC authorization after user authentication.

    Creates an idpyoidc session, populates the userinfo cache,
    and returns a redirect to the RP with an authorization code.
    """
    # Populate userinfo cache
    user_repo = request.app.state.user_repo
    user = await user_repo.get_by_userid(userid)
    if user is None:
        return HTMLResponse("<h1>Error</h1><p>User not found</p>", status_code=400)

    userinfo: PorchlightUserInfo = oidc_server.context.userinfo
    claims = user_to_claims(user)
    userinfo.set_user_claims(username, claims)

    # Create idpyoidc session
    session_id = endpoint.create_session(
        request=parsed_request,
        user_id=username,
        acr="urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport",
        time_stamp=utc_time_sans_frac(),
        authn_method=None,
    )

    # Complete authorization (mints code, builds redirect)
    result = endpoint.authz_part2(request=parsed_request, session_id=session_id)

    if "error" in result:
        error_desc = result.get("error_description", result["error"])
        # Escape interpolated text: it may echo request parameters
        return HTMLResponse(f"<h1>Error</h1><p>{escape(str(error_desc))}</p>", status_code=400)

    # `result` already carries "response_args"; passing it again as an explicit
    # keyword next to **result would raise TypeError (duplicate keyword argument).
    resp_info = endpoint.do_response(request=parsed_request, **result)

    # The response is the redirect URL
    redirect_url = resp_info["response"]
    if hasattr(redirect_url, "to_dict"):
        # It's a Message object: build the URL from return_uri + params
        params = redirect_url.to_dict()
        return_uri = result.get("return_uri", parsed_request.get("redirect_uri", ""))
        redirect_url = f"{return_uri}?{urlencode(params)}"

    return RedirectResponse(str(redirect_url), status_code=303)


@router.get("/authorization/complete")
async def authorization_complete(request: Request) -> Response:
    """Resume OIDC authorization after login.

    Called after the user authenticates via /login. Pops the pending
    auth request from the session and completes the authorization.
    """
    oidc_server = request.app.state.oidc_server
    endpoint = oidc_server.get_endpoint("authorization")

    # Check the login first so an unauthenticated hit does not discard
    # the pending authorization request.
    userid = request.session.get("userid")
    username = request.session.get("username")
    if not userid or not username:
        return RedirectResponse("/login", status_code=303)

    # Pop the stored auth request
    auth_request_params = request.session.pop("oidc_auth_request", None)
    if auth_request_params is None:
        return HTMLResponse("<h1>Error</h1><p>No pending authorization request</p>", status_code=400)

    # Re-parse the original authorization request
    try:
        parsed = endpoint.parse_request(auth_request_params)
    except Exception as exc:
        return HTMLResponse(f"<h1>Error</h1><p>{escape(str(exc))}</p>", status_code=400)

    if "error" in parsed:
        error_desc = parsed.get("error_description", parsed["error"])
        return HTMLResponse(f"<h1>Error</h1><p>{escape(str(error_desc))}</p>", status_code=400)

    return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)


@router.post("/token")
async def token_endpoint(request: Request) -> JSONResponse:
    """OIDC Token endpoint."""
    oidc_server = request.app.state.oidc_server
    endpoint = oidc_server.get_endpoint("token")

    # Read form body
    body = await request.body()
    body_str = body.decode("utf-8")

    # Build http_info for client authentication
    http_info = {
        "headers": dict(request.headers),
        "url": str(request.url),
    }

    try:
        parsed = endpoint.parse_request(body_str, http_info=http_info)
    except Exception as exc:
        return JSONResponse({"error": "invalid_request", "error_description": str(exc)}, status_code=400)

    if "error" in parsed:
        return JSONResponse(parsed.to_dict(), status_code=400)

    result = endpoint.process_request(parsed)

    if "error" in result:
        return JSONResponse(result, status_code=400)

    # As above: "response_args" travels inside **result
    resp_info = endpoint.do_response(request=parsed, **result)

    # Parse the response
    response_data = resp_info["response"]
    if isinstance(response_data, str):
        response_data = json.loads(response_data)
    elif hasattr(response_data, "to_dict"):
        response_data = response_data.to_dict()

    return JSONResponse(response_data)
```

Note: The exact way idpyoidc returns responses (as strings, Message objects, or dicts) may vary. The implementation should handle all cases. Adjust based on test output.

Also note: `create_session` with `authn_method=None` may fail. If so, create a minimal `UserAuthnMethod` subclass or pass a different value. The test output will guide the fix.

**Step 4: Run tests to verify they pass**

Run: `uv run pytest tests/test_oidc/test_token.py -v`
Expected: PASS

**Step 5: Run quality gate**

Run: `./scripts/check.sh`
Expected: All green

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/oidc/endpoints.py tests/test_oidc/test_token.py
git commit -m "feat: add authorization complete and token endpoints"
```

---

### Task 8: UserInfo Endpoint

**Files:**
- Modify: `src/fastapi_oidc_op/oidc/endpoints.py`
- Create: `tests/test_oidc/test_userinfo.py`

**Step 1: Write the failing tests**

Create `tests/test_oidc/test_userinfo.py`:

```python
import secrets
from base64 import b64encode
from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse

from argon2 import PasswordHasher
from httpx import AsyncClient

from fastapi_oidc_op.authn.password import PasswordService
from fastapi_oidc_op.models import PasswordCredential, User


def _register_test_client(client: AsyncClient) -> str:
    app = client._transport.app  # type: ignore[union-attr]
    oidc_server = app.state.oidc_server
    client_secret = "test-secret-456"
    oidc_server.context.cdb["test-rp"] = {
        "client_id": "test-rp",
        "client_secret": client_secret,
        "redirect_uris": [("http://localhost:9000/callback", {})],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": ["openid", "profile", "email"],
        "allowed_scopes": ["openid", "profile", "email"],
        "client_salt": secrets.token_hex(8),
    }
    oidc_server.keyjar.add_symmetric("test-rp", client_secret)
    return client_secret


async def _get_access_token(client: AsyncClient) -> str:
    """Full OIDC flow to get an access token."""
    client_secret = _register_test_client(client)
    app = client._transport.app  # type: ignore[union-attr]

    # Create user
    user_repo = app.state.user_repo
    cred_repo = app.state.credential_repo
    user = User(
        userid="lusab-bansen",
        username="alice",
        email="alice@example.com",
        email_verified=True,
        given_name="Alice",
        family_name="Wonderland",
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    await user_repo.create(user)
    svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))

    # Start auth flow
    await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "test-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid profile email",
            "state": "test-state",
            "nonce": "test-nonce",
        },
        follow_redirects=False,
    )

    # Login
    await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true"},
    )

    # Complete authorization
    auth_res = await client.get("/authorization/complete", follow_redirects=False)
    location = auth_res.headers["location"]
    params = parse_qs(urlparse(location).query)
    code = params["code"][0]

    # Exchange code for tokens
    auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode()
    token_res = await client.post(
        "/token",
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": "http://localhost:9000/callback",
        },
        headers={
            "Authorization": f"Basic {auth_header}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
    return token_res.json()["access_token"]


async def test_userinfo_returns_claims(client: AsyncClient) -> None:
    access_token = await _get_access_token(client)
    res = await client.get(
        "/userinfo",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    assert res.status_code == 200
    data = res.json()
    assert data["sub"] == "lusab-bansen"


async def test_userinfo_includes_email_claims(client: AsyncClient) -> None:
    access_token = await _get_access_token(client)
    res = await client.get(
        "/userinfo",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    data = res.json()
    assert data.get("email") == "alice@example.com"
    assert data.get("email_verified") is True


async def test_userinfo_invalid_token_returns_error(client: AsyncClient) -> None:
    res = await client.get(
        "/userinfo",
        headers={"Authorization": "Bearer invalid-token"},
    )
    assert res.status_code in (401, 403)
```

**Step 2: Run tests to verify they fail**

Run: `uv run pytest tests/test_oidc/test_userinfo.py -v`
Expected: FAIL

**Step 3: Implement**

Add to `src/fastapi_oidc_op/oidc/endpoints.py`:

```python
@router.get("/userinfo")
@router.post("/userinfo")
async def userinfo_endpoint(request: Request) -> Response:
    """OIDC UserInfo endpoint."""
    oidc_server = request.app.state.oidc_server
    endpoint = oidc_server.get_endpoint("userinfo")

    # Extract access token from Authorization header
    http_info = {
        "headers": dict(request.headers),
        "url": str(request.url),
    }

    # For POST requests, include the body
    if request.method == "POST":
        body = await request.body()
        request_data = body.decode("utf-8")
    else:
        request_data = {}

    try:
        parsed = endpoint.parse_request(request_data, http_info=http_info)
    except Exception:
        return JSONResponse({"error": "invalid_token"}, status_code=401)

    if "error" in parsed:
        return JSONResponse(parsed.to_dict() if hasattr(parsed, "to_dict") else parsed, status_code=401)

    result = endpoint.process_request(parsed)

    if "error" in result:
        return JSONResponse(result, status_code=401)

    # `result` already carries "response_args"; passing it again as an explicit
    # keyword next to **result would raise TypeError (duplicate keyword argument).
    resp_info = endpoint.do_response(request=parsed, **result)

    response_data = resp_info["response"]
    if isinstance(response_data, str):
        response_data = json.loads(response_data)
    elif hasattr(response_data, "to_dict"):
        response_data = response_data.to_dict()

    return JSONResponse(response_data)
```

**Step 4: Run tests to verify they pass**

Run: `uv run pytest tests/test_oidc/test_userinfo.py -v`
Expected: PASS

**Step 5: Run quality gate**

Run: `./scripts/check.sh`
Expected: All green

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/oidc/endpoints.py tests/test_oidc/test_userinfo.py
git commit -m "feat: add OIDC userinfo endpoint"
```

---

### Task 9: End-to-End Flow Test

**Files:**
- Create: `tests/test_oidc/test_e2e_flow.py`

A comprehensive test that exercises the
entire OIDC flow from start to finish, including verifying the ID token.

**Step 1: Write the test**

Create `tests/test_oidc/test_e2e_flow.py`:

```python
"""End-to-end OIDC Authorization Code flow test."""

import json
import secrets
from base64 import b64encode
from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse

from argon2 import PasswordHasher
from cryptojwt.jwk.jwk import key_from_jwk_dict
from cryptojwt.jws.jws import JWS
from httpx import AsyncClient

from fastapi_oidc_op.authn.password import PasswordService
from fastapi_oidc_op.models import PasswordCredential, User


async def test_full_authorization_code_flow(client: AsyncClient) -> None:
    """Complete OIDC Authorization Code flow: authorize → login → code → token → userinfo → validate ID token."""
    app = client._transport.app  # type: ignore[union-attr]
    oidc_server = app.state.oidc_server
    user_repo = app.state.user_repo
    cred_repo = app.state.credential_repo

    # --- Setup: register client ---
    client_secret = "e2e-secret"
    oidc_server.context.cdb["e2e-rp"] = {
        "client_id": "e2e-rp",
        "client_secret": client_secret,
        "redirect_uris": [("http://localhost:9000/callback", {})],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": ["openid", "profile", "email"],
        "allowed_scopes": ["openid", "profile", "email"],
        "client_salt": secrets.token_hex(8),
    }
    oidc_server.keyjar.add_symmetric("e2e-rp", client_secret)

    # --- Setup: create user ---
    user = User(
        userid="lusab-bansen",
        username="alice",
        given_name="Alice",
        family_name="Wonderland",
        email="alice@example.com",
        email_verified=True,
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    await user_repo.create(user)
    svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))

    # --- Step 1: Authorization request (unauthenticated) ---
    auth_res = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "e2e-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid profile email",
            "state": "e2e-state",
            "nonce": "e2e-nonce",
        },
        follow_redirects=False,
    )
    assert auth_res.status_code in (302, 303), f"Expected redirect, got {auth_res.status_code}"
    assert "/login" in auth_res.headers["location"]

    # --- Step 2: Password login ---
    login_res = await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true"},
    )
    assert login_res.status_code == 200
    assert "/authorization/complete" in login_res.headers.get("HX-Redirect", "")

    # --- Step 3: Complete authorization ---
    complete_res = await client.get("/authorization/complete", follow_redirects=False)
    assert complete_res.status_code in (302, 303)
    location = complete_res.headers["location"]
    assert location.startswith("http://localhost:9000/callback")

    params = parse_qs(urlparse(location).query)
    assert "code" in params
    assert params.get("state", [None])[0] == "e2e-state"
    code = params["code"][0]

    # --- Step 4: Token exchange ---
    auth_header = b64encode(f"e2e-rp:{client_secret}".encode()).decode()
    token_res = await client.post(
        "/token",
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": "http://localhost:9000/callback",
        },
        headers={
            "Authorization": f"Basic {auth_header}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
    assert token_res.status_code == 200
    token_data = token_res.json()
    assert "access_token" in token_data
    assert "id_token" in token_data
    assert token_data["token_type"].lower() == "bearer"

    # --- Step 5: Validate ID token ---
    id_token_raw = token_data["id_token"]

    # Get JWKS
    jwks_res = await client.get("/jwks")
    jwks = jwks_res.json()

    # Verify the JWT signature
    jws = JWS()
    _keys = [key_from_jwk_dict(k) for k in jwks["keys"]]
    id_token_payload = jws.verify_compact(id_token_raw, _keys)

    assert id_token_payload["iss"] == "http://localhost:8000"
    assert id_token_payload["sub"] == "lusab-bansen"
    assert "e2e-rp" in id_token_payload["aud"]

    # --- Step 6: UserInfo ---
    userinfo_res = await client.get(
        "/userinfo",
        headers={"Authorization": f"Bearer {token_data['access_token']}"},
    )
    assert userinfo_res.status_code == 200
    userinfo = userinfo_res.json()
    assert userinfo["sub"] == "lusab-bansen"
    assert userinfo.get("email") == "alice@example.com"
```

**Step 2: Run the test**

Run: `uv run pytest tests/test_oidc/test_e2e_flow.py -v`
Expected: PASS (if Tasks 1-8 are all correct)

If the test fails, this is where integration issues surface. Debug and fix based on the error output. Common issues:
- idpyoidc's `create_session` needs `authn_method` to not be None
- Response format mismatches (string vs Message vs dict)
- `user_id` in idpyoidc session vs `userid` in our model (the session uses `username` as the `user_id` key for idpyoidc, not `userid`)
- Cookie handling between requests

**Step 3: Commit**

```bash
git add tests/test_oidc/test_e2e_flow.py
git commit -m "test: add end-to-end OIDC authorization code flow test"
```

---

### Task 10: Full Quality Gate

**Step 1: Run full quality checks**

Run: `./scripts/check.sh`
Expected: All green (formatting, linting, type checking, all tests pass)

**Step 2: Fix any issues**

If ruff or ty reports issues, fix them. Common fixes:
- Type ignore comments for idpyoidc's untyped APIs
- Import sorting
- Unused imports

**Step 3: Commit any fixes**

```bash
git add -A
git diff --cached --quiet || git commit -m "style: fix formatting and lint issues"
```