- Use Annotated[str, Form()] for FastAPI dependencies (FAST002) - Add missing type annotations across src/ and tests/ (ANN001/003/201/202) - Reduce function arguments via request.form() reads (PLR0913) - Combine return paths to reduce return statements (PLR0911) - Use anyio.Path for async-safe filesystem operations (ASYNC240) - Extract constants, helpers, and dict comprehensions for clarity - Move inline imports to top-level (PLC0415) - Use raw strings for regex match patterns (RUF043) - Fix redundant get_session_user call in delete_user (not-iterable) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
436 lines
14 KiB
Python
436 lines
14 KiB
Python
from base64 import urlsafe_b64encode
|
|
from datetime import UTC, datetime
|
|
|
|
import pytest
|
|
from argon2 import PasswordHasher
|
|
from httpx import AsyncClient
|
|
|
|
from porchlight.authn.password import PasswordService
|
|
from porchlight.models import PasswordCredential, User, WebAuthnCredential
|
|
from tests.conftest import get_csrf_token
|
|
|
|
|
|
async def _login(
    client: AsyncClient,
    username: str = "admin",
    password: str = "adminpass",
    *,
    userid: str = "admin-user-01",
    groups: list[str] | None = None,
) -> None:
    """Helper: make sure a user with a password exists, then sign in.

    The user is looked up by ``username`` and created (in the ``admin``
    group unless ``groups`` is given) when missing. A password credential
    is stored if the user has none. Finally the credentials are POSTed to
    ``/login/password`` together with a freshly fetched CSRF token.
    """
    state = client._transport.app.state  # type: ignore[union-attr]
    users = state.user_repo
    credentials = state.credential_repo

    account = await users.get_by_username(username)
    if account is None:
        account = User(
            userid=userid,
            username=username,
            groups=["admin"] if groups is None else groups,
            created_at=datetime.now(UTC),
            updated_at=datetime.now(UTC),
        )
        await users.create(account)

    # Explicit Argon2 cost parameters — presumably lowered to keep tests fast.
    password_service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    if await credentials.get_password_by_user(account.userid) is None:
        credential = PasswordCredential(
            user_id=account.userid,
            password_hash=password_service.hash(password),
        )
        await credentials.create_password(credential)

    csrf = await get_csrf_token(client)
    login_headers = {"HX-Request": "true", "X-CSRF-Token": csrf}
    login_form = {"username": username, "password": password}
    await client.post("/login/password", data=login_form, headers=login_headers)
|
|
|
|
|
|
async def _create_target_user(  # noqa: PLR0913
    client: AsyncClient,
    *,
    userid: str = "target-user-01",
    username: str = "bob",
    email: str | None = None,
    groups: list[str] | None = None,
    active: bool = True,
) -> User:
    """Helper: store a user for the admin routes to act on (no login happens).

    Returns the result of the user repository's ``create`` call. A missing
    or empty ``groups`` argument results in an empty group list.
    """
    repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    record = User(
        userid=userid,
        username=username,
        email=email,
        groups=groups if groups else [],
        active=active,
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    created = await repo.create(record)
    return created
|
|
|
|
|
|
# --- User list ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_users_list_returns_html(client: AsyncClient) -> None:
    """After login, GET /admin/users answers 200 with an HTML content type."""
    await _login(client)
    page = await client.get("/admin/users")
    content_type = page.headers["content-type"]
    assert page.status_code == 200
    assert "text/html" in content_type
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_users_list_shows_users(client: AsyncClient) -> None:
    """The user list page mentions both the logged-in admin and a created user."""
    await _login(client)
    await _create_target_user(client, username="bob")
    page = await client.get("/admin/users")
    assert page.status_code == 200
    body = page.text
    assert "bob" in body
    assert "admin" in body
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_users_list_search(client: AsyncClient) -> None:
    """Searching with ``q=bob`` lists bob and leaves carol out of the page."""
    await _login(client)
    # Two users, so the query has something to filter out.
    await _create_target_user(client, userid="target-user-01", username="bob")
    await _create_target_user(client, userid="target-user-02", username="carol")
    page = await client.get("/admin/users?q=bob")
    assert page.status_code == 200
    body = page.text
    assert "bob" in body
    assert "carol" not in body
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_users_list_htmx_search_returns_partial(client: AsyncClient) -> None:
    """An HTMX search triggered by the ``q`` field gets rows only, not a full page."""
    await _login(client)
    await _create_target_user(client, username="bob")
    htmx_headers = {"HX-Request": "true", "HX-Trigger-Name": "q"}
    fragment = await client.get("/admin/users?q=bob", headers=htmx_headers)
    assert fragment.status_code == 200
    markup = fragment.text
    # The matching row must be there, while table header and page shell must not.
    assert "bob" in markup
    assert "<thead>" not in markup
    assert "<html" not in markup
|
|
|
|
|
|
# --- User detail ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_user_detail_returns_html(client: AsyncClient) -> None:
    """The detail page of an existing user is HTML and contains the username."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")
    detail_url = f"/admin/users/{bob.userid}"
    page = await client.get(detail_url)
    assert page.status_code == 200
    assert "text/html" in page.headers["content-type"]
    assert "bob" in page.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_user_detail_404_for_nonexistent(client: AsyncClient) -> None:
    """Requesting the detail page of an unknown user id yields a 404 'not found'."""
    await _login(client)
    page = await client.get("/admin/users/nonexistent-id")
    assert page.status_code == 404
    lowered = page.text.lower()
    assert "not found" in lowered
|
|
|
|
|
|
# --- Profile update ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_profile(client: AsyncClient) -> None:
    """Posting a full profile form reports success and the changes are stored."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")
    csrf = await get_csrf_token(client)
    profile_form = {
        "given_name": "Bob",
        "family_name": "Smith",
        "preferred_username": "bobby",
        "email": "bob@example.com",
        "phone_number": "+46701234567",
        "picture": "https://example.com/bob.jpg",
        "locale": "en-US",
    }
    result = await client.post(
        f"/admin/users/{bob.userid}/profile",
        data=profile_form,
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 200
    assert "Profile updated" in result.text

    # Read the user back from the repository to confirm the update was stored.
    repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await repo.get_by_userid(bob.userid)
    assert stored is not None
    assert stored.given_name == "Bob"
    assert stored.family_name == "Smith"
    assert stored.email == "bob@example.com"
    assert stored.picture == "https://example.com/bob.jpg"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_profile_invalid_email(client: AsyncClient) -> None:
    """A malformed email still answers 200 but the body shows an alert about email."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")
    csrf = await get_csrf_token(client)
    profile_url = f"/admin/users/{bob.userid}/profile"
    result = await client.post(
        profile_url,
        data={"email": "not-an-email"},
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 200
    body = result.text
    assert "alert" in body
    assert "email" in body.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_profile_invalid_phone(client: AsyncClient) -> None:
    """A malformed phone number still answers 200 but the body shows an alert about phone."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")
    csrf = await get_csrf_token(client)
    profile_url = f"/admin/users/{bob.userid}/profile"
    result = await client.post(
        profile_url,
        data={"phone_number": "not-a-phone"},
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 200
    body = result.text
    assert "alert" in body
    assert "phone" in body.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_profile_invalid_picture_url(client: AsyncClient) -> None:
    """A picture value that is not a URL produces the picture-URL validation message."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")
    csrf = await get_csrf_token(client)
    profile_url = f"/admin/users/{bob.userid}/profile"
    result = await client.post(
        profile_url,
        data={"picture": "not-a-url"},
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 200
    assert "Picture URL must be a valid HTTP or HTTPS URL" in result.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_profile_404_for_nonexistent(client: AsyncClient) -> None:
    """Updating the profile of an unknown user id yields a 404 'not found'."""
    await _login(client)
    csrf = await get_csrf_token(client)
    ghost_form = {"given_name": "Ghost"}
    result = await client.post(
        "/admin/users/nonexistent-id/profile",
        data=ghost_form,
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 404
    lowered = result.text.lower()
    assert "not found" in lowered
|
|
|
|
|
|
# --- Groups update ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_groups(client: AsyncClient) -> None:
    """A comma-separated groups string is accepted and stored as individual groups."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")
    csrf = await get_csrf_token(client)
    groups_url = f"/admin/users/{bob.userid}/groups"
    result = await client.post(
        groups_url,
        data={"groups": "admin, users, editors"},
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 200
    assert "Groups updated" in result.text

    # Compare as a set: only membership is checked, not ordering.
    repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await repo.get_by_userid(bob.userid)
    assert stored is not None
    expected_groups = {"admin", "users", "editors"}
    assert set(stored.groups) == expected_groups
|
|
|
|
|
|
# --- Activate / deactivate ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_activate_user(client: AsyncClient) -> None:
    """Activating an inactive user reports success and sets ``active`` to True."""
    await _login(client)
    bob = await _create_target_user(client, username="bob", active=False)
    csrf = await get_csrf_token(client)
    activate_url = f"/admin/users/{bob.userid}/activate"
    result = await client.post(activate_url, headers={"X-CSRF-Token": csrf})
    assert result.status_code == 200
    assert "User activated" in result.text

    # Confirm the flag on the stored user, not just the response text.
    repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await repo.get_by_userid(bob.userid)
    assert stored is not None
    assert stored.active is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_deactivate_user(client: AsyncClient) -> None:
    """Deactivating an active user reports success and sets ``active`` to False."""
    await _login(client)
    bob = await _create_target_user(client, username="bob", active=True)
    csrf = await get_csrf_token(client)
    deactivate_url = f"/admin/users/{bob.userid}/deactivate"
    result = await client.post(deactivate_url, headers={"X-CSRF-Token": csrf})
    assert result.status_code == 200
    assert "User deactivated" in result.text

    # Confirm the flag on the stored user, not just the response text.
    repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    stored = await repo.get_by_userid(bob.userid)
    assert stored is not None
    assert stored.active is False
|
|
|
|
|
|
# --- Delete user ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_user(client: AsyncClient) -> None:
    """Deleting another user succeeds, redirects to the list, and removes the user."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")
    csrf = await get_csrf_token(client)
    result = await client.request(
        "DELETE",
        f"/admin/users/{bob.userid}",
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 200
    assert "User deleted" in result.text
    redirect_target = result.headers.get("hx-redirect")
    assert redirect_target == "/admin/users"

    # The user must no longer be retrievable from the repository.
    repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    remaining = await repo.get_by_userid(bob.userid)
    assert remaining is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_user_self_prevention(client: AsyncClient) -> None:
    """An admin who tries to delete their own account is refused and stays stored."""
    own_id = "admin-user-01"
    await _login(client, userid=own_id)
    csrf = await get_csrf_token(client)
    # Target the same userid that was just used to log in.
    result = await client.request(
        "DELETE",
        f"/admin/users/{own_id}",
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 200
    assert "Cannot delete your own account" in result.text

    # The admin record must still be present afterwards.
    repo = client._transport.app.state.user_repo  # type: ignore[union-attr]
    still_there = await repo.get_by_userid(own_id)
    assert still_there is not None
|
|
|
|
|
|
# --- Delete credentials ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_password_credential(client: AsyncClient) -> None:
    """Deleting a user's password credential returns HTML and removes the credential."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")

    # Give the target user a password so there is something to delete.
    credentials = client._transport.app.state.credential_repo  # type: ignore[union-attr]
    password_service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    bob_password = PasswordCredential(
        user_id=bob.userid,
        password_hash=password_service.hash("bobpass"),
    )
    await credentials.create_password(bob_password)

    csrf = await get_csrf_token(client)
    delete_url = f"/admin/users/{bob.userid}/credentials/password"
    result = await client.request("DELETE", delete_url, headers={"X-CSRF-Token": csrf})
    assert result.status_code == 200
    assert "text/html" in result.headers["content-type"]

    # The repository should no longer hold a password for the user.
    leftover = await credentials.get_password_by_user(bob.userid)
    assert leftover is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_webauthn_credential(client: AsyncClient) -> None:
    """Deleting a WebAuthn credential by its URL-encoded id returns HTML and removes it."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")

    # Register one WebAuthn credential for the target user.
    credentials = client._transport.app.state.credential_repo  # type: ignore[union-attr]
    raw_id = b"\x01\x02\x03\x04\x05\x06\x07\x08"
    security_key = WebAuthnCredential(
        user_id=bob.userid,
        credential_id=raw_id,
        public_key=b"\x00" * 32,
        sign_count=0,
        device_name="test-key",
    )
    await credentials.create_webauthn(security_key)

    # The route takes the credential id as base64url with the padding stripped.
    encoded_id = urlsafe_b64encode(raw_id).decode().rstrip("=")

    csrf = await get_csrf_token(client)
    delete_url = f"/admin/users/{bob.userid}/credentials/webauthn/{encoded_id}"
    result = await client.request("DELETE", delete_url, headers={"X-CSRF-Token": csrf})
    assert result.status_code == 200
    assert "text/html" in result.headers["content-type"]

    # No WebAuthn credentials should remain for the user.
    remaining = await credentials.get_webauthn_by_user(bob.userid)
    assert len(remaining) == 0
|
|
|
|
|
|
# --- Create invite ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_invite(client: AsyncClient) -> None:
    """Creating an invite reports success, echoes the username, and shows a register link."""
    await _login(client)
    csrf = await get_csrf_token(client)
    invite_form = {"username": "newuser"}
    result = await client.post(
        "/admin/invite",
        data=invite_form,
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 200
    body = result.text
    assert "Invite created" in body
    assert "newuser" in body
    assert "/register/" in body
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_invite_empty_username(client: AsyncClient) -> None:
    """A whitespace-only username is rejected with the 'Username is required' message."""
    await _login(client)
    csrf = await get_csrf_token(client)
    blank_form = {"username": " "}
    result = await client.post(
        "/admin/invite",
        data=blank_form,
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 200
    assert "Username is required" in result.text
|
|
|
|
|
|
# --- Re-invite ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_reinvite_user(client: AsyncClient) -> None:
    """Re-inviting an existing user reports a generated link pointing at /register/."""
    await _login(client)
    bob = await _create_target_user(client, username="bob")
    csrf = await get_csrf_token(client)
    invite_url = f"/admin/users/{bob.userid}/invite"
    result = await client.post(invite_url, headers={"X-CSRF-Token": csrf})
    assert result.status_code == 200
    body = result.text
    assert "Invite link generated" in body
    assert "/register/" in body
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_reinvite_user_404_for_nonexistent(client: AsyncClient) -> None:
    """Re-inviting an unknown user id yields a 404 'not found'."""
    await _login(client)
    csrf = await get_csrf_token(client)
    result = await client.post(
        "/admin/users/nonexistent-id/invite",
        headers={"X-CSRF-Token": csrf},
    )
    assert result.status_code == 404
    lowered = result.text.lower()
    assert "not found" in lowered
|