# Authentication Services Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Implement backend authentication services: PasswordService (argon2), WebAuthnService (fido2), and MagicLinkService (token lifecycle).

**Architecture:** Three independent services with no cross-dependencies. PasswordService and WebAuthnService are pure logic (no repo dependency). MagicLinkService depends on MagicLinkRepository. All testable with in-memory SQLite or unit tests.

**Tech Stack:** argon2-cffi, python-fido2 v2.1, cryptography (for test key generation)

**Quality gate:** `./scripts/check.sh` (ruff format, ruff check, ty check, pytest)

---

### Task 1: PasswordService

**Files:**
- Create: `src/fastapi_oidc_op/authn/password.py`
- Create: `tests/test_authn/__init__.py`
- Create: `tests/test_authn/test_password.py`

**Step 1: Write the failing tests**

```python
# tests/test_authn/test_password.py
from argon2 import PasswordHasher

from fastapi_oidc_op.authn.password import PasswordService


def test_hash_returns_argon2_string() -> None:
    service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    result = service.hash("correcthorse")
    assert result.startswith("$argon2id$")


def test_verify_correct_password() -> None:
    service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    hashed = service.hash("correcthorse")
    assert service.verify(hashed, "correcthorse") is True


def test_verify_wrong_password() -> None:
    service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    hashed = service.hash("correcthorse")
    assert service.verify(hashed, "wrongpassword") is False


def test_verify_invalid_hash() -> None:
    service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    assert service.verify("not-a-hash", "password") is False


def test_default_hasher() -> None:
    service = PasswordService()
    hashed = service.hash("test")
    assert service.verify(hashed, "test") is True
```

**Step 2: Run tests to verify they fail**

Run: `pytest tests/test_authn/test_password.py -v`
Expected: FAIL (cannot import `PasswordService`)

**Step 3: Create `__init__.py` and write the implementation**

```python
# tests/test_authn/__init__.py
# (empty file)
```

```python
# src/fastapi_oidc_op/authn/password.py
from argon2 import PasswordHasher
from argon2.exceptions import InvalidHashError, VerifyMismatchError


class PasswordService:
    """Argon2 password hashing and verification."""

    def __init__(self, hasher: PasswordHasher | None = None) -> None:
        self._hasher = hasher or PasswordHasher()

    def hash(self, password: str) -> str:
        """Hash a password using argon2id. Returns the full hash string."""
        return self._hasher.hash(password)

    def verify(self, password_hash: str, password: str) -> bool:
        """Verify a password against an argon2 hash. Returns True if valid."""
        try:
            return self._hasher.verify(password_hash, password)
        except (VerifyMismatchError, InvalidHashError):
            return False
```

**Step 4: Run tests to verify they pass**

Run: `pytest tests/test_authn/test_password.py -v`
Expected: 5 PASSED

**Step 5: Run quality gate**

Run: `./scripts/check.sh`
Expected: All green

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/authn/password.py tests/test_authn/__init__.py tests/test_authn/test_password.py
git commit -m "feat: add PasswordService with argon2 hash/verify"
```

---

### Task 2: WebAuthnService

**Files:**
- Create: `src/fastapi_oidc_op/authn/webauthn.py`
- Create: `tests/test_authn/test_webauthn.py`

**Step 1: Write the failing tests**

These tests build real cryptographic registration and authentication responses using fido2's factory methods and the `cryptography` library.
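
For orientation, the bytes that an assertion signature covers can be sketched with the standard library alone. The layout below follows the WebAuthn definition of authenticator data (RP ID hash, flags byte, big-endian signature counter) and omits attested credential data and extensions. The helper names (`b64url`, `authenticator_data`, `client_data_json`, `signed_message`) are illustrative assumptions, not part of fido2; the test helpers get the same bytes from `AuthenticatorData.create` and `CollectedClientData.create`.

```python
import base64
import hashlib
import json
import struct

FLAG_UP = 0x01  # user present
FLAG_AT = 0x40  # attested credential data included (registration only)


def b64url(data: bytes) -> str:
    """Base64url without padding, the encoding WebAuthn uses for challenges."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def authenticator_data(rp_id: str, flags: int, counter: int) -> bytes:
    """rpIdHash (32 bytes) + flags (1 byte) + signCount (4 bytes, big-endian)."""
    return hashlib.sha256(rp_id.encode()).digest() + struct.pack(">BI", flags, counter)


def client_data_json(ceremony: str, challenge: bytes, origin: str) -> bytes:
    """Serialize the client data the browser would hand to the authenticator."""
    payload = {"type": ceremony, "challenge": b64url(challenge), "origin": origin}
    return json.dumps(payload, separators=(",", ":")).encode()


def signed_message(auth_data: bytes, client_data: bytes) -> bytes:
    """The bytes an authenticator signs: authData || SHA-256(clientDataJSON)."""
    return auth_data + hashlib.sha256(client_data).digest()


# Hypothetical values mirroring the constants used in the test module.
auth_data = authenticator_data("localhost", FLAG_UP, counter=1)
client_data = client_data_json("webauthn.get", b"\x00" * 32, "http://localhost:8000")
message = signed_message(auth_data, client_data)
```

In the test module, `private_key.sign(auth_data + client_data.hash, ec.ECDSA(SHA256()))` signs this same concatenation, which is why a response signed with a different key fails verification.
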

```python
# tests/test_authn/test_webauthn.py
import os

import pytest
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.hashes import SHA256
from fido2.cose import ES256
from fido2.utils import sha256
from fido2.webauthn import (
    Aaguid,
    AttestedCredentialData,
    AttestationObject,
    AuthenticationResponse,
    AuthenticatorAssertionResponse,
    AuthenticatorAttestationResponse,
    AuthenticatorData,
    CollectedClientData,
    PublicKeyCredentialDescriptor,
    PublicKeyCredentialType,
    RegistrationResponse,
)

from fastapi_oidc_op.authn.webauthn import WebAuthnService

RP_ID = "localhost"
RP_NAME = "Test RP"
ORIGIN = "http://localhost:8000"


def _make_service() -> WebAuthnService:
    return WebAuthnService(rp_id=RP_ID, rp_name=RP_NAME, origin=ORIGIN)


def _generate_credential() -> tuple[ec.EllipticCurvePrivateKey, bytes, AttestedCredentialData]:
    """Generate a test credential: (private_key, credential_id, attested_credential_data)."""
    private_key = ec.generate_private_key(ec.SECP256R1())
    cose_key = ES256.from_cryptography_key(private_key.public_key())
    credential_id = os.urandom(32)
    attested = AttestedCredentialData.create(
        aaguid=Aaguid.NONE,
        credential_id=credential_id,
        public_key=cose_key,
    )
    return private_key, credential_id, attested


def _build_registration_response(
    credential_id: bytes,
    attested: AttestedCredentialData,
    challenge: bytes,
) -> RegistrationResponse:
    """Build a valid registration response for the given challenge."""
    auth_data = AuthenticatorData.create(
        rp_id_hash=sha256(RP_ID.encode()),
        flags=AuthenticatorData.FLAG.UP | AuthenticatorData.FLAG.AT,
        counter=0,
        credential_data=attested,
    )
    attestation_object = AttestationObject.create(fmt="none", auth_data=auth_data, att_stmt={})
    client_data = CollectedClientData.create(
        type=CollectedClientData.TYPE.CREATE,
        challenge=challenge,
        origin=ORIGIN,
    )
    return RegistrationResponse(
        raw_id=credential_id,
        response=AuthenticatorAttestationResponse(
            client_data=client_data,
            attestation_object=attestation_object,
        ),
    )


def _build_authentication_response(
    private_key: ec.EllipticCurvePrivateKey,
    credential_id: bytes,
    challenge: bytes,
    counter: int = 1,
) -> AuthenticationResponse:
    """Build a valid authentication response signed with the private key."""
    client_data = CollectedClientData.create(
        type=CollectedClientData.TYPE.GET,
        challenge=challenge,
        origin=ORIGIN,
    )
    auth_data = AuthenticatorData.create(
        rp_id_hash=sha256(RP_ID.encode()),
        flags=AuthenticatorData.FLAG.UP,
        counter=counter,
    )
    signature = private_key.sign(auth_data + client_data.hash, ec.ECDSA(SHA256()))
    return AuthenticationResponse(
        raw_id=credential_id,
        response=AuthenticatorAssertionResponse(
            client_data=client_data,
            authenticator_data=auth_data,
            signature=signature,
        ),
    )


# --- Registration tests ---


def test_begin_registration_returns_options_and_state() -> None:
    service = _make_service()
    options, state = service.begin_registration(
        user_id=b"user-123",
        username="alice",
    )
    # Options should be a dict suitable for JSON serialization
    assert "publicKey" in options
    pub_key = options["publicKey"]
    assert "challenge" in pub_key
    assert "rp" in pub_key
    assert "user" in pub_key
    # State should be a dict with challenge
    assert "challenge" in state


def test_complete_registration_returns_credential_data() -> None:
    service = _make_service()
    private_key, credential_id, attested = _generate_credential()
    options, state = service.begin_registration(
        user_id=b"user-123",
        username="alice",
    )
    # Extract challenge from state to build a matching response
    challenge = state["challenge"]
    response = _build_registration_response(credential_id, attested, challenge)
    result = service.complete_registration(state, response)
    assert result.credential_data is not None
    assert result.credential_data.credential_id == credential_id


def test_begin_registration_with_existing_credentials() -> None:
    service = _make_service()
    _, cred_id, attested = _generate_credential()
    existing = [
        PublicKeyCredentialDescriptor(
            type=PublicKeyCredentialType.PUBLIC_KEY,
            id=cred_id,
        )
    ]
    options, state = service.begin_registration(
        user_id=b"user-123",
        username="alice",
        existing_credentials=existing,
    )
    pub_key = options["publicKey"]
    assert "excludeCredentials" in pub_key
    assert len(pub_key["excludeCredentials"]) == 1


# --- Authentication tests ---


def test_begin_authentication_returns_options_and_state() -> None:
    service = _make_service()
    _, cred_id, attested = _generate_credential()
    credentials = [
        PublicKeyCredentialDescriptor(
            type=PublicKeyCredentialType.PUBLIC_KEY,
            id=cred_id,
        )
    ]
    options, state = service.begin_authentication(credentials=credentials)
    assert "publicKey" in options
    assert "challenge" in state


def test_complete_authentication_verifies_signature() -> None:
    service = _make_service()
    private_key, credential_id, attested = _generate_credential()
    credentials = [
        PublicKeyCredentialDescriptor(
            type=PublicKeyCredentialType.PUBLIC_KEY,
            id=credential_id,
        )
    ]
    options, state = service.begin_authentication(credentials=credentials)
    challenge = state["challenge"]
    response = _build_authentication_response(private_key, credential_id, challenge)
    result = service.complete_authentication(
        state=state,
        credentials=[attested],
        response=response,
    )
    assert result.credential_id == credential_id


def test_complete_authentication_wrong_signature_raises() -> None:
    service = _make_service()
    private_key, credential_id, attested = _generate_credential()
    # Generate a different key to produce a wrong signature
    wrong_key = ec.generate_private_key(ec.SECP256R1())
    credentials = [
        PublicKeyCredentialDescriptor(
            type=PublicKeyCredentialType.PUBLIC_KEY,
            id=credential_id,
        )
    ]
    options, state = service.begin_authentication(credentials=credentials)
    challenge = state["challenge"]
    response = _build_authentication_response(wrong_key, credential_id, challenge)
    with pytest.raises(Exception):  # fido2 raises ValueError or InvalidSignature
        service.complete_authentication(
            state=state,
            credentials=[attested],
            response=response,
        )
```

**Step 2: Run tests to verify they fail**

Run: `pytest tests/test_authn/test_webauthn.py -v`
Expected: FAIL (cannot import `WebAuthnService`)

**Step 3: Write the implementation**

```python
# src/fastapi_oidc_op/authn/webauthn.py
from collections.abc import Sequence
from typing import Any

from fido2.server import Fido2Server
from fido2.webauthn import (
    AttestedCredentialData,
    AuthenticationResponse,
    AuthenticatorData,
    PublicKeyCredentialDescriptor,
    PublicKeyCredentialRpEntity,
    PublicKeyCredentialUserEntity,
    RegistrationResponse,
)


class WebAuthnService:
    """FIDO2 WebAuthn registration and authentication."""

    def __init__(self, rp_id: str, rp_name: str, origin: str) -> None:
        rp = PublicKeyCredentialRpEntity(name=rp_name, id=rp_id)
        # Accept only responses whose client data origin matches exactly.
        self._server = Fido2Server(rp, verify_origin=lambda o: o == origin)

    def begin_registration(
        self,
        user_id: bytes,
        username: str,
        existing_credentials: Sequence[PublicKeyCredentialDescriptor] | None = None,
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """Begin WebAuthn registration.

        Returns (options_dict, state_dict).
        options_dict is JSON-serializable for sending to the browser.
        state_dict must be stored by the caller and passed to complete_registration.
        """
        user = PublicKeyCredentialUserEntity(id=user_id, name=username, display_name=username)
        options, state = self._server.register_begin(
            user=user,
            credentials=existing_credentials,
        )
        return dict(options), state

    def complete_registration(
        self,
        state: dict[str, Any],
        response: RegistrationResponse | dict[str, Any],
    ) -> AuthenticatorData:
        """Complete WebAuthn registration.

        Returns AuthenticatorData with credential_data containing the public key.
        """
        return self._server.register_complete(state, response)

    def begin_authentication(
        self,
        credentials: Sequence[PublicKeyCredentialDescriptor] | None = None,
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """Begin WebAuthn authentication.

        Returns (options_dict, state_dict).
        """
        options, state = self._server.authenticate_begin(credentials=credentials)
        return dict(options), state

    def complete_authentication(
        self,
        state: dict[str, Any],
        credentials: Sequence[AttestedCredentialData],
        response: AuthenticationResponse | dict[str, Any],
    ) -> AttestedCredentialData:
        """Complete WebAuthn authentication.

        Verifies the assertion signature against stored credentials.
        Returns the matched AttestedCredentialData. Raises on verification failure.
        """
        return self._server.authenticate_complete(state, credentials, response)
```

**Step 4: Run tests to verify they pass**

Run: `pytest tests/test_authn/test_webauthn.py -v`
Expected: 6 PASSED

**Step 5: Run quality gate**

Run: `./scripts/check.sh`
Expected: All green

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/authn/webauthn.py tests/test_authn/test_webauthn.py
git commit -m "feat: add WebAuthnService with fido2 registration and authentication"
```

---

### Task 3: MagicLinkService

**Files:**
- Create: `src/fastapi_oidc_op/invite/service.py`
- Create: `tests/test_invite/__init__.py`
- Create: `tests/test_invite/test_service.py`

**Step 1: Write the failing tests**

The tests use in-memory SQLite fixtures for the MagicLinkRepository.

```python
# tests/test_invite/test_service.py
from datetime import UTC, datetime, timedelta
from pathlib import Path

import aiosqlite
import pytest

from fastapi_oidc_op.invite.service import MagicLinkService
from fastapi_oidc_op.store.sqlite.migrations import run_migrations
from fastapi_oidc_op.store.sqlite.repositories import SQLiteMagicLinkRepository

MIGRATIONS_DIR = (
    Path(__file__).resolve().parent.parent.parent
    / "src"
    / "fastapi_oidc_op"
    / "store"
    / "sqlite"
    / "migrations"
)


@pytest.fixture
async def db():
    conn = await aiosqlite.connect(":memory:")
    conn.row_factory = aiosqlite.Row
    await conn.execute("PRAGMA foreign_keys=ON")
    await run_migrations(conn, MIGRATIONS_DIR)
    yield conn
    await conn.close()


@pytest.fixture
def repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository:
    return SQLiteMagicLinkRepository(db)


@pytest.fixture
def service(repo: SQLiteMagicLinkRepository) -> MagicLinkService:
    return MagicLinkService(repo=repo, ttl=3600)


async def test_create_returns_magic_link(service: MagicLinkService) -> None:
    link = await service.create(username="alice")
    assert link.username == "alice"
    assert link.token  # non-empty
    assert link.used is False
    assert link.expires_at > datetime.now(UTC)


async def test_create_generates_unique_tokens(service: MagicLinkService) -> None:
    link1 = await service.create(username="alice")
    link2 = await service.create(username="bob")
    assert link1.token != link2.token


async def test_create_with_optional_fields(service: MagicLinkService) -> None:
    link = await service.create(username="alice", created_by="admin-id", note="Welcome")
    assert link.created_by == "admin-id"
    assert link.note == "Welcome"


async def test_create_respects_ttl(service: MagicLinkService) -> None:
    link = await service.create(username="alice")
    expected_min = datetime.now(UTC) + timedelta(seconds=3500)
    expected_max = datetime.now(UTC) + timedelta(seconds=3700)
    assert expected_min < link.expires_at < expected_max


async def test_validate_valid_token(service: MagicLinkService) -> None:
    link = await service.create(username="alice")
    result = await service.validate(link.token)
    assert result is not None
    assert result.token == link.token
    assert result.username == "alice"


async def test_validate_nonexistent_token(service: MagicLinkService) -> None:
    result = await service.validate("nonexistent-token")
    assert result is None


async def test_validate_used_token(service: MagicLinkService) -> None:
    link = await service.create(username="alice")
    await service.mark_used(link.token)
    result = await service.validate(link.token)
    assert result is None


async def test_validate_expired_token(
    service: MagicLinkService, repo: SQLiteMagicLinkRepository
) -> None:
    # A negative TTL puts expires_at in the past, so the link is already
    # expired on creation regardless of clock resolution.
    expired_service = MagicLinkService(repo=repo, ttl=-1)
    link = await expired_service.create(username="alice")
    result = await service.validate(link.token)
    assert result is None


async def test_mark_used_returns_true(service: MagicLinkService) -> None:
    link = await service.create(username="alice")
    result = await service.mark_used(link.token)
    assert result is True


async def test_mark_used_nonexistent_returns_false(service: MagicLinkService) -> None:
    result = await service.mark_used("nonexistent")
    assert result is False


async def test_cleanup_expired(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
    # Create an expired link
    expired_service = MagicLinkService(repo=repo, ttl=-3600)
    await expired_service.create(username="expired-user")
    # Create a valid link
    await service.create(username="valid-user")
    count = await service.cleanup_expired()
    assert count == 1
```

**Step 2: Run tests to verify they fail**

Run: `pytest tests/test_invite/test_service.py -v`
Expected: FAIL (cannot import `MagicLinkService`)

**Step 3: Create `__init__.py` and write the implementation**

```python
# tests/test_invite/__init__.py
# (empty file)
```

```python
# src/fastapi_oidc_op/invite/service.py
import secrets
from datetime import UTC, datetime, timedelta

from fastapi_oidc_op.models import MagicLink
from fastapi_oidc_op.store.protocols import MagicLinkRepository


class MagicLinkService:
    """Magic link token lifecycle management."""

    def __init__(self, repo: MagicLinkRepository, ttl: int = 86400) -> None:
        self._repo = repo
        self._ttl = ttl  # link lifetime in seconds

    async def create(
        self,
        username: str,
        created_by: str | None = None,
        note: str | None = None,
    ) -> MagicLink:
        """Generate and store a new magic link for the given username."""
        token = secrets.token_urlsafe(32)
        expires_at = datetime.now(UTC) + timedelta(seconds=self._ttl)
        link = MagicLink(
            token=token,
            username=username,
            expires_at=expires_at,
            created_by=created_by,
            note=note,
        )
        return await self._repo.create(link)

    async def validate(self, token: str) -> MagicLink | None:
        """Validate a magic link token.

        Returns the MagicLink if it exists, is not used, and has not expired.
        Returns None otherwise.
        """
        link = await self._repo.get_by_token(token)
        if link is None or link.used:
            return None
        if link.expires_at < datetime.now(UTC):
            return None
        return link

    async def mark_used(self, token: str) -> bool:
        """Mark a magic link as used. Returns True if found and marked."""
        return await self._repo.mark_used(token)

    async def cleanup_expired(self) -> int:
        """Delete expired unused links. Returns count deleted."""
        return await self._repo.delete_expired()
```

**Step 4: Run tests to verify they pass**

Run: `pytest tests/test_invite/test_service.py -v`
Expected: 11 PASSED

**Step 5: Run quality gate**

Run: `./scripts/check.sh`
Expected: All green

**Step 6: Commit**

```bash
git add src/fastapi_oidc_op/invite/service.py tests/test_invite/__init__.py tests/test_invite/test_service.py
git commit -m "feat: add MagicLinkService with token create/validate/cleanup"
```

---

### Task 4: Update Design Documents

**Files:**
- Modify: `docs/plans/2026-02-12-sqlite-repositories-design.md`

**Step 1: Update the roadmap section**

In `docs/plans/2026-02-12-sqlite-repositories-design.md`, update the Roadmap section to mark Authentication services as complete:

Change:
```
3. **Authentication** (WebAuthn + password) — next phase
```

To:
```
3. ~~Authentication services~~ (done) — PasswordService, WebAuthnService, MagicLinkService
4. **Authentication routes** (login/register endpoints + templates) — next phase
```

Then renumber the remaining items.

**Step 2: Commit**

```bash
git add docs/plans/2026-02-12-sqlite-repositories-design.md
git commit -m "docs: update roadmap to reflect completed auth services"
```