porchlight/tests/test_admin/test_admin_routes.py
2026-02-19 14:17:41 +01:00

391 lines
13 KiB
Python

from datetime import UTC, datetime
import pytest
from argon2 import PasswordHasher
from httpx import AsyncClient
from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User, WebAuthnCredential
async def _login(
    client: AsyncClient,
    username: str = "admin",
    password: str = "adminpass",
    *,
    userid: str = "admin-user-01",
    groups: list[str] | None = None,
) -> None:
    """Make sure a user with a password exists, then sign in through the app.

    The user is looked up by ``username`` and only created when missing; the
    password credential is likewise only created when the user has none.
    When ``groups`` is ``None`` a newly created user is put in ``["admin"]``.

    The login itself is a form POST to ``/login/password`` sent with an
    ``HX-Request`` header. The response is discarded, so a failed login is
    not detected here.
    """
    state = client._transport.app.state  # type: ignore[union-attr]
    users = state.user_repo
    credentials = state.credential_repo

    account = await users.get_by_username(username)
    if account is None:
        account = User(
            userid=userid,
            username=username,
            groups=["admin"] if groups is None else groups,
            created_at=datetime.now(UTC),
            updated_at=datetime.now(UTC),
        )
        await users.create(account)

    # Explicit low hashing costs; presumably chosen to keep the tests fast.
    hasher = PasswordHasher(time_cost=1, memory_cost=8192)
    password_service = PasswordService(hasher=hasher)

    stored_password = await credentials.get_password_by_user(account.userid)
    if stored_password is None:
        new_credential = PasswordCredential(
            user_id=account.userid,
            password_hash=password_service.hash(password),
        )
        await credentials.create_password(new_credential)

    form = {"username": username, "password": password}
    await client.post(
        "/login/password",
        data=form,
        headers={"HX-Request": "true"},
    )
async def _create_target_user(
    client: AsyncClient,
    *,
    userid: str = "target-user-01",
    username: str = "bob",
    email: str | None = None,
    groups: list[str] | None = None,
    active: bool = True,
) -> User:
    """Store a user directly in the app's user repository and return it.

    No credential is created and no login request is made. A missing or
    empty ``groups`` argument results in an empty group list. The return
    value is whatever the repository's ``create`` call returns.
    """
    repository = client._transport.app.state.user_repo  # type: ignore[union-attr]
    target = User(
        userid=userid,
        username=username,
        email=email,
        groups=groups if groups else [],
        active=active,
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    created = await repository.create(target)
    return created
# --- User list ---
@pytest.mark.asyncio
async def test_users_list_returns_html(client: AsyncClient) -> None:
    """GET /admin/users is expected to answer 200 with an HTML content type."""
    await _login(client)

    page = await client.get("/admin/users")

    assert page.status_code == 200
    content_type = page.headers["content-type"]
    assert "text/html" in content_type
@pytest.mark.asyncio
async def test_users_list_shows_users(client: AsyncClient) -> None:
    """The user list is expected to mention both the target and the admin."""
    await _login(client)
    await _create_target_user(client, username="bob")

    page = await client.get("/admin/users")

    assert page.status_code == 200
    html = page.text
    # NOTE(review): "admin" may also occur in page chrome or /admin/ links,
    # so the second check might pass without an admin row — confirm.
    for expected_name in ("bob", "admin"):
        assert expected_name in html
@pytest.mark.asyncio
async def test_users_list_search(client: AsyncClient) -> None:
    """Searching with ``q=bob`` is expected to list bob and leave out carol."""
    await _login(client)
    for target_id, target_name in (
        ("target-user-01", "bob"),
        ("target-user-02", "carol"),
    ):
        await _create_target_user(client, userid=target_id, username=target_name)

    page = await client.get("/admin/users?q=bob")

    assert page.status_code == 200
    html = page.text
    # NOTE(review): if the page echoes the query back (e.g. in the search
    # box), the "bob" check alone does not prove a matching row — confirm.
    assert "bob" in html
    assert "carol" not in html
@pytest.mark.asyncio
async def test_users_list_htmx_search_returns_partial(client: AsyncClient) -> None:
    """An HTMX search request is expected to get a fragment, not a full page."""
    await _login(client)
    await _create_target_user(client, username="bob")

    htmx_headers = {"HX-Request": "true", "HX-Trigger-Name": "q"}
    fragment = await client.get("/admin/users?q=bob", headers=htmx_headers)

    assert fragment.status_code == 200
    markup = fragment.text
    # The matching user must be present ...
    assert "bob" in markup
    # ... while table header and document wrapper must be absent.
    for layout_marker in ("<thead>", "<html"):
        assert layout_marker not in markup
# --- User detail ---
@pytest.mark.asyncio
async def test_user_detail_returns_html(client: AsyncClient) -> None:
    """The detail page of an existing user is expected to be HTML naming them."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")

    detail_url = f"/admin/users/{bob.userid}"
    page = await client.get(detail_url)

    assert page.status_code == 200
    assert "text/html" in page.headers["content-type"]
    assert "bob" in page.text
@pytest.mark.asyncio
async def test_user_detail_404_for_nonexistent(client: AsyncClient) -> None:
    """Requesting an unknown user id is expected to give a 404 "not found"."""
    await _login(client)

    page = await client.get("/admin/users/nonexistent-id")

    assert page.status_code == 404
    lowered = page.text.lower()
    assert "not found" in lowered
# --- Profile update ---
@pytest.mark.asyncio
async def test_update_profile(client: AsyncClient) -> None:
    """Posting a full profile form is expected to persist the new values.

    Only given name, family name, email and picture are re-read from the
    repository; the remaining submitted fields are not verified here.
    """
    await _login(client)
    bob = await _create_target_user(client, username="bob")

    profile_form = {
        "given_name": "Bob",
        "family_name": "Smith",
        "preferred_username": "bobby",
        "email": "bob@example.com",
        "phone_number": "+1234567890",
        "picture": "https://example.com/bob.jpg",
        "locale": "en-US",
    }
    result = await client.post(f"/admin/users/{bob.userid}/profile", data=profile_form)

    assert result.status_code == 200
    assert "Profile updated" in result.text

    # Re-read the user from the repository to confirm the write happened.
    repository = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await repository.get_by_userid(bob.userid)
    assert stored is not None
    persisted = {
        "given_name": "Bob",
        "family_name": "Smith",
        "email": "bob@example.com",
        "picture": "https://example.com/bob.jpg",
    }
    for attribute, value in persisted.items():
        assert getattr(stored, attribute) == value
@pytest.mark.asyncio
async def test_update_profile_invalid_email(client: AsyncClient) -> None:
    """A malformed email is expected to yield a 200 with "Invalid email"."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")

    profile_url = f"/admin/users/{bob.userid}/profile"
    result = await client.post(profile_url, data={"email": "not-an-email"})

    assert result.status_code == 200
    assert "Invalid email" in result.text
@pytest.mark.asyncio
async def test_update_profile_invalid_picture_url(client: AsyncClient) -> None:
    """A non-URL picture value is expected to yield a 200 with an error text."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")

    profile_url = f"/admin/users/{bob.userid}/profile"
    result = await client.post(profile_url, data={"picture": "not-a-url"})

    assert result.status_code == 200
    assert "Picture URL must be a valid HTTP or HTTPS URL" in result.text
@pytest.mark.asyncio
async def test_update_profile_404_for_nonexistent(client: AsyncClient) -> None:
    """Updating the profile of an unknown user id is expected to give a 404."""
    await _login(client)

    ghost_form = {"given_name": "Ghost"}
    result = await client.post("/admin/users/nonexistent-id/profile", data=ghost_form)

    assert result.status_code == 404
    lowered = result.text.lower()
    assert "not found" in lowered
# --- Groups update ---
@pytest.mark.asyncio
async def test_update_groups(client: AsyncClient) -> None:
    """A comma-separated groups string is expected to be stored as three groups."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")

    groups_form = {"groups": "admin, users, editors"}
    result = await client.post(f"/admin/users/{bob.userid}/groups", data=groups_form)

    assert result.status_code == 200
    assert "Groups updated" in result.text

    # Compare as a set: the stored order is not part of what is checked.
    repository = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await repository.get_by_userid(bob.userid)
    assert stored is not None
    stored_groups = set(stored.groups)
    assert stored_groups == {"admin", "users", "editors"}
# --- Activate / deactivate ---
@pytest.mark.asyncio
async def test_activate_user(client: AsyncClient) -> None:
    """Activating an inactive user is expected to flip ``active`` to True."""
    await _login(client)
    bob = await _create_target_user(client, username="bob", active=False)

    activate_url = f"/admin/users/{bob.userid}/activate"
    result = await client.post(activate_url)

    assert result.status_code == 200
    assert "User activated" in result.text

    repository = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await repository.get_by_userid(bob.userid)
    assert stored is not None
    assert stored.active is True
@pytest.mark.asyncio
async def test_deactivate_user(client: AsyncClient) -> None:
    """Deactivating an active user is expected to flip ``active`` to False."""
    await _login(client)
    bob = await _create_target_user(client, username="bob", active=True)

    deactivate_url = f"/admin/users/{bob.userid}/deactivate"
    result = await client.post(deactivate_url)

    assert result.status_code == 200
    assert "User deactivated" in result.text

    repository = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await repository.get_by_userid(bob.userid)
    assert stored is not None
    assert stored.active is False
# --- Delete user ---
@pytest.mark.asyncio
async def test_delete_user(client: AsyncClient) -> None:
    """Deleting a user is expected to remove it and redirect to the list."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")

    user_url = f"/admin/users/{bob.userid}"
    result = await client.request("DELETE", user_url)

    assert result.status_code == 200
    assert "User deleted" in result.text
    redirect_target = result.headers.get("hx-redirect")
    assert redirect_target == "/admin/users"

    # The repository must no longer know the deleted user.
    repository = client._transport.app.state.user_repo  # type: ignore[union-attr]
    remaining = await repository.get_by_userid(bob.userid)
    assert remaining is None
@pytest.mark.asyncio
async def test_delete_user_self_prevention(client: AsyncClient) -> None:
    """Deleting the logged-in account is expected to be refused."""
    await _login(client, userid="admin-user-01")

    # The DELETE targets the same userid that was just used to log in.
    result = await client.request("DELETE", "/admin/users/admin-user-01")

    assert result.status_code == 200
    assert "Cannot delete your own account" in result.text

    # The admin record must still be retrievable afterwards.
    repository = client._transport.app.state.user_repo  # type: ignore[union-attr]
    admin = await repository.get_by_userid("admin-user-01")
    assert admin is not None
# --- Delete credentials ---
@pytest.mark.asyncio
async def test_delete_password_credential(client: AsyncClient) -> None:
    """Deleting a user's password credential is expected to leave none stored."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")

    # Give the target user a password so there is something to delete.
    credentials = client._transport.app.state.credential_repo  # type: ignore[union-attr]
    hasher = PasswordHasher(time_cost=1, memory_cost=8192)
    password_service = PasswordService(hasher=hasher)
    bob_password = PasswordCredential(
        user_id=bob.userid,
        password_hash=password_service.hash("bobpass"),
    )
    await credentials.create_password(bob_password)

    password_url = f"/admin/users/{bob.userid}/credentials/password"
    result = await client.request("DELETE", password_url)

    assert result.status_code == 200
    assert "text/html" in result.headers["content-type"]

    # The repository lookup must now come back empty.
    leftover = await credentials.get_password_by_user(bob.userid)
    assert leftover is None
@pytest.mark.asyncio
async def test_delete_webauthn_credential(client: AsyncClient) -> None:
    """Deleting a WebAuthn credential is expected to leave the user with none."""
    from base64 import urlsafe_b64encode

    await _login(client)
    bob = await _create_target_user(client, username="bob")

    # Register one WebAuthn credential for the target user.
    credentials = client._transport.app.state.credential_repo  # type: ignore[union-attr]
    credential_id = b"\x01\x02\x03\x04\x05\x06\x07\x08"
    security_key = WebAuthnCredential(
        user_id=bob.userid,
        credential_id=credential_id,
        public_key=b"\x00" * 32,
        sign_count=0,
        device_name="test-key",
    )
    await credentials.create_webauthn(security_key)

    # The URL carries the credential id as base64url with padding stripped.
    encoded = urlsafe_b64encode(credential_id).decode()
    credential_id_b64 = encoded.rstrip("=")
    credential_url = f"/admin/users/{bob.userid}/credentials/webauthn/{credential_id_b64}"
    result = await client.request("DELETE", credential_url)

    assert result.status_code == 200
    assert "text/html" in result.headers["content-type"]

    # No WebAuthn credentials may remain for the user.
    remaining = await credentials.get_webauthn_by_user(bob.userid)
    assert len(remaining) == 0
# --- Create invite ---
@pytest.mark.asyncio
async def test_create_invite(client: AsyncClient) -> None:
    """Creating an invite is expected to show the username and a register link."""
    await _login(client)

    invite_form = {"username": "newuser"}
    result = await client.post("/admin/invite", data=invite_form)

    assert result.status_code == 200
    html = result.text
    for expected_snippet in ("Invite created", "newuser", "/register/"):
        assert expected_snippet in html
@pytest.mark.asyncio
async def test_create_invite_empty_username(client: AsyncClient) -> None:
    """A whitespace-only username is expected to be rejected as missing."""
    await _login(client)

    blank_form = {"username": " "}
    result = await client.post("/admin/invite", data=blank_form)

    assert result.status_code == 200
    assert "Username is required" in result.text
# --- Re-invite ---
@pytest.mark.asyncio
async def test_reinvite_user(client: AsyncClient) -> None:
    """Re-inviting an existing user is expected to return a register link."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")

    invite_url = f"/admin/users/{bob.userid}/invite"
    result = await client.post(invite_url)

    assert result.status_code == 200
    html = result.text
    assert "Invite link generated" in html
    assert "/register/" in html
@pytest.mark.asyncio
async def test_reinvite_user_404_for_nonexistent(client: AsyncClient) -> None:
    """Re-inviting an unknown user id is expected to give a 404 "not found"."""
    await _login(client)

    result = await client.post("/admin/users/nonexistent-id/invite")

    assert result.status_code == 404
    lowered = result.text.lower()
    assert "not found" in lowered