From e4e7cd237ea7ad3d96af869b17c255c0376ab0f0 Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Mon, 16 Feb 2026 13:47:48 +0100 Subject: [PATCH] feat: add authorization complete and token endpoints --- src/fastapi_oidc_op/oidc/endpoints.py | 118 +++++++++++++++++++++- tests/test_oidc/test_token.py | 138 ++++++++++++++++++++++++++ 2 files changed, 254 insertions(+), 2 deletions(-) create mode 100644 tests/test_oidc/test_token.py diff --git a/src/fastapi_oidc_op/oidc/endpoints.py b/src/fastapi_oidc_op/oidc/endpoints.py index a068cbb..314e21a 100644 --- a/src/fastapi_oidc_op/oidc/endpoints.py +++ b/src/fastapi_oidc_op/oidc/endpoints.py @@ -3,9 +3,14 @@ from __future__ import annotations import json +from types import SimpleNamespace +from urllib.parse import urlencode from fastapi import APIRouter, Request, Response from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse +from idpyoidc.time_util import utc_time_sans_frac + +from fastapi_oidc_op.oidc.claims import PorchlightUserInfo, user_to_claims router = APIRouter(tags=["oidc"]) @@ -65,6 +70,33 @@ async def authorization(request: Request) -> Response: return RedirectResponse("/login", status_code=303) +@router.get("/authorization/complete") +async def authorization_complete(request: Request) -> Response: + """Resume OIDC authorization after login.""" + oidc_server = request.app.state.oidc_server + endpoint = oidc_server.get_endpoint("authorization") + + auth_request_params = request.session.pop("oidc_auth_request", None) + if auth_request_params is None: + return HTMLResponse("

Error

No pending authorization request

", status_code=400) + + userid = request.session.get("userid") + username = request.session.get("username") + if not userid or not username: + return RedirectResponse("/login", status_code=303) + + try: + parsed = endpoint.parse_request(auth_request_params) + except Exception as exc: + return HTMLResponse(f"

Error

{exc}

", status_code=400) + + if "error" in parsed: + error_desc = parsed.get("error_description", parsed["error"]) + return HTMLResponse(f"

Error

{error_desc}

", status_code=400) + + return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username) + + async def _complete_authorization( request: Request, oidc_server: object, @@ -73,5 +105,87 @@ async def _complete_authorization( userid: str, username: str, ) -> Response: - """Complete the authorization after authentication. Placeholder — implemented in Task 7.""" - return HTMLResponse("Authorization completion not yet implemented", status_code=501) + """Complete OIDC authorization after user authentication.""" + # Populate userinfo cache + user_repo = request.app.state.user_repo + user = await user_repo.get_by_userid(userid) + if user is None: + return HTMLResponse("

Error

User not found

", status_code=400) + + userinfo: PorchlightUserInfo = oidc_server.context.userinfo # type: ignore[union-attr] + claims = user_to_claims(user) + userinfo.set_user_claims(username, claims) + + # Create idpyoidc session — authn_method needs a kwargs dict + authn_method = SimpleNamespace(kwargs={}) + session_id = endpoint.create_session( # type: ignore[union-attr] + request=parsed, + user_id=username, + acr="urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport", + time_stamp=utc_time_sans_frac(), + authn_method=authn_method, + ) + + # Complete authorization (mints code, builds redirect) + result = endpoint.authz_part2(request=parsed, session_id=session_id) # type: ignore[union-attr] + + if "error" in result.get("response_args", {}): + response_args = result["response_args"] + error_desc = response_args.get("error_description", response_args["error"]) + return HTMLResponse(f"

Error

{error_desc}

", status_code=400) + + # Build redirect URL + response_args = result.get("response_args", {}) + return_uri = result.get("return_uri", "") + + if hasattr(response_args, "to_dict"): + params = response_args.to_dict() + elif isinstance(response_args, dict): + params = response_args + else: + params = dict(response_args) + + redirect_url = f"{return_uri}?{urlencode(params)}" + return RedirectResponse(redirect_url, status_code=303) + + +@router.post("/token") +async def token_endpoint(request: Request) -> JSONResponse: + """OIDC Token endpoint.""" + oidc_server = request.app.state.oidc_server + endpoint = oidc_server.get_endpoint("token") + + body = await request.body() + body_str = body.decode("utf-8") + + http_info = { + "headers": dict(request.headers), + "url": str(request.url), + } + + try: + parsed = endpoint.parse_request(body_str, http_info=http_info) + except Exception as exc: + return JSONResponse({"error": "invalid_request", "error_description": str(exc)}, status_code=400) + + if isinstance(parsed, dict) and "error" in parsed: + return JSONResponse(parsed, status_code=400) + elif hasattr(parsed, "to_dict") and "error" in parsed: + return JSONResponse(parsed.to_dict(), status_code=400) + + result = endpoint.process_request(parsed) + + if hasattr(result, "to_dict") and "error" in result: + return JSONResponse(result.to_dict(), status_code=400) + elif isinstance(result, dict) and "error" in result: + return JSONResponse(result, status_code=400) + + resp_info = endpoint.do_response(response_args=result.get("response_args"), request=parsed) + + response_data = resp_info["response"] + if isinstance(response_data, str): + response_data = json.loads(response_data) + elif hasattr(response_data, "to_dict"): + response_data = response_data.to_dict() + + return JSONResponse(response_data) diff --git a/tests/test_oidc/test_token.py b/tests/test_oidc/test_token.py new file mode 100644 index 0000000..c66be66 --- /dev/null +++ b/tests/test_oidc/test_token.py @@ -0,0 +1,138 @@ +import secrets +from base64 import b64encode +from datetime import UTC, datetime +from urllib.parse import parse_qs, urlparse + +from argon2 import PasswordHasher +from httpx import AsyncClient + +from fastapi_oidc_op.authn.password import PasswordService +from fastapi_oidc_op.models import PasswordCredential, User + + +def _register_test_client(client: AsyncClient) -> str: + app = client._transport.app # type: ignore[union-attr] + oidc_server = app.state.oidc_server + client_secret = "test-secret-0123456789abcdef" + oidc_server.context.cdb["test-rp"] = { + "client_id": "test-rp", + "client_secret": client_secret, + "redirect_uris": [("http://localhost:9000/callback", {})], + "response_types_supported": ["code"], + "token_endpoint_auth_method": "client_secret_basic", + "scope": ["openid", "profile", "email"], + "allowed_scopes": ["openid", "profile", "email"], + "client_salt": secrets.token_hex(8), + } + oidc_server.keyjar.add_symmetric("test-rp", client_secret) + return client_secret + + +async def _create_user_and_login(client: AsyncClient) -> str: + """Create user, log in, return userid.""" + app = client._transport.app # type: ignore[union-attr] + user_repo = app.state.user_repo + cred_repo = app.state.credential_repo + + user = User( + userid="lusab-bansen", + username="alice", + email="alice@example.com", + email_verified=True, + created_at=datetime.now(UTC), + updated_at=datetime.now(UTC), + ) + await user_repo.create(user) + + svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) + await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass"))) + + await client.post( + "/login/password", + data={"username": "alice", "password": "testpass"}, + headers={"HX-Request": "true"}, + ) + return user.userid + + +async def _get_authorization_code(client: AsyncClient) -> str: + """Run full auth flow and extract the authorization code.""" + _register_test_client(client) + + # Start authorization (unauthenticated — stores in session) + await client.get( + "/authorization", + params={ + "response_type": "code", + "client_id": "test-rp", + "redirect_uri": "http://localhost:9000/callback", + "scope": "openid profile email", + "state": "test-state", + "nonce": "test-nonce", + }, + follow_redirects=False, + ) + + # Create user and log in + await _create_user_and_login(client) + + # Complete authorization (now authenticated, session has oidc_auth_request) + complete_res = await client.get("/authorization/complete", follow_redirects=False) + assert complete_res.status_code in (302, 303), ( + f"Expected redirect, got {complete_res.status_code}: {complete_res.text}" + ) + + location = complete_res.headers["location"] + parsed = urlparse(location) + params = parse_qs(parsed.query) + assert "code" in params, f"No code in redirect: {location}" + return params["code"][0] + + +async def test_authorization_complete_redirects_with_code(client: AsyncClient) -> None: + code = await _get_authorization_code(client) + assert len(code) > 0 + + +async def test_token_endpoint_exchanges_code(client: AsyncClient) -> None: + code = await _get_authorization_code(client) + client_secret = "test-secret-0123456789abcdef" + + auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode() + token_res = await client.post( + "/token", + data={ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": "http://localhost:9000/callback", + }, + headers={ + "Authorization": f"Basic {auth_header}", + "Content-Type": "application/x-www-form-urlencoded", + }, + ) + assert token_res.status_code == 200, f"Token endpoint failed: {token_res.text}" + data = token_res.json() + assert "access_token" in data + assert "id_token" in data + assert data["token_type"].lower() == "bearer" + + +async def test_token_endpoint_invalid_code_returns_error(client: AsyncClient) -> None: + _register_test_client(client) + client_secret = "test-secret-0123456789abcdef" + + auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode() + token_res = await client.post( + "/token", + data={ + "grant_type": "authorization_code", + "code": "invalid-code", + "redirect_uri": "http://localhost:9000/callback", + }, + headers={ + "Authorization": f"Basic {auth_header}", + "Content-Type": "application/x-www-form-urlencoded", + }, + ) + assert token_res.status_code == 400 or "error" in token_res.json()