"""The hand-rolled OIDC logic. This module deliberately does NOT use an OIDC client library. Each protocol step is a small function so you can read exactly what an RP sends and checks. PyJWT is used only for the RS256 *signature* primitive and JWK parsing — every OIDC-level claim check (iss / aud / exp / nonce) is written out explicitly below, because that is the part worth seeing. Flow overview: discovery -> build_authorization_url -> (browser redirect to OP) -> exchange_code -> verify_id_token -> fetch_userinfo -> refresh_tokens References: - OpenID Connect Core 1.0, section 3.1 (Authorization Code Flow) - RFC 7636 (PKCE) """ from __future__ import annotations import base64 import hashlib import json import secrets import time from typing import Any import httpx import jwt from jwt.algorithms import RSAAlgorithm # -------------------------------------------------------------------------- # 0. PKCE helpers (RFC 7636) # -------------------------------------------------------------------------- def generate_pkce_pair() -> tuple[str, str]: """Return (code_verifier, code_challenge) for the S256 method. The verifier is a high-entropy random string we keep secret in our session. The challenge is its SHA-256, base64url-encoded, and is what we send to the OP in the authorization request. At the token endpoint we send the verifier; the OP re-hashes it and checks it matches the challenge it stored. """ code_verifier = _b64url(secrets.token_bytes(32)) digest = hashlib.sha256(code_verifier.encode("ascii")).digest() code_challenge = _b64url(digest) return code_verifier, code_challenge def _b64url(raw: bytes) -> str: """base64url without padding, per the JOSE/PKCE conventions.""" return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii") # -------------------------------------------------------------------------- # 1. Discovery # -------------------------------------------------------------------------- async def fetch_discovery(issuer: str) -> dict[str, Any]: """Fetch the OP's provider metadata. The well-known URL is the issuer with a fixed suffix appended. The returned document tells us the real authorization / token / userinfo / jwks URLs, so we never hard-code endpoint paths. """ url = issuer.rstrip("/") + "/.well-known/openid-configuration" async with httpx.AsyncClient() as http: resp = await http.get(url) resp.raise_for_status() return resp.json() # -------------------------------------------------------------------------- # 2. Authorization request # -------------------------------------------------------------------------- def build_authorization_url( discovery: dict[str, Any], *, client_id: str, redirect_uri: str, scope: str, ) -> tuple[str, dict[str, str]]: """Build the URL we redirect the browser to, plus the per-request secrets. Returns (authorization_url, session_state) where session_state holds the values we must remember to validate the callback: the CSRF `state`, the replay-protection `nonce`, and the PKCE `code_verifier`. """ state = secrets.token_urlsafe(24) # CSRF protection for the redirect nonce = secrets.token_urlsafe(24) # binds the ID token to this request code_verifier, code_challenge = generate_pkce_pair() params = { "response_type": "code", "client_id": client_id, "redirect_uri": redirect_uri, "scope": scope, "state": state, "nonce": nonce, "code_challenge": code_challenge, "code_challenge_method": "S256", } # OIDC Core 11.1: requesting a refresh token via `offline_access` REQUIRES # prompt=consent, so the user explicitly approves long-lived access. The OP # (idpyoidc) rejects the request otherwise ("consent in prompt"). if "offline_access" in scope.split(): params["prompt"] = "consent" # httpx URL handles correct percent-encoding of each value: url = str(httpx.URL(discovery["authorization_endpoint"], params=params)) session_state = {"state": state, "nonce": nonce, "code_verifier": code_verifier} return url, session_state # -------------------------------------------------------------------------- # 3. Token exchange (authorization code -> tokens) # -------------------------------------------------------------------------- async def exchange_code( discovery: dict[str, Any], *, code: str, code_verifier: str, redirect_uri: str, client_id: str, client_secret: str, ) -> dict[str, Any]: """Swap the authorization `code` for tokens at the token endpoint. We authenticate with HTTP Basic (client_secret_basic): the client_id and client_secret go in the Authorization header, not the body. We also send the PKCE code_verifier so the OP can prove the same client that started the flow is finishing it. """ data = { "grant_type": "authorization_code", "code": code, "redirect_uri": redirect_uri, "code_verifier": code_verifier, # We authenticate with client_secret_basic (below), but we also include # client_id in the body. It is redundant for the code grant but required # by the OP for the refresh grant, so we send it consistently. "client_id": client_id, } async with httpx.AsyncClient() as http: resp = await http.post( discovery["token_endpoint"], data=data, auth=(client_id, client_secret), # client_secret_basic headers={"Accept": "application/json"}, ) resp.raise_for_status() return resp.json() # -------------------------------------------------------------------------- # 4. ID token verification # -------------------------------------------------------------------------- async def verify_id_token( id_token: str, *, discovery: dict[str, Any], issuer: str, client_id: str, expected_nonce: str, leeway: int, ) -> dict[str, Any]: """Verify the ID token signature, then check its claims by hand. PyJWT verifies the RS256 signature and the exp/iat timing for us (those need crypto / the current clock). Everything that is OIDC-specific — iss, aud, nonce — we check explicitly so the rules are visible. """ # (a) Read the unverified header to learn which key (kid) signed it. header = jwt.get_unverified_header(id_token) kid = header.get("kid") # (b) Fetch the OP's public keys and pick the matching one. async with httpx.AsyncClient() as http: resp = await http.get(discovery["jwks_uri"]) resp.raise_for_status() jwks = resp.json() jwk = next((k for k in jwks["keys"] if k.get("kid") == kid), None) if jwk is None: raise ValueError(f"no JWK in OP key set matches token kid={kid!r}") public_key = RSAAlgorithm.from_jwk(json.dumps(jwk)) # (c) Verify the signature + exp/iat. We turn off PyJWT's own iss/aud checks # so we can do them ourselves below. claims: dict[str, Any] = jwt.decode( id_token, key=public_key, algorithms=["RS256"], leeway=leeway, options={"verify_aud": False, "verify_iss": False, "require": ["exp", "iat"]}, ) # (d) Explicit OIDC claim checks — the heart of RP validation. if claims.get("iss") != issuer: raise ValueError(f"iss mismatch: {claims.get('iss')!r} != {issuer!r}") aud = claims.get("aud") audiences = aud if isinstance(aud, list) else [aud] if client_id not in audiences: raise ValueError(f"aud {audiences!r} does not contain client_id {client_id!r}") if claims.get("nonce") != expected_nonce: raise ValueError("nonce mismatch: ID token does not belong to this login") return claims # -------------------------------------------------------------------------- # 5. UserInfo # -------------------------------------------------------------------------- async def fetch_userinfo( discovery: dict[str, Any], *, access_token: str ) -> dict[str, Any]: """Call the UserInfo endpoint with the access token as a Bearer token.""" async with httpx.AsyncClient() as http: resp = await http.get( discovery["userinfo_endpoint"], headers={"Authorization": f"Bearer {access_token}"}, ) resp.raise_for_status() return resp.json() # -------------------------------------------------------------------------- # 6. Refresh # -------------------------------------------------------------------------- async def refresh_tokens( discovery: dict[str, Any], *, refresh_token: str, client_id: str, client_secret: str, ) -> dict[str, Any]: """Use the refresh token to get a fresh access token. Note (porchlight specific): the OP rotates the refresh token (you get a new one back) and does NOT re-mint an ID token on refresh — re-authentication is what issues ID tokens. So the response here typically has access_token and a new refresh_token, but no id_token. """ # client_id is required in the body for the refresh grant: the OP reads it # from the request message (it does not fall back to the Basic-auth client # for this grant), so omitting it causes a server error. data = { "grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": client_id, } async with httpx.AsyncClient() as http: resp = await http.post( discovery["token_endpoint"], data=data, auth=(client_id, client_secret), headers={"Accept": "application/json"}, ) resp.raise_for_status() return resp.json() def now() -> int: """Current unix time — handy for showing token age in the UI.""" return int(time.time())