diff --git a/src/porchlight/admin/routes.py b/src/porchlight/admin/routes.py index b6692cd..931dc6e 100644 --- a/src/porchlight/admin/routes.py +++ b/src/porchlight/admin/routes.py @@ -6,7 +6,7 @@ from pydantic import ValidationError from porchlight.dependencies import get_session_user from porchlight.models import User -from porchlight.validation import ProfileUpdate +from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput, format_validation_errors router = APIRouter(prefix="/admin", tags=["admin"]) @@ -109,17 +109,21 @@ async def create_invite( if admin is None: return HTMLResponse("Forbidden", status_code=403) - username = username.strip() - if not username: - return HTMLResponse('
Username is required
') + try: + validated = UsernameInput(username=username) + except ValidationError as exc: + return HTMLResponse(format_validation_errors(exc)) magic_link_service = request.app.state.magic_link_service settings = request.app.state.settings - link = await magic_link_service.create(username=username, created_by=admin.username, note="admin invite") + link = await magic_link_service.create( + username=validated.username, created_by=admin.username, note="admin invite" + ) url = f"{settings.issuer}/register/{link.token}" return HTMLResponse( - f'
Invite created for {username}:
{url}
' + f'
Invite created for {validated.username}:
' + f'
{url}
' ) @@ -155,21 +159,7 @@ async def update_user_profile( locale=locale, ) except ValidationError as exc: - error = exc.errors()[0] - field = error["loc"][-1] if error["loc"] else "input" - msg = error["msg"] - labels = { - "given_name": "Given name", - "family_name": "Family name", - "preferred_username": "Display name", - "email": "Email", - "phone_number": "Phone number", - "picture": "Picture URL", - "locale": "Locale", - } - label = labels.get(str(field), str(field)) - display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}" - return HTMLResponse(f'
{display_msg}
') + return HTMLResponse(format_validation_errors(exc)) user_repo = request.app.state.user_repo user = await user_repo.get_by_userid(userid) @@ -205,13 +195,17 @@ async def update_user_groups( if admin is None: return HTMLResponse("Forbidden", status_code=403) + try: + validated = GroupListInput(groups=groups) + except ValidationError as exc: + return HTMLResponse(format_validation_errors(exc)) + user_repo = request.app.state.user_repo user = await user_repo.get_by_userid(userid) if user is None: return HTMLResponse("User not found", status_code=404) - group_list = [g.strip() for g in groups.split(",") if g.strip()] - updated = user.model_copy(update={"groups": group_list}) + updated = user.model_copy(update={"groups": validated.group_list}) await user_repo.update(updated) return HTMLResponse('
Groups updated
') diff --git a/src/porchlight/manage/routes.py b/src/porchlight/manage/routes.py index a3c4887..7f15ca0 100644 --- a/src/porchlight/manage/routes.py +++ b/src/porchlight/manage/routes.py @@ -7,7 +7,7 @@ from pydantic import ValidationError from porchlight.dependencies import get_session_user from porchlight.models import PasswordCredential, WebAuthnCredential -from porchlight.validation import ProfileUpdate +from porchlight.validation import PasswordChange, PasswordSet, ProfileUpdate, format_validation_errors router = APIRouter(prefix="/manage", tags=["manage"]) @@ -55,6 +55,7 @@ async def set_password( request: Request, password: str = Form(), confirm: str = Form(), + current_password: str = Form(""), ) -> Response: session_user = get_session_user(request) if session_user is None: @@ -64,15 +65,30 @@ async def set_password( cred_repo = request.app.state.credential_repo password_service = request.app.state.password_service - if password != confirm: - return HTMLResponse('
Passwords do not match
') - - if len(password) < 8: - return HTMLResponse('
Password must be at least 8 characters
') - - password_hash = password_service.hash(password) - existing = await cred_repo.get_password_by_user(userid) + has_password = existing is not None + + # Validate input + try: + if has_password: + validated = PasswordChange( + current_password=current_password, + password=password, + confirm=confirm, + ) + else: + validated = PasswordSet(password=password, confirm=confirm) + except ValidationError as exc: + return HTMLResponse(format_validation_errors(exc)) + + # Verify current password if changing + if has_password: + if not password_service.verify(existing.password_hash, validated.current_password): + return HTMLResponse('
Current password is incorrect
') + + # Store new password + password_hash = password_service.hash(validated.password) + if existing is not None: await cred_repo.delete_password(userid) @@ -225,23 +241,7 @@ async def update_profile( locale=locale, ) except ValidationError as exc: - error = exc.errors()[0] - field = error["loc"][-1] if error["loc"] else "input" - msg = error["msg"] - # Produce user-friendly field labels - labels = { - "given_name": "Given name", - "family_name": "Family name", - "preferred_username": "Display name", - "email": "Email", - "phone_number": "Phone number", - "picture": "Picture URL", - "locale": "Locale", - } - label = labels.get(str(field), str(field)) - # Use custom message for value errors (e.g. picture URL), generic pydantic message otherwise - display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}" - return HTMLResponse(f'
{display_msg}
') + return HTMLResponse(format_validation_errors(exc)) user_repo = request.app.state.user_repo user = await user_repo.get_by_userid(userid) diff --git a/tests/test_admin_groups_validation.py b/tests/test_admin_groups_validation.py new file mode 100644 index 0000000..9d4a602 --- /dev/null +++ b/tests/test_admin_groups_validation.py @@ -0,0 +1,88 @@ +from datetime import UTC, datetime + +import pytest +from httpx import AsyncClient + +from porchlight.authn.password import PasswordHasher, PasswordService +from porchlight.models import PasswordCredential, User + +from tests.conftest import get_csrf_token + + +async def _setup_admin_and_target(client: AsyncClient) -> tuple[str, str]: + """Create admin + target user, login as admin, return (token, target_userid).""" + app = client._transport.app + user_repo = app.state.user_repo + cred_repo = app.state.credential_repo + + admin = User( + userid="admin-g01", + username="admin_g", + groups=["admin", "users"], + created_at=datetime.now(UTC), + updated_at=datetime.now(UTC), + ) + await user_repo.create(admin) + + target = User( + userid="target-g01", + username="target_g", + groups=["users"], + created_at=datetime.now(UTC), + updated_at=datetime.now(UTC), + ) + await user_repo.create(target) + + svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) + await cred_repo.create_password( + PasswordCredential(user_id=admin.userid, password_hash=svc.hash("AdminPass123!")) + ) + + token = await get_csrf_token(client) + await client.post( + "/login/password", + data={"username": "admin_g", "password": "AdminPass123!"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, + ) + return token, target.userid + + +@pytest.mark.asyncio +async def test_valid_groups(client: AsyncClient) -> None: + token, userid = await _setup_admin_and_target(client) + response = await client.post( + f"/admin/users/{userid}/groups", + data={"groups": "users, staff"}, + headers={"X-CSRF-Token": token}, + ) + assert "Groups updated" in response.text + + app = client._transport.app + user = await app.state.user_repo.get_by_userid(userid) + assert sorted(user.groups) == ["staff", "users"] + + +@pytest.mark.asyncio +async def test_invalid_group_name_rejected(client: AsyncClient) -> None: + token, userid = await _setup_admin_and_target(client) + response = await client.post( + f"/admin/users/{userid}/groups", + data={"groups": "users, Bad Group!"}, + headers={"X-CSRF-Token": token}, + ) + assert "alert" in response.text + + +@pytest.mark.asyncio +async def test_empty_groups_clears(client: AsyncClient) -> None: + token, userid = await _setup_admin_and_target(client) + response = await client.post( + f"/admin/users/{userid}/groups", + data={"groups": ""}, + headers={"X-CSRF-Token": token}, + ) + assert "Groups updated" in response.text + + app = client._transport.app + user = await app.state.user_repo.get_by_userid(userid) + assert user.groups == [] diff --git a/tests/test_admin_invite_validation.py b/tests/test_admin_invite_validation.py new file mode 100644 index 0000000..16cb5e6 --- /dev/null +++ b/tests/test_admin_invite_validation.py @@ -0,0 +1,74 @@ +from datetime import UTC, datetime + +import pytest +from httpx import AsyncClient + +from porchlight.authn.password import PasswordHasher, PasswordService +from porchlight.models import PasswordCredential, User + +from tests.conftest import get_csrf_token + + +async def _login_admin(client: AsyncClient) -> str: + """Create and login as admin user, return CSRF token.""" + app = client._transport.app + user_repo = app.state.user_repo + cred_repo = app.state.credential_repo + + user = User( + userid="admin-01", + username="admin", + groups=["admin", "users"], + created_at=datetime.now(UTC), + updated_at=datetime.now(UTC), + ) + await user_repo.create(user) + + svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) + await cred_repo.create_password( + PasswordCredential(user_id=user.userid, password_hash=svc.hash("AdminPass123!")) + ) + + token = await get_csrf_token(client) + await client.post( + "/login/password", + data={"username": "admin", "password": "AdminPass123!"}, + headers={"HX-Request": "true", "X-CSRF-Token": token}, + ) + return token + + +@pytest.mark.asyncio +async def test_invite_valid_username(client: AsyncClient) -> None: + token = await _login_admin(client) + response = await client.post( + "/admin/invite", + data={"username": "newuser@example.com"}, + headers={"X-CSRF-Token": token}, + ) + assert response.status_code == 200 + assert "Invite created" in response.text + + +@pytest.mark.asyncio +async def test_invite_empty_username_rejected(client: AsyncClient) -> None: + token = await _login_admin(client) + response = await client.post( + "/admin/invite", + data={"username": ""}, + headers={"X-CSRF-Token": token}, + ) + # Empty username is rejected — either by FastAPI (422) or validation (alert) + assert response.status_code == 422 or "alert" in response.text + + +@pytest.mark.asyncio +async def test_invite_invalid_username_rejected(client: AsyncClient) -> None: + token = await _login_admin(client) + response = await client.post( + "/admin/invite", + data={"username": "bad user