porchlight/docs/plans/2026-02-19-csrf-protection-plan.md
2026-02-19 11:32:51 +01:00

CSRF Protection Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Add OWASP-recommended Synchronizer Token Pattern CSRF protection to all session-authenticated state-changing endpoints.

Architecture: ASGI middleware validates CSRF tokens on all non-safe HTTP methods, skipping exempt paths (OIDC protocol endpoints). Tokens are stored in the session, injected into templates via Jinja2 globals, and sent by htmx via hx-headers and by fetch JS via a <meta> tag.

Tech Stack: FastAPI, Starlette SessionMiddleware, Jinja2, htmx, vanilla JS fetch


Task 1: CSRF Middleware

Files:

  • Create: src/porchlight/csrf.py
  • Test: tests/test_csrf.py

Step 1: Write the failing tests

Create tests/test_csrf.py:

import pytest
from httpx import ASGITransport, AsyncClient
from starlette.applications import Starlette
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, PlainTextResponse
from starlette.routing import Route

from porchlight.csrf import CSRFMiddleware, generate_csrf_token


def _make_app(exempt_paths: set[str] | None = None, check_origin: str | None = None) -> Starlette:
    """Create a minimal Starlette app with session + CSRF middleware for testing."""

    async def get_token(request: Request) -> JSONResponse:
        token = generate_csrf_token(request)
        return JSONResponse({"token": token})

    async def post_action(request: Request) -> PlainTextResponse:
        return PlainTextResponse("ok")

    async def exempt_action(request: Request) -> PlainTextResponse:
        return PlainTextResponse("ok")

    app = Starlette(
        routes=[
            Route("/get-token", get_token),
            Route("/action", post_action, methods=["POST"]),
            Route("/action", get_token, methods=["GET"]),
            Route("/exempt", exempt_action, methods=["POST"]),
        ],
    )
    app.add_middleware(
        CSRFMiddleware,
        exempt_paths=exempt_paths or {"/exempt"},
        check_origin=check_origin,
    )
    app.add_middleware(SessionMiddleware, secret_key="test-secret")
    return app


@pytest.fixture
def app() -> Starlette:
    return _make_app()


@pytest.fixture
async def tc(app: Starlette) -> AsyncClient:
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
        yield ac


async def _get_token_and_cookies(tc: AsyncClient) -> tuple[str, dict]:
    """Helper: GET /get-token to obtain a CSRF token and session cookie."""
    resp = await tc.get("/get-token")
    assert resp.status_code == 200
    token = resp.json()["token"]
    return token, dict(resp.cookies)


class TestCSRFValidation:
    async def test_safe_methods_pass_through(self, tc: AsyncClient) -> None:
        resp = await tc.get("/action")
        assert resp.status_code == 200

    async def test_post_without_token_returns_403(self, tc: AsyncClient) -> None:
        resp = await tc.post("/action")
        assert resp.status_code == 403
        assert "CSRF" in resp.text

    async def test_post_with_valid_form_token(self, tc: AsyncClient) -> None:
        token, cookies = await _get_token_and_cookies(tc)
        resp = await tc.post("/action", data={"csrf_token": token}, cookies=cookies)
        assert resp.status_code == 200

    async def test_post_with_valid_header_token(self, tc: AsyncClient) -> None:
        token, cookies = await _get_token_and_cookies(tc)
        resp = await tc.post(
            "/action",
            headers={"X-CSRF-Token": token},
            cookies=cookies,
        )
        assert resp.status_code == 200

    async def test_post_with_wrong_token_returns_403(self, tc: AsyncClient) -> None:
        _token, cookies = await _get_token_and_cookies(tc)
        resp = await tc.post("/action", data={"csrf_token": "wrong"}, cookies=cookies)
        assert resp.status_code == 403

    async def test_exempt_path_skips_validation(self, tc: AsyncClient) -> None:
        resp = await tc.post("/exempt")
        assert resp.status_code == 200


class TestOriginCheck:
    async def test_matching_origin_passes(self) -> None:
        app = _make_app(check_origin="http://testserver")
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://testserver") as tc:
            token, cookies = await _get_token_and_cookies(tc)
            resp = await tc.post(
                "/action",
                data={"csrf_token": token},
                cookies=cookies,
                headers={"Origin": "http://testserver"},
            )
            assert resp.status_code == 200

    async def test_mismatched_origin_returns_403(self) -> None:
        app = _make_app(check_origin="http://testserver")
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://testserver") as tc:
            token, cookies = await _get_token_and_cookies(tc)
            resp = await tc.post(
                "/action",
                data={"csrf_token": token},
                cookies=cookies,
                headers={"Origin": "http://evil.com"},
            )
            assert resp.status_code == 403
            assert "Origin" in resp.text

    async def test_no_origin_falls_back_to_token_check(self) -> None:
        app = _make_app(check_origin="http://testserver")
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://testserver") as tc:
            token, cookies = await _get_token_and_cookies(tc)
            resp = await tc.post(
                "/action",
                data={"csrf_token": token},
                cookies=cookies,
            )
            assert resp.status_code == 200


class TestGenerateCSRFToken:
    async def test_generates_token_and_stores_in_session(self, tc: AsyncClient) -> None:
        resp = await tc.get("/get-token")
        token = resp.json()["token"]
        assert len(token) > 20

    async def test_returns_same_token_within_session(self, tc: AsyncClient) -> None:
        resp1 = await tc.get("/get-token")
        token1 = resp1.json()["token"]
        cookies = dict(resp1.cookies)
        resp2 = await tc.get("/get-token", cookies=cookies)
        token2 = resp2.json()["token"]
        assert token1 == token2

Step 2: Run tests to verify they fail

Run: uv run python -m pytest tests/test_csrf.py -v
Expected: FAIL with ModuleNotFoundError: No module named 'porchlight.csrf' (the module does not exist yet)

Step 3: Write the implementation

Create src/porchlight/csrf.py:

import hmac
import logging
import secrets

from starlette.requests import Request
from starlette.responses import HTMLResponse
from starlette.types import ASGIApp, Message, Receive, Scope, Send

logger = logging.getLogger(__name__)

SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})
SESSION_KEY = "csrf_token"


def generate_csrf_token(request: Request) -> str:
    """Get or create a CSRF token for the current session."""
    token = request.session.get(SESSION_KEY)
    if token is None:
        token = secrets.token_urlsafe(32)
        request.session[SESSION_KEY] = token
    return token


class CSRFMiddleware:
    """ASGI middleware implementing the Synchronizer Token Pattern.

    Validates that non-safe requests include a CSRF token matching the
    one stored in the session. The token can be sent as:
    - Form field: csrf_token
    - Header: X-CSRF-Token

    Optionally checks the Origin header as defense-in-depth.
    """

    def __init__(
        self,
        app: ASGIApp,
        exempt_paths: set[str] | None = None,
        check_origin: str | None = None,
    ) -> None:
        self.app = app
        self.exempt_paths = exempt_paths or set()
        self.check_origin = check_origin

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive, send)
        method = request.method

        if method in SAFE_METHODS:
            await self.app(scope, receive, send)
            return

        if request.url.path in self.exempt_paths:
            await self.app(scope, receive, send)
            return

        # Defense-in-depth: Origin check
        if self.check_origin:
            origin = request.headers.get("origin")
            if origin and origin != "null":
                expected_origin = self.check_origin.rstrip("/")
                if origin.rstrip("/") != expected_origin:
                    logger.warning("CSRF origin mismatch on %s: %s", request.url.path, origin)
                    response = HTMLResponse(
                        "<h1>403 Forbidden</h1><p>Origin mismatch</p>",
                        status_code=403,
                    )
                    await response(scope, receive, send)
                    return

        # Token validation
        session_token = request.session.get(SESSION_KEY)
        if not session_token:
            logger.warning("CSRF token missing from session on %s", request.url.path)
            response = HTMLResponse(
                "<h1>403 Forbidden</h1><p>CSRF validation failed</p>",
                status_code=403,
            )
            await response(scope, receive, send)
            return

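        # Try header first (works for both JSON and form requests)
        submitted_token = request.headers.get("x-csrf-token")

        # Fall back to the form body. Parsing it drains the ASGI receive
        # channel, so buffer the bytes and replay them to the downstream app;
        # otherwise the route handler would never see the form.
        downstream_receive: Receive = receive
        if not submitted_token:
            body = await request.body()  # cached on the request, reused by form()
            form = await request.form()
            field = form.get("csrf_token")
            submitted_token = field if isinstance(field, str) else None

            replayed = False

            async def replay_receive() -> Message:
                nonlocal replayed
                if not replayed:
                    replayed = True
                    return {"type": "http.request", "body": body, "more_body": False}
                return await receive()

            downstream_receive = replay_receive

        # compare_digest raises TypeError on non-ASCII str, so compare bytes
        if not submitted_token or not hmac.compare_digest(
            submitted_token.encode(), session_token.encode()
        ):
            logger.warning("CSRF token mismatch on %s", request.url.path)
            response = HTMLResponse(
                "<h1>403 Forbidden</h1><p>CSRF validation failed</p>",
                status_code=403,
            )
            await response(scope, receive, send)
            return

        await self.app(scope, downstream_receive, send)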
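
One thing to watch in a pure ASGI middleware that falls back to the form body: reading the body drains the receive channel, so the downstream app needs the same bytes handed to it again. The following is a stdlib-only sketch of that idea, with simplified Message and Receive aliases standing in for the starlette.types ones; read_body and replaying are hypothetical helper names.

```python
import asyncio
from collections.abc import Awaitable, Callable, MutableMapping
from typing import Any

Message = MutableMapping[str, Any]
Receive = Callable[[], Awaitable[Message]]


async def read_body(receive: Receive) -> bytes:
    """Drain every http.request message and return the concatenated body."""
    chunks: list[bytes] = []
    while True:
        message = await receive()
        if message["type"] != "http.request":
            break  # e.g. http.disconnect
        chunks.append(message.get("body", b""))
        if not message.get("more_body", False):
            break
    return b"".join(chunks)


def replaying(body: bytes, receive: Receive) -> Receive:
    """Return a receive callable that yields `body` once, then defers to `receive`."""
    sent = False

    async def wrapper() -> Message:
        nonlocal sent
        if not sent:
            sent = True
            return {"type": "http.request", "body": body, "more_body": False}
        return await receive()

    return wrapper


async def demo() -> tuple[bytes, bytes]:
    # A fake server-side message queue: two body chunks, then a disconnect.
    queue: list[Message] = [
        {"type": "http.request", "body": b"csrf_token=abc&", "more_body": True},
        {"type": "http.request", "body": b"name=x", "more_body": False},
        {"type": "http.disconnect"},
    ]

    async def receive() -> Message:
        return queue.pop(0)

    seen_by_middleware = await read_body(receive)
    seen_by_app = await read_body(replaying(seen_by_middleware, receive))
    return seen_by_middleware, seen_by_app


middleware_body, app_body = asyncio.run(demo())
```

The unit tests in this task would not catch a missing replay, because the test endpoints never read the request body; the real form handlers do.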

Step 4: Run tests to verify they pass

Run: uv run python -m pytest tests/test_csrf.py -v
Expected: All pass

Step 5: Commit

git add src/porchlight/csrf.py tests/test_csrf.py
git commit -m "feat: add CSRF middleware with synchronizer token pattern"

Task 2: Wire Middleware + Session Hardening + Template Globals

Files:

  • Modify: src/porchlight/app.py:94-128
  • Modify: src/porchlight/config.py:23-62
  • Test: tests/test_app.py (existing tests should still pass)
  • Test: tests/test_config.py (existing tests should still pass)

Step 1: Write the failing tests

Add to tests/test_csrf.py:

class TestAppIntegration:
    """Test CSRF middleware is wired into the real app."""

    async def test_post_without_csrf_token_returns_403(self, client: AsyncClient) -> None:
        """Any POST to a session-protected endpoint without CSRF token gets 403."""
        resp = await client.post("/login/password", data={"username": "x", "password": "y"})
        assert resp.status_code == 403

    async def test_exempt_token_endpoint(self, client: AsyncClient) -> None:
        """The /token endpoint is exempt from CSRF (uses client auth)."""
        resp = await client.post("/token", data={"grant_type": "authorization_code", "code": "fake"})
        # Should NOT be 403 — it should fail for auth reasons, not CSRF
        assert resp.status_code != 403

These tests need the client fixture from tests/conftest.py.

Step 2: Run tests to verify they fail

Run: uv run python -m pytest tests/test_csrf.py::TestAppIntegration -v
Expected: First test FAILS (currently returns 200 with error HTML, not 403). Second test passes trivially.

Step 3: Add config field and wire middleware

Modify src/porchlight/config.py — add session_https_only field after session_secret:

    # Session
    session_secret: str | None = None  # If None, a random secret is generated per process
    session_https_only: bool = True

Modify src/porchlight/app.py:

  1. Add import:
from porchlight.csrf import CSRFMiddleware, generate_csrf_token
  2. Replace session middleware block (lines 108-110) with:
    # Session middleware
    session_secret = settings.session_secret or secrets.token_hex(32)
    app.add_middleware(
        CSRFMiddleware,
        exempt_paths={"/token", "/userinfo"},
        check_origin=settings.issuer,
    )
    app.add_middleware(
        SessionMiddleware,
        secret_key=session_secret,
        same_site="lax",
        https_only=settings.session_https_only,
    )  # type: ignore[arg-type]

Note: Starlette middleware order is reversed: SessionMiddleware is added second but runs first (outermost), so the session is already available when CSRFMiddleware runs.

  3. Add CSRF token to template globals (after templates creation, line 113):
    # Templates
    templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates"))

    def csrf_token_processor(request: Request) -> str:
        return generate_csrf_token(request)

    templates.env.globals["csrf_token_processor"] = csrf_token_processor
    app.state.templates = templates
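
The middleware ordering noted in this step is easy to get backwards. The toy model below is a simplified stand-in, not Starlette itself: it assumes only that each add_middleware call puts the new layer at the front of the list and that the stack is built by wrapping from the innermost layer outwards. ToyApp and its methods are hypothetical names.

```python
from collections.abc import Callable

Handler = Callable[[list[str]], None]


class ToyApp:
    """Simplified model of add_middleware ordering (not Starlette itself)."""

    def __init__(self, endpoint: Handler) -> None:
        self.endpoint = endpoint
        self.user_middleware: list[str] = []

    def add_middleware(self, name: str) -> None:
        # The newest middleware goes to the front, i.e. becomes the outermost layer.
        self.user_middleware.insert(0, name)

    def build(self) -> Handler:
        handler = self.endpoint
        # Wrap from the innermost layer outwards.
        for name in reversed(self.user_middleware):
            handler = self._wrap(name, handler)
        return handler

    @staticmethod
    def _wrap(name: str, inner: Handler) -> Handler:
        def layer(trace: list[str]) -> None:
            trace.append(name)  # record that this layer saw the request
            inner(trace)

        return layer


app = ToyApp(lambda trace: trace.append("endpoint"))
app.add_middleware("CSRFMiddleware")
app.add_middleware("SessionMiddleware")

trace: list[str] = []
app.build()(trace)
```

After the call, trace lists the layers in the order a request passes through them: the session layer first, then the CSRF layer, then the endpoint.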

Step 4: Run tests to verify they pass

Run: uv run python -m pytest tests/test_csrf.py -v
Expected: All pass

Run: uv run python -m pytest tests/test_app.py tests/test_config.py -v
Expected: All pass (config test for session_https_only default)

Step 5: Commit

git add src/porchlight/app.py src/porchlight/config.py tests/test_csrf.py
git commit -m "feat: wire CSRF middleware and harden session cookie"

Task 3: Template Changes

Files:

  • Modify: src/porchlight/templates/base.html
  • Modify: src/porchlight/templates/login.html
  • Modify: src/porchlight/templates/consent.html
  • Modify: src/porchlight/templates/manage/credentials.html
  • Modify: src/porchlight/templates/manage/profile.html
  • Modify: src/porchlight/static/webauthn.js

Step 1: Modify base.html

Add CSRF meta tag and htmx headers to base.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <meta name="csrf-token" content="{{ csrf_token_processor(request) }}">
    <title>{% block title %}Porchlight{% endblock %}</title>
    <link rel="icon" type="image/png" href="/static/favicon.png">
    <link rel="stylesheet" href="/static/style.css">
</head>
<body hx-headers='{"X-CSRF-Token": "{{ csrf_token_processor(request) }}"}'>
    <a class="skip-link" href="#main">Skip to content</a>

The hx-headers attribute on <body> makes htmx automatically include the CSRF token as a header on every htmx request. The <meta> tag is for vanilla JS fetch calls (webauthn.js).
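
Interpolating the token directly into the JSON literal is safe here because secrets.token_urlsafe only emits letters, digits, "-" and "_". If the token format ever changed, the attribute value could be built programmatically instead. A minimal sketch, assuming a hypothetical helper named hx_headers_attr that is not part of the plan:

```python
import html
import json
import secrets


def hx_headers_attr(token: str) -> str:
    """Build the hx-headers attribute value: JSON first, then HTML-attribute escaping."""
    return html.escape(json.dumps({"X-CSRF-Token": token}), quote=True)


token = secrets.token_urlsafe(32)
attr = hx_headers_attr(token)
body_tag = f'<body hx-headers="{attr}">'

# What a browser would hand to htmx after unescaping the attribute value.
decoded = json.loads(html.unescape(attr))
```

Encoding as JSON before escaping for HTML keeps the two layers independent, so a quote or ampersand in the value cannot break either one.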

Step 2: Add hidden field to login.html

In the password form (after line 12):

    <form hx-post="/login/password" hx-target="#login-error" hx-swap="innerHTML">
        <input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">

Note: htmx will send the X-CSRF-Token header (from hx-headers on body), but the hidden field is belt-and-suspenders for the form data path and also protects against login CSRF per OWASP.

Step 3: Add hidden field to consent.html

In the consent form (after line 10):

    <form method="post" action="/consent">
        <input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">

Step 4: Add hidden field to credentials.html

In the password form (after line 41):

        <form hx-post="/manage/credentials/password" hx-target="#password-section" hx-swap="innerHTML">
            <input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">

Step 5: Add hidden field to profile.html

In the profile form (after line 16):

        <form hx-post="/manage/profile" hx-target="#profile-status" hx-swap="innerHTML">
            <input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">

Step 6: Update webauthn.js

Add CSRF header to all fetch calls. Add this helper at the top of the file (after the base64 helpers):

function getCsrfToken() {
  const meta = document.querySelector('meta[name="csrf-token"]');
  return meta ? meta.getAttribute('content') : '';
}

Then update all fetch() calls to include the header:

In beginRegistration() — the begin fetch (line 22-25):

    const beginRes = await fetch('/manage/credentials/webauthn/begin', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', 'X-CSRF-Token': getCsrfToken() },
    });

In beginRegistration() — the complete fetch (line 58-62):

    const completeRes = await fetch('/manage/credentials/webauthn/complete', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', 'X-CSRF-Token': getCsrfToken() },
      body: JSON.stringify(body),
    });

In beginAuthentication() — the complete fetch (line 116-120):

    const completeRes = await fetch('/login/webauthn/complete', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', 'X-CSRF-Token': getCsrfToken() },
      body: JSON.stringify(body),
    });

Step 7: Run tests

Run: uv run python -m pytest -v
Expected: Many existing tests will FAIL because they don't include CSRF tokens. This is expected; Task 4 fixes them.

Step 8: Commit

git add src/porchlight/templates/ src/porchlight/static/webauthn.js
git commit -m "feat: add CSRF tokens to templates and JS fetch calls"

Task 4: Update Existing Tests

All existing tests that POST to session-protected endpoints need to include CSRF tokens. The approach: create a test helper that loads a page, reads the CSRF token from its meta tag, and returns it together with the matching session cookie.

Files:

  • Modify: tests/conftest.py
  • Modify: All test files that POST/DELETE to session endpoints

Step 1: Add CSRF helper to conftest

The test client fixture needs to support CSRF. Since tests use httpx.AsyncClient with ASGI transport, we need a helper to get a valid session cookie with a CSRF token.

Add to tests/conftest.py:

import re


async def get_csrf_token(client: AsyncClient) -> tuple[str, dict[str, str]]:
    """Get a CSRF token by visiting a page that generates one.

    Returns (token, cookies) tuple.
    """
    resp = await client.get("/login")
    # Extract CSRF token from the HTML meta tag
    match = re.search(r'name="csrf-token" content="([^"]+)"', resp.text)
    assert match, "CSRF meta tag not found in page"
    token = match.group(1)
    return token, dict(resp.cookies)

Step 2: Update each test file

For each test file that makes POST/DELETE requests, add CSRF token to the request. The pattern varies:

For form POSTs (htmx-style): include csrf_token in form data AND/OR X-CSRF-Token header.

For JSON POSTs: include X-CSRF-Token header.

For DELETE requests: include X-CSRF-Token header.

The specific test files that need updating:

  • tests/test_auth_routes/test_password_login.py — POST /login/password, POST /logout
  • tests/test_auth_routes/test_webauthn_login.py — POST /login/webauthn/complete
  • tests/test_auth_routes/test_manage_password_credential.py — POST and DELETE /manage/credentials/password
  • tests/test_auth_routes/test_manage_webauthn_credential.py — POST /manage/credentials/webauthn/*
  • tests/test_auth_routes/test_last_credential_guard.py — DELETE endpoints
  • tests/test_oidc/test_consent_flow.py — POST /consent
  • tests/test_oidc/test_e2e_flow.py — POST /consent, POST /login/password
  • tests/test_oidc/test_token.py — POST /consent (in helper)
  • tests/test_oidc/test_userinfo.py — POST /consent (in helper)
  • tests/test_auth_routes/test_manage_credentials_page.py — GET only, may not need changes

Each test that POSTs needs to:

  1. First GET a page to obtain a CSRF token and session cookie
  2. Include the token in the POST

Step 3: Run full test suite

Run: uv run python -m pytest -v
Expected: All 189+ tests pass

Step 4: Commit

git add tests/
git commit -m "test: update all tests to include CSRF tokens"

Task 5: Quality Check

Step 1: Format and lint

Run: uv run ruff format src/ tests/ && uv run ruff check src/ tests/ --fix

Step 2: Type check

Run: uv run ty check src/

Step 3: Full test suite

Run: uv run python -m pytest -v
Expected: All tests pass

Step 4: Fix any issues and commit

git add -A
git commit -m "fix: address lint and type issues"