Compare commits
No commits in common. "27763d19ea6d11a1423b37e23f48fe9aaa100641" and "cf2754f3024d1aad6eb2af673e4f17ff22cb497b" have entirely different histories.
27763d19ea
...
cf2754f302
9 changed files with 10 additions and 231 deletions
|
|
@ -26,7 +26,6 @@ from porchlight.manage.routes import router as manage_router
|
|||
from porchlight.oidc.endpoints import router as oidc_router
|
||||
from porchlight.oidc.provider import create_oidc_server
|
||||
from porchlight.rate_limit import limiter
|
||||
from porchlight.security_headers import SecurityHeadersMiddleware
|
||||
from porchlight.store.sqlite.db import open_db
|
||||
from porchlight.store.sqlite.repositories import (
|
||||
SQLiteConsentRepository,
|
||||
|
|
@ -131,13 +130,6 @@ def create_app(settings: Settings | None = None) -> FastAPI:
|
|||
|
||||
app.state.settings = settings
|
||||
|
||||
# Security headers (outermost so they apply to every response)
|
||||
app.add_middleware(
|
||||
SecurityHeadersMiddleware, # ty: ignore[invalid-argument-type]
|
||||
csp=settings.content_security_policy,
|
||||
hsts=settings.session_https_only,
|
||||
)
|
||||
|
||||
# Session middleware
|
||||
session_secret = _resolve_session_secret(settings)
|
||||
app.state.session_secret = session_secret
|
||||
|
|
@ -151,7 +143,6 @@ def create_app(settings: Settings | None = None) -> FastAPI:
|
|||
secret_key=session_secret,
|
||||
same_site="lax",
|
||||
https_only=settings.session_https_only,
|
||||
max_age=settings.session_max_age,
|
||||
)
|
||||
|
||||
# Rate limiting
|
||||
|
|
|
|||
|
|
@ -48,12 +48,6 @@ class Settings(BaseSettings):
|
|||
# Session
|
||||
session_secret: str | None = None # If None, a random secret is generated per process
|
||||
session_https_only: bool = True
|
||||
session_max_age: int = 28800 # Cookie lifetime in seconds (default 8 hours)
|
||||
|
||||
# Content-Security-Policy applied to all responses.
|
||||
content_security_policy: str = (
|
||||
"default-src 'self'; img-src 'self' data: https:; frame-ancestors 'none'; base-uri 'self'; form-action 'self'"
|
||||
)
|
||||
|
||||
# WebAuthn user verification requirement: "preferred" (default), "required",
|
||||
# or "discouraged". Identity providers may want "required".
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
"""idpyoidc Server initialization."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from idpyoidc.server import Server
|
||||
|
|
@ -67,10 +66,7 @@ def _build_server_config(settings: Settings) -> dict:
|
|||
"expires_in": 3600,
|
||||
},
|
||||
"refresh_token": {
|
||||
# Rotate refresh tokens (mint a new one) and mint
|
||||
# access tokens, but do NOT mint fresh ID tokens on
|
||||
# refresh — re-authentication should issue ID tokens.
|
||||
"supports_minting": ["access_token", "refresh_token"],
|
||||
"supports_minting": ["access_token", "refresh_token", "id_token"],
|
||||
"expires_in": 86400,
|
||||
},
|
||||
},
|
||||
|
|
@ -148,32 +144,8 @@ def _build_server_config(settings: Settings) -> dict:
|
|||
}
|
||||
|
||||
|
||||
_PRIVATE_KEY_FILES = ("private_jwks.json", "token_jwks.json")
|
||||
|
||||
|
||||
def create_oidc_server(settings: Settings) -> Server:
|
||||
"""Create and configure an idpyoidc Server instance.
|
||||
|
||||
Private signing keys are written to ``signing_key_path``; lock the directory
|
||||
to 0700 and the private key files to 0600, and refuse to start if a
|
||||
pre-existing private key is group/world accessible (a key disclosure).
|
||||
"""
|
||||
key_path = Path(settings.signing_key_path)
|
||||
key_path.mkdir(parents=True, exist_ok=True)
|
||||
os.chmod(key_path, 0o700)
|
||||
|
||||
# Fail on pre-existing keys with loose permissions (left by a prior run).
|
||||
for name in _PRIVATE_KEY_FILES:
|
||||
f = key_path / name
|
||||
if f.exists() and (f.stat().st_mode & 0o077):
|
||||
raise RuntimeError(f"Insecure permission on {f}: private key is group/world accessible")
|
||||
|
||||
"""Create and configure an idpyoidc Server instance."""
|
||||
config = _build_server_config(settings)
|
||||
server = Server(conf=config)
|
||||
|
||||
# Lock down any keys idpyoidc just wrote (umask may have left them 0644).
|
||||
for name in _PRIVATE_KEY_FILES:
|
||||
f = key_path / name
|
||||
if f.exists():
|
||||
os.chmod(f, 0o600)
|
||||
return server
|
||||
|
|
|
|||
|
|
@ -1,42 +0,0 @@
|
|||
"""ASGI middleware adding baseline security response headers."""
|
||||
|
||||
from starlette.types import ASGIApp, Message, Receive, Scope, Send
|
||||
|
||||
|
||||
class SecurityHeadersMiddleware:
|
||||
"""Attach security headers to every HTTP response.
|
||||
|
||||
Sets Content-Security-Policy, X-Content-Type-Options, X-Frame-Options and
|
||||
Referrer-Policy on all responses, plus Strict-Transport-Security when the
|
||||
deployment is HTTPS. Existing headers are not overwritten.
|
||||
"""
|
||||
|
||||
def __init__(self, app: ASGIApp, csp: str, hsts: bool) -> None:
|
||||
self.app = app
|
||||
self._csp = csp
|
||||
self._hsts = hsts
|
||||
|
||||
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
|
||||
if scope["type"] != "http":
|
||||
await self.app(scope, receive, send)
|
||||
return
|
||||
|
||||
async def send_with_headers(message: Message) -> None:
|
||||
if message["type"] == "http.response.start":
|
||||
headers = message.setdefault("headers", [])
|
||||
existing = {name.decode().lower() for name, _ in headers}
|
||||
|
||||
def add(name: str, value: str) -> None:
|
||||
if name.lower() not in existing:
|
||||
headers.append((name.encode(), value.encode()))
|
||||
existing.add(name.lower())
|
||||
|
||||
add("content-security-policy", self._csp)
|
||||
add("x-content-type-options", "nosniff")
|
||||
add("x-frame-options", "DENY")
|
||||
add("referrer-policy", "strict-origin-when-cross-origin")
|
||||
if self._hsts:
|
||||
add("strict-transport-security", "max-age=63072000; includeSubDomains")
|
||||
await send(message)
|
||||
|
||||
await self.app(scope, receive, send_with_headers)
|
||||
|
|
@ -1,6 +1,5 @@
|
|||
import re
|
||||
from collections.abc import AsyncIterator
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
|
@ -11,13 +10,8 @@ from porchlight.rate_limit import limiter
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def settings(tmp_path: Path) -> Settings:
|
||||
return Settings(
|
||||
issuer="http://localhost:8000",
|
||||
sqlite_path=":memory:",
|
||||
session_https_only=False,
|
||||
signing_key_path=str(tmp_path / "keys"),
|
||||
)
|
||||
def settings() -> Settings:
|
||||
return Settings(issuer="http://localhost:8000", sqlite_path=":memory:", session_https_only=False)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
|
|||
|
|
@ -1,8 +1,7 @@
|
|||
from pathlib import Path
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from httpx import AsyncClient
|
||||
|
||||
from porchlight.app import create_app
|
||||
from porchlight.config import Settings
|
||||
|
|
@ -76,36 +75,3 @@ def test_create_app_allows_missing_secret_on_localhost() -> None:
|
|||
def test_create_app_allows_missing_secret_in_debug() -> None:
|
||||
settings = Settings(issuer="https://op.example.com", sqlite_path=":memory:", debug=True)
|
||||
assert create_app(settings) is not None
|
||||
|
||||
|
||||
async def test_session_cookie_has_explicit_max_age(client: AsyncClient) -> None:
|
||||
# Visiting /login establishes a session (CSRF token), setting the cookie.
|
||||
res = await client.get("/login")
|
||||
set_cookies = res.headers.get_list("set-cookie")
|
||||
session_cookies = [c for c in set_cookies if c.startswith("session=")]
|
||||
assert session_cookies, "no session cookie set"
|
||||
assert "Max-Age=28800" in session_cookies[0]
|
||||
|
||||
|
||||
async def test_security_headers_present(client: AsyncClient) -> None:
|
||||
res = await client.get("/login")
|
||||
assert res.headers.get("x-content-type-options") == "nosniff"
|
||||
assert res.headers.get("x-frame-options") == "DENY"
|
||||
assert "default-src 'self'" in res.headers.get("content-security-policy", "")
|
||||
assert "frame-ancestors 'none'" in res.headers.get("content-security-policy", "")
|
||||
assert res.headers.get("referrer-policy") == "strict-origin-when-cross-origin"
|
||||
|
||||
|
||||
async def test_hsts_present_when_https_only(tmp_path: Path) -> None:
|
||||
settings = Settings(
|
||||
issuer="https://op.example.com",
|
||||
sqlite_path=":memory:",
|
||||
session_secret="x" * 32,
|
||||
session_https_only=True,
|
||||
signing_key_path=str(tmp_path / "keys"),
|
||||
)
|
||||
app = create_app(settings)
|
||||
transport = ASGITransport(app=app)
|
||||
async with app.router.lifespan_context(app), AsyncClient(transport=transport, base_url=settings.issuer) as ac:
|
||||
res = await ac.get("/health")
|
||||
assert "max-age=" in res.headers.get("strict-transport-security", "")
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ scope = ["openid", "profile"]
|
|||
toml_file = tmp_path / "test.toml"
|
||||
toml_file.write_text(toml_content)
|
||||
|
||||
settings = Settings(_toml_file=str(toml_file), session_secret="x" * 32, signing_key_path=str(tmp_path / "keys"))
|
||||
settings = Settings(_toml_file=str(toml_file), session_secret="x" * 32)
|
||||
app = create_app(settings)
|
||||
|
||||
async with (
|
||||
|
|
@ -39,11 +39,9 @@ scope = ["openid", "profile"]
|
|||
assert cdb_entry["allowed_scopes"] == ["openid", "profile"]
|
||||
|
||||
|
||||
async def test_manage_app_always_registered(tmp_path: Path) -> None:
|
||||
async def test_manage_app_always_registered() -> None:
|
||||
"""The internal manage-app client is always registered, even without config file clients."""
|
||||
settings = Settings(
|
||||
issuer="https://test.example.com", session_secret="x" * 32, signing_key_path=str(tmp_path / "keys")
|
||||
)
|
||||
settings = Settings(issuer="https://test.example.com", session_secret="x" * 32)
|
||||
app = create_app(settings)
|
||||
|
||||
async with (
|
||||
|
|
|
|||
|
|
@ -33,8 +33,8 @@ async def _setup_rp_and_user(app: FastAPI) -> None:
|
|||
"redirect_uris": [(REDIRECT_URI, {})],
|
||||
"response_types_supported": ["code"],
|
||||
"token_endpoint_auth_method": "client_secret_basic",
|
||||
"scope": ["openid", "profile", "email", "offline_access"],
|
||||
"allowed_scopes": ["openid", "profile", "email", "offline_access"],
|
||||
"scope": ["openid", "profile", "email"],
|
||||
"allowed_scopes": ["openid", "profile", "email"],
|
||||
"client_salt": secrets.token_hex(8),
|
||||
}
|
||||
oidc_server.keyjar.add_symmetric(CLIENT_ID, CLIENT_SECRET)
|
||||
|
|
@ -263,69 +263,3 @@ async def test_pkce_wrong_verifier_rejected(client: AsyncClient) -> None:
|
|||
},
|
||||
)
|
||||
assert res.status_code != 200, f"PKCE verifier mismatch should be rejected, got {res.status_code}: {res.text}"
|
||||
|
||||
|
||||
async def _authorize_with_offline_access(client: AsyncClient, state: str, nonce: str) -> str:
|
||||
"""Run the auth-code flow requesting offline_access, return the auth code."""
|
||||
scope = "openid profile offline_access"
|
||||
# offline_access requires prompt=consent per the OIDC spec.
|
||||
await client.get(
|
||||
"/authorization",
|
||||
params={
|
||||
"response_type": "code",
|
||||
"client_id": CLIENT_ID,
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
"scope": scope,
|
||||
"state": state,
|
||||
"nonce": nonce,
|
||||
"prompt": "consent",
|
||||
},
|
||||
follow_redirects=False,
|
||||
)
|
||||
token = await get_csrf_token(client)
|
||||
await client.post(
|
||||
"/login/password",
|
||||
data={"username": "alice", "password": "testpass"},
|
||||
headers={"HX-Request": "true", "X-CSRF-Token": token},
|
||||
)
|
||||
complete = await client.get("/authorization/complete", follow_redirects=False)
|
||||
assert "/consent" in complete.headers["location"]
|
||||
token = await get_csrf_token(client)
|
||||
consent = await client.post(
|
||||
"/consent",
|
||||
data={"action": "allow", "scope": ["openid", "profile", "offline_access"]},
|
||||
headers={"X-CSRF-Token": token},
|
||||
follow_redirects=False,
|
||||
)
|
||||
return parse_qs(urlparse(consent.headers["location"]).query)["code"][0]
|
||||
|
||||
|
||||
async def test_refresh_token_issued_only_with_offline_access(client: AsyncClient) -> None:
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
await _setup_rp_and_user(app)
|
||||
|
||||
# Without offline_access: no refresh token.
|
||||
code = await _login_and_authorize(client, secrets.token_urlsafe(8), secrets.token_urlsafe(8))
|
||||
no_offline = await _exchange_token(client, code)
|
||||
assert "refresh_token" not in no_offline
|
||||
|
||||
|
||||
async def test_refresh_grant_does_not_mint_id_token(client: AsyncClient) -> None:
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
await _setup_rp_and_user(app)
|
||||
|
||||
code = await _authorize_with_offline_access(client, secrets.token_urlsafe(8), secrets.token_urlsafe(8))
|
||||
token_data = await _exchange_token(client, code)
|
||||
assert "refresh_token" in token_data, "offline_access should yield a refresh token"
|
||||
|
||||
# Use the refresh token; the response must not contain a new ID token.
|
||||
auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
|
||||
res = await client.post(
|
||||
"/token",
|
||||
data={"grant_type": "refresh_token", "refresh_token": token_data["refresh_token"]},
|
||||
headers={"Authorization": f"Basic {auth_header}", "Content-Type": "application/x-www-form-urlencoded"},
|
||||
)
|
||||
assert res.status_code == 200, res.text
|
||||
refreshed = res.json()
|
||||
assert "access_token" in refreshed
|
||||
assert "id_token" not in refreshed
|
||||
|
|
|
|||
|
|
@ -1,9 +1,6 @@
|
|||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from porchlight.config import Settings
|
||||
from porchlight.oidc.claims import PorchlightUserInfo
|
||||
from porchlight.oidc.provider import create_oidc_server
|
||||
|
|
@ -56,28 +53,3 @@ def test_create_server_userinfo_is_porchlight() -> None:
|
|||
assert isinstance(server.context.userinfo, PorchlightUserInfo)
|
||||
finally:
|
||||
shutil.rmtree(key_path, ignore_errors=True)
|
||||
|
||||
|
||||
def test_signing_key_files_have_strict_permissions(tmp_path: Path) -> None:
|
||||
key_path = tmp_path / "keys"
|
||||
settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path))
|
||||
create_oidc_server(settings)
|
||||
|
||||
assert (key_path.stat().st_mode & 0o077) == 0, "key directory must not be group/world accessible"
|
||||
for name in ("private_jwks.json", "token_jwks.json"):
|
||||
f = key_path / name
|
||||
assert f.exists()
|
||||
assert (f.stat().st_mode & 0o077) == 0, f"{name} must be 0600"
|
||||
|
||||
|
||||
def test_startup_fails_on_world_readable_private_key(tmp_path: Path) -> None:
|
||||
key_path = tmp_path / "keys"
|
||||
key_path.mkdir()
|
||||
# Simulate a pre-existing private key left group/world readable.
|
||||
leaked = key_path / "private_jwks.json"
|
||||
leaked.write_text("{}")
|
||||
os.chmod(leaked, 0o644)
|
||||
|
||||
settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path))
|
||||
with pytest.raises(RuntimeError, match="permission"):
|
||||
create_oidc_server(settings)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue