diff --git a/src/porchlight/admin/routes.py b/src/porchlight/admin/routes.py
index fe7404f..4bcc0a8 100644
--- a/src/porchlight/admin/routes.py
+++ b/src/porchlight/admin/routes.py
@@ -11,15 +11,6 @@ from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput,
router = APIRouter(prefix="/admin", tags=["admin"])
-ADMIN_GROUP = "admin"
-
-
-async def _is_last_active_admin(request: Request, user: User) -> bool:
- """True if removing this user's admin access would leave zero active admins."""
- if not user.active or ADMIN_GROUP not in user.groups:
- return False
- return await request.app.state.user_repo.count_active_admins() <= 1
-
async def _get_admin_user(request: Request) -> User | None:
"""Return the current user if they are an admin, else None."""
@@ -207,9 +198,6 @@ async def update_user_groups(
if user is None:
return HTMLResponse("User not found", status_code=404)
- if ADMIN_GROUP not in validated.group_list and await _is_last_active_admin(request, user):
- return HTMLResponse('
')
user_repo = request.app.state.user_repo
- target = await user_repo.get_by_userid(userid)
- if target is not None and await _is_last_active_admin(request, target):
- return HTMLResponse('
Cannot delete the last active admin
')
-
deleted = await user_repo.delete(userid)
if not deleted:
return HTMLResponse("User not found", status_code=404)
diff --git a/src/porchlight/authn/routes.py b/src/porchlight/authn/routes.py
index 606b315..c710f3f 100644
--- a/src/porchlight/authn/routes.py
+++ b/src/porchlight/authn/routes.py
@@ -23,21 +23,6 @@ def _login_redirect_target(request: Request) -> str:
return "/manage/credentials"
-def _establish_authenticated_session(request: Request, user: User) -> None:
- """Reset the session before recording the authenticated identity.
-
- Clearing first defeats session fixation: any values an attacker planted in
- the pre-auth session are dropped and the session cookie is reissued. A
- pending OIDC authorization request is the only pre-auth state worth keeping.
- """
- pending_oidc = request.session.get("oidc_auth_request")
- request.session.clear()
- if pending_oidc is not None:
- request.session["oidc_auth_request"] = pending_oidc
- request.session["userid"] = user.userid
- request.session["username"] = user.username
-
-
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request) -> HTMLResponse:
templates = request.app.state.templates
@@ -71,7 +56,8 @@ async def login_password(
if not user.active:
return HTMLResponse(error_html)
- _establish_authenticated_session(request, user)
+ request.session["userid"] = user.userid
+ request.session["username"] = user.username
response = Response()
response.headers["HX-Redirect"] = _login_redirect_target(request)
@@ -86,34 +72,12 @@ async def logout(request: Request) -> Response:
return response
-@router.get("/register/{token}", response_class=HTMLResponse)
-async def register_magic_link_page(request: Request, token: str) -> Response:
- """Show the registration confirmation page.
-
- GET is side-effect free: it only validates the token (without consuming it)
- and renders a form. The token is consumed by the POST below, so that simply
- visiting the link (email scanners, prefetchers) cannot create a session.
- """
- magic_link_service = request.app.state.magic_link_service
- link = await magic_link_service.validate(token)
- if link is None:
- return HTMLResponse("
Invalid or expired registration link.
", status_code=400)
-
- templates = request.app.state.templates
- return templates.TemplateResponse(
- request,
- "register.html",
- {"token": token, "username": link.username},
- )
-
-
-@router.post("/register/{token}")
+@router.get("/register/{token}")
async def register_magic_link(request: Request, token: str) -> Response:
magic_link_service = request.app.state.magic_link_service
user_repo = request.app.state.user_repo
- # Atomically validate and consume the token (single-use, no replay race).
- link = await magic_link_service.consume(token)
+ link = await magic_link_service.validate(token)
if link is None:
return HTMLResponse("
Invalid or expired registration link.
", status_code=400)
@@ -121,25 +85,16 @@ async def register_magic_link(request: Request, token: str) -> Response:
if existing_user is not None:
if not existing_user.active:
return HTMLResponse("
This account has been deactivated.
", status_code=400)
- # An invite link is for account setup, not authentication. If the
- # account already has credentials, refuse to establish a session —
- # otherwise a re-invite would be a passwordless login. Account
- # recovery must go through a separate, explicitly authenticated flow.
- cred_repo = request.app.state.credential_repo
- has_password = await cred_repo.get_password_by_user(existing_user.userid) is not None
- has_webauthn = bool(await cred_repo.get_webauthn_by_user(existing_user.userid))
- if has_password or has_webauthn:
- return HTMLResponse(
- "
This account is already set up. Please sign in instead.
",
- status_code=400,
- )
user = existing_user
else:
userid = await generate_unique_userid(user_repo)
user = User(userid=userid, username=link.username, groups=["users"])
await user_repo.create(user)
- _establish_authenticated_session(request, user)
+ await magic_link_service.mark_used(token)
+
+ request.session["userid"] = user.userid
+ request.session["username"] = user.username
return RedirectResponse("/manage/credentials?setup=1", status_code=303)
@@ -201,6 +156,7 @@ async def login_webauthn_complete(request: Request) -> Response:
if user is None or not user.active:
return JSONResponse({"error": "Authentication failed"}, status_code=400)
- _establish_authenticated_session(request, user)
+ request.session["userid"] = user.userid
+ request.session["username"] = user.username
return JSONResponse({"redirect": _login_redirect_target(request)})
diff --git a/src/porchlight/config.py b/src/porchlight/config.py
index dff55a5..38fb597 100644
--- a/src/porchlight/config.py
+++ b/src/porchlight/config.py
@@ -54,10 +54,6 @@ class Settings(BaseSettings):
# Rate limiting (disable for e2e/load tests that authenticate repeatedly)
rate_limit_enabled: bool = True
- # Number of trusted reverse proxies in front of the app. When > 0, the
- # client IP for rate limiting is taken from X-Forwarded-For, counting this
- # many hops from the right. Keep 0 unless deployed behind a known proxy.
- trusted_proxy_count: int = 0
# Signing keys
signing_key_path: str = "data/keys"
diff --git a/src/porchlight/invite/service.py b/src/porchlight/invite/service.py
index 4766442..fd5bebf 100644
--- a/src/porchlight/invite/service.py
+++ b/src/porchlight/invite/service.py
@@ -1,5 +1,3 @@
-import hashlib
-import hmac
import secrets
from datetime import UTC, datetime, timedelta
@@ -8,23 +6,11 @@ from porchlight.store.protocols import MagicLinkRepository
class MagicLinkService:
- """Magic link token lifecycle management.
+ """Magic link token lifecycle management."""
- The raw token is only ever exposed to the user (in the registration URL).
- Only a hash of the token is persisted, so a database disclosure does not
- yield usable tokens. A pepper, when configured, keys the hash (HMAC);
- otherwise a plain SHA-256 of the high-entropy token is stored.
- """
-
- def __init__(self, repo: MagicLinkRepository, ttl: int = 86400, pepper: str | None = None) -> None:
+ def __init__(self, repo: MagicLinkRepository, ttl: int = 86400) -> None:
self._repo = repo
self._ttl = ttl
- self._pepper = pepper
-
- def _hash_token(self, token: str) -> str:
- if self._pepper:
- return hmac.new(self._pepper.encode(), token.encode(), hashlib.sha256).hexdigest()
- return hashlib.sha256(token.encode()).hexdigest()
async def create(
self,
@@ -32,52 +18,34 @@ class MagicLinkService:
created_by: str | None = None,
note: str | None = None,
) -> MagicLink:
- """Generate and store a new magic link for the given username.
-
- Persists only the hashed token; the returned MagicLink carries the raw
- token so the caller can build the registration URL.
- """
+ """Generate and store a new magic link for the given username."""
token = secrets.token_urlsafe(32)
expires_at = datetime.now(UTC) + timedelta(seconds=self._ttl)
- stored = MagicLink(
- token=self._hash_token(token),
+ link = MagicLink(
+ token=token,
username=username,
expires_at=expires_at,
created_by=created_by,
note=note,
)
- await self._repo.create(stored)
- return stored.model_copy(update={"token": token})
+ return await self._repo.create(link)
async def validate(self, token: str) -> MagicLink | None:
"""Validate a magic link token.
- Returns the MagicLink (with the raw token re-attached) if it exists, is
- not used, and has not expired. Returns None otherwise.
+ Returns the MagicLink if it exists, is not used, and has not expired.
+ Returns None otherwise.
"""
- link = await self._repo.get_by_token(self._hash_token(token))
+ link = await self._repo.get_by_token(token)
if link is None or link.used:
return None
if link.expires_at < datetime.now(UTC):
return None
- return link.model_copy(update={"token": token})
+ return link
async def mark_used(self, token: str) -> bool:
"""Mark a magic link as used. Returns True if found and marked."""
- return await self._repo.mark_used(self._hash_token(token))
-
- async def consume(self, token: str) -> MagicLink | None:
- """Atomically validate and consume a token in one step.
-
- Prefer this over validate()+mark_used(): it closes the replay race
- where two concurrent requests could both pass validation before either
- marks the token used. Returns the link (raw token re-attached) on
- success, else None.
- """
- link = await self._repo.consume(self._hash_token(token))
- if link is None:
- return None
- return link.model_copy(update={"token": token})
+ return await self._repo.mark_used(token)
async def cleanup_expired(self) -> int:
"""Delete expired unused links. Returns count deleted."""
diff --git a/src/porchlight/manage/routes.py b/src/porchlight/manage/routes.py
index 92a7912..69afdff 100644
--- a/src/porchlight/manage/routes.py
+++ b/src/porchlight/manage/routes.py
@@ -13,6 +13,13 @@ from porchlight.validation import PasswordChange, PasswordSet, ProfileUpdate, fo
router = APIRouter(prefix="/manage", tags=["manage"])
+async def _count_credentials(cred_repo: object, userid: str) -> int:
+ """Count total credentials (password + webauthn) for a user."""
+ webauthn = await cred_repo.get_webauthn_by_user(userid) # type: ignore[union-attr]
+ password = await cred_repo.get_password_by_user(userid) # type: ignore[union-attr]
+ return len(webauthn) + (1 if password else 0)
+
+
@router.get("/credentials", response_class=HTMLResponse)
async def credentials_page(request: Request) -> Response:
session_user = get_session_user(request)
@@ -100,9 +107,11 @@ async def delete_password(request: Request) -> Response:
userid, _username = session_user
cred_repo = request.app.state.credential_repo
- # Atomic: refuses if this is the user's last credential (not raceable).
- if not await cred_repo.delete_password_if_not_last(userid):
+ count = await _count_credentials(cred_repo, userid)
+ if count <= 1:
return HTMLResponse('
", status_code=400)
- # Allow — collect approved scopes, rejecting anything outside the
- # originally requested set (a forged form must not escalate scope).
- requested_scopes = set(auth_params.get("scope", "openid").split())
- approved_scopes: list[str] = [str(s) for s in form.getlist("scope") if str(s) in requested_scopes]
+ # Allow — collect approved scopes
+ approved_scopes: list[str] = [str(s) for s in form.getlist("scope")]
if "openid" not in approved_scopes:
approved_scopes = ["openid", *list(approved_scopes)]
@@ -371,6 +359,6 @@ async def consent_submit(request: Request) -> Response:
if "error" in parsed:
raise ValueError(parsed.get("error_description", parsed["error"]))
except Exception as exc:
- return _error_page(exc)
+ return HTMLResponse(f"
Error
{exc}
", status_code=400)
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)
diff --git a/src/porchlight/oidc/provider.py b/src/porchlight/oidc/provider.py
index aae64d3..d1ac38b 100644
--- a/src/porchlight/oidc/provider.py
+++ b/src/porchlight/oidc/provider.py
@@ -3,7 +3,6 @@
from pathlib import Path
from idpyoidc.server import Server
-from idpyoidc.server.oauth2.add_on.pkce import CC_METHOD
from porchlight.config import Settings
from porchlight.oidc.claims import PorchlightUserInfo
@@ -127,20 +126,6 @@ def _build_server_config(settings: Settings) -> dict:
"phone": ["phone_number", "phone_number_verified"],
},
"authentication": {},
- # PKCE: advertise and verify S256 (only). Not "essential", so existing
- # relying parties that do not yet send a code_challenge keep working;
- # but any RP that does use PKCE must use S256 and is verified at the
- # token endpoint. Set essential=True (or per-client pkce_essential) to
- # require it once all clients have migrated.
- "add_on": {
- "pkce": {
- "function": "idpyoidc.server.oauth2.add_on.pkce.add_support",
- "kwargs": {
- "essential": False,
- "code_challenge_methods": {"S256": CC_METHOD["S256"]},
- },
- },
- },
}
diff --git a/src/porchlight/rate_limit.py b/src/porchlight/rate_limit.py
index a0a6eff..38404a8 100644
--- a/src/porchlight/rate_limit.py
+++ b/src/porchlight/rate_limit.py
@@ -1,25 +1,4 @@
from slowapi import Limiter
from slowapi.util import get_remote_address
-from starlette.requests import Request
-
-def _client_identifier(request: Request) -> str:
- """Rate-limit key: the client IP, proxy-aware.
-
- By default the direct peer address is used. When ``trusted_proxy_count`` is
- configured (the number of trusted reverse proxies in front of the app), the
- client address is taken from the ``X-Forwarded-For`` chain instead, counting
- that many hops back from the right. The header is ignored unless proxies are
- trusted, so it cannot be spoofed in a direct-connection deployment.
- """
- settings = getattr(getattr(request.app, "state", None), "settings", None)
- hops: int = getattr(settings, "trusted_proxy_count", 0) or 0
- if hops > 0:
- forwarded = request.headers.get("x-forwarded-for", "")
- parts = [p.strip() for p in forwarded.split(",") if p.strip()]
- if len(parts) >= hops:
- return parts[-hops]
- return get_remote_address(request)
-
-
-limiter = Limiter(key_func=_client_identifier)
+limiter = Limiter(key_func=get_remote_address)
diff --git a/src/porchlight/static/style.css b/src/porchlight/static/style.css
index 15dc79e..17319ff 100644
--- a/src/porchlight/static/style.css
+++ b/src/porchlight/static/style.css
@@ -205,10 +205,6 @@ main {
line-height: 1;
}
-#user-table-container {
- overflow-x: auto;
-}
-
.admin-table {
width: 100%;
border-collapse: collapse;
@@ -218,7 +214,7 @@ main {
.admin-table th,
.admin-table td {
text-align: left;
- padding: var(--sp-2);
+ padding: var(--sp-2) var(--sp-3);
border-bottom: 1px solid var(--border);
}
diff --git a/src/porchlight/store/protocols.py b/src/porchlight/store/protocols.py
index 53e7c37..ba1af34 100644
--- a/src/porchlight/store/protocols.py
+++ b/src/porchlight/store/protocols.py
@@ -25,8 +25,6 @@ class UserRepository(Protocol):
async def count_users(self, query: str | None = None) -> int: ...
- async def count_active_admins(self) -> int: ...
-
async def delete(self, userid: str) -> bool: ...
@@ -48,10 +46,6 @@ class CredentialRepository(Protocol):
async def delete_password(self, user_id: str) -> bool: ...
- async def delete_password_if_not_last(self, user_id: str) -> bool: ...
-
- async def delete_webauthn_if_not_last(self, user_id: str, credential_id: bytes) -> bool: ...
-
@runtime_checkable
class MagicLinkRepository(Protocol):
@@ -61,8 +55,6 @@ class MagicLinkRepository(Protocol):
async def mark_used(self, token: str) -> bool: ...
- async def consume(self, token: str) -> MagicLink | None: ...
-
async def delete_expired(self) -> int: ...
diff --git a/src/porchlight/store/sqlite/repositories.py b/src/porchlight/store/sqlite/repositories.py
index f9ebd81..37d02bb 100644
--- a/src/porchlight/store/sqlite/repositories.py
+++ b/src/porchlight/store/sqlite/repositories.py
@@ -163,17 +163,6 @@ class SQLiteUserRepository:
row = await cursor.fetchone()
return row[0] if row else 0
- async def count_active_admins(self) -> int:
- async with self._db.execute(
- """
- SELECT COUNT(*) FROM users u
- JOIN user_groups g ON g.userid = u.userid
- WHERE u.active = 1 AND g.group_name = 'admin'
- """
- ) as cursor:
- row = await cursor.fetchone()
- return row[0] if row else 0
-
async def delete(self, userid: str) -> bool:
cursor = await self._db.execute("DELETE FROM users WHERE userid = ?", (userid,))
await self._db.commit()
@@ -277,41 +266,6 @@ class SQLiteCredentialRepository:
await self._db.commit()
return cursor.rowcount > 0
- async def delete_password_if_not_last(self, user_id: str) -> bool:
- """Delete the password credential only if it is not the user's last
- credential. The count and delete happen in one atomic statement, so it
- is not raceable. Returns True if a row was deleted."""
- cursor = await self._db.execute(
- """
- DELETE FROM password_credentials
- WHERE user_id = ?
- AND (
- (SELECT COUNT(*) FROM password_credentials WHERE user_id = ?)
- + (SELECT COUNT(*) FROM webauthn_credentials WHERE user_id = ?)
- ) > 1
- """,
- (user_id, user_id, user_id),
- )
- await self._db.commit()
- return cursor.rowcount > 0
-
- async def delete_webauthn_if_not_last(self, user_id: str, credential_id: bytes) -> bool:
- """Delete a WebAuthn credential only if it is not the user's last
- credential, atomically. Returns True if a row was deleted."""
- cursor = await self._db.execute(
- """
- DELETE FROM webauthn_credentials
- WHERE user_id = ? AND credential_id = ?
- AND (
- (SELECT COUNT(*) FROM password_credentials WHERE user_id = ?)
- + (SELECT COUNT(*) FROM webauthn_credentials WHERE user_id = ?)
- ) > 1
- """,
- (user_id, credential_id, user_id, user_id),
- )
- await self._db.commit()
- return cursor.rowcount > 0
-
class SQLiteMagicLinkRepository:
def __init__(self, db: aiosqlite.Connection) -> None:
@@ -357,25 +311,6 @@ class SQLiteMagicLinkRepository:
await self._db.commit()
return cursor.rowcount > 0
- async def consume(self, token: str) -> MagicLink | None:
- """Atomically validate-and-mark a token as used.
-
- The conditional UPDATE is the single point of decision, so two
- concurrent requests cannot both consume the same token. Returns the
- link if this call won the race (unused and unexpired), else None.
- """
- now = datetime.now(UTC).isoformat()
- cursor = await self._db.execute(
- "UPDATE magic_links SET used = 1 WHERE token = ? AND used = 0 AND expires_at > ?",
- (token, now),
- )
- await self._db.commit()
- if cursor.rowcount == 0:
- return None
- async with self._db.execute("SELECT * FROM magic_links WHERE token = ?", (token,)) as c:
- row = await c.fetchone()
- return self._row_to_magic_link(row) if row is not None else None
-
async def delete_expired(self) -> int:
now = datetime.now(UTC).isoformat()
cursor = await self._db.execute("DELETE FROM magic_links WHERE expires_at < ? AND used = 0", (now,))
diff --git a/src/porchlight/templates/admin/_credentials_section.html b/src/porchlight/templates/admin/_credentials_section.html
index fbfbe8e..b2351eb 100644
--- a/src/porchlight/templates/admin/_credentials_section.html
+++ b/src/porchlight/templates/admin/_credentials_section.html
@@ -1,4 +1,3 @@
-{% if error %}
You're about to set up the account for {{ username }}.
-
-
-{% endblock %}
diff --git a/src/porchlight/validation.py b/src/porchlight/validation.py
index e675e0b..5d3b48c 100644
--- a/src/porchlight/validation.py
+++ b/src/porchlight/validation.py
@@ -1,4 +1,3 @@
-import html
import re
from typing import Annotated
from urllib.parse import urlparse
@@ -167,9 +166,7 @@ def format_validation_errors(exc: ValidationError) -> str:
display_msg = f"{label}: {raw}"
else:
display_msg = f"{label}: {msg}"
- # Escape: messages can echo user input (e.g. an invalid group name),
- # and the result is interpolated into HTML below.
- messages.append(html.escape(display_msg))
+ messages.append(display_msg)
if len(messages) == 1:
return f'
{messages[0]}
'
diff --git a/tests/e2e/full-flow.spec.js b/tests/e2e/full-flow.spec.js
index 584078f..be797d2 100644
--- a/tests/e2e/full-flow.spec.js
+++ b/tests/e2e/full-flow.spec.js
@@ -9,9 +9,7 @@ test.describe('Full user journey', () => {
expect(fixtures.register_token).toBeTruthy();
// ---- Step 1: Register via magic link ----
- // GET shows a confirmation page; submitting it (POST) consumes the token.
await page.goto(`/register/${fixtures.register_token}`);
- await page.click('button[type="submit"]');
// Should redirect to /manage/credentials?setup=1
await page.waitForURL('**/manage/credentials?setup=1', { timeout: 5000 });
diff --git a/tests/test_admin/test_admin_routes.py b/tests/test_admin/test_admin_routes.py
index ae87b93..603af60 100644
--- a/tests/test_admin/test_admin_routes.py
+++ b/tests/test_admin/test_admin_routes.py
@@ -328,15 +328,11 @@ async def test_delete_password_credential(client: AsyncClient) -> None:
await _login(client)
target = await _create_target_user(client, username="bob")
- # Create a password credential for the target user, plus a second
- # credential so the password is not the last one (last-credential guard).
+ # Create a password credential for the target user
app = client._transport.app # type: ignore[union-attr]
cred_repo = app.state.credential_repo
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("bobpass")))
- await cred_repo.create_webauthn(
- WebAuthnCredential(user_id=target.userid, credential_id=b"\xaa\xbb", public_key=b"\x00" * 32)
- )
token = await get_csrf_token(client)
response = await client.request(
@@ -355,8 +351,7 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None:
await _login(client)
target = await _create_target_user(client, username="bob")
- # Create a webauthn credential for the target user, plus a second
- # credential so the key is not the last one (last-credential guard).
+ # Create a webauthn credential for the target user
app = client._transport.app # type: ignore[union-attr]
cred_repo = app.state.credential_repo
credential_id = b"\x01\x02\x03\x04\x05\x06\x07\x08"
@@ -369,8 +364,6 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None:
device_name="test-key",
)
)
- svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
- await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("bobpass")))
# URL uses base64url without padding
credential_id_b64 = urlsafe_b64encode(credential_id).decode().rstrip("=")
@@ -441,92 +434,3 @@ async def test_reinvite_user_404_for_nonexistent(client: AsyncClient) -> None:
response = await client.post("/admin/users/nonexistent-id/invite", headers={"X-CSRF-Token": token})
assert response.status_code == 404
assert "not found" in response.text.lower()
-
-
-@pytest.mark.asyncio
-async def test_admin_cannot_delete_users_last_credential(client: AsyncClient) -> None:
- await _login(client)
- target = await _create_target_user(client, userid="lonecred-01", username="lonecred")
-
- app = client._transport.app # type: ignore[union-attr]
- cred_repo = app.state.credential_repo
- svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
- await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("x")))
-
- token = await get_csrf_token(client)
- res = await client.request(
- "DELETE",
- f"/admin/users/{target.userid}/credentials/password",
- headers={"X-CSRF-Token": token, "HX-Request": "true"},
- )
-
- # The last credential must survive, and the admin is told why.
- assert await cred_repo.get_password_by_user(target.userid) is not None
- assert "last credential" in res.text.lower()
-
-
-async def _make_sole_admin_target(client: AsyncClient) -> User:
- """Create a second admin, then return it as the target. The logged-in
- admin from _login plus this one means 2 admins; callers deactivate the
- _login admin first if they need this to be the *last* admin."""
- return await _create_target_user(client, userid="admin2-01", username="admin2", groups=["admin"])
-
-
-@pytest.mark.asyncio
-async def test_cannot_deactivate_last_active_admin(client: AsyncClient) -> None:
- await _login(client) # the only admin
- app = client._transport.app # type: ignore[union-attr]
- admin = await app.state.user_repo.get_by_username("admin")
-
- token = await get_csrf_token(client)
- res = await client.post(
- f"/admin/users/{admin.userid}/deactivate",
- headers={"X-CSRF-Token": token, "HX-Request": "true"},
- )
- still = await app.state.user_repo.get_by_userid(admin.userid)
- assert still.active is True
- msg = res.text.lower()
- assert "last" in msg
- assert "admin" in msg
-
-
-@pytest.mark.asyncio
-async def test_cannot_remove_admin_group_from_last_admin(client: AsyncClient) -> None:
- await _login(client)
- app = client._transport.app # type: ignore[union-attr]
- admin = await app.state.user_repo.get_by_username("admin")
-
- token = await get_csrf_token(client)
- res = await client.post(
- f"/admin/users/{admin.userid}/groups",
- data={"groups": "users"}, # drops "admin"
- headers={"X-CSRF-Token": token, "HX-Request": "true"},
- )
- still = await app.state.user_repo.get_by_userid(admin.userid)
- assert "admin" in still.groups
- msg = res.text.lower()
- assert "last" in msg
- assert "admin" in msg
-
-
-@pytest.mark.asyncio
-async def test_cannot_delete_last_active_admin(client: AsyncClient) -> None:
- await _login(client)
- app = client._transport.app # type: ignore[union-attr]
- # A second, separate admin to delete (not self — self-delete is blocked separately).
- target = await _create_target_user(client, userid="otheradmin-01", username="otheradmin", groups=["admin"])
- # Deactivate the logged-in admin's peer? Instead make target the only ACTIVE admin
- # by deactivating the _login admin via repo, so deleting target would orphan admins.
- login_admin = await app.state.user_repo.get_by_username("admin")
- await app.state.user_repo.update(login_admin.model_copy(update={"active": False}))
-
- token = await get_csrf_token(client)
- res = await client.request(
- "DELETE",
- f"/admin/users/{target.userid}",
- headers={"X-CSRF-Token": token, "HX-Request": "true"},
- )
- assert await app.state.user_repo.get_by_userid(target.userid) is not None
- msg = res.text.lower()
- assert "last" in msg
- assert "admin" in msg
diff --git a/tests/test_admin_groups_validation.py b/tests/test_admin_groups_validation.py
index 32b1ecc..ff662e9 100644
--- a/tests/test_admin_groups_validation.py
+++ b/tests/test_admin_groups_validation.py
@@ -41,8 +41,6 @@ async def _setup_admin_and_target(client: AsyncClient) -> tuple[str, str]:
data={"username": "admin_g", "password": "AdminPass123!"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
- # Login resets the session (fixation defense); fetch a fresh CSRF token.
- token = await get_csrf_token(client)
return token, target.userid
diff --git a/tests/test_admin_invite_validation.py b/tests/test_admin_invite_validation.py
index 73ccd73..6bf1b68 100644
--- a/tests/test_admin_invite_validation.py
+++ b/tests/test_admin_invite_validation.py
@@ -32,8 +32,7 @@ async def _login_admin(client: AsyncClient) -> str:
data={"username": "admin", "password": "AdminPass123!"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
- # Login resets the session (fixation defense); fetch a fresh CSRF token.
- return await get_csrf_token(client)
+ return token
@pytest.mark.asyncio
diff --git a/tests/test_auth_routes/test_login_session.py b/tests/test_auth_routes/test_login_session.py
deleted file mode 100644
index 8a7f0cc..0000000
--- a/tests/test_auth_routes/test_login_session.py
+++ /dev/null
@@ -1,42 +0,0 @@
-from datetime import UTC, datetime
-
-from httpx import AsyncClient
-
-from porchlight.authn.password import PasswordHasher, PasswordService
-from porchlight.models import PasswordCredential, User
-from tests.conftest import get_csrf_token
-
-
-async def _create_active_user(client: AsyncClient) -> None:
- app = client._transport.app # type: ignore[union-attr]
- user = User(
- userid="login-01",
- username="loginuser",
- created_at=datetime.now(UTC),
- updated_at=datetime.now(UTC),
- )
- await app.state.user_repo.create(user)
- svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
- await app.state.credential_repo.create_password(
- PasswordCredential(user_id=user.userid, password_hash=svc.hash("password123!Secure"))
- )
-
-
-async def test_login_resets_session_to_prevent_fixation(client: AsyncClient) -> None:
- """Logging in must reset the pre-auth session so an attacker-fixed session
- cannot become authenticated. We observe this via the CSRF token (stored in
- the session) changing across login."""
- await _create_active_user(client)
-
- pre_login_token = await get_csrf_token(client)
-
- res = await client.post(
- "/login/password",
- data={"username": "loginuser", "password": "password123!Secure"},
- headers={"HX-Request": "true", "X-CSRF-Token": pre_login_token},
- )
- assert res.status_code == 200
- assert res.headers.get("HX-Redirect") # logged in
-
- post_login_token = await get_csrf_token(client)
- assert post_login_token != pre_login_token
diff --git a/tests/test_auth_routes/test_register_magic_link.py b/tests/test_auth_routes/test_register_magic_link.py
index 7ff47ba..1a03fed 100644
--- a/tests/test_auth_routes/test_register_magic_link.py
+++ b/tests/test_auth_routes/test_register_magic_link.py
@@ -1,10 +1,8 @@
-from argon2 import PasswordHasher
+from datetime import UTC, datetime, timedelta
+
from httpx import AsyncClient
-from porchlight.authn.password import PasswordService
-from porchlight.invite.service import MagicLinkService
-from porchlight.models import PasswordCredential, User
-from tests.conftest import get_csrf_token
+from porchlight.models import MagicLink, User
async def test_register_invalid_token_returns_error_page(client: AsyncClient) -> None:
@@ -15,48 +13,42 @@ async def test_register_invalid_token_returns_error_page(client: AsyncClient) ->
async def test_register_expired_token_returns_error_page(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
- # An already-expired link (negative TTL), stored hashed like the real service.
- expired_service = MagicLinkService(repo=app.state.magic_link_repo, ttl=-3600)
- link = await expired_service.create(username="newuser")
+ repo = app.state.magic_link_repo
+ await repo.create(
+ MagicLink(
+ token="expired",
+ username="newuser",
+ expires_at=datetime.now(UTC) - timedelta(hours=1),
+ )
+ )
- res = await client.get(f"/register/{link.token}", follow_redirects=False)
+ res = await client.get("/register/expired", follow_redirects=False)
assert res.status_code == 400
assert "Invalid or expired" in res.text
-async def test_register_get_does_not_consume_token(client: AsyncClient) -> None:
- """GET is side-effect free: it shows a confirmation form and must not
- consume the token or create a session."""
+async def test_register_valid_token_creates_user_and_redirects(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
- magic_link_service = app.state.magic_link_service
-
- link = await magic_link_service.create(username="newuser")
- res = await client.get(f"/register/{link.token}", follow_redirects=False)
- assert res.status_code == 200
- assert f'action="/register/{link.token}"' in res.text
- # Token still valid (not consumed by the GET).
- assert await magic_link_service.validate(link.token) is not None
-
-
-async def test_register_post_creates_user_and_redirects(client: AsyncClient) -> None:
- app = client._transport.app # type: ignore[union-attr]
- magic_link_service = app.state.magic_link_service
+ magic_link_repo = app.state.magic_link_repo
user_repo = app.state.user_repo
- link = await magic_link_service.create(username="newuser")
-
- csrf = await get_csrf_token(client)
- res = await client.post(
- f"/register/{link.token}",
- headers={"X-CSRF-Token": csrf},
- follow_redirects=False,
+ await magic_link_repo.create(
+ MagicLink(
+ token="t1",
+ username="newuser",
+ expires_at=datetime.now(UTC) + timedelta(hours=1),
+ )
)
+
+ res = await client.get("/register/t1", follow_redirects=False)
assert res.status_code in (302, 303)
assert "/manage/credentials" in res.headers["location"]
assert "setup=1" in res.headers["location"]
- # Token should be consumed (no longer valid)
- assert await magic_link_service.validate(link.token) is None
+ # Token should be marked used
+ link = await magic_link_repo.get_by_token("t1")
+ assert link is not None
+ assert link.used is True
# User should exist
user = await user_repo.get_by_username("newuser")
@@ -64,81 +56,53 @@ async def test_register_post_creates_user_and_redirects(client: AsyncClient) ->
assert "users" in user.groups
-async def test_register_post_requires_csrf(client: AsyncClient) -> None:
- app = client._transport.app # type: ignore[union-attr]
- magic_link_service = app.state.magic_link_service
- link = await magic_link_service.create(username="newuser")
-
- res = await client.post(f"/register/{link.token}", follow_redirects=False)
- assert res.status_code == 403
- # Token must not have been consumed by the rejected request.
- assert await magic_link_service.validate(link.token) is not None
-
-
async def test_register_used_token_returns_error(client: AsyncClient) -> None:
app = client._transport.app # type: ignore[union-attr]
- magic_link_service = app.state.magic_link_service
- link = await magic_link_service.create(username="newuser")
- await magic_link_service.mark_used(link.token)
+ repo = app.state.magic_link_repo
+ await repo.create(
+ MagicLink(
+ token="used",
+ username="newuser",
+ expires_at=datetime.now(UTC) + timedelta(hours=1),
+ used=True,
+ )
+ )
- res = await client.get(f"/register/{link.token}", follow_redirects=False)
+ res = await client.get("/register/used", follow_redirects=False)
assert res.status_code == 400
async def test_register_existing_user_logs_in_and_redirects(client: AsyncClient) -> None:
- """When initial-admin creates a user (no credentials yet), the invite link
- should let them set up their account."""
+ """When initial-admin creates a user, the invite link should log them in."""
app = client._transport.app # type: ignore[union-attr]
- magic_link_service = app.state.magic_link_service
+ magic_link_repo = app.state.magic_link_repo
user_repo = app.state.user_repo
- # Pre-create the user (as initial-admin would), with no credentials.
+ # Pre-create the user (as initial-admin would)
user = User(userid="lusab-bansen", username="admin", groups=["admin", "users"])
await user_repo.create(user)
- link = await magic_link_service.create(username="admin")
-
- csrf = await get_csrf_token(client)
- res = await client.post(
- f"/register/{link.token}",
- headers={"X-CSRF-Token": csrf},
- follow_redirects=False,
+ # Create invite for the same username
+ await magic_link_repo.create(
+ MagicLink(
+ token="admin-setup",
+ username="admin",
+ expires_at=datetime.now(UTC) + timedelta(hours=1),
+ )
)
+
+ res = await client.get("/register/admin-setup", follow_redirects=False)
assert res.status_code in (302, 303)
assert "/manage/credentials" in res.headers["location"]
assert "setup=1" in res.headers["location"]
- # Token should be consumed
- assert await magic_link_service.validate(link.token) is None
+ # Token should be marked used
+ link = await magic_link_repo.get_by_token("admin-setup")
+ assert link is not None
+ assert link.used is True
# Original user should still exist with original groups
existing = await user_repo.get_by_username("admin")
assert existing is not None
assert existing.userid == "lusab-bansen"
assert "admin" in existing.groups
-
-
-async def test_register_rejects_user_that_already_has_credentials(client: AsyncClient) -> None:
- """An invite/re-invite link must not act as a passwordless login for an
- account that already has credentials. Recovery is a separate flow."""
- app = client._transport.app # type: ignore[union-attr]
- magic_link_service = app.state.magic_link_service
- user_repo = app.state.user_repo
- cred_repo = app.state.credential_repo
-
- user = User(userid="lusab-hascreds", username="hascreds", groups=["users"])
- await user_repo.create(user)
- svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
- await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("existing-pass")))
-
- link = await magic_link_service.create(username="hascreds")
- csrf = await get_csrf_token(client)
- res = await client.post(
- f"/register/{link.token}",
- headers={"X-CSRF-Token": csrf},
- follow_redirects=False,
- )
-
- # No passwordless login: rejected, not redirected to setup.
- assert res.status_code == 400
- assert "setup=1" not in res.headers.get("location", "")
diff --git a/tests/test_authn_active.py b/tests/test_authn_active.py
index 6126343..3564bb5 100644
--- a/tests/test_authn_active.py
+++ b/tests/test_authn_active.py
@@ -61,11 +61,6 @@ async def test_inactive_user_cannot_register_magic_link(client: AsyncClient) ->
link = await magic_link_service.create(username="deactivated", created_by="admin", note="test")
- csrf = await get_csrf_token(client)
- response = await client.post(
- f"/register/{link.token}",
- headers={"X-CSRF-Token": csrf},
- follow_redirects=False,
- )
+ response = await client.get(f"/register/{link.token}", follow_redirects=False)
assert response.status_code == 400 or "deactivated" in response.text.lower() or "Invalid" in response.text
diff --git a/tests/test_invite/test_service.py b/tests/test_invite/test_service.py
index 166e7cd..6464fce 100644
--- a/tests/test_invite/test_service.py
+++ b/tests/test_invite/test_service.py
@@ -42,18 +42,6 @@ async def test_create_returns_magic_link(service: MagicLinkService) -> None:
assert link.expires_at > datetime.now(UTC)
-async def test_token_stored_hashed_not_plaintext(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
- # The raw token handed to the user must never be the value persisted in
- # the store; a DB read must not yield a usable token.
- link = await service.create(username="alice")
- raw = link.token
- assert await repo.get_by_token(raw) is None
- # ...but the raw token must still validate through the service.
- validated = await service.validate(raw)
- assert validated is not None
- assert validated.username == "alice"
-
-
async def test_create_generates_unique_tokens(service: MagicLinkService) -> None:
link1 = await service.create(username="alice")
link2 = await service.create(username="bob")
@@ -102,22 +90,6 @@ async def test_validate_expired_token(service: MagicLinkService, repo: SQLiteMag
assert result is None
-async def test_consume_validates_and_is_single_use(service: MagicLinkService) -> None:
- link = await service.create(username="alice")
- consumed = await service.consume(link.token)
- assert consumed is not None
- assert consumed.username == "alice"
- assert consumed.token == link.token # raw token re-attached
- # A second consume of the same token fails.
- assert await service.consume(link.token) is None
-
-
-async def test_consume_expired_returns_none(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
- expired_service = MagicLinkService(repo=repo, ttl=-1)
- link = await expired_service.create(username="alice")
- assert await service.consume(link.token) is None
-
-
async def test_mark_used_returns_true(service: MagicLinkService) -> None:
link = await service.create(username="alice")
result = await service.mark_used(link.token)
diff --git a/tests/test_oidc/test_authorization.py b/tests/test_oidc/test_authorization.py
index de3c70b..5994134 100644
--- a/tests/test_oidc/test_authorization.py
+++ b/tests/test_oidc/test_authorization.py
@@ -2,16 +2,6 @@ import secrets
from httpx import AsyncClient
-from porchlight.oidc.endpoints import _error_page
-
-
-def test_error_page_escapes_html() -> None:
- # OIDC error pages must not interpolate request-derived text as raw HTML.
- resp = _error_page("")
- body = resp.body.decode()
- assert "