"""FastAPI web glue tying the browser to the hand-rolled OIDC steps. Routes: GET / home with the login button GET /login build the authorization URL and redirect to the OP GET /callback validate state, exchange code, verify ID token, fetch userinfo POST /refresh use the refresh token to get a new access token POST /logout clear the local RP session (see note below) Session model: a random session id lives in a signed cookie (itsdangerous). The actual per-session data (PKCE/state during login, then tokens) lives in an in-memory dict keyed by that id. Restarting the RP drops all sessions — fine for a reference, not for production. Logout note: porchlight exposes NO end_session_endpoint, so OIDC RP-initiated (global single-logout) is not possible. /logout therefore only clears this RP's local session; the user's session at the OP is untouched. """ from __future__ import annotations import secrets from typing import Any from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, RedirectResponse, Response from itsdangerous import BadSignature, URLSafeSerializer import oidc_client as oc import templates from config import Config SESSION_COOKIE = "rp_session" def create_app() -> FastAPI: config = Config() signer = URLSafeSerializer(config.session_secret, salt="rp-session") # session_id -> arbitrary dict (login state, then tokens + claims). sessions: dict[str, dict[str, Any]] = {} app = FastAPI(title="Porchlight RP Reference") # ---- session helpers -------------------------------------------------- def load_session(request: Request) -> dict[str, Any]: raw = request.cookies.get(SESSION_COOKIE) if not raw: return {} try: sid = signer.loads(raw) except BadSignature: return {} return sessions.get(sid, {}) def attach_session(response: Response, request: Request, data: dict[str, Any]) -> None: """Persist `data` for this browser, minting a session id if needed.""" raw = request.cookies.get(SESSION_COOKIE) sid = None if raw: try: sid = signer.loads(raw) except BadSignature: sid = None if sid is None: sid = secrets.token_urlsafe(24) response.set_cookie( SESSION_COOKIE, signer.dumps(sid), httponly=True, samesite="lax", ) sessions[sid] = data def clear_session(request: Request) -> None: raw = request.cookies.get(SESSION_COOKIE) if not raw: return try: sid = signer.loads(raw) except BadSignature: return sessions.pop(sid, None) # ---- routes ----------------------------------------------------------- @app.get("/", response_class=HTMLResponse) async def home() -> str: return templates.home_page() @app.get("/login") async def login(request: Request) -> Response: discovery = await oc.fetch_discovery(config.issuer) url, state = oc.build_authorization_url( discovery, client_id=config.client_id, redirect_uri=config.redirect_uri, scope=config.scope, ) # Remember the per-login secrets (state / nonce / code_verifier). response = RedirectResponse(url, status_code=303) session = load_session(request) session["login"] = state attach_session(response, request, session) return response @app.get("/callback", response_class=HTMLResponse) async def callback(request: Request) -> Response: session = load_session(request) login = session.get("login") params = request.query_params # The OP may redirect back with an error instead of a code. if "error" in params: desc = params.get("error_description", "") return HTMLResponse(templates.error_page(f"{params['error']}: {desc}")) if not login: return HTMLResponse(templates.error_page("No login in progress.")) # CSRF: the state we get back must match the one we sent. if params.get("state") != login["state"]: return HTMLResponse(templates.error_page("state mismatch (possible CSRF).")) code = params.get("code") if not code: return HTMLResponse(templates.error_page("No authorization code returned.")) try: discovery = await oc.fetch_discovery(config.issuer) tokens = await oc.exchange_code( discovery, code=code, code_verifier=login["code_verifier"], redirect_uri=config.redirect_uri, client_id=config.client_id, client_secret=config.client_secret, ) id_claims = await oc.verify_id_token( tokens["id_token"], discovery=discovery, issuer=config.issuer, client_id=config.client_id, expected_nonce=login["nonce"], leeway=config.leeway, ) userinfo = await oc.fetch_userinfo( discovery, access_token=tokens["access_token"] ) except Exception as exc: # reference app: surface the failure plainly return HTMLResponse(templates.error_page(f"{type(exc).__name__}: {exc}")) # Login done: drop the transient login secrets, store the result. session.pop("login", None) session["tokens"] = tokens session["id_claims"] = id_claims session["userinfo"] = userinfo response = HTMLResponse( templates.result_page(id_claims=id_claims, userinfo=userinfo, tokens=tokens) ) attach_session(response, request, session) return response @app.post("/refresh", response_class=HTMLResponse) async def refresh(request: Request) -> Response: session = load_session(request) tokens = session.get("tokens") if not tokens or "refresh_token" not in tokens: return HTMLResponse(templates.error_page("No refresh token available.")) try: discovery = await oc.fetch_discovery(config.issuer) new_tokens = await oc.refresh_tokens( discovery, refresh_token=tokens["refresh_token"], client_id=config.client_id, client_secret=config.client_secret, ) except Exception as exc: return HTMLResponse(templates.error_page(f"{type(exc).__name__}: {exc}")) # Merge: keep prior values (e.g. id_token) but overlay the refreshed ones. merged = {**tokens, **new_tokens} session["tokens"] = merged response = HTMLResponse( templates.result_page( id_claims=session.get("id_claims", {}), userinfo=session.get("userinfo", {}), tokens=merged, ) ) attach_session(response, request, session) return response @app.post("/logout") async def logout(request: Request) -> Response: # Local logout only — porchlight has no end_session_endpoint. clear_session(request) response = RedirectResponse("/", status_code=303) response.delete_cookie(SESSION_COOKIE) return response return app