# OIDC Provider Implementation Plan (Phase 5)
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Wire up `idpyoidc` to serve standard OIDC endpoints (discovery, authorization code flow, token exchange, userinfo, JWKS) using the existing authentication and user management infrastructure.

**Architecture:** FastAPI routes wrap idpyoidc's endpoint pipeline (`parse_request` → `process_request` → `do_response`). A custom `PorchlightUserInfo` subclass provides claims from our `UserRepository`. The authorization flow stores pending auth requests in the Starlette session; after login, a resume route creates an idpyoidc session and completes the authorization.

**Tech Stack:** FastAPI, idpyoidc >=5.0, Starlette SessionMiddleware, aiosqlite.

**Quality gate:** `./scripts/check.sh`

**Known constraints:**
- idpyoidc's `UserInfo.__call__` is synchronous. We populate an in-memory claims cache when creating the idpyoidc session.
- idpyoidc stores sessions/grants in memory. Restarting the server invalidates active tokens.
- The `Server` constructor is synchronous and does file I/O for key generation. Call it outside the async lifespan context if needed, or ensure key directories exist beforehand.
- idpyoidc's `OPConfiguration` auto-wraps a plain dict. Provide the config as a dict for simplicity.
- `redirect_uris` in `cdb` must be a list of `(base_url, query_dict)` tuples, not plain strings.
- Client secrets must also be added to the keyjar via `server.keyjar.add_symmetric(client_id, secret)`.
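
The `redirect_uris` constraint above is easy to get wrong when client data arrives as plain strings. A minimal sketch of the conversion, using only the standard library; `to_cdb_redirect_uri` is a hypothetical helper (not part of idpyoidc), and returning `{}` for a URI without a query string is an assumption chosen to match the `(uri, {})` entries used later in this plan:

```python
from urllib.parse import parse_qs, urlsplit, urlunsplit


def to_cdb_redirect_uri(uri: str) -> tuple[str, dict[str, list[str]]]:
    """Split a redirect URI into (base_url, query_dict). Hypothetical helper."""
    parts = urlsplit(uri)
    # Drop query and fragment to get the base URL.
    base = urlunsplit(parts._replace(query="", fragment=""))
    # parse_qs returns {} when there is no query string.
    return base, parse_qs(parts.query)


redirect_uris = [
    to_cdb_redirect_uri(uri)
    for uri in (
        "http://localhost:9000/callback",
        "http://localhost:9000/cb?tenant=a",
    )
]
```

The resulting list can be assigned to the `redirect_uris` key of a `cdb` entry.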
---
### Task 1: Config + Claims Mapping + UserInfo Source
**Files:**
- Modify: `src/fastapi_oidc_op/config.py`
- Create: `src/fastapi_oidc_op/oidc/claims.py`
- Create: `tests/test_oidc/test_claims.py`
**Step 1: Write the failing tests**
Create `tests/test_oidc/test_claims.py`:
```python
from datetime import UTC, datetime

from fastapi_oidc_op.models import User
from fastapi_oidc_op.oidc.claims import PorchlightUserInfo, user_to_claims


def test_user_to_claims_minimal() -> None:
    user = User(userid="lusab-bansen", username="alice")
    claims = user_to_claims(user)
    assert claims["sub"] == "lusab-bansen"
    assert claims["preferred_username"] == "alice"
    assert "email" not in claims  # None fields excluded


def test_user_to_claims_full() -> None:
    user = User(
        userid="lusab-bansen",
        username="alice",
        preferred_username="Alice W.",
        given_name="Alice",
        family_name="Wonderland",
        nickname="ali",
        email="alice@example.com",
        email_verified=True,
        phone_number="+1234567890",
        phone_number_verified=False,
        picture="https://example.com/alice.jpg",
        locale="en",
        updated_at=datetime(2025, 1, 1, tzinfo=UTC),
    )
    claims = user_to_claims(user)
    assert claims["sub"] == "lusab-bansen"
    assert claims["preferred_username"] == "Alice W."
    assert claims["given_name"] == "Alice"
    assert claims["family_name"] == "Wonderland"
    assert claims["nickname"] == "ali"
    assert claims["email"] == "alice@example.com"
    assert claims["email_verified"] is True
    assert claims["phone_number"] == "+1234567890"
    assert claims["phone_number_verified"] is False
    assert claims["picture"] == "https://example.com/alice.jpg"
    assert claims["locale"] == "en"
    assert claims["updated_at"] == int(datetime(2025, 1, 1, tzinfo=UTC).timestamp())


def test_porchlight_userinfo_returns_claims() -> None:
    userinfo = PorchlightUserInfo()
    userinfo.set_user_claims("lusab-bansen", {"sub": "lusab-bansen", "email": "a@b.com"})
    result = userinfo("lusab-bansen", "client1")
    assert result["sub"] == "lusab-bansen"
    assert result["email"] == "a@b.com"


def test_porchlight_userinfo_filters_claims() -> None:
    userinfo = PorchlightUserInfo()
    userinfo.set_user_claims("lusab-bansen", {"sub": "lusab-bansen", "email": "a@b.com", "name": "Alice"})
    result = userinfo("lusab-bansen", "client1", user_info_claims={"email": None})
    assert "email" in result
    assert "name" not in result


def test_porchlight_userinfo_unknown_user() -> None:
    userinfo = PorchlightUserInfo()
    result = userinfo("unknown", "client1")
    assert result == {}
```
**Step 2: Run tests to verify they fail**
Run: `uv run pytest tests/test_oidc/test_claims.py -v`
Expected: FAIL (`ImportError` — module doesn't exist)
**Step 3: Implement**
Add to `src/fastapi_oidc_op/config.py`:
```python
# Signing keys
signing_key_path: str = "data/keys"
```
Create `src/fastapi_oidc_op/oidc/__init__.py` (empty file).
Create `src/fastapi_oidc_op/oidc/claims.py`:
```python
"""OIDC claims mapping and UserInfo source."""

from idpyoidc.server.user_info import UserInfo

from fastapi_oidc_op.models import User


def user_to_claims(user: User) -> dict:
    """Convert a User model to an OIDC claims dict.

    Only includes claims that have non-None values.
    The 'sub' claim always uses the userid (proquint).
    """
    claims: dict = {"sub": user.userid}

    # preferred_username: use explicit field, fall back to username
    claims["preferred_username"] = user.preferred_username or user.username

    optional_fields = {
        "given_name": user.given_name,
        "family_name": user.family_name,
        "nickname": user.nickname,
        "email": user.email,
        "email_verified": user.email_verified if user.email else None,
        "phone_number": user.phone_number,
        "phone_number_verified": user.phone_number_verified if user.phone_number else None,
        "picture": user.picture,
        "locale": user.locale,
    }
    for claim_name, value in optional_fields.items():
        if value is not None:
            claims[claim_name] = value

    # updated_at as Unix timestamp (OIDC spec requires number)
    if user.updated_at:
        claims["updated_at"] = int(user.updated_at.timestamp())

    return claims


class PorchlightUserInfo(UserInfo):
    """UserInfo source backed by an in-memory claims cache.

    Claims are populated via set_user_claims() after authentication.
    idpyoidc calls __call__() synchronously to look up claims.
    """

    def __init__(self, **kwargs):  # type: ignore[no-untyped-def]
        super().__init__(db={}, **kwargs)

    def set_user_claims(self, user_id: str, claims: dict) -> None:
        """Populate claims cache for a user."""
        self.db[user_id] = claims
```
**Step 4: Run tests to verify they pass**
Run: `uv run pytest tests/test_oidc/test_claims.py -v`
Expected: PASS
**Step 5: Run quality gate**
Run: `./scripts/check.sh`
Expected: All green
**Step 6: Commit**
```bash
git add src/fastapi_oidc_op/config.py src/fastapi_oidc_op/oidc/ tests/test_oidc/test_claims.py
git commit -m "feat: add OIDC claims mapping and PorchlightUserInfo source"
```
---
### Task 2: idpyoidc Server Initialization
**Files:**
- Create: `src/fastapi_oidc_op/oidc/provider.py`
- Create: `tests/test_oidc/test_provider.py`
**Step 1: Write the failing tests**
Create `tests/test_oidc/test_provider.py`:
```python
import shutil
from pathlib import Path

from fastapi_oidc_op.config import Settings
from fastapi_oidc_op.oidc.provider import create_oidc_server


def test_create_server_has_endpoints() -> None:
    key_path = Path("test_keys_provider")
    key_path.mkdir(exist_ok=True)
    try:
        settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path))
        server = create_oidc_server(settings)
        assert "authorization" in server.endpoint
        assert "token" in server.endpoint
        assert "userinfo" in server.endpoint
        assert "provider_config" in server.endpoint
    finally:
        shutil.rmtree(key_path, ignore_errors=True)


def test_create_server_has_issuer() -> None:
    key_path = Path("test_keys_issuer")
    key_path.mkdir(exist_ok=True)
    try:
        settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path))
        server = create_oidc_server(settings)
        assert server.context.issuer == "http://localhost:8000"
    finally:
        shutil.rmtree(key_path, ignore_errors=True)


def test_create_server_jwks_available() -> None:
    key_path = Path("test_keys_jwks")
    key_path.mkdir(exist_ok=True)
    try:
        settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path))
        server = create_oidc_server(settings)
        keys = server.keyjar.export_jwks()
        assert "keys" in keys
        assert len(keys["keys"]) > 0
    finally:
        shutil.rmtree(key_path, ignore_errors=True)


def test_create_server_userinfo_is_porchlight() -> None:
    key_path = Path("test_keys_userinfo")
    key_path.mkdir(exist_ok=True)
    try:
        settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path))
        server = create_oidc_server(settings)

        from fastapi_oidc_op.oidc.claims import PorchlightUserInfo

        assert isinstance(server.context.userinfo, PorchlightUserInfo)
    finally:
        shutil.rmtree(key_path, ignore_errors=True)
```
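
The tests above create and remove key directories by hand in the working directory. One possible alternative, sketched here with only the standard library, is a context manager that hands out a throwaway directory; `temp_key_dir` is a hypothetical helper name, and pytest's built-in `tmp_path` fixture would serve the same purpose.

```python
import tempfile
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def temp_key_dir() -> Iterator[Path]:
    """Yield a temporary directory that is removed on exit (hypothetical helper)."""
    with tempfile.TemporaryDirectory(prefix="porchlight-keys-") as name:
        yield Path(name)


with temp_key_dir() as key_path:
    created = key_path
    existed_inside = key_path.is_dir()
    # In a real test, this is where Settings(..., signing_key_path=str(key_path))
    # and create_oidc_server(settings) would be called.

existed_after = created.exists()
```

This keeps the repository checkout clean even when a test is interrupted before its `finally` block runs.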
**Step 2: Run tests to verify they fail**
Run: `uv run pytest tests/test_oidc/test_provider.py -v`
Expected: FAIL (`ImportError`)
**Step 3: Implement**
Create `src/fastapi_oidc_op/oidc/provider.py`:
```python
"""Initialize the idpyoidc OIDC Server."""

from pathlib import Path

from idpyoidc.server import Server

from fastapi_oidc_op.config import Settings
from fastapi_oidc_op.oidc.claims import PorchlightUserInfo


def _build_server_config(settings: Settings) -> dict:
    """Build the idpyoidc server configuration dict."""
    key_path = Path(settings.signing_key_path)
    key_path.mkdir(parents=True, exist_ok=True)

    return {
        "issuer": settings.issuer,
        "key_conf": {
            "key_defs": [
                {"type": "RSA", "use": ["sig"]},
                {"type": "EC", "crv": "P-256", "use": ["sig"]},
            ],
            "uri_path": "jwks",
            "private_path": str(key_path / "private_jwks.json"),
            "read_only": False,
        },
        "endpoint": {
            "provider_config": {
                "path": ".well-known/openid-configuration",
                "class": "idpyoidc.server.oidc.provider_config.ProviderConfiguration",
                "kwargs": {"client_authn_method": None},
            },
            "authorization": {
                "path": "authorization",
                "class": "idpyoidc.server.oidc.authorization.Authorization",
                "kwargs": {
                    "client_authn_method": None,
                    "response_types_supported": ["code"],
                    "response_modes_supported": ["query"],
                },
            },
            "token": {
                "path": "token",
                "class": "idpyoidc.server.oidc.token.Token",
                "kwargs": {
                    "client_authn_method": ["client_secret_post", "client_secret_basic"],
                },
            },
            "userinfo": {
                "path": "userinfo",
                "class": "idpyoidc.server.oidc.userinfo.UserInfo",
                "kwargs": {},
            },
        },
        "userinfo": {
            "class": PorchlightUserInfo,
            "kwargs": {},
        },
        "authentication": {},
        "authz": {
            "class": "idpyoidc.server.authz.AuthzHandling",
            "kwargs": {
                "grant_config": {
                    "usage_rules": {
                        "authorization_code": {
                            "supports_minting": ["access_token", "refresh_token", "id_token"],
                            "max_usage": 1,
                            "expires_in": 120,
                        },
                        "access_token": {"expires_in": 3600},
                        "refresh_token": {
                            "supports_minting": ["access_token", "refresh_token", "id_token"],
                            "expires_in": 86400,
                        },
                    },
                    "expires_in": 2592000,
                },
            },
        },
        "token_handler_args": {
            "jwks_file": str(key_path / "token_jwks.json"),
            "code": {"kwargs": {"lifetime": 600}},
            "token": {
                "class": "idpyoidc.server.token.jwt_token.JWTToken",
                "kwargs": {
                    "lifetime": 3600,
                    "add_claims_by_scope": True,
                    "aud": [settings.issuer],
                },
            },
            "refresh": {
                "class": "idpyoidc.server.token.jwt_token.JWTToken",
                "kwargs": {
                    "lifetime": 86400,
                    "aud": [settings.issuer],
                },
            },
            "id_token": {
                "class": "idpyoidc.server.token.id_token.IDToken",
                "kwargs": {
                    "lifetime": 3600,
                    "add_claims_by_scope": True,
                },
            },
        },
        "scopes_to_claims": {
            "openid": ["sub"],
            "profile": [
                "preferred_username",
                "given_name",
                "family_name",
                "nickname",
                "picture",
                "locale",
                "updated_at",
            ],
            "email": ["email", "email_verified"],
            "phone": ["phone_number", "phone_number_verified"],
        },
    }


def create_oidc_server(settings: Settings) -> Server:
    """Create and configure the idpyoidc Server instance."""
    config = _build_server_config(settings)
    server = Server(conf=config)
    return server
```
Note: The exact config structure may need adjustments based on what idpyoidc accepts. If passing the `PorchlightUserInfo` class object (rather than a dotted-path string) in the `userinfo` config doesn't work, we may need to set `server.context.userinfo` directly after creation:
```python
server = Server(conf=config)
server.context.userinfo = PorchlightUserInfo()
```
Run the tests and adjust accordingly.
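
The `scopes_to_claims` table in the config decides which claims a granted scope may release. The following sketch shows the intended effect of that mapping in plain Python; it is an illustration of the rule, not idpyoidc's internal implementation, and `claims_for_scopes` and `release_claims` are hypothetical names.

```python
# Copied from the "scopes_to_claims" section of the server config.
SCOPES_TO_CLAIMS: dict[str, list[str]] = {
    "openid": ["sub"],
    "profile": [
        "preferred_username",
        "given_name",
        "family_name",
        "nickname",
        "picture",
        "locale",
        "updated_at",
    ],
    "email": ["email", "email_verified"],
    "phone": ["phone_number", "phone_number_verified"],
}


def claims_for_scopes(scopes: list[str]) -> set[str]:
    """Return the claim names that the given scopes allow. Unknown scopes add nothing."""
    allowed: set[str] = set()
    for scope in scopes:
        allowed.update(SCOPES_TO_CLAIMS.get(scope, []))
    return allowed


def release_claims(all_claims: dict, scopes: list[str]) -> dict:
    """Filter a user's full claims dict down to what the scopes permit."""
    allowed = claims_for_scopes(scopes)
    return {name: value for name, value in all_claims.items() if name in allowed}


released = release_claims(
    {"sub": "lusab-bansen", "email": "alice@example.com", "nickname": "ali"},
    ["openid", "email"],
)
```

Here `nickname` is withheld because the `profile` scope was not granted.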
**Step 4: Run tests to verify they pass**
Run: `uv run pytest tests/test_oidc/test_provider.py -v`
Expected: PASS
If `server.context.userinfo` is not a `PorchlightUserInfo` instance, the last test will guide us to fix the initialization.
**Step 5: Run quality gate**
Run: `./scripts/check.sh`
Expected: All green
**Step 6: Commit**
```bash
git add src/fastapi_oidc_op/oidc/provider.py tests/test_oidc/test_provider.py
git commit -m "feat: add idpyoidc server initialization"
```
---
### Task 3: App Integration (Lifespan + Client Registration)
**Files:**
- Modify: `src/fastapi_oidc_op/app.py`
- Create: `tests/test_oidc/test_app_integration.py`
**Step 1: Write the failing tests**
Create `tests/test_oidc/test_app_integration.py`:
```python
from httpx import AsyncClient


async def test_oidc_server_on_app_state(client: AsyncClient) -> None:
    app = client._transport.app  # type: ignore[union-attr]
    assert hasattr(app.state, "oidc_server")
    assert app.state.oidc_server is not None


async def test_manage_client_registered(client: AsyncClient) -> None:
    app = client._transport.app  # type: ignore[union-attr]
    oidc_server = app.state.oidc_server
    settings = app.state.settings
    assert settings.manage_client_id in oidc_server.context.cdb
```
**Step 2: Run tests to verify they fail**
Run: `uv run pytest tests/test_oidc/test_app_integration.py -v`
Expected: FAIL (`app.state.oidc_server` doesn't exist)
**Step 3: Implement**
Modify `src/fastapi_oidc_op/app.py` to:
1. Import `create_oidc_server` from `oidc.provider`
2. In the lifespan, after setting up repos and services, create the OIDC server
3. Register the management client in `server.context.cdb`
4. Store the server on `app.state.oidc_server`
In the lifespan, after the existing service setup:
```python
from fastapi_oidc_op.oidc.provider import create_oidc_server

# OIDC Server
oidc_server = create_oidc_server(settings)
app.state.oidc_server = oidc_server

# Register management client
import secrets as _secrets

manage_secret = settings.session_secret or _secrets.token_hex(32)
oidc_server.context.cdb[settings.manage_client_id] = {
    "client_id": settings.manage_client_id,
    "client_secret": manage_secret,
    "redirect_uris": [(f"{settings.issuer}/manage/callback", {})],
    "response_types_supported": ["code"],
    "token_endpoint_auth_method": "client_secret_basic",
    "scope": ["openid", "profile", "email"],
    "allowed_scopes": ["openid", "profile", "email"],
    "client_salt": _secrets.token_hex(8),
}
oidc_server.keyjar.add_symmetric(settings.manage_client_id, manage_secret)
```
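
The same client entry shape recurs for the test clients in later tasks, so it may be worth factoring it out. A minimal sketch, assuming redirect URIs without query strings; `build_client_entry` is a hypothetical helper, and the field set simply copies the registration snippet above:

```python
import secrets


def build_client_entry(
    client_id: str,
    client_secret: str,
    redirect_uris: list[str],
    scopes: tuple[str, ...] = ("openid", "profile", "email"),
) -> dict:
    """Build a cdb entry with the fields used in this plan (hypothetical helper)."""
    return {
        "client_id": client_id,
        "client_secret": client_secret,
        # cdb expects (base_url, query_dict) tuples, not plain strings.
        "redirect_uris": [(uri, {}) for uri in redirect_uris],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": list(scopes),
        "allowed_scopes": list(scopes),
        "client_salt": secrets.token_hex(8),
    }


entry = build_client_entry("test-rp", "test-secret", ["http://localhost:9000/callback"])
```

The caller would still store the entry under `cdb[client_id]` and call `keyjar.add_symmetric(client_id, client_secret)`, as the snippet above does.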
**Step 4: Run tests to verify they pass**
Run: `uv run pytest tests/test_oidc/test_app_integration.py -v`
Expected: PASS
**Step 5: Run quality gate**
Run: `./scripts/check.sh`
Expected: All green (including existing 120 tests)
**Step 6: Commit**
```bash
git add src/fastapi_oidc_op/app.py tests/test_oidc/test_app_integration.py
git commit -m "feat: integrate idpyoidc server into app lifespan"
```
---
### Task 4: Discovery + JWKS Endpoints
**Files:**
- Create: `src/fastapi_oidc_op/oidc/endpoints.py`
- Modify: `src/fastapi_oidc_op/app.py`
- Create: `tests/test_oidc/test_discovery.py`
**Step 1: Write the failing tests**
Create `tests/test_oidc/test_discovery.py`:
```python
from httpx import AsyncClient


async def test_discovery_endpoint_returns_metadata(client: AsyncClient) -> None:
    res = await client.get("/.well-known/openid-configuration")
    assert res.status_code == 200
    data = res.json()
    assert data["issuer"] == "http://localhost:8000"
    assert "authorization_endpoint" in data
    assert "token_endpoint" in data
    assert "userinfo_endpoint" in data
    assert "jwks_uri" in data


async def test_discovery_response_types_supported(client: AsyncClient) -> None:
    res = await client.get("/.well-known/openid-configuration")
    data = res.json()
    assert "code" in data["response_types_supported"]


async def test_discovery_scopes_supported(client: AsyncClient) -> None:
    res = await client.get("/.well-known/openid-configuration")
    data = res.json()
    assert "openid" in data["scopes_supported"]


async def test_jwks_endpoint_returns_keys(client: AsyncClient) -> None:
    res = await client.get("/jwks")
    assert res.status_code == 200
    data = res.json()
    assert "keys" in data
    assert len(data["keys"]) > 0
    # Each key should declare its key type
    for key in data["keys"]:
        assert "kty" in key
```
**Step 2: Run tests to verify they fail**
Run: `uv run pytest tests/test_oidc/test_discovery.py -v`
Expected: FAIL (404 — routes don't exist)
**Step 3: Implement**
Create `src/fastapi_oidc_op/oidc/endpoints.py`:
```python
"""FastAPI routes wrapping idpyoidc endpoint processing."""

from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse

router = APIRouter(tags=["oidc"])


@router.get("/.well-known/openid-configuration")
async def provider_configuration(request: Request) -> JSONResponse:
    """OIDC Discovery endpoint."""
    oidc_server = request.app.state.oidc_server
    endpoint = oidc_server.get_endpoint("provider_config")
    parsed = endpoint.parse_request({})
    result = endpoint.process_request(parsed)
    response_info = endpoint.do_response(response_args=result.get("response_args"), request=parsed, **result)
    return JSONResponse(content=response_info["response"].to_dict())


@router.get("/jwks")
async def jwks(request: Request) -> JSONResponse:
    """Public signing keys (JWK Set)."""
    oidc_server = request.app.state.oidc_server
    keys = oidc_server.keyjar.export_jwks()
    return JSONResponse(content=keys)
```
Modify `src/fastapi_oidc_op/app.py` to include the OIDC router:
```python
from fastapi_oidc_op.oidc.endpoints import router as oidc_router
# ...
app.include_router(oidc_router)
```
Note: The discovery endpoint implementation depends on how `do_response` returns data. If `response_info["response"]` is a string (JSON), parse it. If it's a `Message` object, call `.to_dict()`. Adjust based on test output.
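
Since the shape of `response_info["response"]` is not yet known, one option is a small normalizer that accepts either form. This is a sketch under that uncertainty: `response_to_dict` is a hypothetical helper, and `FakeMessage` only imitates an object exposing `to_dict()`; it is not idpyoidc's `Message` class.

```python
import json
from typing import Any


def response_to_dict(response: Any) -> dict:
    """Normalize a JSON string, a dict, or a to_dict()-capable object to a dict."""
    if isinstance(response, str):
        return json.loads(response)
    if isinstance(response, dict):
        return response
    to_dict = getattr(response, "to_dict", None)
    if callable(to_dict):
        return to_dict()
    raise TypeError(f"Unsupported response type: {type(response).__name__}")


class FakeMessage:
    """Test double that exposes to_dict(), used for illustration only."""

    def __init__(self, **fields: Any) -> None:
        self._fields = fields

    def to_dict(self) -> dict:
        return dict(self._fields)


from_string = response_to_dict('{"issuer": "http://localhost:8000"}')
from_message = response_to_dict(FakeMessage(issuer="http://localhost:8000"))
```

The route could then return `JSONResponse(content=response_to_dict(response_info["response"]))` regardless of which form turns up.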
**Step 4: Run tests to verify they pass**
Run: `uv run pytest tests/test_oidc/test_discovery.py -v`
Expected: PASS
**Step 5: Run quality gate**
Run: `./scripts/check.sh`
Expected: All green
**Step 6: Commit**
```bash
git add src/fastapi_oidc_op/oidc/endpoints.py src/fastapi_oidc_op/app.py tests/test_oidc/test_discovery.py
git commit -m "feat: add OIDC discovery and JWKS endpoints"
```
---
### Task 5: Authorization Endpoint (Redirect to Login)
**Files:**
- Modify: `src/fastapi_oidc_op/oidc/endpoints.py`
- Create: `tests/test_oidc/test_authorization.py`
This task implements the first half of the authorization flow: receiving the request from an RP and redirecting to the login page when the user is not authenticated.
**Step 1: Write the failing tests**
Create `tests/test_oidc/test_authorization.py`:
```python
import secrets
from httpx import AsyncClient
def _register_test_client(client: AsyncClient, client_id: str = "test-rp", redirect_uri: str = "http://localhost:9000/callback") -> str:
"""Register a test client in the OIDC server. Returns client_secret."""
app = client._transport.app # type: ignore[union-attr]
oidc_server = app.state.oidc_server
client_secret = secrets.token_hex(16)
oidc_server.context.cdb[client_id] = {
"client_id": client_id,
"client_secret": client_secret,
"redirect_uris": [(redirect_uri, {})],
"response_types_supported": ["code"],
"token_endpoint_auth_method": "client_secret_basic",
"scope": ["openid", "profile", "email"],
"allowed_scopes": ["openid", "profile", "email"],
"client_salt": secrets.token_hex(8),
}
oidc_server.keyjar.add_symmetric(client_id, client_secret)
return client_secret
async def test_authorization_redirects_to_login_when_unauthenticated(client: AsyncClient) -> None:
_register_test_client(client)
res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": "test-rp",
"redirect_uri": "http://localhost:9000/callback",
"scope": "openid",
"state": "test-state",
"nonce": "test-nonce",
},
follow_redirects=False,
)
assert res.status_code in (302, 303)
assert "/login" in res.headers["location"]
async def test_authorization_stores_auth_request_in_session(client: AsyncClient) -> None:
_register_test_client(client)
# After the redirect, the auth request should be stored in the session
# We verify this indirectly: make the auth request, then check that
# GET /login works (the session cookie is set by the redirect)
res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": "test-rp",
"redirect_uri": "http://localhost:9000/callback",
"scope": "openid",
"state": "test-state",
"nonce": "test-nonce",
},
follow_redirects=False,
)
assert res.status_code in (302, 303)
# Follow up: login page should be accessible
login_res = await client.get("/login")
assert login_res.status_code == 200
async def test_authorization_invalid_client_returns_error(client: AsyncClient) -> None:
res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": "nonexistent",
"redirect_uri": "http://evil.com/callback",
"scope": "openid",
"state": "test-state",
},
follow_redirects=False,
)
# Should return an error (not redirect to evil.com)
assert res.status_code >= 400 or "error" in res.text.lower()
```
**Step 2: Run tests to verify they fail**
Run: `uv run pytest tests/test_oidc/test_authorization.py -v`
Expected: FAIL (404 — `/authorization` doesn't exist)
**Step 3: Implement**
Add to `src/fastapi_oidc_op/oidc/endpoints.py`:
```python
from html import escape
from urllib.parse import urlencode

from fastapi import Response
from fastapi.responses import HTMLResponse, RedirectResponse


@router.get("/authorization")
async def authorization(request: Request) -> Response:
    """OIDC Authorization endpoint.

    Validates the authorization request, then:
    - If user is authenticated (session): creates grant + redirects to RP
    - If not authenticated: stores request in session, redirects to /login
    """
    oidc_server = request.app.state.oidc_server
    endpoint = oidc_server.get_endpoint("authorization")

    # Collect the query parameters as a plain dict
    query_params = dict(request.query_params)

    try:
        parsed = endpoint.parse_request(query_params)
    except Exception as exc:
        # Escape the message: it may echo values taken from the request.
        return HTMLResponse(f"<h1>Invalid Request</h1><p>{escape(str(exc))}</p>", status_code=400)

    # Check for parse errors
    if "error" in parsed:
        error_desc = parsed.get("error_description", parsed["error"])
        return HTMLResponse(f"<h1>Error</h1><p>{escape(str(error_desc))}</p>", status_code=400)

    # Check if user is authenticated via Starlette session
    userid = request.session.get("userid")
    username = request.session.get("username")

    if userid and username:
        # User is authenticated: complete the authorization
        return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)

    # Not authenticated: store the auth request in the session as a
    # serializable dict and redirect to login
    request.session["oidc_auth_request"] = query_params
    return RedirectResponse("/login", status_code=303)
```
The `_complete_authorization` helper will be implemented in Task 7 (authorization complete). For now, make it return a placeholder:
```python
async def _complete_authorization(request, oidc_server, endpoint, parsed, userid, username):
    """Complete the authorization after authentication. Implemented in Task 7."""
    return HTMLResponse("Authorization completion not yet implemented", status_code=501)
```
**Step 4: Run tests to verify they pass**
Run: `uv run pytest tests/test_oidc/test_authorization.py -v`
Expected: PASS
**Step 5: Run quality gate**
Run: `./scripts/check.sh`
Expected: All green
**Step 6: Commit**
```bash
git add src/fastapi_oidc_op/oidc/endpoints.py tests/test_oidc/test_authorization.py
git commit -m "feat: add authorization endpoint with login redirect"
```
---
### Task 6: Login Route Integration (Resume OIDC Flow)
**Files:**
- Modify: `src/fastapi_oidc_op/authn/routes.py`
- Create: `tests/test_oidc/test_login_oidc_redirect.py`
After a successful login, if there's a pending OIDC authorization request in the session, the login routes should redirect to `/authorization/complete` instead of `/manage/credentials`.
**Step 1: Write the failing tests**
Create `tests/test_oidc/test_login_oidc_redirect.py`:
```python
import secrets
from datetime import UTC, datetime

from argon2 import PasswordHasher
from httpx import AsyncClient

from fastapi_oidc_op.authn.password import PasswordService
from fastapi_oidc_op.models import PasswordCredential, User


def _register_test_client(client: AsyncClient) -> None:
    app = client._transport.app  # type: ignore[union-attr]
    oidc_server = app.state.oidc_server
    oidc_server.context.cdb["test-rp"] = {
        "client_id": "test-rp",
        "client_secret": "test-secret",
        "redirect_uris": [("http://localhost:9000/callback", {})],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": ["openid", "profile", "email"],
        "allowed_scopes": ["openid", "profile", "email"],
        "client_salt": secrets.token_hex(8),
    }
    oidc_server.keyjar.add_symmetric("test-rp", "test-secret")


async def _create_user(client: AsyncClient) -> None:
    app = client._transport.app  # type: ignore[union-attr]
    user_repo = app.state.user_repo
    cred_repo = app.state.credential_repo
    user = User(userid="lusab-bansen", username="alice", created_at=datetime.now(UTC), updated_at=datetime.now(UTC))
    await user_repo.create(user)
    svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))


async def test_login_with_pending_oidc_redirects_to_authorization_complete(client: AsyncClient) -> None:
    _register_test_client(client)
    await _create_user(client)

    # Step 1: Start OIDC authorization (stores request in session, redirects to /login)
    auth_res = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "test-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid",
            "state": "test-state",
            "nonce": "test-nonce",
        },
        follow_redirects=False,
    )
    assert auth_res.status_code in (302, 303)

    # Step 2: Login via password
    login_res = await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true"},
    )
    assert login_res.status_code == 200
    # Should redirect to /authorization/complete, not /manage/credentials
    redirect_target = login_res.headers.get("HX-Redirect", "")
    assert "/authorization/complete" in redirect_target


async def test_login_without_pending_oidc_redirects_to_manage(client: AsyncClient) -> None:
    await _create_user(client)
    login_res = await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true"},
    )
    assert login_res.status_code == 200
    redirect_target = login_res.headers.get("HX-Redirect", "")
    assert redirect_target == "/manage/credentials"
```
**Step 2: Run tests to verify they fail**
Run: `uv run pytest tests/test_oidc/test_login_oidc_redirect.py -v`
Expected: FAIL (login always redirects to `/manage/credentials`)
**Step 3: Implement**
Modify `src/fastapi_oidc_op/authn/routes.py`:
Add a helper function:
```python
def _login_redirect_target(request: Request) -> str:
    """Determine where to redirect after successful login.

    If there's a pending OIDC authorization request, redirect to complete it.
    Otherwise, redirect to credential management.
    """
    if "oidc_auth_request" in request.session:
        return "/authorization/complete"
    return "/manage/credentials"
```
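
Because the helper only reads `request.session`, its branching can be checked without a running app. A minimal sketch using a duck-typed stand-in for the request; the function is repeated here without the leading underscore so the block is self-contained, and `SimpleNamespace` is merely a substitute for a Starlette `Request`:

```python
from types import SimpleNamespace


def login_redirect_target(request) -> str:
    """Same logic as the helper above, duplicated for a standalone example."""
    if "oidc_auth_request" in request.session:
        return "/authorization/complete"
    return "/manage/credentials"


# Stand-ins for requests with and without a pending OIDC authorization.
pending_request = SimpleNamespace(session={"oidc_auth_request": {"client_id": "test-rp"}})
plain_request = SimpleNamespace(session={})

pending_target = login_redirect_target(pending_request)
plain_target = login_redirect_target(plain_request)
```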
In `login_password`, change:
```python
response.headers["HX-Redirect"] = "/manage/credentials"
```
to:
```python
response.headers["HX-Redirect"] = _login_redirect_target(request)
```
In `login_webauthn_complete`, change the same line:
```python
response.headers["HX-Redirect"] = "/manage/credentials"
```
to:
```python
response.headers["HX-Redirect"] = _login_redirect_target(request)
```
**Step 4: Run tests to verify they pass**
Run: `uv run pytest tests/test_oidc/test_login_oidc_redirect.py -v`
Expected: PASS
Also run existing login tests to make sure nothing broke:
Run: `uv run pytest tests/test_auth_routes/ -v`
Expected: All PASS
**Step 5: Run quality gate**
Run: `./scripts/check.sh`
Expected: All green
**Step 6: Commit**
```bash
git add src/fastapi_oidc_op/authn/routes.py tests/test_oidc/test_login_oidc_redirect.py
git commit -m "feat: redirect to OIDC authorization after login when pending"
```
---
### Task 7: Authorization Complete + Token Endpoint
**Files:**
- Modify: `src/fastapi_oidc_op/oidc/endpoints.py`
- Create: `tests/test_oidc/test_token.py`
This is the core task: completing the authorization flow and implementing the token endpoint. The `/authorization/complete` route creates an idpyoidc session, mints an authorization code, and redirects to the RP. The token endpoint exchanges the code for tokens.
**Step 1: Write the failing tests**
Create `tests/test_oidc/test_token.py`:
```python
import secrets
from base64 import b64encode
from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse

from argon2 import PasswordHasher
from httpx import AsyncClient

from fastapi_oidc_op.authn.password import PasswordService
from fastapi_oidc_op.models import PasswordCredential, User


def _register_test_client(client: AsyncClient) -> str:
    app = client._transport.app  # type: ignore[union-attr]
    oidc_server = app.state.oidc_server
    client_secret = "test-secret-123"
    oidc_server.context.cdb["test-rp"] = {
        "client_id": "test-rp",
        "client_secret": client_secret,
        "redirect_uris": [("http://localhost:9000/callback", {})],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": ["openid", "profile", "email"],
        "allowed_scopes": ["openid", "profile", "email"],
        "client_salt": secrets.token_hex(8),
    }
    oidc_server.keyjar.add_symmetric("test-rp", client_secret)
    return client_secret


async def _create_user_and_login(client: AsyncClient) -> str:
    """Create user, log in, return userid."""
    app = client._transport.app  # type: ignore[union-attr]
    user_repo = app.state.user_repo
    cred_repo = app.state.credential_repo
    user = User(
        userid="lusab-bansen",
        username="alice",
        email="alice@example.com",
        email_verified=True,
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    await user_repo.create(user)
    svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))
    await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true"},
    )
    return user.userid


async def _get_authorization_code(client: AsyncClient) -> str:
    """Run full auth flow and extract the authorization code."""
    _register_test_client(client)

    # Start authorization
    auth_res = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "test-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid profile email",
            "state": "test-state",
            "nonce": "test-nonce",
        },
        follow_redirects=False,
    )
    assert auth_res.status_code in (302, 303)

    # Create user and log in
    await _create_user_and_login(client)

    # Start authorization again (now authenticated)
    auth_res2 = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "test-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid profile email",
            "state": "test-state",
            "nonce": "test-nonce",
        },
        follow_redirects=False,
    )
    assert auth_res2.status_code in (302, 303)

    # Parse the redirect URL to extract the code
    location = auth_res2.headers["location"]
    parsed = urlparse(location)
    params = parse_qs(parsed.query)
    assert "code" in params, f"No code in redirect: {location}"
    return params["code"][0]


async def test_authorization_complete_redirects_with_code(client: AsyncClient) -> None:
    code = await _get_authorization_code(client)
    assert len(code) > 0


async def test_token_endpoint_exchanges_code(client: AsyncClient) -> None:
    code = await _get_authorization_code(client)
    client_secret = "test-secret-123"

    # Exchange code for tokens
    auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode()
    token_res = await client.post(
        "/token",
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": "http://localhost:9000/callback",
        },
        headers={
            "Authorization": f"Basic {auth_header}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
    assert token_res.status_code == 200
    data = token_res.json()
    assert "access_token" in data
    assert "id_token" in data
    assert data["token_type"].lower() == "bearer"


async def test_token_endpoint_invalid_code_returns_error(client: AsyncClient) -> None:
    _register_test_client(client)
    client_secret = "test-secret-123"
    auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode()
    token_res = await client.post(
        "/token",
        data={
            "grant_type": "authorization_code",
            "code": "invalid-code",
            "redirect_uri": "http://localhost:9000/callback",
        },
        headers={
            "Authorization": f"Basic {auth_header}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
    assert token_res.status_code == 400 or "error" in token_res.json()
```

**Step 2: Run tests to verify they fail**

Run: `uv run pytest tests/test_oidc/test_token.py -v`

Expected: FAIL

**Step 3: Implement**

Modify `src/fastapi_oidc_op/oidc/endpoints.py`:

Replace the placeholder `_complete_authorization` with the real implementation, and add the authorization-complete and token endpoints:

```python
import json
from html import escape
from urllib.parse import urlencode

from fastapi import APIRouter, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from idpyoidc.server.authn_event import create_authn_event
from idpyoidc.time_util import utc_time_sans_frac

from fastapi_oidc_op.oidc.claims import PorchlightUserInfo, user_to_claims

router = APIRouter(tags=["oidc"])


def _error_page(message: object) -> HTMLResponse:
    """Render a minimal error page, HTML-escaping the message."""
    return HTMLResponse(f"<h1>Error</h1><p>{escape(str(message))}</p>", status_code=400)


def _as_dict(message) -> dict:
    """Convert an idpyoidc Message (or plain mapping) to a JSON-serializable dict."""
    return message.to_dict() if hasattr(message, "to_dict") else dict(message)


async def _complete_authorization(
    request: Request,
    oidc_server,
    endpoint,
    parsed_request,
    userid: str,
    username: str,
) -> Response:
    """Complete OIDC authorization after user authentication.

    Creates an idpyoidc session, populates the userinfo cache, and
    returns a redirect to the RP with an authorization code.
    """
    # Populate userinfo cache
    user_repo = request.app.state.user_repo
    user = await user_repo.get_by_userid(userid)
    if user is None:
        return _error_page("User not found")

    userinfo: PorchlightUserInfo = oidc_server.context.userinfo
    claims = user_to_claims(user)
    userinfo.set_user_claims(username, claims)

    # Create idpyoidc session
    session_id = endpoint.create_session(
        request=parsed_request,
        user_id=username,
        acr="urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport",
        time_stamp=utc_time_sans_frac(),
        authn_method=None,
    )

    # Complete authorization (mints code, builds redirect)
    result = endpoint.authz_part2(request=parsed_request, session_id=session_id)
    if "error" in result:
        return _error_page(result.get("error_description", result["error"]))

    # `result` may already carry `response_args`; passing it again as an explicit
    # keyword next to `**result` would raise TypeError.
    resp_info = endpoint.do_response(request=parsed_request, **result)

    # The response is the redirect URL
    redirect_url = resp_info["response"]
    if hasattr(redirect_url, "to_dict"):
        # It's a Message object: build the URL from return_uri + params
        params = redirect_url.to_dict()
        return_uri = result.get("return_uri", parsed_request.get("redirect_uri", ""))
        # Respect a query string that is already part of the registered redirect URI
        separator = "&" if "?" in return_uri else "?"
        redirect_url = f"{return_uri}{separator}{urlencode(params)}"
    return RedirectResponse(str(redirect_url), status_code=303)


@router.get("/authorization/complete")
async def authorization_complete(request: Request) -> Response:
    """Resume OIDC authorization after login.

    Called after the user authenticates via /login. Pops the pending
    auth request from the session and completes the authorization.
    """
    oidc_server = request.app.state.oidc_server
    endpoint = oidc_server.get_endpoint("authorization")

    # Pop the stored auth request
    auth_request_params = request.session.pop("oidc_auth_request", None)
    if auth_request_params is None:
        return _error_page("No pending authorization request")

    userid = request.session.get("userid")
    username = request.session.get("username")
    if not userid or not username:
        return RedirectResponse("/login", status_code=303)

    # Re-parse the original authorization request
    try:
        parsed = endpoint.parse_request(auth_request_params)
    except Exception as exc:
        return _error_page(exc)
    if "error" in parsed:
        return _error_page(parsed.get("error_description", parsed["error"]))

    return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)


@router.post("/token")
async def token_endpoint(request: Request) -> JSONResponse:
    """OIDC Token endpoint."""
    oidc_server = request.app.state.oidc_server
    endpoint = oidc_server.get_endpoint("token")

    # Read form body
    body = await request.body()
    body_str = body.decode("utf-8")

    # Build http_info for client authentication
    http_info = {
        "headers": dict(request.headers),
        "url": str(request.url),
    }

    try:
        parsed = endpoint.parse_request(body_str, http_info=http_info)
    except Exception as exc:
        return JSONResponse({"error": "invalid_request", "error_description": str(exc)}, status_code=400)
    if "error" in parsed:
        return JSONResponse(_as_dict(parsed), status_code=400)

    result = endpoint.process_request(parsed)
    if "error" in result:
        # The error may be a Message object, which JSONResponse cannot serialize directly
        return JSONResponse(_as_dict(result), status_code=400)

    # See _complete_authorization: do not pass response_args twice
    resp_info = endpoint.do_response(request=parsed, **result)

    # Parse the response
    response_data = resp_info["response"]
    if isinstance(response_data, str):
        response_data = json.loads(response_data)
    elif hasattr(response_data, "to_dict"):
        response_data = response_data.to_dict()
    return JSONResponse(response_data)
```

Note: idpyoidc may return the response body as a JSON string, a `Message` object, or a plain dict. The implementation should handle all three; adjust it if the test output shows a different shape.

Also note: `create_session` with `authn_method=None` may fail. If it does, create a minimal `UserAuthnMethod` subclass or pass a different value; the test output will guide the fix.
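
The three response shapes mentioned in the first note can be normalized in one place. The following is a minimal sketch, not part of idpyoidc: `response_payload` is a hypothetical helper name, and it assumes only that a `Message`-like object exposes `to_dict()`.

```python
import json
from typing import Any


def response_payload(resp_info: dict) -> dict:
    """Return the body of a do_response() result as a plain dict.

    Hypothetical helper: accepts a JSON string, an object with to_dict(),
    or a plain mapping under the "response" key.
    """
    payload: Any = resp_info["response"]
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode("utf-8")
    if isinstance(payload, str):
        return json.loads(payload)
    if hasattr(payload, "to_dict"):
        return payload.to_dict()
    return dict(payload)
```

Using one helper in both the token and userinfo routes would keep the shape handling consistent if the test output reveals a fourth case.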

**Step 4: Run tests to verify they pass**

Run: `uv run pytest tests/test_oidc/test_token.py -v`

Expected: PASS

**Step 5: Run quality gate**

Run: `./scripts/check.sh`

Expected: All green

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/oidc/endpoints.py tests/test_oidc/test_token.py
git commit -m "feat: add authorization complete and token endpoints"
```
---
### Task 8: UserInfo Endpoint

**Files:**
- Modify: `src/fastapi_oidc_op/oidc/endpoints.py`
- Create: `tests/test_oidc/test_userinfo.py`

**Step 1: Write the failing tests**

Create `tests/test_oidc/test_userinfo.py`:

```python
import secrets
from base64 import b64encode
from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse

from argon2 import PasswordHasher
from httpx import AsyncClient

from fastapi_oidc_op.authn.password import PasswordService
from fastapi_oidc_op.models import PasswordCredential, User


def _register_test_client(client: AsyncClient) -> str:
    app = client._transport.app  # type: ignore[union-attr]
    oidc_server = app.state.oidc_server
    client_secret = "test-secret-456"
    oidc_server.context.cdb["test-rp"] = {
        "client_id": "test-rp",
        "client_secret": client_secret,
        "redirect_uris": [("http://localhost:9000/callback", {})],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": ["openid", "profile", "email"],
        "allowed_scopes": ["openid", "profile", "email"],
        "client_salt": secrets.token_hex(8),
    }
    oidc_server.keyjar.add_symmetric("test-rp", client_secret)
    return client_secret


async def _get_access_token(client: AsyncClient) -> str:
    """Full OIDC flow to get an access token."""
    client_secret = _register_test_client(client)
    app = client._transport.app  # type: ignore[union-attr]

    # Create user
    user_repo = app.state.user_repo
    cred_repo = app.state.credential_repo
    user = User(
        userid="lusab-bansen",
        username="alice",
        email="alice@example.com",
        email_verified=True,
        given_name="Alice",
        family_name="Wonderland",
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    await user_repo.create(user)
    svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))

    # Start auth flow
    await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "test-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid profile email",
            "state": "test-state",
            "nonce": "test-nonce",
        },
        follow_redirects=False,
    )

    # Login
    await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true"},
    )

    # Complete authorization
    auth_res = await client.get("/authorization/complete", follow_redirects=False)
    location = auth_res.headers["location"]
    params = parse_qs(urlparse(location).query)
    code = params["code"][0]

    # Exchange code for tokens
    auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode()
    token_res = await client.post(
        "/token",
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": "http://localhost:9000/callback",
        },
        headers={
            "Authorization": f"Basic {auth_header}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
    return token_res.json()["access_token"]


async def test_userinfo_returns_claims(client: AsyncClient) -> None:
    access_token = await _get_access_token(client)
    res = await client.get(
        "/userinfo",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    assert res.status_code == 200
    data = res.json()
    assert data["sub"] == "lusab-bansen"


async def test_userinfo_includes_email_claims(client: AsyncClient) -> None:
    access_token = await _get_access_token(client)
    res = await client.get(
        "/userinfo",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    data = res.json()
    assert data.get("email") == "alice@example.com"
    assert data.get("email_verified") is True


async def test_userinfo_invalid_token_returns_error(client: AsyncClient) -> None:
    res = await client.get(
        "/userinfo",
        headers={"Authorization": "Bearer invalid-token"},
    )
    assert res.status_code in (401, 403)
```

**Step 2: Run tests to verify they fail**

Run: `uv run pytest tests/test_oidc/test_userinfo.py -v`

Expected: FAIL

**Step 3: Implement**

Add to `src/fastapi_oidc_op/oidc/endpoints.py`:

```python
@router.get("/userinfo")
@router.post("/userinfo")
async def userinfo_endpoint(request: Request) -> Response:
    """OIDC UserInfo endpoint."""
    oidc_server = request.app.state.oidc_server
    endpoint = oidc_server.get_endpoint("userinfo")

    # The access token travels in the Authorization header, passed via http_info
    http_info = {
        "headers": dict(request.headers),
        "url": str(request.url),
    }

    # For POST requests, include the body
    if request.method == "POST":
        body = await request.body()
        request_data = body.decode("utf-8")
    else:
        request_data = {}

    try:
        parsed = endpoint.parse_request(request_data, http_info=http_info)
    except Exception:
        return JSONResponse({"error": "invalid_token"}, status_code=401)
    if "error" in parsed:
        return JSONResponse(parsed.to_dict() if hasattr(parsed, "to_dict") else parsed, status_code=401)

    result = endpoint.process_request(parsed)
    if "error" in result:
        # The error may be a Message object, which JSONResponse cannot serialize directly
        return JSONResponse(result.to_dict() if hasattr(result, "to_dict") else result, status_code=401)

    # `result` may already carry `response_args`; passing it again as an explicit
    # keyword next to `**result` would raise TypeError.
    resp_info = endpoint.do_response(request=parsed, **result)

    response_data = resp_info["response"]
    if isinstance(response_data, str):
        response_data = json.loads(response_data)
    elif hasattr(response_data, "to_dict"):
        response_data = response_data.to_dict()
    return JSONResponse(response_data)
```
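
The handler above returns a bare 401 on token errors. RFC 6750 additionally asks protected resources to send a `WWW-Authenticate: Bearer` challenge in that case. A minimal sketch of building that header follows; `bearer_challenge` is a hypothetical helper, not an idpyoidc or FastAPI function, and whether this plan needs it is a judgment call.

```python
from typing import Dict, Optional


def bearer_challenge(error: Optional[str] = None, description: Optional[str] = None) -> Dict[str, str]:
    """Build a WWW-Authenticate header for a Bearer-protected resource.

    Hypothetical helper. Quotes and backslashes are removed from the
    description because RFC 6750 does not allow them in that attribute.
    """
    parts = []
    if error:
        parts.append(f'error="{error}"')
    if description:
        safe = description.replace("\\", "").replace('"', "'")
        parts.append(f'error_description="{safe}"')
    value = "Bearer"
    if parts:
        value += " " + ", ".join(parts)
    return {"WWW-Authenticate": value}
```

It could be attached as `JSONResponse({"error": "invalid_token"}, status_code=401, headers=bearer_challenge("invalid_token"))`.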

**Step 4: Run tests to verify they pass**

Run: `uv run pytest tests/test_oidc/test_userinfo.py -v`

Expected: PASS

**Step 5: Run quality gate**

Run: `./scripts/check.sh`

Expected: All green

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/oidc/endpoints.py tests/test_oidc/test_userinfo.py
git commit -m "feat: add OIDC userinfo endpoint"
```
---
### Task 9: End-to-End Flow Test

**Files:**
- Create: `tests/test_oidc/test_e2e_flow.py`

A comprehensive test that exercises the entire OIDC flow from start to finish, including verifying the ID token.

**Step 1: Write the test**

Create `tests/test_oidc/test_e2e_flow.py`:

```python
"""End-to-end OIDC Authorization Code flow test."""

import secrets
from base64 import b64encode
from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse

from argon2 import PasswordHasher
from cryptojwt.jwk.jwk import key_from_jwk_dict
from cryptojwt.jws.jws import JWS
from httpx import AsyncClient

from fastapi_oidc_op.authn.password import PasswordService
from fastapi_oidc_op.models import PasswordCredential, User


async def test_full_authorization_code_flow(client: AsyncClient) -> None:
    """Complete OIDC Authorization Code flow: authorize → login → code → token → userinfo → validate ID token."""
    app = client._transport.app  # type: ignore[union-attr]
    oidc_server = app.state.oidc_server
    user_repo = app.state.user_repo
    cred_repo = app.state.credential_repo

    # --- Setup: register client ---
    client_secret = "e2e-secret"
    oidc_server.context.cdb["e2e-rp"] = {
        "client_id": "e2e-rp",
        "client_secret": client_secret,
        "redirect_uris": [("http://localhost:9000/callback", {})],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": ["openid", "profile", "email"],
        "allowed_scopes": ["openid", "profile", "email"],
        "client_salt": secrets.token_hex(8),
    }
    oidc_server.keyjar.add_symmetric("e2e-rp", client_secret)

    # --- Setup: create user ---
    user = User(
        userid="lusab-bansen",
        username="alice",
        given_name="Alice",
        family_name="Wonderland",
        email="alice@example.com",
        email_verified=True,
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    await user_repo.create(user)
    svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))

    # --- Step 1: Authorization request (unauthenticated) ---
    auth_res = await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": "e2e-rp",
            "redirect_uri": "http://localhost:9000/callback",
            "scope": "openid profile email",
            "state": "e2e-state",
            "nonce": "e2e-nonce",
        },
        follow_redirects=False,
    )
    assert auth_res.status_code in (302, 303), f"Expected redirect, got {auth_res.status_code}"
    assert "/login" in auth_res.headers["location"]

    # --- Step 2: Password login ---
    login_res = await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true"},
    )
    assert login_res.status_code == 200
    assert "/authorization/complete" in login_res.headers.get("HX-Redirect", "")

    # --- Step 3: Complete authorization ---
    complete_res = await client.get("/authorization/complete", follow_redirects=False)
    assert complete_res.status_code in (302, 303)
    location = complete_res.headers["location"]
    assert location.startswith("http://localhost:9000/callback")
    params = parse_qs(urlparse(location).query)
    assert "code" in params
    assert params.get("state", [None])[0] == "e2e-state"
    code = params["code"][0]

    # --- Step 4: Token exchange ---
    auth_header = b64encode(f"e2e-rp:{client_secret}".encode()).decode()
    token_res = await client.post(
        "/token",
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": "http://localhost:9000/callback",
        },
        headers={
            "Authorization": f"Basic {auth_header}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )
    assert token_res.status_code == 200
    token_data = token_res.json()
    assert "access_token" in token_data
    assert "id_token" in token_data
    assert token_data["token_type"].lower() == "bearer"

    # --- Step 5: Validate ID token ---
    id_token_raw = token_data["id_token"]

    # Get JWKS
    jwks_res = await client.get("/jwks")
    jwks = jwks_res.json()

    # Verify the JWT signature
    jws = JWS()
    _keys = [key_from_jwk_dict(k) for k in jwks["keys"]]
    id_token_payload = jws.verify_compact(id_token_raw, _keys)
    assert id_token_payload["iss"] == "http://localhost:8000"
    assert id_token_payload["sub"] == "lusab-bansen"
    assert "e2e-rp" in id_token_payload["aud"]

    # --- Step 6: UserInfo ---
    userinfo_res = await client.get(
        "/userinfo",
        headers={"Authorization": f"Bearer {token_data['access_token']}"},
    )
    assert userinfo_res.status_code == 200
    userinfo = userinfo_res.json()
    assert userinfo["sub"] == "lusab-bansen"
    assert userinfo.get("email") == "alice@example.com"
```
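
The test above checks `iss`, `sub`, and `aud` on the verified ID token. A relying party would normally also check `nonce` and `exp`. The following is a sketch of such a check on an already-verified payload; `id_token_problems` is a hypothetical helper, and the claim rules shown are a subset of what OpenID Connect Core requires.

```python
import time
from typing import Any, List, Mapping, Optional


def id_token_problems(
    payload: Mapping[str, Any],
    *,
    issuer: str,
    client_id: str,
    nonce: str,
    now: Optional[float] = None,
) -> List[str]:
    """Return a list of claim problems; an empty list means the checks passed.

    Hypothetical helper: the signature must already have been verified.
    """
    current = time.time() if now is None else now
    problems = []
    if payload.get("iss") != issuer:
        problems.append("iss mismatch")
    aud = payload.get("aud", [])
    audiences = [aud] if isinstance(aud, str) else list(aud)  # aud may be a string or a list
    if client_id not in audiences:
        problems.append("aud does not contain client_id")
    if payload.get("nonce") != nonce:
        problems.append("nonce mismatch")
    exp = payload.get("exp")
    if not isinstance(exp, (int, float)) or exp <= current:
        problems.append("exp missing or in the past")
    return problems
```

In the end-to-end test this could be called as `id_token_problems(id_token_payload, issuer="http://localhost:8000", client_id="e2e-rp", nonce="e2e-nonce")` and asserted to be empty.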

**Step 2: Run the test**

Run: `uv run pytest tests/test_oidc/test_e2e_flow.py -v`

Expected: PASS (if Tasks 1-8 are all correct)

If the test fails, this is where integration issues surface. Debug and fix based on the error output. Common issues:

- idpyoidc's `create_session` needs `authn_method` to not be None
- Response format mismatches (string vs Message vs dict)
- `user_id` in idpyoidc session vs `userid` in our model (the session uses `username` as the `user_id` key for idpyoidc, not `userid`)
- Cookie handling between requests

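
When chasing a `sub` or `user_id` mismatch from the list above, it can help to look at the ID token claims before signature verification. This is a debugging sketch using only the standard library; `peek_jwt_payload` is a hypothetical helper and does not verify anything, so it must not replace the `verify_compact` check in the test.

```python
import base64
import json


def peek_jwt_payload(token: str) -> dict:
    """Decode the payload segment of a compact JWS WITHOUT verifying the signature."""
    payload_b64 = token.split(".")[1]
    # base64url segments in a JWT omit padding; restore it before decoding
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))
```

For example, `print(peek_jwt_payload(token_data["id_token"]))` inside the failing test shows which identifier ended up in `sub`.
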
**Step 3: Commit**
```bash
git add tests/test_oidc/test_e2e_flow.py
git commit -m "test: add end-to-end OIDC authorization code flow test"
```
---
### Task 10: Full Quality Gate

**Step 1: Run full quality checks**

Run: `./scripts/check.sh`

Expected: All green (formatting, linting, type checking, all tests pass)

**Step 2: Fix any issues**

If ruff or ty reports issues, fix them. Common fixes:

- Type ignore comments for idpyoidc's untyped APIs
- Import sorting
- Unused imports

**Step 3: Commit any fixes**

```bash
git add -A
git diff --cached --quiet || git commit -m "style: fix formatting and lint issues"
```