feat: add rate limiting middleware for authentication endpoints
Add slowapi-based rate limiting: 5/min on password login, 10/min on WebAuthn login. Includes shared rate limiter reset fixture for tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
23ca6272a2
commit
d4acb46cf5
5 changed files with 52 additions and 0 deletions
|
|
@ -8,8 +8,10 @@ from urllib.parse import urlparse
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
from fastapi.templating import Jinja2Templates
|
from fastapi.templating import Jinja2Templates
|
||||||
|
from slowapi.errors import RateLimitExceeded
|
||||||
from starlette.middleware.sessions import SessionMiddleware
|
from starlette.middleware.sessions import SessionMiddleware
|
||||||
from starlette.requests import Request
|
from starlette.requests import Request
|
||||||
|
from starlette.responses import HTMLResponse as StarletteHTMLResponse
|
||||||
|
|
||||||
from porchlight.admin.routes import router as admin_router
|
from porchlight.admin.routes import router as admin_router
|
||||||
from porchlight.authn.password import PasswordService
|
from porchlight.authn.password import PasswordService
|
||||||
|
|
@ -18,6 +20,7 @@ from porchlight.authn.webauthn import WebAuthnService
|
||||||
from porchlight.config import Settings, StorageBackend
|
from porchlight.config import Settings, StorageBackend
|
||||||
from porchlight.csrf import CSRFMiddleware, generate_csrf_token
|
from porchlight.csrf import CSRFMiddleware, generate_csrf_token
|
||||||
from porchlight.invite.service import MagicLinkService
|
from porchlight.invite.service import MagicLinkService
|
||||||
|
from porchlight.rate_limit import limiter
|
||||||
from porchlight.manage.routes import router as manage_router
|
from porchlight.manage.routes import router as manage_router
|
||||||
from porchlight.oidc.endpoints import router as oidc_router
|
from porchlight.oidc.endpoints import router as oidc_router
|
||||||
from porchlight.oidc.provider import create_oidc_server
|
from porchlight.oidc.provider import create_oidc_server
|
||||||
|
|
@ -123,6 +126,16 @@ def create_app(settings: Settings | None = None) -> FastAPI:
|
||||||
https_only=settings.session_https_only,
|
https_only=settings.session_https_only,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Rate limiting
|
||||||
|
app.state.limiter = limiter
|
||||||
|
|
||||||
|
@app.exception_handler(RateLimitExceeded)
|
||||||
|
async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> StarletteHTMLResponse:
|
||||||
|
return StarletteHTMLResponse(
|
||||||
|
'<div role="alert">Too many attempts. Please try again later.</div>',
|
||||||
|
status_code=429,
|
||||||
|
)
|
||||||
|
|
||||||
# Templates
|
# Templates
|
||||||
templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates"))
|
templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates"))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
||||||
from fido2.webauthn import AttestedCredentialData, AuthenticationResponse
|
from fido2.webauthn import AttestedCredentialData, AuthenticationResponse
|
||||||
|
|
||||||
from porchlight.models import User
|
from porchlight.models import User
|
||||||
|
from porchlight.rate_limit import limiter
|
||||||
from porchlight.userid import generate_unique_userid
|
from porchlight.userid import generate_unique_userid
|
||||||
|
|
||||||
router = APIRouter(tags=["authn"])
|
router = APIRouter(tags=["authn"])
|
||||||
|
|
@ -28,6 +29,7 @@ async def login_page(request: Request) -> HTMLResponse:
|
||||||
|
|
||||||
|
|
||||||
@router.post("/login/password", response_class=HTMLResponse)
|
@router.post("/login/password", response_class=HTMLResponse)
|
||||||
|
@limiter.limit("5/minute")
|
||||||
async def login_password(
|
async def login_password(
|
||||||
request: Request,
|
request: Request,
|
||||||
username: str = Form(),
|
username: str = Form(),
|
||||||
|
|
@ -107,6 +109,7 @@ async def login_webauthn_begin(request: Request) -> Response:
|
||||||
|
|
||||||
|
|
||||||
@router.post("/login/webauthn/complete")
|
@router.post("/login/webauthn/complete")
|
||||||
|
@limiter.limit("10/minute")
|
||||||
async def login_webauthn_complete(request: Request) -> Response:
|
async def login_webauthn_complete(request: Request) -> Response:
|
||||||
webauthn_service = request.app.state.webauthn_service
|
webauthn_service = request.app.state.webauthn_service
|
||||||
user_repo = request.app.state.user_repo
|
user_repo = request.app.state.user_repo
|
||||||
|
|
|
||||||
4
src/porchlight/rate_limit.py
Normal file
4
src/porchlight/rate_limit.py
Normal file
|
|
@ -0,0 +1,4 @@
|
||||||
|
from slowapi import Limiter
|
||||||
|
from slowapi.util import get_remote_address
|
||||||
|
|
||||||
|
limiter = Limiter(key_func=get_remote_address)
|
||||||
|
|
@ -6,6 +6,7 @@ from httpx import ASGITransport, AsyncClient
|
||||||
|
|
||||||
from porchlight.app import create_app
|
from porchlight.app import create_app
|
||||||
from porchlight.config import Settings
|
from porchlight.config import Settings
|
||||||
|
from porchlight.rate_limit import limiter
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
|
@ -21,6 +22,12 @@ async def client(settings: Settings) -> AsyncIterator[AsyncClient]:
|
||||||
yield ac
|
yield ac
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _reset_rate_limiter() -> None:
|
||||||
|
"""Reset the rate limiter storage before each test."""
|
||||||
|
limiter.reset()
|
||||||
|
|
||||||
|
|
||||||
async def get_csrf_token(client: AsyncClient) -> str:
|
async def get_csrf_token(client: AsyncClient) -> str:
|
||||||
"""Get a CSRF token by visiting the login page.
|
"""Get a CSRF token by visiting the login page.
|
||||||
|
|
||||||
|
|
|
||||||
25
tests/test_rate_limit.py
Normal file
25
tests/test_rate_limit.py
Normal file
|
|
@ -0,0 +1,25 @@
|
||||||
|
import pytest
|
||||||
|
from httpx import AsyncClient
|
||||||
|
|
||||||
|
from tests.conftest import get_csrf_token
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_password_login_rate_limited(client: AsyncClient) -> None:
|
||||||
|
"""After 5 failed attempts, the 6th should be rate-limited."""
|
||||||
|
token = await get_csrf_token(client)
|
||||||
|
|
||||||
|
for _ in range(5):
|
||||||
|
await client.post(
|
||||||
|
"/login/password",
|
||||||
|
data={"username": "nobody", "password": "wrong"},
|
||||||
|
headers={"X-CSRF-Token": token},
|
||||||
|
)
|
||||||
|
|
||||||
|
response = await client.post(
|
||||||
|
"/login/password",
|
||||||
|
data={"username": "nobody", "password": "wrong"},
|
||||||
|
headers={"X-CSRF-Token": token},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 429
|
||||||
Loading…
Add table
Add a link
Reference in a new issue