diff --git a/docs/plans/2026-02-11-scaffolding-plan.md b/docs/plans/2026-02-11-scaffolding-plan.md deleted file mode 100644 index 8257316..0000000 --- a/docs/plans/2026-02-11-scaffolding-plan.md +++ /dev/null @@ -1,860 +0,0 @@ -# Project Scaffolding Implementation Plan - -> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. - -**Goal:** Set up the project foundation: tooling (ruff, ty), package structure, configuration, Pydantic models, repository protocols, app factory, and initial test infrastructure. - -**Architecture:** FastAPI app factory pattern with src layout. Pydantic models for data, Protocol-based repository interfaces, env-based configuration via pydantic-settings. All code checked by ruff (broad ruleset) and ty (strict mode). - -**Tech Stack:** Python 3.13, FastAPI, pydantic-settings, ruff, ty, pytest, pytest-asyncio - -**Design Document:** `docs/plans/2026-02-11-oidc-op-design.md` - ---- - -### Task 1: Configure pyproject.toml with dependencies and tooling - -**Files:** -- Modify: `pyproject.toml` - -**Step 1: Update pyproject.toml with all sections** - -```toml -[project] -name = "fastapi-oidc-op" -version = "0.1.0" -description = "OIDC OpenID Provider with user management" -readme = "README.md" -requires-python = ">=3.13" -dependencies = [ - "fastapi>=0.115", - "uvicorn[standard]>=0.34", - "idpyoidc>=5.0", - "pydantic-settings>=2.7", - "jinja2>=3.1", - "fido2>=2.1", - "argon2-cffi>=25.1", - "motor>=3.7", - "aiosqlite>=0.21", - "proquint>=0.2", - "python-multipart>=0.0.20", - "httpx>=0.28", -] - -[project.scripts] -fastapi-oidc-op = "fastapi_oidc_op.cli:main" - -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[tool.hatch.build.targets.wheel] -packages = ["src/fastapi_oidc_op"] - -[dependency-groups] -dev = [ - "pytest>=8.0", - "pytest-asyncio>=0.25", - "ruff>=0.15", - "ty>=0.0.16", -] - -[tool.ruff] -line-length = 120 -target-version = "py311" -src = ["src", "tests"] - -[tool.ruff.lint] -select = ["E", "F", "UP", "B", "SIM", "I", "C4", "RUF"] -ignore = ["E501"] - -[tool.ruff.lint.per-file-ignores] -"tests/*" = ["S101"] - -[tool.ruff.format] -quote-style = "double" -indent-style = "space" -docstring-code-format = true - -[tool.ty] -python-version = "3.13" -src = ["src"] - -[tool.ty.rules] -possibly-unresolved-reference = "error" - -[tool.pytest.ini_options] -asyncio_mode = "auto" -testpaths = ["tests"] -``` - -**Step 2: Install dependencies** - -Run: `uv sync` -Expected: All dependencies resolve and install successfully. - -**Step 3: Verify tools work** - -Run: `uv run ruff check --version && uv run ty --version` -Expected: Both print version numbers. - -**Step 4: Commit** - -```bash -git add pyproject.toml uv.lock -git commit -m "chore: configure project dependencies and tooling (ruff, ty, pytest)" -``` - ---- - -### Task 2: Create package structure with __init__.py files - -**Files:** -- Create: `src/fastapi_oidc_op/__init__.py` -- Create: `src/fastapi_oidc_op/oidc/__init__.py` -- Create: `src/fastapi_oidc_op/authn/__init__.py` -- Create: `src/fastapi_oidc_op/manage/__init__.py` -- Create: `src/fastapi_oidc_op/store/__init__.py` -- Create: `src/fastapi_oidc_op/store/mongodb/__init__.py` -- Create: `src/fastapi_oidc_op/store/sqlite/__init__.py` -- Create: `src/fastapi_oidc_op/invite/__init__.py` -- Create: `tests/__init__.py` -- Create: `tests/test_store/__init__.py` -- Create: `tests/test_authn/__init__.py` -- Create: `tests/test_oidc/__init__.py` -- Create: `tests/test_manage/__init__.py` - -**Step 1: Create all directories and __init__.py files** - -All `__init__.py` files are empty. Create the directory tree: - -``` -src/ -└── fastapi_oidc_op/ - ├── __init__.py - ├── oidc/ - │ └── __init__.py - ├── authn/ - │ └── __init__.py - ├── manage/ - │ └── __init__.py - ├── store/ - │ ├── __init__.py - │ ├── mongodb/ - │ │ └── __init__.py - │ └── sqlite/ - │ └── __init__.py - └── invite/ - └── __init__.py -tests/ -├── __init__.py -├── test_store/ -│ └── __init__.py -├── test_authn/ -│ └── __init__.py -├── test_oidc/ -│ └── __init__.py -└── test_manage/ - └── __init__.py -``` - -**Step 2: Verify package is importable** - -Run: `uv run python -c "import fastapi_oidc_op; print('OK')"` -Expected: Prints `OK` - -**Step 3: Commit** - -```bash -git add src/ tests/ -git commit -m "chore: create package structure with src layout" -``` - ---- - -### Task 3: Implement configuration module - -**Files:** -- Create: `src/fastapi_oidc_op/config.py` -- Create: `tests/test_config.py` - -**Step 1: Write the failing test** - -```python -# tests/test_config.py -from fastapi_oidc_op.config import Settings, StorageBackend - - -def test_default_settings() -> None: - settings = Settings( - issuer="http://localhost:8000", - ) - assert settings.issuer == "http://localhost:8000" - assert settings.storage_backend == StorageBackend.SQLITE - assert settings.sqlite_path == "data/oidc_op.db" - assert settings.manage_client_id == "manage-app" - assert settings.invite_ttl == 86400 - assert settings.theme == "default" - - -def test_mongodb_settings() -> None: - settings = Settings( - issuer="http://localhost:8000", - storage_backend=StorageBackend.MONGODB, - mongodb_uri="mongodb://mongo:27017", - mongodb_database="test_db", - ) - assert settings.storage_backend == StorageBackend.MONGODB - assert settings.mongodb_uri == "mongodb://mongo:27017" - assert settings.mongodb_database == "test_db" - - -def test_settings_from_env(monkeypatch: "pytest.MonkeyPatch") -> None: - import pytest - - monkeypatch.setenv("OIDC_OP_ISSUER", "https://op.example.org") - monkeypatch.setenv("OIDC_OP_STORAGE_BACKEND", "mongodb") - monkeypatch.setenv("OIDC_OP_MONGODB_URI", "mongodb://remote:27017") - settings = Settings() # type: ignore[call-arg] - assert settings.issuer == "https://op.example.org" - assert settings.storage_backend == StorageBackend.MONGODB -``` - -**Step 2: Run test to verify it fails** - -Run: `uv run pytest tests/test_config.py -v` -Expected: FAIL - ImportError - -**Step 3: Write the implementation** - -```python -# src/fastapi_oidc_op/config.py -from enum import StrEnum - -from pydantic_settings import BaseSettings - - -class StorageBackend(StrEnum): - SQLITE = "sqlite" - MONGODB = "mongodb" - - -class Settings(BaseSettings): - model_config = {"env_prefix": "OIDC_OP_"} - - # Core - issuer: str - debug: bool = False - - # Storage - storage_backend: StorageBackend = StorageBackend.SQLITE - - # SQLite - sqlite_path: str = "data/oidc_op.db" - - # MongoDB - mongodb_uri: str = "mongodb://localhost:27017" - mongodb_database: str = "oidc_op" - - # Management RP - manage_client_id: str = "manage-app" - - # Magic links - invite_ttl: int = 86400 # seconds - - # Theme - theme: str = "default" -``` - -**Step 4: Run tests to verify they pass** - -Run: `uv run pytest tests/test_config.py -v` -Expected: All 3 tests PASS - -**Step 5: Run ruff and ty** - -Run: `uv run ruff check src/fastapi_oidc_op/config.py tests/test_config.py && uv run ruff format --check src/fastapi_oidc_op/config.py tests/test_config.py && uv run ty check src/fastapi_oidc_op/config.py` -Expected: No errors - -**Step 6: Commit** - -```bash -git add src/fastapi_oidc_op/config.py tests/test_config.py -git commit -m "feat: add configuration module with env-based settings" -``` - ---- - -### Task 4: Implement Pydantic models - -**Files:** -- Create: `src/fastapi_oidc_op/models.py` -- Create: `tests/test_models.py` - -**Step 1: Write the failing tests** - -```python -# tests/test_models.py -from datetime import datetime, timezone - -from fastapi_oidc_op.models import ( - CredentialType, - MagicLink, - PasswordCredential, - User, - WebAuthnCredential, -) - - -def test_user_creation() -> None: - user = User( - userid="lusab-bansen", - username="alice", - ) - assert user.userid == "lusab-bansen" - assert user.username == "alice" - assert user.preferred_username is None - assert user.email is None - assert user.active is True - assert user.groups == [] - assert user.created_at is not None - assert user.updated_at is not None - - -def test_user_with_all_fields() -> None: - user = User( - userid="lusab-bansen", - username="alice", - preferred_username="Alice W.", - given_name="Alice", - family_name="Wonderland", - nickname="ally", - email="alice@example.com", - email_verified=True, - phone_number="+1234567890", - phone_number_verified=False, - picture="https://example.com/alice.jpg", - locale="en-US", - active=True, - groups=["admin", "users"], - ) - assert user.given_name == "Alice" - assert user.groups == ["admin", "users"] - - -def test_webauthn_credential() -> None: - cred = WebAuthnCredential( - user_id="lusab-bansen", - credential_id=b"\x01\x02\x03", - public_key=b"\x04\x05\x06", - sign_count=0, - device_name="YubiKey 5", - ) - assert cred.type == CredentialType.WEBAUTHN - assert cred.credential_id == b"\x01\x02\x03" - assert cred.device_name == "YubiKey 5" - - -def test_password_credential() -> None: - cred = PasswordCredential( - user_id="lusab-bansen", - password_hash="$argon2id$v=19$m=65536,t=3,p=4$hash", - ) - assert cred.type == CredentialType.PASSWORD - assert cred.password_hash.startswith("$argon2") - - -def test_magic_link() -> None: - link = MagicLink( - token="abc123def456", - username="newuser", - ) - assert link.token == "abc123def456" - assert link.username == "newuser" - assert link.used is False - assert link.created_by is None - assert link.expires_at > datetime.now(timezone.utc) -``` - -**Step 2: Run tests to verify they fail** - -Run: `uv run pytest tests/test_models.py -v` -Expected: FAIL - ImportError - -**Step 3: Write the implementation** - -```python -# src/fastapi_oidc_op/models.py -from datetime import datetime, timedelta, timezone -from enum import StrEnum - -from pydantic import BaseModel, Field - - -def _utcnow() -> datetime: - return datetime.now(timezone.utc) - - -def _default_expiry() -> datetime: - return datetime.now(timezone.utc) + timedelta(hours=24) - - -class CredentialType(StrEnum): - WEBAUTHN = "webauthn" - PASSWORD = "password" - - -class User(BaseModel): - userid: str - username: str - preferred_username: str | None = None - given_name: str | None = None - family_name: str | None = None - nickname: str | None = None - email: str | None = None - email_verified: bool = False - phone_number: str | None = None - phone_number_verified: bool = False - picture: str | None = None - locale: str | None = None - active: bool = True - created_at: datetime = Field(default_factory=_utcnow) - updated_at: datetime = Field(default_factory=_utcnow) - groups: list[str] = Field(default_factory=list) - - -class WebAuthnCredential(BaseModel): - user_id: str - type: CredentialType = CredentialType.WEBAUTHN - credential_id: bytes - public_key: bytes - sign_count: int = 0 - device_name: str = "" - created_at: datetime = Field(default_factory=_utcnow) - - -class PasswordCredential(BaseModel): - user_id: str - type: CredentialType = CredentialType.PASSWORD - password_hash: str - created_at: datetime = Field(default_factory=_utcnow) - - -class MagicLink(BaseModel): - token: str - username: str - expires_at: datetime = Field(default_factory=_default_expiry) - used: bool = False - created_by: str | None = None - note: str | None = None -``` - -**Step 4: Run tests to verify they pass** - -Run: `uv run pytest tests/test_models.py -v` -Expected: All 5 tests PASS - -**Step 5: Run ruff and ty** - -Run: `uv run ruff check src/fastapi_oidc_op/models.py tests/test_models.py && uv run ruff format --check src/fastapi_oidc_op/models.py tests/test_models.py && uv run ty check src/fastapi_oidc_op/models.py` -Expected: No errors - -**Step 6: Commit** - -```bash -git add src/fastapi_oidc_op/models.py tests/test_models.py -git commit -m "feat: add Pydantic models for User, Credential, and MagicLink" -``` - ---- - -### Task 5: Implement repository protocols - -**Files:** -- Create: `src/fastapi_oidc_op/store/protocols.py` -- Create: `tests/test_store/test_protocols.py` - -**Step 1: Write the failing test** - -```python -# tests/test_store/test_protocols.py -from typing import runtime_checkable - -from fastapi_oidc_op.store.protocols import ( - CredentialRepository, - MagicLinkRepository, - UserRepository, -) - - -def test_protocols_are_runtime_checkable() -> None: - assert runtime_checkable(UserRepository) # type: ignore[arg-type] - assert runtime_checkable(CredentialRepository) # type: ignore[arg-type] - assert runtime_checkable(MagicLinkRepository) # type: ignore[arg-type] -``` - -Note: This test just verifies the protocols are importable and runtime-checkable. The actual conformance tests come when we implement the SQLite/MongoDB repositories. - -**Step 2: Run test to verify it fails** - -Run: `uv run pytest tests/test_store/test_protocols.py -v` -Expected: FAIL - ImportError - -**Step 3: Write the implementation** - -```python -# src/fastapi_oidc_op/store/protocols.py -from typing import Protocol, runtime_checkable - -from fastapi_oidc_op.models import ( - MagicLink, - PasswordCredential, - User, - WebAuthnCredential, -) - - -@runtime_checkable -class UserRepository(Protocol): - async def create(self, user: User) -> User: ... - - async def get_by_userid(self, userid: str) -> User | None: ... - - async def get_by_username(self, username: str) -> User | None: ... - - async def update(self, user: User) -> User: ... - - async def list_users(self, offset: int = 0, limit: int = 100) -> list[User]: ... - - async def delete(self, userid: str) -> bool: ... - - -@runtime_checkable -class CredentialRepository(Protocol): - async def create_webauthn(self, credential: WebAuthnCredential) -> WebAuthnCredential: ... - - async def create_password(self, credential: PasswordCredential) -> PasswordCredential: ... - - async def get_webauthn_by_user(self, user_id: str) -> list[WebAuthnCredential]: ... - - async def get_webauthn_by_credential_id(self, credential_id: bytes) -> WebAuthnCredential | None: ... - - async def get_password_by_user(self, user_id: str) -> PasswordCredential | None: ... - - async def update_webauthn(self, credential: WebAuthnCredential) -> WebAuthnCredential: ... - - async def delete_webauthn(self, user_id: str, credential_id: bytes) -> bool: ... - - async def delete_password(self, user_id: str) -> bool: ... - - -@runtime_checkable -class MagicLinkRepository(Protocol): - async def create(self, link: MagicLink) -> MagicLink: ... - - async def get_by_token(self, token: str) -> MagicLink | None: ... - - async def mark_used(self, token: str) -> bool: ... - - async def delete_expired(self) -> int: ... -``` - -**Step 4: Run tests to verify they pass** - -Run: `uv run pytest tests/test_store/test_protocols.py -v` -Expected: PASS - -**Step 5: Run ruff and ty** - -Run: `uv run ruff check src/fastapi_oidc_op/store/protocols.py && uv run ruff format --check src/fastapi_oidc_op/store/protocols.py && uv run ty check src/fastapi_oidc_op/store/protocols.py` -Expected: No errors - -**Step 6: Commit** - -```bash -git add src/fastapi_oidc_op/store/protocols.py tests/test_store/test_protocols.py -git commit -m "feat: add repository Protocol interfaces for User, Credential, MagicLink" -``` - ---- - -### Task 6: Implement userid generation utility - -**Files:** -- Create: `src/fastapi_oidc_op/userid.py` -- Create: `tests/test_userid.py` - -**Step 1: Write the failing tests** - -```python -# tests/test_userid.py -import re - -from fastapi_oidc_op.userid import generate_userid - - -def test_generate_userid_format() -> None: - userid = generate_userid() - # 32-bit proquint format: xxxxx-xxxxx - parts = userid.split("-") - assert len(parts) == 2 - for part in parts: - assert len(part) == 5 - - -def test_generate_userid_uniqueness() -> None: - ids = {generate_userid() for _ in range(100)} - assert len(ids) == 100 # All unique - - -def test_generate_userid_is_lowercase() -> None: - userid = generate_userid() - assert userid == userid.lower() -``` - -**Step 2: Run tests to verify they fail** - -Run: `uv run pytest tests/test_userid.py -v` -Expected: FAIL - ImportError - -**Step 3: Write the implementation** - -```python -# src/fastapi_oidc_op/userid.py -import secrets - -from proquint import uint2quint - - -def generate_userid() -> str: - """Generate a unique user identifier in proquint format. - - Returns a 32-bit proquint string like 'lusab-bansen'. - """ - return uint2quint(secrets.randbits(32)) -``` - -**Step 4: Run tests to verify they pass** - -Run: `uv run pytest tests/test_userid.py -v` -Expected: All 3 tests PASS - -**Step 5: Run ruff and ty** - -Run: `uv run ruff check src/fastapi_oidc_op/userid.py tests/test_userid.py && uv run ruff format --check src/fastapi_oidc_op/userid.py tests/test_userid.py && uv run ty check src/fastapi_oidc_op/userid.py` -Expected: No errors (ty may warn about proquint missing type stubs - suppress with `# type: ignore[import-untyped]` if needed) - -**Step 6: Commit** - -```bash -git add src/fastapi_oidc_op/userid.py tests/test_userid.py -git commit -m "feat: add proquint-based userid generation" -``` - ---- - -### Task 7: Implement app factory and health endpoint - -**Files:** -- Create: `src/fastapi_oidc_op/app.py` -- Create: `tests/conftest.py` -- Create: `tests/test_app.py` -- Delete: `main.py` (replaced by app.py) - -**Step 1: Write the failing tests** - -```python -# tests/conftest.py -from collections.abc import AsyncIterator - -import pytest -from httpx import ASGITransport, AsyncClient - -from fastapi_oidc_op.app import create_app -from fastapi_oidc_op.config import Settings - - -@pytest.fixture -def settings() -> Settings: - return Settings(issuer="http://localhost:8000") - - -@pytest.fixture -async def client(settings: Settings) -> AsyncIterator[AsyncClient]: - app = create_app(settings) - transport = ASGITransport(app=app) - async with AsyncClient(transport=transport, base_url=settings.issuer) as ac: - yield ac -``` - -```python -# tests/test_app.py -from httpx import AsyncClient - - -async def test_health_endpoint(client: AsyncClient) -> None: - response = await client.get("/health") - assert response.status_code == 200 - data = response.json() - assert data["status"] == "ok" - - -async def test_app_has_title(client: AsyncClient) -> None: - response = await client.get("/openapi.json") - assert response.status_code == 200 - data = response.json() - assert data["info"]["title"] == "FastAPI OIDC OP" -``` - -**Step 2: Run tests to verify they fail** - -Run: `uv run pytest tests/test_app.py -v` -Expected: FAIL - ImportError - -**Step 3: Write the implementation** - -```python -# src/fastapi_oidc_op/app.py -from fastapi import FastAPI - -from fastapi_oidc_op.config import Settings - - -def create_app(settings: Settings | None = None) -> FastAPI: - if settings is None: - settings = Settings() # type: ignore[call-arg] - - app = FastAPI( - title="FastAPI OIDC OP", - version="0.1.0", - docs_url="/docs" if settings.debug else None, - redoc_url=None, - ) - - app.state.settings = settings - - @app.get("/health") - async def health() -> dict[str, str]: - return {"status": "ok"} - - return app -``` - -**Step 4: Run tests to verify they pass** - -Run: `uv run pytest tests/test_app.py -v` -Expected: All 2 tests PASS - -**Step 5: Delete the old main.py** - -Run: `rm main.py` - -**Step 6: Run ruff and ty on all source files** - -Run: `uv run ruff check src/ tests/ && uv run ruff format --check src/ tests/ && uv run ty check src/` -Expected: No errors - -**Step 7: Commit** - -```bash -git rm main.py -git add src/fastapi_oidc_op/app.py tests/conftest.py tests/test_app.py -git commit -m "feat: add app factory with health endpoint and test infrastructure" -``` - ---- - -### Task 8: Run full quality checks and format - -**Files:** -- All files in `src/` and `tests/` - -**Step 1: Format all code with ruff** - -Run: `uv run ruff format src/ tests/` -Expected: Files formatted (or already formatted) - -**Step 2: Run ruff lint with auto-fix** - -Run: `uv run ruff check src/ tests/ --fix` -Expected: No errors (or safe fixes applied) - -**Step 3: Run ty type check** - -Run: `uv run ty check src/` -Expected: No errors - -**Step 4: Run full test suite** - -Run: `uv run pytest -v` -Expected: All tests pass - -**Step 5: Commit any formatting changes** - -```bash -git add -A -git diff --cached --quiet || git commit -m "style: apply ruff formatting" -``` - ---- - -### Task 9: Add pre-commit quality gate script - -**Files:** -- Create: `scripts/check.sh` - -**Step 1: Create the check script** - -```bash -#!/usr/bin/env bash -# Run all quality checks -set -euo pipefail - -echo "==> Formatting..." -uv run ruff format src/ tests/ - -echo "==> Linting..." -uv run ruff check src/ tests/ --fix - -echo "==> Type checking..." -uv run ty check src/ - -echo "==> Testing..." -uv run pytest -v - -echo "==> All checks passed!" -``` - -**Step 2: Make it executable** - -Run: `chmod +x scripts/check.sh` - -**Step 3: Run the script to verify it works** - -Run: `./scripts/check.sh` -Expected: All checks pass - -**Step 4: Commit** - -```bash -git add scripts/check.sh -git commit -m "chore: add quality check script (ruff, ty, pytest)" -``` - ---- - -## Summary - -After completing these 9 tasks, the project will have: - -1. **pyproject.toml** - Dependencies, ruff config (broad ruleset), ty config (strict), pytest config -2. **Package structure** - Full src layout with all subpackages -3. **Configuration** - Pydantic Settings with env vars, storage backend selection -4. **Models** - User, WebAuthnCredential, PasswordCredential, MagicLink -5. **Repository protocols** - Type-safe interfaces for all data access -6. **Userid generation** - Proquint-based unique identifiers -7. **App factory** - FastAPI app with health endpoint -8. **Test infrastructure** - pytest + httpx async client fixtures -9. **Quality tooling** - ruff (lint + format), ty (type check), check script - -The next plan will cover the SQLite repository implementation, followed by authentication, OIDC provider integration, and the management UI. diff --git a/docs/plans/2026-02-13-auth-services-plan.md b/docs/plans/2026-02-13-auth-services-plan.md deleted file mode 100644 index f282c39..0000000 --- a/docs/plans/2026-02-13-auth-services-plan.md +++ /dev/null @@ -1,702 +0,0 @@ -# Authentication Services Implementation Plan - -> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. - -**Goal:** Implement backend authentication services: PasswordService (argon2), WebAuthnService (fido2), and MagicLinkService (token lifecycle). - -**Architecture:** Three independent services with no cross-dependencies. PasswordService and WebAuthnService are pure logic (no repo dependency). MagicLinkService depends on MagicLinkRepository. All testable with in-memory SQLite or unit tests. - -**Tech Stack:** argon2-cffi, python-fido2 v2.1, cryptography (for test key generation) - -**Quality gate:** `./scripts/check.sh` (ruff format, ruff check, ty check, pytest) - ---- - -### Task 1: PasswordService - -**Files:** -- Create: `src/fastapi_oidc_op/authn/password.py` -- Create: `tests/test_authn/__init__.py` -- Create: `tests/test_authn/test_password.py` - -**Step 1: Write the failing tests** - -```python -# tests/test_authn/test_password.py -from argon2 import PasswordHasher - -from fastapi_oidc_op.authn.password import PasswordService - - -def test_hash_returns_argon2_string() -> None: - service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) - result = service.hash("correcthorse") - assert result.startswith("$argon2id$") - - -def test_verify_correct_password() -> None: - service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) - hashed = service.hash("correcthorse") - assert service.verify(hashed, "correcthorse") is True - - -def test_verify_wrong_password() -> None: - service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) - hashed = service.hash("correcthorse") - assert service.verify(hashed, "wrongpassword") is False - - -def test_verify_invalid_hash() -> None: - service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) - assert service.verify("not-a-hash", "password") is False - - -def test_default_hasher() -> None: - service = PasswordService() - hashed = service.hash("test") - assert service.verify(hashed, "test") is True -``` - -**Step 2: Run tests to verify they fail** - -Run: `pytest tests/test_authn/test_password.py -v` -Expected: FAIL — cannot import `PasswordService` - -**Step 3: Create `__init__.py` and write the implementation** - -```python -# tests/test_authn/__init__.py -# (empty file) -``` - -```python -# src/fastapi_oidc_op/authn/password.py -from argon2 import PasswordHasher -from argon2.exceptions import InvalidHashError, VerifyMismatchError - - -class PasswordService: - """Argon2 password hashing and verification.""" - - def __init__(self, hasher: PasswordHasher | None = None) -> None: - self._hasher = hasher or PasswordHasher() - - def hash(self, password: str) -> str: - """Hash a password using argon2id. Returns the full hash string.""" - return self._hasher.hash(password) - - def verify(self, password_hash: str, password: str) -> bool: - """Verify a password against an argon2 hash. Returns True if valid.""" - try: - return self._hasher.verify(password_hash, password) - except (VerifyMismatchError, InvalidHashError): - return False -``` - -**Step 4: Run tests to verify they pass** - -Run: `pytest tests/test_authn/test_password.py -v` -Expected: 5 PASSED - -**Step 5: Run quality gate** - -Run: `./scripts/check.sh` -Expected: All green - -**Step 6: Commit** - -```bash -git add src/fastapi_oidc_op/authn/password.py tests/test_authn/__init__.py tests/test_authn/test_password.py -git commit -m "feat: add PasswordService with argon2 hash/verify" -``` - ---- - -### Task 2: WebAuthnService - -**Files:** -- Create: `src/fastapi_oidc_op/authn/webauthn.py` -- Create: `tests/test_authn/test_webauthn.py` - -**Step 1: Write the failing tests** - -These tests build real cryptographic registration and authentication responses using fido2's factory methods and the `cryptography` library. - -```python -# tests/test_authn/test_webauthn.py -import os - -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives.hashes import SHA256 -from fido2.cose import ES256 -from fido2.utils import sha256 -from fido2.webauthn import ( - Aaguid, - AttestedCredentialData, - AttestationObject, - AuthenticationResponse, - AuthenticatorAssertionResponse, - AuthenticatorAttestationResponse, - AuthenticatorData, - CollectedClientData, - PublicKeyCredentialDescriptor, - PublicKeyCredentialType, - RegistrationResponse, -) - -from fastapi_oidc_op.authn.webauthn import WebAuthnService - -RP_ID = "localhost" -RP_NAME = "Test RP" -ORIGIN = "http://localhost:8000" - - -def _make_service() -> WebAuthnService: - return WebAuthnService(rp_id=RP_ID, rp_name=RP_NAME, origin=ORIGIN) - - -def _generate_credential() -> tuple[ec.EllipticCurvePrivateKey, bytes, AttestedCredentialData]: - """Generate a test credential: (private_key, credential_id, attested_credential_data).""" - private_key = ec.generate_private_key(ec.SECP256R1()) - cose_key = ES256.from_cryptography_key(private_key.public_key()) - credential_id = os.urandom(32) - attested = AttestedCredentialData.create( - aaguid=Aaguid.NONE, - credential_id=credential_id, - public_key=cose_key, - ) - return private_key, credential_id, attested - - -def _build_registration_response( - credential_id: bytes, - attested: AttestedCredentialData, - challenge: bytes, -) -> RegistrationResponse: - """Build a valid registration response for the given challenge.""" - auth_data = AuthenticatorData.create( - rp_id_hash=sha256(RP_ID.encode()), - flags=AuthenticatorData.FLAG.UP | AuthenticatorData.FLAG.AT, - counter=0, - credential_data=attested, - ) - attestation_object = AttestationObject.create(fmt="none", auth_data=auth_data, att_stmt={}) - client_data = CollectedClientData.create( - type=CollectedClientData.TYPE.CREATE, - challenge=challenge, - origin=ORIGIN, - ) - return RegistrationResponse( - raw_id=credential_id, - response=AuthenticatorAttestationResponse( - client_data=client_data, - attestation_object=attestation_object, - ), - ) - - -def _build_authentication_response( - private_key: ec.EllipticCurvePrivateKey, - credential_id: bytes, - challenge: bytes, - counter: int = 1, -) -> AuthenticationResponse: - """Build a valid authentication response signed with the private key.""" - client_data = CollectedClientData.create( - type=CollectedClientData.TYPE.GET, - challenge=challenge, - origin=ORIGIN, - ) - auth_data = AuthenticatorData.create( - rp_id_hash=sha256(RP_ID.encode()), - flags=AuthenticatorData.FLAG.UP, - counter=counter, - ) - signature = private_key.sign(auth_data + client_data.hash, ec.ECDSA(SHA256())) - return AuthenticationResponse( - raw_id=credential_id, - response=AuthenticatorAssertionResponse( - client_data=client_data, - authenticator_data=auth_data, - signature=signature, - ), - ) - - -# --- Registration tests --- - -def test_begin_registration_returns_options_and_state() -> None: - service = _make_service() - options, state = service.begin_registration( - user_id=b"user-123", - username="alice", - ) - # Options should be a dict suitable for JSON serialization - assert "publicKey" in options - pub_key = options["publicKey"] - assert "challenge" in pub_key - assert "rp" in pub_key - assert "user" in pub_key - # State should be a dict with challenge - assert "challenge" in state - - -def test_complete_registration_returns_credential_data() -> None: - service = _make_service() - private_key, credential_id, attested = _generate_credential() - - options, state = service.begin_registration( - user_id=b"user-123", - username="alice", - ) - - # Extract challenge from state to build a matching response - challenge = state["challenge"] - response = _build_registration_response(credential_id, attested, challenge) - - result = service.complete_registration(state, response) - assert result.credential_data is not None - assert result.credential_data.credential_id == credential_id - - -def test_begin_registration_with_existing_credentials() -> None: - service = _make_service() - _, cred_id, attested = _generate_credential() - existing = [ - PublicKeyCredentialDescriptor( - type=PublicKeyCredentialType.PUBLIC_KEY, - id=cred_id, - ) - ] - - options, state = service.begin_registration( - user_id=b"user-123", - username="alice", - existing_credentials=existing, - ) - pub_key = options["publicKey"] - assert "excludeCredentials" in pub_key - assert len(pub_key["excludeCredentials"]) == 1 - - -# --- Authentication tests --- - -def test_begin_authentication_returns_options_and_state() -> None: - service = _make_service() - _, cred_id, attested = _generate_credential() - credentials = [ - PublicKeyCredentialDescriptor( - type=PublicKeyCredentialType.PUBLIC_KEY, - id=cred_id, - ) - ] - - options, state = service.begin_authentication(credentials=credentials) - assert "publicKey" in options - assert "challenge" in state - - -def test_complete_authentication_verifies_signature() -> None: - service = _make_service() - private_key, credential_id, attested = _generate_credential() - - credentials = [ - PublicKeyCredentialDescriptor( - type=PublicKeyCredentialType.PUBLIC_KEY, - id=credential_id, - ) - ] - - options, state = service.begin_authentication(credentials=credentials) - challenge = state["challenge"] - - response = _build_authentication_response(private_key, credential_id, challenge) - - result = service.complete_authentication( - state=state, - credentials=[attested], - response=response, - ) - assert result.credential_id == credential_id - - -def test_complete_authentication_wrong_signature_raises() -> None: - import pytest - - service = _make_service() - private_key, credential_id, attested = _generate_credential() - - # Generate a different key to produce a wrong signature - wrong_key = ec.generate_private_key(ec.SECP256R1()) - - credentials = [ - PublicKeyCredentialDescriptor( - type=PublicKeyCredentialType.PUBLIC_KEY, - id=credential_id, - ) - ] - - options, state = service.begin_authentication(credentials=credentials) - challenge = state["challenge"] - - response = _build_authentication_response(wrong_key, credential_id, challenge) - - with pytest.raises(Exception): # fido2 raises ValueError or InvalidSignature - service.complete_authentication( - state=state, - credentials=[attested], - response=response, - ) -``` - -**Step 2: Run tests to verify they fail** - -Run: `pytest tests/test_authn/test_webauthn.py -v` -Expected: FAIL — cannot import `WebAuthnService` - -**Step 3: Write the implementation** - -```python -# src/fastapi_oidc_op/authn/webauthn.py -from typing import Any, Sequence - -from fido2.server import Fido2Server -from fido2.webauthn import ( - AttestedCredentialData, - AuthenticationResponse, - AuthenticatorData, - PublicKeyCredentialDescriptor, - PublicKeyCredentialRpEntity, - PublicKeyCredentialUserEntity, - RegistrationResponse, -) - - -class WebAuthnService: - """FIDO2 WebAuthn registration and authentication.""" - - def __init__(self, rp_id: str, rp_name: str, origin: str) -> None: - rp = PublicKeyCredentialRpEntity(name=rp_name, id=rp_id) - self._server = Fido2Server(rp, verify_origin=lambda o: o == origin) - - def begin_registration( - self, - user_id: bytes, - username: str, - existing_credentials: Sequence[PublicKeyCredentialDescriptor] | None = None, - ) -> tuple[dict[str, Any], dict[str, Any]]: - """Begin WebAuthn registration. - - Returns (options_dict, state_dict). - options_dict is JSON-serializable for sending to the browser. - state_dict must be stored by the caller and passed to complete_registration. - """ - user = PublicKeyCredentialUserEntity(id=user_id, name=username, display_name=username) - options, state = self._server.register_begin( - user=user, - credentials=existing_credentials, - ) - return dict(options), state - - def complete_registration( - self, - state: dict[str, Any], - response: RegistrationResponse | dict[str, Any], - ) -> AuthenticatorData: - """Complete WebAuthn registration. - - Returns AuthenticatorData with credential_data containing the public key. - """ - return self._server.register_complete(state, response) - - def begin_authentication( - self, - credentials: Sequence[PublicKeyCredentialDescriptor] | None = None, - ) -> tuple[dict[str, Any], dict[str, Any]]: - """Begin WebAuthn authentication. - - Returns (options_dict, state_dict). - """ - options, state = self._server.authenticate_begin(credentials=credentials) - return dict(options), state - - def complete_authentication( - self, - state: dict[str, Any], - credentials: Sequence[AttestedCredentialData], - response: AuthenticationResponse | dict[str, Any], - ) -> AttestedCredentialData: - """Complete WebAuthn authentication. - - Verifies the assertion signature against stored credentials. - Returns the matched AttestedCredentialData. - Raises on verification failure. - """ - return self._server.authenticate_complete(state, credentials, response) -``` - -**Step 4: Run tests to verify they pass** - -Run: `pytest tests/test_authn/test_webauthn.py -v` -Expected: 6 PASSED - -**Step 5: Run quality gate** - -Run: `./scripts/check.sh` -Expected: All green - -**Step 6: Commit** - -```bash -git add src/fastapi_oidc_op/authn/webauthn.py tests/test_authn/test_webauthn.py -git commit -m "feat: add WebAuthnService with fido2 registration and authentication" -``` - ---- - -### Task 3: MagicLinkService - -**Files:** -- Create: `src/fastapi_oidc_op/invite/service.py` -- Create: `tests/test_invite/__init__.py` -- Create: `tests/test_invite/test_service.py` - -**Step 1: Write the failing tests** - -Uses in-memory SQLite fixtures for the MagicLinkRepository. - -```python -# tests/test_invite/test_service.py -from datetime import UTC, datetime, timedelta -from pathlib import Path -from unittest.mock import patch - -import aiosqlite -import pytest - -from fastapi_oidc_op.invite.service import MagicLinkService -from fastapi_oidc_op.store.sqlite.migrations import run_migrations -from fastapi_oidc_op.store.sqlite.repositories import SQLiteMagicLinkRepository - -MIGRATIONS_DIR = ( - Path(__file__).resolve().parent.parent.parent / "src" / "fastapi_oidc_op" / "store" / "sqlite" / "migrations" -) - - -@pytest.fixture -async def db(): - conn = await aiosqlite.connect(":memory:") - conn.row_factory = aiosqlite.Row - await conn.execute("PRAGMA foreign_keys=ON") - await run_migrations(conn, MIGRATIONS_DIR) - yield conn - await conn.close() - - -@pytest.fixture -def repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository: - return SQLiteMagicLinkRepository(db) - - -@pytest.fixture -def service(repo: SQLiteMagicLinkRepository) -> MagicLinkService: - return MagicLinkService(repo=repo, ttl=3600) - - -async def test_create_returns_magic_link(service: MagicLinkService) -> None: - link = await service.create(username="alice") - assert link.username == "alice" - assert link.token # non-empty - assert link.used is False - assert link.expires_at > datetime.now(UTC) - - -async def test_create_generates_unique_tokens(service: MagicLinkService) -> None: - link1 = await service.create(username="alice") - link2 = await service.create(username="bob") - assert link1.token != link2.token - - -async def test_create_with_optional_fields(service: MagicLinkService) -> None: - link = await service.create(username="alice", created_by="admin-id", note="Welcome") - assert link.created_by == "admin-id" - assert link.note == "Welcome" - - -async def test_create_respects_ttl(service: MagicLinkService) -> None: - link = await service.create(username="alice") - expected_min = datetime.now(UTC) + timedelta(seconds=3500) - expected_max = datetime.now(UTC) + timedelta(seconds=3700) - assert expected_min < link.expires_at < expected_max - - -async def test_validate_valid_token(service: MagicLinkService) -> None: - link = await service.create(username="alice") - result = await service.validate(link.token) - assert result is not None - assert result.token == link.token - assert result.username == "alice" - - -async def test_validate_nonexistent_token(service: MagicLinkService) -> None: - result = await service.validate("nonexistent-token") - assert result is None - - -async def test_validate_used_token(service: MagicLinkService) -> None: - link = await service.create(username="alice") - await service.mark_used(link.token) - result = await service.validate(link.token) - assert result is None - - -async def test_validate_expired_token( - service: MagicLinkService, repo: SQLiteMagicLinkRepository -) -> None: - # Create a link that's already expired by using a very short TTL service - expired_service = MagicLinkService(repo=repo, ttl=0) - link = await expired_service.create(username="alice") - # The link expires_at is essentially now, so by the time we validate it should be expired - # To be safe, patch datetime to ensure expiry - result = await service.validate(link.token) - assert result is None - - -async def test_mark_used_returns_true(service: MagicLinkService) -> None: - link = await service.create(username="alice") - result = await service.mark_used(link.token) - assert result is True - - -async def test_mark_used_nonexistent_returns_false(service: MagicLinkService) -> None: - result = await service.mark_used("nonexistent") - assert result is False - - -async def test_cleanup_expired(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None: - # Create an expired link - expired_service = MagicLinkService(repo=repo, ttl=-3600) - await expired_service.create(username="expired-user") - - # Create a valid link - await service.create(username="valid-user") - - count = await service.cleanup_expired() - assert count == 1 -``` - -**Step 2: Run tests to verify they fail** - -Run: `pytest tests/test_invite/test_service.py -v` -Expected: FAIL — cannot import `MagicLinkService` - -**Step 3: Create `__init__.py` and write the implementation** - -```python -# tests/test_invite/__init__.py -# (empty file) -``` - -```python -# src/fastapi_oidc_op/invite/service.py -import secrets -from datetime import UTC, datetime, timedelta - -from fastapi_oidc_op.models import MagicLink -from fastapi_oidc_op.store.protocols import MagicLinkRepository - - -class MagicLinkService: - """Magic link token lifecycle management.""" - - def __init__(self, repo: MagicLinkRepository, ttl: int = 86400) -> None: - self._repo = repo - self._ttl = ttl - - async def create( - self, - username: str, - created_by: str | None = None, - note: str | None = None, - ) -> MagicLink: - """Generate and store a new magic link for the given username.""" - token = secrets.token_urlsafe(32) - expires_at = datetime.now(UTC) + timedelta(seconds=self._ttl) - link = MagicLink( - token=token, - username=username, - expires_at=expires_at, - created_by=created_by, - note=note, - ) - return await self._repo.create(link) - - async def validate(self, token: str) -> MagicLink | None: - """Validate a magic link token. - - Returns the MagicLink if it exists, is not used, and has not expired. - Returns None otherwise. - """ - link = await self._repo.get_by_token(token) - if link is None or link.used: - return None - if link.expires_at < datetime.now(UTC): - return None - return link - - async def mark_used(self, token: str) -> bool: - """Mark a magic link as used. Returns True if found and marked.""" - return await self._repo.mark_used(token) - - async def cleanup_expired(self) -> int: - """Delete expired unused links. Returns count deleted.""" - return await self._repo.delete_expired() -``` - -**Step 4: Run tests to verify they pass** - -Run: `pytest tests/test_invite/test_service.py -v` -Expected: All PASSED - -**Step 5: Run quality gate** - -Run: `./scripts/check.sh` -Expected: All green - -**Step 6: Commit** - -```bash -git add src/fastapi_oidc_op/invite/service.py tests/test_invite/__init__.py tests/test_invite/test_service.py -git commit -m "feat: add MagicLinkService with token create/validate/cleanup" -``` - ---- - -### Task 4: Update Design Documents - -**Files:** -- Modify: `docs/plans/2026-02-12-sqlite-repositories-design.md` - -**Step 1: Update the roadmap section** - -In `docs/plans/2026-02-12-sqlite-repositories-design.md`, update the Roadmap section to mark Authentication services as complete: - -Change: -``` -3. **Authentication** (WebAuthn + password) — next phase -``` - -To: -``` -3. ~~Authentication services~~ (done) — PasswordService, WebAuthnService, MagicLinkService -4. **Authentication routes** (login/register endpoints + templates) — next phase -``` - -And renumber the remaining items. - -**Step 2: Commit** - -```bash -git add docs/plans/2026-02-12-sqlite-repositories-design.md -git commit -m "docs: update roadmap to reflect completed auth services" -``` diff --git a/docs/plans/2026-02-13-sqlite-repositories-plan.md b/docs/plans/2026-02-13-sqlite-repositories-plan.md deleted file mode 100644 index e657f7b..0000000 --- a/docs/plans/2026-02-13-sqlite-repositories-plan.md +++ /dev/null @@ -1,1367 +0,0 @@ -# SQLite Repositories Implementation Plan - -> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. - -**Goal:** Implement SQLite-backed repository classes that satisfy the existing Protocol interfaces, with a migration runner, lifespan integration, and FastAPI dependency injection. - -**Architecture:** Three repository classes (`SQLiteUserRepository`, `SQLiteCredentialRepository`, `SQLiteMagicLinkRepository`) sharing a single `aiosqlite` connection, initialized via FastAPI lifespan. Schema managed by numbered SQL migration files applied at startup. - -**Tech Stack:** aiosqlite, SQLite WAL mode, pytest with in-memory SQLite - -**Quality gate:** `./scripts/check.sh` (ruff format, ruff check, ty check, pytest) - ---- - -### Task 1: SQL Migration File - -**Files:** -- Create: `src/fastapi_oidc_op/store/sqlite/migrations/001_initial.sql` - -**Step 1: Create the migration file** - -```sql -CREATE TABLE users ( - userid TEXT PRIMARY KEY, - username TEXT NOT NULL UNIQUE, - preferred_username TEXT, - given_name TEXT, - family_name TEXT, - nickname TEXT, - email TEXT, - email_verified INTEGER NOT NULL DEFAULT 0, - phone_number TEXT, - phone_number_verified INTEGER NOT NULL DEFAULT 0, - picture TEXT, - locale TEXT, - active INTEGER NOT NULL DEFAULT 1, - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL -); - -CREATE TABLE user_groups ( - userid TEXT NOT NULL REFERENCES users(userid) ON DELETE CASCADE, - group_name TEXT NOT NULL, - PRIMARY KEY (userid, group_name) -); - -CREATE TABLE webauthn_credentials ( - user_id TEXT NOT NULL REFERENCES users(userid) ON DELETE CASCADE, - credential_id BLOB NOT NULL, - public_key BLOB NOT NULL, - sign_count INTEGER NOT NULL DEFAULT 0, - device_name TEXT NOT NULL DEFAULT '', - created_at TEXT NOT NULL, - PRIMARY KEY (user_id, credential_id) -); - -CREATE TABLE password_credentials ( - user_id TEXT PRIMARY KEY REFERENCES users(userid) ON DELETE CASCADE, - password_hash TEXT NOT NULL, - created_at TEXT NOT NULL -); - -CREATE TABLE magic_links ( - token TEXT PRIMARY KEY, - username TEXT NOT NULL, - expires_at TEXT NOT NULL, - used INTEGER NOT NULL DEFAULT 0, - created_by TEXT, - note TEXT -); -``` - -**Step 2: Commit** - -```bash -git add src/fastapi_oidc_op/store/sqlite/migrations/001_initial.sql -git commit -m "feat: add initial SQLite migration schema" -``` - ---- - -### Task 2: Migration Runner - -**Files:** -- Create: `src/fastapi_oidc_op/store/sqlite/migrations.py` -- Test: `tests/test_store/test_migrations.py` - -**Step 1: Write the failing tests** - -```python -# tests/test_store/test_migrations.py -from pathlib import Path - -import aiosqlite - -from fastapi_oidc_op.store.sqlite.migrations import run_migrations - -MIGRATIONS_DIR = Path(__file__).resolve().parent.parent.parent / "src" / "fastapi_oidc_op" / "store" / "sqlite" / "migrations" - - -async def test_run_migrations_applies_initial() -> None: - async with aiosqlite.connect(":memory:") as db: - await db.execute("PRAGMA foreign_keys=ON") - count = await run_migrations(db, MIGRATIONS_DIR) - assert count == 1 - # Verify users table exists - async with db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'") as cursor: - row = await cursor.fetchone() - assert row is not None - - -async def test_run_migrations_skips_already_applied() -> None: - async with aiosqlite.connect(":memory:") as db: - await db.execute("PRAGMA foreign_keys=ON") - first_count = await run_migrations(db, MIGRATIONS_DIR) - second_count = await run_migrations(db, MIGRATIONS_DIR) - assert first_count == 1 - assert second_count == 0 - - -async def test_run_migrations_creates_all_tables() -> None: - async with aiosqlite.connect(":memory:") as db: - await db.execute("PRAGMA foreign_keys=ON") - await run_migrations(db, MIGRATIONS_DIR) - async with db.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") as cursor: - tables = [row[0] async for row in cursor] - assert "users" in tables - assert "user_groups" in tables - assert "webauthn_credentials" in tables - assert "password_credentials" in tables - assert "magic_links" in tables - assert "_migrations" in tables -``` - -**Step 2: Run tests to verify they fail** - -Run: `pytest tests/test_store/test_migrations.py -v` -Expected: FAIL — cannot import `run_migrations` - -**Step 3: Write the migration runner** - -```python -# src/fastapi_oidc_op/store/sqlite/migrations.py -from pathlib import Path - -import aiosqlite - - -async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int: - """Apply unapplied SQL migration files in order. Returns count of newly applied migrations.""" - await db.execute( - """ - CREATE TABLE IF NOT EXISTS _migrations ( - id INTEGER PRIMARY KEY, - filename TEXT NOT NULL UNIQUE, - applied_at TEXT NOT NULL DEFAULT (datetime('now')) - ) - """ - ) - await db.commit() - - # Get already-applied migrations - async with db.execute("SELECT filename FROM _migrations") as cursor: - applied = {row[0] async for row in cursor} - - # Find and sort migration files - migration_files = sorted(migrations_dir.glob("*.sql")) - - count = 0 - for migration_file in migration_files: - if migration_file.name in applied: - continue - sql = migration_file.read_text() - await db.executescript(sql) - await db.execute("INSERT INTO _migrations (filename) VALUES (?)", (migration_file.name,)) - await db.commit() - count += 1 - - return count -``` - -**Step 4: Run tests to verify they pass** - -Run: `pytest tests/test_store/test_migrations.py -v` -Expected: 3 PASSED - -**Step 5: Run quality gate** - -Run: `./scripts/check.sh` -Expected: All green - -**Step 6: Commit** - -```bash -git add src/fastapi_oidc_op/store/sqlite/migrations.py tests/test_store/test_migrations.py -git commit -m "feat: add SQLite migration runner" -``` - ---- - -### Task 3: Domain Exception and Store Exports - -**Files:** -- Create: `src/fastapi_oidc_op/store/exceptions.py` -- Test: `tests/test_store/test_exceptions.py` - -**Step 1: Write the failing test** - -```python -# tests/test_store/test_exceptions.py -from fastapi_oidc_op.store.exceptions import DuplicateError - - -def test_duplicate_error_is_exception() -> None: - error = DuplicateError("user already exists") - assert isinstance(error, Exception) - assert str(error) == "user already exists" -``` - -**Step 2: Run test to verify it fails** - -Run: `pytest tests/test_store/test_exceptions.py -v` -Expected: FAIL — cannot import `DuplicateError` - -**Step 3: Write the exception** - -```python -# src/fastapi_oidc_op/store/exceptions.py -class DuplicateError(Exception): - """Raised when a create operation violates a uniqueness constraint.""" -``` - -**Step 4: Run test to verify it passes** - -Run: `pytest tests/test_store/test_exceptions.py -v` -Expected: 1 PASSED - -**Step 5: Commit** - -```bash -git add src/fastapi_oidc_op/store/exceptions.py tests/test_store/test_exceptions.py -git commit -m "feat: add DuplicateError domain exception" -``` - ---- - -### Task 4: SQLiteUserRepository - -**Files:** -- Create: `src/fastapi_oidc_op/store/sqlite/repositories.py` -- Create: `tests/test_store/conftest.py` (shared fixtures) -- Create: `tests/test_store/test_sqlite_user_repo.py` - -**Step 1: Write shared test fixtures** - -```python -# tests/test_store/conftest.py -from pathlib import Path - -import aiosqlite -import pytest - -from fastapi_oidc_op.store.sqlite.migrations import run_migrations - -MIGRATIONS_DIR = Path(__file__).resolve().parent.parent.parent / "src" / "fastapi_oidc_op" / "store" / "sqlite" / "migrations" - - -@pytest.fixture -async def db(): - conn = await aiosqlite.connect(":memory:") - conn.row_factory = aiosqlite.Row - await conn.execute("PRAGMA foreign_keys=ON") - await run_migrations(conn, MIGRATIONS_DIR) - yield conn - await conn.close() -``` - -**Step 2: Write the failing tests** - -```python -# tests/test_store/test_sqlite_user_repo.py -import aiosqlite -import pytest - -from fastapi_oidc_op.models import User -from fastapi_oidc_op.store.exceptions import DuplicateError -from fastapi_oidc_op.store.protocols import UserRepository -from fastapi_oidc_op.store.sqlite.repositories import SQLiteUserRepository - - -@pytest.fixture -def user_repo(db: aiosqlite.Connection) -> SQLiteUserRepository: - return SQLiteUserRepository(db) - - -def _make_user(**overrides) -> User: - defaults = {"userid": "lusab-bansen", "username": "alice"} - defaults.update(overrides) - return User(**defaults) - - -async def test_implements_protocol(user_repo: SQLiteUserRepository) -> None: - assert isinstance(user_repo, UserRepository) - - -async def test_create_and_get_by_userid(user_repo: SQLiteUserRepository) -> None: - user = _make_user() - created = await user_repo.create(user) - assert created.userid == "lusab-bansen" - assert created.username == "alice" - - fetched = await user_repo.get_by_userid("lusab-bansen") - assert fetched is not None - assert fetched.userid == "lusab-bansen" - assert fetched.username == "alice" - - -async def test_get_by_username(user_repo: SQLiteUserRepository) -> None: - user = _make_user() - await user_repo.create(user) - - fetched = await user_repo.get_by_username("alice") - assert fetched is not None - assert fetched.username == "alice" - - -async def test_get_by_userid_not_found(user_repo: SQLiteUserRepository) -> None: - result = await user_repo.get_by_userid("nonexistent") - assert result is None - - -async def test_get_by_username_not_found(user_repo: SQLiteUserRepository) -> None: - result = await user_repo.get_by_username("nonexistent") - assert result is None - - -async def test_create_with_groups(user_repo: SQLiteUserRepository) -> None: - user = _make_user(groups=["admin", "users"]) - await user_repo.create(user) - - fetched = await user_repo.get_by_userid("lusab-bansen") - assert fetched is not None - assert sorted(fetched.groups) == ["admin", "users"] - - -async def test_update(user_repo: SQLiteUserRepository) -> None: - user = _make_user() - await user_repo.create(user) - - user.email = "alice@example.com" - user.given_name = "Alice" - updated = await user_repo.update(user) - assert updated.email == "alice@example.com" - assert updated.given_name == "Alice" - - fetched = await user_repo.get_by_userid("lusab-bansen") - assert fetched is not None - assert fetched.email == "alice@example.com" - - -async def test_update_groups(user_repo: SQLiteUserRepository) -> None: - user = _make_user(groups=["users"]) - await user_repo.create(user) - - user.groups = ["admin", "editors"] - await user_repo.update(user) - - fetched = await user_repo.get_by_userid("lusab-bansen") - assert fetched is not None - assert sorted(fetched.groups) == ["admin", "editors"] - - -async def test_list_users(user_repo: SQLiteUserRepository) -> None: - await user_repo.create(_make_user(userid="id-1", username="alice")) - await user_repo.create(_make_user(userid="id-2", username="bob")) - await user_repo.create(_make_user(userid="id-3", username="charlie")) - - users = await user_repo.list_users() - assert len(users) == 3 - - -async def test_list_users_pagination(user_repo: SQLiteUserRepository) -> None: - for i in range(5): - await user_repo.create(_make_user(userid=f"id-{i}", username=f"user-{i}")) - - page1 = await user_repo.list_users(offset=0, limit=2) - page2 = await user_repo.list_users(offset=2, limit=2) - page3 = await user_repo.list_users(offset=4, limit=2) - assert len(page1) == 2 - assert len(page2) == 2 - assert len(page3) == 1 - - -async def test_delete(user_repo: SQLiteUserRepository) -> None: - user = _make_user() - await user_repo.create(user) - - deleted = await user_repo.delete("lusab-bansen") - assert deleted is True - - fetched = await user_repo.get_by_userid("lusab-bansen") - assert fetched is None - - -async def test_delete_not_found(user_repo: SQLiteUserRepository) -> None: - deleted = await user_repo.delete("nonexistent") - assert deleted is False - - -async def test_delete_cascades_groups(user_repo: SQLiteUserRepository) -> None: - user = _make_user(groups=["admin"]) - await user_repo.create(user) - - await user_repo.delete("lusab-bansen") - - # Verify groups were cascaded - async with user_repo._db.execute("SELECT COUNT(*) FROM user_groups WHERE userid = ?", ("lusab-bansen",)) as cursor: - row = await cursor.fetchone() - assert row[0] == 0 - - -async def test_create_duplicate_username(user_repo: SQLiteUserRepository) -> None: - await user_repo.create(_make_user()) - - with pytest.raises(DuplicateError): - await user_repo.create(_make_user(userid="different-id", username="alice")) - - -async def test_roundtrip_preserves_all_fields(user_repo: SQLiteUserRepository) -> None: - user = _make_user( - preferred_username="ally", - given_name="Alice", - family_name="Smith", - nickname="Al", - email="alice@example.com", - email_verified=True, - phone_number="+1234567890", - phone_number_verified=True, - picture="https://example.com/alice.jpg", - locale="en-US", - active=False, - groups=["admin", "users"], - ) - await user_repo.create(user) - - fetched = await user_repo.get_by_userid("lusab-bansen") - assert fetched is not None - assert fetched.preferred_username == "ally" - assert fetched.given_name == "Alice" - assert fetched.family_name == "Smith" - assert fetched.nickname == "Al" - assert fetched.email == "alice@example.com" - assert fetched.email_verified is True - assert fetched.phone_number == "+1234567890" - assert fetched.phone_number_verified is True - assert fetched.picture == "https://example.com/alice.jpg" - assert fetched.locale == "en-US" - assert fetched.active is False - assert sorted(fetched.groups) == ["admin", "users"] - assert fetched.created_at == user.created_at - assert fetched.updated_at == user.updated_at -``` - -**Step 3: Run tests to verify they fail** - -Run: `pytest tests/test_store/test_sqlite_user_repo.py -v` -Expected: FAIL — cannot import `SQLiteUserRepository` - -**Step 4: Write the implementation** - -```python -# src/fastapi_oidc_op/store/sqlite/repositories.py -from datetime import UTC, datetime - -import aiosqlite - -from fastapi_oidc_op.models import ( - MagicLink, - PasswordCredential, - User, - WebAuthnCredential, -) -from fastapi_oidc_op.store.exceptions import DuplicateError - - -class SQLiteUserRepository: - def __init__(self, db: aiosqlite.Connection) -> None: - self._db = db - - async def _get_groups(self, userid: str) -> list[str]: - async with self._db.execute( - "SELECT group_name FROM user_groups WHERE userid = ? ORDER BY group_name", (userid,) - ) as cursor: - return [row[0] async for row in cursor] - - async def _set_groups(self, userid: str, groups: list[str]) -> None: - await self._db.execute("DELETE FROM user_groups WHERE userid = ?", (userid,)) - for group in groups: - await self._db.execute("INSERT INTO user_groups (userid, group_name) VALUES (?, ?)", (userid, group)) - - def _row_to_user(self, row: aiosqlite.Row, groups: list[str]) -> User: - return User( - userid=row["userid"], - username=row["username"], - preferred_username=row["preferred_username"], - given_name=row["given_name"], - family_name=row["family_name"], - nickname=row["nickname"], - email=row["email"], - email_verified=bool(row["email_verified"]), - phone_number=row["phone_number"], - phone_number_verified=bool(row["phone_number_verified"]), - picture=row["picture"], - locale=row["locale"], - active=bool(row["active"]), - created_at=datetime.fromisoformat(row["created_at"]), - updated_at=datetime.fromisoformat(row["updated_at"]), - groups=groups, - ) - - async def create(self, user: User) -> User: - try: - await self._db.execute( - """ - INSERT INTO users ( - userid, username, preferred_username, given_name, family_name, - nickname, email, email_verified, phone_number, phone_number_verified, - picture, locale, active, created_at, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - user.userid, - user.username, - user.preferred_username, - user.given_name, - user.family_name, - user.nickname, - user.email, - int(user.email_verified), - user.phone_number, - int(user.phone_number_verified), - user.picture, - user.locale, - int(user.active), - user.created_at.isoformat(), - user.updated_at.isoformat(), - ), - ) - await self._set_groups(user.userid, user.groups) - await self._db.commit() - except aiosqlite.IntegrityError as e: - raise DuplicateError(str(e)) from e - return user - - async def get_by_userid(self, userid: str) -> User | None: - async with self._db.execute("SELECT * FROM users WHERE userid = ?", (userid,)) as cursor: - row = await cursor.fetchone() - if row is None: - return None - groups = await self._get_groups(userid) - return self._row_to_user(row, groups) - - async def get_by_username(self, username: str) -> User | None: - async with self._db.execute("SELECT * FROM users WHERE username = ?", (username,)) as cursor: - row = await cursor.fetchone() - if row is None: - return None - groups = await self._get_groups(row["userid"]) - return self._row_to_user(row, groups) - - async def update(self, user: User) -> User: - user.updated_at = datetime.now(UTC) - await self._db.execute( - """ - UPDATE users SET - username = ?, preferred_username = ?, given_name = ?, family_name = ?, - nickname = ?, email = ?, email_verified = ?, phone_number = ?, - phone_number_verified = ?, picture = ?, locale = ?, active = ?, - updated_at = ? - WHERE userid = ? - """, - ( - user.username, - user.preferred_username, - user.given_name, - user.family_name, - user.nickname, - user.email, - int(user.email_verified), - user.phone_number, - int(user.phone_number_verified), - user.picture, - user.locale, - int(user.active), - user.updated_at.isoformat(), - user.userid, - ), - ) - await self._set_groups(user.userid, user.groups) - await self._db.commit() - return user - - async def list_users(self, offset: int = 0, limit: int = 100) -> list[User]: - async with self._db.execute( - "SELECT * FROM users ORDER BY username LIMIT ? OFFSET ?", (limit, offset) - ) as cursor: - rows = await cursor.fetchall() - users = [] - for row in rows: - groups = await self._get_groups(row["userid"]) - users.append(self._row_to_user(row, groups)) - return users - - async def delete(self, userid: str) -> bool: - cursor = await self._db.execute("DELETE FROM users WHERE userid = ?", (userid,)) - await self._db.commit() - return cursor.rowcount > 0 -``` - -**Step 5: Run tests to verify they pass** - -Run: `pytest tests/test_store/test_sqlite_user_repo.py -v` -Expected: All PASSED - -**Step 6: Run quality gate** - -Run: `./scripts/check.sh` -Expected: All green - -**Step 7: Commit** - -```bash -git add src/fastapi_oidc_op/store/sqlite/repositories.py tests/test_store/conftest.py tests/test_store/test_sqlite_user_repo.py -git commit -m "feat: add SQLiteUserRepository with tests" -``` - ---- - -### Task 5: SQLiteCredentialRepository - -**Files:** -- Modify: `src/fastapi_oidc_op/store/sqlite/repositories.py` -- Create: `tests/test_store/test_sqlite_credential_repo.py` - -**Step 1: Write the failing tests** - -```python -# tests/test_store/test_sqlite_credential_repo.py -import aiosqlite -import pytest - -from fastapi_oidc_op.models import PasswordCredential, User, WebAuthnCredential -from fastapi_oidc_op.store.exceptions import DuplicateError -from fastapi_oidc_op.store.protocols import CredentialRepository -from fastapi_oidc_op.store.sqlite.repositories import ( - SQLiteCredentialRepository, - SQLiteUserRepository, -) - - -@pytest.fixture -def user_repo(db: aiosqlite.Connection) -> SQLiteUserRepository: - return SQLiteUserRepository(db) - - -@pytest.fixture -def credential_repo(db: aiosqlite.Connection) -> SQLiteCredentialRepository: - return SQLiteCredentialRepository(db) - - -@pytest.fixture -async def alice(user_repo: SQLiteUserRepository) -> User: - return await user_repo.create(User(userid="lusab-bansen", username="alice")) - - -async def test_implements_protocol(credential_repo: SQLiteCredentialRepository) -> None: - assert isinstance(credential_repo, CredentialRepository) - - -async def test_create_and_get_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None: - cred = WebAuthnCredential( - user_id=alice.userid, - credential_id=b"\x01\x02\x03", - public_key=b"\x04\x05\x06", - device_name="YubiKey", - ) - created = await credential_repo.create_webauthn(cred) - assert created.user_id == alice.userid - - creds = await credential_repo.get_webauthn_by_user(alice.userid) - assert len(creds) == 1 - assert creds[0].credential_id == b"\x01\x02\x03" - assert creds[0].public_key == b"\x04\x05\x06" - assert creds[0].device_name == "YubiKey" - - -async def test_get_webauthn_by_credential_id(credential_repo: SQLiteCredentialRepository, alice: User) -> None: - cred = WebAuthnCredential( - user_id=alice.userid, - credential_id=b"\x01\x02\x03", - public_key=b"\x04\x05\x06", - ) - await credential_repo.create_webauthn(cred) - - fetched = await credential_repo.get_webauthn_by_credential_id(b"\x01\x02\x03") - assert fetched is not None - assert fetched.user_id == alice.userid - - -async def test_get_webauthn_by_credential_id_not_found(credential_repo: SQLiteCredentialRepository) -> None: - result = await credential_repo.get_webauthn_by_credential_id(b"\xff\xff") - assert result is None - - -async def test_multiple_webauthn_per_user(credential_repo: SQLiteCredentialRepository, alice: User) -> None: - for i in range(3): - cred = WebAuthnCredential( - user_id=alice.userid, - credential_id=bytes([i]), - public_key=b"\x00", - device_name=f"Key {i}", - ) - await credential_repo.create_webauthn(cred) - - creds = await credential_repo.get_webauthn_by_user(alice.userid) - assert len(creds) == 3 - - -async def test_update_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None: - cred = WebAuthnCredential( - user_id=alice.userid, - credential_id=b"\x01\x02\x03", - public_key=b"\x04\x05\x06", - sign_count=0, - device_name="Old Name", - ) - await credential_repo.create_webauthn(cred) - - cred.sign_count = 42 - cred.device_name = "New Name" - updated = await credential_repo.update_webauthn(cred) - assert updated.sign_count == 42 - assert updated.device_name == "New Name" - - fetched = await credential_repo.get_webauthn_by_credential_id(b"\x01\x02\x03") - assert fetched is not None - assert fetched.sign_count == 42 - assert fetched.device_name == "New Name" - - -async def test_delete_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None: - cred = WebAuthnCredential( - user_id=alice.userid, - credential_id=b"\x01\x02\x03", - public_key=b"\x04\x05\x06", - ) - await credential_repo.create_webauthn(cred) - - deleted = await credential_repo.delete_webauthn(alice.userid, b"\x01\x02\x03") - assert deleted is True - - creds = await credential_repo.get_webauthn_by_user(alice.userid) - assert len(creds) == 0 - - -async def test_delete_webauthn_not_found(credential_repo: SQLiteCredentialRepository) -> None: - deleted = await credential_repo.delete_webauthn("nobody", b"\xff") - assert deleted is False - - -async def test_create_and_get_password(credential_repo: SQLiteCredentialRepository, alice: User) -> None: - cred = PasswordCredential( - user_id=alice.userid, - password_hash="$argon2id$v=19$m=65536,t=3,p=4$hash", - ) - created = await credential_repo.create_password(cred) - assert created.user_id == alice.userid - - fetched = await credential_repo.get_password_by_user(alice.userid) - assert fetched is not None - assert fetched.password_hash == "$argon2id$v=19$m=65536,t=3,p=4$hash" - - -async def test_get_password_not_found(credential_repo: SQLiteCredentialRepository) -> None: - result = await credential_repo.get_password_by_user("nobody") - assert result is None - - -async def test_delete_password(credential_repo: SQLiteCredentialRepository, alice: User) -> None: - cred = PasswordCredential( - user_id=alice.userid, - password_hash="$argon2id$v=19$hash", - ) - await credential_repo.create_password(cred) - - deleted = await credential_repo.delete_password(alice.userid) - assert deleted is True - - fetched = await credential_repo.get_password_by_user(alice.userid) - assert fetched is None - - -async def test_delete_password_not_found(credential_repo: SQLiteCredentialRepository) -> None: - deleted = await credential_repo.delete_password("nobody") - assert deleted is False - - -async def test_create_duplicate_password(credential_repo: SQLiteCredentialRepository, alice: User) -> None: - cred = PasswordCredential(user_id=alice.userid, password_hash="hash1") - await credential_repo.create_password(cred) - - with pytest.raises(DuplicateError): - cred2 = PasswordCredential(user_id=alice.userid, password_hash="hash2") - await credential_repo.create_password(cred2) - - -async def test_create_duplicate_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None: - cred = WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02") - await credential_repo.create_webauthn(cred) - - with pytest.raises(DuplicateError): - await credential_repo.create_webauthn(cred) -``` - -**Step 2: Run tests to verify they fail** - -Run: `pytest tests/test_store/test_sqlite_credential_repo.py -v` -Expected: FAIL — cannot import `SQLiteCredentialRepository` - -**Step 3: Add SQLiteCredentialRepository to `repositories.py`** - -Append to `src/fastapi_oidc_op/store/sqlite/repositories.py`: - -```python -class SQLiteCredentialRepository: - def __init__(self, db: aiosqlite.Connection) -> None: - self._db = db - - def _row_to_webauthn(self, row: aiosqlite.Row) -> WebAuthnCredential: - return WebAuthnCredential( - user_id=row["user_id"], - credential_id=bytes(row["credential_id"]), - public_key=bytes(row["public_key"]), - sign_count=row["sign_count"], - device_name=row["device_name"], - created_at=datetime.fromisoformat(row["created_at"]), - ) - - def _row_to_password(self, row: aiosqlite.Row) -> PasswordCredential: - return PasswordCredential( - user_id=row["user_id"], - password_hash=row["password_hash"], - created_at=datetime.fromisoformat(row["created_at"]), - ) - - async def create_webauthn(self, credential: WebAuthnCredential) -> WebAuthnCredential: - try: - await self._db.execute( - """ - INSERT INTO webauthn_credentials (user_id, credential_id, public_key, sign_count, device_name, created_at) - VALUES (?, ?, ?, ?, ?, ?) - """, - ( - credential.user_id, - credential.credential_id, - credential.public_key, - credential.sign_count, - credential.device_name, - credential.created_at.isoformat(), - ), - ) - await self._db.commit() - except aiosqlite.IntegrityError as e: - raise DuplicateError(str(e)) from e - return credential - - async def create_password(self, credential: PasswordCredential) -> PasswordCredential: - try: - await self._db.execute( - "INSERT INTO password_credentials (user_id, password_hash, created_at) VALUES (?, ?, ?)", - (credential.user_id, credential.password_hash, credential.created_at.isoformat()), - ) - await self._db.commit() - except aiosqlite.IntegrityError as e: - raise DuplicateError(str(e)) from e - return credential - - async def get_webauthn_by_user(self, user_id: str) -> list[WebAuthnCredential]: - async with self._db.execute( - "SELECT * FROM webauthn_credentials WHERE user_id = ?", (user_id,) - ) as cursor: - rows = await cursor.fetchall() - return [self._row_to_webauthn(row) for row in rows] - - async def get_webauthn_by_credential_id(self, credential_id: bytes) -> WebAuthnCredential | None: - async with self._db.execute( - "SELECT * FROM webauthn_credentials WHERE credential_id = ?", (credential_id,) - ) as cursor: - row = await cursor.fetchone() - if row is None: - return None - return self._row_to_webauthn(row) - - async def get_password_by_user(self, user_id: str) -> PasswordCredential | None: - async with self._db.execute( - "SELECT * FROM password_credentials WHERE user_id = ?", (user_id,) - ) as cursor: - row = await cursor.fetchone() - if row is None: - return None - return self._row_to_password(row) - - async def update_webauthn(self, credential: WebAuthnCredential) -> WebAuthnCredential: - await self._db.execute( - "UPDATE webauthn_credentials SET sign_count = ?, device_name = ? WHERE user_id = ? AND credential_id = ?", - (credential.sign_count, credential.device_name, credential.user_id, credential.credential_id), - ) - await self._db.commit() - return credential - - async def delete_webauthn(self, user_id: str, credential_id: bytes) -> bool: - cursor = await self._db.execute( - "DELETE FROM webauthn_credentials WHERE user_id = ? AND credential_id = ?", - (user_id, credential_id), - ) - await self._db.commit() - return cursor.rowcount > 0 - - async def delete_password(self, user_id: str) -> bool: - cursor = await self._db.execute( - "DELETE FROM password_credentials WHERE user_id = ?", (user_id,) - ) - await self._db.commit() - return cursor.rowcount > 0 -``` - -**Step 4: Run tests to verify they pass** - -Run: `pytest tests/test_store/test_sqlite_credential_repo.py -v` -Expected: All PASSED - -**Step 5: Run quality gate** - -Run: `./scripts/check.sh` -Expected: All green - -**Step 6: Commit** - -```bash -git add src/fastapi_oidc_op/store/sqlite/repositories.py tests/test_store/test_sqlite_credential_repo.py -git commit -m "feat: add SQLiteCredentialRepository with tests" -``` - ---- - -### Task 6: SQLiteMagicLinkRepository - -**Files:** -- Modify: `src/fastapi_oidc_op/store/sqlite/repositories.py` -- Create: `tests/test_store/test_sqlite_magic_link_repo.py` - -**Step 1: Write the failing tests** - -```python -# tests/test_store/test_sqlite_magic_link_repo.py -from datetime import UTC, datetime, timedelta - -import aiosqlite -import pytest - -from fastapi_oidc_op.models import MagicLink -from fastapi_oidc_op.store.exceptions import DuplicateError -from fastapi_oidc_op.store.protocols import MagicLinkRepository -from fastapi_oidc_op.store.sqlite.repositories import SQLiteMagicLinkRepository - - -@pytest.fixture -def magic_link_repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository: - return SQLiteMagicLinkRepository(db) - - -def _make_link(**overrides) -> MagicLink: - defaults = { - "token": "abc123", - "username": "alice", - "expires_at": datetime.now(UTC) + timedelta(hours=24), - } - defaults.update(overrides) - return MagicLink(**defaults) - - -async def test_implements_protocol(magic_link_repo: SQLiteMagicLinkRepository) -> None: - assert isinstance(magic_link_repo, MagicLinkRepository) - - -async def test_create_and_get_by_token(magic_link_repo: SQLiteMagicLinkRepository) -> None: - link = _make_link() - created = await magic_link_repo.create(link) - assert created.token == "abc123" - - fetched = await magic_link_repo.get_by_token("abc123") - assert fetched is not None - assert fetched.token == "abc123" - assert fetched.username == "alice" - assert fetched.used is False - - -async def test_get_by_token_not_found(magic_link_repo: SQLiteMagicLinkRepository) -> None: - result = await magic_link_repo.get_by_token("nonexistent") - assert result is None - - -async def test_mark_used(magic_link_repo: SQLiteMagicLinkRepository) -> None: - link = _make_link() - await magic_link_repo.create(link) - - marked = await magic_link_repo.mark_used("abc123") - assert marked is True - - fetched = await magic_link_repo.get_by_token("abc123") - assert fetched is not None - assert fetched.used is True - - -async def test_mark_used_not_found(magic_link_repo: SQLiteMagicLinkRepository) -> None: - marked = await magic_link_repo.mark_used("nonexistent") - assert marked is False - - -async def test_delete_expired(magic_link_repo: SQLiteMagicLinkRepository) -> None: - # Create an expired link - expired = _make_link(token="expired", expires_at=datetime.now(UTC) - timedelta(hours=1)) - await magic_link_repo.create(expired) - - # Create a valid link - valid = _make_link(token="valid", expires_at=datetime.now(UTC) + timedelta(hours=24)) - await magic_link_repo.create(valid) - - count = await magic_link_repo.delete_expired() - assert count == 1 - - # Expired should be gone - assert await magic_link_repo.get_by_token("expired") is None - # Valid should remain - assert await magic_link_repo.get_by_token("valid") is not None - - -async def test_delete_expired_skips_used(magic_link_repo: SQLiteMagicLinkRepository) -> None: - # Create an expired but used link - link = _make_link(token="used-expired", expires_at=datetime.now(UTC) - timedelta(hours=1)) - await magic_link_repo.create(link) - await magic_link_repo.mark_used("used-expired") - - count = await magic_link_repo.delete_expired() - assert count == 0 - - -async def test_create_with_optional_fields(magic_link_repo: SQLiteMagicLinkRepository) -> None: - link = _make_link(created_by="admin", note="Welcome aboard") - await magic_link_repo.create(link) - - fetched = await magic_link_repo.get_by_token("abc123") - assert fetched is not None - assert fetched.created_by == "admin" - assert fetched.note == "Welcome aboard" - - -async def test_create_duplicate_token(magic_link_repo: SQLiteMagicLinkRepository) -> None: - await magic_link_repo.create(_make_link()) - - with pytest.raises(DuplicateError): - await magic_link_repo.create(_make_link()) -``` - -**Step 2: Run tests to verify they fail** - -Run: `pytest tests/test_store/test_sqlite_magic_link_repo.py -v` -Expected: FAIL — cannot import `SQLiteMagicLinkRepository` - -**Step 3: Add SQLiteMagicLinkRepository to `repositories.py`** - -Append to `src/fastapi_oidc_op/store/sqlite/repositories.py`: - -```python -class SQLiteMagicLinkRepository: - def __init__(self, db: aiosqlite.Connection) -> None: - self._db = db - - def _row_to_magic_link(self, row: aiosqlite.Row) -> MagicLink: - return MagicLink( - token=row["token"], - username=row["username"], - expires_at=datetime.fromisoformat(row["expires_at"]), - used=bool(row["used"]), - created_by=row["created_by"], - note=row["note"], - ) - - async def create(self, link: MagicLink) -> MagicLink: - try: - await self._db.execute( - "INSERT INTO magic_links (token, username, expires_at, used, created_by, note) VALUES (?, ?, ?, ?, ?, ?)", - ( - link.token, - link.username, - link.expires_at.isoformat(), - int(link.used), - link.created_by, - link.note, - ), - ) - await self._db.commit() - except aiosqlite.IntegrityError as e: - raise DuplicateError(str(e)) from e - return link - - async def get_by_token(self, token: str) -> MagicLink | None: - async with self._db.execute("SELECT * FROM magic_links WHERE token = ?", (token,)) as cursor: - row = await cursor.fetchone() - if row is None: - return None - return self._row_to_magic_link(row) - - async def mark_used(self, token: str) -> bool: - cursor = await self._db.execute( - "UPDATE magic_links SET used = 1 WHERE token = ?", (token,) - ) - await self._db.commit() - return cursor.rowcount > 0 - - async def delete_expired(self) -> int: - now = datetime.now(UTC).isoformat() - cursor = await self._db.execute( - "DELETE FROM magic_links WHERE expires_at < ? AND used = 0", (now,) - ) - await self._db.commit() - return cursor.rowcount -``` - -**Step 4: Run tests to verify they pass** - -Run: `pytest tests/test_store/test_sqlite_magic_link_repo.py -v` -Expected: All PASSED - -**Step 5: Run quality gate** - -Run: `./scripts/check.sh` -Expected: All green - -**Step 6: Commit** - -```bash -git add src/fastapi_oidc_op/store/sqlite/repositories.py tests/test_store/test_sqlite_magic_link_repo.py -git commit -m "feat: add SQLiteMagicLinkRepository with tests" -``` - ---- - -### Task 7: Lifespan Integration and Dependencies - -**Files:** -- Modify: `src/fastapi_oidc_op/app.py` -- Create: `src/fastapi_oidc_op/dependencies.py` -- Modify: `tests/conftest.py` -- Modify: `tests/test_app.py` - -**Step 1: Write the failing tests** - -Add to `tests/test_app.py`: - -```python -# Add these tests to the existing file - -async def test_app_has_repos_on_state(client: AsyncClient) -> None: - """Repos should be available on app.state after lifespan startup.""" - from fastapi_oidc_op.store.protocols import ( - CredentialRepository, - MagicLinkRepository, - UserRepository, - ) - - app = client._transport.app # type: ignore[union-attr] - assert isinstance(app.state.user_repo, UserRepository) - assert isinstance(app.state.credential_repo, CredentialRepository) - assert isinstance(app.state.magic_link_repo, MagicLinkRepository) - - -async def test_dependency_functions() -> None: - """Dependency functions should return Protocol-typed repos.""" - from unittest.mock import MagicMock - - from fastapi_oidc_op.dependencies import ( - get_credential_repo, - get_magic_link_repo, - get_user_repo, - ) - - request = MagicMock() - request.app.state.user_repo = "user_repo_sentinel" - request.app.state.credential_repo = "credential_repo_sentinel" - request.app.state.magic_link_repo = "magic_link_repo_sentinel" - - assert get_user_repo(request) == "user_repo_sentinel" - assert get_credential_repo(request) == "credential_repo_sentinel" - assert get_magic_link_repo(request) == "magic_link_repo_sentinel" -``` - -Update `tests/conftest.py` so the client fixture uses lifespan (the app needs to run its lifespan to initialize repos). The settings fixture should use `:memory:` for SQLite: - -```python -# tests/conftest.py -from collections.abc import AsyncIterator - -import pytest -from httpx import ASGITransport, AsyncClient - -from fastapi_oidc_op.app import create_app -from fastapi_oidc_op.config import Settings - - -@pytest.fixture -def settings() -> Settings: - return Settings(issuer="http://localhost:8000", sqlite_path=":memory:") - - -@pytest.fixture -async def client(settings: Settings) -> AsyncIterator[AsyncClient]: - app = create_app(settings) - transport = ASGITransport(app=app) - async with AsyncClient(transport=transport, base_url=settings.issuer) as ac: - yield ac -``` - -**Step 2: Run tests to verify they fail** - -Run: `pytest tests/test_app.py -v` -Expected: FAIL — app has no lifespan, no repos on state - -**Step 3: Implement the lifespan in `app.py`** - -```python -# src/fastapi_oidc_op/app.py -from collections.abc import AsyncIterator -from contextlib import asynccontextmanager -from pathlib import Path - -import aiosqlite -from fastapi import FastAPI - -from fastapi_oidc_op.config import Settings, StorageBackend -from fastapi_oidc_op.store.sqlite.migrations import run_migrations -from fastapi_oidc_op.store.sqlite.repositories import ( - SQLiteCredentialRepository, - SQLiteMagicLinkRepository, - SQLiteUserRepository, -) - -MIGRATIONS_DIR = Path(__file__).parent / "store" / "sqlite" / "migrations" - - -@asynccontextmanager -async def lifespan(app: FastAPI) -> AsyncIterator[None]: - settings: Settings = app.state.settings - if settings.storage_backend == StorageBackend.SQLITE: - # Ensure parent directory exists (skip for :memory:) - if settings.sqlite_path != ":memory:": - Path(settings.sqlite_path).parent.mkdir(parents=True, exist_ok=True) - db = await aiosqlite.connect(settings.sqlite_path) - db.row_factory = aiosqlite.Row - await db.execute("PRAGMA journal_mode=WAL") - await db.execute("PRAGMA foreign_keys=ON") - await run_migrations(db, MIGRATIONS_DIR) - app.state.user_repo = SQLiteUserRepository(db) - app.state.credential_repo = SQLiteCredentialRepository(db) - app.state.magic_link_repo = SQLiteMagicLinkRepository(db) - yield - await db.close() - else: - raise NotImplementedError("MongoDB backend not yet implemented") - - -def create_app(settings: Settings | None = None) -> FastAPI: - if settings is None: - settings = Settings() # type: ignore[call-arg] - - app = FastAPI( - title="FastAPI OIDC OP", - version="0.1.0", - docs_url="/docs" if settings.debug else None, - redoc_url=None, - lifespan=lifespan, - ) - - app.state.settings = settings - - @app.get("/health") - async def health() -> dict[str, str]: - return {"status": "ok"} - - return app -``` - -**Step 4: Implement `dependencies.py`** - -```python -# src/fastapi_oidc_op/dependencies.py -from fastapi import Request - -from fastapi_oidc_op.store.protocols import ( - CredentialRepository, - MagicLinkRepository, - UserRepository, -) - - -def get_user_repo(request: Request) -> UserRepository: - return request.app.state.user_repo - - -def get_credential_repo(request: Request) -> CredentialRepository: - return request.app.state.credential_repo - - -def get_magic_link_repo(request: Request) -> MagicLinkRepository: - return request.app.state.magic_link_repo -``` - -**Step 5: Run tests to verify they pass** - -Run: `pytest tests/test_app.py -v` -Expected: All PASSED - -**Step 6: Run full quality gate** - -Run: `./scripts/check.sh` -Expected: All green (all existing tests still pass with the updated conftest) - -**Step 7: Commit** - -```bash -git add src/fastapi_oidc_op/app.py src/fastapi_oidc_op/dependencies.py tests/conftest.py tests/test_app.py -git commit -m "feat: add lifespan integration and dependency injection" -``` - ---- - -### Task 8: Update Design Document - -**Files:** -- Modify: `docs/plans/2026-02-12-sqlite-repositories-design.md` - -**Step 1: Update the schema status and next steps** - -Change the schema status line from: -> **Status:** Schema section was presented to user and NOT YET explicitly approved. User requested shutdown before responding. **Ask for confirmation before proceeding.** - -To: -> **Status:** Schema approved. Implementation complete. - -Update the "Next Steps" section to reflect completion and point to the next phase (Authentication). - -**Step 2: Commit** - -```bash -git add docs/plans/2026-02-12-sqlite-repositories-design.md -git commit -m "docs: update sqlite design doc to reflect completed implementation" -``` diff --git a/docs/plans/2026-02-16-auth-routes-plan.md b/docs/plans/2026-02-16-auth-routes-plan.md deleted file mode 100644 index 23d1b79..0000000 --- a/docs/plans/2026-02-16-auth-routes-plan.md +++ /dev/null @@ -1,1286 +0,0 @@ -# Authentication Routes Implementation Plan (Phase 4) - -> **Status: COMPLETE** — All 10 tasks implemented and passing. 120 tests, full quality gate green. - -> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. - -**Goal:** Add standalone login/registration + credential management routes (no OIDC yet) using sessions, Jinja2 templates, HTMX, and the existing auth services + repositories. - -**Architecture:** FastAPI routers for `authn` and `manage` are mounted in `create_app()`. Starlette `SessionMiddleware` stores a minimal session (`userid`, `username`) plus WebAuthn transient state. HTML is server-rendered with Jinja2; HTMX progressively enhances forms; `webauthn.js` bridges browser WebAuthn APIs. Auth services (`PasswordService`, `WebAuthnService`, `MagicLinkService`) are instantiated in the lifespan and stored on `app.state` alongside the repositories. - -**Tech Stack:** FastAPI, Starlette SessionMiddleware, Jinja2 templates, HTMX (vendored), python-fido2 >=2.1, argon2-cffi, aiosqlite. - -**Quality gate:** `./scripts/check.sh` - -**Known constraints:** -- Starlette `SessionMiddleware` uses signed cookies (~4KB limit). WebAuthn challenge state is small (~100 bytes), so this is fine. If state grows, switch to server-side session storage. -- The `fido2` library's `Fido2Server.authenticate_complete()` returns `AttestedCredentialData` (matched credential), NOT the new sign count. The sign count must be extracted from the raw `AuthenticationResponse.response.authenticator_data.counter`. -- WebAuthn options returned by `fido2` contain `bytes` fields (`challenge`, `user.id`, credential IDs). The library provides `fido2.utils.websafe_encode`/`websafe_decode` for base64url conversion. The `dict(options)` output from `begin_registration`/`begin_authentication` is already JSON-serializable as the library handles encoding internally via its CBOR/JSON mapping. - -**Discoveries during implementation:** -- `itsdangerous` package was needed for Starlette's `SessionMiddleware` — added via `uv add itsdangerous` -- `ty` type checker flags `app.add_middleware(SessionMiddleware, ...)` as invalid argument type — needs `# type: ignore[arg-type]` -- The `_count_credentials` helper in `manage/routes.py` needs `# type: ignore[union-attr]` on the cred_repo calls since it takes `object` type -- Magic link service is at `fastapi_oidc_op.invite.service.MagicLinkService` (not `authn/magic_link.py`) -- The `PasswordService.verify()` takes `(password_hash, password)` — hash first, then plaintext -- `AttestedCredentialData` is a `bytes` subclass — reconstruct from stored bytes via `AttestedCredentialData(stored_bytes)`, not `from_ctap_object()` - ---- - -### Task 1: Config + App Wiring + Templates + Static Files [DONE] - -**Files:** -- Modify: `src/fastapi_oidc_op/config.py` -- Modify: `src/fastapi_oidc_op/app.py` -- Create: `src/fastapi_oidc_op/authn/routes.py` -- Create: `src/fastapi_oidc_op/manage/routes.py` -- Create: `src/fastapi_oidc_op/templates/base.html` -- Create: `src/fastapi_oidc_op/templates/login.html` -- Create: `src/fastapi_oidc_op/static/style.css` -- Create: `src/fastapi_oidc_op/static/htmx.min.js` -- Create: `tests/test_auth_routes/__init__.py` -- Create: `tests/test_auth_routes/test_pages.py` - -**Why merged:** The original plan had Tasks 1 and 2 as separate steps, but Task 1's tests could never pass without Task 2's templates and static files. Merging them gives a clean red-green cycle. - -**Step 1: Write the failing tests** - -Create `tests/test_auth_routes/__init__.py` (empty file). - -Create `tests/test_auth_routes/test_pages.py`: - -```python -from httpx import AsyncClient - - -async def test_get_login_page_contains_form(client: AsyncClient) -> None: - res = await client.get("/login") - assert res.status_code == 200 - assert "