# CSRF Protection Implementation Plan > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** Add OWASP-recommended Synchronizer Token Pattern CSRF protection to all session-authenticated state-changing endpoints. **Architecture:** ASGI middleware validates CSRF tokens on all non-safe HTTP methods, skipping exempt paths (OIDC protocol endpoints). Tokens are stored in the session, injected into templates via Jinja2 globals, and sent by htmx via `hx-headers` and by fetch JS via a `` tag. **Tech Stack:** FastAPI, Starlette SessionMiddleware, Jinja2, htmx, vanilla JS fetch --- ### Task 1: CSRF Middleware **Files:** - Create: `src/porchlight/csrf.py` - Test: `tests/test_csrf.py` **Step 1: Write the failing tests** Create `tests/test_csrf.py`: ```python import secrets import pytest from httpx import ASGITransport, AsyncClient from starlette.applications import Starlette from starlette.middleware.sessions import SessionMiddleware from starlette.requests import Request from starlette.responses import JSONResponse, PlainTextResponse from starlette.routing import Route from porchlight.csrf import CSRFMiddleware, generate_csrf_token def _make_app(exempt_paths: set[str] | None = None, check_origin: str | None = None) -> Starlette: """Create a minimal Starlette app with session + CSRF middleware for testing.""" async def get_token(request: Request) -> JSONResponse: token = generate_csrf_token(request) return JSONResponse({"token": token}) async def post_action(request: Request) -> PlainTextResponse: return PlainTextResponse("ok") async def exempt_action(request: Request) -> PlainTextResponse: return PlainTextResponse("ok") app = Starlette( routes=[ Route("/get-token", get_token), Route("/action", post_action, methods=["POST"]), Route("/action", get_token, methods=["GET"]), Route("/exempt", exempt_action, methods=["POST"]), ], ) app.add_middleware( CSRFMiddleware, exempt_paths=exempt_paths or {"/exempt"}, check_origin=check_origin, ) app.add_middleware(SessionMiddleware, secret_key="test-secret") return app @pytest.fixture def app() -> Starlette: return _make_app() @pytest.fixture async def tc(app: Starlette) -> AsyncClient: transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver") as ac: yield ac async def _get_token_and_cookies(tc: AsyncClient) -> tuple[str, dict]: """Helper: GET /get-token to obtain a CSRF token and session cookie.""" resp = await tc.get("/get-token") assert resp.status_code == 200 token = resp.json()["token"] return token, dict(resp.cookies) class TestCSRFValidation: async def test_safe_methods_pass_through(self, tc: AsyncClient) -> None: resp = await tc.get("/action") assert resp.status_code == 200 async def test_post_without_token_returns_403(self, tc: AsyncClient) -> None: resp = await tc.post("/action") assert resp.status_code == 403 assert "CSRF" in resp.text async def test_post_with_valid_form_token(self, tc: AsyncClient) -> None: token, cookies = await _get_token_and_cookies(tc) resp = await tc.post("/action", data={"csrf_token": token}, cookies=cookies) assert resp.status_code == 200 async def test_post_with_valid_header_token(self, tc: AsyncClient) -> None: token, cookies = await _get_token_and_cookies(tc) resp = await tc.post( "/action", headers={"X-CSRF-Token": token}, cookies=cookies, ) assert resp.status_code == 200 async def test_post_with_wrong_token_returns_403(self, tc: AsyncClient) -> None: _token, cookies = await _get_token_and_cookies(tc) resp = await tc.post("/action", data={"csrf_token": "wrong"}, cookies=cookies) assert resp.status_code == 403 async def test_exempt_path_skips_validation(self, tc: AsyncClient) -> None: resp = await tc.post("/exempt") assert resp.status_code == 200 class TestOriginCheck: async def test_matching_origin_passes(self) -> None: app = _make_app(check_origin="http://testserver") transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver") as tc: token, cookies = await _get_token_and_cookies(tc) resp = await tc.post( "/action", data={"csrf_token": token}, cookies=cookies, headers={"Origin": "http://testserver"}, ) assert resp.status_code == 200 async def test_mismatched_origin_returns_403(self) -> None: app = _make_app(check_origin="http://testserver") transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver") as tc: token, cookies = await _get_token_and_cookies(tc) resp = await tc.post( "/action", data={"csrf_token": token}, cookies=cookies, headers={"Origin": "http://evil.com"}, ) assert resp.status_code == 403 assert "Origin" in resp.text async def test_no_origin_falls_back_to_token_check(self) -> None: app = _make_app(check_origin="http://testserver") transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://testserver") as tc: token, cookies = await _get_token_and_cookies(tc) resp = await tc.post( "/action", data={"csrf_token": token}, cookies=cookies, ) assert resp.status_code == 200 class TestGenerateCSRFToken: async def test_generates_token_and_stores_in_session(self, tc: AsyncClient) -> None: resp = await tc.get("/get-token") token = resp.json()["token"] assert len(token) > 20 async def test_returns_same_token_within_session(self, tc: AsyncClient) -> None: resp1 = await tc.get("/get-token") token1 = resp1.json()["token"] cookies = dict(resp1.cookies) resp2 = await tc.get("/get-token", cookies=cookies) token2 = resp2.json()["token"] assert token1 == token2 ``` **Step 2: Run tests to verify they fail** Run: `uv run python -m pytest tests/test_csrf.py -v` Expected: FAIL — `ImportError: cannot import name 'CSRFMiddleware' from 'porchlight.csrf'` **Step 3: Write the implementation** Create `src/porchlight/csrf.py`: ```python import hmac import logging import secrets from urllib.parse import urlparse from starlette.requests import Request from starlette.responses import HTMLResponse from starlette.types import ASGIApp, Message, Receive, Scope, Send logger = logging.getLogger(__name__) SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"}) SESSION_KEY = "csrf_token" def generate_csrf_token(request: Request) -> str: """Get or create a CSRF token for the current session.""" token = request.session.get(SESSION_KEY) if token is None: token = secrets.token_urlsafe(32) request.session[SESSION_KEY] = token return token class CSRFMiddleware: """ASGI middleware implementing the Synchronizer Token Pattern. Validates that non-safe requests include a CSRF token matching the one stored in the session. The token can be sent as: - Form field: csrf_token - Header: X-CSRF-Token Optionally checks the Origin header as defense-in-depth. """ def __init__( self, app: ASGIApp, exempt_paths: set[str] | None = None, check_origin: str | None = None, ) -> None: self.app = app self.exempt_paths = exempt_paths or set() self.check_origin = check_origin async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: if scope["type"] != "http": await self.app(scope, receive, send) return request = Request(scope, receive, send) method = request.method if method in SAFE_METHODS: await self.app(scope, receive, send) return if request.url.path in self.exempt_paths: await self.app(scope, receive, send) return # Defense-in-depth: Origin check if self.check_origin: origin = request.headers.get("origin") if origin and origin != "null": expected_origin = self.check_origin.rstrip("/") if origin.rstrip("/") != expected_origin: logger.warning("CSRF origin mismatch on %s: %s", request.url.path, origin) response = HTMLResponse( "

403 Forbidden

Origin mismatch

", status_code=403, ) await response(scope, receive, send) return # Token validation session_token = request.session.get(SESSION_KEY) if not session_token: logger.warning("CSRF token missing from session on %s", request.url.path) response = HTMLResponse( "

403 Forbidden

CSRF validation failed

", status_code=403, ) await response(scope, receive, send) return # Try header first (works for both JSON and form requests) submitted_token = request.headers.get("x-csrf-token") # Fall back to form body if not submitted_token: # We need to read the body for form data form = await request.form() submitted_token = form.get("csrf_token") if isinstance(submitted_token, str): pass # good else: submitted_token = None if not submitted_token or not hmac.compare_digest(submitted_token, session_token): logger.warning("CSRF token mismatch on %s", request.url.path) response = HTMLResponse( "

403 Forbidden

CSRF validation failed

", status_code=403, ) await response(scope, receive, send) return await self.app(scope, receive, send) ``` **Step 4: Run tests to verify they pass** Run: `uv run python -m pytest tests/test_csrf.py -v` Expected: All pass **Step 5: Commit** ```bash git add src/porchlight/csrf.py tests/test_csrf.py git commit -m "feat: add CSRF middleware with synchronizer token pattern" ``` --- ### Task 2: Wire Middleware + Session Hardening + Template Globals **Files:** - Modify: `src/porchlight/app.py:94-128` - Modify: `src/porchlight/config.py:23-62` - Test: `tests/test_app.py` (existing tests should still pass) - Test: `tests/test_config.py` (existing tests should still pass) **Step 1: Write the failing tests** Add to `tests/test_csrf.py`: ```python class TestAppIntegration: """Test CSRF middleware is wired into the real app.""" async def test_post_without_csrf_token_returns_403(self, client: AsyncClient) -> None: """Any POST to a session-protected endpoint without CSRF token gets 403.""" resp = await client.post("/login/password", data={"username": "x", "password": "y"}) assert resp.status_code == 403 async def test_exempt_token_endpoint(self, client: AsyncClient) -> None: """The /token endpoint is exempt from CSRF (uses client auth).""" resp = await client.post("/token", data={"grant_type": "authorization_code", "code": "fake"}) # Should NOT be 403 — it should fail for auth reasons, not CSRF assert resp.status_code != 403 ``` These tests need the `client` fixture from `tests/conftest.py`. **Step 2: Run tests to verify they fail** Run: `uv run python -m pytest tests/test_csrf.py::TestAppIntegration -v` Expected: First test FAILS (currently returns 200 with error HTML, not 403). Second test passes trivially. **Step 3: Add config field and wire middleware** Modify `src/porchlight/config.py` — add `session_https_only` field after `session_secret`: ```python # Session session_secret: str | None = None # If None, a random secret is generated per process session_https_only: bool = True ``` Modify `src/porchlight/app.py`: 1. Add import: ```python from porchlight.csrf import CSRFMiddleware, generate_csrf_token ``` 2. Replace session middleware block (lines 108-110) with: ```python # Session middleware session_secret = settings.session_secret or secrets.token_hex(32) app.add_middleware( CSRFMiddleware, exempt_paths={"/token", "/userinfo"}, check_origin=settings.issuer, ) app.add_middleware( SessionMiddleware, secret_key=session_secret, same_site="lax", https_only=settings.session_https_only, ) # type: ignore[arg-type] ``` Note: Starlette middleware order is reversed — `SessionMiddleware` is added second but runs first (outermost). CSRF runs after session is available. 3. Add CSRF token to template globals (after templates creation, line 113): ```python # Templates templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates")) def csrf_token_processor(request: Request) -> str: return generate_csrf_token(request) templates.env.globals["csrf_token_processor"] = csrf_token_processor app.state.templates = templates ``` **Step 4: Run tests to verify they pass** Run: `uv run python -m pytest tests/test_csrf.py -v` Expected: All pass Run: `uv run python -m pytest tests/test_app.py tests/test_config.py -v` Expected: All pass (config test for `session_https_only` default) **Step 5: Commit** ```bash git add src/porchlight/app.py src/porchlight/config.py tests/test_csrf.py git commit -m "feat: wire CSRF middleware and harden session cookie" ``` --- ### Task 3: Template Changes **Files:** - Modify: `src/porchlight/templates/base.html` - Modify: `src/porchlight/templates/login.html` - Modify: `src/porchlight/templates/consent.html` - Modify: `src/porchlight/templates/manage/credentials.html` - Modify: `src/porchlight/templates/manage/profile.html` - Modify: `src/porchlight/static/webauthn.js` **Step 1: Modify base.html** Add CSRF meta tag and htmx headers to `base.html`: ```html {% block title %}Porchlight{% endblock %} ``` The `hx-headers` attribute on `` makes htmx automatically include the CSRF token as a header on every htmx request. The `` tag is for vanilla JS fetch calls (webauthn.js). **Step 2: Add hidden field to login.html** In the password form (after line 12): ```html
``` Note: htmx will send the `X-CSRF-Token` header (from `hx-headers` on body), but the hidden field is belt-and-suspenders for the form data path and also protects against login CSRF per OWASP. **Step 3: Add hidden field to consent.html** In the consent form (after line 10): ```html ``` **Step 4: Add hidden field to credentials.html** In the password form (after line 41): ```html ``` **Step 5: Add hidden field to profile.html** In the profile form (after line 16): ```html ``` **Step 6: Update webauthn.js** Add CSRF header to all fetch calls. Add this helper at the top of the file (after the base64 helpers): ```javascript function getCsrfToken() { const meta = document.querySelector('meta[name="csrf-token"]'); return meta ? meta.getAttribute('content') : ''; } ``` Then update all `fetch()` calls to include the header: In `beginRegistration()` — the begin fetch (line 22-25): ```javascript const beginRes = await fetch('/manage/credentials/webauthn/begin', { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-CSRF-Token': getCsrfToken() }, }); ``` In `beginRegistration()` — the complete fetch (line 58-62): ```javascript const completeRes = await fetch('/manage/credentials/webauthn/complete', { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-CSRF-Token': getCsrfToken() }, body: JSON.stringify(body), }); ``` In `beginAuthentication()` — the complete fetch (line 116-120): ```javascript const completeRes = await fetch('/login/webauthn/complete', { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-CSRF-Token': getCsrfToken() }, body: JSON.stringify(body), }); ``` **Step 7: Run tests** Run: `uv run python -m pytest -v` Expected: Many existing tests will FAIL because they don't include CSRF tokens. This is expected — Task 4 fixes them. **Step 8: Commit** ```bash git add src/porchlight/templates/ src/porchlight/static/webauthn.js git commit -m "feat: add CSRF tokens to templates and JS fetch calls" ``` --- ### Task 4: Update Existing Tests All existing tests that POST to session-protected endpoints need to include CSRF tokens. The approach: create a test helper that seeds a CSRF token into the session cookie. **Files:** - Modify: `tests/conftest.py` - Modify: All test files that POST/DELETE to session endpoints **Step 1: Add CSRF helper to conftest** The test `client` fixture needs to support CSRF. Since tests use `httpx.AsyncClient` with ASGI transport, we need a helper to get a valid session cookie with a CSRF token. Add to `tests/conftest.py`: ```python async def get_csrf_token(client: AsyncClient) -> tuple[str, dict]: """Get a CSRF token by visiting a page that generates one. Returns (token, cookies) tuple. """ resp = await client.get("/login") # Extract CSRF token from the HTML meta tag import re match = re.search(r'name="csrf-token" content="([^"]+)"', resp.text) assert match, "CSRF meta tag not found in page" token = match.group(1) return token, dict(resp.cookies) ``` **Step 2: Update each test file** For each test file that makes POST/DELETE requests, add CSRF token to the request. The pattern varies: For **form POSTs** (htmx-style): include `csrf_token` in form data AND/OR `X-CSRF-Token` header. For **JSON POSTs**: include `X-CSRF-Token` header. For **DELETE requests**: include `X-CSRF-Token` header. The specific test files that need updating: - `tests/test_auth_routes/test_password_login.py` — POST /login/password, POST /logout - `tests/test_auth_routes/test_webauthn_login.py` — POST /login/webauthn/complete - `tests/test_auth_routes/test_manage_password_credential.py` — POST and DELETE /manage/credentials/password - `tests/test_auth_routes/test_manage_webauthn_credential.py` — POST /manage/credentials/webauthn/* - `tests/test_auth_routes/test_last_credential_guard.py` — DELETE endpoints - `tests/test_oidc/test_consent_flow.py` — POST /consent - `tests/test_oidc/test_e2e_flow.py` — POST /consent, POST /login/password - `tests/test_oidc/test_token.py` — POST /consent (in helper) - `tests/test_oidc/test_userinfo.py` — POST /consent (in helper) - `tests/test_auth_routes/test_manage_credentials_page.py` — GET only, may not need changes Each test that POSTs needs to: 1. First GET a page to obtain a CSRF token and session cookie 2. Include the token in the POST **Step 3: Run full test suite** Run: `uv run python -m pytest -v` Expected: All 189+ tests pass **Step 4: Commit** ```bash git add tests/ git commit -m "test: update all tests to include CSRF tokens" ``` --- ### Task 5: Quality Check **Step 1: Format and lint** Run: `uv run ruff format src/ tests/ && uv run ruff check src/ tests/ --fix` **Step 2: Type check** Run: `uv run ty check src/` **Step 3: Full test suite** Run: `uv run python -m pytest -v` Expected: All tests pass **Step 4: Fix any issues and commit** ```bash git add -A git commit -m "fix: address lint and type issues" ```