21 KiB
Authentication Services Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Implement backend authentication services: PasswordService (argon2), WebAuthnService (fido2), and MagicLinkService (token lifecycle).
Architecture: Three independent services with no cross-dependencies. PasswordService and WebAuthnService are pure logic (no repo dependency). MagicLinkService depends on MagicLinkRepository. All testable with in-memory SQLite or unit tests.
Tech Stack: argon2-cffi, python-fido2 v2.1, cryptography (for test key generation)
Quality gate: ./scripts/check.sh (ruff format, ruff check, ty check, pytest)
Task 1: PasswordService
Files:
- Create: src/fastapi_oidc_op/authn/password.py
- Create: tests/test_authn/__init__.py
- Create: tests/test_authn/test_password.py
Step 1: Write the failing tests
# tests/test_authn/test_password.py
from argon2 import PasswordHasher
from fastapi_oidc_op.authn.password import PasswordService
def test_hash_returns_argon2_string() -> None:
    """The hash output is an encoded string tagged with the argon2id variant."""
    # Low-cost parameters keep the test fast.
    cheap_hasher = PasswordHasher(time_cost=1, memory_cost=8192)
    encoded = PasswordService(hasher=cheap_hasher).hash("correcthorse")
    assert encoded.startswith("$argon2id$")
def test_verify_correct_password() -> None:
    """A password verifies against the hash that was produced from it."""
    cheap_hasher = PasswordHasher(time_cost=1, memory_cost=8192)
    service = PasswordService(hasher=cheap_hasher)
    encoded = service.hash("correcthorse")
    outcome = service.verify(encoded, "correcthorse")
    assert outcome is True
def test_verify_wrong_password() -> None:
    """A different password is rejected with False rather than an exception."""
    cheap_hasher = PasswordHasher(time_cost=1, memory_cost=8192)
    service = PasswordService(hasher=cheap_hasher)
    encoded = service.hash("correcthorse")
    outcome = service.verify(encoded, "wrongpassword")
    assert outcome is False
def test_verify_invalid_hash() -> None:
    """A string that is not an argon2 hash is rejected with False."""
    cheap_hasher = PasswordHasher(time_cost=1, memory_cost=8192)
    service = PasswordService(hasher=cheap_hasher)
    outcome = service.verify("not-a-hash", "password")
    assert outcome is False
def test_default_hasher() -> None:
    """The service works end to end when no hasher is injected."""
    service = PasswordService()
    encoded = service.hash("test")
    outcome = service.verify(encoded, "test")
    assert outcome is True
Step 2: Run tests to verify they fail
Run: pytest tests/test_authn/test_password.py -v
Expected: FAIL — cannot import PasswordService
Step 3: Create __init__.py and write the implementation
# tests/test_authn/__init__.py
# (empty file)
# src/fastapi_oidc_op/authn/password.py
from argon2 import PasswordHasher
from argon2.exceptions import InvalidHashError, VerificationError, VerifyMismatchError
class PasswordService:
    """Argon2 password hashing and verification.

    Wraps an ``argon2.PasswordHasher`` so callers get a plain ``bool`` from
    ``verify`` instead of having to handle argon2's exception types.
    """

    def __init__(self, hasher: PasswordHasher | None = None) -> None:
        """Store the hasher to use.

        Args:
            hasher: Pre-configured hasher (e.g. with cheaper cost parameters
                in tests). A default ``PasswordHasher()`` is created when
                omitted.
        """
        self._hasher = PasswordHasher() if hasher is None else hasher

    def hash(self, password: str) -> str:
        """Hash a password using argon2id. Returns the full hash string."""
        return self._hasher.hash(password)

    def verify(self, password_hash: str, password: str) -> bool:
        """Verify a password against an argon2 hash.

        Args:
            password_hash: Encoded hash string as returned by ``hash``.
            password: Candidate plaintext password.

        Returns:
            True if the password matches the hash. False if it does not
            match, if verification fails for another reason, or if the hash
            string is not a valid argon2 hash.
        """
        try:
            return self._hasher.verify(password_hash, password)
        except VerifyMismatchError:
            # Wrong password: the expected, common failure.
            return False
        except (VerificationError, InvalidHashError):
            # The hash is malformed or verification failed for some other
            # reason. This API reports "not verified" rather than raising.
            return False
Step 4: Run tests to verify they pass
Run: pytest tests/test_authn/test_password.py -v
Expected: 5 PASSED
Step 5: Run quality gate
Run: ./scripts/check.sh
Expected: All green
Step 6: Commit
git add src/fastapi_oidc_op/authn/password.py tests/test_authn/__init__.py tests/test_authn/test_password.py
git commit -m "feat: add PasswordService with argon2 hash/verify"
Task 2: WebAuthnService
Files:
- Create: src/fastapi_oidc_op/authn/webauthn.py
- Create: tests/test_authn/test_webauthn.py
Step 1: Write the failing tests
These tests build real cryptographic registration and authentication responses using fido2's factory methods and the cryptography library.
# tests/test_authn/test_webauthn.py
import os
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.hashes import SHA256
from fido2.cose import ES256
from fido2.utils import sha256
from fido2.webauthn import (
Aaguid,
AttestedCredentialData,
AttestationObject,
AuthenticationResponse,
AuthenticatorAssertionResponse,
AuthenticatorAttestationResponse,
AuthenticatorData,
CollectedClientData,
PublicKeyCredentialDescriptor,
PublicKeyCredentialType,
RegistrationResponse,
)
from fastapi_oidc_op.authn.webauthn import WebAuthnService
RP_ID = "localhost"
RP_NAME = "Test RP"
ORIGIN = "http://localhost:8000"
def _make_service() -> WebAuthnService:
    """Build a WebAuthnService configured with the module-level test RP values."""
    service = WebAuthnService(rp_id=RP_ID, rp_name=RP_NAME, origin=ORIGIN)
    return service
def _generate_credential() -> tuple[ec.EllipticCurvePrivateKey, bytes, AttestedCredentialData]:
    """Create a fresh P-256 test credential.

    Returns:
        ``(private_key, credential_id, attested_credential_data)`` where the
        credential id is 32 random bytes and the attested data carries the
        COSE (ES256) form of the matching public key.
    """
    signing_key = ec.generate_private_key(ec.SECP256R1())
    public_cose = ES256.from_cryptography_key(signing_key.public_key())
    cred_id = os.urandom(32)
    attested_data = AttestedCredentialData.create(
        aaguid=Aaguid.NONE,
        credential_id=cred_id,
        public_key=public_cose,
    )
    return signing_key, cred_id, attested_data
def _build_registration_response(
    credential_id: bytes,
    attested: AttestedCredentialData,
    challenge: bytes,
) -> RegistrationResponse:
    """Assemble a registration response answering ``challenge``.

    Uses the "none" attestation format with an empty statement, and
    authenticator data flagged UP | AT with a counter of 0.
    """
    authenticator_data = AuthenticatorData.create(
        rp_id_hash=sha256(RP_ID.encode()),
        flags=AuthenticatorData.FLAG.UP | AuthenticatorData.FLAG.AT,
        counter=0,
        credential_data=attested,
    )
    attestation = AttestationObject.create(
        fmt="none",
        auth_data=authenticator_data,
        att_stmt={},
    )
    collected = CollectedClientData.create(
        type=CollectedClientData.TYPE.CREATE,
        challenge=challenge,
        origin=ORIGIN,
    )
    attestation_response = AuthenticatorAttestationResponse(
        client_data=collected,
        attestation_object=attestation,
    )
    return RegistrationResponse(raw_id=credential_id, response=attestation_response)
def _build_authentication_response(
    private_key: ec.EllipticCurvePrivateKey,
    credential_id: bytes,
    challenge: bytes,
    counter: int = 1,
) -> AuthenticationResponse:
    """Assemble an authentication response signed by ``private_key``.

    The signature is ECDSA/SHA-256 over the authenticator data followed by
    the client-data hash.
    """
    collected = CollectedClientData.create(
        type=CollectedClientData.TYPE.GET,
        challenge=challenge,
        origin=ORIGIN,
    )
    authenticator_data = AuthenticatorData.create(
        rp_id_hash=sha256(RP_ID.encode()),
        flags=AuthenticatorData.FLAG.UP,
        counter=counter,
    )
    signed_payload = authenticator_data + collected.hash
    assertion = AuthenticatorAssertionResponse(
        client_data=collected,
        authenticator_data=authenticator_data,
        signature=private_key.sign(signed_payload, ec.ECDSA(SHA256())),
    )
    return AuthenticationResponse(raw_id=credential_id, response=assertion)
# --- Registration tests ---
def test_begin_registration_returns_options_and_state() -> None:
    """begin_registration yields browser options plus state holding the challenge."""
    options, state = _make_service().begin_registration(user_id=b"user-123", username="alice")

    # The options dict nests the creation options under "publicKey".
    assert "publicKey" in options
    public_key_options = options["publicKey"]
    assert "challenge" in public_key_options
    assert "rp" in public_key_options
    assert "user" in public_key_options

    # The state the caller must keep carries the issued challenge.
    assert "challenge" in state
def test_complete_registration_returns_credential_data() -> None:
    """Completing registration returns authenticator data holding the new credential."""
    service = _make_service()
    _private_key, credential_id, attested = _generate_credential()
    _options, state = service.begin_registration(user_id=b"user-123", username="alice")

    # The response has to answer the challenge issued by begin_registration.
    issued_challenge = state["challenge"]
    response = _build_registration_response(credential_id, attested, issued_challenge)

    auth_data = service.complete_registration(state, response)
    assert auth_data.credential_data is not None
    assert auth_data.credential_data.credential_id == credential_id
def test_begin_registration_with_existing_credentials() -> None:
    """Existing credentials are listed under excludeCredentials in the options."""
    service = _make_service()
    _key, cred_id, _attested = _generate_credential()
    descriptor = PublicKeyCredentialDescriptor(
        type=PublicKeyCredentialType.PUBLIC_KEY,
        id=cred_id,
    )

    options, _state = service.begin_registration(
        user_id=b"user-123",
        username="alice",
        existing_credentials=[descriptor],
    )

    public_key_options = options["publicKey"]
    assert "excludeCredentials" in public_key_options
    assert len(public_key_options["excludeCredentials"]) == 1
# --- Authentication tests ---
def test_begin_authentication_returns_options_and_state() -> None:
    """begin_authentication yields request options plus state holding the challenge."""
    service = _make_service()
    _key, cred_id, _attested = _generate_credential()
    descriptor = PublicKeyCredentialDescriptor(
        type=PublicKeyCredentialType.PUBLIC_KEY,
        id=cred_id,
    )

    options, state = service.begin_authentication(credentials=[descriptor])

    assert "publicKey" in options
    assert "challenge" in state
def test_complete_authentication_verifies_signature() -> None:
    """An assertion signed with the registered key resolves to that credential."""
    service = _make_service()
    private_key, credential_id, attested = _generate_credential()
    descriptor = PublicKeyCredentialDescriptor(
        type=PublicKeyCredentialType.PUBLIC_KEY,
        id=credential_id,
    )
    _options, state = service.begin_authentication(credentials=[descriptor])

    # Sign the challenge that begin_authentication issued.
    response = _build_authentication_response(private_key, credential_id, state["challenge"])

    matched = service.complete_authentication(
        state=state,
        credentials=[attested],
        response=response,
    )
    assert matched.credential_id == credential_id
def test_complete_authentication_wrong_signature_raises() -> None:
    """An assertion signed by a key other than the registered one is rejected."""
    import pytest

    service = _make_service()
    _registered_key, credential_id, attested = _generate_credential()
    # A second, unrelated key pair: its signature cannot match the stored public key.
    impostor_key = ec.generate_private_key(ec.SECP256R1())
    descriptor = PublicKeyCredentialDescriptor(
        type=PublicKeyCredentialType.PUBLIC_KEY,
        id=credential_id,
    )
    _options, state = service.begin_authentication(credentials=[descriptor])
    forged = _build_authentication_response(impostor_key, credential_id, state["challenge"])

    # The exact exception type is left open here (ValueError or InvalidSignature).
    with pytest.raises(Exception):
        service.complete_authentication(
            state=state,
            credentials=[attested],
            response=forged,
        )
Step 2: Run tests to verify they fail
Run: pytest tests/test_authn/test_webauthn.py -v
Expected: FAIL — cannot import WebAuthnService
Step 3: Write the implementation
# src/fastapi_oidc_op/authn/webauthn.py
from typing import Any, Sequence
from fido2.server import Fido2Server
from fido2.webauthn import (
AttestedCredentialData,
AuthenticationResponse,
AuthenticatorData,
PublicKeyCredentialDescriptor,
PublicKeyCredentialRpEntity,
PublicKeyCredentialUserEntity,
RegistrationResponse,
)
class WebAuthnService:
    """Thin wrapper around ``Fido2Server`` for WebAuthn ceremonies.

    Exposes the begin/complete pairs for registration and authentication and
    delegates each call to the underlying server instance.
    """

    def __init__(self, rp_id: str, rp_name: str, origin: str) -> None:
        """Configure the server for one relying party and one accepted origin."""

        def _origin_matches(candidate: str) -> bool:
            # Exact string comparison against the configured origin.
            return candidate == origin

        relying_party = PublicKeyCredentialRpEntity(name=rp_name, id=rp_id)
        self._server = Fido2Server(relying_party, verify_origin=_origin_matches)

    def begin_registration(
        self,
        user_id: bytes,
        username: str,
        existing_credentials: Sequence[PublicKeyCredentialDescriptor] | None = None,
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """Start a registration ceremony.

        The username is used for both the user's ``name`` and ``display_name``.

        Returns:
            ``(options, state)``. ``options`` is the dict to send to the
            browser; ``state`` must be kept by the caller and handed back to
            ``complete_registration``.
        """
        account = PublicKeyCredentialUserEntity(
            id=user_id,
            name=username,
            display_name=username,
        )
        begun = self._server.register_begin(user=account, credentials=existing_credentials)
        options, state = begun
        return dict(options), state

    def complete_registration(
        self,
        state: dict[str, Any],
        response: RegistrationResponse | dict[str, Any],
    ) -> AuthenticatorData:
        """Finish a registration ceremony.

        Returns:
            The ``AuthenticatorData`` produced by the server; its
            ``credential_data`` holds the new credential's public key.
        """
        authenticator_data = self._server.register_complete(state, response)
        return authenticator_data

    def begin_authentication(
        self,
        credentials: Sequence[PublicKeyCredentialDescriptor] | None = None,
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """Start an authentication ceremony.

        Returns:
            ``(options, state)``, with the same roles as in ``begin_registration``.
        """
        begun = self._server.authenticate_begin(credentials=credentials)
        options, state = begun
        return dict(options), state

    def complete_authentication(
        self,
        state: dict[str, Any],
        credentials: Sequence[AttestedCredentialData],
        response: AuthenticationResponse | dict[str, Any],
    ) -> AttestedCredentialData:
        """Finish an authentication ceremony.

        Checks the assertion against the stored credentials and returns the
        ``AttestedCredentialData`` that matched. Any verification failure is
        raised by the underlying server and propagates to the caller.
        """
        matched = self._server.authenticate_complete(state, credentials, response)
        return matched
Step 4: Run tests to verify they pass
Run: pytest tests/test_authn/test_webauthn.py -v
Expected: 6 PASSED
Step 5: Run quality gate
Run: ./scripts/check.sh
Expected: All green
Step 6: Commit
git add src/fastapi_oidc_op/authn/webauthn.py tests/test_authn/test_webauthn.py
git commit -m "feat: add WebAuthnService with fido2 registration and authentication"
Task 3: MagicLinkService
Files:
- Create: src/fastapi_oidc_op/invite/service.py
- Create: tests/test_invite/__init__.py
- Create: tests/test_invite/test_service.py
Step 1: Write the failing tests
Uses in-memory SQLite fixtures for the MagicLinkRepository.
# tests/test_invite/test_service.py
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import patch
import aiosqlite
import pytest
from fastapi_oidc_op.invite.service import MagicLinkService
from fastapi_oidc_op.store.sqlite.migrations import run_migrations
from fastapi_oidc_op.store.sqlite.repositories import SQLiteMagicLinkRepository
MIGRATIONS_DIR = (
Path(__file__).resolve().parent.parent.parent / "src" / "fastapi_oidc_op" / "store" / "sqlite" / "migrations"
)
@pytest.fixture
async def db():
    """Yield an in-memory SQLite connection with all migrations applied.

    Rows are returned as ``aiosqlite.Row`` and foreign-key enforcement is
    switched on before the migrations run.
    """
    # ``async with`` closes the connection on every exit path, including a
    # failure in the PRAGMA or in a migration before the fixture yields.
    async with aiosqlite.connect(":memory:") as conn:
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA foreign_keys=ON")
        await run_migrations(conn, MIGRATIONS_DIR)
        yield conn
@pytest.fixture
def repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository:
    """Provide a magic-link repository backed by the test connection."""
    repository = SQLiteMagicLinkRepository(db)
    return repository
@pytest.fixture
def service(repo: SQLiteMagicLinkRepository) -> MagicLinkService:
    """Provide a MagicLinkService with a one-hour (3600 s) TTL."""
    magic_links = MagicLinkService(repo=repo, ttl=3600)
    return magic_links
async def test_create_returns_magic_link(service: MagicLinkService) -> None:
    """A new link carries the username, a token, and is unused and unexpired."""
    created = await service.create(username="alice")
    assert created.username == "alice"
    assert created.token  # must not be empty
    assert created.used is False
    assert created.expires_at > datetime.now(UTC)
async def test_create_generates_unique_tokens(service: MagicLinkService) -> None:
    """Two separately created links receive different tokens."""
    first = await service.create(username="alice")
    second = await service.create(username="bob")
    assert first.token != second.token
async def test_create_with_optional_fields(service: MagicLinkService) -> None:
    """created_by and note are stored on the link when supplied."""
    created = await service.create(
        username="alice",
        created_by="admin-id",
        note="Welcome",
    )
    assert created.created_by == "admin-id"
    assert created.note == "Welcome"
async def test_create_respects_ttl(service: MagicLinkService) -> None:
    """The expiry lands within 100 s either side of now + the 3600 s TTL."""
    created = await service.create(username="alice")
    lower_bound = datetime.now(UTC) + timedelta(seconds=3500)
    upper_bound = datetime.now(UTC) + timedelta(seconds=3700)
    assert lower_bound < created.expires_at
    assert created.expires_at < upper_bound
async def test_validate_valid_token(service: MagicLinkService) -> None:
    """Validating a fresh token returns the link it belongs to."""
    created = await service.create(username="alice")
    found = await service.validate(created.token)
    assert found is not None
    assert found.token == created.token
    assert found.username == "alice"
async def test_validate_nonexistent_token(service: MagicLinkService) -> None:
    """An unknown token validates to None."""
    found = await service.validate("nonexistent-token")
    assert found is None
async def test_validate_used_token(service: MagicLinkService) -> None:
    """A token that has been marked used no longer validates."""
    created = await service.create(username="alice")
    await service.mark_used(created.token)
    found = await service.validate(created.token)
    assert found is None
async def test_validate_expired_token(
    service: MagicLinkService, repo: SQLiteMagicLinkRepository
) -> None:
    """A link whose expiry lies in the past validates to None."""
    # ttl=0 makes the link expire at (approximately) its creation time.
    expired_service = MagicLinkService(repo=repo, ttl=0)
    link = await expired_service.create(username="alice")

    # Pin the service module's clock one second past the expiry, so the
    # outcome does not depend on how far the wall clock has advanced between
    # create() and validate().
    just_after_expiry = link.expires_at + timedelta(seconds=1)
    with patch("fastapi_oidc_op.invite.service.datetime") as fake_datetime:
        fake_datetime.now.return_value = just_after_expiry
        result = await service.validate(link.token)

    assert result is None
async def test_mark_used_returns_true(service: MagicLinkService) -> None:
    """Marking an existing link as used reports True."""
    created = await service.create(username="alice")
    marked = await service.mark_used(created.token)
    assert marked is True
async def test_mark_used_nonexistent_returns_false(service: MagicLinkService) -> None:
    """Marking an unknown token as used reports False."""
    marked = await service.mark_used("nonexistent")
    assert marked is False
async def test_cleanup_expired(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
    """Cleanup removes the already-expired link and leaves the valid one."""
    # A negative TTL puts the expiry one hour in the past.
    stale_service = MagicLinkService(repo=repo, ttl=-3600)
    await stale_service.create(username="expired-user")
    # The fixture service (3600 s TTL) creates a link that is still valid.
    await service.create(username="valid-user")

    removed = await service.cleanup_expired()
    assert removed == 1
Step 2: Run tests to verify they fail
Run: pytest tests/test_invite/test_service.py -v
Expected: FAIL — cannot import MagicLinkService
Step 3: Create __init__.py and write the implementation
# tests/test_invite/__init__.py
# (empty file)
# src/fastapi_oidc_op/invite/service.py
import secrets
from datetime import UTC, datetime, timedelta
from fastapi_oidc_op.models import MagicLink
from fastapi_oidc_op.store.protocols import MagicLinkRepository
class MagicLinkService:
    """Magic link token lifecycle management.

    Creates links, checks whether a token is still usable, marks links as
    used, and purges expired ones. All persistence is delegated to the
    injected repository.
    """

    def __init__(self, repo: MagicLinkRepository, ttl: int = 86400) -> None:
        """Store the repository and the link lifetime.

        Args:
            repo: Repository used to store and look up links.
            ttl: Lifetime of a new link in seconds (default 86400).
        """
        self._repo = repo
        self._ttl = ttl

    async def create(
        self,
        username: str,
        created_by: str | None = None,
        note: str | None = None,
    ) -> MagicLink:
        """Generate and store a new magic link for the given username.

        The token comes from ``secrets.token_urlsafe(32)`` and the expiry is
        the current UTC time plus the configured TTL.

        Returns:
            The link as returned by the repository's ``create``.
        """
        token = secrets.token_urlsafe(32)
        expires_at = datetime.now(UTC) + timedelta(seconds=self._ttl)
        link = MagicLink(
            token=token,
            username=username,
            expires_at=expires_at,
            created_by=created_by,
            note=note,
        )
        return await self._repo.create(link)

    async def validate(self, token: str) -> MagicLink | None:
        """Validate a magic link token.

        Returns:
            The MagicLink if it exists, is not used, and has not expired;
            None otherwise.
        """
        link = await self._repo.get_by_token(token)
        if link is None or link.used:
            return None
        # ``<=``: a link is expired from the instant ``expires_at`` is
        # reached, so a zero-TTL link is never accepted even if the clock
        # has not ticked since it was created.
        if link.expires_at <= datetime.now(UTC):
            return None
        return link

    async def mark_used(self, token: str) -> bool:
        """Mark a magic link as used. Returns True if found and marked."""
        return await self._repo.mark_used(token)

    async def cleanup_expired(self) -> int:
        """Delete expired unused links. Returns count deleted."""
        return await self._repo.delete_expired()
Step 4: Run tests to verify they pass
Run: pytest tests/test_invite/test_service.py -v
Expected: All PASSED
Step 5: Run quality gate
Run: ./scripts/check.sh
Expected: All green
Step 6: Commit
git add src/fastapi_oidc_op/invite/service.py tests/test_invite/__init__.py tests/test_invite/test_service.py
git commit -m "feat: add MagicLinkService with token create/validate/cleanup"
Task 4: Update Design Documents
Files:
- Modify: docs/plans/2026-02-12-sqlite-repositories-design.md
Step 1: Update the roadmap section
In docs/plans/2026-02-12-sqlite-repositories-design.md, update the Roadmap section to mark Authentication services as complete:
Change:
3. **Authentication** (WebAuthn + password) — next phase
To:
3. ~~Authentication services~~ (done) — PasswordService, WebAuthnService, MagicLinkService
4. **Authentication routes** (login/register endpoints + templates) — next phase
And renumber the remaining items.
Step 2: Commit
git add docs/plans/2026-02-12-sqlite-repositories-design.md
git commit -m "docs: update roadmap to reflect completed auth services"