diff --git a/src/porchlight/app.py b/src/porchlight/app.py
index acf3a70..82fc285 100644
--- a/src/porchlight/app.py
+++ b/src/porchlight/app.py
@@ -8,8 +8,10 @@ from urllib.parse import urlparse
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
+from slowapi.errors import RateLimitExceeded
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request
+from starlette.responses import HTMLResponse as StarletteHTMLResponse
from porchlight.admin.routes import router as admin_router
from porchlight.authn.password import PasswordService
@@ -18,6 +20,7 @@ from porchlight.authn.webauthn import WebAuthnService
from porchlight.config import Settings, StorageBackend
from porchlight.csrf import CSRFMiddleware, generate_csrf_token
from porchlight.invite.service import MagicLinkService
+from porchlight.rate_limit import limiter
from porchlight.manage.routes import router as manage_router
from porchlight.oidc.endpoints import router as oidc_router
from porchlight.oidc.provider import create_oidc_server
@@ -123,6 +126,16 @@ def create_app(settings: Settings | None = None) -> FastAPI:
https_only=settings.session_https_only,
)
+ # Rate limiting
+ app.state.limiter = limiter
+
+ @app.exception_handler(RateLimitExceeded)
+ async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> StarletteHTMLResponse:
+ return StarletteHTMLResponse(
+ '
Too many attempts. Please try again later.
',
+ status_code=429,
+ )
+
# Templates
templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates"))
diff --git a/src/porchlight/authn/routes.py b/src/porchlight/authn/routes.py
index a7ae50d..fa580e3 100644
--- a/src/porchlight/authn/routes.py
+++ b/src/porchlight/authn/routes.py
@@ -5,6 +5,7 @@ from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fido2.webauthn import AttestedCredentialData, AuthenticationResponse
from porchlight.models import User
+from porchlight.rate_limit import limiter
from porchlight.userid import generate_unique_userid
router = APIRouter(tags=["authn"])
@@ -28,6 +29,7 @@ async def login_page(request: Request) -> HTMLResponse:
@router.post("/login/password", response_class=HTMLResponse)
+@limiter.limit("5/minute")
async def login_password(
request: Request,
username: str = Form(),
@@ -107,6 +109,7 @@ async def login_webauthn_begin(request: Request) -> Response:
@router.post("/login/webauthn/complete")
+@limiter.limit("10/minute")
async def login_webauthn_complete(request: Request) -> Response:
webauthn_service = request.app.state.webauthn_service
user_repo = request.app.state.user_repo
diff --git a/src/porchlight/rate_limit.py b/src/porchlight/rate_limit.py
new file mode 100644
index 0000000..38404a8
--- /dev/null
+++ b/src/porchlight/rate_limit.py
@@ -0,0 +1,4 @@
+from slowapi import Limiter
+from slowapi.util import get_remote_address
+
+limiter = Limiter(key_func=get_remote_address)
diff --git a/tests/conftest.py b/tests/conftest.py
index 3506c70..cc9eee3 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -6,6 +6,7 @@ from httpx import ASGITransport, AsyncClient
from porchlight.app import create_app
from porchlight.config import Settings
+from porchlight.rate_limit import limiter
@pytest.fixture
@@ -21,6 +22,12 @@ async def client(settings: Settings) -> AsyncIterator[AsyncClient]:
yield ac
+@pytest.fixture(autouse=True)
+def _reset_rate_limiter() -> None:
+ """Reset the rate limiter storage before each test."""
+ limiter.reset()
+
+
async def get_csrf_token(client: AsyncClient) -> str:
"""Get a CSRF token by visiting the login page.
diff --git a/tests/test_rate_limit.py b/tests/test_rate_limit.py
new file mode 100644
index 0000000..b84ea14
--- /dev/null
+++ b/tests/test_rate_limit.py
@@ -0,0 +1,25 @@
+import pytest
+from httpx import AsyncClient
+
+from tests.conftest import get_csrf_token
+
+
+@pytest.mark.asyncio
+async def test_password_login_rate_limited(client: AsyncClient) -> None:
+ """After 5 failed attempts, the 6th should be rate-limited."""
+ token = await get_csrf_token(client)
+
+ for _ in range(5):
+ await client.post(
+ "/login/password",
+ data={"username": "nobody", "password": "wrong"},
+ headers={"X-CSRF-Token": token},
+ )
+
+ response = await client.post(
+ "/login/password",
+ data={"username": "nobody", "password": "wrong"},
+ headers={"X-CSRF-Token": token},
+ )
+
+ assert response.status_code == 429