Compare commits
15 commits
2fc2bdcabb
...
baef5e0e2e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
baef5e0e2e | ||
|
|
efb265a68b | ||
|
|
aedb451128 | ||
|
|
e54764cda9 | ||
|
|
1bb76899a5 | ||
|
|
407db57279 | ||
|
|
7c4dbf2cd9 | ||
|
|
71a7c23bdd | ||
|
|
faeecaed59 | ||
|
|
e4eb539e3f | ||
|
|
91a2277664 | ||
|
|
cdde3e3754 | ||
|
|
c52778326e | ||
|
|
437ad59658 | ||
|
|
3cbae6255b |
32 changed files with 812 additions and 125 deletions
|
|
@ -11,6 +11,15 @@ from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput,
|
|||
|
||||
router = APIRouter(prefix="/admin", tags=["admin"])
|
||||
|
||||
ADMIN_GROUP = "admin"
|
||||
|
||||
|
||||
async def _is_last_active_admin(request: Request, user: User) -> bool:
|
||||
"""True if removing this user's admin access would leave zero active admins."""
|
||||
if not user.active or ADMIN_GROUP not in user.groups:
|
||||
return False
|
||||
return await request.app.state.user_repo.count_active_admins() <= 1
|
||||
|
||||
|
||||
async def _get_admin_user(request: Request) -> User | None:
|
||||
"""Return the current user if they are an admin, else None."""
|
||||
|
|
@ -198,6 +207,9 @@ async def update_user_groups(
|
|||
if user is None:
|
||||
return HTMLResponse("User not found", status_code=404)
|
||||
|
||||
if ADMIN_GROUP not in validated.group_list and await _is_last_active_admin(request, user):
|
||||
return HTMLResponse('<div role="alert">Cannot remove the last active admin</div>')
|
||||
|
||||
updated = user.model_copy(update={"groups": validated.group_list})
|
||||
await user_repo.update(updated)
|
||||
return HTMLResponse('<div role="status">Groups updated</div>')
|
||||
|
|
@ -241,6 +253,9 @@ async def deactivate_user(request: Request, userid: str) -> Response:
|
|||
if user is None:
|
||||
return HTMLResponse("User not found", status_code=404)
|
||||
|
||||
if await _is_last_active_admin(request, user):
|
||||
return HTMLResponse('<div role="alert">Cannot deactivate the last active admin</div>')
|
||||
|
||||
updated = user.model_copy(update={"active": False})
|
||||
await user_repo.update(updated)
|
||||
return HTMLResponse(
|
||||
|
|
@ -261,10 +276,12 @@ async def delete_user_password(request: Request, userid: str) -> Response:
|
|||
return HTMLResponse("Forbidden", status_code=403)
|
||||
|
||||
cred_repo = request.app.state.credential_repo
|
||||
await cred_repo.delete_password(userid)
|
||||
# Refuse atomically if this is the user's last credential (avoids lockout).
|
||||
deleted = await cred_repo.delete_password_if_not_last(userid)
|
||||
|
||||
# Re-render credentials section
|
||||
webauthn_credentials = await cred_repo.get_webauthn_by_user(userid)
|
||||
password = await cred_repo.get_password_by_user(userid)
|
||||
templates = request.app.state.templates
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
|
|
@ -272,7 +289,8 @@ async def delete_user_password(request: Request, userid: str) -> Response:
|
|||
{
|
||||
"target_user": await request.app.state.user_repo.get_by_userid(userid),
|
||||
"webauthn_credentials": webauthn_credentials,
|
||||
"has_password": False,
|
||||
"has_password": password is not None,
|
||||
"error": None if deleted else "Cannot remove the user's last credential",
|
||||
},
|
||||
)
|
||||
|
||||
|
|
@ -289,7 +307,8 @@ async def delete_user_webauthn(request: Request, userid: str, credential_id_b64:
|
|||
padded = credential_id_b64 + "=" * (-len(credential_id_b64) % 4)
|
||||
credential_id = urlsafe_b64decode(padded)
|
||||
cred_repo = request.app.state.credential_repo
|
||||
await cred_repo.delete_webauthn(userid, credential_id)
|
||||
# Refuse atomically if this is the user's last credential (avoids lockout).
|
||||
deleted = await cred_repo.delete_webauthn_if_not_last(userid, credential_id)
|
||||
|
||||
# Re-render credentials section
|
||||
webauthn_credentials = await cred_repo.get_webauthn_by_user(userid)
|
||||
|
|
@ -302,6 +321,7 @@ async def delete_user_webauthn(request: Request, userid: str, credential_id_b64:
|
|||
"target_user": await request.app.state.user_repo.get_by_userid(userid),
|
||||
"webauthn_credentials": webauthn_credentials,
|
||||
"has_password": password is not None,
|
||||
"error": None if deleted else "Cannot remove the user's last credential",
|
||||
},
|
||||
)
|
||||
|
||||
|
|
@ -345,6 +365,10 @@ async def delete_user(request: Request, userid: str) -> Response:
|
|||
return HTMLResponse('<div role="alert">Cannot delete your own account</div>')
|
||||
|
||||
user_repo = request.app.state.user_repo
|
||||
target = await user_repo.get_by_userid(userid)
|
||||
if target is not None and await _is_last_active_admin(request, target):
|
||||
return HTMLResponse('<div role="alert">Cannot delete the last active admin</div>')
|
||||
|
||||
deleted = await user_repo.delete(userid)
|
||||
if not deleted:
|
||||
return HTMLResponse("User not found", status_code=404)
|
||||
|
|
|
|||
|
|
@ -23,6 +23,21 @@ def _login_redirect_target(request: Request) -> str:
|
|||
return "/manage/credentials"
|
||||
|
||||
|
||||
def _establish_authenticated_session(request: Request, user: User) -> None:
|
||||
"""Reset the session before recording the authenticated identity.
|
||||
|
||||
Clearing first defeats session fixation: any values an attacker planted in
|
||||
the pre-auth session are dropped and the session cookie is reissued. A
|
||||
pending OIDC authorization request is the only pre-auth state worth keeping.
|
||||
"""
|
||||
pending_oidc = request.session.get("oidc_auth_request")
|
||||
request.session.clear()
|
||||
if pending_oidc is not None:
|
||||
request.session["oidc_auth_request"] = pending_oidc
|
||||
request.session["userid"] = user.userid
|
||||
request.session["username"] = user.username
|
||||
|
||||
|
||||
@router.get("/login", response_class=HTMLResponse)
|
||||
async def login_page(request: Request) -> HTMLResponse:
|
||||
templates = request.app.state.templates
|
||||
|
|
@ -56,8 +71,7 @@ async def login_password(
|
|||
if not user.active:
|
||||
return HTMLResponse(error_html)
|
||||
|
||||
request.session["userid"] = user.userid
|
||||
request.session["username"] = user.username
|
||||
_establish_authenticated_session(request, user)
|
||||
|
||||
response = Response()
|
||||
response.headers["HX-Redirect"] = _login_redirect_target(request)
|
||||
|
|
@ -72,12 +86,34 @@ async def logout(request: Request) -> Response:
|
|||
return response
|
||||
|
||||
|
||||
@router.get("/register/{token}")
|
||||
@router.get("/register/{token}", response_class=HTMLResponse)
|
||||
async def register_magic_link_page(request: Request, token: str) -> Response:
|
||||
"""Show the registration confirmation page.
|
||||
|
||||
GET is side-effect free: it only validates the token (without consuming it)
|
||||
and renders a form. The token is consumed by the POST below, so that simply
|
||||
visiting the link (email scanners, prefetchers) cannot create a session.
|
||||
"""
|
||||
magic_link_service = request.app.state.magic_link_service
|
||||
link = await magic_link_service.validate(token)
|
||||
if link is None:
|
||||
return HTMLResponse("<p>Invalid or expired registration link.</p>", status_code=400)
|
||||
|
||||
templates = request.app.state.templates
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"register.html",
|
||||
{"token": token, "username": link.username},
|
||||
)
|
||||
|
||||
|
||||
@router.post("/register/{token}")
|
||||
async def register_magic_link(request: Request, token: str) -> Response:
|
||||
magic_link_service = request.app.state.magic_link_service
|
||||
user_repo = request.app.state.user_repo
|
||||
|
||||
link = await magic_link_service.validate(token)
|
||||
# Atomically validate and consume the token (single-use, no replay race).
|
||||
link = await magic_link_service.consume(token)
|
||||
if link is None:
|
||||
return HTMLResponse("<p>Invalid or expired registration link.</p>", status_code=400)
|
||||
|
||||
|
|
@ -85,16 +121,25 @@ async def register_magic_link(request: Request, token: str) -> Response:
|
|||
if existing_user is not None:
|
||||
if not existing_user.active:
|
||||
return HTMLResponse("<p>This account has been deactivated.</p>", status_code=400)
|
||||
# An invite link is for account setup, not authentication. If the
|
||||
# account already has credentials, refuse to establish a session —
|
||||
# otherwise a re-invite would be a passwordless login. Account
|
||||
# recovery must go through a separate, explicitly authenticated flow.
|
||||
cred_repo = request.app.state.credential_repo
|
||||
has_password = await cred_repo.get_password_by_user(existing_user.userid) is not None
|
||||
has_webauthn = bool(await cred_repo.get_webauthn_by_user(existing_user.userid))
|
||||
if has_password or has_webauthn:
|
||||
return HTMLResponse(
|
||||
"<p>This account is already set up. Please sign in instead.</p>",
|
||||
status_code=400,
|
||||
)
|
||||
user = existing_user
|
||||
else:
|
||||
userid = await generate_unique_userid(user_repo)
|
||||
user = User(userid=userid, username=link.username, groups=["users"])
|
||||
await user_repo.create(user)
|
||||
|
||||
await magic_link_service.mark_used(token)
|
||||
|
||||
request.session["userid"] = user.userid
|
||||
request.session["username"] = user.username
|
||||
_establish_authenticated_session(request, user)
|
||||
|
||||
return RedirectResponse("/manage/credentials?setup=1", status_code=303)
|
||||
|
||||
|
|
@ -156,7 +201,6 @@ async def login_webauthn_complete(request: Request) -> Response:
|
|||
if user is None or not user.active:
|
||||
return JSONResponse({"error": "Authentication failed"}, status_code=400)
|
||||
|
||||
request.session["userid"] = user.userid
|
||||
request.session["username"] = user.username
|
||||
_establish_authenticated_session(request, user)
|
||||
|
||||
return JSONResponse({"redirect": _login_redirect_target(request)})
|
||||
|
|
|
|||
|
|
@ -54,6 +54,10 @@ class Settings(BaseSettings):
|
|||
|
||||
# Rate limiting (disable for e2e/load tests that authenticate repeatedly)
|
||||
rate_limit_enabled: bool = True
|
||||
# Number of trusted reverse proxies in front of the app. When > 0, the
|
||||
# client IP for rate limiting is taken from X-Forwarded-For, counting this
|
||||
# many hops from the right. Keep 0 unless deployed behind a known proxy.
|
||||
trusted_proxy_count: int = 0
|
||||
|
||||
# Signing keys
|
||||
signing_key_path: str = "data/keys"
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
import hashlib
|
||||
import hmac
|
||||
import secrets
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
|
|
@ -6,11 +8,23 @@ from porchlight.store.protocols import MagicLinkRepository
|
|||
|
||||
|
||||
class MagicLinkService:
|
||||
"""Magic link token lifecycle management."""
|
||||
"""Magic link token lifecycle management.
|
||||
|
||||
def __init__(self, repo: MagicLinkRepository, ttl: int = 86400) -> None:
|
||||
The raw token is only ever exposed to the user (in the registration URL).
|
||||
Only a hash of the token is persisted, so a database disclosure does not
|
||||
yield usable tokens. A pepper, when configured, keys the hash (HMAC);
|
||||
otherwise a plain SHA-256 of the high-entropy token is stored.
|
||||
"""
|
||||
|
||||
def __init__(self, repo: MagicLinkRepository, ttl: int = 86400, pepper: str | None = None) -> None:
|
||||
self._repo = repo
|
||||
self._ttl = ttl
|
||||
self._pepper = pepper
|
||||
|
||||
def _hash_token(self, token: str) -> str:
|
||||
if self._pepper:
|
||||
return hmac.new(self._pepper.encode(), token.encode(), hashlib.sha256).hexdigest()
|
||||
return hashlib.sha256(token.encode()).hexdigest()
|
||||
|
||||
async def create(
|
||||
self,
|
||||
|
|
@ -18,34 +32,52 @@ class MagicLinkService:
|
|||
created_by: str | None = None,
|
||||
note: str | None = None,
|
||||
) -> MagicLink:
|
||||
"""Generate and store a new magic link for the given username."""
|
||||
"""Generate and store a new magic link for the given username.
|
||||
|
||||
Persists only the hashed token; the returned MagicLink carries the raw
|
||||
token so the caller can build the registration URL.
|
||||
"""
|
||||
token = secrets.token_urlsafe(32)
|
||||
expires_at = datetime.now(UTC) + timedelta(seconds=self._ttl)
|
||||
link = MagicLink(
|
||||
token=token,
|
||||
stored = MagicLink(
|
||||
token=self._hash_token(token),
|
||||
username=username,
|
||||
expires_at=expires_at,
|
||||
created_by=created_by,
|
||||
note=note,
|
||||
)
|
||||
return await self._repo.create(link)
|
||||
await self._repo.create(stored)
|
||||
return stored.model_copy(update={"token": token})
|
||||
|
||||
async def validate(self, token: str) -> MagicLink | None:
|
||||
"""Validate a magic link token.
|
||||
|
||||
Returns the MagicLink if it exists, is not used, and has not expired.
|
||||
Returns None otherwise.
|
||||
Returns the MagicLink (with the raw token re-attached) if it exists, is
|
||||
not used, and has not expired. Returns None otherwise.
|
||||
"""
|
||||
link = await self._repo.get_by_token(token)
|
||||
link = await self._repo.get_by_token(self._hash_token(token))
|
||||
if link is None or link.used:
|
||||
return None
|
||||
if link.expires_at < datetime.now(UTC):
|
||||
return None
|
||||
return link
|
||||
return link.model_copy(update={"token": token})
|
||||
|
||||
async def mark_used(self, token: str) -> bool:
|
||||
"""Mark a magic link as used. Returns True if found and marked."""
|
||||
return await self._repo.mark_used(token)
|
||||
return await self._repo.mark_used(self._hash_token(token))
|
||||
|
||||
async def consume(self, token: str) -> MagicLink | None:
|
||||
"""Atomically validate and consume a token in one step.
|
||||
|
||||
Prefer this over validate()+mark_used(): it closes the replay race
|
||||
where two concurrent requests could both pass validation before either
|
||||
marks the token used. Returns the link (raw token re-attached) on
|
||||
success, else None.
|
||||
"""
|
||||
link = await self._repo.consume(self._hash_token(token))
|
||||
if link is None:
|
||||
return None
|
||||
return link.model_copy(update={"token": token})
|
||||
|
||||
async def cleanup_expired(self) -> int:
|
||||
"""Delete expired unused links. Returns count deleted."""
|
||||
|
|
|
|||
|
|
@ -13,13 +13,6 @@ from porchlight.validation import PasswordChange, PasswordSet, ProfileUpdate, fo
|
|||
router = APIRouter(prefix="/manage", tags=["manage"])
|
||||
|
||||
|
||||
async def _count_credentials(cred_repo: object, userid: str) -> int:
|
||||
"""Count total credentials (password + webauthn) for a user."""
|
||||
webauthn = await cred_repo.get_webauthn_by_user(userid) # type: ignore[union-attr]
|
||||
password = await cred_repo.get_password_by_user(userid) # type: ignore[union-attr]
|
||||
return len(webauthn) + (1 if password else 0)
|
||||
|
||||
|
||||
@router.get("/credentials", response_class=HTMLResponse)
|
||||
async def credentials_page(request: Request) -> Response:
|
||||
session_user = get_session_user(request)
|
||||
|
|
@ -107,11 +100,9 @@ async def delete_password(request: Request) -> Response:
|
|||
userid, _username = session_user
|
||||
cred_repo = request.app.state.credential_repo
|
||||
|
||||
count = await _count_credentials(cred_repo, userid)
|
||||
if count <= 1:
|
||||
# Atomic: refuses if this is the user's last credential (not raceable).
|
||||
if not await cred_repo.delete_password_if_not_last(userid):
|
||||
return HTMLResponse('<div role="alert">Cannot remove your last credential</div>')
|
||||
|
||||
await cred_repo.delete_password(userid)
|
||||
return HTMLResponse('<div role="status">Password removed</div>')
|
||||
|
||||
|
||||
|
|
@ -182,11 +173,9 @@ async def delete_webauthn(request: Request, credential_id_b64: str) -> Response:
|
|||
padded = credential_id_b64 + "=" * (-len(credential_id_b64) % 4)
|
||||
credential_id = urlsafe_b64decode(padded)
|
||||
|
||||
count = await _count_credentials(cred_repo, userid)
|
||||
if count <= 1:
|
||||
# Atomic: refuses if this is the user's last credential (not raceable).
|
||||
if not await cred_repo.delete_webauthn_if_not_last(userid, credential_id):
|
||||
return HTMLResponse('<div role="alert">Cannot remove your last credential</div>')
|
||||
|
||||
await cred_repo.delete_webauthn(userid, credential_id)
|
||||
return HTMLResponse('<div role="status">Security key removed</div>')
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import html
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
from urllib.parse import urlencode
|
||||
|
|
@ -14,6 +15,15 @@ from porchlight.oidc.claims import PorchlightUserInfo, user_to_claims
|
|||
|
||||
router = APIRouter(tags=["oidc"])
|
||||
|
||||
|
||||
def _error_page(message: object, status_code: int = 400, title: str = "Error") -> HTMLResponse:
|
||||
"""Render an error page, escaping the (possibly request-derived) message."""
|
||||
return HTMLResponse(
|
||||
f"<h1>{html.escape(title)}</h1><p>{html.escape(str(message))}</p>",
|
||||
status_code=status_code,
|
||||
)
|
||||
|
||||
|
||||
SCOPE_DESCRIPTIONS: dict[str, str] = {
|
||||
"openid": "Sign you in (required)",
|
||||
"profile": "Your name and profile information",
|
||||
|
|
@ -59,11 +69,11 @@ async def authorization(request: Request) -> Response:
|
|||
try:
|
||||
parsed = endpoint.parse_request(query_params)
|
||||
except Exception as exc:
|
||||
return HTMLResponse(f"<h1>Invalid Request</h1><p>{exc}</p>", status_code=400)
|
||||
return _error_page(exc, title="Invalid Request")
|
||||
|
||||
if "error" in parsed:
|
||||
error_desc = parsed.get("error_description", parsed["error"])
|
||||
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
|
||||
return _error_page(error_desc)
|
||||
|
||||
# Check if user is authenticated
|
||||
userid = request.session.get("userid")
|
||||
|
|
@ -95,11 +105,11 @@ async def authorization_complete(request: Request) -> Response:
|
|||
try:
|
||||
parsed = endpoint.parse_request(auth_request_params)
|
||||
except Exception as exc:
|
||||
return HTMLResponse(f"<h1>Error</h1><p>{exc}</p>", status_code=400)
|
||||
return _error_page(exc)
|
||||
|
||||
if "error" in parsed:
|
||||
error_desc = parsed.get("error_description", parsed["error"])
|
||||
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
|
||||
return _error_page(error_desc)
|
||||
|
||||
return await _check_consent_or_complete(
|
||||
request, oidc_server, endpoint, parsed, userid, username, auth_request_params
|
||||
|
|
@ -172,7 +182,7 @@ async def _complete_authorization( # noqa: PLR0913
|
|||
if "error" in result.get("response_args", {}):
|
||||
response_args = result["response_args"]
|
||||
error_desc = response_args.get("error_description", response_args["error"])
|
||||
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
|
||||
return _error_page(error_desc)
|
||||
|
||||
# Build redirect URL
|
||||
response_args = result.get("response_args", {})
|
||||
|
|
@ -338,8 +348,10 @@ async def consent_submit(request: Request) -> Response:
|
|||
return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
|
||||
return HTMLResponse("<h1>Error</h1><p>Invalid action</p>", status_code=400)
|
||||
|
||||
# Allow — collect approved scopes
|
||||
approved_scopes: list[str] = [str(s) for s in form.getlist("scope")]
|
||||
# Allow — collect approved scopes, rejecting anything outside the
|
||||
# originally requested set (a forged form must not escalate scope).
|
||||
requested_scopes = set(auth_params.get("scope", "openid").split())
|
||||
approved_scopes: list[str] = [str(s) for s in form.getlist("scope") if str(s) in requested_scopes]
|
||||
if "openid" not in approved_scopes:
|
||||
approved_scopes = ["openid", *list(approved_scopes)]
|
||||
|
||||
|
|
@ -359,6 +371,6 @@ async def consent_submit(request: Request) -> Response:
|
|||
if "error" in parsed:
|
||||
raise ValueError(parsed.get("error_description", parsed["error"]))
|
||||
except Exception as exc:
|
||||
return HTMLResponse(f"<h1>Error</h1><p>{exc}</p>", status_code=400)
|
||||
return _error_page(exc)
|
||||
|
||||
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
from pathlib import Path
|
||||
|
||||
from idpyoidc.server import Server
|
||||
from idpyoidc.server.oauth2.add_on.pkce import CC_METHOD
|
||||
|
||||
from porchlight.config import Settings
|
||||
from porchlight.oidc.claims import PorchlightUserInfo
|
||||
|
|
@ -126,6 +127,20 @@ def _build_server_config(settings: Settings) -> dict:
|
|||
"phone": ["phone_number", "phone_number_verified"],
|
||||
},
|
||||
"authentication": {},
|
||||
# PKCE: advertise and verify S256 (only). Not "essential", so existing
|
||||
# relying parties that do not yet send a code_challenge keep working;
|
||||
# but any RP that does use PKCE must use S256 and is verified at the
|
||||
# token endpoint. Set essential=True (or per-client pkce_essential) to
|
||||
# require it once all clients have migrated.
|
||||
"add_on": {
|
||||
"pkce": {
|
||||
"function": "idpyoidc.server.oauth2.add_on.pkce.add_support",
|
||||
"kwargs": {
|
||||
"essential": False,
|
||||
"code_challenge_methods": {"S256": CC_METHOD["S256"]},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,25 @@
|
|||
from slowapi import Limiter
|
||||
from slowapi.util import get_remote_address
|
||||
from starlette.requests import Request
|
||||
|
||||
limiter = Limiter(key_func=get_remote_address)
|
||||
|
||||
def _client_identifier(request: Request) -> str:
|
||||
"""Rate-limit key: the client IP, proxy-aware.
|
||||
|
||||
By default the direct peer address is used. When ``trusted_proxy_count`` is
|
||||
configured (the number of trusted reverse proxies in front of the app), the
|
||||
client address is taken from the ``X-Forwarded-For`` chain instead, counting
|
||||
that many hops back from the right. The header is ignored unless proxies are
|
||||
trusted, so it cannot be spoofed in a direct-connection deployment.
|
||||
"""
|
||||
settings = getattr(getattr(request.app, "state", None), "settings", None)
|
||||
hops: int = getattr(settings, "trusted_proxy_count", 0) or 0
|
||||
if hops > 0:
|
||||
forwarded = request.headers.get("x-forwarded-for", "")
|
||||
parts = [p.strip() for p in forwarded.split(",") if p.strip()]
|
||||
if len(parts) >= hops:
|
||||
return parts[-hops]
|
||||
return get_remote_address(request)
|
||||
|
||||
|
||||
limiter = Limiter(key_func=_client_identifier)
|
||||
|
|
|
|||
|
|
@ -205,6 +205,10 @@ main {
|
|||
line-height: 1;
|
||||
}
|
||||
|
||||
#user-table-container {
|
||||
overflow-x: auto;
|
||||
}
|
||||
|
||||
.admin-table {
|
||||
width: 100%;
|
||||
border-collapse: collapse;
|
||||
|
|
@ -214,7 +218,7 @@ main {
|
|||
.admin-table th,
|
||||
.admin-table td {
|
||||
text-align: left;
|
||||
padding: var(--sp-2) var(--sp-3);
|
||||
padding: var(--sp-2);
|
||||
border-bottom: 1px solid var(--border);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ class UserRepository(Protocol):
|
|||
|
||||
async def count_users(self, query: str | None = None) -> int: ...
|
||||
|
||||
async def count_active_admins(self) -> int: ...
|
||||
|
||||
async def delete(self, userid: str) -> bool: ...
|
||||
|
||||
|
||||
|
|
@ -46,6 +48,10 @@ class CredentialRepository(Protocol):
|
|||
|
||||
async def delete_password(self, user_id: str) -> bool: ...
|
||||
|
||||
async def delete_password_if_not_last(self, user_id: str) -> bool: ...
|
||||
|
||||
async def delete_webauthn_if_not_last(self, user_id: str, credential_id: bytes) -> bool: ...
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class MagicLinkRepository(Protocol):
|
||||
|
|
@ -55,6 +61,8 @@ class MagicLinkRepository(Protocol):
|
|||
|
||||
async def mark_used(self, token: str) -> bool: ...
|
||||
|
||||
async def consume(self, token: str) -> MagicLink | None: ...
|
||||
|
||||
async def delete_expired(self) -> int: ...
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -163,6 +163,17 @@ class SQLiteUserRepository:
|
|||
row = await cursor.fetchone()
|
||||
return row[0] if row else 0
|
||||
|
||||
async def count_active_admins(self) -> int:
|
||||
async with self._db.execute(
|
||||
"""
|
||||
SELECT COUNT(*) FROM users u
|
||||
JOIN user_groups g ON g.userid = u.userid
|
||||
WHERE u.active = 1 AND g.group_name = 'admin'
|
||||
"""
|
||||
) as cursor:
|
||||
row = await cursor.fetchone()
|
||||
return row[0] if row else 0
|
||||
|
||||
async def delete(self, userid: str) -> bool:
|
||||
cursor = await self._db.execute("DELETE FROM users WHERE userid = ?", (userid,))
|
||||
await self._db.commit()
|
||||
|
|
@ -266,6 +277,41 @@ class SQLiteCredentialRepository:
|
|||
await self._db.commit()
|
||||
return cursor.rowcount > 0
|
||||
|
||||
async def delete_password_if_not_last(self, user_id: str) -> bool:
|
||||
"""Delete the password credential only if it is not the user's last
|
||||
credential. The count and delete happen in one atomic statement, so it
|
||||
is not raceable. Returns True if a row was deleted."""
|
||||
cursor = await self._db.execute(
|
||||
"""
|
||||
DELETE FROM password_credentials
|
||||
WHERE user_id = ?
|
||||
AND (
|
||||
(SELECT COUNT(*) FROM password_credentials WHERE user_id = ?)
|
||||
+ (SELECT COUNT(*) FROM webauthn_credentials WHERE user_id = ?)
|
||||
) > 1
|
||||
""",
|
||||
(user_id, user_id, user_id),
|
||||
)
|
||||
await self._db.commit()
|
||||
return cursor.rowcount > 0
|
||||
|
||||
async def delete_webauthn_if_not_last(self, user_id: str, credential_id: bytes) -> bool:
|
||||
"""Delete a WebAuthn credential only if it is not the user's last
|
||||
credential, atomically. Returns True if a row was deleted."""
|
||||
cursor = await self._db.execute(
|
||||
"""
|
||||
DELETE FROM webauthn_credentials
|
||||
WHERE user_id = ? AND credential_id = ?
|
||||
AND (
|
||||
(SELECT COUNT(*) FROM password_credentials WHERE user_id = ?)
|
||||
+ (SELECT COUNT(*) FROM webauthn_credentials WHERE user_id = ?)
|
||||
) > 1
|
||||
""",
|
||||
(user_id, credential_id, user_id, user_id),
|
||||
)
|
||||
await self._db.commit()
|
||||
return cursor.rowcount > 0
|
||||
|
||||
|
||||
class SQLiteMagicLinkRepository:
|
||||
def __init__(self, db: aiosqlite.Connection) -> None:
|
||||
|
|
@ -311,6 +357,25 @@ class SQLiteMagicLinkRepository:
|
|||
await self._db.commit()
|
||||
return cursor.rowcount > 0
|
||||
|
||||
async def consume(self, token: str) -> MagicLink | None:
|
||||
"""Atomically validate-and-mark a token as used.
|
||||
|
||||
The conditional UPDATE is the single point of decision, so two
|
||||
concurrent requests cannot both consume the same token. Returns the
|
||||
link if this call won the race (unused and unexpired), else None.
|
||||
"""
|
||||
now = datetime.now(UTC).isoformat()
|
||||
cursor = await self._db.execute(
|
||||
"UPDATE magic_links SET used = 1 WHERE token = ? AND used = 0 AND expires_at > ?",
|
||||
(token, now),
|
||||
)
|
||||
await self._db.commit()
|
||||
if cursor.rowcount == 0:
|
||||
return None
|
||||
async with self._db.execute("SELECT * FROM magic_links WHERE token = ?", (token,)) as c:
|
||||
row = await c.fetchone()
|
||||
return self._row_to_magic_link(row) if row is not None else None
|
||||
|
||||
async def delete_expired(self) -> int:
|
||||
now = datetime.now(UTC).isoformat()
|
||||
cursor = await self._db.execute("DELETE FROM magic_links WHERE expires_at < ? AND used = 0", (now,))
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
{% if error %}<div role="alert">{{ error }}</div>{% endif %}
|
||||
<h3>Password</h3>
|
||||
{% if has_password %}
|
||||
<p>Password is set.
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
<nav class="admin-nav" aria-label="Administration">
|
||||
<span class="admin-badge">Admin</span>
|
||||
<a href="/admin/users" {% if active_page == "users" %}aria-current="page"{% endif %}>Users</a>
|
||||
<a href="/manage/profile">Profile</a>
|
||||
<button class="nav-logout" hx-post="/logout">Log out</button>
|
||||
</nav>
|
||||
{% block admin_content %}{% endblock %}
|
||||
|
|
|
|||
14
src/porchlight/templates/register.html
Normal file
14
src/porchlight/templates/register.html
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Set up your account — Porchlight{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<h1>Set up your account</h1>
|
||||
|
||||
<p>You're about to set up the account for <strong>{{ username }}</strong>.</p>
|
||||
|
||||
<form method="post" action="/register/{{ token }}">
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">
|
||||
<button type="submit">Continue</button>
|
||||
</form>
|
||||
{% endblock %}
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
import html
|
||||
import re
|
||||
from typing import Annotated
|
||||
from urllib.parse import urlparse
|
||||
|
|
@ -166,7 +167,9 @@ def format_validation_errors(exc: ValidationError) -> str:
|
|||
display_msg = f"{label}: {raw}"
|
||||
else:
|
||||
display_msg = f"{label}: {msg}"
|
||||
messages.append(display_msg)
|
||||
# Escape: messages can echo user input (e.g. an invalid group name),
|
||||
# and the result is interpolated into HTML below.
|
||||
messages.append(html.escape(display_msg))
|
||||
|
||||
if len(messages) == 1:
|
||||
return f'<div role="alert">{messages[0]}</div>'
|
||||
|
|
|
|||
|
|
@ -9,7 +9,9 @@ test.describe('Full user journey', () => {
|
|||
expect(fixtures.register_token).toBeTruthy();
|
||||
|
||||
// ---- Step 1: Register via magic link ----
|
||||
// GET shows a confirmation page; submitting it (POST) consumes the token.
|
||||
await page.goto(`/register/${fixtures.register_token}`);
|
||||
await page.click('button[type="submit"]');
|
||||
|
||||
// Should redirect to /manage/credentials?setup=1
|
||||
await page.waitForURL('**/manage/credentials?setup=1', { timeout: 5000 });
|
||||
|
|
|
|||
|
|
@ -328,11 +328,15 @@ async def test_delete_password_credential(client: AsyncClient) -> None:
|
|||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
|
||||
# Create a password credential for the target user
|
||||
# Create a password credential for the target user, plus a second
|
||||
# credential so the password is not the last one (last-credential guard).
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
cred_repo = app.state.credential_repo
|
||||
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("bobpass")))
|
||||
await cred_repo.create_webauthn(
|
||||
WebAuthnCredential(user_id=target.userid, credential_id=b"\xaa\xbb", public_key=b"\x00" * 32)
|
||||
)
|
||||
|
||||
token = await get_csrf_token(client)
|
||||
response = await client.request(
|
||||
|
|
@ -351,7 +355,8 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None:
|
|||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
|
||||
# Create a webauthn credential for the target user
|
||||
# Create a webauthn credential for the target user, plus a second
|
||||
# credential so the key is not the last one (last-credential guard).
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
cred_repo = app.state.credential_repo
|
||||
credential_id = b"\x01\x02\x03\x04\x05\x06\x07\x08"
|
||||
|
|
@ -364,6 +369,8 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None:
|
|||
device_name="test-key",
|
||||
)
|
||||
)
|
||||
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("bobpass")))
|
||||
|
||||
# URL uses base64url without padding
|
||||
credential_id_b64 = urlsafe_b64encode(credential_id).decode().rstrip("=")
|
||||
|
|
@ -434,3 +441,92 @@ async def test_reinvite_user_404_for_nonexistent(client: AsyncClient) -> None:
|
|||
response = await client.post("/admin/users/nonexistent-id/invite", headers={"X-CSRF-Token": token})
|
||||
assert response.status_code == 404
|
||||
assert "not found" in response.text.lower()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_cannot_delete_users_last_credential(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, userid="lonecred-01", username="lonecred")
|
||||
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
cred_repo = app.state.credential_repo
|
||||
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("x")))
|
||||
|
||||
token = await get_csrf_token(client)
|
||||
res = await client.request(
|
||||
"DELETE",
|
||||
f"/admin/users/{target.userid}/credentials/password",
|
||||
headers={"X-CSRF-Token": token, "HX-Request": "true"},
|
||||
)
|
||||
|
||||
# The last credential must survive, and the admin is told why.
|
||||
assert await cred_repo.get_password_by_user(target.userid) is not None
|
||||
assert "last credential" in res.text.lower()
|
||||
|
||||
|
||||
async def _make_sole_admin_target(client: AsyncClient) -> User:
|
||||
"""Create a second admin, then return it as the target. The logged-in
|
||||
admin from _login plus this one means 2 admins; callers deactivate the
|
||||
_login admin first if they need this to be the *last* admin."""
|
||||
return await _create_target_user(client, userid="admin2-01", username="admin2", groups=["admin"])
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cannot_deactivate_last_active_admin(client: AsyncClient) -> None:
|
||||
await _login(client) # the only admin
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
admin = await app.state.user_repo.get_by_username("admin")
|
||||
|
||||
token = await get_csrf_token(client)
|
||||
res = await client.post(
|
||||
f"/admin/users/{admin.userid}/deactivate",
|
||||
headers={"X-CSRF-Token": token, "HX-Request": "true"},
|
||||
)
|
||||
still = await app.state.user_repo.get_by_userid(admin.userid)
|
||||
assert still.active is True
|
||||
msg = res.text.lower()
|
||||
assert "last" in msg
|
||||
assert "admin" in msg
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cannot_remove_admin_group_from_last_admin(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
admin = await app.state.user_repo.get_by_username("admin")
|
||||
|
||||
token = await get_csrf_token(client)
|
||||
res = await client.post(
|
||||
f"/admin/users/{admin.userid}/groups",
|
||||
data={"groups": "users"}, # drops "admin"
|
||||
headers={"X-CSRF-Token": token, "HX-Request": "true"},
|
||||
)
|
||||
still = await app.state.user_repo.get_by_userid(admin.userid)
|
||||
assert "admin" in still.groups
|
||||
msg = res.text.lower()
|
||||
assert "last" in msg
|
||||
assert "admin" in msg
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cannot_delete_last_active_admin(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
# A second, separate admin to delete (not self — self-delete is blocked separately).
|
||||
target = await _create_target_user(client, userid="otheradmin-01", username="otheradmin", groups=["admin"])
|
||||
# Deactivate the logged-in admin's peer? Instead make target the only ACTIVE admin
|
||||
# by deactivating the _login admin via repo, so deleting target would orphan admins.
|
||||
login_admin = await app.state.user_repo.get_by_username("admin")
|
||||
await app.state.user_repo.update(login_admin.model_copy(update={"active": False}))
|
||||
|
||||
token = await get_csrf_token(client)
|
||||
res = await client.request(
|
||||
"DELETE",
|
||||
f"/admin/users/{target.userid}",
|
||||
headers={"X-CSRF-Token": token, "HX-Request": "true"},
|
||||
)
|
||||
assert await app.state.user_repo.get_by_userid(target.userid) is not None
|
||||
msg = res.text.lower()
|
||||
assert "last" in msg
|
||||
assert "admin" in msg
|
||||
|
|
|
|||
|
|
@ -41,6 +41,8 @@ async def _setup_admin_and_target(client: AsyncClient) -> tuple[str, str]:
|
|||
data={"username": "admin_g", "password": "AdminPass123!"},
|
||||
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
||||
)
|
||||
# Login resets the session (fixation defense); fetch a fresh CSRF token.
|
||||
token = await get_csrf_token(client)
|
||||
return token, target.userid
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -32,7 +32,8 @@ async def _login_admin(client: AsyncClient) -> str:
|
|||
data={"username": "admin", "password": "AdminPass123!"},
|
||||
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
||||
)
|
||||
return token
|
||||
# Login resets the session (fixation defense); fetch a fresh CSRF token.
|
||||
return await get_csrf_token(client)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
|
|||
42
tests/test_auth_routes/test_login_session.py
Normal file
42
tests/test_auth_routes/test_login_session.py
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
from datetime import UTC, datetime
|
||||
|
||||
from httpx import AsyncClient
|
||||
|
||||
from porchlight.authn.password import PasswordHasher, PasswordService
|
||||
from porchlight.models import PasswordCredential, User
|
||||
from tests.conftest import get_csrf_token
|
||||
|
||||
|
||||
async def _create_active_user(client: AsyncClient) -> None:
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user = User(
|
||||
userid="login-01",
|
||||
username="loginuser",
|
||||
created_at=datetime.now(UTC),
|
||||
updated_at=datetime.now(UTC),
|
||||
)
|
||||
await app.state.user_repo.create(user)
|
||||
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
await app.state.credential_repo.create_password(
|
||||
PasswordCredential(user_id=user.userid, password_hash=svc.hash("password123!Secure"))
|
||||
)
|
||||
|
||||
|
||||
async def test_login_resets_session_to_prevent_fixation(client: AsyncClient) -> None:
|
||||
"""Logging in must reset the pre-auth session so an attacker-fixed session
|
||||
cannot become authenticated. We observe this via the CSRF token (stored in
|
||||
the session) changing across login."""
|
||||
await _create_active_user(client)
|
||||
|
||||
pre_login_token = await get_csrf_token(client)
|
||||
|
||||
res = await client.post(
|
||||
"/login/password",
|
||||
data={"username": "loginuser", "password": "password123!Secure"},
|
||||
headers={"HX-Request": "true", "X-CSRF-Token": pre_login_token},
|
||||
)
|
||||
assert res.status_code == 200
|
||||
assert res.headers.get("HX-Redirect") # logged in
|
||||
|
||||
post_login_token = await get_csrf_token(client)
|
||||
assert post_login_token != pre_login_token
|
||||
|
|
@ -1,8 +1,10 @@
|
|||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
from argon2 import PasswordHasher
|
||||
from httpx import AsyncClient
|
||||
|
||||
from porchlight.models import MagicLink, User
|
||||
from porchlight.authn.password import PasswordService
|
||||
from porchlight.invite.service import MagicLinkService
|
||||
from porchlight.models import PasswordCredential, User
|
||||
from tests.conftest import get_csrf_token
|
||||
|
||||
|
||||
async def test_register_invalid_token_returns_error_page(client: AsyncClient) -> None:
|
||||
|
|
@ -13,42 +15,48 @@ async def test_register_invalid_token_returns_error_page(client: AsyncClient) ->
|
|||
|
||||
async def test_register_expired_token_returns_error_page(client: AsyncClient) -> None:
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
repo = app.state.magic_link_repo
|
||||
await repo.create(
|
||||
MagicLink(
|
||||
token="expired",
|
||||
username="newuser",
|
||||
expires_at=datetime.now(UTC) - timedelta(hours=1),
|
||||
)
|
||||
)
|
||||
# An already-expired link (negative TTL), stored hashed like the real service.
|
||||
expired_service = MagicLinkService(repo=app.state.magic_link_repo, ttl=-3600)
|
||||
link = await expired_service.create(username="newuser")
|
||||
|
||||
res = await client.get("/register/expired", follow_redirects=False)
|
||||
res = await client.get(f"/register/{link.token}", follow_redirects=False)
|
||||
assert res.status_code == 400
|
||||
assert "Invalid or expired" in res.text
|
||||
|
||||
|
||||
async def test_register_valid_token_creates_user_and_redirects(client: AsyncClient) -> None:
|
||||
async def test_register_get_does_not_consume_token(client: AsyncClient) -> None:
|
||||
"""GET is side-effect free: it shows a confirmation form and must not
|
||||
consume the token or create a session."""
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
magic_link_repo = app.state.magic_link_repo
|
||||
magic_link_service = app.state.magic_link_service
|
||||
|
||||
link = await magic_link_service.create(username="newuser")
|
||||
res = await client.get(f"/register/{link.token}", follow_redirects=False)
|
||||
assert res.status_code == 200
|
||||
assert f'action="/register/{link.token}"' in res.text
|
||||
# Token still valid (not consumed by the GET).
|
||||
assert await magic_link_service.validate(link.token) is not None
|
||||
|
||||
|
||||
async def test_register_post_creates_user_and_redirects(client: AsyncClient) -> None:
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
magic_link_service = app.state.magic_link_service
|
||||
user_repo = app.state.user_repo
|
||||
|
||||
await magic_link_repo.create(
|
||||
MagicLink(
|
||||
token="t1",
|
||||
username="newuser",
|
||||
expires_at=datetime.now(UTC) + timedelta(hours=1),
|
||||
)
|
||||
)
|
||||
link = await magic_link_service.create(username="newuser")
|
||||
|
||||
res = await client.get("/register/t1", follow_redirects=False)
|
||||
csrf = await get_csrf_token(client)
|
||||
res = await client.post(
|
||||
f"/register/{link.token}",
|
||||
headers={"X-CSRF-Token": csrf},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert res.status_code in (302, 303)
|
||||
assert "/manage/credentials" in res.headers["location"]
|
||||
assert "setup=1" in res.headers["location"]
|
||||
|
||||
# Token should be marked used
|
||||
link = await magic_link_repo.get_by_token("t1")
|
||||
assert link is not None
|
||||
assert link.used is True
|
||||
# Token should be consumed (no longer valid)
|
||||
assert await magic_link_service.validate(link.token) is None
|
||||
|
||||
# User should exist
|
||||
user = await user_repo.get_by_username("newuser")
|
||||
|
|
@ -56,53 +64,81 @@ async def test_register_valid_token_creates_user_and_redirects(client: AsyncClie
|
|||
assert "users" in user.groups
|
||||
|
||||
|
||||
async def test_register_post_requires_csrf(client: AsyncClient) -> None:
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
magic_link_service = app.state.magic_link_service
|
||||
link = await magic_link_service.create(username="newuser")
|
||||
|
||||
res = await client.post(f"/register/{link.token}", follow_redirects=False)
|
||||
assert res.status_code == 403
|
||||
# Token must not have been consumed by the rejected request.
|
||||
assert await magic_link_service.validate(link.token) is not None
|
||||
|
||||
|
||||
async def test_register_used_token_returns_error(client: AsyncClient) -> None:
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
repo = app.state.magic_link_repo
|
||||
await repo.create(
|
||||
MagicLink(
|
||||
token="used",
|
||||
username="newuser",
|
||||
expires_at=datetime.now(UTC) + timedelta(hours=1),
|
||||
used=True,
|
||||
)
|
||||
)
|
||||
magic_link_service = app.state.magic_link_service
|
||||
link = await magic_link_service.create(username="newuser")
|
||||
await magic_link_service.mark_used(link.token)
|
||||
|
||||
res = await client.get("/register/used", follow_redirects=False)
|
||||
res = await client.get(f"/register/{link.token}", follow_redirects=False)
|
||||
assert res.status_code == 400
|
||||
|
||||
|
||||
async def test_register_existing_user_logs_in_and_redirects(client: AsyncClient) -> None:
|
||||
"""When initial-admin creates a user, the invite link should log them in."""
|
||||
"""When initial-admin creates a user (no credentials yet), the invite link
|
||||
should let them set up their account."""
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
magic_link_repo = app.state.magic_link_repo
|
||||
magic_link_service = app.state.magic_link_service
|
||||
user_repo = app.state.user_repo
|
||||
|
||||
# Pre-create the user (as initial-admin would)
|
||||
# Pre-create the user (as initial-admin would), with no credentials.
|
||||
user = User(userid="lusab-bansen", username="admin", groups=["admin", "users"])
|
||||
await user_repo.create(user)
|
||||
|
||||
# Create invite for the same username
|
||||
await magic_link_repo.create(
|
||||
MagicLink(
|
||||
token="admin-setup",
|
||||
username="admin",
|
||||
expires_at=datetime.now(UTC) + timedelta(hours=1),
|
||||
)
|
||||
)
|
||||
link = await magic_link_service.create(username="admin")
|
||||
|
||||
res = await client.get("/register/admin-setup", follow_redirects=False)
|
||||
csrf = await get_csrf_token(client)
|
||||
res = await client.post(
|
||||
f"/register/{link.token}",
|
||||
headers={"X-CSRF-Token": csrf},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert res.status_code in (302, 303)
|
||||
assert "/manage/credentials" in res.headers["location"]
|
||||
assert "setup=1" in res.headers["location"]
|
||||
|
||||
# Token should be marked used
|
||||
link = await magic_link_repo.get_by_token("admin-setup")
|
||||
assert link is not None
|
||||
assert link.used is True
|
||||
# Token should be consumed
|
||||
assert await magic_link_service.validate(link.token) is None
|
||||
|
||||
# Original user should still exist with original groups
|
||||
existing = await user_repo.get_by_username("admin")
|
||||
assert existing is not None
|
||||
assert existing.userid == "lusab-bansen"
|
||||
assert "admin" in existing.groups
|
||||
|
||||
|
||||
async def test_register_rejects_user_that_already_has_credentials(client: AsyncClient) -> None:
|
||||
"""An invite/re-invite link must not act as a passwordless login for an
|
||||
account that already has credentials. Recovery is a separate flow."""
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
magic_link_service = app.state.magic_link_service
|
||||
user_repo = app.state.user_repo
|
||||
cred_repo = app.state.credential_repo
|
||||
|
||||
user = User(userid="lusab-hascreds", username="hascreds", groups=["users"])
|
||||
await user_repo.create(user)
|
||||
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("existing-pass")))
|
||||
|
||||
link = await magic_link_service.create(username="hascreds")
|
||||
csrf = await get_csrf_token(client)
|
||||
res = await client.post(
|
||||
f"/register/{link.token}",
|
||||
headers={"X-CSRF-Token": csrf},
|
||||
follow_redirects=False,
|
||||
)
|
||||
|
||||
# No passwordless login: rejected, not redirected to setup.
|
||||
assert res.status_code == 400
|
||||
assert "setup=1" not in res.headers.get("location", "")
|
||||
|
|
|
|||
|
|
@ -61,6 +61,11 @@ async def test_inactive_user_cannot_register_magic_link(client: AsyncClient) ->
|
|||
|
||||
link = await magic_link_service.create(username="deactivated", created_by="admin", note="test")
|
||||
|
||||
response = await client.get(f"/register/{link.token}", follow_redirects=False)
|
||||
csrf = await get_csrf_token(client)
|
||||
response = await client.post(
|
||||
f"/register/{link.token}",
|
||||
headers={"X-CSRF-Token": csrf},
|
||||
follow_redirects=False,
|
||||
)
|
||||
|
||||
assert response.status_code == 400 or "deactivated" in response.text.lower() or "Invalid" in response.text
|
||||
|
|
|
|||
|
|
@ -42,6 +42,18 @@ async def test_create_returns_magic_link(service: MagicLinkService) -> None:
|
|||
assert link.expires_at > datetime.now(UTC)
|
||||
|
||||
|
||||
async def test_token_stored_hashed_not_plaintext(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
|
||||
# The raw token handed to the user must never be the value persisted in
|
||||
# the store; a DB read must not yield a usable token.
|
||||
link = await service.create(username="alice")
|
||||
raw = link.token
|
||||
assert await repo.get_by_token(raw) is None
|
||||
# ...but the raw token must still validate through the service.
|
||||
validated = await service.validate(raw)
|
||||
assert validated is not None
|
||||
assert validated.username == "alice"
|
||||
|
||||
|
||||
async def test_create_generates_unique_tokens(service: MagicLinkService) -> None:
|
||||
link1 = await service.create(username="alice")
|
||||
link2 = await service.create(username="bob")
|
||||
|
|
@ -90,6 +102,22 @@ async def test_validate_expired_token(service: MagicLinkService, repo: SQLiteMag
|
|||
assert result is None
|
||||
|
||||
|
||||
async def test_consume_validates_and_is_single_use(service: MagicLinkService) -> None:
|
||||
link = await service.create(username="alice")
|
||||
consumed = await service.consume(link.token)
|
||||
assert consumed is not None
|
||||
assert consumed.username == "alice"
|
||||
assert consumed.token == link.token # raw token re-attached
|
||||
# A second consume of the same token fails.
|
||||
assert await service.consume(link.token) is None
|
||||
|
||||
|
||||
async def test_consume_expired_returns_none(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
|
||||
expired_service = MagicLinkService(repo=repo, ttl=-1)
|
||||
link = await expired_service.create(username="alice")
|
||||
assert await service.consume(link.token) is None
|
||||
|
||||
|
||||
async def test_mark_used_returns_true(service: MagicLinkService) -> None:
|
||||
link = await service.create(username="alice")
|
||||
result = await service.mark_used(link.token)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,16 @@ import secrets
|
|||
|
||||
from httpx import AsyncClient
|
||||
|
||||
from porchlight.oidc.endpoints import _error_page
|
||||
|
||||
|
||||
def test_error_page_escapes_html() -> None:
|
||||
# OIDC error pages must not interpolate request-derived text as raw HTML.
|
||||
resp = _error_page("<script>alert(1)</script>")
|
||||
body = resp.body.decode()
|
||||
assert "<script>" not in body
|
||||
assert "<script>alert(1)</script>" in body
|
||||
|
||||
|
||||
def _register_test_client(
|
||||
client: AsyncClient,
|
||||
|
|
|
|||
|
|
@ -223,6 +223,30 @@ async def test_partial_consent_filters_scopes(client: AsyncClient) -> None:
|
|||
assert set(consent.scopes) == {"openid", "profile"}
|
||||
|
||||
|
||||
async def test_consent_rejects_scopes_outside_request(client: AsyncClient) -> None:
|
||||
"""A forged consent POST cannot approve scopes that were not requested."""
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
_register_test_rp(app)
|
||||
await _create_test_user(app)
|
||||
|
||||
# Only openid + profile were requested; "email" is forged into the POST.
|
||||
await _login_and_start_auth(client, scope="openid profile")
|
||||
token = await get_csrf_token(client)
|
||||
res = await client.post(
|
||||
"/consent",
|
||||
data={"action": "allow", "scope": ["openid", "profile", "email"]},
|
||||
headers={"X-CSRF-Token": token},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert res.status_code == 303
|
||||
|
||||
consent_repo = app.state.consent_repo
|
||||
consent = await consent_repo.get_consent("lusab-consent", "consent-rp")
|
||||
assert consent is not None
|
||||
assert "email" not in consent.scopes
|
||||
assert set(consent.scopes) == {"openid", "profile"}
|
||||
|
||||
|
||||
# -- Test helpers --
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import hashlib
|
||||
import json
|
||||
import secrets
|
||||
from base64 import b64encode
|
||||
from base64 import b64encode, urlsafe_b64encode
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
|
@ -54,19 +55,36 @@ async def _setup_rp_and_user(app: FastAPI) -> None:
|
|||
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))
|
||||
|
||||
|
||||
async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> str:
|
||||
def _pkce_pair() -> tuple[str, str]:
|
||||
"""Return a (code_verifier, S256 code_challenge) pair."""
|
||||
verifier = secrets.token_urlsafe(64)
|
||||
challenge = urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()).rstrip(b"=").decode()
|
||||
return verifier, challenge
|
||||
|
||||
|
||||
async def _login_and_authorize(
|
||||
client: AsyncClient,
|
||||
state: str,
|
||||
nonce: str,
|
||||
code_challenge: str | None = None,
|
||||
code_challenge_method: str | None = None,
|
||||
) -> str:
|
||||
"""Perform authorization request, login, consent, and return the auth code."""
|
||||
params = {
|
||||
"response_type": "code",
|
||||
"client_id": CLIENT_ID,
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
"scope": "openid profile email",
|
||||
"state": state,
|
||||
"nonce": nonce,
|
||||
}
|
||||
if code_challenge is not None:
|
||||
params["code_challenge"] = code_challenge
|
||||
params["code_challenge_method"] = code_challenge_method or "S256"
|
||||
# Step 1: Authorization request (unauthenticated) -> redirect to /login
|
||||
auth_res = await client.get(
|
||||
"/authorization",
|
||||
params={
|
||||
"response_type": "code",
|
||||
"client_id": CLIENT_ID,
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
"scope": "openid profile email",
|
||||
"state": state,
|
||||
"nonce": nonce,
|
||||
},
|
||||
params=params,
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert auth_res.status_code in (302, 303), f"Expected redirect to /login, got {auth_res.status_code}"
|
||||
|
|
@ -116,16 +134,19 @@ async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> s
|
|||
return code
|
||||
|
||||
|
||||
async def _exchange_token(client: AsyncClient, code: str) -> dict[str, Any]:
|
||||
async def _exchange_token(client: AsyncClient, code: str, code_verifier: str | None = None) -> dict[str, Any]:
|
||||
"""Exchange authorization code for tokens."""
|
||||
auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
|
||||
data = {
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
}
|
||||
if code_verifier is not None:
|
||||
data["code_verifier"] = code_verifier
|
||||
token_res = await client.post(
|
||||
"/token",
|
||||
data={
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
},
|
||||
data=data,
|
||||
headers={
|
||||
"Authorization": f"Basic {auth_header}",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
|
|
@ -199,3 +220,46 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None:
|
|||
|
||||
# sub should match between id_token and userinfo
|
||||
assert userinfo_data["sub"] == id_token_sub
|
||||
|
||||
|
||||
async def test_discovery_advertises_s256_pkce(client: AsyncClient) -> None:
|
||||
res = await client.get("/.well-known/openid-configuration")
|
||||
assert res.status_code == 200
|
||||
assert res.json().get("code_challenge_methods_supported") == ["S256"]
|
||||
|
||||
|
||||
async def test_pkce_s256_flow_succeeds(client: AsyncClient) -> None:
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
await _setup_rp_and_user(app)
|
||||
state, nonce = secrets.token_urlsafe(16), secrets.token_urlsafe(16)
|
||||
verifier, challenge = _pkce_pair()
|
||||
|
||||
code = await _login_and_authorize(client, state, nonce, code_challenge=challenge)
|
||||
token_data = await _exchange_token(client, code, code_verifier=verifier)
|
||||
assert "access_token" in token_data
|
||||
|
||||
|
||||
async def test_pkce_wrong_verifier_rejected(client: AsyncClient) -> None:
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
await _setup_rp_and_user(app)
|
||||
state, nonce = secrets.token_urlsafe(16), secrets.token_urlsafe(16)
|
||||
_verifier, challenge = _pkce_pair()
|
||||
|
||||
code = await _login_and_authorize(client, state, nonce, code_challenge=challenge)
|
||||
|
||||
# Exchange with a verifier that does not match the challenge -> rejected.
|
||||
auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
|
||||
res = await client.post(
|
||||
"/token",
|
||||
data={
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
"code_verifier": "wrong-" + secrets.token_urlsafe(48),
|
||||
},
|
||||
headers={
|
||||
"Authorization": f"Basic {auth_header}",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
},
|
||||
)
|
||||
assert res.status_code != 200, f"PKCE verifier mismatch should be rejected, got {res.status_code}: {res.text}"
|
||||
|
|
|
|||
|
|
@ -31,7 +31,8 @@ async def _login_user_with_password(client: AsyncClient) -> str:
|
|||
data={"username": "pwuser", "password": "OldPass123!ok"},
|
||||
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
||||
)
|
||||
return token
|
||||
# Login resets the session (fixation defense); fetch a fresh CSRF token.
|
||||
return await get_csrf_token(client)
|
||||
|
||||
|
||||
async def _login_user_without_password(client: AsyncClient) -> str:
|
||||
|
|
@ -47,11 +48,15 @@ async def _login_user_without_password(client: AsyncClient) -> str:
|
|||
)
|
||||
await user_repo.create(user)
|
||||
|
||||
# Simulate session login via magic link
|
||||
# Simulate session login via magic link (GET shows the form, POST consumes)
|
||||
token = await get_csrf_token(client)
|
||||
magic_link_service = app.state.magic_link_service
|
||||
link = await magic_link_service.create(username="newuser", created_by="admin", note="test")
|
||||
await client.get(f"/register/{link.token}", follow_redirects=False)
|
||||
await client.post(
|
||||
f"/register/{link.token}",
|
||||
headers={"X-CSRF-Token": token},
|
||||
follow_redirects=False,
|
||||
)
|
||||
# Re-fetch CSRF token after session change
|
||||
token = await get_csrf_token(client)
|
||||
return token
|
||||
|
|
|
|||
|
|
@ -1,6 +1,10 @@
|
|||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
from httpx import AsyncClient
|
||||
from starlette.requests import Request
|
||||
|
||||
from porchlight.rate_limit import _client_identifier
|
||||
from tests.conftest import get_csrf_token
|
||||
|
||||
|
||||
|
|
@ -23,3 +27,38 @@ async def test_password_login_rate_limited(client: AsyncClient) -> None:
|
|||
)
|
||||
|
||||
assert response.status_code == 429
|
||||
|
||||
|
||||
def _make_request(headers: dict[str, str], client_host: str, trusted_proxy_count: int) -> Request:
|
||||
scope = {
|
||||
"type": "http",
|
||||
"headers": [(k.lower().encode(), v.encode()) for k, v in headers.items()],
|
||||
"client": (client_host, 12345),
|
||||
"app": SimpleNamespace(
|
||||
state=SimpleNamespace(settings=SimpleNamespace(trusted_proxy_count=trusted_proxy_count))
|
||||
),
|
||||
}
|
||||
return Request(scope)
|
||||
|
||||
|
||||
def test_client_identifier_ignores_forwarded_when_no_trusted_proxy() -> None:
|
||||
req = _make_request({"x-forwarded-for": "203.0.113.9"}, "127.0.0.1", trusted_proxy_count=0)
|
||||
# Spoofable header must be ignored when no proxy is trusted.
|
||||
assert _client_identifier(req) == "127.0.0.1"
|
||||
|
||||
|
||||
def test_client_identifier_uses_forwarded_with_one_trusted_proxy() -> None:
|
||||
req = _make_request({"x-forwarded-for": "203.0.113.9"}, "10.0.0.1", trusted_proxy_count=1)
|
||||
assert _client_identifier(req) == "203.0.113.9"
|
||||
|
||||
|
||||
def test_client_identifier_picks_correct_hop_with_two_trusted_proxies() -> None:
|
||||
req = _make_request({"x-forwarded-for": "client, proxyA, proxyB"}, "10.0.0.1", trusted_proxy_count=2)
|
||||
# Two trusted proxies appended their peers; the client as seen by the
|
||||
# outermost trusted proxy is the 2nd-from-right entry.
|
||||
assert _client_identifier(req) == "proxyA"
|
||||
|
||||
|
||||
def test_client_identifier_falls_back_when_forwarded_missing() -> None:
|
||||
req = _make_request({}, "10.0.0.1", trusted_proxy_count=1)
|
||||
assert _client_identifier(req) == "10.0.0.1"
|
||||
|
|
|
|||
|
|
@ -182,3 +182,48 @@ async def test_create_duplicate_webauthn(credential_repo: SQLiteCredentialReposi
|
|||
|
||||
with pytest.raises(DuplicateError):
|
||||
await credential_repo.create_webauthn(cred)
|
||||
|
||||
|
||||
async def test_delete_password_if_not_last_refuses_last(
|
||||
credential_repo: SQLiteCredentialRepository, alice: User
|
||||
) -> None:
|
||||
await credential_repo.create_password(PasswordCredential(user_id=alice.userid, password_hash="h"))
|
||||
|
||||
# Only credential -> must refuse and leave it in place.
|
||||
assert await credential_repo.delete_password_if_not_last(alice.userid) is False
|
||||
assert await credential_repo.get_password_by_user(alice.userid) is not None
|
||||
|
||||
|
||||
async def test_delete_password_if_not_last_allows_when_others_exist(
|
||||
credential_repo: SQLiteCredentialRepository, alice: User
|
||||
) -> None:
|
||||
await credential_repo.create_password(PasswordCredential(user_id=alice.userid, password_hash="h"))
|
||||
await credential_repo.create_webauthn(
|
||||
WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02")
|
||||
)
|
||||
|
||||
assert await credential_repo.delete_password_if_not_last(alice.userid) is True
|
||||
assert await credential_repo.get_password_by_user(alice.userid) is None
|
||||
|
||||
|
||||
async def test_delete_webauthn_if_not_last_refuses_last(
|
||||
credential_repo: SQLiteCredentialRepository, alice: User
|
||||
) -> None:
|
||||
await credential_repo.create_webauthn(
|
||||
WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02")
|
||||
)
|
||||
|
||||
assert await credential_repo.delete_webauthn_if_not_last(alice.userid, b"\x01") is False
|
||||
assert len(await credential_repo.get_webauthn_by_user(alice.userid)) == 1
|
||||
|
||||
|
||||
async def test_delete_webauthn_if_not_last_allows_when_others_exist(
|
||||
credential_repo: SQLiteCredentialRepository, alice: User
|
||||
) -> None:
|
||||
await credential_repo.create_webauthn(
|
||||
WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02")
|
||||
)
|
||||
await credential_repo.create_password(PasswordCredential(user_id=alice.userid, password_hash="h"))
|
||||
|
||||
assert await credential_repo.delete_webauthn_if_not_last(alice.userid, b"\x01") is True
|
||||
assert len(await credential_repo.get_webauthn_by_user(alice.userid)) == 0
|
||||
|
|
|
|||
|
|
@ -63,6 +63,34 @@ async def test_mark_used_not_found(magic_link_repo: SQLiteMagicLinkRepository) -
|
|||
assert marked is False
|
||||
|
||||
|
||||
async def test_consume_marks_used_and_returns_link(magic_link_repo: SQLiteMagicLinkRepository) -> None:
|
||||
await magic_link_repo.create(_make_link())
|
||||
|
||||
consumed = await magic_link_repo.consume("abc123")
|
||||
assert consumed is not None
|
||||
assert consumed.used is True
|
||||
assert consumed.username == "alice"
|
||||
|
||||
|
||||
async def test_consume_is_single_use(magic_link_repo: SQLiteMagicLinkRepository) -> None:
|
||||
await magic_link_repo.create(_make_link())
|
||||
|
||||
first = await magic_link_repo.consume("abc123")
|
||||
assert first is not None
|
||||
# A second consume of the same token must fail (atomic single-use).
|
||||
second = await magic_link_repo.consume("abc123")
|
||||
assert second is None
|
||||
|
||||
|
||||
async def test_consume_expired_returns_none(magic_link_repo: SQLiteMagicLinkRepository) -> None:
|
||||
await magic_link_repo.create(_make_link(token="exp", expires_at=datetime.now(UTC) - timedelta(hours=1)))
|
||||
assert await magic_link_repo.consume("exp") is None
|
||||
|
||||
|
||||
async def test_consume_nonexistent_returns_none(magic_link_repo: SQLiteMagicLinkRepository) -> None:
|
||||
assert await magic_link_repo.consume("nope") is None
|
||||
|
||||
|
||||
async def test_delete_expired(magic_link_repo: SQLiteMagicLinkRepository) -> None:
|
||||
expired = _make_link(token="expired", expires_at=datetime.now(UTC) - timedelta(hours=1))
|
||||
await magic_link_repo.create(expired)
|
||||
|
|
|
|||
|
|
@ -237,3 +237,14 @@ async def test_roundtrip_preserves_all_fields(user_repo: SQLiteUserRepository) -
|
|||
assert sorted(fetched.groups) == ["admin", "users"]
|
||||
assert fetched.created_at == user.created_at
|
||||
assert fetched.updated_at == user.updated_at
|
||||
|
||||
|
||||
async def test_count_active_admins(user_repo: SQLiteUserRepository) -> None:
|
||||
await user_repo.create(_make_user(userid="a1", username="admin1", groups=["admin", "users"]))
|
||||
await user_repo.create(_make_user(userid="a2", username="admin2", groups=["admin"]))
|
||||
# Inactive admin must not count.
|
||||
await user_repo.create(_make_user(userid="a3", username="admin3", groups=["admin"], active=False))
|
||||
# Non-admin must not count.
|
||||
await user_repo.create(_make_user(userid="u1", username="user1", groups=["users"]))
|
||||
|
||||
assert await user_repo.count_active_admins() == 2
|
||||
|
|
|
|||
|
|
@ -292,3 +292,15 @@ class TestFormatValidationErrors:
|
|||
result = format_validation_errors(exc)
|
||||
assert "Picture URL must be a valid HTTP or HTTPS URL" in result
|
||||
assert "Value error" not in result
|
||||
|
||||
def test_escapes_user_input_in_error_message(self) -> None:
|
||||
# A validation error message that echoes user input (e.g. an invalid
|
||||
# group name) must not emit raw HTML — otherwise it is reflected XSS.
|
||||
payload = "<img src=x onerror=alert(1)>"
|
||||
try:
|
||||
GroupListInput(groups=payload)
|
||||
except ValidationError as exc:
|
||||
result = format_validation_errors(exc)
|
||||
assert payload not in result
|
||||
assert "<img src=x onerror=alert(1)>" in result
|
||||
assert "<img" not in result
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue