"""Tests for the admin user-management routes (``/admin/users``, ``/admin/invite``)."""

from base64 import urlsafe_b64encode
from datetime import UTC, datetime
from typing import Any

import pytest
from argon2 import PasswordHasher
from httpx import AsyncClient

from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User, WebAuthnCredential


def _app(client: AsyncClient) -> Any:
    """Return the ASGI application object behind the test client's transport.

    Centralises the private-attribute access (and its type-ignore) that every
    test needs in order to reach the repositories stored on ``app.state``.
    """
    return client._transport.app  # type: ignore[union-attr]


def _password_service() -> PasswordService:
    """Build the PasswordService used to hash credentials in these tests.

    NOTE(review): the small argon2 cost parameters are presumably chosen to keep
    the tests fast -- confirm before reusing them anywhere else.
    """
    return PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))


async def _login(
    client: AsyncClient,
    username: str = "admin",
    password: str = "adminpass",
    *,
    userid: str = "admin-user-01",
    groups: list[str] | None = None,
) -> None:
    """Helper: create user + password credential and log in via POST /login/password.

    The user is only created when no user with ``username`` exists yet; in that
    case ``groups`` defaults to ``["admin"]`` when not given. When the user
    already exists, ``userid`` and ``groups`` are ignored. Likewise the password
    credential is only created when the user has none.

    The response of the login request is not inspected.
    """
    app = _app(client)
    user_repo = app.state.user_repo
    cred_repo = app.state.credential_repo

    user = await user_repo.get_by_username(username)
    if user is None:
        now = datetime.now(UTC)
        user = User(
            userid=userid,
            username=username,
            groups=groups if groups is not None else ["admin"],
            created_at=now,
            updated_at=now,
        )
        await user_repo.create(user)

    existing = await cred_repo.get_password_by_user(user.userid)
    if existing is None:
        svc = _password_service()
        await cred_repo.create_password(
            PasswordCredential(user_id=user.userid, password_hash=svc.hash(password))
        )

    await client.post(
        "/login/password",
        data={"username": username, "password": password},
        headers={"HX-Request": "true"},
    )


async def _create_target_user(
    client: AsyncClient,
    *,
    userid: str = "target-user-01",
    username: str = "bob",
    email: str | None = None,
    groups: list[str] | None = None,
    active: bool = True,
) -> User:
    """Helper: create a target user in the database (does NOT log in).

    Returns whatever the user repository's ``create`` returns for the new user.
    ``groups`` defaults to an empty list.
    """
    now = datetime.now(UTC)
    user = User(
        userid=userid,
        username=username,
        email=email,
        groups=groups or [],
        active=active,
        created_at=now,
        updated_at=now,
    )
    return await _app(client).state.user_repo.create(user)


# --- User list ---


@pytest.mark.asyncio
async def test_users_list_returns_html(client: AsyncClient) -> None:
    """GET /admin/users responds 200 with an HTML content type."""
    await _login(client)
    response = await client.get("/admin/users")
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]


@pytest.mark.asyncio
async def test_users_list_shows_users(client: AsyncClient) -> None:
    """The user list contains both the logged-in admin and a created user."""
    await _login(client)
    await _create_target_user(client, username="bob")
    response = await client.get("/admin/users")
    assert response.status_code == 200
    assert "bob" in response.text
    assert "admin" in response.text


@pytest.mark.asyncio
async def test_users_list_search(client: AsyncClient) -> None:
    """The ``q`` query parameter filters the list down to matching users."""
    await _login(client)
    await _create_target_user(client, userid="target-user-01", username="bob")
    await _create_target_user(client, userid="target-user-02", username="carol")
    response = await client.get("/admin/users?q=bob")
    assert response.status_code == 200
    assert "bob" in response.text
    assert "carol" not in response.text


@pytest.mark.asyncio
async def test_users_list_htmx_search_returns_partial(client: AsyncClient) -> None:
    """An HTMX search request gets the matching rows without the page layout."""
    await _login(client)
    await _create_target_user(client, username="bob")
    response = await client.get(
        "/admin/users?q=bob",
        headers={"HX-Request": "true", "HX-Trigger-Name": "q"},
    )
    assert response.status_code == 200
    # Partial should contain user row data but not the full page layout
    assert "bob" in response.text
    # NOTE(review): the two layout markers below were lost in the mangled source
    # (the text looks stripped as if it were an HTML tag). "<!DOCTYPE html>" and
    # "<html" are reconstructions -- confirm against the base template.
    assert "<!DOCTYPE html>" not in response.text
    assert "<html" not in response.text


# --- User detail ---


# NOTE(review): the section header, decorator and function name of this test were
# lost together with the markers above; the name is reconstructed by analogy with
# test_users_list_returns_html -- confirm against version control.
@pytest.mark.asyncio
async def test_user_detail_returns_html(client: AsyncClient) -> None:
    """GET /admin/users/{userid} responds 200 with HTML mentioning the user."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    response = await client.get(f"/admin/users/{target.userid}")
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]
    assert "bob" in response.text


@pytest.mark.asyncio
async def test_user_detail_404_for_nonexistent(client: AsyncClient) -> None:
    """The detail page of an unknown user id responds 404 with a 'not found' message."""
    await _login(client)
    response = await client.get("/admin/users/nonexistent-id")
    assert response.status_code == 404
    assert "not found" in response.text.lower()


# --- Profile update ---


@pytest.mark.asyncio
async def test_update_profile(client: AsyncClient) -> None:
    """Posting a full profile form reports success and persists the new values."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    response = await client.post(
        f"/admin/users/{target.userid}/profile",
        data={
            "given_name": "Bob",
            "family_name": "Smith",
            "preferred_username": "bobby",
            "email": "bob@example.com",
            "phone_number": "+1234567890",
            "picture": "https://example.com/bob.jpg",
            "locale": "en-US",
        },
    )
    assert response.status_code == 200
    assert "Profile updated" in response.text

    # Verify user was actually updated
    user = await _app(client).state.user_repo.get_by_userid(target.userid)
    assert user is not None
    assert user.given_name == "Bob"
    assert user.family_name == "Smith"
    assert user.email == "bob@example.com"
    assert user.picture == "https://example.com/bob.jpg"


@pytest.mark.asyncio
async def test_update_profile_invalid_email(client: AsyncClient) -> None:
    """A malformed email yields a 200 response carrying a validation message."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    response = await client.post(
        f"/admin/users/{target.userid}/profile",
        data={"email": "not-an-email"},
    )
    assert response.status_code == 200
    assert "Invalid email" in response.text


@pytest.mark.asyncio
async def test_update_profile_invalid_picture_url(client: AsyncClient) -> None:
    """A picture value that is not a URL yields a 200 response with a validation message."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    response = await client.post(
        f"/admin/users/{target.userid}/profile",
        data={"picture": "not-a-url"},
    )
    assert response.status_code == 200
    assert "Picture URL must be a valid HTTP or HTTPS URL" in response.text


@pytest.mark.asyncio
async def test_update_profile_404_for_nonexistent(client: AsyncClient) -> None:
    """Updating the profile of an unknown user id responds 404."""
    await _login(client)
    response = await client.post(
        "/admin/users/nonexistent-id/profile",
        data={"given_name": "Ghost"},
    )
    assert response.status_code == 404
    assert "not found" in response.text.lower()


# --- Groups update ---


@pytest.mark.asyncio
async def test_update_groups(client: AsyncClient) -> None:
    """A comma-separated groups string is stored as the individual group names."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    response = await client.post(
        f"/admin/users/{target.userid}/groups",
        data={"groups": "admin, users, editors"},
    )
    assert response.status_code == 200
    assert "Groups updated" in response.text

    # Verify groups were actually updated
    user = await _app(client).state.user_repo.get_by_userid(target.userid)
    assert user is not None
    assert set(user.groups) == {"admin", "users", "editors"}


# --- Activate / deactivate ---


@pytest.mark.asyncio
async def test_activate_user(client: AsyncClient) -> None:
    """Activating an inactive user reports success and sets ``active`` to True."""
    await _login(client)
    target = await _create_target_user(client, username="bob", active=False)
    response = await client.post(f"/admin/users/{target.userid}/activate")
    assert response.status_code == 200
    assert "User activated" in response.text

    user = await _app(client).state.user_repo.get_by_userid(target.userid)
    assert user is not None
    assert user.active is True


@pytest.mark.asyncio
async def test_deactivate_user(client: AsyncClient) -> None:
    """Deactivating an active user reports success and sets ``active`` to False."""
    await _login(client)
    target = await _create_target_user(client, username="bob", active=True)
    response = await client.post(f"/admin/users/{target.userid}/deactivate")
    assert response.status_code == 200
    assert "User deactivated" in response.text

    user = await _app(client).state.user_repo.get_by_userid(target.userid)
    assert user is not None
    assert user.active is False


# --- Delete user ---


@pytest.mark.asyncio
async def test_delete_user(client: AsyncClient) -> None:
    """Deleting a user reports success, redirects via HX-Redirect and removes the user."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    response = await client.request("DELETE", f"/admin/users/{target.userid}")
    assert response.status_code == 200
    assert "User deleted" in response.text
    assert response.headers.get("hx-redirect") == "/admin/users"

    # Verify user was actually deleted
    user = await _app(client).state.user_repo.get_by_userid(target.userid)
    assert user is None


@pytest.mark.asyncio
async def test_delete_user_self_prevention(client: AsyncClient) -> None:
    """The logged-in admin cannot delete their own account."""
    await _login(client, userid="admin-user-01")
    # Try to delete ourselves
    response = await client.request("DELETE", "/admin/users/admin-user-01")
    assert response.status_code == 200
    assert "Cannot delete your own account" in response.text

    # Verify admin user was NOT deleted
    user = await _app(client).state.user_repo.get_by_userid("admin-user-01")
    assert user is not None


# --- Delete credentials ---


@pytest.mark.asyncio
async def test_delete_password_credential(client: AsyncClient) -> None:
    """Deleting a user's password credential returns HTML and removes the credential."""
    await _login(client)
    target = await _create_target_user(client, username="bob")

    # Create a password credential for the target user
    cred_repo = _app(client).state.credential_repo
    svc = _password_service()
    await cred_repo.create_password(
        PasswordCredential(user_id=target.userid, password_hash=svc.hash("bobpass"))
    )

    response = await client.request("DELETE", f"/admin/users/{target.userid}/credentials/password")
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]

    # Verify password was deleted
    pw = await cred_repo.get_password_by_user(target.userid)
    assert pw is None


@pytest.mark.asyncio
async def test_delete_webauthn_credential(client: AsyncClient) -> None:
    """Deleting a WebAuthn credential by its base64url id returns HTML and removes it."""
    await _login(client)
    target = await _create_target_user(client, username="bob")

    # Create a webauthn credential for the target user
    cred_repo = _app(client).state.credential_repo
    credential_id = b"\x01\x02\x03\x04\x05\x06\x07\x08"
    await cred_repo.create_webauthn(
        WebAuthnCredential(
            user_id=target.userid,
            credential_id=credential_id,
            public_key=b"\x00" * 32,
            sign_count=0,
            device_name="test-key",
        )
    )

    # URL uses base64url without padding
    credential_id_b64 = urlsafe_b64encode(credential_id).decode().rstrip("=")
    response = await client.request(
        "DELETE",
        f"/admin/users/{target.userid}/credentials/webauthn/{credential_id_b64}",
    )
    assert response.status_code == 200
    assert "text/html" in response.headers["content-type"]

    # Verify credential was deleted
    creds = await cred_repo.get_webauthn_by_user(target.userid)
    assert not creds


# --- Create invite ---


@pytest.mark.asyncio
async def test_create_invite(client: AsyncClient) -> None:
    """Creating an invite reports success and shows the username and a register link."""
    await _login(client)
    response = await client.post(
        "/admin/invite",
        data={"username": "newuser"},
    )
    assert response.status_code == 200
    assert "Invite created" in response.text
    assert "newuser" in response.text
    assert "/register/" in response.text


@pytest.mark.asyncio
async def test_create_invite_empty_username(client: AsyncClient) -> None:
    """A whitespace-only username is rejected with a 'required' message."""
    await _login(client)
    response = await client.post(
        "/admin/invite",
        data={"username": " "},
    )
    assert response.status_code == 200
    assert "Username is required" in response.text


# --- Re-invite ---


@pytest.mark.asyncio
async def test_reinvite_user(client: AsyncClient) -> None:
    """Re-inviting an existing user reports success and shows a register link."""
    await _login(client)
    target = await _create_target_user(client, username="bob")
    response = await client.post(f"/admin/users/{target.userid}/invite")
    assert response.status_code == 200
    assert "Invite link generated" in response.text
    assert "/register/" in response.text


@pytest.mark.asyncio
async def test_reinvite_user_404_for_nonexistent(client: AsyncClient) -> None:
    """Re-inviting an unknown user id responds 404 with a 'not found' message."""
    await _login(client)
    response = await client.post("/admin/users/nonexistent-id/invite")
    assert response.status_code == 404
    assert "not found" in response.text.lower()