import secrets from base64 import b64encode from datetime import UTC, datetime from urllib.parse import parse_qs, urlparse from argon2 import PasswordHasher from httpx import AsyncClient from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User from tests.conftest import get_csrf_token def _register_test_client(client: AsyncClient) -> str: app = client._transport.app # type: ignore[union-attr] oidc_server = app.state.oidc_server client_secret = "test-secret-0123456789abcdef" oidc_server.context.cdb["test-rp"] = { "client_id": "test-rp", "client_secret": client_secret, "redirect_uris": [("http://localhost:9000/callback", {})], "response_types_supported": ["code"], "token_endpoint_auth_method": "client_secret_basic", "scope": ["openid", "profile", "email"], "allowed_scopes": ["openid", "profile", "email"], "client_salt": secrets.token_hex(8), } oidc_server.keyjar.add_symmetric("test-rp", client_secret) return client_secret async def _create_user_and_login(client: AsyncClient) -> str: """Create user, log in, return userid.""" app = client._transport.app # type: ignore[union-attr] user_repo = app.state.user_repo cred_repo = app.state.credential_repo user = User( userid="lusab-bansen", username="alice", email="alice@example.com", email_verified=True, given_name="Alice", family_name="Wonderland", created_at=datetime.now(UTC), updated_at=datetime.now(UTC), ) await user_repo.create(user) svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass"))) token = await get_csrf_token(client) await client.post( "/login/password", data={"username": "alice", "password": "testpass"}, headers={"HX-Request": "true", "X-CSRF-Token": token}, ) return user.userid async def _get_access_token(client: AsyncClient) -> str: """Run full auth + token flow and return the access_token.""" client_secret = _register_test_client(client) app = client._transport.app # type: ignore[union-attr] # Start authorization (unauthenticated — stores in session) await client.get( "/authorization", params={ "response_type": "code", "client_id": "test-rp", "redirect_uri": "http://localhost:9000/callback", "scope": "openid profile email", "state": "test-state", "nonce": "test-nonce", }, follow_redirects=False, ) # Create user and log in userid = await _create_user_and_login(client) # Pre-seed consent so the consent screen is skipped consent_repo = app.state.consent_repo await consent_repo.set_consent(userid, "test-rp", ["openid", "profile", "email"]) # Complete authorization (now authenticated, consent exists → redirects to callback) complete_res = await client.get("/authorization/complete", follow_redirects=False) assert complete_res.status_code in (302, 303), ( f"Expected redirect, got {complete_res.status_code}: {complete_res.text}" ) location = complete_res.headers["location"] parsed = urlparse(location) params = parse_qs(parsed.query) assert "code" in params, f"No code in redirect: {location}" code = params["code"][0] # Exchange code for tokens auth_header = b64encode(f"test-rp:{client_secret}".encode()).decode() token_res = await client.post( "/token", data={ "grant_type": "authorization_code", "code": code, "redirect_uri": "http://localhost:9000/callback", }, headers={ "Authorization": f"Basic {auth_header}", "Content-Type": "application/x-www-form-urlencoded", }, ) assert token_res.status_code == 200, f"Token endpoint failed: {token_res.text}" data = token_res.json() assert "access_token" in data, f"No access_token in response: {data}" return data["access_token"] async def test_userinfo_returns_claims(client: AsyncClient) -> None: access_token = await _get_access_token(client) res = await client.get( "/userinfo", headers={"Authorization": f"Bearer {access_token}"}, ) assert res.status_code == 200, f"UserInfo failed: {res.status_code} {res.text}" data = res.json() assert "sub" in data assert len(data["sub"]) > 0 async def test_userinfo_includes_email_claims(client: AsyncClient) -> None: access_token = await _get_access_token(client) res = await client.get( "/userinfo", headers={"Authorization": f"Bearer {access_token}"}, ) assert res.status_code == 200, f"UserInfo failed: {res.status_code} {res.text}" data = res.json() assert data["email"] == "alice@example.com" assert data["email_verified"] is True async def test_userinfo_invalid_token_returns_error(client: AsyncClient) -> None: res = await client.get( "/userinfo", headers={"Authorization": "Bearer invalid-token"}, ) assert res.status_code == 401