Compare commits
5 commits
baef5e0e2e
...
cf2754f302
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cf2754f302 | ||
|
|
c175633980 | ||
|
|
1571706d21 | ||
|
|
f03d509eb4 | ||
|
|
0f04a7daf9 |
13 changed files with 162 additions and 24 deletions
|
|
@ -8,6 +8,7 @@ from urllib.parse import urlparse
|
|||
from fastapi import FastAPI
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from fido2.webauthn import UserVerificationRequirement
|
||||
from slowapi.errors import RateLimitExceeded
|
||||
from starlette.middleware.sessions import SessionMiddleware
|
||||
from starlette.requests import Request
|
||||
|
|
@ -55,6 +56,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
|
|||
rp_id=rp_id,
|
||||
rp_name=app.title,
|
||||
origin=settings.issuer,
|
||||
user_verification=UserVerificationRequirement(settings.webauthn_user_verification),
|
||||
)
|
||||
|
||||
app.state.magic_link_service = MagicLinkService(
|
||||
|
|
@ -80,8 +82,8 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
|
|||
}
|
||||
oidc_server.keyjar.add_symmetric(client_id, client_cfg.client_secret)
|
||||
|
||||
# Register management client
|
||||
manage_secret = settings.session_secret or secrets.token_hex(32)
|
||||
# Register management client (stable secret tied to the session secret)
|
||||
manage_secret = app.state.session_secret
|
||||
oidc_server.context.cdb[settings.manage_client_id] = {
|
||||
"client_id": settings.manage_client_id,
|
||||
"client_secret": manage_secret,
|
||||
|
|
@ -99,6 +101,21 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
|
|||
raise NotImplementedError("MongoDB backend not yet implemented")
|
||||
|
||||
|
||||
def _resolve_session_secret(settings: Settings) -> str:
|
||||
"""Return the session signing secret, requiring one in production.
|
||||
|
||||
A random per-process secret silently invalidates sessions on restart and
|
||||
rotates the management client secret, so it is only acceptable for local
|
||||
development (debug or a localhost issuer).
|
||||
"""
|
||||
if settings.session_secret:
|
||||
return settings.session_secret
|
||||
host = urlparse(settings.issuer).hostname or ""
|
||||
if settings.debug or host in ("localhost", "127.0.0.1", "::1"):
|
||||
return secrets.token_hex(32)
|
||||
raise RuntimeError("OIDC_OP_SESSION_SECRET must be set in production (non-debug, non-localhost issuer).")
|
||||
|
||||
|
||||
def create_app(settings: Settings | None = None) -> FastAPI:
|
||||
if settings is None:
|
||||
settings = Settings()
|
||||
|
|
@ -114,7 +131,8 @@ def create_app(settings: Settings | None = None) -> FastAPI:
|
|||
app.state.settings = settings
|
||||
|
||||
# Session middleware
|
||||
session_secret = settings.session_secret or secrets.token_hex(32)
|
||||
session_secret = _resolve_session_secret(settings)
|
||||
app.state.session_secret = session_secret
|
||||
app.add_middleware(
|
||||
CSRFMiddleware, # ty: ignore[invalid-argument-type]
|
||||
exempt_paths={"/token", "/userinfo"},
|
||||
|
|
|
|||
|
|
@ -23,6 +23,19 @@ def _login_redirect_target(request: Request) -> str:
|
|||
return "/manage/credentials"
|
||||
|
||||
|
||||
def _is_sign_count_rollback(stored: int, presented: int) -> bool:
|
||||
"""Detect a WebAuthn signature-counter rollback (cloned authenticator/replay).
|
||||
|
||||
Authenticators that don't maintain a counter (incl. synced passkeys) report
|
||||
0; when both stored and presented are 0 the counter is meaningless and no
|
||||
rollback can be inferred. Otherwise the presented counter must strictly
|
||||
exceed the stored one.
|
||||
"""
|
||||
if stored == 0 and presented == 0:
|
||||
return False
|
||||
return presented <= stored
|
||||
|
||||
|
||||
def _establish_authenticated_session(request: Request, user: User) -> None:
|
||||
"""Reset the session before recording the authenticated identity.
|
||||
|
||||
|
|
@ -144,7 +157,7 @@ async def register_magic_link(request: Request, token: str) -> Response:
|
|||
return RedirectResponse("/manage/credentials?setup=1", status_code=303)
|
||||
|
||||
|
||||
@router.get("/login/webauthn/begin")
|
||||
@router.post("/login/webauthn/begin")
|
||||
async def login_webauthn_begin(request: Request) -> Response:
|
||||
webauthn_service = request.app.state.webauthn_service
|
||||
|
||||
|
|
@ -156,7 +169,7 @@ async def login_webauthn_begin(request: Request) -> Response:
|
|||
|
||||
@router.post("/login/webauthn/complete")
|
||||
@limiter.limit("10/minute")
|
||||
async def login_webauthn_complete(request: Request) -> Response:
|
||||
async def login_webauthn_complete(request: Request) -> Response: # noqa: PLR0911
|
||||
webauthn_service = request.app.state.webauthn_service
|
||||
user_repo = request.app.state.user_repo
|
||||
cred_repo = request.app.state.credential_repo
|
||||
|
|
@ -194,6 +207,9 @@ async def login_webauthn_complete(request: Request) -> Response:
|
|||
|
||||
stored = await cred_repo.get_webauthn_by_credential_id(matched_credential_id)
|
||||
if stored is not None:
|
||||
if _is_sign_count_rollback(stored.sign_count, new_counter):
|
||||
# Counter went backwards — likely a cloned authenticator or replay.
|
||||
return JSONResponse({"error": "Authentication failed"}, status_code=400)
|
||||
stored.sign_count = new_counter
|
||||
await cred_repo.update_webauthn(stored)
|
||||
|
||||
|
|
|
|||
|
|
@ -18,9 +18,16 @@ from fido2.webauthn import (
|
|||
class WebAuthnService:
|
||||
"""FIDO2 WebAuthn registration and authentication."""
|
||||
|
||||
def __init__(self, rp_id: str, rp_name: str, origin: str) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
rp_id: str,
|
||||
rp_name: str,
|
||||
origin: str,
|
||||
user_verification: UserVerificationRequirement = UserVerificationRequirement.PREFERRED,
|
||||
) -> None:
|
||||
rp = PublicKeyCredentialRpEntity(name=rp_name, id=rp_id)
|
||||
self._server = Fido2Server(rp, verify_origin=lambda o: o == origin)
|
||||
self._user_verification = user_verification
|
||||
|
||||
def begin_registration(
|
||||
self,
|
||||
|
|
@ -39,7 +46,7 @@ class WebAuthnService:
|
|||
user=user,
|
||||
credentials=existing_credentials,
|
||||
resident_key_requirement=ResidentKeyRequirement.REQUIRED,
|
||||
user_verification=UserVerificationRequirement.PREFERRED,
|
||||
user_verification=self._user_verification,
|
||||
)
|
||||
return dict(options), state
|
||||
|
||||
|
|
@ -64,7 +71,7 @@ class WebAuthnService:
|
|||
"""
|
||||
options, state = self._server.authenticate_begin(
|
||||
credentials=credentials,
|
||||
user_verification=UserVerificationRequirement.PREFERRED,
|
||||
user_verification=self._user_verification,
|
||||
)
|
||||
return dict(options), state
|
||||
|
||||
|
|
|
|||
|
|
@ -49,6 +49,10 @@ class Settings(BaseSettings):
|
|||
session_secret: str | None = None # If None, a random secret is generated per process
|
||||
session_https_only: bool = True
|
||||
|
||||
# WebAuthn user verification requirement: "preferred" (default), "required",
|
||||
# or "discouraged". Identity providers may want "required".
|
||||
webauthn_user_verification: str = "preferred"
|
||||
|
||||
# Magic links
|
||||
invite_ttl: int = 86400 # seconds
|
||||
|
||||
|
|
|
|||
|
|
@ -19,6 +19,16 @@ function getCsrfToken() {
|
|||
return meta ? meta.getAttribute('content') : '';
|
||||
}
|
||||
|
||||
// Render an alert with the message as text (never as HTML) to avoid XSS.
|
||||
function showAlert(el, message) {
|
||||
if (!el) return;
|
||||
el.replaceChildren();
|
||||
const div = document.createElement('div');
|
||||
div.setAttribute('role', 'alert');
|
||||
div.textContent = message;
|
||||
el.appendChild(div);
|
||||
}
|
||||
|
||||
async function beginRegistration() {
|
||||
const statusEl = document.getElementById('webauthn-status');
|
||||
|
||||
|
|
@ -29,7 +39,7 @@ async function beginRegistration() {
|
|||
headers: { 'Content-Type': 'application/json', 'X-CSRF-Token': getCsrfToken() },
|
||||
});
|
||||
if (!beginRes.ok) {
|
||||
if (statusEl) statusEl.innerHTML = '<div role="alert">Failed to start registration</div>';
|
||||
showAlert(statusEl, 'Failed to start registration');
|
||||
return;
|
||||
}
|
||||
const options = await beginRes.json();
|
||||
|
|
@ -74,7 +84,7 @@ async function beginRegistration() {
|
|||
if (statusEl) statusEl.innerHTML = text;
|
||||
}
|
||||
} catch (err) {
|
||||
if (statusEl) statusEl.innerHTML = '<div role="alert">Registration failed: ' + err.message + '</div>';
|
||||
showAlert(statusEl, 'Registration failed: ' + err.message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -83,10 +93,13 @@ async function beginAuthentication() {
|
|||
|
||||
try {
|
||||
// Step 1: Get options from server (no username needed)
|
||||
const beginRes = await fetch('/login/webauthn/begin');
|
||||
const beginRes = await fetch('/login/webauthn/begin', {
|
||||
method: 'POST',
|
||||
headers: { 'X-CSRF-Token': getCsrfToken() },
|
||||
});
|
||||
if (!beginRes.ok) {
|
||||
const data = await beginRes.json();
|
||||
if (statusEl) statusEl.innerHTML = '<div role="alert">' + (data.error || 'Failed to start authentication') + '</div>';
|
||||
showAlert(statusEl, data.error || 'Failed to start authentication');
|
||||
return;
|
||||
}
|
||||
const options = await beginRes.json();
|
||||
|
|
@ -129,10 +142,10 @@ async function beginAuthentication() {
|
|||
window.location.href = data.redirect || '/manage/credentials';
|
||||
} else {
|
||||
const data = await completeRes.json().catch(function () { return {}; });
|
||||
if (statusEl) statusEl.innerHTML = '<div role="alert">' + (data.error || 'Authentication failed') + '</div>';
|
||||
showAlert(statusEl, data.error || 'Authentication failed');
|
||||
}
|
||||
} catch (err) {
|
||||
if (statusEl) statusEl.innerHTML = '<div role="alert">Authentication failed: ' + err.message + '</div>';
|
||||
showAlert(statusEl, 'Authentication failed: ' + err.message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,5 @@
|
|||
-- A WebAuthn credential_id must be globally unique: lookups by credential_id
|
||||
-- alone (during usernameless authentication) must never resolve to the wrong
|
||||
-- user. The table PK is (user_id, credential_id), which does not prevent the
|
||||
-- same credential_id under two users, so add an explicit unique index.
|
||||
CREATE UNIQUE INDEX ix_webauthn_credential_id ON webauthn_credentials (credential_id);
|
||||
|
|
@ -1,7 +1,10 @@
|
|||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from httpx import AsyncClient
|
||||
|
||||
from porchlight.app import create_app
|
||||
from porchlight.config import Settings
|
||||
from porchlight.dependencies import (
|
||||
get_credential_repo,
|
||||
get_magic_link_repo,
|
||||
|
|
@ -56,3 +59,19 @@ async def test_dependency_functions() -> None:
|
|||
assert get_user_repo(request) == "user_repo_sentinel"
|
||||
assert get_credential_repo(request) == "credential_repo_sentinel"
|
||||
assert get_magic_link_repo(request) == "magic_link_repo_sentinel"
|
||||
|
||||
|
||||
def test_create_app_requires_session_secret_in_production() -> None:
|
||||
settings = Settings(issuer="https://op.example.com", sqlite_path=":memory:", debug=False)
|
||||
with pytest.raises(RuntimeError, match="SESSION_SECRET"):
|
||||
create_app(settings)
|
||||
|
||||
|
||||
def test_create_app_allows_missing_secret_on_localhost() -> None:
|
||||
settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:")
|
||||
assert create_app(settings) is not None
|
||||
|
||||
|
||||
def test_create_app_allows_missing_secret_in_debug() -> None:
|
||||
settings = Settings(issuer="https://op.example.com", sqlite_path=":memory:", debug=True)
|
||||
assert create_app(settings) is not None
|
||||
|
|
|
|||
|
|
@ -46,10 +46,12 @@ async def _setup_user_with_webauthn(
|
|||
|
||||
|
||||
async def test_webauthn_login_begin_returns_options(client: AsyncClient) -> None:
|
||||
"""Begin is now GET with no username — returns options with empty allowCredentials."""
|
||||
"""Begin is a POST (mutates session) with no username — returns options
|
||||
with empty allowCredentials."""
|
||||
await _setup_user_with_webauthn(client)
|
||||
|
||||
res = await client.get("/login/webauthn/begin")
|
||||
token = await get_csrf_token(client)
|
||||
res = await client.post("/login/webauthn/begin", headers={"X-CSRF-Token": token})
|
||||
assert res.status_code == 200
|
||||
data = res.json()
|
||||
assert "publicKey" in data
|
||||
|
|
@ -59,7 +61,8 @@ async def test_webauthn_login_begin_returns_options(client: AsyncClient) -> None
|
|||
|
||||
|
||||
async def test_webauthn_login_begin_has_user_verification_preferred(client: AsyncClient) -> None:
|
||||
res = await client.get("/login/webauthn/begin")
|
||||
token = await get_csrf_token(client)
|
||||
res = await client.post("/login/webauthn/begin", headers={"X-CSRF-Token": token})
|
||||
assert res.status_code == 200
|
||||
data = res.json()
|
||||
assert data["publicKey"]["userVerification"] == "preferred"
|
||||
|
|
@ -81,10 +84,9 @@ async def test_webauthn_login_complete_returns_json_redirect(client: AsyncClient
|
|||
_userid, _pk, _credential_id, _att = await _setup_user_with_webauthn(client)
|
||||
|
||||
# Begin to get state into session
|
||||
res1 = await client.get("/login/webauthn/begin")
|
||||
assert res1.status_code == 200
|
||||
|
||||
token = await get_csrf_token(client)
|
||||
res1 = await client.post("/login/webauthn/begin", headers={"X-CSRF-Token": token})
|
||||
assert res1.status_code == 200
|
||||
# We can't easily complete the full assertion without browser interaction,
|
||||
# but we verify the endpoint returns 400 JSON (not HTML) for bad assertions
|
||||
res2 = await client.post(
|
||||
|
|
|
|||
22
tests/test_authn/test_sign_count.py
Normal file
22
tests/test_authn/test_sign_count.py
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
from porchlight.authn.routes import _is_sign_count_rollback
|
||||
|
||||
|
||||
def test_increasing_counter_is_not_rollback() -> None:
|
||||
assert _is_sign_count_rollback(stored=5, presented=6) is False
|
||||
|
||||
|
||||
def test_equal_counter_is_rollback() -> None:
|
||||
assert _is_sign_count_rollback(stored=5, presented=5) is True
|
||||
|
||||
|
||||
def test_lower_counter_is_rollback() -> None:
|
||||
assert _is_sign_count_rollback(stored=5, presented=3) is True
|
||||
|
||||
|
||||
def test_both_zero_sync_passkey_is_allowed() -> None:
|
||||
# Sync passkeys (and counter-less authenticators) always report 0.
|
||||
assert _is_sign_count_rollback(stored=0, presented=0) is False
|
||||
|
||||
|
||||
def test_first_increment_from_zero_is_allowed() -> None:
|
||||
assert _is_sign_count_rollback(stored=0, presented=1) is False
|
||||
|
|
@ -17,6 +17,7 @@ from fido2.webauthn import (
|
|||
PublicKeyCredentialDescriptor,
|
||||
PublicKeyCredentialType,
|
||||
RegistrationResponse,
|
||||
UserVerificationRequirement,
|
||||
)
|
||||
|
||||
from porchlight.authn.webauthn import WebAuthnService
|
||||
|
|
@ -194,6 +195,19 @@ def test_begin_authentication_prefers_user_verification() -> None:
|
|||
assert pub_key["userVerification"] == "preferred"
|
||||
|
||||
|
||||
def test_user_verification_is_configurable() -> None:
|
||||
service = WebAuthnService(
|
||||
rp_id=RP_ID,
|
||||
rp_name=RP_NAME,
|
||||
origin=ORIGIN,
|
||||
user_verification=UserVerificationRequirement.REQUIRED,
|
||||
)
|
||||
reg, _ = service.begin_registration(user_id=b"user-123", username="alice")
|
||||
assert reg["publicKey"]["authenticatorSelection"]["userVerification"] == "required"
|
||||
auth, _ = service.begin_authentication()
|
||||
assert auth["publicKey"]["userVerification"] == "required"
|
||||
|
||||
|
||||
def test_begin_authentication_returns_options_and_state() -> None:
|
||||
service = _make_service()
|
||||
_, cred_id, _attested = _generate_credential()
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ scope = ["openid", "profile"]
|
|||
toml_file = tmp_path / "test.toml"
|
||||
toml_file.write_text(toml_content)
|
||||
|
||||
settings = Settings(_toml_file=str(toml_file))
|
||||
settings = Settings(_toml_file=str(toml_file), session_secret="x" * 32)
|
||||
app = create_app(settings)
|
||||
|
||||
async with (
|
||||
|
|
@ -41,7 +41,7 @@ scope = ["openid", "profile"]
|
|||
|
||||
async def test_manage_app_always_registered() -> None:
|
||||
"""The internal manage-app client is always registered, even without config file clients."""
|
||||
settings = Settings(issuer="https://test.example.com")
|
||||
settings = Settings(issuer="https://test.example.com", session_secret="x" * 32)
|
||||
app = create_app(settings)
|
||||
|
||||
async with (
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ async def test_run_migrations_applies_initial() -> None:
|
|||
async with aiosqlite.connect(":memory:") as db:
|
||||
await db.execute("PRAGMA foreign_keys=ON")
|
||||
count = await run_migrations(db, MIGRATIONS_DIR)
|
||||
assert count == 2
|
||||
assert count == 3
|
||||
async with db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'") as cursor:
|
||||
row = await cursor.fetchone()
|
||||
assert row is not None
|
||||
|
|
@ -24,7 +24,7 @@ async def test_run_migrations_skips_already_applied() -> None:
|
|||
await db.execute("PRAGMA foreign_keys=ON")
|
||||
first_count = await run_migrations(db, MIGRATIONS_DIR)
|
||||
second_count = await run_migrations(db, MIGRATIONS_DIR)
|
||||
assert first_count == 2
|
||||
assert first_count == 3
|
||||
assert second_count == 0
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -227,3 +227,21 @@ async def test_delete_webauthn_if_not_last_allows_when_others_exist(
|
|||
|
||||
assert await credential_repo.delete_webauthn_if_not_last(alice.userid, b"\x01") is True
|
||||
assert len(await credential_repo.get_webauthn_by_user(alice.userid)) == 0
|
||||
|
||||
|
||||
async def test_webauthn_credential_id_is_globally_unique(
|
||||
credential_repo: SQLiteCredentialRepository, user_repo: SQLiteUserRepository
|
||||
) -> None:
|
||||
# The same credential_id must not be registrable for two different users,
|
||||
# otherwise lookup-by-credential-id could resolve to the wrong account.
|
||||
await user_repo.create(User(userid="u-alice", username="alice2"))
|
||||
await user_repo.create(User(userid="u-bob", username="bob2"))
|
||||
cred_id = b"\xde\xad\xbe\xef"
|
||||
|
||||
await credential_repo.create_webauthn(
|
||||
WebAuthnCredential(user_id="u-alice", credential_id=cred_id, public_key=b"\x01")
|
||||
)
|
||||
with pytest.raises(DuplicateError):
|
||||
await credential_repo.create_webauthn(
|
||||
WebAuthnCredential(user_id="u-bob", credential_id=cred_id, public_key=b"\x02")
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue