204 lines
7.4 KiB
Python
204 lines
7.4 KiB
Python
"""FastAPI web glue tying the browser to the hand-rolled OIDC steps.

Routes:
    GET  /          home with the login button
    GET  /login     build the authorization URL and redirect to the OP
    GET  /callback  validate state, exchange code, verify ID token, fetch userinfo
    POST /refresh   use the refresh token to get a new access token
    POST /logout    clear the local RP session (see note below)

Session model: a random session id lives in a signed cookie (itsdangerous). The
actual per-session data (PKCE/state during login, then tokens) lives in an
in-memory dict keyed by that id. Restarting the RP drops all sessions — fine for
a reference, not for production.

Logout note: porchlight exposes NO end_session_endpoint, so OIDC RP-initiated
logout (global single-logout) is not possible. /logout therefore only clears
this RP's local session; the user's session at the OP is untouched.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import secrets
|
|
from typing import Any
|
|
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, Response
|
|
from itsdangerous import BadSignature, URLSafeSerializer
|
|
|
|
import oidc_client as oc
|
|
import templates
|
|
from config import Config
|
|
|
|
# Name of the browser cookie that carries the signed session id.
SESSION_COOKIE = "rp_session"
|
|
|
|
|
|
def create_app() -> FastAPI:
    """Build the FastAPI relying-party app with its routes and session store.

    The config, the cookie signer and the in-memory session dict are created
    here and captured by the closures below, so every call returns an app with
    its own independent session store.

    Returns:
        The configured FastAPI application.
    """
    config = Config()
    signer = URLSafeSerializer(config.session_secret, salt="rp-session")

    # session_id -> arbitrary dict (login state, then tokens + claims).
    sessions: dict[str, dict[str, Any]] = {}

    app = FastAPI(title="Porchlight RP Reference")

    # ---- session helpers --------------------------------------------------
    def session_id_from(request: Request) -> str | None:
        """Return the session id carried by the signed cookie.

        Returns None when the cookie is missing, empty, or fails signature
        verification, so callers can treat all three cases as "no session".
        """
        raw = request.cookies.get(SESSION_COOKIE)
        if not raw:
            return None
        try:
            return signer.loads(raw)
        except BadSignature:
            return None

    def load_session(request: Request) -> dict[str, Any]:
        """Return the stored data for this browser, or a fresh empty dict.

        When a session exists the returned dict is the stored object itself,
        so in-place mutations are visible to later requests. The empty dict
        returned otherwise is NOT stored until `attach_session` is called.
        """
        sid = session_id_from(request)
        if sid is None:
            return {}
        return sessions.get(sid, {})

    def attach_session(
        response: Response,
        request: Request,
        data: dict[str, Any],
        *,
        rotate: bool = False,
    ) -> None:
        """Persist `data` for this browser, minting a session id if needed.

        Args:
            response: Response that receives the cookie when a new id is minted.
            request: Incoming request, used to find the current session id.
            data: Session contents to store.
            rotate: When True, discard the browser's current session id and
                issue a fresh one. Used after a successful login so that an id
                known before authentication cannot be used afterwards
                (session-fixation defence).
        """
        sid = session_id_from(request)
        if rotate and sid is not None:
            # Invalidate the pre-login id server-side before replacing it.
            sessions.pop(sid, None)
            sid = None
        if sid is None:
            sid = secrets.token_urlsafe(24)
            response.set_cookie(
                SESSION_COOKIE,
                signer.dumps(sid),
                httponly=True,
                samesite="lax",
            )
        sessions[sid] = data

    def clear_session(request: Request) -> None:
        """Drop this browser's server-side session data, if any."""
        sid = session_id_from(request)
        if sid is not None:
            sessions.pop(sid, None)

    # ---- routes -----------------------------------------------------------
    @app.get("/", response_class=HTMLResponse)
    async def home() -> str:
        """Serve the home page."""
        return templates.home_page()

    @app.get("/login")
    async def login(request: Request) -> Response:
        """Start a login: build the authorization URL and redirect to it."""
        discovery = await oc.fetch_discovery(config.issuer)
        url, login_state = oc.build_authorization_url(
            discovery,
            client_id=config.client_id,
            redirect_uri=config.redirect_uri,
            scope=config.scope,
        )
        # Remember the per-login secrets (state / nonce / code_verifier) so
        # /callback can check them.
        response = RedirectResponse(url, status_code=303)
        session = load_session(request)
        session["login"] = login_state
        attach_session(response, request, session)
        return response

    @app.get("/callback", response_class=HTMLResponse)
    async def callback(request: Request) -> Response:
        """Finish a login: check state, exchange the code, verify the ID token.

        Every failure is rendered as an error page rather than raised. On
        success the tokens, ID-token claims and userinfo are stored in the
        session and the session id is rotated.
        """
        session = load_session(request)
        login = session.get("login")
        params = request.query_params

        # The OP may redirect back with an error instead of a code.
        if "error" in params:
            # NOTE(review): `error` / `error_description` come straight from
            # the query string; confirm templates.error_page HTML-escapes its
            # argument, otherwise this reflects attacker-controlled markup.
            desc = params.get("error_description", "")
            return HTMLResponse(templates.error_page(f"{params['error']}: {desc}"))

        if not login:
            return HTMLResponse(templates.error_page("No login in progress."))

        # CSRF: the state we get back must match the one we sent.
        if params.get("state") != login["state"]:
            return HTMLResponse(templates.error_page("state mismatch (possible CSRF)."))

        code = params.get("code")
        if not code:
            return HTMLResponse(templates.error_page("No authorization code returned."))

        try:
            discovery = await oc.fetch_discovery(config.issuer)
            tokens = await oc.exchange_code(
                discovery,
                code=code,
                code_verifier=login["code_verifier"],
                redirect_uri=config.redirect_uri,
                client_id=config.client_id,
                client_secret=config.client_secret,
            )
            id_claims = await oc.verify_id_token(
                tokens["id_token"],
                discovery=discovery,
                issuer=config.issuer,
                client_id=config.client_id,
                expected_nonce=login["nonce"],
                leeway=config.leeway,
            )
            userinfo = await oc.fetch_userinfo(
                discovery, access_token=tokens["access_token"]
            )
        except Exception as exc:  # reference app: surface the failure plainly
            return HTMLResponse(templates.error_page(f"{type(exc).__name__}: {exc}"))

        # Login done: drop the transient login secrets, store the result.
        session.pop("login", None)
        session["tokens"] = tokens
        session["id_claims"] = id_claims
        session["userinfo"] = userinfo

        response = HTMLResponse(
            templates.result_page(id_claims=id_claims, userinfo=userinfo, tokens=tokens)
        )
        # Rotate the session id now that the session holds tokens, so an id
        # that existed before authentication stops being valid.
        attach_session(response, request, session, rotate=True)
        return response

    @app.post("/refresh", response_class=HTMLResponse)
    async def refresh(request: Request) -> Response:
        """Use the stored refresh token to obtain new tokens and show them."""
        session = load_session(request)
        tokens = session.get("tokens")
        if not tokens or "refresh_token" not in tokens:
            return HTMLResponse(templates.error_page("No refresh token available."))

        try:
            discovery = await oc.fetch_discovery(config.issuer)
            new_tokens = await oc.refresh_tokens(
                discovery,
                refresh_token=tokens["refresh_token"],
                client_id=config.client_id,
                client_secret=config.client_secret,
            )
        except Exception as exc:
            return HTMLResponse(templates.error_page(f"{type(exc).__name__}: {exc}"))

        # Merge: keep prior values (e.g. id_token) but overlay the refreshed ones.
        merged = {**tokens, **new_tokens}
        session["tokens"] = merged

        response = HTMLResponse(
            templates.result_page(
                id_claims=session.get("id_claims", {}),
                userinfo=session.get("userinfo", {}),
                tokens=merged,
            )
        )
        attach_session(response, request, session)
        return response

    @app.post("/logout")
    async def logout(request: Request) -> Response:
        """Clear the local session and cookie, then redirect to the home page."""
        # Local logout only — porchlight has no end_session_endpoint.
        clear_session(request)
        response = RedirectResponse("/", status_code=303)
        response.delete_cookie(SESSION_COOKIE)
        return response

    return app
|