Compare commits

..

No commits in common. "baef5e0e2efcf898909759068347c0bb26b20e8c" and "2fc2bdcabbb827b4cd7fa68d05333e42fa54505a" have entirely different histories.

32 changed files with 125 additions and 812 deletions

View file

@ -11,15 +11,6 @@ from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput,
router = APIRouter(prefix="/admin", tags=["admin"])
ADMIN_GROUP = "admin"
async def _is_last_active_admin(request: Request, user: User) -> bool:
"""True if removing this user's admin access would leave zero active admins."""
if not user.active or ADMIN_GROUP not in user.groups:
return False
return await request.app.state.user_repo.count_active_admins() <= 1
async def _get_admin_user(request: Request) -> User | None:
"""Return the current user if they are an admin, else None."""
@ -207,9 +198,6 @@ async def update_user_groups(
if user is None:
return HTMLResponse("User not found", status_code=404)
if ADMIN_GROUP not in validated.group_list and await _is_last_active_admin(request, user):
return HTMLResponse('<div role="alert">Cannot remove the last active admin</div>')
updated = user.model_copy(update={"groups": validated.group_list})
await user_repo.update(updated)
return HTMLResponse('<div role="status">Groups updated</div>')
@ -253,9 +241,6 @@ async def deactivate_user(request: Request, userid: str) -> Response:
if user is None:
return HTMLResponse("User not found", status_code=404)
if await _is_last_active_admin(request, user):
return HTMLResponse('<div role="alert">Cannot deactivate the last active admin</div>')
updated = user.model_copy(update={"active": False})
await user_repo.update(updated)
return HTMLResponse(
@ -276,12 +261,10 @@ async def delete_user_password(request: Request, userid: str) -> Response:
return HTMLResponse("Forbidden", status_code=403)
cred_repo = request.app.state.credential_repo
# Refuse atomically if this is the user's last credential (avoids lockout).
deleted = await cred_repo.delete_password_if_not_last(userid)
await cred_repo.delete_password(userid)
# Re-render credentials section
webauthn_credentials = await cred_repo.get_webauthn_by_user(userid)
password = await cred_repo.get_password_by_user(userid)
templates = request.app.state.templates
return templates.TemplateResponse(
request,
@ -289,8 +272,7 @@ async def delete_user_password(request: Request, userid: str) -> Response:
{
"target_user": await request.app.state.user_repo.get_by_userid(userid),
"webauthn_credentials": webauthn_credentials,
"has_password": password is not None,
"error": None if deleted else "Cannot remove the user's last credential",
"has_password": False,
},
)
@ -307,8 +289,7 @@ async def delete_user_webauthn(request: Request, userid: str, credential_id_b64:
padded = credential_id_b64 + "=" * (-len(credential_id_b64) % 4)
credential_id = urlsafe_b64decode(padded)
cred_repo = request.app.state.credential_repo
# Refuse atomically if this is the user's last credential (avoids lockout).
deleted = await cred_repo.delete_webauthn_if_not_last(userid, credential_id)
await cred_repo.delete_webauthn(userid, credential_id)
# Re-render credentials section
webauthn_credentials = await cred_repo.get_webauthn_by_user(userid)
@ -321,7 +302,6 @@ async def delete_user_webauthn(request: Request, userid: str, credential_id_b64:
"target_user": await request.app.state.user_repo.get_by_userid(userid),
"webauthn_credentials": webauthn_credentials,
"has_password": password is not None,
"error": None if deleted else "Cannot remove the user's last credential",
},
)
@ -365,10 +345,6 @@ async def delete_user(request: Request, userid: str) -> Response:
return HTMLResponse('<div role="alert">Cannot delete your own account</div>')
user_repo = request.app.state.user_repo
target = await user_repo.get_by_userid(userid)
if target is not None and await _is_last_active_admin(request, target):
return HTMLResponse('<div role="alert">Cannot delete the last active admin</div>')
deleted = await user_repo.delete(userid)
if not deleted:
return HTMLResponse("User not found", status_code=404)

View file

@ -23,21 +23,6 @@ def _login_redirect_target(request: Request) -> str:
return "/manage/credentials"
def _establish_authenticated_session(request: Request, user: User) -> None:
"""Reset the session before recording the authenticated identity.
Clearing first defeats session fixation: any values an attacker planted in
the pre-auth session are dropped and the session cookie is reissued. A
pending OIDC authorization request is the only pre-auth state worth keeping.
"""
pending_oidc = request.session.get("oidc_auth_request")
request.session.clear()
if pending_oidc is not None:
request.session["oidc_auth_request"] = pending_oidc
request.session["userid"] = user.userid
request.session["username"] = user.username
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request) -> HTMLResponse:
templates = request.app.state.templates
@ -71,7 +56,8 @@ async def login_password(
if not user.active:
return HTMLResponse(error_html)
_establish_authenticated_session(request, user)
request.session["userid"] = user.userid
request.session["username"] = user.username
response = Response()
response.headers["HX-Redirect"] = _login_redirect_target(request)
@ -86,34 +72,12 @@ async def logout(request: Request) -> Response:
return response
@router.get("/register/{token}", response_class=HTMLResponse)
async def register_magic_link_page(request: Request, token: str) -> Response:
"""Show the registration confirmation page.
GET is side-effect free: it only validates the token (without consuming it)
and renders a form. The token is consumed by the POST below, so that simply
visiting the link (email scanners, prefetchers) cannot create a session.
"""
magic_link_service = request.app.state.magic_link_service
link = await magic_link_service.validate(token)
if link is None:
return HTMLResponse("<p>Invalid or expired registration link.</p>", status_code=400)
templates = request.app.state.templates
return templates.TemplateResponse(
request,
"register.html",
{"token": token, "username": link.username},
)
@router.post("/register/{token}")
@router.get("/register/{token}")
async def register_magic_link(request: Request, token: str) -> Response:
magic_link_service = request.app.state.magic_link_service
user_repo = request.app.state.user_repo
# Atomically validate and consume the token (single-use, no replay race).
link = await magic_link_service.consume(token)
link = await magic_link_service.validate(token)
if link is None:
return HTMLResponse("<p>Invalid or expired registration link.</p>", status_code=400)
@ -121,25 +85,16 @@ async def register_magic_link(request: Request, token: str) -> Response:
if existing_user is not None:
if not existing_user.active:
return HTMLResponse("<p>This account has been deactivated.</p>", status_code=400)
# An invite link is for account setup, not authentication. If the
# account already has credentials, refuse to establish a session —
# otherwise a re-invite would be a passwordless login. Account
# recovery must go through a separate, explicitly authenticated flow.
cred_repo = request.app.state.credential_repo
has_password = await cred_repo.get_password_by_user(existing_user.userid) is not None
has_webauthn = bool(await cred_repo.get_webauthn_by_user(existing_user.userid))
if has_password or has_webauthn:
return HTMLResponse(
"<p>This account is already set up. Please sign in instead.</p>",
status_code=400,
)
user = existing_user
else:
userid = await generate_unique_userid(user_repo)
user = User(userid=userid, username=link.username, groups=["users"])
await user_repo.create(user)
_establish_authenticated_session(request, user)
await magic_link_service.mark_used(token)
request.session["userid"] = user.userid
request.session["username"] = user.username
return RedirectResponse("/manage/credentials?setup=1", status_code=303)
@ -201,6 +156,7 @@ async def login_webauthn_complete(request: Request) -> Response:
if user is None or not user.active:
return JSONResponse({"error": "Authentication failed"}, status_code=400)
_establish_authenticated_session(request, user)
request.session["userid"] = user.userid
request.session["username"] = user.username
return JSONResponse({"redirect": _login_redirect_target(request)})

View file

@ -54,10 +54,6 @@ class Settings(BaseSettings):
# Rate limiting (disable for e2e/load tests that authenticate repeatedly)
rate_limit_enabled: bool = True
# Number of trusted reverse proxies in front of the app. When > 0, the
# client IP for rate limiting is taken from X-Forwarded-For, counting this
# many hops from the right. Keep 0 unless deployed behind a known proxy.
trusted_proxy_count: int = 0
# Signing keys
signing_key_path: str = "data/keys"

View file

@ -1,5 +1,3 @@
import hashlib
import hmac
import secrets
from datetime import UTC, datetime, timedelta
@ -8,23 +6,11 @@ from porchlight.store.protocols import MagicLinkRepository
class MagicLinkService:
"""Magic link token lifecycle management.
"""Magic link token lifecycle management."""
The raw token is only ever exposed to the user (in the registration URL).
Only a hash of the token is persisted, so a database disclosure does not
yield usable tokens. A pepper, when configured, keys the hash (HMAC);
otherwise a plain SHA-256 of the high-entropy token is stored.
"""
def __init__(self, repo: MagicLinkRepository, ttl: int = 86400, pepper: str | None = None) -> None:
def __init__(self, repo: MagicLinkRepository, ttl: int = 86400) -> None:
self._repo = repo
self._ttl = ttl
self._pepper = pepper
def _hash_token(self, token: str) -> str:
if self._pepper:
return hmac.new(self._pepper.encode(), token.encode(), hashlib.sha256).hexdigest()
return hashlib.sha256(token.encode()).hexdigest()
async def create(
self,
@ -32,52 +18,34 @@ class MagicLinkService:
created_by: str | None = None,
note: str | None = None,
) -> MagicLink:
"""Generate and store a new magic link for the given username.
Persists only the hashed token; the returned MagicLink carries the raw
token so the caller can build the registration URL.
"""
"""Generate and store a new magic link for the given username."""
token = secrets.token_urlsafe(32)
expires_at = datetime.now(UTC) + timedelta(seconds=self._ttl)
stored = MagicLink(
token=self._hash_token(token),
link = MagicLink(
token=token,
username=username,
expires_at=expires_at,
created_by=created_by,
note=note,
)
await self._repo.create(stored)
return stored.model_copy(update={"token": token})
return await self._repo.create(link)
async def validate(self, token: str) -> MagicLink | None:
"""Validate a magic link token.
Returns the MagicLink (with the raw token re-attached) if it exists, is
not used, and has not expired. Returns None otherwise.
Returns the MagicLink if it exists, is not used, and has not expired.
Returns None otherwise.
"""
link = await self._repo.get_by_token(self._hash_token(token))
link = await self._repo.get_by_token(token)
if link is None or link.used:
return None
if link.expires_at < datetime.now(UTC):
return None
return link.model_copy(update={"token": token})
return link
async def mark_used(self, token: str) -> bool:
"""Mark a magic link as used. Returns True if found and marked."""
return await self._repo.mark_used(self._hash_token(token))
async def consume(self, token: str) -> MagicLink | None:
"""Atomically validate and consume a token in one step.
Prefer this over validate()+mark_used(): it closes the replay race
where two concurrent requests could both pass validation before either
marks the token used. Returns the link (raw token re-attached) on
success, else None.
"""
link = await self._repo.consume(self._hash_token(token))
if link is None:
return None
return link.model_copy(update={"token": token})
return await self._repo.mark_used(token)
async def cleanup_expired(self) -> int:
"""Delete expired unused links. Returns count deleted."""

View file

@ -13,6 +13,13 @@ from porchlight.validation import PasswordChange, PasswordSet, ProfileUpdate, fo
router = APIRouter(prefix="/manage", tags=["manage"])
async def _count_credentials(cred_repo: object, userid: str) -> int:
"""Count total credentials (password + webauthn) for a user."""
webauthn = await cred_repo.get_webauthn_by_user(userid) # type: ignore[union-attr]
password = await cred_repo.get_password_by_user(userid) # type: ignore[union-attr]
return len(webauthn) + (1 if password else 0)
@router.get("/credentials", response_class=HTMLResponse)
async def credentials_page(request: Request) -> Response:
session_user = get_session_user(request)
@ -100,9 +107,11 @@ async def delete_password(request: Request) -> Response:
userid, _username = session_user
cred_repo = request.app.state.credential_repo
# Atomic: refuses if this is the user's last credential (not raceable).
if not await cred_repo.delete_password_if_not_last(userid):
count = await _count_credentials(cred_repo, userid)
if count <= 1:
return HTMLResponse('<div role="alert">Cannot remove your last credential</div>')
await cred_repo.delete_password(userid)
return HTMLResponse('<div role="status">Password removed</div>')
@ -173,9 +182,11 @@ async def delete_webauthn(request: Request, credential_id_b64: str) -> Response:
padded = credential_id_b64 + "=" * (-len(credential_id_b64) % 4)
credential_id = urlsafe_b64decode(padded)
# Atomic: refuses if this is the user's last credential (not raceable).
if not await cred_repo.delete_webauthn_if_not_last(userid, credential_id):
count = await _count_credentials(cred_repo, userid)
if count <= 1:
return HTMLResponse('<div role="alert">Cannot remove your last credential</div>')
await cred_repo.delete_webauthn(userid, credential_id)
return HTMLResponse('<div role="status">Security key removed</div>')

View file

@ -2,7 +2,6 @@
from __future__ import annotations
import html
import json
from types import SimpleNamespace
from urllib.parse import urlencode
@ -15,15 +14,6 @@ from porchlight.oidc.claims import PorchlightUserInfo, user_to_claims
router = APIRouter(tags=["oidc"])
def _error_page(message: object, status_code: int = 400, title: str = "Error") -> HTMLResponse:
"""Render an error page, escaping the (possibly request-derived) message."""
return HTMLResponse(
f"<h1>{html.escape(title)}</h1><p>{html.escape(str(message))}</p>",
status_code=status_code,
)
SCOPE_DESCRIPTIONS: dict[str, str] = {
"openid": "Sign you in (required)",
"profile": "Your name and profile information",
@ -69,11 +59,11 @@ async def authorization(request: Request) -> Response:
try:
parsed = endpoint.parse_request(query_params)
except Exception as exc:
return _error_page(exc, title="Invalid Request")
return HTMLResponse(f"<h1>Invalid Request</h1><p>{exc}</p>", status_code=400)
if "error" in parsed:
error_desc = parsed.get("error_description", parsed["error"])
return _error_page(error_desc)
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
# Check if user is authenticated
userid = request.session.get("userid")
@ -105,11 +95,11 @@ async def authorization_complete(request: Request) -> Response:
try:
parsed = endpoint.parse_request(auth_request_params)
except Exception as exc:
return _error_page(exc)
return HTMLResponse(f"<h1>Error</h1><p>{exc}</p>", status_code=400)
if "error" in parsed:
error_desc = parsed.get("error_description", parsed["error"])
return _error_page(error_desc)
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
return await _check_consent_or_complete(
request, oidc_server, endpoint, parsed, userid, username, auth_request_params
@ -182,7 +172,7 @@ async def _complete_authorization( # noqa: PLR0913
if "error" in result.get("response_args", {}):
response_args = result["response_args"]
error_desc = response_args.get("error_description", response_args["error"])
return _error_page(error_desc)
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
# Build redirect URL
response_args = result.get("response_args", {})
@ -348,10 +338,8 @@ async def consent_submit(request: Request) -> Response:
return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
return HTMLResponse("<h1>Error</h1><p>Invalid action</p>", status_code=400)
# Allow — collect approved scopes, rejecting anything outside the
# originally requested set (a forged form must not escalate scope).
requested_scopes = set(auth_params.get("scope", "openid").split())
approved_scopes: list[str] = [str(s) for s in form.getlist("scope") if str(s) in requested_scopes]
# Allow — collect approved scopes
approved_scopes: list[str] = [str(s) for s in form.getlist("scope")]
if "openid" not in approved_scopes:
approved_scopes = ["openid", *list(approved_scopes)]
@ -371,6 +359,6 @@ async def consent_submit(request: Request) -> Response:
if "error" in parsed:
raise ValueError(parsed.get("error_description", parsed["error"]))
except Exception as exc:
return _error_page(exc)
return HTMLResponse(f"<h1>Error</h1><p>{exc}</p>", status_code=400)
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)

View file

@ -3,7 +3,6 @@
from pathlib import Path
from idpyoidc.server import Server
from idpyoidc.server.oauth2.add_on.pkce import CC_METHOD
from porchlight.config import Settings
from porchlight.oidc.claims import PorchlightUserInfo
@ -127,20 +126,6 @@ def _build_server_config(settings: Settings) -> dict:
"phone": ["phone_number", "phone_number_verified"],
},
"authentication": {},
# PKCE: advertise and verify S256 (only). Not "essential", so existing
# relying parties that do not yet send a code_challenge keep working;
# but any RP that does use PKCE must use S256 and is verified at the
# token endpoint. Set essential=True (or per-client pkce_essential) to
# require it once all clients have migrated.
"add_on": {
"pkce": {
"function": "idpyoidc.server.oauth2.add_on.pkce.add_support",
"kwargs": {
"essential": False,
"code_challenge_methods": {"S256": CC_METHOD["S256"]},
},
},
},
}

View file

@ -1,25 +1,4 @@
from slowapi import Limiter
from slowapi.util import get_remote_address
from starlette.requests import Request
def _client_identifier(request: Request) -> str:
"""Rate-limit key: the client IP, proxy-aware.
By default the direct peer address is used. When ``trusted_proxy_count`` is
configured (the number of trusted reverse proxies in front of the app), the
client address is taken from the ``X-Forwarded-For`` chain instead, counting
that many hops back from the right. The header is ignored unless proxies are
trusted, so it cannot be spoofed in a direct-connection deployment.
"""
settings = getattr(getattr(request.app, "state", None), "settings", None)
hops: int = getattr(settings, "trusted_proxy_count", 0) or 0
if hops > 0:
forwarded = request.headers.get("x-forwarded-for", "")
parts = [p.strip() for p in forwarded.split(",") if p.strip()]
if len(parts) >= hops:
return parts[-hops]
return get_remote_address(request)
limiter = Limiter(key_func=_client_identifier)
limiter = Limiter(key_func=get_remote_address)

View file

@ -205,10 +205,6 @@ main {
line-height: 1;
}
#user-table-container {
overflow-x: auto;
}
.admin-table {
width: 100%;
border-collapse: collapse;
@ -218,7 +214,7 @@ main {
.admin-table th,
.admin-table td {
text-align: left;
padding: var(--sp-2);
padding: var(--sp-2) var(--sp-3);
border-bottom: 1px solid var(--border);
}

View file

@ -25,8 +25,6 @@ class UserRepository(Protocol):
async def count_users(self, query: str | None = None) -> int: ...
async def count_active_admins(self) -> int: ...
async def delete(self, userid: str) -> bool: ...
@ -48,10 +46,6 @@ class CredentialRepository(Protocol):
async def delete_password(self, user_id: str) -> bool: ...
async def delete_password_if_not_last(self, user_id: str) -> bool: ...
async def delete_webauthn_if_not_last(self, user_id: str, credential_id: bytes) -> bool: ...
@runtime_checkable
class MagicLinkRepository(Protocol):
@ -61,8 +55,6 @@ class MagicLinkRepository(Protocol):
async def mark_used(self, token: str) -> bool: ...
async def consume(self, token: str) -> MagicLink | None: ...
async def delete_expired(self) -> int: ...

View file

@ -163,17 +163,6 @@ class SQLiteUserRepository:
row = await cursor.fetchone()
return row[0] if row else 0
async def count_active_admins(self) -> int:
async with self._db.execute(
"""
SELECT COUNT(*) FROM users u
JOIN user_groups g ON g.userid = u.userid
WHERE u.active = 1 AND g.group_name = 'admin'
"""
) as cursor:
row = await cursor.fetchone()
return row[0] if row else 0
async def delete(self, userid: str) -> bool:
cursor = await self._db.execute("DELETE FROM users WHERE userid = ?", (userid,))
await self._db.commit()
@ -277,41 +266,6 @@ class SQLiteCredentialRepository:
await self._db.commit()
return cursor.rowcount > 0
async def delete_password_if_not_last(self, user_id: str) -> bool:
"""Delete the password credential only if it is not the user's last
credential. The count and delete happen in one atomic statement, so it
is not raceable. Returns True if a row was deleted."""
cursor = await self._db.execute(
"""
DELETE FROM password_credentials
WHERE user_id = ?
AND (
(SELECT COUNT(*) FROM password_credentials WHERE user_id = ?)
+ (SELECT COUNT(*) FROM webauthn_credentials WHERE user_id = ?)
) > 1
""",
(user_id, user_id, user_id),
)
await self._db.commit()
return cursor.rowcount > 0
async def delete_webauthn_if_not_last(self, user_id: str, credential_id: bytes) -> bool:
"""Delete a WebAuthn credential only if it is not the user's last
credential, atomically. Returns True if a row was deleted."""
cursor = await self._db.execute(
"""
DELETE FROM webauthn_credentials
WHERE user_id = ? AND credential_id = ?
AND (
(SELECT COUNT(*) FROM password_credentials WHERE user_id = ?)
+ (SELECT COUNT(*) FROM webauthn_credentials WHERE user_id = ?)
) > 1
""",
(user_id, credential_id, user_id, user_id),
)
await self._db.commit()
return cursor.rowcount > 0
class SQLiteMagicLinkRepository:
def __init__(self, db: aiosqlite.Connection) -> None:
@ -357,25 +311,6 @@ class SQLiteMagicLinkRepository:
await self._db.commit()
return cursor.rowcount > 0
async def consume(self, token: str) -> MagicLink | None:
"""Atomically validate-and-mark a token as used.
The conditional UPDATE is the single point of decision, so two
concurrent requests cannot both consume the same token. Returns the
link if this call won the race (unused and unexpired), else None.
"""
now = datetime.now(UTC).isoformat()
cursor = await self._db.execute(
"UPDATE magic_links SET used = 1 WHERE token = ? AND used = 0 AND expires_at > ?",
(token, now),
)
await self._db.commit()
if cursor.rowcount == 0:
return None
async with self._db.execute("SELECT * FROM magic_links WHERE token = ?", (token,)) as c:
row = await c.fetchone()
return self._row_to_magic_link(row) if row is not None else None
async def delete_expired(self) -> int:
now = datetime.now(UTC).isoformat()
cursor = await self._db.execute("DELETE FROM magic_links WHERE expires_at < ? AND used = 0", (now,))

View file

@ -1,4 +1,3 @@
{% if error %}<div role="alert">{{ error }}</div>{% endif %}
<h3>Password</h3>
{% if has_password %}
<p>Password is set.

View file

@ -4,7 +4,6 @@
<nav class="admin-nav" aria-label="Administration">
<span class="admin-badge">Admin</span>
<a href="/admin/users" {% if active_page == "users" %}aria-current="page"{% endif %}>Users</a>
<a href="/manage/profile">Profile</a>
<button class="nav-logout" hx-post="/logout">Log out</button>
</nav>
{% block admin_content %}{% endblock %}

View file

@ -1,14 +0,0 @@
{% extends "base.html" %}
{% block title %}Set up your account — Porchlight{% endblock %}
{% block content %}
<h1>Set up your account</h1>
<p>You're about to set up the account for <strong>{{ username }}</strong>.</p>
<form method="post" action="/register/{{ token }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">
<button type="submit">Continue</button>
</form>
{% endblock %}

View file

@ -1,4 +1,3 @@
import html
import re
from typing import Annotated
from urllib.parse import urlparse
@ -167,9 +166,7 @@ def format_validation_errors(exc: ValidationError) -> str:
display_msg = f"{label}: {raw}"
else:
display_msg = f"{label}: {msg}"
# Escape: messages can echo user input (e.g. an invalid group name),
# and the result is interpolated into HTML below.
messages.append(html.escape(display_msg))
messages.append(display_msg)
if len(messages) == 1:
return f'<div role="alert">{messages[0]}</div>'

View file

@ -9,9 +9,7 @@ test.describe('Full user journey', () => {
expect(fixtures.register_token).toBeTruthy();
// ---- Step 1: Register via magic link ----
// GET shows a confirmation page; submitting it (POST) consumes the token.
await page.goto(`/register/${fixtures.register_token}`);
await page.click('button[type="submit"]');
// Should redirect to /manage/credentials?setup=1
await page.waitForURL('**/manage/credentials?setup=1', { timeout: 5000 });

View file

@ -328,15 +328,11 @@ async def test_delete_password_credential(client: AsyncClient) -> None:
await _login(client)
target = await _create_target_user(client, username="bob")
# Create a password credential for the target user, plus a second
# credential so the password is not the last one (last-credential guard).
# Create a password credential for the target user
app = client._transport.app # type: ignore[union-attr]
cred_repo = app.state.credential_repo
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("bobpass")))
await cred_repo.create_webauthn(
WebAuthnCredential(user_id=target.userid, credential_id=b"\xaa\xbb", public_key=b"\x00" * 32)
)
token = await get_csrf_token(client)
response = await client.request(
@ -355,8 +351,7 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None:
await _login(client)
target = await _create_target_user(client, username="bob")
# Create a webauthn credential for the target user, plus a second
# credential so the key is not the last one (last-credential guard).
# Create a webauthn credential for the target user
app = client._transport.app # type: ignore[union-attr]
cred_repo = app.state.credential_repo
credential_id = b"\x01\x02\x03\x04\x05\x06\x07\x08"
@ -369,8 +364,6 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None:
device_name="test-key",
)
)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("bobpass")))
# URL uses base64url without padding
credential_id_b64 = urlsafe_b64encode(credential_id).decode().rstrip("=")
@ -441,92 +434,3 @@ async def test_reinvite_user_404_for_nonexistent(client: AsyncClient) -> None:
response = await client.post("/admin/users/nonexistent-id/invite", headers={"X-CSRF-Token": token})
assert response.status_code == 404
assert "not found" in response.text.lower()
@pytest.mark.asyncio
async def test_admin_cannot_delete_users_last_credential(client: AsyncClient) -> None:
await _login(client)
target = await _create_target_user(client, userid="lonecred-01", username="lonecred")
app = client._transport.app # type: ignore[union-attr]
cred_repo = app.state.credential_repo
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("x")))
token = await get_csrf_token(client)
res = await client.request(
"DELETE",
f"/admin/users/{target.userid}/credentials/password",
headers={"X-CSRF-Token": token, "HX-Request": "true"},
)
# The last credential must survive, and the admin is told why.
assert await cred_repo.get_password_by_user(target.userid) is not None
assert "last credential" in res.text.lower()
async def _make_sole_admin_target(client: AsyncClient) -> User:
"""Create a second admin, then return it as the target. The logged-in
admin from _login plus this one means 2 admins; callers deactivate the
_login admin first if they need this to be the *last* admin."""
return await _create_target_user(client, userid="admin2-01", username="admin2", groups=["admin"])
@pytest.mark.asyncio
async def test_cannot_deactivate_last_active_admin(client: AsyncClient) -> None:
await _login(client) # the only admin
app = client._transport.app # type: ignore[union-attr]
admin = await app.state.user_repo.get_by_username("admin")
token = await get_csrf_token(client)
res = await client.post(
f"/admin/users/{admin.userid}/deactivate",
headers={"X-CSRF-Token": token, "HX-Request": "true"},
)
still = await app.state.user_repo.get_by_userid(admin.userid)
assert still.active is True
msg = res.text.lower()
assert "last" in msg
assert "admin" in msg
@pytest.mark.asyncio
async def test_cannot_remove_admin_group_from_last_admin(client: AsyncClient) -> None:
await _login(client)
app = client._transport.app # type: ignore[union-attr]
admin = await app.state.user_repo.get_by_username("admin")
token = await get_csrf_token(client)
res = await client.post(
f"/admin/users/{admin.userid}/groups",
data={"groups": "users"}, # drops "admin"
headers={"X-CSRF-Token": token, "HX-Request": "true"},
)
still = await app.state.user_repo.get_by_userid(admin.userid)
assert "admin" in still.groups
msg = res.text.lower()
assert "last" in msg
assert "admin" in msg
@pytest.mark.asyncio
async def test_cannot_delete_last_active_admin(client: AsyncClient) -> None:
await _login(client)
app = client._transport.app # type: ignore[union-attr]
# A second, separate admin to delete (not self — self-delete is blocked separately).
target = await _create_target_user(client, userid="otheradmin-01", username="otheradmin", groups=["admin"])
# Deactivate the logged-in admin's peer? Instead make target the only ACTIVE admin
# by deactivating the _login admin via repo, so deleting target would orphan admins.
login_admin = await app.state.user_repo.get_by_username("admin")
await app.state.user_repo.update(login_admin.model_copy(update={"active": False}))
token = await get_csrf_token(client)
res = await client.request(
"DELETE",
f"/admin/users/{target.userid}",
headers={"X-CSRF-Token": token, "HX-Request": "true"},
)
assert await app.state.user_repo.get_by_userid(target.userid) is not None
msg = res.text.lower()
assert "last" in msg
assert "admin" in msg

View file

@ -41,8 +41,6 @@ async def _setup_admin_and_target(client: AsyncClient) -> tuple[str, str]:
data={"username": "admin_g", "password": "AdminPass123!"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
# Login resets the session (fixation defense); fetch a fresh CSRF token.
token = await get_csrf_token(client)
return token, target.userid

View file

@ -32,8 +32,7 @@ async def _login_admin(client: AsyncClient) -> str:
data={"username": "admin", "password": "AdminPass123!"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
# Login resets the session (fixation defense); fetch a fresh CSRF token.
return await get_csrf_token(client)
return token
@pytest.mark.asyncio

View file

@ -1,42 +0,0 @@
from datetime import UTC, datetime
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _create_active_user(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
user = User(
userid="login-01",
username="loginuser",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await app.state.user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await app.state.credential_repo.create_password(
PasswordCredential(user_id=user.userid, password_hash=svc.hash("password123!Secure"))
)
async def test_login_resets_session_to_prevent_fixation(client: AsyncClient) -> None:
"""Logging in must reset the pre-auth session so an attacker-fixed session
cannot become authenticated. We observe this via the CSRF token (stored in
the session) changing across login."""
await _create_active_user(client)
pre_login_token = await get_csrf_token(client)
res = await client.post(
"/login/password",
data={"username": "loginuser", "password": "password123!Secure"},
headers={"HX-Request": "true", "X-CSRF-Token": pre_login_token},
)
assert res.status_code == 200
assert res.headers.get("HX-Redirect") # logged in
post_login_token = await get_csrf_token(client)
assert post_login_token != pre_login_token

View file

@ -1,10 +1,8 @@
from argon2 import PasswordHasher
from datetime import UTC, datetime, timedelta
from httpx import AsyncClient
from porchlight.authn.password import PasswordService
from porchlight.invite.service import MagicLinkService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
from porchlight.models import MagicLink, User
async def test_register_invalid_token_returns_error_page(client: AsyncClient) -> None:
@ -15,48 +13,42 @@ async def test_register_invalid_token_returns_error_page(client: AsyncClient) ->
async def test_register_expired_token_returns_error_page(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
# An already-expired link (negative TTL), stored hashed like the real service.
expired_service = MagicLinkService(repo=app.state.magic_link_repo, ttl=-3600)
link = await expired_service.create(username="newuser")
repo = app.state.magic_link_repo
await repo.create(
MagicLink(
token="expired",
username="newuser",
expires_at=datetime.now(UTC) - timedelta(hours=1),
)
)
res = await client.get(f"/register/{link.token}", follow_redirects=False)
res = await client.get("/register/expired", follow_redirects=False)
assert res.status_code == 400
assert "Invalid or expired" in res.text
async def test_register_get_does_not_consume_token(client: AsyncClient) -> None:
"""GET is side-effect free: it shows a confirmation form and must not
consume the token or create a session."""
async def test_register_valid_token_creates_user_and_redirects(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
magic_link_service = app.state.magic_link_service
link = await magic_link_service.create(username="newuser")
res = await client.get(f"/register/{link.token}", follow_redirects=False)
assert res.status_code == 200
assert f'action="/register/{link.token}"' in res.text
# Token still valid (not consumed by the GET).
assert await magic_link_service.validate(link.token) is not None
async def test_register_post_creates_user_and_redirects(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
magic_link_service = app.state.magic_link_service
magic_link_repo = app.state.magic_link_repo
user_repo = app.state.user_repo
link = await magic_link_service.create(username="newuser")
csrf = await get_csrf_token(client)
res = await client.post(
f"/register/{link.token}",
headers={"X-CSRF-Token": csrf},
follow_redirects=False,
await magic_link_repo.create(
MagicLink(
token="t1",
username="newuser",
expires_at=datetime.now(UTC) + timedelta(hours=1),
)
)
res = await client.get("/register/t1", follow_redirects=False)
assert res.status_code in (302, 303)
assert "/manage/credentials" in res.headers["location"]
assert "setup=1" in res.headers["location"]
# Token should be consumed (no longer valid)
assert await magic_link_service.validate(link.token) is None
# Token should be marked used
link = await magic_link_repo.get_by_token("t1")
assert link is not None
assert link.used is True
# User should exist
user = await user_repo.get_by_username("newuser")
@ -64,81 +56,53 @@ async def test_register_post_creates_user_and_redirects(client: AsyncClient) ->
assert "users" in user.groups
async def test_register_post_requires_csrf(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
magic_link_service = app.state.magic_link_service
link = await magic_link_service.create(username="newuser")
res = await client.post(f"/register/{link.token}", follow_redirects=False)
assert res.status_code == 403
# Token must not have been consumed by the rejected request.
assert await magic_link_service.validate(link.token) is not None
async def test_register_used_token_returns_error(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
magic_link_service = app.state.magic_link_service
link = await magic_link_service.create(username="newuser")
await magic_link_service.mark_used(link.token)
repo = app.state.magic_link_repo
await repo.create(
MagicLink(
token="used",
username="newuser",
expires_at=datetime.now(UTC) + timedelta(hours=1),
used=True,
)
)
res = await client.get(f"/register/{link.token}", follow_redirects=False)
res = await client.get("/register/used", follow_redirects=False)
assert res.status_code == 400
async def test_register_existing_user_logs_in_and_redirects(client: AsyncClient) -> None:
"""When initial-admin creates a user (no credentials yet), the invite link
should let them set up their account."""
"""When initial-admin creates a user, the invite link should log them in."""
app = client._transport.app # type: ignore[union-attr]
magic_link_service = app.state.magic_link_service
magic_link_repo = app.state.magic_link_repo
user_repo = app.state.user_repo
# Pre-create the user (as initial-admin would), with no credentials.
# Pre-create the user (as initial-admin would)
user = User(userid="lusab-bansen", username="admin", groups=["admin", "users"])
await user_repo.create(user)
link = await magic_link_service.create(username="admin")
csrf = await get_csrf_token(client)
res = await client.post(
f"/register/{link.token}",
headers={"X-CSRF-Token": csrf},
follow_redirects=False,
# Create invite for the same username
await magic_link_repo.create(
MagicLink(
token="admin-setup",
username="admin",
expires_at=datetime.now(UTC) + timedelta(hours=1),
)
)
res = await client.get("/register/admin-setup", follow_redirects=False)
assert res.status_code in (302, 303)
assert "/manage/credentials" in res.headers["location"]
assert "setup=1" in res.headers["location"]
# Token should be consumed
assert await magic_link_service.validate(link.token) is None
# Token should be marked used
link = await magic_link_repo.get_by_token("admin-setup")
assert link is not None
assert link.used is True
# Original user should still exist with original groups
existing = await user_repo.get_by_username("admin")
assert existing is not None
assert existing.userid == "lusab-bansen"
assert "admin" in existing.groups
async def test_register_rejects_user_that_already_has_credentials(client: AsyncClient) -> None:
"""An invite/re-invite link must not act as a passwordless login for an
account that already has credentials. Recovery is a separate flow."""
app = client._transport.app # type: ignore[union-attr]
magic_link_service = app.state.magic_link_service
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
user = User(userid="lusab-hascreds", username="hascreds", groups=["users"])
await user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("existing-pass")))
link = await magic_link_service.create(username="hascreds")
csrf = await get_csrf_token(client)
res = await client.post(
f"/register/{link.token}",
headers={"X-CSRF-Token": csrf},
follow_redirects=False,
)
# No passwordless login: rejected, not redirected to setup.
assert res.status_code == 400
assert "setup=1" not in res.headers.get("location", "")

View file

@ -61,11 +61,6 @@ async def test_inactive_user_cannot_register_magic_link(client: AsyncClient) ->
link = await magic_link_service.create(username="deactivated", created_by="admin", note="test")
csrf = await get_csrf_token(client)
response = await client.post(
f"/register/{link.token}",
headers={"X-CSRF-Token": csrf},
follow_redirects=False,
)
response = await client.get(f"/register/{link.token}", follow_redirects=False)
assert response.status_code == 400 or "deactivated" in response.text.lower() or "Invalid" in response.text

View file

@ -42,18 +42,6 @@ async def test_create_returns_magic_link(service: MagicLinkService) -> None:
assert link.expires_at > datetime.now(UTC)
async def test_token_stored_hashed_not_plaintext(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
# The raw token handed to the user must never be the value persisted in
# the store; a DB read must not yield a usable token.
link = await service.create(username="alice")
raw = link.token
assert await repo.get_by_token(raw) is None
# ...but the raw token must still validate through the service.
validated = await service.validate(raw)
assert validated is not None
assert validated.username == "alice"
async def test_create_generates_unique_tokens(service: MagicLinkService) -> None:
link1 = await service.create(username="alice")
link2 = await service.create(username="bob")
@ -102,22 +90,6 @@ async def test_validate_expired_token(service: MagicLinkService, repo: SQLiteMag
assert result is None
async def test_consume_validates_and_is_single_use(service: MagicLinkService) -> None:
link = await service.create(username="alice")
consumed = await service.consume(link.token)
assert consumed is not None
assert consumed.username == "alice"
assert consumed.token == link.token # raw token re-attached
# A second consume of the same token fails.
assert await service.consume(link.token) is None
async def test_consume_expired_returns_none(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
expired_service = MagicLinkService(repo=repo, ttl=-1)
link = await expired_service.create(username="alice")
assert await service.consume(link.token) is None
async def test_mark_used_returns_true(service: MagicLinkService) -> None:
link = await service.create(username="alice")
result = await service.mark_used(link.token)

View file

@ -2,16 +2,6 @@ import secrets
from httpx import AsyncClient
from porchlight.oidc.endpoints import _error_page
def test_error_page_escapes_html() -> None:
# OIDC error pages must not interpolate request-derived text as raw HTML.
resp = _error_page("<script>alert(1)</script>")
body = resp.body.decode()
assert "<script>" not in body
assert "&lt;script&gt;alert(1)&lt;/script&gt;" in body
def _register_test_client(
client: AsyncClient,

View file

@ -223,30 +223,6 @@ async def test_partial_consent_filters_scopes(client: AsyncClient) -> None:
assert set(consent.scopes) == {"openid", "profile"}
async def test_consent_rejects_scopes_outside_request(client: AsyncClient) -> None:
"""A forged consent POST cannot approve scopes that were not requested."""
app = client._transport.app # type: ignore[union-attr]
_register_test_rp(app)
await _create_test_user(app)
# Only openid + profile were requested; "email" is forged into the POST.
await _login_and_start_auth(client, scope="openid profile")
token = await get_csrf_token(client)
res = await client.post(
"/consent",
data={"action": "allow", "scope": ["openid", "profile", "email"]},
headers={"X-CSRF-Token": token},
follow_redirects=False,
)
assert res.status_code == 303
consent_repo = app.state.consent_repo
consent = await consent_repo.get_consent("lusab-consent", "consent-rp")
assert consent is not None
assert "email" not in consent.scopes
assert set(consent.scopes) == {"openid", "profile"}
# -- Test helpers --

View file

@ -1,7 +1,6 @@
import hashlib
import json
import secrets
from base64 import b64encode, urlsafe_b64encode
from base64 import b64encode
from datetime import UTC, datetime
from typing import Any
from urllib.parse import parse_qs, urlparse
@ -55,36 +54,19 @@ async def _setup_rp_and_user(app: FastAPI) -> None:
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))
def _pkce_pair() -> tuple[str, str]:
"""Return a (code_verifier, S256 code_challenge) pair."""
verifier = secrets.token_urlsafe(64)
challenge = urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()).rstrip(b"=").decode()
return verifier, challenge
async def _login_and_authorize(
client: AsyncClient,
state: str,
nonce: str,
code_challenge: str | None = None,
code_challenge_method: str | None = None,
) -> str:
async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> str:
"""Perform authorization request, login, consent, and return the auth code."""
params = {
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"scope": "openid profile email",
"state": state,
"nonce": nonce,
}
if code_challenge is not None:
params["code_challenge"] = code_challenge
params["code_challenge_method"] = code_challenge_method or "S256"
# Step 1: Authorization request (unauthenticated) -> redirect to /login
auth_res = await client.get(
"/authorization",
params=params,
params={
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"scope": "openid profile email",
"state": state,
"nonce": nonce,
},
follow_redirects=False,
)
assert auth_res.status_code in (302, 303), f"Expected redirect to /login, got {auth_res.status_code}"
@ -134,19 +116,16 @@ async def _login_and_authorize(
return code
async def _exchange_token(client: AsyncClient, code: str, code_verifier: str | None = None) -> dict[str, Any]:
async def _exchange_token(client: AsyncClient, code: str) -> dict[str, Any]:
"""Exchange authorization code for tokens."""
auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
}
if code_verifier is not None:
data["code_verifier"] = code_verifier
token_res = await client.post(
"/token",
data=data,
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
},
headers={
"Authorization": f"Basic {auth_header}",
"Content-Type": "application/x-www-form-urlencoded",
@ -220,46 +199,3 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None:
# sub should match between id_token and userinfo
assert userinfo_data["sub"] == id_token_sub
async def test_discovery_advertises_s256_pkce(client: AsyncClient) -> None:
res = await client.get("/.well-known/openid-configuration")
assert res.status_code == 200
assert res.json().get("code_challenge_methods_supported") == ["S256"]
async def test_pkce_s256_flow_succeeds(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
await _setup_rp_and_user(app)
state, nonce = secrets.token_urlsafe(16), secrets.token_urlsafe(16)
verifier, challenge = _pkce_pair()
code = await _login_and_authorize(client, state, nonce, code_challenge=challenge)
token_data = await _exchange_token(client, code, code_verifier=verifier)
assert "access_token" in token_data
async def test_pkce_wrong_verifier_rejected(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
await _setup_rp_and_user(app)
state, nonce = secrets.token_urlsafe(16), secrets.token_urlsafe(16)
_verifier, challenge = _pkce_pair()
code = await _login_and_authorize(client, state, nonce, code_challenge=challenge)
# Exchange with a verifier that does not match the challenge -> rejected.
auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
res = await client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"code_verifier": "wrong-" + secrets.token_urlsafe(48),
},
headers={
"Authorization": f"Basic {auth_header}",
"Content-Type": "application/x-www-form-urlencoded",
},
)
assert res.status_code != 200, f"PKCE verifier mismatch should be rejected, got {res.status_code}: {res.text}"

View file

@ -31,8 +31,7 @@ async def _login_user_with_password(client: AsyncClient) -> str:
data={"username": "pwuser", "password": "OldPass123!ok"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
# Login resets the session (fixation defense); fetch a fresh CSRF token.
return await get_csrf_token(client)
return token
async def _login_user_without_password(client: AsyncClient) -> str:
@ -48,15 +47,11 @@ async def _login_user_without_password(client: AsyncClient) -> str:
)
await user_repo.create(user)
# Simulate session login via magic link (GET shows the form, POST consumes)
# Simulate session login via magic link
token = await get_csrf_token(client)
magic_link_service = app.state.magic_link_service
link = await magic_link_service.create(username="newuser", created_by="admin", note="test")
await client.post(
f"/register/{link.token}",
headers={"X-CSRF-Token": token},
follow_redirects=False,
)
await client.get(f"/register/{link.token}", follow_redirects=False)
# Re-fetch CSRF token after session change
token = await get_csrf_token(client)
return token

View file

@ -1,10 +1,6 @@
from types import SimpleNamespace
import pytest
from httpx import AsyncClient
from starlette.requests import Request
from porchlight.rate_limit import _client_identifier
from tests.conftest import get_csrf_token
@ -27,38 +23,3 @@ async def test_password_login_rate_limited(client: AsyncClient) -> None:
)
assert response.status_code == 429
def _make_request(headers: dict[str, str], client_host: str, trusted_proxy_count: int) -> Request:
scope = {
"type": "http",
"headers": [(k.lower().encode(), v.encode()) for k, v in headers.items()],
"client": (client_host, 12345),
"app": SimpleNamespace(
state=SimpleNamespace(settings=SimpleNamespace(trusted_proxy_count=trusted_proxy_count))
),
}
return Request(scope)
def test_client_identifier_ignores_forwarded_when_no_trusted_proxy() -> None:
req = _make_request({"x-forwarded-for": "203.0.113.9"}, "127.0.0.1", trusted_proxy_count=0)
# Spoofable header must be ignored when no proxy is trusted.
assert _client_identifier(req) == "127.0.0.1"
def test_client_identifier_uses_forwarded_with_one_trusted_proxy() -> None:
req = _make_request({"x-forwarded-for": "203.0.113.9"}, "10.0.0.1", trusted_proxy_count=1)
assert _client_identifier(req) == "203.0.113.9"
def test_client_identifier_picks_correct_hop_with_two_trusted_proxies() -> None:
req = _make_request({"x-forwarded-for": "client, proxyA, proxyB"}, "10.0.0.1", trusted_proxy_count=2)
# Two trusted proxies appended their peers; the client as seen by the
# outermost trusted proxy is the 2nd-from-right entry.
assert _client_identifier(req) == "proxyA"
def test_client_identifier_falls_back_when_forwarded_missing() -> None:
req = _make_request({}, "10.0.0.1", trusted_proxy_count=1)
assert _client_identifier(req) == "10.0.0.1"

View file

@ -182,48 +182,3 @@ async def test_create_duplicate_webauthn(credential_repo: SQLiteCredentialReposi
with pytest.raises(DuplicateError):
await credential_repo.create_webauthn(cred)
async def test_delete_password_if_not_last_refuses_last(
credential_repo: SQLiteCredentialRepository, alice: User
) -> None:
await credential_repo.create_password(PasswordCredential(user_id=alice.userid, password_hash="h"))
# Only credential -> must refuse and leave it in place.
assert await credential_repo.delete_password_if_not_last(alice.userid) is False
assert await credential_repo.get_password_by_user(alice.userid) is not None
async def test_delete_password_if_not_last_allows_when_others_exist(
credential_repo: SQLiteCredentialRepository, alice: User
) -> None:
await credential_repo.create_password(PasswordCredential(user_id=alice.userid, password_hash="h"))
await credential_repo.create_webauthn(
WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02")
)
assert await credential_repo.delete_password_if_not_last(alice.userid) is True
assert await credential_repo.get_password_by_user(alice.userid) is None
async def test_delete_webauthn_if_not_last_refuses_last(
credential_repo: SQLiteCredentialRepository, alice: User
) -> None:
await credential_repo.create_webauthn(
WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02")
)
assert await credential_repo.delete_webauthn_if_not_last(alice.userid, b"\x01") is False
assert len(await credential_repo.get_webauthn_by_user(alice.userid)) == 1
async def test_delete_webauthn_if_not_last_allows_when_others_exist(
credential_repo: SQLiteCredentialRepository, alice: User
) -> None:
await credential_repo.create_webauthn(
WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02")
)
await credential_repo.create_password(PasswordCredential(user_id=alice.userid, password_hash="h"))
assert await credential_repo.delete_webauthn_if_not_last(alice.userid, b"\x01") is True
assert len(await credential_repo.get_webauthn_by_user(alice.userid)) == 0

View file

@ -63,34 +63,6 @@ async def test_mark_used_not_found(magic_link_repo: SQLiteMagicLinkRepository) -
assert marked is False
async def test_consume_marks_used_and_returns_link(magic_link_repo: SQLiteMagicLinkRepository) -> None:
await magic_link_repo.create(_make_link())
consumed = await magic_link_repo.consume("abc123")
assert consumed is not None
assert consumed.used is True
assert consumed.username == "alice"
async def test_consume_is_single_use(magic_link_repo: SQLiteMagicLinkRepository) -> None:
await magic_link_repo.create(_make_link())
first = await magic_link_repo.consume("abc123")
assert first is not None
# A second consume of the same token must fail (atomic single-use).
second = await magic_link_repo.consume("abc123")
assert second is None
async def test_consume_expired_returns_none(magic_link_repo: SQLiteMagicLinkRepository) -> None:
await magic_link_repo.create(_make_link(token="exp", expires_at=datetime.now(UTC) - timedelta(hours=1)))
assert await magic_link_repo.consume("exp") is None
async def test_consume_nonexistent_returns_none(magic_link_repo: SQLiteMagicLinkRepository) -> None:
assert await magic_link_repo.consume("nope") is None
async def test_delete_expired(magic_link_repo: SQLiteMagicLinkRepository) -> None:
expired = _make_link(token="expired", expires_at=datetime.now(UTC) - timedelta(hours=1))
await magic_link_repo.create(expired)

View file

@ -237,14 +237,3 @@ async def test_roundtrip_preserves_all_fields(user_repo: SQLiteUserRepository) -
assert sorted(fetched.groups) == ["admin", "users"]
assert fetched.created_at == user.created_at
assert fetched.updated_at == user.updated_at
async def test_count_active_admins(user_repo: SQLiteUserRepository) -> None:
await user_repo.create(_make_user(userid="a1", username="admin1", groups=["admin", "users"]))
await user_repo.create(_make_user(userid="a2", username="admin2", groups=["admin"]))
# Inactive admin must not count.
await user_repo.create(_make_user(userid="a3", username="admin3", groups=["admin"], active=False))
# Non-admin must not count.
await user_repo.create(_make_user(userid="u1", username="user1", groups=["users"]))
assert await user_repo.count_active_admins() == 2

View file

@ -292,15 +292,3 @@ class TestFormatValidationErrors:
result = format_validation_errors(exc)
assert "Picture URL must be a valid HTTP or HTTPS URL" in result
assert "Value error" not in result
def test_escapes_user_input_in_error_message(self) -> None:
# A validation error message that echoes user input (e.g. an invalid
# group name) must not emit raw HTML — otherwise it is reflected XSS.
payload = "<img src=x onerror=alert(1)>"
try:
GroupListInput(groups=payload)
except ValidationError as exc:
result = format_validation_errors(exc)
assert payload not in result
assert "&lt;img src=x onerror=alert(1)&gt;" in result
assert "<img" not in result