feat: add admin action routes (profile, groups, activate, credentials, invite, delete)

This commit is contained in:
Johan Lundberg 2026-02-19 13:47:36 +01:00
parent 2b8d3e9800
commit 3975d5ce88
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
2 changed files with 256 additions and 0 deletions

View file

@ -1,3 +1,6 @@
from base64 import urlsafe_b64decode
from urllib.parse import urlparse
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
@ -117,3 +120,229 @@ async def create_invite(
return HTMLResponse(
f'<div role="status">Invite created for <strong>{username}</strong>:</div><div class="invite-url">{url}</div>'
)
# --- Profile update ---
@router.post("/users/{userid}/profile", response_class=HTMLResponse)
async def update_user_profile(
request: Request,
userid: str,
given_name: str = Form(""),
family_name: str = Form(""),
preferred_username: str = Form(""),
email: str = Form(""),
phone_number: str = Form(""),
picture: str = Form(""),
locale: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
return RedirectResponse("/login", status_code=303)
admin = await _get_admin_user(request)
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
# Validate
if email and "@" not in email:
return HTMLResponse('<div role="alert">Invalid email address</div>')
if picture:
parsed = urlparse(picture)
if parsed.scheme not in ("http", "https") or not parsed.netloc:
return HTMLResponse('<div role="alert">Picture URL must be a valid HTTP or HTTPS URL</div>')
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("User not found", status_code=404)
updated = user.model_copy(
update={
"given_name": given_name or None,
"family_name": family_name or None,
"preferred_username": preferred_username or None,
"email": email or None,
"phone_number": phone_number or None,
"picture": picture or None,
"locale": locale or None,
}
)
await user_repo.update(updated)
return HTMLResponse('<div role="status">Profile updated</div>')
# --- Groups update ---
@router.post("/users/{userid}/groups", response_class=HTMLResponse)
async def update_user_groups(
request: Request,
userid: str,
groups: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
return RedirectResponse("/login", status_code=303)
admin = await _get_admin_user(request)
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("User not found", status_code=404)
group_list = [g.strip() for g in groups.split(",") if g.strip()]
updated = user.model_copy(update={"groups": group_list})
await user_repo.update(updated)
return HTMLResponse('<div role="status">Groups updated</div>')
# --- Activate / deactivate ---
@router.post("/users/{userid}/activate", response_class=HTMLResponse)
async def activate_user(request: Request, userid: str) -> Response:
session_user = get_session_user(request)
if session_user is None:
return RedirectResponse("/login", status_code=303)
admin = await _get_admin_user(request)
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("User not found", status_code=404)
updated = user.model_copy(update={"active": True})
await user_repo.update(updated)
return HTMLResponse(
'<div role="status">User activated</div>'
f'<button class="btn-secondary" hx-post="/admin/users/{userid}/deactivate"'
' hx-target="#actions-section" hx-swap="innerHTML">Deactivate user</button>'
)
@router.post("/users/{userid}/deactivate", response_class=HTMLResponse)
async def deactivate_user(request: Request, userid: str) -> Response:
session_user = get_session_user(request)
if session_user is None:
return RedirectResponse("/login", status_code=303)
admin = await _get_admin_user(request)
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("User not found", status_code=404)
updated = user.model_copy(update={"active": False})
await user_repo.update(updated)
return HTMLResponse(
'<div role="status">User deactivated</div>'
f'<button class="btn-secondary" hx-post="/admin/users/{userid}/activate"'
' hx-target="#actions-section" hx-swap="innerHTML">Activate user</button>'
)
# --- Delete credentials ---
@router.delete("/users/{userid}/credentials/password", response_class=HTMLResponse)
async def delete_user_password(request: Request, userid: str) -> Response:
session_user = get_session_user(request)
if session_user is None:
return RedirectResponse("/login", status_code=303)
admin = await _get_admin_user(request)
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
cred_repo = request.app.state.credential_repo
await cred_repo.delete_password(userid)
# Re-render credentials section
webauthn_credentials = await cred_repo.get_webauthn_by_user(userid)
templates = request.app.state.templates
return templates.TemplateResponse(
request,
"admin/_credentials_section.html",
{
"target_user": await request.app.state.user_repo.get_by_userid(userid),
"webauthn_credentials": webauthn_credentials,
"has_password": False,
},
)
@router.delete("/users/{userid}/credentials/webauthn/{credential_id_b64}", response_class=HTMLResponse)
async def delete_user_webauthn(request: Request, userid: str, credential_id_b64: str) -> Response:
session_user = get_session_user(request)
if session_user is None:
return RedirectResponse("/login", status_code=303)
admin = await _get_admin_user(request)
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
padded = credential_id_b64 + "=" * (-len(credential_id_b64) % 4)
credential_id = urlsafe_b64decode(padded)
cred_repo = request.app.state.credential_repo
await cred_repo.delete_webauthn(userid, credential_id)
# Re-render credentials section
webauthn_credentials = await cred_repo.get_webauthn_by_user(userid)
password = await cred_repo.get_password_by_user(userid)
templates = request.app.state.templates
return templates.TemplateResponse(
request,
"admin/_credentials_section.html",
{
"target_user": await request.app.state.user_repo.get_by_userid(userid),
"webauthn_credentials": webauthn_credentials,
"has_password": password is not None,
},
)
# --- Re-invite ---
@router.post("/users/{userid}/invite", response_class=HTMLResponse)
async def reinvite_user(request: Request, userid: str) -> Response:
session_user = get_session_user(request)
if session_user is None:
return RedirectResponse("/login", status_code=303)
admin = await _get_admin_user(request)
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("User not found", status_code=404)
magic_link_service = request.app.state.magic_link_service
settings = request.app.state.settings
link = await magic_link_service.create(username=user.username, created_by=admin.username, note="admin re-invite")
url = f"{settings.issuer}/register/{link.token}"
return HTMLResponse(f'<div role="status">Invite link generated:</div><div class="invite-url">{url}</div>')
# --- Delete user ---
@router.delete("/users/{userid}", response_class=HTMLResponse)
async def delete_user(request: Request, userid: str) -> Response:
session_user = get_session_user(request)
if session_user is None:
return RedirectResponse("/login", status_code=303)
admin = await _get_admin_user(request)
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
# Prevent self-deletion
admin_userid, _ = get_session_user(request)
if userid == admin_userid:
return HTMLResponse('<div role="alert">Cannot delete your own account</div>')
user_repo = request.app.state.user_repo
deleted = await user_repo.delete(userid)
if not deleted:
return HTMLResponse("User not found", status_code=404)
return HTMLResponse(
status_code=200,
content='<div role="status">User deleted</div>',
headers={"HX-Redirect": "/admin/users"},
)

View file

@ -0,0 +1,27 @@
<h3>Password</h3>
{% if has_password %}
<p>Password is set.
<button class="btn-danger" hx-delete="/admin/users/{{ target_user.userid }}/credentials/password"
hx-target="#credentials-section" hx-swap="innerHTML"
hx-confirm="Remove this user's password?">Remove password</button>
</p>
{% else %}
<p>No password set.</p>
{% endif %}
<h3>Security keys</h3>
{% if webauthn_credentials %}
<ul>
{% for cred in webauthn_credentials %}
<li>
{{ cred.device_name or "Security key" }}
<small>(added {{ cred.created_at.strftime('%Y-%m-%d') }})</small>
<button class="btn-danger" hx-delete="/admin/users/{{ target_user.userid }}/credentials/webauthn/{{ cred.credential_id|b64encode }}"
hx-target="#credentials-section" hx-swap="innerHTML"
hx-confirm="Remove this security key?">Remove</button>
</li>
{% endfor %}
</ul>
{% else %}
<p>No security keys registered.</p>
{% endif %}