feat: add consent check to authorization flow

This commit is contained in:
Johan Lundberg 2026-02-19 09:47:59 +01:00
parent 9ccc6c885f
commit 1d8fd91f68
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
3 changed files with 384 additions and 2 deletions

View file

@ -19,6 +19,7 @@ from porchlight.oidc.endpoints import router as oidc_router
from porchlight.oidc.provider import create_oidc_server from porchlight.oidc.provider import create_oidc_server
from porchlight.store.sqlite.db import open_db from porchlight.store.sqlite.db import open_db
from porchlight.store.sqlite.repositories import ( from porchlight.store.sqlite.repositories import (
SQLiteConsentRepository,
SQLiteCredentialRepository, SQLiteCredentialRepository,
SQLiteMagicLinkRepository, SQLiteMagicLinkRepository,
SQLiteUserRepository, SQLiteUserRepository,
@ -36,6 +37,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
app.state.user_repo = SQLiteUserRepository(db) app.state.user_repo = SQLiteUserRepository(db)
app.state.credential_repo = SQLiteCredentialRepository(db) app.state.credential_repo = SQLiteCredentialRepository(db)
app.state.magic_link_repo = SQLiteMagicLinkRepository(db) app.state.magic_link_repo = SQLiteMagicLinkRepository(db)
app.state.consent_repo = SQLiteConsentRepository(db)
# Auth services # Auth services
app.state.password_service = PasswordService() app.state.password_service = PasswordService()

View file

@ -14,6 +14,13 @@ from porchlight.oidc.claims import PorchlightUserInfo, user_to_claims
router = APIRouter(tags=["oidc"]) router = APIRouter(tags=["oidc"])
SCOPE_DESCRIPTIONS: dict[str, str] = {
"openid": "Sign you in (required)",
"profile": "Your name and profile information",
"email": "Your email address",
"phone": "Your phone number",
}
@router.get("/.well-known/openid-configuration") @router.get("/.well-known/openid-configuration")
async def provider_configuration(request: Request) -> JSONResponse: async def provider_configuration(request: Request) -> JSONResponse:
@ -63,7 +70,7 @@ async def authorization(request: Request) -> Response:
username = request.session.get("username") username = request.session.get("username")
if userid and username: if userid and username:
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username) return await _check_consent_or_complete(request, oidc_server, endpoint, parsed, userid, username, query_params)
# Not authenticated — store and redirect to login # Not authenticated — store and redirect to login
request.session["oidc_auth_request"] = query_params request.session["oidc_auth_request"] = query_params
@ -94,7 +101,40 @@ async def authorization_complete(request: Request) -> Response:
error_desc = parsed.get("error_description", parsed["error"]) error_desc = parsed.get("error_description", parsed["error"])
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400) return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username) return await _check_consent_or_complete(
request, oidc_server, endpoint, parsed, userid, username, auth_request_params
)
async def _check_consent_or_complete(
request: Request,
oidc_server: object,
endpoint: object,
parsed: object,
userid: str,
username: str,
auth_params: dict,
) -> Response:
"""Check if consent is needed; if so redirect to /consent, otherwise complete."""
settings = request.app.state.settings
client_id = auth_params.get("client_id", "")
# Manage-app bypasses consent
if client_id == settings.manage_client_id:
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)
# Check stored consent
consent_repo = request.app.state.consent_repo
requested_scopes = auth_params.get("scope", "openid").split()
stored_consent = await consent_repo.get_consent(userid, client_id)
if stored_consent and set(requested_scopes) <= set(stored_consent.scopes):
# All requested scopes already approved
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)
# Consent needed — store auth state and redirect
request.session["consent_auth_request"] = auth_params
return RedirectResponse("/consent", status_code=303)
async def _complete_authorization( async def _complete_authorization(
@ -246,3 +286,75 @@ async def userinfo_endpoint(request: Request) -> JSONResponse:
response_data = response_data.to_dict() response_data = response_data.to_dict()
return JSONResponse(response_data) return JSONResponse(response_data)
@router.get("/consent")
async def consent_page(request: Request) -> Response:
"""Show the consent form."""
auth_params = request.session.get("consent_auth_request")
if auth_params is None:
return HTMLResponse("<h1>Error</h1><p>No pending consent request</p>", status_code=400)
userid = request.session.get("userid")
if not userid:
return RedirectResponse("/login", status_code=303)
client_id = auth_params.get("client_id", "")
requested_scopes = auth_params.get("scope", "openid").split()
scope_info = [
{"name": s, "description": SCOPE_DESCRIPTIONS.get(s, s), "required": s == "openid"} for s in requested_scopes
]
templates = request.app.state.templates
return templates.TemplateResponse(
request,
"consent.html",
{"client_id": client_id, "scopes": scope_info},
)
@router.post("/consent")
async def consent_submit(request: Request) -> Response:
"""Handle consent form submission."""
auth_params = request.session.pop("consent_auth_request", None)
if auth_params is None:
return HTMLResponse("<h1>Error</h1><p>No pending consent request</p>", status_code=400)
userid = request.session.get("userid")
username = request.session.get("username")
if not userid or not username:
return RedirectResponse("/login", status_code=303)
form = await request.form()
action = form.get("action")
client_id = auth_params.get("client_id", "")
redirect_uri = auth_params.get("redirect_uri", "")
state = auth_params.get("state", "")
if action == "deny":
params = urlencode({"error": "access_denied", "state": state})
return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
# Allow — collect approved scopes
approved_scopes = form.getlist("scope")
if "openid" not in approved_scopes:
approved_scopes = ["openid", *list(approved_scopes)]
# Save consent
consent_repo = request.app.state.consent_repo
await consent_repo.set_consent(userid, client_id, list(approved_scopes))
# Filter auth request scopes to only approved
auth_params["scope"] = " ".join(approved_scopes)
# Re-parse and complete
oidc_server = request.app.state.oidc_server
endpoint = oidc_server.get_endpoint("authorization")
try:
parsed = endpoint.parse_request(auth_params)
except Exception as exc:
return HTMLResponse(f"<h1>Error</h1><p>{exc}</p>", status_code=400)
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)

View file

@ -0,0 +1,268 @@
import secrets
from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse
from argon2 import PasswordHasher
from httpx import AsyncClient
from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User
async def test_authorization_shows_consent_for_new_client(client: AsyncClient) -> None:
"""First-time authorization for an RP should redirect to /consent."""
app = client._transport.app # type: ignore[union-attr]
_register_test_rp(app)
await _create_test_user(app)
# Login
await client.post(
"/login/password",
data={"username": "consentuser", "password": "testpass"},
headers={"HX-Request": "true"},
)
# Authorization request
res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": "consent-rp",
"redirect_uri": "http://localhost:9000/callback",
"scope": "openid profile",
"state": "teststate",
},
follow_redirects=False,
)
assert res.status_code == 303
assert "/consent" in res.headers["location"]
async def test_consent_page_renders(client: AsyncClient) -> None:
"""GET /consent should render the consent form."""
app = client._transport.app # type: ignore[union-attr]
_register_test_rp(app)
await _create_test_user(app)
await _login_and_start_auth(client)
res = await client.get("/consent")
assert res.status_code == 200
assert "consent-rp" in res.text
assert "profile" in res.text.lower()
async def test_consent_allow_redirects_with_code(client: AsyncClient) -> None:
"""Approving consent should complete the authorization flow."""
app = client._transport.app # type: ignore[union-attr]
_register_test_rp(app)
await _create_test_user(app)
await _login_and_start_auth(client)
res = await client.post(
"/consent",
data={"action": "allow", "scope": ["openid", "profile"]},
follow_redirects=False,
)
assert res.status_code == 303
location = res.headers["location"]
parsed = urlparse(location)
params = parse_qs(parsed.query)
assert "code" in params
async def test_consent_deny_redirects_with_error(client: AsyncClient) -> None:
"""Denying consent should redirect with access_denied error."""
app = client._transport.app # type: ignore[union-attr]
_register_test_rp(app)
await _create_test_user(app)
await _login_and_start_auth(client)
res = await client.post(
"/consent",
data={"action": "deny"},
follow_redirects=False,
)
assert res.status_code == 303
location = res.headers["location"]
parsed = urlparse(location)
params = parse_qs(parsed.query)
assert params["error"] == ["access_denied"]
async def test_saved_consent_skips_consent_screen(client: AsyncClient) -> None:
"""Second authorization with same scopes should skip consent."""
app = client._transport.app # type: ignore[union-attr]
_register_test_rp(app)
await _create_test_user(app)
# First flow: login, authorize, consent
await _login_and_start_auth(client)
await client.post(
"/consent",
data={"action": "allow", "scope": ["openid", "profile"]},
follow_redirects=False,
)
# Second flow: same scopes, should skip consent
res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": "consent-rp",
"redirect_uri": "http://localhost:9000/callback",
"scope": "openid profile",
"state": "teststate2",
},
follow_redirects=False,
)
assert res.status_code == 303
location = res.headers["location"]
# Should redirect directly to callback, not /consent
assert "callback" in location
assert "code" in location
async def test_new_scopes_reshows_consent(client: AsyncClient) -> None:
"""If RP requests new scopes, consent screen should reappear."""
app = client._transport.app # type: ignore[union-attr]
_register_test_rp(app)
await _create_test_user(app)
# First flow: consent to openid only
await _login_and_start_auth(client, scope="openid")
await client.post(
"/consent",
data={"action": "allow", "scope": ["openid"]},
follow_redirects=False,
)
# Second flow: request openid + profile (new scope)
res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": "consent-rp",
"redirect_uri": "http://localhost:9000/callback",
"scope": "openid profile",
"state": "teststate2",
},
follow_redirects=False,
)
assert res.status_code == 303
assert "/consent" in res.headers["location"]
async def test_manage_app_skips_consent(client: AsyncClient) -> None:
"""The manage-app client should bypass consent entirely."""
app = client._transport.app # type: ignore[union-attr]
settings = app.state.settings
await _create_test_user(app)
await client.post(
"/login/password",
data={"username": "consentuser", "password": "testpass"},
headers={"HX-Request": "true"},
)
manage_cdb = app.state.oidc_server.context.cdb[settings.manage_client_id]
redirect_uri = manage_cdb["redirect_uris"][0][0]
res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": settings.manage_client_id,
"redirect_uri": redirect_uri,
"scope": "openid profile email",
"state": "teststate",
},
follow_redirects=False,
)
assert res.status_code == 303
location = res.headers["location"]
# Should redirect directly to callback, not /consent
assert "code" in location
assert "/consent" not in location
async def test_partial_consent_filters_scopes(client: AsyncClient) -> None:
"""User can approve only some scopes (partial consent)."""
app = client._transport.app # type: ignore[union-attr]
_register_test_rp(app)
await _create_test_user(app)
# Request openid + profile + email, approve only openid + profile
await _login_and_start_auth(client, scope="openid profile email")
res = await client.post(
"/consent",
data={"action": "allow", "scope": ["openid", "profile"]},
follow_redirects=False,
)
assert res.status_code == 303
location = res.headers["location"]
assert "code" in location
# Verify consent was saved with only the approved scopes
consent_repo = app.state.consent_repo
consent = await consent_repo.get_consent("lusab-consent", "consent-rp")
assert consent is not None
assert set(consent.scopes) == {"openid", "profile"}
# -- Test helpers --
def _register_test_rp(app) -> None:
oidc_server = app.state.oidc_server
if "consent-rp" in oidc_server.context.cdb:
return
client_id = "consent-rp"
client_secret = "consent-secret-0123456789abcdef"
oidc_server.context.cdb[client_id] = {
"client_id": client_id,
"client_secret": client_secret,
"redirect_uris": [("http://localhost:9000/callback", {})],
"response_types_supported": ["code"],
"token_endpoint_auth_method": "client_secret_basic",
"scope": ["openid", "profile", "email"],
"allowed_scopes": ["openid", "profile", "email"],
"client_salt": secrets.token_hex(8),
}
oidc_server.keyjar.add_symmetric(client_id, client_secret)
async def _create_test_user(app) -> None:
user_repo = app.state.user_repo
existing = await user_repo.get_by_username("consentuser")
if existing:
return
user = User(
userid="lusab-consent",
username="consentuser",
email="consent@example.com",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
cred_repo = app.state.credential_repo
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))
async def _login_and_start_auth(client: AsyncClient, scope: str = "openid profile") -> None:
await client.post(
"/login/password",
data={"username": "consentuser", "password": "testpass"},
headers={"HX-Request": "true"},
)
await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": "consent-rp",
"redirect_uri": "http://localhost:9000/callback",
"scope": scope,
"state": "teststate",
},
follow_redirects=False,
)