feat: wire validation models into admin routes and deduplicate error handling

Replace manual validation error formatting with shared helper in both
admin and manage profile routes. Add UsernameInput validation to invite
route and GroupListInput validation to groups route.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Johan Lundberg 2026-03-31 15:34:28 +02:00
parent 56c177c817
commit 72a93984f2
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
4 changed files with 205 additions and 49 deletions

View file

@ -6,7 +6,7 @@ from pydantic import ValidationError
from porchlight.dependencies import get_session_user
from porchlight.models import User
from porchlight.validation import ProfileUpdate
from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput, format_validation_errors
router = APIRouter(prefix="/admin", tags=["admin"])
@ -109,17 +109,21 @@ async def create_invite(
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
username = username.strip()
if not username:
return HTMLResponse('<div role="alert">Username is required</div>')
try:
validated = UsernameInput(username=username)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
magic_link_service = request.app.state.magic_link_service
settings = request.app.state.settings
link = await magic_link_service.create(username=username, created_by=admin.username, note="admin invite")
link = await magic_link_service.create(
username=validated.username, created_by=admin.username, note="admin invite"
)
url = f"{settings.issuer}/register/{link.token}"
return HTMLResponse(
f'<div role="status">Invite created for <strong>{username}</strong>:</div><div class="invite-url">{url}</div>'
f'<div role="status">Invite created for <strong>{validated.username}</strong>:</div>'
f'<div class="invite-url">{url}</div>'
)
@ -155,21 +159,7 @@ async def update_user_profile(
locale=locale,
)
except ValidationError as exc:
error = exc.errors()[0]
field = error["loc"][-1] if error["loc"] else "input"
msg = error["msg"]
labels = {
"given_name": "Given name",
"family_name": "Family name",
"preferred_username": "Display name",
"email": "Email",
"phone_number": "Phone number",
"picture": "Picture URL",
"locale": "Locale",
}
label = labels.get(str(field), str(field))
display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}"
return HTMLResponse(f'<div role="alert">{display_msg}</div>')
return HTMLResponse(format_validation_errors(exc))
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
@ -205,13 +195,17 @@ async def update_user_groups(
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
try:
validated = GroupListInput(groups=groups)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("User not found", status_code=404)
group_list = [g.strip() for g in groups.split(",") if g.strip()]
updated = user.model_copy(update={"groups": group_list})
updated = user.model_copy(update={"groups": validated.group_list})
await user_repo.update(updated)
return HTMLResponse('<div role="status">Groups updated</div>')

View file

@ -7,7 +7,7 @@ from pydantic import ValidationError
from porchlight.dependencies import get_session_user
from porchlight.models import PasswordCredential, WebAuthnCredential
from porchlight.validation import ProfileUpdate
from porchlight.validation import PasswordChange, PasswordSet, ProfileUpdate, format_validation_errors
router = APIRouter(prefix="/manage", tags=["manage"])
@ -55,6 +55,7 @@ async def set_password(
request: Request,
password: str = Form(),
confirm: str = Form(),
current_password: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -64,15 +65,30 @@ async def set_password(
cred_repo = request.app.state.credential_repo
password_service = request.app.state.password_service
if password != confirm:
return HTMLResponse('<div role="alert">Passwords do not match</div>')
if len(password) < 8:
return HTMLResponse('<div role="alert">Password must be at least 8 characters</div>')
password_hash = password_service.hash(password)
existing = await cred_repo.get_password_by_user(userid)
has_password = existing is not None
# Validate input
try:
if has_password:
validated = PasswordChange(
current_password=current_password,
password=password,
confirm=confirm,
)
else:
validated = PasswordSet(password=password, confirm=confirm)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
# Verify current password if changing
if has_password:
if not password_service.verify(existing.password_hash, validated.current_password):
return HTMLResponse('<div role="alert">Current password is incorrect</div>')
# Store new password
password_hash = password_service.hash(validated.password)
if existing is not None:
await cred_repo.delete_password(userid)
@ -225,23 +241,7 @@ async def update_profile(
locale=locale,
)
except ValidationError as exc:
error = exc.errors()[0]
field = error["loc"][-1] if error["loc"] else "input"
msg = error["msg"]
# Produce user-friendly field labels
labels = {
"given_name": "Given name",
"family_name": "Family name",
"preferred_username": "Display name",
"email": "Email",
"phone_number": "Phone number",
"picture": "Picture URL",
"locale": "Locale",
}
label = labels.get(str(field), str(field))
# Use custom message for value errors (e.g. picture URL), generic pydantic message otherwise
display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}"
return HTMLResponse(f'<div role="alert">{display_msg}</div>')
return HTMLResponse(format_validation_errors(exc))
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)

View file

@ -0,0 +1,88 @@
from datetime import UTC, datetime
import pytest
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _setup_admin_and_target(client: AsyncClient) -> tuple[str, str]:
"""Create admin + target user, login as admin, return (token, target_userid)."""
app = client._transport.app
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
admin = User(
userid="admin-g01",
username="admin_g",
groups=["admin", "users"],
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(admin)
target = User(
userid="target-g01",
username="target_g",
groups=["users"],
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(target)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(
PasswordCredential(user_id=admin.userid, password_hash=svc.hash("AdminPass123!"))
)
token = await get_csrf_token(client)
await client.post(
"/login/password",
data={"username": "admin_g", "password": "AdminPass123!"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
return token, target.userid
@pytest.mark.asyncio
async def test_valid_groups(client: AsyncClient) -> None:
token, userid = await _setup_admin_and_target(client)
response = await client.post(
f"/admin/users/{userid}/groups",
data={"groups": "users, staff"},
headers={"X-CSRF-Token": token},
)
assert "Groups updated" in response.text
app = client._transport.app
user = await app.state.user_repo.get_by_userid(userid)
assert sorted(user.groups) == ["staff", "users"]
@pytest.mark.asyncio
async def test_invalid_group_name_rejected(client: AsyncClient) -> None:
token, userid = await _setup_admin_and_target(client)
response = await client.post(
f"/admin/users/{userid}/groups",
data={"groups": "users, Bad Group!"},
headers={"X-CSRF-Token": token},
)
assert "alert" in response.text
@pytest.mark.asyncio
async def test_empty_groups_clears(client: AsyncClient) -> None:
token, userid = await _setup_admin_and_target(client)
response = await client.post(
f"/admin/users/{userid}/groups",
data={"groups": ""},
headers={"X-CSRF-Token": token},
)
assert "Groups updated" in response.text
app = client._transport.app
user = await app.state.user_repo.get_by_userid(userid)
assert user.groups == []

View file

@ -0,0 +1,74 @@
from datetime import UTC, datetime
import pytest
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _login_admin(client: AsyncClient) -> str:
"""Create and login as admin user, return CSRF token."""
app = client._transport.app
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
user = User(
userid="admin-01",
username="admin",
groups=["admin", "users"],
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(
PasswordCredential(user_id=user.userid, password_hash=svc.hash("AdminPass123!"))
)
token = await get_csrf_token(client)
await client.post(
"/login/password",
data={"username": "admin", "password": "AdminPass123!"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
return token
@pytest.mark.asyncio
async def test_invite_valid_username(client: AsyncClient) -> None:
token = await _login_admin(client)
response = await client.post(
"/admin/invite",
data={"username": "newuser@example.com"},
headers={"X-CSRF-Token": token},
)
assert response.status_code == 200
assert "Invite created" in response.text
@pytest.mark.asyncio
async def test_invite_empty_username_rejected(client: AsyncClient) -> None:
token = await _login_admin(client)
response = await client.post(
"/admin/invite",
data={"username": ""},
headers={"X-CSRF-Token": token},
)
# Empty username is rejected — either by FastAPI (422) or validation (alert)
assert response.status_code == 422 or "alert" in response.text
@pytest.mark.asyncio
async def test_invite_invalid_username_rejected(client: AsyncClient) -> None:
token = await _login_admin(client)
response = await client.post(
"/admin/invite",
data={"username": "bad user<script>"},
headers={"X-CSRF-Token": token},
)
assert "alert" in response.text
assert "letters" in response.text.lower() or "Username" in response.text