# CSRF Protection Implementation Plan > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** Add OWASP-recommended Synchronizer Token Pattern CSRF protection to all session-authenticated state-changing endpoints. **Architecture:** ASGI middleware validates CSRF tokens on all non-safe HTTP methods, skipping exempt paths (OIDC protocol endpoints). Tokens are stored in the session, injected into templates via Jinja2 globals, and sent by htmx via `hx-headers` and by fetch JS via a `<meta>` tag. **Tech Stack:** FastAPI, Starlette SessionMiddleware, Jinja2, htmx, vanilla JS fetch --- ### Task 1: CSRF Middleware **Files:** - Create: `src/porchlight/csrf.py` - Test: `tests/test_csrf.py` **Step 1: Write the failing tests** Create `tests/test_csrf.py`: ```python import secrets import pytest from httpx import ASGITransport, AsyncClient from starlette.applications import Starlette from starlette.middleware.sessions import SessionMiddleware from starlette.requests import Request from starlette.responses import JSONResponse, PlainTextResponse from starlette.routing import Route from porchlight.csrf import CSRFMiddleware, generate_csrf_token def _make_app(exempt_paths: set[str] | None = None, check_origin: str | None = None) -> Starlette: """Create a minimal Starlette app with session + CSRF middleware for testing.""" async def get_token(request: Request) -> JSONResponse: token = generate_csrf_token(request) return JSONResponse({"token": token}) async def post_action(request: Request) -> PlainTextResponse: return PlainTextResponse("ok") async def exempt_action(request: Request) -> PlainTextResponse: return PlainTextResponse("ok") app = Starlette( routes=[ Route("/get-token", get_token), Route("/action", post_action, methods=["POST"]), Route("/action", get_token, methods=["GET"]), Route("/exempt", exempt_action, methods=["POST"]), ], ) app.add_middleware( CSRFMiddleware, exempt_paths=exempt_paths or {"/exempt"}, 
check_origin=check_origin, ) app.add_middleware(SessionMiddleware, secret_key="test-secret") return app @pytest.fixture def app() -> Starlette: return _make_app() @pytest.fixture async def tc(app: Starlette) -> AsyncClient: transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver") as ac: yield ac async def _get_token_and_cookies(tc: AsyncClient) -> tuple[str, dict]: """Helper: GET /get-token to obtain a CSRF token and session cookie.""" resp = await tc.get("/get-token") assert resp.status_code == 200 token = resp.json()["token"] return token, dict(resp.cookies) class TestCSRFValidation: async def test_safe_methods_pass_through(self, tc: AsyncClient) -> None: resp = await tc.get("/action") assert resp.status_code == 200 async def test_post_without_token_returns_403(self, tc: AsyncClient) -> None: resp = await tc.post("/action") assert resp.status_code == 403 assert "CSRF" in resp.text async def test_post_with_valid_form_token(self, tc: AsyncClient) -> None: token, cookies = await _get_token_and_cookies(tc) resp = await tc.post("/action", data={"csrf_token": token}, cookies=cookies) assert resp.status_code == 200 async def test_post_with_valid_header_token(self, tc: AsyncClient) -> None: token, cookies = await _get_token_and_cookies(tc) resp = await tc.post( "/action", headers={"X-CSRF-Token": token}, cookies=cookies, ) assert resp.status_code == 200 async def test_post_with_wrong_token_returns_403(self, tc: AsyncClient) -> None: _token, cookies = await _get_token_and_cookies(tc) resp = await tc.post("/action", data={"csrf_token": "wrong"}, cookies=cookies) assert resp.status_code == 403 async def test_exempt_path_skips_validation(self, tc: AsyncClient) -> None: resp = await tc.post("/exempt") assert resp.status_code == 200 class TestOriginCheck: async def test_matching_origin_passes(self) -> None: app = _make_app(check_origin="http://testserver") transport = ASGITransport(app=app) async with 
AsyncClient(transport=transport, base_url="http://testserver") as tc: token, cookies = await _get_token_and_cookies(tc) resp = await tc.post( "/action", data={"csrf_token": token}, cookies=cookies, headers={"Origin": "http://testserver"}, ) assert resp.status_code == 200 async def test_mismatched_origin_returns_403(self) -> None: app = _make_app(check_origin="http://testserver") transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver") as tc: token, cookies = await _get_token_and_cookies(tc) resp = await tc.post( "/action", data={"csrf_token": token}, cookies=cookies, headers={"Origin": "http://evil.com"}, ) assert resp.status_code == 403 assert "Origin" in resp.text async def test_no_origin_falls_back_to_token_check(self) -> None: app = _make_app(check_origin="http://testserver") transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver") as tc: token, cookies = await _get_token_and_cookies(tc) resp = await tc.post( "/action", data={"csrf_token": token}, cookies=cookies, ) assert resp.status_code == 200 class TestGenerateCSRFToken: async def test_generates_token_and_stores_in_session(self, tc: AsyncClient) -> None: resp = await tc.get("/get-token") token = resp.json()["token"] assert len(token) > 20 async def test_returns_same_token_within_session(self, tc: AsyncClient) -> None: resp1 = await tc.get("/get-token") token1 = resp1.json()["token"] cookies = dict(resp1.cookies) resp2 = await tc.get("/get-token", cookies=cookies) token2 = resp2.json()["token"] assert token1 == token2 ``` **Step 2: Run tests to verify they fail** Run: `uv run python -m pytest tests/test_csrf.py -v` Expected: FAIL — `ImportError: cannot import name 'CSRFMiddleware' from 'porchlight.csrf'` **Step 3: Write the implementation** Create `src/porchlight/csrf.py`: ```python import hmac import logging import secrets from urllib.parse import urlparse from starlette.requests import Request from 
starlette.responses import HTMLResponse from starlette.types import ASGIApp, Message, Receive, Scope, Send logger = logging.getLogger(__name__) SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"}) SESSION_KEY = "csrf_token" def generate_csrf_token(request: Request) -> str: """Get or create a CSRF token for the current session.""" token = request.session.get(SESSION_KEY) if token is None: token = secrets.token_urlsafe(32) request.session[SESSION_KEY] = token return token class CSRFMiddleware: """ASGI middleware implementing the Synchronizer Token Pattern. Validates that non-safe requests include a CSRF token matching the one stored in the session. The token can be sent as: - Form field: csrf_token - Header: X-CSRF-Token Optionally checks the Origin header as defense-in-depth. """ def __init__( self, app: ASGIApp, exempt_paths: set[str] | None = None, check_origin: str | None = None, ) -> None: self.app = app self.exempt_paths = exempt_paths or set() self.check_origin = check_origin async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: if scope["type"] != "http": await self.app(scope, receive, send) return request = Request(scope, receive, send) method = request.method if method in SAFE_METHODS: await self.app(scope, receive, send) return if request.url.path in self.exempt_paths: await self.app(scope, receive, send) return # Defense-in-depth: Origin check if self.check_origin: origin = request.headers.get("origin") if origin and origin != "null": expected_origin = self.check_origin.rstrip("/") if origin.rstrip("/") != expected_origin: logger.warning("CSRF origin mismatch on %s: %s", request.url.path, origin) response = HTMLResponse( "
<h1>Origin mismatch</h1>
", status_code=403, ) await response(scope, receive, send) return # Token validation session_token = request.session.get(SESSION_KEY) if not session_token: logger.warning("CSRF token missing from session on %s", request.url.path) response = HTMLResponse( "CSRF validation failed
", status_code=403, ) await response(scope, receive, send) return # Try header first (works for both JSON and form requests) submitted_token = request.headers.get("x-csrf-token") # Fall back to form body if not submitted_token: # We need to read the body for form data form = await request.form() submitted_token = form.get("csrf_token") if isinstance(submitted_token, str): pass # good else: submitted_token = None if not submitted_token or not hmac.compare_digest(submitted_token, session_token): logger.warning("CSRF token mismatch on %s", request.url.path) response = HTMLResponse( "CSRF validation failed
", status_code=403, ) await response(scope, receive, send) return await self.app(scope, receive, send) ``` **Step 4: Run tests to verify they pass** Run: `uv run python -m pytest tests/test_csrf.py -v` Expected: All pass **Step 5: Commit** ```bash git add src/porchlight/csrf.py tests/test_csrf.py git commit -m "feat: add CSRF middleware with synchronizer token pattern" ``` --- ### Task 2: Wire Middleware + Session Hardening + Template Globals **Files:** - Modify: `src/porchlight/app.py:94-128` - Modify: `src/porchlight/config.py:23-62` - Test: `tests/test_app.py` (existing tests should still pass) - Test: `tests/test_config.py` (existing tests should still pass) **Step 1: Write the failing tests** Add to `tests/test_csrf.py`: ```python class TestAppIntegration: """Test CSRF middleware is wired into the real app.""" async def test_post_without_csrf_token_returns_403(self, client: AsyncClient) -> None: """Any POST to a session-protected endpoint without CSRF token gets 403.""" resp = await client.post("/login/password", data={"username": "x", "password": "y"}) assert resp.status_code == 403 async def test_exempt_token_endpoint(self, client: AsyncClient) -> None: """The /token endpoint is exempt from CSRF (uses client auth).""" resp = await client.post("/token", data={"grant_type": "authorization_code", "code": "fake"}) # Should NOT be 403 — it should fail for auth reasons, not CSRF assert resp.status_code != 403 ``` These tests need the `client` fixture from `tests/conftest.py`. **Step 2: Run tests to verify they fail** Run: `uv run python -m pytest tests/test_csrf.py::TestAppIntegration -v` Expected: First test FAILS (currently returns 200 with error HTML, not 403). Second test passes trivially. 
**Step 3: Add config field and wire middleware** Modify `src/porchlight/config.py` — add `session_https_only` field after `session_secret`: ```python # Session session_secret: str | None = None # If None, a random secret is generated per process session_https_only: bool = True ``` Modify `src/porchlight/app.py`: 1. Add import: ```python from porchlight.csrf import CSRFMiddleware, generate_csrf_token ``` 2. Replace session middleware block (lines 108-110) with: ```python # Session middleware session_secret = settings.session_secret or secrets.token_hex(32) app.add_middleware( CSRFMiddleware, exempt_paths={"/token", "/userinfo"}, check_origin=settings.issuer, ) app.add_middleware( SessionMiddleware, secret_key=session_secret, same_site="lax", https_only=settings.session_https_only, ) # type: ignore[arg-type] ``` Note: Starlette middleware order is reversed — `SessionMiddleware` is added second but runs first (outermost). CSRF runs after session is available. 3. Add CSRF token to template globals (after templates creation, line 113): ```python # Templates templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates")) def csrf_token_processor(request: Request) -> str: return generate_csrf_token(request) templates.env.globals["csrf_token_processor"] = csrf_token_processor app.state.templates = templates ``` **Step 4: Run tests to verify they pass** Run: `uv run python -m pytest tests/test_csrf.py -v` Expected: All pass Run: `uv run python -m pytest tests/test_app.py tests/test_config.py -v` Expected: All pass (config test for `session_https_only` default) **Step 5: Commit** ```bash git add src/porchlight/app.py src/porchlight/config.py tests/test_csrf.py git commit -m "feat: wire CSRF middleware and harden session cookie" ``` --- ### Task 3: Template Changes **Files:** - Modify: `src/porchlight/templates/base.html` - Modify: `src/porchlight/templates/login.html` - Modify: `src/porchlight/templates/consent.html` - Modify: 
`src/porchlight/templates/manage/credentials.html` - Modify: `src/porchlight/templates/manage/profile.html` - Modify: `src/porchlight/static/webauthn.js` **Step 1: Modify base.html** Add CSRF meta tag and htmx headers to `base.html`: ```html