fix(security): reset session on login to prevent fixation
Both password and WebAuthn login wrote the authenticated identity onto the existing pre-auth session, so a fixed/planted session could be elevated to an authenticated one. Add _establish_authenticated_session() which clears the session (preserving only a pending OIDC authorization request) before setting the identity, used by both login paths. Tests that reused a pre-login CSRF token now re-fetch it after login, matching real client behavior. Refs: porchlight-vxr Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
7c4dbf2cd9
commit
407db57279
5 changed files with 65 additions and 6 deletions
|
|
@ -23,6 +23,21 @@ def _login_redirect_target(request: Request) -> str:
|
||||||
return "/manage/credentials"
|
return "/manage/credentials"
|
||||||
|
|
||||||
|
|
||||||
|
def _establish_authenticated_session(request: Request, user: User) -> None:
|
||||||
|
"""Reset the session before recording the authenticated identity.
|
||||||
|
|
||||||
|
Clearing first defeats session fixation: any values an attacker planted in
|
||||||
|
the pre-auth session are dropped and the session cookie is reissued. A
|
||||||
|
pending OIDC authorization request is the only pre-auth state worth keeping.
|
||||||
|
"""
|
||||||
|
pending_oidc = request.session.get("oidc_auth_request")
|
||||||
|
request.session.clear()
|
||||||
|
if pending_oidc is not None:
|
||||||
|
request.session["oidc_auth_request"] = pending_oidc
|
||||||
|
request.session["userid"] = user.userid
|
||||||
|
request.session["username"] = user.username
|
||||||
|
|
||||||
|
|
||||||
@router.get("/login", response_class=HTMLResponse)
|
@router.get("/login", response_class=HTMLResponse)
|
||||||
async def login_page(request: Request) -> HTMLResponse:
|
async def login_page(request: Request) -> HTMLResponse:
|
||||||
templates = request.app.state.templates
|
templates = request.app.state.templates
|
||||||
|
|
@ -56,8 +71,7 @@ async def login_password(
|
||||||
if not user.active:
|
if not user.active:
|
||||||
return HTMLResponse(error_html)
|
return HTMLResponse(error_html)
|
||||||
|
|
||||||
request.session["userid"] = user.userid
|
_establish_authenticated_session(request, user)
|
||||||
request.session["username"] = user.username
|
|
||||||
|
|
||||||
response = Response()
|
response = Response()
|
||||||
response.headers["HX-Redirect"] = _login_redirect_target(request)
|
response.headers["HX-Redirect"] = _login_redirect_target(request)
|
||||||
|
|
@ -167,7 +181,6 @@ async def login_webauthn_complete(request: Request) -> Response:
|
||||||
if user is None or not user.active:
|
if user is None or not user.active:
|
||||||
return JSONResponse({"error": "Authentication failed"}, status_code=400)
|
return JSONResponse({"error": "Authentication failed"}, status_code=400)
|
||||||
|
|
||||||
request.session["userid"] = user.userid
|
_establish_authenticated_session(request, user)
|
||||||
request.session["username"] = user.username
|
|
||||||
|
|
||||||
return JSONResponse({"redirect": _login_redirect_target(request)})
|
return JSONResponse({"redirect": _login_redirect_target(request)})
|
||||||
|
|
|
||||||
|
|
@ -41,6 +41,8 @@ async def _setup_admin_and_target(client: AsyncClient) -> tuple[str, str]:
|
||||||
data={"username": "admin_g", "password": "AdminPass123!"},
|
data={"username": "admin_g", "password": "AdminPass123!"},
|
||||||
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
||||||
)
|
)
|
||||||
|
# Login resets the session (fixation defense); fetch a fresh CSRF token.
|
||||||
|
token = await get_csrf_token(client)
|
||||||
return token, target.userid
|
return token, target.userid
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,8 @@ async def _login_admin(client: AsyncClient) -> str:
|
||||||
data={"username": "admin", "password": "AdminPass123!"},
|
data={"username": "admin", "password": "AdminPass123!"},
|
||||||
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
||||||
)
|
)
|
||||||
return token
|
# Login resets the session (fixation defense); fetch a fresh CSRF token.
|
||||||
|
return await get_csrf_token(client)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|
|
||||||
42
tests/test_auth_routes/test_login_session.py
Normal file
42
tests/test_auth_routes/test_login_session.py
Normal file
|
|
@ -0,0 +1,42 @@
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
|
from httpx import AsyncClient
|
||||||
|
|
||||||
|
from porchlight.authn.password import PasswordHasher, PasswordService
|
||||||
|
from porchlight.models import PasswordCredential, User
|
||||||
|
from tests.conftest import get_csrf_token
|
||||||
|
|
||||||
|
|
||||||
|
async def _create_active_user(client: AsyncClient) -> None:
|
||||||
|
app = client._transport.app # type: ignore[union-attr]
|
||||||
|
user = User(
|
||||||
|
userid="login-01",
|
||||||
|
username="loginuser",
|
||||||
|
created_at=datetime.now(UTC),
|
||||||
|
updated_at=datetime.now(UTC),
|
||||||
|
)
|
||||||
|
await app.state.user_repo.create(user)
|
||||||
|
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||||
|
await app.state.credential_repo.create_password(
|
||||||
|
PasswordCredential(user_id=user.userid, password_hash=svc.hash("password123!Secure"))
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_login_resets_session_to_prevent_fixation(client: AsyncClient) -> None:
|
||||||
|
"""Logging in must reset the pre-auth session so an attacker-fixed session
|
||||||
|
cannot become authenticated. We observe this via the CSRF token (stored in
|
||||||
|
the session) changing across login."""
|
||||||
|
await _create_active_user(client)
|
||||||
|
|
||||||
|
pre_login_token = await get_csrf_token(client)
|
||||||
|
|
||||||
|
res = await client.post(
|
||||||
|
"/login/password",
|
||||||
|
data={"username": "loginuser", "password": "password123!Secure"},
|
||||||
|
headers={"HX-Request": "true", "X-CSRF-Token": pre_login_token},
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
assert res.headers.get("HX-Redirect") # logged in
|
||||||
|
|
||||||
|
post_login_token = await get_csrf_token(client)
|
||||||
|
assert post_login_token != pre_login_token
|
||||||
|
|
@ -31,7 +31,8 @@ async def _login_user_with_password(client: AsyncClient) -> str:
|
||||||
data={"username": "pwuser", "password": "OldPass123!ok"},
|
data={"username": "pwuser", "password": "OldPass123!ok"},
|
||||||
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
||||||
)
|
)
|
||||||
return token
|
# Login resets the session (fixation defense); fetch a fresh CSRF token.
|
||||||
|
return await get_csrf_token(client)
|
||||||
|
|
||||||
|
|
||||||
async def _login_user_without_password(client: AsyncClient) -> str:
|
async def _login_user_without_password(client: AsyncClient) -> str:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue