feat: rewrite WebAuthn login routes for usernameless discoverable credential flow
This commit is contained in:
parent
2ffe968342
commit
32567b5484
3 changed files with 61 additions and 81 deletions
|
|
@ -1,11 +1,8 @@
|
|||
from base64 import urlsafe_b64decode
|
||||
|
||||
from fastapi import APIRouter, Form, Request, Response
|
||||
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
||||
from fido2.webauthn import (
|
||||
AttestedCredentialData,
|
||||
AuthenticationResponse,
|
||||
PublicKeyCredentialDescriptor,
|
||||
PublicKeyCredentialType,
|
||||
)
|
||||
from fido2.webauthn import AttestedCredentialData, AuthenticationResponse
|
||||
|
||||
from porchlight.models import User
|
||||
from porchlight.userid import generate_unique_userid
|
||||
|
|
@ -90,34 +87,13 @@ async def register_magic_link(request: Request, token: str) -> Response:
|
|||
return RedirectResponse("/manage/credentials?setup=1", status_code=303)
|
||||
|
||||
|
||||
@router.post("/login/webauthn/begin")
|
||||
async def login_webauthn_begin(
|
||||
request: Request,
|
||||
username: str = Form(),
|
||||
) -> Response:
|
||||
user_repo = request.app.state.user_repo
|
||||
cred_repo = request.app.state.credential_repo
|
||||
@router.get("/login/webauthn/begin")
|
||||
async def login_webauthn_begin(request: Request) -> Response:
|
||||
webauthn_service = request.app.state.webauthn_service
|
||||
|
||||
error_html = '<div role="alert">Invalid username or password</div>'
|
||||
|
||||
user = await user_repo.get_by_username(username)
|
||||
if user is None:
|
||||
return HTMLResponse(error_html)
|
||||
|
||||
webauthn_creds = await cred_repo.get_webauthn_by_user(user.userid)
|
||||
if not webauthn_creds:
|
||||
return HTMLResponse(error_html)
|
||||
|
||||
descriptors = [
|
||||
PublicKeyCredentialDescriptor(type=PublicKeyCredentialType.PUBLIC_KEY, id=cred.credential_id)
|
||||
for cred in webauthn_creds
|
||||
]
|
||||
|
||||
options, state = webauthn_service.begin_authentication(credentials=descriptors)
|
||||
options, state = webauthn_service.begin_authentication()
|
||||
|
||||
request.session["webauthn_login_state"] = state
|
||||
request.session["webauthn_login_userid"] = user.userid
|
||||
return JSONResponse(options)
|
||||
|
||||
|
||||
|
|
@ -128,22 +104,32 @@ async def login_webauthn_complete(request: Request) -> Response:
|
|||
cred_repo = request.app.state.credential_repo
|
||||
|
||||
state = request.session.pop("webauthn_login_state", None)
|
||||
userid = request.session.pop("webauthn_login_userid", None)
|
||||
|
||||
if state is None or userid is None:
|
||||
return HTMLResponse('<div role="alert">Authentication session expired</div>', status_code=400)
|
||||
|
||||
webauthn_creds = await cred_repo.get_webauthn_by_user(userid)
|
||||
credentials = [AttestedCredentialData(cred.public_key) for cred in webauthn_creds]
|
||||
if state is None:
|
||||
return JSONResponse({"error": "Authentication session expired"}, status_code=400)
|
||||
|
||||
body = await request.json()
|
||||
|
||||
# Extract user_handle from the assertion to identify the user
|
||||
user_handle_b64 = body.get("response", {}).get("userHandle")
|
||||
if not user_handle_b64:
|
||||
return JSONResponse({"error": "Missing user handle"}, status_code=400)
|
||||
|
||||
# Decode base64url user_handle to get userid string
|
||||
padded = user_handle_b64 + "=" * (-len(user_handle_b64) % 4)
|
||||
userid = urlsafe_b64decode(padded).decode()
|
||||
|
||||
webauthn_creds = await cred_repo.get_webauthn_by_user(userid)
|
||||
if not webauthn_creds:
|
||||
return JSONResponse({"error": "Authentication failed"}, status_code=400)
|
||||
|
||||
credentials = [AttestedCredentialData(cred.public_key) for cred in webauthn_creds]
|
||||
|
||||
try:
|
||||
webauthn_service.complete_authentication(state, credentials, body)
|
||||
except Exception:
|
||||
return HTMLResponse('<div role="alert">Authentication failed</div>')
|
||||
return JSONResponse({"error": "Authentication failed"}, status_code=400)
|
||||
|
||||
# Extract sign count from the response and update
|
||||
# Update sign count
|
||||
auth_response = AuthenticationResponse.from_dict(body)
|
||||
new_counter = auth_response.response.authenticator_data.counter
|
||||
matched_credential_id = auth_response.raw_id
|
||||
|
|
@ -155,11 +141,9 @@ async def login_webauthn_complete(request: Request) -> Response:
|
|||
|
||||
user = await user_repo.get_by_userid(userid)
|
||||
if user is None:
|
||||
return HTMLResponse('<div role="alert">User not found</div>', status_code=400)
|
||||
return JSONResponse({"error": "User not found"}, status_code=400)
|
||||
|
||||
request.session["userid"] = user.userid
|
||||
request.session["username"] = user.username
|
||||
|
||||
response = Response()
|
||||
response.headers["HX-Redirect"] = _login_redirect_target(request)
|
||||
return response
|
||||
return JSONResponse({"redirect": _login_redirect_target(request)})
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue