Compare commits

..

No commits in common. "cf2754f3024d1aad6eb2af673e4f17ff22cb497b" and "baef5e0e2efcf898909759068347c0bb26b20e8c" have entirely different histories.

13 changed files with 24 additions and 162 deletions

View file

@ -8,7 +8,6 @@ from urllib.parse import urlparse
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fido2.webauthn import UserVerificationRequirement
from slowapi.errors import RateLimitExceeded
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request
@ -56,7 +55,6 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
rp_id=rp_id,
rp_name=app.title,
origin=settings.issuer,
user_verification=UserVerificationRequirement(settings.webauthn_user_verification),
)
app.state.magic_link_service = MagicLinkService(
@ -82,8 +80,8 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
}
oidc_server.keyjar.add_symmetric(client_id, client_cfg.client_secret)
# Register management client (stable secret tied to the session secret)
manage_secret = app.state.session_secret
# Register management client
manage_secret = settings.session_secret or secrets.token_hex(32)
oidc_server.context.cdb[settings.manage_client_id] = {
"client_id": settings.manage_client_id,
"client_secret": manage_secret,
@ -101,21 +99,6 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
raise NotImplementedError("MongoDB backend not yet implemented")
def _resolve_session_secret(settings: Settings) -> str:
"""Return the session signing secret, requiring one in production.
A random per-process secret silently invalidates sessions on restart and
rotates the management client secret, so it is only acceptable for local
development (debug or a localhost issuer).
"""
if settings.session_secret:
return settings.session_secret
host = urlparse(settings.issuer).hostname or ""
if settings.debug or host in ("localhost", "127.0.0.1", "::1"):
return secrets.token_hex(32)
raise RuntimeError("OIDC_OP_SESSION_SECRET must be set in production (non-debug, non-localhost issuer).")
def create_app(settings: Settings | None = None) -> FastAPI:
if settings is None:
settings = Settings()
@ -131,8 +114,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
app.state.settings = settings
# Session middleware
session_secret = _resolve_session_secret(settings)
app.state.session_secret = session_secret
session_secret = settings.session_secret or secrets.token_hex(32)
app.add_middleware(
CSRFMiddleware, # ty: ignore[invalid-argument-type]
exempt_paths={"/token", "/userinfo"},

View file

@ -23,19 +23,6 @@ def _login_redirect_target(request: Request) -> str:
return "/manage/credentials"
def _is_sign_count_rollback(stored: int, presented: int) -> bool:
"""Detect a WebAuthn signature-counter rollback (cloned authenticator/replay).
Authenticators that don't maintain a counter (incl. synced passkeys) report
0; when both stored and presented are 0 the counter is meaningless and no
rollback can be inferred. Otherwise the presented counter must strictly
exceed the stored one.
"""
if stored == 0 and presented == 0:
return False
return presented <= stored
def _establish_authenticated_session(request: Request, user: User) -> None:
"""Reset the session before recording the authenticated identity.
@ -157,7 +144,7 @@ async def register_magic_link(request: Request, token: str) -> Response:
return RedirectResponse("/manage/credentials?setup=1", status_code=303)
@router.post("/login/webauthn/begin")
@router.get("/login/webauthn/begin")
async def login_webauthn_begin(request: Request) -> Response:
webauthn_service = request.app.state.webauthn_service
@ -169,7 +156,7 @@ async def login_webauthn_begin(request: Request) -> Response:
@router.post("/login/webauthn/complete")
@limiter.limit("10/minute")
async def login_webauthn_complete(request: Request) -> Response: # noqa: PLR0911
async def login_webauthn_complete(request: Request) -> Response:
webauthn_service = request.app.state.webauthn_service
user_repo = request.app.state.user_repo
cred_repo = request.app.state.credential_repo
@ -207,9 +194,6 @@ async def login_webauthn_complete(request: Request) -> Response: # noqa: PLR091
stored = await cred_repo.get_webauthn_by_credential_id(matched_credential_id)
if stored is not None:
if _is_sign_count_rollback(stored.sign_count, new_counter):
# Counter went backwards — likely a cloned authenticator or replay.
return JSONResponse({"error": "Authentication failed"}, status_code=400)
stored.sign_count = new_counter
await cred_repo.update_webauthn(stored)

View file

@ -18,16 +18,9 @@ from fido2.webauthn import (
class WebAuthnService:
"""FIDO2 WebAuthn registration and authentication."""
def __init__(
self,
rp_id: str,
rp_name: str,
origin: str,
user_verification: UserVerificationRequirement = UserVerificationRequirement.PREFERRED,
) -> None:
def __init__(self, rp_id: str, rp_name: str, origin: str) -> None:
rp = PublicKeyCredentialRpEntity(name=rp_name, id=rp_id)
self._server = Fido2Server(rp, verify_origin=lambda o: o == origin)
self._user_verification = user_verification
def begin_registration(
self,
@ -46,7 +39,7 @@ class WebAuthnService:
user=user,
credentials=existing_credentials,
resident_key_requirement=ResidentKeyRequirement.REQUIRED,
user_verification=self._user_verification,
user_verification=UserVerificationRequirement.PREFERRED,
)
return dict(options), state
@ -71,7 +64,7 @@ class WebAuthnService:
"""
options, state = self._server.authenticate_begin(
credentials=credentials,
user_verification=self._user_verification,
user_verification=UserVerificationRequirement.PREFERRED,
)
return dict(options), state

View file

@ -49,10 +49,6 @@ class Settings(BaseSettings):
session_secret: str | None = None # If None, a random secret is generated per process
session_https_only: bool = True
# WebAuthn user verification requirement: "preferred" (default), "required",
# or "discouraged". Identity providers may want "required".
webauthn_user_verification: str = "preferred"
# Magic links
invite_ttl: int = 86400 # seconds

View file

@ -19,16 +19,6 @@ function getCsrfToken() {
return meta ? meta.getAttribute('content') : '';
}
// Render an alert with the message as text (never as HTML) to avoid XSS.
function showAlert(el, message) {
if (!el) return;
el.replaceChildren();
const div = document.createElement('div');
div.setAttribute('role', 'alert');
div.textContent = message;
el.appendChild(div);
}
async function beginRegistration() {
const statusEl = document.getElementById('webauthn-status');
@ -39,7 +29,7 @@ async function beginRegistration() {
headers: { 'Content-Type': 'application/json', 'X-CSRF-Token': getCsrfToken() },
});
if (!beginRes.ok) {
showAlert(statusEl, 'Failed to start registration');
if (statusEl) statusEl.innerHTML = '<div role="alert">Failed to start registration</div>';
return;
}
const options = await beginRes.json();
@ -84,7 +74,7 @@ async function beginRegistration() {
if (statusEl) statusEl.innerHTML = text;
}
} catch (err) {
showAlert(statusEl, 'Registration failed: ' + err.message);
if (statusEl) statusEl.innerHTML = '<div role="alert">Registration failed: ' + err.message + '</div>';
}
}
@ -93,13 +83,10 @@ async function beginAuthentication() {
try {
// Step 1: Get options from server (no username needed)
const beginRes = await fetch('/login/webauthn/begin', {
method: 'POST',
headers: { 'X-CSRF-Token': getCsrfToken() },
});
const beginRes = await fetch('/login/webauthn/begin');
if (!beginRes.ok) {
const data = await beginRes.json();
showAlert(statusEl, data.error || 'Failed to start authentication');
if (statusEl) statusEl.innerHTML = '<div role="alert">' + (data.error || 'Failed to start authentication') + '</div>';
return;
}
const options = await beginRes.json();
@ -142,10 +129,10 @@ async function beginAuthentication() {
window.location.href = data.redirect || '/manage/credentials';
} else {
const data = await completeRes.json().catch(function () { return {}; });
showAlert(statusEl, data.error || 'Authentication failed');
if (statusEl) statusEl.innerHTML = '<div role="alert">' + (data.error || 'Authentication failed') + '</div>';
}
} catch (err) {
showAlert(statusEl, 'Authentication failed: ' + err.message);
if (statusEl) statusEl.innerHTML = '<div role="alert">Authentication failed: ' + err.message + '</div>';
}
}

View file

@ -1,5 +0,0 @@
-- A WebAuthn credential_id must be globally unique: lookups by credential_id
-- alone (during usernameless authentication) must never resolve to the wrong
-- user. The table PK is (user_id, credential_id), which does not prevent the
-- same credential_id under two users, so add an explicit unique index.
CREATE UNIQUE INDEX ix_webauthn_credential_id ON webauthn_credentials (credential_id);

View file

@ -1,10 +1,7 @@
from unittest.mock import MagicMock
import pytest
from httpx import AsyncClient
from porchlight.app import create_app
from porchlight.config import Settings
from porchlight.dependencies import (
get_credential_repo,
get_magic_link_repo,
@ -59,19 +56,3 @@ async def test_dependency_functions() -> None:
assert get_user_repo(request) == "user_repo_sentinel"
assert get_credential_repo(request) == "credential_repo_sentinel"
assert get_magic_link_repo(request) == "magic_link_repo_sentinel"
def test_create_app_requires_session_secret_in_production() -> None:
settings = Settings(issuer="https://op.example.com", sqlite_path=":memory:", debug=False)
with pytest.raises(RuntimeError, match="SESSION_SECRET"):
create_app(settings)
def test_create_app_allows_missing_secret_on_localhost() -> None:
settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:")
assert create_app(settings) is not None
def test_create_app_allows_missing_secret_in_debug() -> None:
settings = Settings(issuer="https://op.example.com", sqlite_path=":memory:", debug=True)
assert create_app(settings) is not None

View file

@ -46,12 +46,10 @@ async def _setup_user_with_webauthn(
async def test_webauthn_login_begin_returns_options(client: AsyncClient) -> None:
"""Begin is a POST (mutates session) with no username — returns options
with empty allowCredentials."""
"""Begin is now GET with no username — returns options with empty allowCredentials."""
await _setup_user_with_webauthn(client)
token = await get_csrf_token(client)
res = await client.post("/login/webauthn/begin", headers={"X-CSRF-Token": token})
res = await client.get("/login/webauthn/begin")
assert res.status_code == 200
data = res.json()
assert "publicKey" in data
@ -61,8 +59,7 @@ async def test_webauthn_login_begin_returns_options(client: AsyncClient) -> None
async def test_webauthn_login_begin_has_user_verification_preferred(client: AsyncClient) -> None:
token = await get_csrf_token(client)
res = await client.post("/login/webauthn/begin", headers={"X-CSRF-Token": token})
res = await client.get("/login/webauthn/begin")
assert res.status_code == 200
data = res.json()
assert data["publicKey"]["userVerification"] == "preferred"
@ -84,9 +81,10 @@ async def test_webauthn_login_complete_returns_json_redirect(client: AsyncClient
_userid, _pk, _credential_id, _att = await _setup_user_with_webauthn(client)
# Begin to get state into session
token = await get_csrf_token(client)
res1 = await client.post("/login/webauthn/begin", headers={"X-CSRF-Token": token})
res1 = await client.get("/login/webauthn/begin")
assert res1.status_code == 200
token = await get_csrf_token(client)
# We can't easily complete the full assertion without browser interaction,
# but we verify the endpoint returns 400 JSON (not HTML) for bad assertions
res2 = await client.post(

View file

@ -1,22 +0,0 @@
from porchlight.authn.routes import _is_sign_count_rollback
def test_increasing_counter_is_not_rollback() -> None:
assert _is_sign_count_rollback(stored=5, presented=6) is False
def test_equal_counter_is_rollback() -> None:
assert _is_sign_count_rollback(stored=5, presented=5) is True
def test_lower_counter_is_rollback() -> None:
assert _is_sign_count_rollback(stored=5, presented=3) is True
def test_both_zero_sync_passkey_is_allowed() -> None:
# Sync passkeys (and counter-less authenticators) always report 0.
assert _is_sign_count_rollback(stored=0, presented=0) is False
def test_first_increment_from_zero_is_allowed() -> None:
assert _is_sign_count_rollback(stored=0, presented=1) is False

View file

@ -17,7 +17,6 @@ from fido2.webauthn import (
PublicKeyCredentialDescriptor,
PublicKeyCredentialType,
RegistrationResponse,
UserVerificationRequirement,
)
from porchlight.authn.webauthn import WebAuthnService
@ -195,19 +194,6 @@ def test_begin_authentication_prefers_user_verification() -> None:
assert pub_key["userVerification"] == "preferred"
def test_user_verification_is_configurable() -> None:
service = WebAuthnService(
rp_id=RP_ID,
rp_name=RP_NAME,
origin=ORIGIN,
user_verification=UserVerificationRequirement.REQUIRED,
)
reg, _ = service.begin_registration(user_id=b"user-123", username="alice")
assert reg["publicKey"]["authenticatorSelection"]["userVerification"] == "required"
auth, _ = service.begin_authentication()
assert auth["publicKey"]["userVerification"] == "required"
def test_begin_authentication_returns_options_and_state() -> None:
service = _make_service()
_, cred_id, _attested = _generate_credential()

View file

@ -19,7 +19,7 @@ scope = ["openid", "profile"]
toml_file = tmp_path / "test.toml"
toml_file.write_text(toml_content)
settings = Settings(_toml_file=str(toml_file), session_secret="x" * 32)
settings = Settings(_toml_file=str(toml_file))
app = create_app(settings)
async with (
@ -41,7 +41,7 @@ scope = ["openid", "profile"]
async def test_manage_app_always_registered() -> None:
"""The internal manage-app client is always registered, even without config file clients."""
settings = Settings(issuer="https://test.example.com", session_secret="x" * 32)
settings = Settings(issuer="https://test.example.com")
app = create_app(settings)
async with (

View file

@ -13,7 +13,7 @@ async def test_run_migrations_applies_initial() -> None:
async with aiosqlite.connect(":memory:") as db:
await db.execute("PRAGMA foreign_keys=ON")
count = await run_migrations(db, MIGRATIONS_DIR)
assert count == 3
assert count == 2
async with db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'") as cursor:
row = await cursor.fetchone()
assert row is not None
@ -24,7 +24,7 @@ async def test_run_migrations_skips_already_applied() -> None:
await db.execute("PRAGMA foreign_keys=ON")
first_count = await run_migrations(db, MIGRATIONS_DIR)
second_count = await run_migrations(db, MIGRATIONS_DIR)
assert first_count == 3
assert first_count == 2
assert second_count == 0

View file

@ -227,21 +227,3 @@ async def test_delete_webauthn_if_not_last_allows_when_others_exist(
assert await credential_repo.delete_webauthn_if_not_last(alice.userid, b"\x01") is True
assert len(await credential_repo.get_webauthn_by_user(alice.userid)) == 0
async def test_webauthn_credential_id_is_globally_unique(
credential_repo: SQLiteCredentialRepository, user_repo: SQLiteUserRepository
) -> None:
# The same credential_id must not be registrable for two different users,
# otherwise lookup-by-credential-id could resolve to the wrong account.
await user_repo.create(User(userid="u-alice", username="alice2"))
await user_repo.create(User(userid="u-bob", username="bob2"))
cred_id = b"\xde\xad\xbe\xef"
await credential_repo.create_webauthn(
WebAuthnCredential(user_id="u-alice", credential_id=cred_id, public_key=b"\x01")
)
with pytest.raises(DuplicateError):
await credential_repo.create_webauthn(
WebAuthnCredential(user_id="u-bob", credential_id=cred_id, public_key=b"\x02")
)