fix(security): make rate-limit client IP proxy-aware
The limiter keyed solely on the direct peer address, so behind a reverse proxy every request shares the proxy's IP (collapsing all users into one bucket). Blindly trusting X-Forwarded-For would instead let clients spoof it. Add a trusted_proxy_count setting (default 0). When > 0, the client IP is read from X-Forwarded-For counting that many hops from the right (ProxyFix-style); when 0, the header is ignored and the peer address is used. Refs: porchlight-8qj Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
aedb451128
commit
efb265a68b
3 changed files with 65 additions and 1 deletions
|
|
@ -54,6 +54,10 @@ class Settings(BaseSettings):
|
|||
|
||||
# Rate limiting (disable for e2e/load tests that authenticate repeatedly)
|
||||
rate_limit_enabled: bool = True
|
||||
# Number of trusted reverse proxies in front of the app. When > 0, the
|
||||
# client IP for rate limiting is taken from X-Forwarded-For, counting this
|
||||
# many hops from the right. Keep 0 unless deployed behind a known proxy.
|
||||
trusted_proxy_count: int = 0
|
||||
|
||||
# Signing keys
|
||||
signing_key_path: str = "data/keys"
|
||||
|
|
|
|||
|
|
@ -1,4 +1,25 @@
|
|||
from slowapi import Limiter
|
||||
from slowapi.util import get_remote_address
|
||||
from starlette.requests import Request
|
||||
|
||||
limiter = Limiter(key_func=get_remote_address)
|
||||
|
||||
def _client_identifier(request: Request) -> str:
|
||||
"""Rate-limit key: the client IP, proxy-aware.
|
||||
|
||||
By default the direct peer address is used. When ``trusted_proxy_count`` is
|
||||
configured (the number of trusted reverse proxies in front of the app), the
|
||||
client address is taken from the ``X-Forwarded-For`` chain instead, counting
|
||||
that many hops back from the right. The header is ignored unless proxies are
|
||||
trusted, so it cannot be spoofed in a direct-connection deployment.
|
||||
"""
|
||||
settings = getattr(getattr(request.app, "state", None), "settings", None)
|
||||
hops: int = getattr(settings, "trusted_proxy_count", 0) or 0
|
||||
if hops > 0:
|
||||
forwarded = request.headers.get("x-forwarded-for", "")
|
||||
parts = [p.strip() for p in forwarded.split(",") if p.strip()]
|
||||
if len(parts) >= hops:
|
||||
return parts[-hops]
|
||||
return get_remote_address(request)
|
||||
|
||||
|
||||
limiter = Limiter(key_func=_client_identifier)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,10 @@
|
|||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
from httpx import AsyncClient
|
||||
from starlette.requests import Request
|
||||
|
||||
from porchlight.rate_limit import _client_identifier
|
||||
from tests.conftest import get_csrf_token
|
||||
|
||||
|
||||
|
|
@ -23,3 +27,38 @@ async def test_password_login_rate_limited(client: AsyncClient) -> None:
|
|||
)
|
||||
|
||||
assert response.status_code == 429
|
||||
|
||||
|
||||
def _make_request(headers: dict[str, str], client_host: str, trusted_proxy_count: int) -> Request:
|
||||
scope = {
|
||||
"type": "http",
|
||||
"headers": [(k.lower().encode(), v.encode()) for k, v in headers.items()],
|
||||
"client": (client_host, 12345),
|
||||
"app": SimpleNamespace(
|
||||
state=SimpleNamespace(settings=SimpleNamespace(trusted_proxy_count=trusted_proxy_count))
|
||||
),
|
||||
}
|
||||
return Request(scope)
|
||||
|
||||
|
||||
def test_client_identifier_ignores_forwarded_when_no_trusted_proxy() -> None:
|
||||
req = _make_request({"x-forwarded-for": "203.0.113.9"}, "127.0.0.1", trusted_proxy_count=0)
|
||||
# Spoofable header must be ignored when no proxy is trusted.
|
||||
assert _client_identifier(req) == "127.0.0.1"
|
||||
|
||||
|
||||
def test_client_identifier_uses_forwarded_with_one_trusted_proxy() -> None:
|
||||
req = _make_request({"x-forwarded-for": "203.0.113.9"}, "10.0.0.1", trusted_proxy_count=1)
|
||||
assert _client_identifier(req) == "203.0.113.9"
|
||||
|
||||
|
||||
def test_client_identifier_picks_correct_hop_with_two_trusted_proxies() -> None:
|
||||
req = _make_request({"x-forwarded-for": "client, proxyA, proxyB"}, "10.0.0.1", trusted_proxy_count=2)
|
||||
# Two trusted proxies appended their peers; the client as seen by the
|
||||
# outermost trusted proxy is the 2nd-from-right entry.
|
||||
assert _client_identifier(req) == "proxyA"
|
||||
|
||||
|
||||
def test_client_identifier_falls_back_when_forwarded_missing() -> None:
|
||||
req = _make_request({}, "10.0.0.1", trusted_proxy_count=1)
|
||||
assert _client_identifier(req) == "10.0.0.1"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue