The CSRF middleware added to main after the admin-pages branch was created caused all admin test POSTs/DELETEs to be rejected. Add get_csrf_token() calls and X-CSRF-Token headers to login helpers and all mutation requests, matching the pattern used by other tests.
421 lines
14 KiB
Python
421 lines
14 KiB
Python
from datetime import UTC, datetime
|
|
|
|
import pytest
|
|
from argon2 import PasswordHasher
|
|
from httpx import AsyncClient
|
|
|
|
from porchlight.authn.password import PasswordService
|
|
from porchlight.models import PasswordCredential, User, WebAuthnCredential
|
|
from tests.conftest import get_csrf_token
|
|
|
|
|
|
async def _login(
    client: AsyncClient,
    username: str = "admin",
    password: str = "adminpass",
    *,
    userid: str = "admin-user-01",
    groups: list[str] | None = None,
) -> None:
    """Ensure a user with a password credential exists, then POST a login.

    The user is looked up by ``username`` and created only when missing;
    ``userid`` and ``groups`` are used only for that creation, and ``groups``
    falls back to ``["admin"]`` when it is ``None``. A password credential is
    likewise created only when the user has none. Finally a CSRF token is
    fetched and the credentials are POSTed to ``/login/password``. The login
    response is not inspected.
    """
    state = client._transport.app.state  # type: ignore[union-attr]
    users = state.user_repo
    credentials = state.credential_repo

    account = await users.get_by_username(username)
    if account is None:
        member_of = ["admin"] if groups is None else groups
        account = User(
            userid=userid,
            username=username,
            groups=member_of,
            created_at=datetime.now(UTC),
            updated_at=datetime.now(UTC),
        )
        await users.create(account)

    # Low time/memory cost parameters (presumably to keep hashing quick in tests).
    hasher = PasswordHasher(time_cost=1, memory_cost=8192)
    password_service = PasswordService(hasher=hasher)
    if await credentials.get_password_by_user(account.userid) is None:
        credential = PasswordCredential(user_id=account.userid, password_hash=password_service.hash(password))
        await credentials.create_password(credential)

    csrf_token = await get_csrf_token(client)
    login_form = {"username": username, "password": password}
    login_headers = {"HX-Request": "true", "X-CSRF-Token": csrf_token}
    await client.post("/login/password", data=login_form, headers=login_headers)
|
|
|
|
|
|
async def _create_target_user(
    client: AsyncClient,
    *,
    userid: str = "target-user-01",
    username: str = "bob",
    email: str | None = None,
    groups: list[str] | None = None,
    active: bool = True,
) -> User:
    """Insert a user through the app's user repository without logging in.

    A falsy ``groups`` (``None`` or an empty list) is stored as an empty list.
    Returns whatever the repository's ``create`` call returns.
    """
    repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    return await repo.create(
        User(
            userid=userid,
            username=username,
            email=email,
            groups=groups if groups else [],
            active=active,
            created_at=datetime.now(UTC),
            updated_at=datetime.now(UTC),
        )
    )
|
|
|
|
|
|
# --- User list ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_users_list_returns_html(client: AsyncClient) -> None:
    """After the login helper runs, GET /admin/users answers 200 with HTML."""
    await _login(client)
    listing = await client.get("/admin/users")
    assert listing.status_code == 200
    content_type = listing.headers["content-type"]
    assert "text/html" in content_type
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_users_list_shows_users(client: AsyncClient) -> None:
    """The user list page mentions both the created target user and the admin."""
    await _login(client)
    await _create_target_user(client, username="bob")
    listing = await client.get("/admin/users")
    assert listing.status_code == 200
    page = listing.text
    assert "bob" in page
    assert "admin" in page
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_users_list_search(client: AsyncClient) -> None:
    """Searching with ``q=bob`` lists bob and leaves carol out of the page."""
    await _login(client)
    await _create_target_user(client, userid="target-user-01", username="bob")
    await _create_target_user(client, userid="target-user-02", username="carol")
    search = await client.get("/admin/users?q=bob")
    assert search.status_code == 200
    page = search.text
    assert "bob" in page
    assert "carol" not in page
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_users_list_htmx_search_returns_partial(client: AsyncClient) -> None:
    """A search sent with HTMX headers gets back a fragment, not a full page."""
    await _login(client)
    await _create_target_user(client, username="bob")
    htmx_headers = {"HX-Request": "true", "HX-Trigger-Name": "q"}
    fragment = await client.get("/admin/users?q=bob", headers=htmx_headers)
    assert fragment.status_code == 200
    # The fragment carries the matching row but none of the page chrome:
    # no table header and no <html> wrapper.
    body = fragment.text
    assert "bob" in body
    assert "<thead>" not in body
    assert "<html" not in body
|
|
|
|
|
|
# --- User detail ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_user_detail_returns_html(client: AsyncClient) -> None:
    """The detail page for an existing user is 200 HTML containing the username."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    detail = await client.get(f"/admin/users/{target.userid}")
    assert detail.status_code == 200
    content_type = detail.headers["content-type"]
    assert "text/html" in content_type
    assert "bob" in detail.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_user_detail_404_for_nonexistent(client: AsyncClient) -> None:
    """Requesting the detail page of an unknown user id yields a 404 "not found"."""
    await _login(client)
    missing = await client.get("/admin/users/nonexistent-id")
    assert missing.status_code == 404
    lowered = missing.text.lower()
    assert "not found" in lowered
|
|
|
|
|
|
# --- Profile update ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_profile(client: AsyncClient) -> None:
    """POSTing a full profile form reports success and persists the new values."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    csrf_token = await get_csrf_token(client)
    profile_form = {
        "given_name": "Bob",
        "family_name": "Smith",
        "preferred_username": "bobby",
        "email": "bob@example.com",
        "phone_number": "+1234567890",
        "picture": "https://example.com/bob.jpg",
        "locale": "en-US",
    }
    result = await client.post(
        f"/admin/users/{target.userid}/profile",
        data=profile_form,
        headers={"X-CSRF-Token": csrf_token},
    )
    assert result.status_code == 200
    assert "Profile updated" in result.text

    # Re-read the user from the repository to confirm the write landed.
    user_repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await user_repo.get_by_userid(target.userid)
    assert stored is not None
    assert stored.given_name == "Bob"
    assert stored.family_name == "Smith"
    assert stored.email == "bob@example.com"
    assert stored.picture == "https://example.com/bob.jpg"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_profile_invalid_email(client: AsyncClient) -> None:
    """A malformed email in the profile form is answered with "Invalid email"."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    csrf_token = await get_csrf_token(client)
    bad_form = {"email": "not-an-email"}
    result = await client.post(
        f"/admin/users/{target.userid}/profile",
        data=bad_form,
        headers={"X-CSRF-Token": csrf_token},
    )
    assert result.status_code == 200
    assert "Invalid email" in result.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_profile_invalid_picture_url(client: AsyncClient) -> None:
    """A picture value that is not a URL is answered with a validation message."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    csrf_token = await get_csrf_token(client)
    bad_form = {"picture": "not-a-url"}
    result = await client.post(
        f"/admin/users/{target.userid}/profile",
        data=bad_form,
        headers={"X-CSRF-Token": csrf_token},
    )
    assert result.status_code == 200
    assert "Picture URL must be a valid HTTP or HTTPS URL" in result.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_profile_404_for_nonexistent(client: AsyncClient) -> None:
    """Updating the profile of an unknown user id yields a 404 "not found"."""
    await _login(client)
    csrf_token = await get_csrf_token(client)
    form = {"given_name": "Ghost"}
    result = await client.post(
        "/admin/users/nonexistent-id/profile",
        data=form,
        headers={"X-CSRF-Token": csrf_token},
    )
    assert result.status_code == 404
    lowered = result.text.lower()
    assert "not found" in lowered
|
|
|
|
|
|
# --- Groups update ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_groups(client: AsyncClient) -> None:
    """A comma-separated groups string is accepted and stored as three groups."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    csrf_token = await get_csrf_token(client)
    groups_form = {"groups": "admin, users, editors"}
    result = await client.post(
        f"/admin/users/{target.userid}/groups",
        data=groups_form,
        headers={"X-CSRF-Token": csrf_token},
    )
    assert result.status_code == 200
    assert "Groups updated" in result.text

    # Compare as a set: only membership is checked, not order.
    user_repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await user_repo.get_by_userid(target.userid)
    assert stored is not None
    assert set(stored.groups) == {"admin", "users", "editors"}
|
|
|
|
|
|
# --- Activate / deactivate ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_activate_user(client: AsyncClient) -> None:
    """Activating an inactive user reports success and flips ``active`` to True."""
    await _login(client)
    target = await _create_target_user(client, username="bob", active=False)
    csrf_token = await get_csrf_token(client)
    csrf_headers = {"X-CSRF-Token": csrf_token}
    result = await client.post(f"/admin/users/{target.userid}/activate", headers=csrf_headers)
    assert result.status_code == 200
    assert "User activated" in result.text

    user_repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await user_repo.get_by_userid(target.userid)
    assert stored is not None
    assert stored.active is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_deactivate_user(client: AsyncClient) -> None:
    """Deactivating an active user reports success and flips ``active`` to False."""
    await _login(client)
    target = await _create_target_user(client, username="bob", active=True)
    csrf_token = await get_csrf_token(client)
    csrf_headers = {"X-CSRF-Token": csrf_token}
    result = await client.post(f"/admin/users/{target.userid}/deactivate", headers=csrf_headers)
    assert result.status_code == 200
    assert "User deactivated" in result.text

    user_repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await user_repo.get_by_userid(target.userid)
    assert stored is not None
    assert stored.active is False
|
|
|
|
|
|
# --- Delete user ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_user(client: AsyncClient) -> None:
    """DELETE on a user reports success, redirects to the list, and removes the user."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    csrf_token = await get_csrf_token(client)
    csrf_headers = {"X-CSRF-Token": csrf_token}
    result = await client.request("DELETE", f"/admin/users/{target.userid}", headers=csrf_headers)
    assert result.status_code == 200
    assert "User deleted" in result.text
    assert result.headers.get("hx-redirect") == "/admin/users"

    # The repository lookup must now come back empty.
    user_repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    remaining = await user_repo.get_by_userid(target.userid)
    assert remaining is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_user_self_prevention(client: AsyncClient) -> None:
    """DELETE aimed at the logged-in user's own id is refused and the user remains."""
    await _login(client, userid="admin-user-01")
    csrf_token = await get_csrf_token(client)
    csrf_headers = {"X-CSRF-Token": csrf_token}
    # Target the same userid that was just used to log in.
    result = await client.request("DELETE", "/admin/users/admin-user-01", headers=csrf_headers)
    assert result.status_code == 200
    assert "Cannot delete your own account" in result.text

    # The admin record must still be present in the repository.
    user_repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    survivor = await user_repo.get_by_userid("admin-user-01")
    assert survivor is not None
|
|
|
|
|
|
# --- Delete credentials ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_password_credential(client: AsyncClient) -> None:
    """DELETE on a user's password credential returns HTML and removes the credential."""
    await _login(client)
    target = await _create_target_user(client, username="bob")

    # Give the target user a password credential so there is something to delete.
    credentials = client._transport.app.state.credential_repo  # type: ignore[union-attr]
    hasher = PasswordHasher(time_cost=1, memory_cost=8192)
    password_service = PasswordService(hasher=hasher)
    seeded = PasswordCredential(user_id=target.userid, password_hash=password_service.hash("bobpass"))
    await credentials.create_password(seeded)

    csrf_token = await get_csrf_token(client)
    result = await client.request(
        "DELETE",
        f"/admin/users/{target.userid}/credentials/password",
        headers={"X-CSRF-Token": csrf_token},
    )
    assert result.status_code == 200
    assert "text/html" in result.headers["content-type"]

    # The credential lookup must now come back empty.
    remaining = await credentials.get_password_by_user(target.userid)
    assert remaining is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_webauthn_credential(client: AsyncClient) -> None:
    """DELETE on a WebAuthn credential returns HTML and leaves the user with none."""
    from base64 import urlsafe_b64encode

    await _login(client)
    target = await _create_target_user(client, username="bob")

    # Seed one WebAuthn credential for the target user.
    credentials = client._transport.app.state.credential_repo  # type: ignore[union-attr]
    credential_id = b"\x01\x02\x03\x04\x05\x06\x07\x08"
    seeded = WebAuthnCredential(
        user_id=target.userid,
        credential_id=credential_id,
        public_key=b"\x00" * 32,
        sign_count=0,
        device_name="test-key",
    )
    await credentials.create_webauthn(seeded)

    # The URL carries the credential id as base64url with the padding stripped.
    encoded_id = urlsafe_b64encode(credential_id).decode().rstrip("=")

    csrf_token = await get_csrf_token(client)
    csrf_headers = {"X-CSRF-Token": csrf_token}
    result = await client.request(
        "DELETE",
        f"/admin/users/{target.userid}/credentials/webauthn/{encoded_id}",
        headers=csrf_headers,
    )
    assert result.status_code == 200
    assert "text/html" in result.headers["content-type"]

    # No WebAuthn credentials should remain for the target user.
    remaining = await credentials.get_webauthn_by_user(target.userid)
    assert len(remaining) == 0
|
|
|
|
|
|
# --- Create invite ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_invite(client: AsyncClient) -> None:
    """Creating an invite echoes the username and includes a /register/ link."""
    await _login(client)
    csrf_token = await get_csrf_token(client)
    invite_form = {"username": "newuser"}
    result = await client.post(
        "/admin/invite",
        data=invite_form,
        headers={"X-CSRF-Token": csrf_token},
    )
    assert result.status_code == 200
    body = result.text
    assert "Invite created" in body
    assert "newuser" in body
    assert "/register/" in body
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_invite_empty_username(client: AsyncClient) -> None:
    """A whitespace-only username is answered with "Username is required"."""
    await _login(client)
    csrf_token = await get_csrf_token(client)
    blank_form = {"username": " "}
    result = await client.post(
        "/admin/invite",
        data=blank_form,
        headers={"X-CSRF-Token": csrf_token},
    )
    assert result.status_code == 200
    assert "Username is required" in result.text
|
|
|
|
|
|
# --- Re-invite ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_reinvite_user(client: AsyncClient) -> None:
    """Re-inviting an existing user returns a generated /register/ link."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    csrf_token = await get_csrf_token(client)
    csrf_headers = {"X-CSRF-Token": csrf_token}
    result = await client.post(f"/admin/users/{target.userid}/invite", headers=csrf_headers)
    assert result.status_code == 200
    body = result.text
    assert "Invite link generated" in body
    assert "/register/" in body
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_reinvite_user_404_for_nonexistent(client: AsyncClient) -> None:
    """Re-inviting an unknown user id yields a 404 "not found"."""
    await _login(client)
    csrf_token = await get_csrf_token(client)
    csrf_headers = {"X-CSRF-Token": csrf_token}
    result = await client.post("/admin/users/nonexistent-id/invite", headers=csrf_headers)
    assert result.status_code == 404
    lowered = result.text.lower()
    assert "not found" in lowered
|