"""Integration tests for the consent step of the authorization flow.

Each test drives the application through an ``httpx.AsyncClient`` fixture:
it registers a test relying party (RP) directly in the OIDC server's client
database, creates a password user, logs in, and then exercises the
``/authorization`` and ``/consent`` endpoints.
"""

import secrets
from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse

from argon2 import PasswordHasher
from httpx import AsyncClient, Response
from tests.conftest import get_csrf_token

from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User

# Fixed identities shared by every test in this module.
_RP_CLIENT_ID = "consent-rp"
_RP_CLIENT_SECRET = "consent-secret-0123456789abcdef"
_RP_REDIRECT_URI = "http://localhost:9000/callback"
_USER_ID = "lusab-consent"
_USERNAME = "consentuser"
_PASSWORD = "testpass"


async def test_authorization_shows_consent_for_new_client(client: AsyncClient) -> None:
    """First-time authorization for an RP should redirect to /consent."""
    app = client._transport.app  # type: ignore[union-attr]
    _register_test_rp(app)
    await _create_test_user(app)

    await _login(client)
    res = await _authorize(client, scope="openid profile")

    assert res.status_code == 303
    assert "/consent" in res.headers["location"]


async def test_consent_page_renders(client: AsyncClient) -> None:
    """GET /consent should render the consent form."""
    app = client._transport.app  # type: ignore[union-attr]
    _register_test_rp(app)
    await _create_test_user(app)
    await _login_and_start_auth(client)

    res = await client.get("/consent")

    assert res.status_code == 200
    assert "consent-rp" in res.text
    assert "profile" in res.text.lower()


async def test_consent_allow_redirects_with_code(client: AsyncClient) -> None:
    """Approving consent should complete the authorization flow."""
    app = client._transport.app  # type: ignore[union-attr]
    _register_test_rp(app)
    await _create_test_user(app)
    await _login_and_start_auth(client)

    res = await _submit_consent(client, {"action": "allow", "scope": ["openid", "profile"]})

    assert res.status_code == 303
    assert "code" in _redirect_params(res)


async def test_consent_deny_redirects_with_error(client: AsyncClient) -> None:
    """Denying consent should redirect with access_denied error."""
    app = client._transport.app  # type: ignore[union-attr]
    _register_test_rp(app)
    await _create_test_user(app)
    await _login_and_start_auth(client)

    res = await _submit_consent(client, {"action": "deny"})

    assert res.status_code == 303
    assert _redirect_params(res)["error"] == ["access_denied"]


async def test_saved_consent_skips_consent_screen(client: AsyncClient) -> None:
    """Second authorization with same scopes should skip consent."""
    app = client._transport.app  # type: ignore[union-attr]
    _register_test_rp(app)
    await _create_test_user(app)

    # First flow: login, authorize, consent
    await _login_and_start_auth(client)
    first = await _submit_consent(client, {"action": "allow", "scope": ["openid", "profile"]})
    # Guard the setup step so a broken first flow is reported where it happens.
    assert first.status_code == 303

    # Second flow: same scopes, should skip consent
    res = await _authorize(client, scope="openid profile", state="teststate2")

    assert res.status_code == 303
    # Should redirect directly to callback, not /consent
    assert "callback" in res.headers["location"]
    # Check for a real ``code`` query parameter rather than the substring
    # "code", which would also match an echoed ``response_type=code``.
    assert "code" in _redirect_params(res)


async def test_new_scopes_reshows_consent(client: AsyncClient) -> None:
    """If RP requests new scopes, consent screen should reappear."""
    app = client._transport.app  # type: ignore[union-attr]
    _register_test_rp(app)
    await _create_test_user(app)

    # First flow: consent to openid only
    await _login_and_start_auth(client, scope="openid")
    first = await _submit_consent(client, {"action": "allow", "scope": ["openid"]})
    # Without this check the test passes vacuously: if the first consent was
    # never accepted, the second request lands on /consent regardless.
    assert first.status_code == 303

    # Second flow: request openid + profile (new scope)
    res = await _authorize(client, scope="openid profile", state="teststate2")

    assert res.status_code == 303
    assert "/consent" in res.headers["location"]


async def test_manage_app_skips_consent(client: AsyncClient) -> None:
    """The manage-app client should bypass consent entirely."""
    app = client._transport.app  # type: ignore[union-attr]
    settings = app.state.settings
    await _create_test_user(app)
    await _login(client)

    # Use the first redirect URI registered for the manage client; entries
    # are stored as (uri, query) pairs, matching _register_test_rp below.
    manage_cdb = app.state.oidc_server.context.cdb[settings.manage_client_id]
    redirect_uri = manage_cdb["redirect_uris"][0][0]

    res = await _authorize(
        client,
        scope="openid profile email",
        client_id=settings.manage_client_id,
        redirect_uri=redirect_uri,
    )

    assert res.status_code == 303
    # Should redirect directly to callback, not /consent
    assert "code" in _redirect_params(res)
    assert "/consent" not in res.headers["location"]


async def test_partial_consent_filters_scopes(client: AsyncClient) -> None:
    """User can approve only some scopes (partial consent)."""
    app = client._transport.app  # type: ignore[union-attr]
    _register_test_rp(app)
    await _create_test_user(app)

    # Request openid + profile + email, approve only openid + profile
    await _login_and_start_auth(client, scope="openid profile email")
    res = await _submit_consent(client, {"action": "allow", "scope": ["openid", "profile"]})

    assert res.status_code == 303
    assert "code" in _redirect_params(res)

    # Verify consent was saved with only the approved scopes
    consent_repo = app.state.consent_repo
    consent = await consent_repo.get_consent(_USER_ID, _RP_CLIENT_ID)
    assert consent is not None
    assert set(consent.scopes) == {"openid", "profile"}


# -- Test helpers --


def _register_test_rp(app) -> None:
    """Register the ``consent-rp`` client on the app's OIDC server.

    Does nothing when the client is already present in the client database,
    so it is safe to call from every test.
    """
    oidc_server = app.state.oidc_server
    if _RP_CLIENT_ID in oidc_server.context.cdb:
        return

    oidc_server.context.cdb[_RP_CLIENT_ID] = {
        "client_id": _RP_CLIENT_ID,
        "client_secret": _RP_CLIENT_SECRET,
        "redirect_uris": [(_RP_REDIRECT_URI, {})],
        "response_types_supported": ["code"],
        "token_endpoint_auth_method": "client_secret_basic",
        "scope": ["openid", "profile", "email"],
        "allowed_scopes": ["openid", "profile", "email"],
        "client_salt": secrets.token_hex(8),
    }
    oidc_server.keyjar.add_symmetric(_RP_CLIENT_ID, _RP_CLIENT_SECRET)


async def _create_test_user(app) -> None:
    """Create the ``consentuser`` account with a password credential.

    Does nothing when a user with that username already exists.
    """
    user_repo = app.state.user_repo
    if await user_repo.get_by_username(_USERNAME):
        return

    # Take the timestamp once so created_at and updated_at agree exactly.
    now = datetime.now(UTC)
    user = User(
        userid=_USER_ID,
        username=_USERNAME,
        email="consent@example.com",
        created_at=now,
        updated_at=now,
    )
    await user_repo.create(user)

    # Low-cost Argon2 parameters keep hashing cheap inside the test suite.
    svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    credential = PasswordCredential(user_id=user.userid, password_hash=svc.hash(_PASSWORD))
    await app.state.credential_repo.create_password(credential)


async def _login(client: AsyncClient) -> None:
    """POST the test user's credentials to /login/password with a fresh CSRF token."""
    token = await get_csrf_token(client)
    await client.post(
        "/login/password",
        data={"username": _USERNAME, "password": _PASSWORD},
        headers={"HX-Request": "true", "X-CSRF-Token": token},
    )


async def _authorize(
    client: AsyncClient,
    *,
    scope: str,
    state: str = "teststate",
    client_id: str = _RP_CLIENT_ID,
    redirect_uri: str = _RP_REDIRECT_URI,
) -> Response:
    """Send a ``response_type=code`` request to /authorization.

    Redirects are not followed, so the caller can inspect the ``location``
    header of the returned response.
    """
    return await client.get(
        "/authorization",
        params={
            "response_type": "code",
            "client_id": client_id,
            "redirect_uri": redirect_uri,
            "scope": scope,
            "state": state,
        },
        follow_redirects=False,
    )


async def _submit_consent(client: AsyncClient, data: dict[str, object]) -> Response:
    """POST *data* to /consent with a fresh CSRF token, without following redirects."""
    token = await get_csrf_token(client)
    return await client.post(
        "/consent",
        data=data,
        headers={"X-CSRF-Token": token},
        follow_redirects=False,
    )


def _redirect_params(res: Response) -> dict[str, list[str]]:
    """Return the parsed query parameters of the response's ``location`` header."""
    return parse_qs(urlparse(res.headers["location"]).query)


async def _login_and_start_auth(client: AsyncClient, scope: str = "openid profile") -> None:
    """Log in as the test user and start an authorization request for ``consent-rp``.

    The authorization response is discarded; tests that need it call
    ``_authorize`` directly.
    """
    await _login(client)
    await _authorize(client, scope=scope)