diff --git a/tests/conftest.py b/tests/conftest.py index 63f47aa..3506c70 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import re from collections.abc import AsyncIterator import pytest @@ -18,3 +19,15 @@ async def client(settings: Settings) -> AsyncIterator[AsyncClient]: transport = ASGITransport(app=app) async with app.router.lifespan_context(app), AsyncClient(transport=transport, base_url=settings.issuer) as ac: yield ac + + +async def get_csrf_token(client: AsyncClient) -> str: + """Get a CSRF token by visiting the login page. + + Returns the token string. The session cookie is automatically stored + in the client's cookie jar (httpx.AsyncClient persists cookies). + """ + resp = await client.get("/login") + match = re.search(r'name="csrf-token" content="([^"]+)"', resp.text) + assert match, "CSRF meta tag not found in page" + return match.group(1) diff --git a/tests/test_auth_routes/test_last_credential_guard.py b/tests/test_auth_routes/test_last_credential_guard.py index 9d9b557..33d2f4f 100644 --- a/tests/test_auth_routes/test_last_credential_guard.py +++ b/tests/test_auth_routes/test_last_credential_guard.py @@ -4,6 +4,7 @@ from datetime import UTC, datetime from argon2 import PasswordHasher from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User, WebAuthnCredential @@ -20,10 +21,11 @@ async def _create_user_and_login(client: AsyncClient) -> str: svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass"))) + token = await get_csrf_token(client) await client.post( "/login/password", data={"username": "alice", "password": "testpass"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) return user.userid @@ -32,9 +34,10 @@ async def test_cannot_delete_last_password_credential(client: AsyncClient) -> No """User has only a password — cannot delete it.""" await _create_user_and_login(client) + token = await get_csrf_token(client) res = await client.delete( "/manage/credentials/password", - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert res.status_code == 200 assert 'role="alert"' in res.text @@ -57,9 +60,10 @@ async def test_cannot_delete_last_webauthn_credential(client: AsyncClient) -> No await cred_repo.delete_password(userid) cred_id_b64 = urlsafe_b64encode(b"cred1").decode().rstrip("=") + token = await get_csrf_token(client) res = await client.delete( f"/manage/credentials/webauthn/{cred_id_b64}", - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert res.status_code == 200 assert 'role="alert"' in res.text diff --git a/tests/test_auth_routes/test_manage_credentials_page.py b/tests/test_auth_routes/test_manage_credentials_page.py index 95297b7..42c0a84 100644 --- a/tests/test_auth_routes/test_manage_credentials_page.py +++ b/tests/test_auth_routes/test_manage_credentials_page.py @@ -3,6 +3,7 @@ from datetime import UTC, datetime from argon2 import PasswordHasher from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User @@ -25,10 +26,11 @@ async def _login(client: AsyncClient, username: str = "alice", password: str = " if existing is None: await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash(password))) + token = await get_csrf_token(client) await client.post( "/login/password", data={"username": username, "password": password}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) diff --git a/tests/test_auth_routes/test_manage_password_credential.py b/tests/test_auth_routes/test_manage_password_credential.py index f8605f1..55b3047 100644 --- a/tests/test_auth_routes/test_manage_password_credential.py +++ b/tests/test_auth_routes/test_manage_password_credential.py @@ -3,6 +3,7 @@ from datetime import UTC, datetime from argon2 import PasswordHasher from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User, WebAuthnCredential @@ -19,18 +20,21 @@ async def _create_user_and_login(client: AsyncClient) -> str: svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("old"))) + token = await get_csrf_token(client) await client.post( "/login/password", data={"username": "alice", "password": "old"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) return user.userid async def test_set_password_requires_session(client: AsyncClient) -> None: + token = await get_csrf_token(client) res = await client.post( "/manage/credentials/password", data={"password": "x", "confirm": "x"}, + headers={"X-CSRF-Token": token}, follow_redirects=False, ) assert res.status_code in (302, 303) @@ -39,10 +43,11 @@ async def test_set_password_requires_session(client: AsyncClient) -> None: async def test_set_password_mismatch_returns_error(client: AsyncClient) -> None: await _create_user_and_login(client) + token = await get_csrf_token(client) res = await client.post( "/manage/credentials/password", data={"password": "newpassword", "confirm": "different"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert res.status_code == 200 assert 'role="alert"' in res.text @@ -51,10 +56,11 @@ async def test_set_password_mismatch_returns_error(client: AsyncClient) -> None: async def test_set_password_too_short_returns_error(client: AsyncClient) -> None: await _create_user_and_login(client) + token = await get_csrf_token(client) res = await client.post( "/manage/credentials/password", data={"password": "short", "confirm": "short"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert res.status_code == 200 assert 'role="alert"' in res.text @@ -65,10 +71,11 @@ async def test_set_password_creates_or_replaces_password(client: AsyncClient) -> app = client._transport.app # type: ignore[union-attr] cred_repo = app.state.credential_repo + token = await get_csrf_token(client) res = await client.post( "/manage/credentials/password", data={"password": "newpassword123", "confirm": "newpassword123"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert res.status_code == 200 assert 'role="status"' in res.text or "Password" in res.text @@ -80,7 +87,12 @@ async def test_set_password_creates_or_replaces_password(client: AsyncClient) -> async def test_delete_password_requires_session(client: AsyncClient) -> None: - res = await client.delete("/manage/credentials/password", follow_redirects=False) + token = await get_csrf_token(client) + res = await client.delete( + "/manage/credentials/password", + headers={"X-CSRF-Token": token}, + follow_redirects=False, + ) assert res.status_code in (302, 303) @@ -93,9 +105,10 @@ async def test_delete_password_with_other_credential(client: AsyncClient) -> Non # Add a webauthn credential so password is not the last one await cred_repo.create_webauthn(WebAuthnCredential(user_id=userid, credential_id=b"cred1", public_key=b"key1")) + token = await get_csrf_token(client) res = await client.delete( "/manage/credentials/password", - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert res.status_code == 200 diff --git a/tests/test_auth_routes/test_manage_webauthn_credential.py b/tests/test_auth_routes/test_manage_webauthn_credential.py index 2e7dee3..09ddf3b 100644 --- a/tests/test_auth_routes/test_manage_webauthn_credential.py +++ b/tests/test_auth_routes/test_manage_webauthn_credential.py @@ -17,6 +17,7 @@ from fido2.webauthn import ( ) from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User, WebAuthnCredential @@ -36,10 +37,11 @@ async def _create_user_and_login(client: AsyncClient) -> str: svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass"))) + token = await get_csrf_token(client) await client.post( "/login/password", data={"username": "alice", "password": "testpass"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) return user.userid @@ -70,14 +72,23 @@ def _build_registration_response( async def test_webauthn_begin_requires_session(client: AsyncClient) -> None: - res = await client.post("/manage/credentials/webauthn/begin", follow_redirects=False) + token = await get_csrf_token(client) + res = await client.post( + "/manage/credentials/webauthn/begin", + headers={"X-CSRF-Token": token}, + follow_redirects=False, + ) assert res.status_code in (302, 303, 401) async def test_webauthn_begin_returns_options(client: AsyncClient) -> None: await _create_user_and_login(client) - res = await client.post("/manage/credentials/webauthn/begin") + token = await get_csrf_token(client) + res = await client.post( + "/manage/credentials/webauthn/begin", + headers={"X-CSRF-Token": token}, + ) assert res.status_code == 200 data = res.json() assert "publicKey" in data @@ -122,9 +133,10 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None: cred_id_b64 = urlsafe_b64encode(b"cred1").decode().rstrip("=") + token = await get_csrf_token(client) res = await client.delete( f"/manage/credentials/webauthn/{cred_id_b64}", - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert res.status_code == 200 diff --git a/tests/test_auth_routes/test_password_login.py b/tests/test_auth_routes/test_password_login.py index 7d3f274..5a75116 100644 --- a/tests/test_auth_routes/test_password_login.py +++ b/tests/test_auth_routes/test_password_login.py @@ -3,15 +3,17 @@ from datetime import UTC, datetime from argon2 import PasswordHasher from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User async def test_password_login_unknown_user_returns_error_fragment(client: AsyncClient) -> None: + token = await get_csrf_token(client) res = await client.post( "/login/password", data={"username": "nobody", "password": "wrong"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert res.status_code == 200 assert "Invalid username or password" in res.text @@ -29,10 +31,11 @@ async def test_password_login_wrong_password_returns_error_fragment(client: Asyn svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("correct"))) + token = await get_csrf_token(client) res = await client.post( "/login/password", data={"username": "alice", "password": "wrong"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert res.status_code == 200 assert "Invalid username or password" in res.text @@ -49,16 +52,18 @@ async def test_password_login_success_sets_session_and_hx_redirect(client: Async svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("correct"))) + token = await get_csrf_token(client) res = await client.post( "/login/password", data={"username": "alice", "password": "correct"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert res.status_code == 200 assert res.headers.get("HX-Redirect") == "/manage/credentials" async def test_logout_clears_session_and_redirects(client: AsyncClient) -> None: - res = await client.post("/logout", headers={"HX-Request": "true"}) + token = await get_csrf_token(client) + res = await client.post("/logout", headers={"HX-Request": "true", "X-CSRF-Token": token}) assert res.status_code == 200 assert res.headers.get("HX-Redirect") == "/login" diff --git a/tests/test_auth_routes/test_webauthn_login.py b/tests/test_auth_routes/test_webauthn_login.py index be616ac..9475e3c 100644 --- a/tests/test_auth_routes/test_webauthn_login.py +++ b/tests/test_auth_routes/test_webauthn_login.py @@ -6,6 +6,7 @@ from fido2.cose import ES256 from fido2.webauthn import Aaguid, AttestedCredentialData from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.models import User, WebAuthnCredential RP_ID = "localhost" @@ -66,9 +67,11 @@ async def test_webauthn_login_begin_has_user_verification_preferred(client: Asyn async def test_webauthn_login_complete_without_state_returns_400(client: AsyncClient) -> None: """Complete without prior begin should fail.""" + token = await get_csrf_token(client) res = await client.post( "/login/webauthn/complete", json={"id": "fake", "rawId": "fake", "type": "public-key", "response": {}}, + headers={"X-CSRF-Token": token}, ) assert res.status_code == 400 @@ -81,11 +84,13 @@ async def test_webauthn_login_complete_returns_json_redirect(client: AsyncClient res1 = await client.get("/login/webauthn/begin") assert res1.status_code == 200 + token = await get_csrf_token(client) # We can't easily complete the full assertion without browser interaction, # but we verify the endpoint returns 400 JSON (not HTML) for bad assertions res2 = await client.post( "/login/webauthn/complete", json={"id": "fake", "rawId": "fake", "type": "public-key", "response": {}}, + headers={"X-CSRF-Token": token}, ) # Should fail verification but not crash — returns error HTML for now assert res2.status_code in (200, 400) diff --git a/tests/test_oidc/test_consent_flow.py b/tests/test_oidc/test_consent_flow.py index c1a0993..bcf778f 100644 --- a/tests/test_oidc/test_consent_flow.py +++ b/tests/test_oidc/test_consent_flow.py @@ -5,6 +5,7 @@ from urllib.parse import parse_qs, urlparse from argon2 import PasswordHasher from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User @@ -16,10 +17,11 @@ async def test_authorization_shows_consent_for_new_client(client: AsyncClient) - await _create_test_user(app) # Login + token = await get_csrf_token(client) await client.post( "/login/password", data={"username": "consentuser", "password": "testpass"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) # Authorization request @@ -58,9 +60,11 @@ async def test_consent_allow_redirects_with_code(client: AsyncClient) -> None: await _create_test_user(app) await _login_and_start_auth(client) + token = await get_csrf_token(client) res = await client.post( "/consent", data={"action": "allow", "scope": ["openid", "profile"]}, + headers={"X-CSRF-Token": token}, follow_redirects=False, ) assert res.status_code == 303 @@ -77,9 +81,11 @@ async def test_consent_deny_redirects_with_error(client: AsyncClient) -> None: await _create_test_user(app) await _login_and_start_auth(client) + token = await get_csrf_token(client) res = await client.post( "/consent", data={"action": "deny"}, + headers={"X-CSRF-Token": token}, follow_redirects=False, ) assert res.status_code == 303 @@ -97,9 +103,11 @@ async def test_saved_consent_skips_consent_screen(client: AsyncClient) -> None: # First flow: login, authorize, consent await _login_and_start_auth(client) + token = await get_csrf_token(client) await client.post( "/consent", data={"action": "allow", "scope": ["openid", "profile"]}, + headers={"X-CSRF-Token": token}, follow_redirects=False, ) @@ -130,9 +138,11 @@ async def test_new_scopes_reshows_consent(client: AsyncClient) -> None: # First flow: consent to openid only await _login_and_start_auth(client, scope="openid") + token = await get_csrf_token(client) await client.post( "/consent", data={"action": "allow", "scope": ["openid"]}, + headers={"X-CSRF-Token": token}, follow_redirects=False, ) @@ -158,10 +168,11 @@ async def test_manage_app_skips_consent(client: AsyncClient) -> None: settings = app.state.settings await _create_test_user(app) + token = await get_csrf_token(client) await client.post( "/login/password", data={"username": "consentuser", "password": "testpass"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) manage_cdb = app.state.oidc_server.context.cdb[settings.manage_client_id] @@ -193,9 +204,11 @@ async def test_partial_consent_filters_scopes(client: AsyncClient) -> None: # Request openid + profile + email, approve only openid + profile await _login_and_start_auth(client, scope="openid profile email") + token = await get_csrf_token(client) res = await client.post( "/consent", data={"action": "allow", "scope": ["openid", "profile"]}, + headers={"X-CSRF-Token": token}, follow_redirects=False, ) assert res.status_code == 303 @@ -250,10 +263,11 @@ async def _create_test_user(app) -> None: async def _login_and_start_auth(client: AsyncClient, scope: str = "openid profile") -> None: + token = await get_csrf_token(client) await client.post( "/login/password", data={"username": "consentuser", "password": "testpass"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) await client.get( "/authorization", diff --git a/tests/test_oidc/test_e2e_flow.py b/tests/test_oidc/test_e2e_flow.py index 9626a94..d2851c9 100644 --- a/tests/test_oidc/test_e2e_flow.py +++ b/tests/test_oidc/test_e2e_flow.py @@ -9,6 +9,7 @@ from cryptojwt.jwk.jwk import key_from_jwk_dict from cryptojwt.jws.jws import JWS from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User @@ -76,10 +77,11 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None: assert "/login" in auth_res.headers["location"] # -- Step 2: Password login → HX-Redirect to /authorization/complete -- + token = await get_csrf_token(client) login_res = await client.post( "/login/password", data={"username": "alice", "password": "testpass"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert login_res.status_code == 200 hx_redirect = login_res.headers.get("HX-Redirect", "") @@ -95,9 +97,11 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None: assert "/consent" in complete_res.headers["location"] # -- Step 3b: Approve consent → redirect to callback with code + state -- + token = await get_csrf_token(client) consent_res = await client.post( "/consent", data={"action": "allow", "scope": ["openid", "profile", "email"]}, + headers={"X-CSRF-Token": token}, follow_redirects=False, ) assert consent_res.status_code in (302, 303) diff --git a/tests/test_oidc/test_login_oidc_redirect.py b/tests/test_oidc/test_login_oidc_redirect.py index b37ade9..7e50832 100644 --- a/tests/test_oidc/test_login_oidc_redirect.py +++ b/tests/test_oidc/test_login_oidc_redirect.py @@ -4,6 +4,7 @@ from datetime import UTC, datetime from argon2 import PasswordHasher from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User @@ -54,10 +55,11 @@ async def test_login_with_pending_oidc_redirects_to_authorization_complete(clien assert auth_res.status_code in (302, 303) # Step 2: Login via password + token = await get_csrf_token(client) login_res = await client.post( "/login/password", data={"username": "alice", "password": "testpass"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert login_res.status_code == 200 redirect_target = login_res.headers.get("HX-Redirect", "") @@ -67,10 +69,11 @@ async def test_login_with_pending_oidc_redirects_to_authorization_complete(clien async def test_login_without_pending_oidc_redirects_to_manage(client: AsyncClient) -> None: await _create_user(client) + token = await get_csrf_token(client) login_res = await client.post( "/login/password", data={"username": "alice", "password": "testpass"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) assert login_res.status_code == 200 redirect_target = login_res.headers.get("HX-Redirect", "") diff --git a/tests/test_oidc/test_token.py b/tests/test_oidc/test_token.py index 7d086ad..593409e 100644 --- a/tests/test_oidc/test_token.py +++ b/tests/test_oidc/test_token.py @@ -6,6 +6,7 @@ from urllib.parse import parse_qs, urlparse from argon2 import PasswordHasher from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User @@ -47,10 +48,11 @@ async def _create_user_and_login(client: AsyncClient) -> str: svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass"))) + token = await get_csrf_token(client) await client.post( "/login/password", data={"username": "alice", "password": "testpass"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) return user.userid diff --git a/tests/test_oidc/test_userinfo.py b/tests/test_oidc/test_userinfo.py index 63f217b..40c2a3d 100644 --- a/tests/test_oidc/test_userinfo.py +++ b/tests/test_oidc/test_userinfo.py @@ -6,6 +6,7 @@ from urllib.parse import parse_qs, urlparse from argon2 import PasswordHasher from httpx import AsyncClient +from tests.conftest import get_csrf_token from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User @@ -49,10 +50,11 @@ async def _create_user_and_login(client: AsyncClient) -> str: svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass"))) + token = await get_csrf_token(client) await client.post( "/login/password", data={"username": "alice", "password": "testpass"}, - headers={"HX-Request": "true"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, ) return user.userid