diff --git a/docs/plans/2026-02-19-csrf-protection-plan.md b/docs/plans/2026-02-19-csrf-protection-plan.md new file mode 100644 index 0000000..9248007 --- /dev/null +++ b/docs/plans/2026-02-19-csrf-protection-plan.md @@ -0,0 +1,623 @@ +# CSRF Protection Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Add OWASP-recommended Synchronizer Token Pattern CSRF protection to all session-authenticated state-changing endpoints. + +**Architecture:** ASGI middleware validates CSRF tokens on all non-safe HTTP methods, skipping exempt paths (OIDC protocol endpoints). Tokens are stored in the session, injected into templates via Jinja2 globals, and sent by htmx via `hx-headers` and by fetch JS via a `` tag. + +**Tech Stack:** FastAPI, Starlette SessionMiddleware, Jinja2, htmx, vanilla JS fetch + +--- + +### Task 1: CSRF Middleware + +**Files:** +- Create: `src/porchlight/csrf.py` +- Test: `tests/test_csrf.py` + +**Step 1: Write the failing tests** + +Create `tests/test_csrf.py`: + +```python +import secrets + +import pytest +from httpx import ASGITransport, AsyncClient +from starlette.applications import Starlette +from starlette.middleware.sessions import SessionMiddleware +from starlette.requests import Request +from starlette.responses import JSONResponse, PlainTextResponse +from starlette.routing import Route + +from porchlight.csrf import CSRFMiddleware, generate_csrf_token + + +def _make_app(exempt_paths: set[str] | None = None, check_origin: str | None = None) -> Starlette: + """Create a minimal Starlette app with session + CSRF middleware for testing.""" + + async def get_token(request: Request) -> JSONResponse: + token = generate_csrf_token(request) + return JSONResponse({"token": token}) + + async def post_action(request: Request) -> PlainTextResponse: + return PlainTextResponse("ok") + + async def exempt_action(request: Request) -> PlainTextResponse: + return PlainTextResponse("ok") + + app = Starlette( + routes=[ + Route("/get-token", get_token), + Route("/action", post_action, methods=["POST"]), + Route("/action", get_token, methods=["GET"]), + Route("/exempt", exempt_action, methods=["POST"]), + ], + ) + app.add_middleware( + CSRFMiddleware, + exempt_paths=exempt_paths or {"/exempt"}, + check_origin=check_origin, + ) + app.add_middleware(SessionMiddleware, secret_key="test-secret") + return app + + +@pytest.fixture +def app() -> Starlette: + return _make_app() + + +@pytest.fixture +async def tc(app: Starlette) -> AsyncClient: + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as ac: + yield ac + + +async def _get_token_and_cookies(tc: AsyncClient) -> tuple[str, dict]: + """Helper: GET /get-token to obtain a CSRF token and session cookie.""" + resp = await tc.get("/get-token") + assert resp.status_code == 200 + token = resp.json()["token"] + return token, dict(resp.cookies) + + +class TestCSRFValidation: + async def test_safe_methods_pass_through(self, tc: AsyncClient) -> None: + resp = await tc.get("/action") + assert resp.status_code == 200 + + async def test_post_without_token_returns_403(self, tc: AsyncClient) -> None: + resp = await tc.post("/action") + assert resp.status_code == 403 + assert "CSRF" in resp.text + + async def test_post_with_valid_form_token(self, tc: AsyncClient) -> None: + token, cookies = await _get_token_and_cookies(tc) + resp = await tc.post("/action", data={"csrf_token": token}, cookies=cookies) + assert resp.status_code == 200 + + async def test_post_with_valid_header_token(self, tc: AsyncClient) -> None: + token, cookies = await _get_token_and_cookies(tc) + resp = await tc.post( + "/action", + headers={"X-CSRF-Token": token}, + cookies=cookies, + ) + assert resp.status_code == 200 + + async def test_post_with_wrong_token_returns_403(self, tc: AsyncClient) -> None: + _token, cookies = await _get_token_and_cookies(tc) + resp = await tc.post("/action", data={"csrf_token": "wrong"}, cookies=cookies) + assert resp.status_code == 403 + + async def test_exempt_path_skips_validation(self, tc: AsyncClient) -> None: + resp = await tc.post("/exempt") + assert resp.status_code == 200 + + +class TestOriginCheck: + async def test_matching_origin_passes(self) -> None: + app = _make_app(check_origin="http://testserver") + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as tc: + token, cookies = await _get_token_and_cookies(tc) + resp = await tc.post( + "/action", + data={"csrf_token": token}, + cookies=cookies, + headers={"Origin": "http://testserver"}, + ) + assert resp.status_code == 200 + + async def test_mismatched_origin_returns_403(self) -> None: + app = _make_app(check_origin="http://testserver") + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as tc: + token, cookies = await _get_token_and_cookies(tc) + resp = await tc.post( + "/action", + data={"csrf_token": token}, + cookies=cookies, + headers={"Origin": "http://evil.com"}, + ) + assert resp.status_code == 403 + assert "Origin" in resp.text + + async def test_no_origin_falls_back_to_token_check(self) -> None: + app = _make_app(check_origin="http://testserver") + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as tc: + token, cookies = await _get_token_and_cookies(tc) + resp = await tc.post( + "/action", + data={"csrf_token": token}, + cookies=cookies, + ) + assert resp.status_code == 200 + + +class TestGenerateCSRFToken: + async def test_generates_token_and_stores_in_session(self, tc: AsyncClient) -> None: + resp = await tc.get("/get-token") + token = resp.json()["token"] + assert len(token) > 20 + + async def test_returns_same_token_within_session(self, tc: AsyncClient) -> None: + resp1 = await tc.get("/get-token") + token1 = resp1.json()["token"] + cookies = dict(resp1.cookies) + resp2 = await tc.get("/get-token", cookies=cookies) + token2 = resp2.json()["token"] + assert token1 == token2 +``` + +**Step 2: Run tests to verify they fail** + +Run: `uv run python -m pytest tests/test_csrf.py -v` +Expected: FAIL — `ImportError: cannot import name 'CSRFMiddleware' from 'porchlight.csrf'` + +**Step 3: Write the implementation** + +Create `src/porchlight/csrf.py`: + +```python +import hmac +import logging +import secrets +from urllib.parse import urlparse + +from starlette.requests import Request +from starlette.responses import HTMLResponse +from starlette.types import ASGIApp, Message, Receive, Scope, Send + +logger = logging.getLogger(__name__) + +SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"}) +SESSION_KEY = "csrf_token" + + +def generate_csrf_token(request: Request) -> str: + """Get or create a CSRF token for the current session.""" + token = request.session.get(SESSION_KEY) + if token is None: + token = secrets.token_urlsafe(32) + request.session[SESSION_KEY] = token + return token + + +class CSRFMiddleware: + """ASGI middleware implementing the Synchronizer Token Pattern. + + Validates that non-safe requests include a CSRF token matching the + one stored in the session. The token can be sent as: + - Form field: csrf_token + - Header: X-CSRF-Token + + Optionally checks the Origin header as defense-in-depth. + """ + + def __init__( + self, + app: ASGIApp, + exempt_paths: set[str] | None = None, + check_origin: str | None = None, + ) -> None: + self.app = app + self.exempt_paths = exempt_paths or set() + self.check_origin = check_origin + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + request = Request(scope, receive, send) + method = request.method + + if method in SAFE_METHODS: + await self.app(scope, receive, send) + return + + if request.url.path in self.exempt_paths: + await self.app(scope, receive, send) + return + + # Defense-in-depth: Origin check + if self.check_origin: + origin = request.headers.get("origin") + if origin and origin != "null": + expected_origin = self.check_origin.rstrip("/") + if origin.rstrip("/") != expected_origin: + logger.warning("CSRF origin mismatch on %s: %s", request.url.path, origin) + response = HTMLResponse( + "
Origin mismatch
", + status_code=403, + ) + await response(scope, receive, send) + return + + # Token validation + session_token = request.session.get(SESSION_KEY) + if not session_token: + logger.warning("CSRF token missing from session on %s", request.url.path) + response = HTMLResponse( + "CSRF validation failed
", + status_code=403, + ) + await response(scope, receive, send) + return + + # Try header first (works for both JSON and form requests) + submitted_token = request.headers.get("x-csrf-token") + + # Fall back to form body + if not submitted_token: + # We need to read the body for form data + form = await request.form() + submitted_token = form.get("csrf_token") + if isinstance(submitted_token, str): + pass # good + else: + submitted_token = None + + if not submitted_token or not hmac.compare_digest(submitted_token, session_token): + logger.warning("CSRF token mismatch on %s", request.url.path) + response = HTMLResponse( + "CSRF validation failed
", + status_code=403, + ) + await response(scope, receive, send) + return + + await self.app(scope, receive, send) +``` + +**Step 4: Run tests to verify they pass** + +Run: `uv run python -m pytest tests/test_csrf.py -v` +Expected: All pass + +**Step 5: Commit** + +```bash +git add src/porchlight/csrf.py tests/test_csrf.py +git commit -m "feat: add CSRF middleware with synchronizer token pattern" +``` + +--- + +### Task 2: Wire Middleware + Session Hardening + Template Globals + +**Files:** +- Modify: `src/porchlight/app.py:94-128` +- Modify: `src/porchlight/config.py:23-62` +- Test: `tests/test_app.py` (existing tests should still pass) +- Test: `tests/test_config.py` (existing tests should still pass) + +**Step 1: Write the failing tests** + +Add to `tests/test_csrf.py`: + +```python +class TestAppIntegration: + """Test CSRF middleware is wired into the real app.""" + + async def test_post_without_csrf_token_returns_403(self, client: AsyncClient) -> None: + """Any POST to a session-protected endpoint without CSRF token gets 403.""" + resp = await client.post("/login/password", data={"username": "x", "password": "y"}) + assert resp.status_code == 403 + + async def test_exempt_token_endpoint(self, client: AsyncClient) -> None: + """The /token endpoint is exempt from CSRF (uses client auth).""" + resp = await client.post("/token", data={"grant_type": "authorization_code", "code": "fake"}) + # Should NOT be 403 — it should fail for auth reasons, not CSRF + assert resp.status_code != 403 +``` + +These tests need the `client` fixture from `tests/conftest.py`. + +**Step 2: Run tests to verify they fail** + +Run: `uv run python -m pytest tests/test_csrf.py::TestAppIntegration -v` +Expected: First test FAILS (currently returns 200 with error HTML, not 403). Second test passes trivially. + +**Step 3: Add config field and wire middleware** + +Modify `src/porchlight/config.py` — add `session_https_only` field after `session_secret`: + +```python + # Session + session_secret: str | None = None # If None, a random secret is generated per process + session_https_only: bool = True +``` + +Modify `src/porchlight/app.py`: + +1. Add import: +```python +from porchlight.csrf import CSRFMiddleware, generate_csrf_token +``` + +2. Replace session middleware block (lines 108-110) with: +```python + # Session middleware + session_secret = settings.session_secret or secrets.token_hex(32) + app.add_middleware( + CSRFMiddleware, + exempt_paths={"/token", "/userinfo"}, + check_origin=settings.issuer, + ) + app.add_middleware( + SessionMiddleware, + secret_key=session_secret, + same_site="lax", + https_only=settings.session_https_only, + ) # type: ignore[arg-type] +``` + +Note: Starlette middleware order is reversed — `SessionMiddleware` is added second but runs first (outermost). CSRF runs after session is available. + +3. Add CSRF token to template globals (after templates creation, line 113): +```python + # Templates + templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates")) + + def csrf_token_processor(request: Request) -> str: + return generate_csrf_token(request) + + templates.env.globals["csrf_token_processor"] = csrf_token_processor + app.state.templates = templates +``` + +**Step 4: Run tests to verify they pass** + +Run: `uv run python -m pytest tests/test_csrf.py -v` +Expected: All pass + +Run: `uv run python -m pytest tests/test_app.py tests/test_config.py -v` +Expected: All pass (config test for `session_https_only` default) + +**Step 5: Commit** + +```bash +git add src/porchlight/app.py src/porchlight/config.py tests/test_csrf.py +git commit -m "feat: wire CSRF middleware and harden session cookie" +``` + +--- + +### Task 3: Template Changes + +**Files:** +- Modify: `src/porchlight/templates/base.html` +- Modify: `src/porchlight/templates/login.html` +- Modify: `src/porchlight/templates/consent.html` +- Modify: `src/porchlight/templates/manage/credentials.html` +- Modify: `src/porchlight/templates/manage/profile.html` +- Modify: `src/porchlight/static/webauthn.js` + +**Step 1: Modify base.html** + +Add CSRF meta tag and htmx headers to `base.html`: + +```html + + + + + + +