feat: wire ProfileUpdate validation into manage profile route
This commit is contained in:
parent
3cbf7cda5f
commit
5fd63d61ff
2 changed files with 173 additions and 30 deletions
|
|
@ -1,12 +1,13 @@
|
|||
from base64 import urlsafe_b64decode
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from fastapi import APIRouter, Form, Request, Response
|
||||
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
||||
from fido2.webauthn import PublicKeyCredentialDescriptor, PublicKeyCredentialType
|
||||
from pydantic import ValidationError
|
||||
|
||||
from porchlight.dependencies import get_session_user
|
||||
from porchlight.models import PasswordCredential, WebAuthnCredential
|
||||
from porchlight.validation import ProfileUpdate
|
||||
|
||||
router = APIRouter(prefix="/manage", tags=["manage"])
|
||||
|
||||
|
|
@ -213,41 +214,51 @@ async def update_profile(
|
|||
|
||||
userid, _username = session_user
|
||||
|
||||
# Validate field lengths
|
||||
for field_name, value, max_len in [
|
||||
("Given name", given_name, 255),
|
||||
("Family name", family_name, 255),
|
||||
("Display name", preferred_username, 255),
|
||||
("Email", email, 255),
|
||||
("Phone number", phone_number, 50),
|
||||
("Picture URL", picture, 2048),
|
||||
("Locale", locale, 20),
|
||||
]:
|
||||
if len(value) > max_len:
|
||||
return HTMLResponse(f'<div role="alert">{field_name} is too long</div>')
|
||||
|
||||
# Validate email format
|
||||
if email and "@" not in email:
|
||||
return HTMLResponse('<div role="alert">Invalid email address</div>')
|
||||
|
||||
# Validate picture URL format
|
||||
if picture:
|
||||
parsed = urlparse(picture)
|
||||
if parsed.scheme not in ("http", "https") or not parsed.netloc:
|
||||
return HTMLResponse('<div role="alert">Picture URL must be a valid HTTP or HTTPS URL</div>')
|
||||
try:
|
||||
profile = ProfileUpdate(
|
||||
given_name=given_name,
|
||||
family_name=family_name,
|
||||
preferred_username=preferred_username,
|
||||
email=email,
|
||||
phone_number=phone_number,
|
||||
picture=picture,
|
||||
locale=locale,
|
||||
)
|
||||
except ValidationError as exc:
|
||||
error = exc.errors()[0]
|
||||
field = error["loc"][-1] if error["loc"] else "input"
|
||||
msg = error["msg"]
|
||||
# Produce user-friendly field labels
|
||||
labels = {
|
||||
"given_name": "Given name",
|
||||
"family_name": "Family name",
|
||||
"preferred_username": "Display name",
|
||||
"email": "Email",
|
||||
"phone_number": "Phone number",
|
||||
"picture": "Picture URL",
|
||||
"locale": "Locale",
|
||||
}
|
||||
label = labels.get(str(field), str(field))
|
||||
# Use custom message for value errors (e.g. picture URL), generic pydantic message otherwise
|
||||
if error["type"] == "value_error":
|
||||
# Strip Pydantic's "Value error, " prefix to get the original message
|
||||
display_msg = msg.removeprefix("Value error, ")
|
||||
else:
|
||||
display_msg = f"{label}: {msg}"
|
||||
return HTMLResponse(f'<div role="alert">{display_msg}</div>')
|
||||
|
||||
user_repo = request.app.state.user_repo
|
||||
user = await user_repo.get_by_userid(userid)
|
||||
|
||||
updated = user.model_copy(
|
||||
update={
|
||||
"given_name": given_name or None,
|
||||
"family_name": family_name or None,
|
||||
"preferred_username": preferred_username or None,
|
||||
"email": email or None,
|
||||
"phone_number": phone_number or None,
|
||||
"picture": picture or None,
|
||||
"locale": locale or None,
|
||||
"given_name": profile.given_name or None,
|
||||
"family_name": profile.family_name or None,
|
||||
"preferred_username": profile.preferred_username or None,
|
||||
"email": profile.email,
|
||||
"phone_number": profile.phone_number,
|
||||
"picture": profile.picture,
|
||||
"locale": profile.locale or None,
|
||||
}
|
||||
)
|
||||
await user_repo.update(updated)
|
||||
|
|
|
|||
132
tests/test_manage_profile.py
Normal file
132
tests/test_manage_profile.py
Normal file
|
|
@ -0,0 +1,132 @@
|
|||
from datetime import UTC, datetime
|
||||
|
||||
from argon2 import PasswordHasher
|
||||
from httpx import AsyncClient
|
||||
|
||||
from porchlight.authn.password import PasswordService
|
||||
from porchlight.models import PasswordCredential, User
|
||||
from tests.conftest import get_csrf_token
|
||||
|
||||
|
||||
async def _login(client: AsyncClient, username: str = "alice", userid: str = "user-01") -> None:
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user_repo = app.state.user_repo
|
||||
cred_repo = app.state.credential_repo
|
||||
|
||||
user = await user_repo.get_by_username(username)
|
||||
if user is None:
|
||||
user = User(
|
||||
userid=userid,
|
||||
username=username,
|
||||
created_at=datetime.now(UTC),
|
||||
updated_at=datetime.now(UTC),
|
||||
)
|
||||
await user_repo.create(user)
|
||||
|
||||
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
existing = await cred_repo.get_password_by_user(user.userid)
|
||||
if existing is None:
|
||||
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("password")))
|
||||
|
||||
token = await get_csrf_token(client)
|
||||
await client.post(
|
||||
"/login/password",
|
||||
data={"username": username, "password": "password"},
|
||||
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
||||
)
|
||||
|
||||
|
||||
async def test_update_profile_success(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
token = await get_csrf_token(client)
|
||||
response = await client.post(
|
||||
"/manage/profile",
|
||||
data={
|
||||
"given_name": "Alice",
|
||||
"family_name": "Smith",
|
||||
"preferred_username": "ally",
|
||||
"email": "alice@example.com",
|
||||
"phone_number": "+46701234567",
|
||||
"picture": "https://example.com/alice.jpg",
|
||||
"locale": "en",
|
||||
},
|
||||
headers={"X-CSRF-Token": token},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "Profile updated" in response.text
|
||||
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user = await app.state.user_repo.get_by_userid("user-01")
|
||||
assert user is not None
|
||||
assert user.given_name == "Alice"
|
||||
assert user.email == "alice@example.com"
|
||||
assert user.phone_number == "+46701234567"
|
||||
|
||||
|
||||
async def test_update_profile_invalid_email(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
token = await get_csrf_token(client)
|
||||
response = await client.post(
|
||||
"/manage/profile",
|
||||
data={"email": "not-an-email"},
|
||||
headers={"X-CSRF-Token": token},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "alert" in response.text
|
||||
assert "email" in response.text.lower()
|
||||
|
||||
|
||||
async def test_update_profile_invalid_phone(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
token = await get_csrf_token(client)
|
||||
response = await client.post(
|
||||
"/manage/profile",
|
||||
data={"phone_number": "not-a-phone"},
|
||||
headers={"X-CSRF-Token": token},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "alert" in response.text
|
||||
assert "phone" in response.text.lower()
|
||||
|
||||
|
||||
async def test_update_profile_phone_normalized(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
token = await get_csrf_token(client)
|
||||
response = await client.post(
|
||||
"/manage/profile",
|
||||
data={"phone_number": "+46 70 123 45 67"},
|
||||
headers={"X-CSRF-Token": token},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "Profile updated" in response.text
|
||||
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user = await app.state.user_repo.get_by_userid("user-01")
|
||||
assert user is not None
|
||||
assert user.phone_number == "+46701234567"
|
||||
|
||||
|
||||
async def test_update_profile_invalid_picture(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
token = await get_csrf_token(client)
|
||||
response = await client.post(
|
||||
"/manage/profile",
|
||||
data={"picture": "not-a-url"},
|
||||
headers={"X-CSRF-Token": token},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "alert" in response.text
|
||||
assert "Picture URL" in response.text
|
||||
|
||||
|
||||
async def test_update_profile_field_too_long(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
token = await get_csrf_token(client)
|
||||
response = await client.post(
|
||||
"/manage/profile",
|
||||
data={"given_name": "x" * 256},
|
||||
headers={"X-CSRF-Token": token},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "alert" in response.text
|
||||
assert "Given name" in response.text
|
||||
Loading…
Add table
Add a link
Reference in a new issue