porchlight/src/porchlight/authn/routes.py
Johan Lundberg 01e3382aaf
fix: resolve all ruff lint errors and type checker warnings
- Use Annotated[str, Form()] for FastAPI dependencies (FAST002)
- Add missing type annotations across src/ and tests/ (ANN001/003/201/202)
- Reduce function arguments via request.form() reads (PLR0913)
- Combine return paths to reduce return statements (PLR0911)
- Use anyio.Path for async-safe filesystem operations (ASYNC240)
- Extract constants, helpers, and dict comprehensions for clarity
- Move inline imports to top-level (PLC0415)
- Use raw strings for regex match patterns (RUF043)
- Fix redundant get_session_user call in delete_user (not-iterable)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:48:46 +02:00

162 lines
5.6 KiB
Python

from base64 import urlsafe_b64decode
from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fido2.webauthn import AttestedCredentialData, AuthenticationResponse
from porchlight.models import User
from porchlight.rate_limit import limiter
from porchlight.userid import generate_unique_userid
# Router that collects the authentication endpoints defined in this module
# (login, logout, magic-link registration, WebAuthn); tagged "authn".
router = APIRouter(tags=["authn"])
def _login_redirect_target(request: Request) -> str:
    """Pick the post-login destination for the current session.

    A session that holds an ``oidc_auth_request`` entry is sent to
    ``/authorization/complete``; every other session is sent to
    ``/manage/credentials``.
    """
    has_pending_oidc_request = "oidc_auth_request" in request.session
    return "/authorization/complete" if has_pending_oidc_request else "/manage/credentials"
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request) -> HTMLResponse:
    """Render ``login.html`` using the template engine stored on the app state."""
    return request.app.state.templates.TemplateResponse(request, "login.html")
@router.post("/login/password", response_class=HTMLResponse)
@limiter.limit("5/minute")
async def login_password(
    request: Request,
    username: Annotated[str, Form()],
    password: Annotated[str, Form()],
) -> Response:
    """Authenticate a user from the username/password login form.

    Every failure (unknown username, no password credential, wrong password,
    inactive account) produces the same alert fragment, so the response body
    does not reveal which check failed. On success the user's id and name are
    stored in the session and an empty response carrying an ``HX-Redirect``
    header is returned.
    """
    app_state = request.app.state
    users = app_state.user_repo
    credentials = app_state.credential_repo
    hasher = app_state.password_service

    def rejected() -> HTMLResponse:
        # Single generic message shared by all failure paths.
        return HTMLResponse('<div role="alert">Invalid username or password</div>')

    account = await users.get_by_username(username)
    stored = None if account is None else await credentials.get_password_by_user(account.userid)
    if account is None or stored is None:
        return rejected()

    # The password is verified before the active flag is looked at.
    if not (hasher.verify(stored.password_hash, password) and account.active):
        return rejected()

    request.session["userid"] = account.userid
    request.session["username"] = account.username

    redirect = Response()
    redirect.headers["HX-Redirect"] = _login_redirect_target(request)
    return redirect
@router.post("/logout")
async def logout(request: Request) -> Response:
    """Drop all session data and send the client back to ``/login``.

    The redirect is delivered through the ``HX-Redirect`` response header on
    an otherwise empty response.
    """
    request.session.clear()

    redirect = Response()
    redirect.headers["HX-Redirect"] = "/login"
    return redirect
@router.get("/register/{token}")
async def register_magic_link(request: Request, token: str) -> Response:
    """Sign a user in through a registration magic link.

    The token is validated first; a rejected token yields a 400 page. If a
    user with the link's username already exists it is reused (a deactivated
    one yields a 400 page instead); otherwise a new user in the ``users``
    group is created. The token is then marked as used, the session is
    populated, and the client is redirected (303) to credential setup.
    """
    links = request.app.state.magic_link_service
    users = request.app.state.user_repo

    link = await links.validate(token)
    if link is None:
        return HTMLResponse("<p>Invalid or expired registration link.</p>", status_code=400)

    account = await users.get_by_username(link.username)
    if account is not None and not account.active:
        return HTMLResponse("<p>This account has been deactivated.</p>", status_code=400)

    if account is None:
        # First visit for this username: provision the account.
        new_userid = await generate_unique_userid(users)
        account = User(userid=new_userid, username=link.username, groups=["users"])
        await users.create(account)

    # The token is only consumed once an account is available to sign in.
    await links.mark_used(token)

    request.session["userid"] = account.userid
    request.session["username"] = account.username
    return RedirectResponse("/manage/credentials?setup=1", status_code=303)
@router.get("/login/webauthn/begin")
async def login_webauthn_begin(request: Request) -> Response:
    """Start a WebAuthn login ceremony.

    The state returned by the WebAuthn service is stored in the session under
    ``webauthn_login_state`` and the accompanying options are returned to the
    client as JSON.
    """
    service = request.app.state.webauthn_service
    challenge_options, pending_state = service.begin_authentication()
    request.session["webauthn_login_state"] = pending_state
    return JSONResponse(challenge_options)
@router.post("/login/webauthn/complete")
@limiter.limit("10/minute")
async def login_webauthn_complete(request: Request) -> Response:
    """Finish a WebAuthn login ceremony and establish the session.

    Steps:
      1. Pop the state saved under ``webauthn_login_state``; it is removed
         from the session whether or not the attempt succeeds.
      2. Read the JSON body and take the user id from the base64url-encoded
         ``response.userHandle`` field.
      3. Load that user's WebAuthn credentials and let the WebAuthn service
         verify the assertion against them.
      4. Store the authenticator's new signature counter on the matched
         credential.
      5. Require an existing, active user, then populate the session.

    Returns:
        ``{"redirect": <target>}`` on success, otherwise ``{"error": ...}``
        with status 400. Malformed client input (invalid JSON, non-object
        body, non-string or undecodable user handle) is reported as a 400
        instead of surfacing as an unhandled exception.
    """
    webauthn_service = request.app.state.webauthn_service
    user_repo = request.app.state.user_repo
    cred_repo = request.app.state.credential_repo

    state = request.session.pop("webauthn_login_state", None)
    if state is None:
        return JSONResponse({"error": "Authentication session expired"}, status_code=400)

    # The body is untrusted: invalid JSON (or invalid UTF-8) raises a
    # ValueError subclass, which must not turn into a 500.
    try:
        body = await request.json()
    except ValueError:
        return JSONResponse({"error": "Invalid request body"}, status_code=400)

    # Extract user_handle from the assertion to identify the user. Guard the
    # shapes first, since valid JSON need not be an object of objects.
    assertion = body.get("response") if isinstance(body, dict) else None
    user_handle_b64 = assertion.get("userHandle") if isinstance(assertion, dict) else None
    if not isinstance(user_handle_b64, str) or not user_handle_b64:
        return JSONResponse({"error": "Missing user handle"}, status_code=400)

    # Decode base64url user_handle to get the userid string. Clients may strip
    # the padding, so restore it. binascii.Error and UnicodeDecodeError are
    # both ValueError subclasses.
    try:
        padded = user_handle_b64 + "=" * (-len(user_handle_b64) % 4)
        userid = urlsafe_b64decode(padded).decode()
    except ValueError:
        return JSONResponse({"error": "Authentication failed"}, status_code=400)

    webauthn_creds = await cred_repo.get_webauthn_by_user(userid)
    if not webauthn_creds:
        return JSONResponse({"error": "Authentication failed"}, status_code=400)

    credentials = [AttestedCredentialData(cred.public_key) for cred in webauthn_creds]
    try:
        webauthn_service.complete_authentication(state, credentials, body)
    except Exception:
        # Any verification failure is reported with the same generic message.
        return JSONResponse({"error": "Authentication failed"}, status_code=400)

    # Update sign count on the credential that produced the assertion.
    auth_response = AuthenticationResponse.from_dict(body)
    new_counter = auth_response.response.authenticator_data.counter
    matched_credential_id = auth_response.raw_id
    stored = await cred_repo.get_webauthn_by_credential_id(matched_credential_id)
    if stored is not None:
        stored.sign_count = new_counter
        await cred_repo.update_webauthn(stored)

    user = await user_repo.get_by_userid(userid)
    if user is None or not user.active:
        return JSONResponse({"error": "Authentication failed"}, status_code=400)

    request.session["userid"] = user.userid
    request.session["username"] = user.username
    return JSONResponse({"redirect": _login_redirect_target(request)})