porchlight/examples/rp-reference/app.py
Johan Lundberg 8e8c33a407
reference RP
2026-06-29 09:23:22 +02:00

204 lines
7.4 KiB
Python

"""FastAPI web glue tying the browser to the hand-rolled OIDC steps.
Routes:
GET / home with the login button
GET /login build the authorization URL and redirect to the OP
GET /callback validate state, exchange code, verify ID token, fetch userinfo
POST /refresh use the refresh token to get a new access token
POST /logout clear the local RP session (see note below)
Session model: a random session id lives in a signed cookie (itsdangerous). The
actual per-session data (PKCE/state during login, then tokens) lives in an
in-memory dict keyed by that id. Restarting the RP drops all sessions — fine for
a reference, not for production.
Logout note: porchlight exposes NO end_session_endpoint, so OIDC RP-initiated
(global single-logout) is not possible. /logout therefore only clears this RP's
local session; the user's session at the OP is untouched.
"""
from __future__ import annotations
import secrets
from typing import Any
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from itsdangerous import BadSignature, URLSafeSerializer
import oidc_client as oc
import templates
from config import Config
SESSION_COOKIE = "rp_session"
def create_app() -> FastAPI:
config = Config()
signer = URLSafeSerializer(config.session_secret, salt="rp-session")
# session_id -> arbitrary dict (login state, then tokens + claims).
sessions: dict[str, dict[str, Any]] = {}
app = FastAPI(title="Porchlight RP Reference")
# ---- session helpers --------------------------------------------------
def load_session(request: Request) -> dict[str, Any]:
raw = request.cookies.get(SESSION_COOKIE)
if not raw:
return {}
try:
sid = signer.loads(raw)
except BadSignature:
return {}
return sessions.get(sid, {})
def attach_session(response: Response, request: Request, data: dict[str, Any]) -> None:
"""Persist `data` for this browser, minting a session id if needed."""
raw = request.cookies.get(SESSION_COOKIE)
sid = None
if raw:
try:
sid = signer.loads(raw)
except BadSignature:
sid = None
if sid is None:
sid = secrets.token_urlsafe(24)
response.set_cookie(
SESSION_COOKIE,
signer.dumps(sid),
httponly=True,
samesite="lax",
)
sessions[sid] = data
def clear_session(request: Request) -> None:
raw = request.cookies.get(SESSION_COOKIE)
if not raw:
return
try:
sid = signer.loads(raw)
except BadSignature:
return
sessions.pop(sid, None)
# ---- routes -----------------------------------------------------------
@app.get("/", response_class=HTMLResponse)
async def home() -> str:
return templates.home_page()
@app.get("/login")
async def login(request: Request) -> Response:
discovery = await oc.fetch_discovery(config.issuer)
url, state = oc.build_authorization_url(
discovery,
client_id=config.client_id,
redirect_uri=config.redirect_uri,
scope=config.scope,
)
# Remember the per-login secrets (state / nonce / code_verifier).
response = RedirectResponse(url, status_code=303)
session = load_session(request)
session["login"] = state
attach_session(response, request, session)
return response
@app.get("/callback", response_class=HTMLResponse)
async def callback(request: Request) -> Response:
session = load_session(request)
login = session.get("login")
params = request.query_params
# The OP may redirect back with an error instead of a code.
if "error" in params:
desc = params.get("error_description", "")
return HTMLResponse(templates.error_page(f"{params['error']}: {desc}"))
if not login:
return HTMLResponse(templates.error_page("No login in progress."))
# CSRF: the state we get back must match the one we sent.
if params.get("state") != login["state"]:
return HTMLResponse(templates.error_page("state mismatch (possible CSRF)."))
code = params.get("code")
if not code:
return HTMLResponse(templates.error_page("No authorization code returned."))
try:
discovery = await oc.fetch_discovery(config.issuer)
tokens = await oc.exchange_code(
discovery,
code=code,
code_verifier=login["code_verifier"],
redirect_uri=config.redirect_uri,
client_id=config.client_id,
client_secret=config.client_secret,
)
id_claims = await oc.verify_id_token(
tokens["id_token"],
discovery=discovery,
issuer=config.issuer,
client_id=config.client_id,
expected_nonce=login["nonce"],
leeway=config.leeway,
)
userinfo = await oc.fetch_userinfo(
discovery, access_token=tokens["access_token"]
)
except Exception as exc: # reference app: surface the failure plainly
return HTMLResponse(templates.error_page(f"{type(exc).__name__}: {exc}"))
# Login done: drop the transient login secrets, store the result.
session.pop("login", None)
session["tokens"] = tokens
session["id_claims"] = id_claims
session["userinfo"] = userinfo
response = HTMLResponse(
templates.result_page(id_claims=id_claims, userinfo=userinfo, tokens=tokens)
)
attach_session(response, request, session)
return response
@app.post("/refresh", response_class=HTMLResponse)
async def refresh(request: Request) -> Response:
session = load_session(request)
tokens = session.get("tokens")
if not tokens or "refresh_token" not in tokens:
return HTMLResponse(templates.error_page("No refresh token available."))
try:
discovery = await oc.fetch_discovery(config.issuer)
new_tokens = await oc.refresh_tokens(
discovery,
refresh_token=tokens["refresh_token"],
client_id=config.client_id,
client_secret=config.client_secret,
)
except Exception as exc:
return HTMLResponse(templates.error_page(f"{type(exc).__name__}: {exc}"))
# Merge: keep prior values (e.g. id_token) but overlay the refreshed ones.
merged = {**tokens, **new_tokens}
session["tokens"] = merged
response = HTMLResponse(
templates.result_page(
id_claims=session.get("id_claims", {}),
userinfo=session.get("userinfo", {}),
tokens=merged,
)
)
attach_session(response, request, session)
return response
@app.post("/logout")
async def logout(request: Request) -> Response:
# Local logout only — porchlight has no end_session_endpoint.
clear_session(request)
response = RedirectResponse("/", status_code=303)
response.delete_cookie(SESSION_COOKIE)
return response
return app