fix: add CSRF token handling to admin tests after merge

The CSRF middleware added to main after the admin-pages branch was
created caused all admin test POSTs/DELETEs to be rejected. Add
get_csrf_token() calls and X-CSRF-Token headers to login helpers and
all mutation requests, matching the pattern used by other tests.
This commit is contained in:
Johan Lundberg 2026-02-19 15:02:51 +01:00
parent 33a61ecc2a
commit befcef9395
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
2 changed files with 42 additions and 10 deletions

View file

@ -6,6 +6,7 @@ from httpx import AsyncClient
from porchlight.authn.password import PasswordService from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _login( async def _login(
@ -32,10 +33,11 @@ async def _login(
if existing is None: if existing is None:
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash(password))) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash(password)))
token = await get_csrf_token(client)
await client.post( await client.post(
"/login/password", "/login/password",
data={"username": username, "password": password}, data={"username": username, "password": password},
headers={"HX-Request": "true"}, headers={"HX-Request": "true", "X-CSRF-Token": token},
) )

View file

@ -6,6 +6,7 @@ from httpx import AsyncClient
from porchlight.authn.password import PasswordService from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User, WebAuthnCredential from porchlight.models import PasswordCredential, User, WebAuthnCredential
from tests.conftest import get_csrf_token
async def _login( async def _login(
@ -37,10 +38,11 @@ async def _login(
if existing is None: if existing is None:
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash(password))) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash(password)))
token = await get_csrf_token(client)
await client.post( await client.post(
"/login/password", "/login/password",
data={"username": username, "password": password}, data={"username": username, "password": password},
headers={"HX-Request": "true"}, headers={"HX-Request": "true", "X-CSRF-Token": token},
) )
@ -143,6 +145,7 @@ async def test_user_detail_404_for_nonexistent(client: AsyncClient) -> None:
async def test_update_profile(client: AsyncClient) -> None: async def test_update_profile(client: AsyncClient) -> None:
await _login(client) await _login(client)
target = await _create_target_user(client, username="bob") target = await _create_target_user(client, username="bob")
token = await get_csrf_token(client)
response = await client.post( response = await client.post(
f"/admin/users/{target.userid}/profile", f"/admin/users/{target.userid}/profile",
data={ data={
@ -154,6 +157,7 @@ async def test_update_profile(client: AsyncClient) -> None:
"picture": "https://example.com/bob.jpg", "picture": "https://example.com/bob.jpg",
"locale": "en-US", "locale": "en-US",
}, },
headers={"X-CSRF-Token": token},
) )
assert response.status_code == 200 assert response.status_code == 200
assert "Profile updated" in response.text assert "Profile updated" in response.text
@ -172,9 +176,11 @@ async def test_update_profile(client: AsyncClient) -> None:
async def test_update_profile_invalid_email(client: AsyncClient) -> None: async def test_update_profile_invalid_email(client: AsyncClient) -> None:
await _login(client) await _login(client)
target = await _create_target_user(client, username="bob") target = await _create_target_user(client, username="bob")
token = await get_csrf_token(client)
response = await client.post( response = await client.post(
f"/admin/users/{target.userid}/profile", f"/admin/users/{target.userid}/profile",
data={"email": "not-an-email"}, data={"email": "not-an-email"},
headers={"X-CSRF-Token": token},
) )
assert response.status_code == 200 assert response.status_code == 200
assert "Invalid email" in response.text assert "Invalid email" in response.text
@ -184,9 +190,11 @@ async def test_update_profile_invalid_email(client: AsyncClient) -> None:
async def test_update_profile_invalid_picture_url(client: AsyncClient) -> None: async def test_update_profile_invalid_picture_url(client: AsyncClient) -> None:
await _login(client) await _login(client)
target = await _create_target_user(client, username="bob") target = await _create_target_user(client, username="bob")
token = await get_csrf_token(client)
response = await client.post( response = await client.post(
f"/admin/users/{target.userid}/profile", f"/admin/users/{target.userid}/profile",
data={"picture": "not-a-url"}, data={"picture": "not-a-url"},
headers={"X-CSRF-Token": token},
) )
assert response.status_code == 200 assert response.status_code == 200
assert "Picture URL must be a valid HTTP or HTTPS URL" in response.text assert "Picture URL must be a valid HTTP or HTTPS URL" in response.text
@ -195,9 +203,11 @@ async def test_update_profile_invalid_picture_url(client: AsyncClient) -> None:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_update_profile_404_for_nonexistent(client: AsyncClient) -> None: async def test_update_profile_404_for_nonexistent(client: AsyncClient) -> None:
await _login(client) await _login(client)
token = await get_csrf_token(client)
response = await client.post( response = await client.post(
"/admin/users/nonexistent-id/profile", "/admin/users/nonexistent-id/profile",
data={"given_name": "Ghost"}, data={"given_name": "Ghost"},
headers={"X-CSRF-Token": token},
) )
assert response.status_code == 404 assert response.status_code == 404
assert "not found" in response.text.lower() assert "not found" in response.text.lower()
@ -210,9 +220,11 @@ async def test_update_profile_404_for_nonexistent(client: AsyncClient) -> None:
async def test_update_groups(client: AsyncClient) -> None: async def test_update_groups(client: AsyncClient) -> None:
await _login(client) await _login(client)
target = await _create_target_user(client, username="bob") target = await _create_target_user(client, username="bob")
token = await get_csrf_token(client)
response = await client.post( response = await client.post(
f"/admin/users/{target.userid}/groups", f"/admin/users/{target.userid}/groups",
data={"groups": "admin, users, editors"}, data={"groups": "admin, users, editors"},
headers={"X-CSRF-Token": token},
) )
assert response.status_code == 200 assert response.status_code == 200
assert "Groups updated" in response.text assert "Groups updated" in response.text
@ -231,7 +243,8 @@ async def test_update_groups(client: AsyncClient) -> None:
async def test_activate_user(client: AsyncClient) -> None: async def test_activate_user(client: AsyncClient) -> None:
await _login(client) await _login(client)
target = await _create_target_user(client, username="bob", active=False) target = await _create_target_user(client, username="bob", active=False)
response = await client.post(f"/admin/users/{target.userid}/activate") token = await get_csrf_token(client)
response = await client.post(f"/admin/users/{target.userid}/activate", headers={"X-CSRF-Token": token})
assert response.status_code == 200 assert response.status_code == 200
assert "User activated" in response.text assert "User activated" in response.text
@ -245,7 +258,8 @@ async def test_activate_user(client: AsyncClient) -> None:
async def test_deactivate_user(client: AsyncClient) -> None: async def test_deactivate_user(client: AsyncClient) -> None:
await _login(client) await _login(client)
target = await _create_target_user(client, username="bob", active=True) target = await _create_target_user(client, username="bob", active=True)
response = await client.post(f"/admin/users/{target.userid}/deactivate") token = await get_csrf_token(client)
response = await client.post(f"/admin/users/{target.userid}/deactivate", headers={"X-CSRF-Token": token})
assert response.status_code == 200 assert response.status_code == 200
assert "User deactivated" in response.text assert "User deactivated" in response.text
@ -262,7 +276,8 @@ async def test_deactivate_user(client: AsyncClient) -> None:
async def test_delete_user(client: AsyncClient) -> None: async def test_delete_user(client: AsyncClient) -> None:
await _login(client) await _login(client)
target = await _create_target_user(client, username="bob") target = await _create_target_user(client, username="bob")
response = await client.request("DELETE", f"/admin/users/{target.userid}") token = await get_csrf_token(client)
response = await client.request("DELETE", f"/admin/users/{target.userid}", headers={"X-CSRF-Token": token})
assert response.status_code == 200 assert response.status_code == 200
assert "User deleted" in response.text assert "User deleted" in response.text
assert response.headers.get("hx-redirect") == "/admin/users" assert response.headers.get("hx-redirect") == "/admin/users"
@ -277,7 +292,8 @@ async def test_delete_user(client: AsyncClient) -> None:
async def test_delete_user_self_prevention(client: AsyncClient) -> None: async def test_delete_user_self_prevention(client: AsyncClient) -> None:
await _login(client, userid="admin-user-01") await _login(client, userid="admin-user-01")
# Try to delete ourselves # Try to delete ourselves
response = await client.request("DELETE", "/admin/users/admin-user-01") token = await get_csrf_token(client)
response = await client.request("DELETE", "/admin/users/admin-user-01", headers={"X-CSRF-Token": token})
assert response.status_code == 200 assert response.status_code == 200
assert "Cannot delete your own account" in response.text assert "Cannot delete your own account" in response.text
@ -301,7 +317,10 @@ async def test_delete_password_credential(client: AsyncClient) -> None:
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("bobpass"))) await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("bobpass")))
response = await client.request("DELETE", f"/admin/users/{target.userid}/credentials/password") token = await get_csrf_token(client)
response = await client.request(
"DELETE", f"/admin/users/{target.userid}/credentials/password", headers={"X-CSRF-Token": token}
)
assert response.status_code == 200 assert response.status_code == 200
assert "text/html" in response.headers["content-type"] assert "text/html" in response.headers["content-type"]
@ -334,7 +353,12 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None:
credential_id_b64 = urlsafe_b64encode(credential_id).decode().rstrip("=") credential_id_b64 = urlsafe_b64encode(credential_id).decode().rstrip("=")
response = await client.request("DELETE", f"/admin/users/{target.userid}/credentials/webauthn/{credential_id_b64}") token = await get_csrf_token(client)
response = await client.request(
"DELETE",
f"/admin/users/{target.userid}/credentials/webauthn/{credential_id_b64}",
headers={"X-CSRF-Token": token},
)
assert response.status_code == 200 assert response.status_code == 200
assert "text/html" in response.headers["content-type"] assert "text/html" in response.headers["content-type"]
@ -349,9 +373,11 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_create_invite(client: AsyncClient) -> None: async def test_create_invite(client: AsyncClient) -> None:
await _login(client) await _login(client)
token = await get_csrf_token(client)
response = await client.post( response = await client.post(
"/admin/invite", "/admin/invite",
data={"username": "newuser"}, data={"username": "newuser"},
headers={"X-CSRF-Token": token},
) )
assert response.status_code == 200 assert response.status_code == 200
assert "Invite created" in response.text assert "Invite created" in response.text
@ -362,9 +388,11 @@ async def test_create_invite(client: AsyncClient) -> None:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_create_invite_empty_username(client: AsyncClient) -> None: async def test_create_invite_empty_username(client: AsyncClient) -> None:
await _login(client) await _login(client)
token = await get_csrf_token(client)
response = await client.post( response = await client.post(
"/admin/invite", "/admin/invite",
data={"username": " "}, data={"username": " "},
headers={"X-CSRF-Token": token},
) )
assert response.status_code == 200 assert response.status_code == 200
assert "Username is required" in response.text assert "Username is required" in response.text
@ -377,7 +405,8 @@ async def test_create_invite_empty_username(client: AsyncClient) -> None:
async def test_reinvite_user(client: AsyncClient) -> None: async def test_reinvite_user(client: AsyncClient) -> None:
await _login(client) await _login(client)
target = await _create_target_user(client, username="bob") target = await _create_target_user(client, username="bob")
response = await client.post(f"/admin/users/{target.userid}/invite") token = await get_csrf_token(client)
response = await client.post(f"/admin/users/{target.userid}/invite", headers={"X-CSRF-Token": token})
assert response.status_code == 200 assert response.status_code == 200
assert "Invite link generated" in response.text assert "Invite link generated" in response.text
assert "/register/" in response.text assert "/register/" in response.text
@ -386,6 +415,7 @@ async def test_reinvite_user(client: AsyncClient) -> None:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_reinvite_user_404_for_nonexistent(client: AsyncClient) -> None: async def test_reinvite_user_404_for_nonexistent(client: AsyncClient) -> None:
await _login(client) await _login(client)
response = await client.post("/admin/users/nonexistent-id/invite") token = await get_csrf_token(client)
response = await client.post("/admin/users/nonexistent-id/invite", headers={"X-CSRF-Token": token})
assert response.status_code == 404 assert response.status_code == 404
assert "not found" in response.text.lower() assert "not found" in response.text.lower()