feat: add authorization complete and token endpoints

This commit is contained in:
Johan Lundberg 2026-02-16 13:47:48 +01:00
parent 18e9e7f2b5
commit e4e7cd237e
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
2 changed files with 254 additions and 2 deletions

View file

@ -3,9 +3,14 @@
from __future__ import annotations from __future__ import annotations
import json import json
from types import SimpleNamespace
from urllib.parse import urlencode
from fastapi import APIRouter, Request, Response from fastapi import APIRouter, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from idpyoidc.time_util import utc_time_sans_frac
from fastapi_oidc_op.oidc.claims import PorchlightUserInfo, user_to_claims
router = APIRouter(tags=["oidc"]) router = APIRouter(tags=["oidc"])
@ -65,6 +70,33 @@ async def authorization(request: Request) -> Response:
return RedirectResponse("/login", status_code=303) return RedirectResponse("/login", status_code=303)
@router.get("/authorization/complete")
async def authorization_complete(request: Request) -> Response:
"""Resume OIDC authorization after login."""
oidc_server = request.app.state.oidc_server
endpoint = oidc_server.get_endpoint("authorization")
auth_request_params = request.session.pop("oidc_auth_request", None)
if auth_request_params is None:
return HTMLResponse("<h1>Error</h1><p>No pending authorization request</p>", status_code=400)
userid = request.session.get("userid")
username = request.session.get("username")
if not userid or not username:
return RedirectResponse("/login", status_code=303)
try:
parsed = endpoint.parse_request(auth_request_params)
except Exception as exc:
return HTMLResponse(f"<h1>Error</h1><p>{exc}</p>", status_code=400)
if "error" in parsed:
error_desc = parsed.get("error_description", parsed["error"])
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)
async def _complete_authorization( async def _complete_authorization(
request: Request, request: Request,
oidc_server: object, oidc_server: object,
@ -73,5 +105,87 @@ async def _complete_authorization(
userid: str, userid: str,
username: str, username: str,
) -> Response: ) -> Response:
"""Complete the authorization after authentication. Placeholder — implemented in Task 7.""" """Complete OIDC authorization after user authentication."""
return HTMLResponse("Authorization completion not yet implemented", status_code=501) # Populate userinfo cache
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("<h1>Error</h1><p>User not found</p>", status_code=400)
userinfo: PorchlightUserInfo = oidc_server.context.userinfo # type: ignore[union-attr]
claims = user_to_claims(user)
userinfo.set_user_claims(username, claims)
# Create idpyoidc session — authn_method needs a kwargs dict
authn_method = SimpleNamespace(kwargs={})
session_id = endpoint.create_session( # type: ignore[union-attr]
request=parsed,
user_id=username,
acr="urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport",
time_stamp=utc_time_sans_frac(),
authn_method=authn_method,
)
# Complete authorization (mints code, builds redirect)
result = endpoint.authz_part2(request=parsed, session_id=session_id) # type: ignore[union-attr]
if "error" in result.get("response_args", {}):
response_args = result["response_args"]
error_desc = response_args.get("error_description", response_args["error"])
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
# Build redirect URL
response_args = result.get("response_args", {})
return_uri = result.get("return_uri", "")
if hasattr(response_args, "to_dict"):
params = response_args.to_dict()
elif isinstance(response_args, dict):
params = response_args
else:
params = dict(response_args)
redirect_url = f"{return_uri}?{urlencode(params)}"
return RedirectResponse(redirect_url, status_code=303)
@router.post("/token")
async def token_endpoint(request: Request) -> JSONResponse:
"""OIDC Token endpoint."""
oidc_server = request.app.state.oidc_server
endpoint = oidc_server.get_endpoint("token")
body = await request.body()
body_str = body.decode("utf-8")
http_info = {
"headers": dict(request.headers),
"url": str(request.url),
}
try:
parsed = endpoint.parse_request(body_str, http_info=http_info)
except Exception as exc:
return JSONResponse({"error": "invalid_request", "error_description": str(exc)}, status_code=400)
if isinstance(parsed, dict) and "error" in parsed:
return JSONResponse(parsed, status_code=400)
elif hasattr(parsed, "to_dict") and "error" in parsed:
return JSONResponse(parsed.to_dict(), status_code=400)
result = endpoint.process_request(parsed)
if hasattr(result, "to_dict") and "error" in result:
return JSONResponse(result.to_dict(), status_code=400)
elif isinstance(result, dict) and "error" in result:
return JSONResponse(result, status_code=400)
resp_info = endpoint.do_response(response_args=result.get("response_args"), request=parsed)
response_data = resp_info["response"]
if isinstance(response_data, str):
response_data = json.loads(response_data)
elif hasattr(response_data, "to_dict"):
response_data = response_data.to_dict()
return JSONResponse(response_data)

View file

@ -0,0 +1,138 @@
import secrets
from base64 import b64encode
from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse
from argon2 import PasswordHasher
from httpx import AsyncClient
from fastapi_oidc_op.authn.password import PasswordService
from fastapi_oidc_op.models import PasswordCredential, User
def _register_test_client(client: AsyncClient) -> str:
app = client._transport.app # type: ignore[union-attr]
oidc_server = app.state.oidc_server
client_secret = "test-secret-0123456789abcdef"
oidc_server.context.cdb["test-rp"] = {
"client_id": "test-rp",
"client_secret": client_secret,
"redirect_uris": [("http://localhost:9000/callback", {})],
"response_types_supported": ["code"],
"token_endpoint_auth_method": "client_secret_basic",
"scope": ["openid", "profile", "email"],
"allowed_scopes": ["openid", "profile", "email"],
"client_salt": secrets.token_hex(8),
}
oidc_server.keyjar.add_symmetric("test-rp", client_secret)
return client_secret
async def _create_user_and_login(client: AsyncClient) -> str:
"""Create user, log in, return userid."""
app = client._transport.app # type: ignore[union-attr]
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
user = User(
userid="lusab-bansen",
username="alice",
email="alice@example.com",
email_verified=True,
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))
await client.post(
"/login/password",
data={"username": "alice", "password": "testpass"},
headers={"HX-Request": "true"},
)
return user.userid
async def _get_authorization_code(client: AsyncClient) -> str:
"""Run full auth flow and extract the authorization code."""
_register_test_client(client)
# Start authorization (unauthenticated — stores in session)
await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": "test-rp",
"redirect_uri": "http://localhost:9000/callback",
"scope": "openid profile email",
"state": "test-state",
"nonce": "test-nonce",
},
follow_redirects=False,
)
# Create user and log in
await _create_user_and_login(client)
# Complete authorization (now authenticated, session has oidc_auth_request)
complete_res = await client.get("/authorization/complete", follow_redirects=False)
assert complete_res.status_code in (302, 303), (
f"Expected redirect, got {complete_res.status_code}: {complete_res.text}"
)
location = complete_res.headers["location"]
parsed = urlparse(location)
params = parse_qs(parsed.query)
assert "code" in params, f"No code in redirect: {location}"
return params["code"][0]
async def test_authorization_complete_redirects_with_code(client: AsyncClient) -> None:
code = await _get_authorization_code(client)
assert len(code) > 0
async def test_token_endpoint_exchanges_code(client: AsyncClient) -> None:
code = await _get_authorization_code(client)
client_secret = "test-secret-0123456789abcdef"
auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode()
token_res = await client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": "http://localhost:9000/callback",
},
headers={
"Authorization": f"Basic {auth_header}",
"Content-Type": "application/x-www-form-urlencoded",
},
)
assert token_res.status_code == 200, f"Token endpoint failed: {token_res.text}"
data = token_res.json()
assert "access_token" in data
assert "id_token" in data
assert data["token_type"].lower() == "bearer"
async def test_token_endpoint_invalid_code_returns_error(client: AsyncClient) -> None:
_register_test_client(client)
client_secret = "test-secret-0123456789abcdef"
auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode()
token_res = await client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": "invalid-code",
"redirect_uri": "http://localhost:9000/callback",
},
headers={
"Authorization": f"Basic {auth_header}",
"Content-Type": "application/x-www-form-urlencoded",
},
)
assert token_res.status_code == 400 or "error" in token_res.json()