porchlight/docs/plans/2026-02-13-sqlite-repositories-plan.md

44 KiB

SQLite Repositories Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Implement SQLite-backed repository classes that satisfy the existing Protocol interfaces, with a migration runner, lifespan integration, and FastAPI dependency injection.

Architecture: Three repository classes (SQLiteUserRepository, SQLiteCredentialRepository, SQLiteMagicLinkRepository) sharing a single aiosqlite connection, initialized via FastAPI lifespan. Schema managed by numbered SQL migration files applied at startup.

Tech Stack: aiosqlite, SQLite WAL mode, pytest with in-memory SQLite

Quality gate: ./scripts/check.sh (ruff format, ruff check, ty check, pytest)


Task 1: SQL Migration File

Files:

  • Create: src/fastapi_oidc_op/store/sqlite/migrations/001_initial.sql

Step 1: Create the migration file

CREATE TABLE users (
    userid TEXT PRIMARY KEY,
    username TEXT NOT NULL UNIQUE,
    preferred_username TEXT,
    given_name TEXT,
    family_name TEXT,
    nickname TEXT,
    email TEXT,
    email_verified INTEGER NOT NULL DEFAULT 0,
    phone_number TEXT,
    phone_number_verified INTEGER NOT NULL DEFAULT 0,
    picture TEXT,
    locale TEXT,
    active INTEGER NOT NULL DEFAULT 1,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

CREATE TABLE user_groups (
    userid TEXT NOT NULL REFERENCES users(userid) ON DELETE CASCADE,
    group_name TEXT NOT NULL,
    PRIMARY KEY (userid, group_name)
);

CREATE TABLE webauthn_credentials (
    user_id TEXT NOT NULL REFERENCES users(userid) ON DELETE CASCADE,
    credential_id BLOB NOT NULL,
    public_key BLOB NOT NULL,
    sign_count INTEGER NOT NULL DEFAULT 0,
    device_name TEXT NOT NULL DEFAULT '',
    created_at TEXT NOT NULL,
    PRIMARY KEY (user_id, credential_id)
);

CREATE TABLE password_credentials (
    user_id TEXT PRIMARY KEY REFERENCES users(userid) ON DELETE CASCADE,
    password_hash TEXT NOT NULL,
    created_at TEXT NOT NULL
);

CREATE TABLE magic_links (
    token TEXT PRIMARY KEY,
    username TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    used INTEGER NOT NULL DEFAULT 0,
    created_by TEXT,
    note TEXT
);

Step 2: Commit

git add src/fastapi_oidc_op/store/sqlite/migrations/001_initial.sql
git commit -m "feat: add initial SQLite migration schema"

Task 2: Migration Runner

Files:

  • Create: src/fastapi_oidc_op/store/sqlite/migrations.py
  • Test: tests/test_store/test_migrations.py

Step 1: Write the failing tests

# tests/test_store/test_migrations.py
from pathlib import Path

import aiosqlite

from fastapi_oidc_op.store.sqlite.migrations import run_migrations

MIGRATIONS_DIR = Path(__file__).resolve().parent.parent.parent / "src" / "fastapi_oidc_op" / "store" / "sqlite" / "migrations"


async def test_run_migrations_applies_initial() -> None:
    async with aiosqlite.connect(":memory:") as db:
        await db.execute("PRAGMA foreign_keys=ON")
        count = await run_migrations(db, MIGRATIONS_DIR)
        assert count == 1
        # Verify users table exists
        async with db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'") as cursor:
            row = await cursor.fetchone()
        assert row is not None


async def test_run_migrations_skips_already_applied() -> None:
    async with aiosqlite.connect(":memory:") as db:
        await db.execute("PRAGMA foreign_keys=ON")
        first_count = await run_migrations(db, MIGRATIONS_DIR)
        second_count = await run_migrations(db, MIGRATIONS_DIR)
        assert first_count == 1
        assert second_count == 0


async def test_run_migrations_creates_all_tables() -> None:
    async with aiosqlite.connect(":memory:") as db:
        await db.execute("PRAGMA foreign_keys=ON")
        await run_migrations(db, MIGRATIONS_DIR)
        async with db.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") as cursor:
            tables = [row[0] async for row in cursor]
        assert "users" in tables
        assert "user_groups" in tables
        assert "webauthn_credentials" in tables
        assert "password_credentials" in tables
        assert "magic_links" in tables
        assert "_migrations" in tables

Step 2: Run tests to verify they fail

Run: pytest tests/test_store/test_migrations.py -v Expected: FAIL — cannot import run_migrations

Step 3: Write the migration runner

# src/fastapi_oidc_op/store/sqlite/migrations.py
from pathlib import Path

import aiosqlite


async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int:
    """Apply unapplied SQL migration files in order. Returns count of newly applied migrations."""
    await db.execute(
        """
        CREATE TABLE IF NOT EXISTS _migrations (
            id INTEGER PRIMARY KEY,
            filename TEXT NOT NULL UNIQUE,
            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
        )
        """
    )
    await db.commit()

    # Get already-applied migrations
    async with db.execute("SELECT filename FROM _migrations") as cursor:
        applied = {row[0] async for row in cursor}

    # Find and sort migration files
    migration_files = sorted(migrations_dir.glob("*.sql"))

    count = 0
    for migration_file in migration_files:
        if migration_file.name in applied:
            continue
        sql = migration_file.read_text()
        await db.executescript(sql)
        await db.execute("INSERT INTO _migrations (filename) VALUES (?)", (migration_file.name,))
        await db.commit()
        count += 1

    return count

Step 4: Run tests to verify they pass

Run: pytest tests/test_store/test_migrations.py -v Expected: 3 PASSED

Step 5: Run quality gate

Run: ./scripts/check.sh Expected: All green

Step 6: Commit

git add src/fastapi_oidc_op/store/sqlite/migrations.py tests/test_store/test_migrations.py
git commit -m "feat: add SQLite migration runner"

Task 3: Domain Exception and Store Exports

Files:

  • Create: src/fastapi_oidc_op/store/exceptions.py
  • Test: tests/test_store/test_exceptions.py

Step 1: Write the failing test

# tests/test_store/test_exceptions.py
from fastapi_oidc_op.store.exceptions import DuplicateError


def test_duplicate_error_is_exception() -> None:
    error = DuplicateError("user already exists")
    assert isinstance(error, Exception)
    assert str(error) == "user already exists"

Step 2: Run test to verify it fails

Run: pytest tests/test_store/test_exceptions.py -v Expected: FAIL — cannot import DuplicateError

Step 3: Write the exception

# src/fastapi_oidc_op/store/exceptions.py
class DuplicateError(Exception):
    """Raised when a create operation violates a uniqueness constraint."""

Step 4: Run test to verify it passes

Run: pytest tests/test_store/test_exceptions.py -v Expected: 1 PASSED

Step 5: Commit

git add src/fastapi_oidc_op/store/exceptions.py tests/test_store/test_exceptions.py
git commit -m "feat: add DuplicateError domain exception"

Task 4: SQLiteUserRepository

Files:

  • Create: src/fastapi_oidc_op/store/sqlite/repositories.py
  • Create: tests/test_store/conftest.py (shared fixtures)
  • Create: tests/test_store/test_sqlite_user_repo.py

Step 1: Write shared test fixtures

# tests/test_store/conftest.py
from pathlib import Path

import aiosqlite
import pytest

from fastapi_oidc_op.store.sqlite.migrations import run_migrations

MIGRATIONS_DIR = Path(__file__).resolve().parent.parent.parent / "src" / "fastapi_oidc_op" / "store" / "sqlite" / "migrations"


@pytest.fixture
async def db():
    conn = await aiosqlite.connect(":memory:")
    conn.row_factory = aiosqlite.Row
    await conn.execute("PRAGMA foreign_keys=ON")
    await run_migrations(conn, MIGRATIONS_DIR)
    yield conn
    await conn.close()

Step 2: Write the failing tests

# tests/test_store/test_sqlite_user_repo.py
import aiosqlite
import pytest

from fastapi_oidc_op.models import User
from fastapi_oidc_op.store.exceptions import DuplicateError
from fastapi_oidc_op.store.protocols import UserRepository
from fastapi_oidc_op.store.sqlite.repositories import SQLiteUserRepository


@pytest.fixture
def user_repo(db: aiosqlite.Connection) -> SQLiteUserRepository:
    return SQLiteUserRepository(db)


def _make_user(**overrides) -> User:
    defaults = {"userid": "lusab-bansen", "username": "alice"}
    defaults.update(overrides)
    return User(**defaults)


async def test_implements_protocol(user_repo: SQLiteUserRepository) -> None:
    assert isinstance(user_repo, UserRepository)


async def test_create_and_get_by_userid(user_repo: SQLiteUserRepository) -> None:
    user = _make_user()
    created = await user_repo.create(user)
    assert created.userid == "lusab-bansen"
    assert created.username == "alice"

    fetched = await user_repo.get_by_userid("lusab-bansen")
    assert fetched is not None
    assert fetched.userid == "lusab-bansen"
    assert fetched.username == "alice"


async def test_get_by_username(user_repo: SQLiteUserRepository) -> None:
    user = _make_user()
    await user_repo.create(user)

    fetched = await user_repo.get_by_username("alice")
    assert fetched is not None
    assert fetched.username == "alice"


async def test_get_by_userid_not_found(user_repo: SQLiteUserRepository) -> None:
    result = await user_repo.get_by_userid("nonexistent")
    assert result is None


async def test_get_by_username_not_found(user_repo: SQLiteUserRepository) -> None:
    result = await user_repo.get_by_username("nonexistent")
    assert result is None


async def test_create_with_groups(user_repo: SQLiteUserRepository) -> None:
    user = _make_user(groups=["admin", "users"])
    await user_repo.create(user)

    fetched = await user_repo.get_by_userid("lusab-bansen")
    assert fetched is not None
    assert sorted(fetched.groups) == ["admin", "users"]


async def test_update(user_repo: SQLiteUserRepository) -> None:
    user = _make_user()
    await user_repo.create(user)

    user.email = "alice@example.com"
    user.given_name = "Alice"
    updated = await user_repo.update(user)
    assert updated.email == "alice@example.com"
    assert updated.given_name == "Alice"

    fetched = await user_repo.get_by_userid("lusab-bansen")
    assert fetched is not None
    assert fetched.email == "alice@example.com"


async def test_update_groups(user_repo: SQLiteUserRepository) -> None:
    user = _make_user(groups=["users"])
    await user_repo.create(user)

    user.groups = ["admin", "editors"]
    await user_repo.update(user)

    fetched = await user_repo.get_by_userid("lusab-bansen")
    assert fetched is not None
    assert sorted(fetched.groups) == ["admin", "editors"]


async def test_list_users(user_repo: SQLiteUserRepository) -> None:
    await user_repo.create(_make_user(userid="id-1", username="alice"))
    await user_repo.create(_make_user(userid="id-2", username="bob"))
    await user_repo.create(_make_user(userid="id-3", username="charlie"))

    users = await user_repo.list_users()
    assert len(users) == 3


async def test_list_users_pagination(user_repo: SQLiteUserRepository) -> None:
    for i in range(5):
        await user_repo.create(_make_user(userid=f"id-{i}", username=f"user-{i}"))

    page1 = await user_repo.list_users(offset=0, limit=2)
    page2 = await user_repo.list_users(offset=2, limit=2)
    page3 = await user_repo.list_users(offset=4, limit=2)
    assert len(page1) == 2
    assert len(page2) == 2
    assert len(page3) == 1


async def test_delete(user_repo: SQLiteUserRepository) -> None:
    user = _make_user()
    await user_repo.create(user)

    deleted = await user_repo.delete("lusab-bansen")
    assert deleted is True

    fetched = await user_repo.get_by_userid("lusab-bansen")
    assert fetched is None


async def test_delete_not_found(user_repo: SQLiteUserRepository) -> None:
    deleted = await user_repo.delete("nonexistent")
    assert deleted is False


async def test_delete_cascades_groups(user_repo: SQLiteUserRepository) -> None:
    user = _make_user(groups=["admin"])
    await user_repo.create(user)

    await user_repo.delete("lusab-bansen")

    # Verify groups were cascaded
    async with user_repo._db.execute("SELECT COUNT(*) FROM user_groups WHERE userid = ?", ("lusab-bansen",)) as cursor:
        row = await cursor.fetchone()
    assert row[0] == 0


async def test_create_duplicate_username(user_repo: SQLiteUserRepository) -> None:
    await user_repo.create(_make_user())

    with pytest.raises(DuplicateError):
        await user_repo.create(_make_user(userid="different-id", username="alice"))


async def test_roundtrip_preserves_all_fields(user_repo: SQLiteUserRepository) -> None:
    user = _make_user(
        preferred_username="ally",
        given_name="Alice",
        family_name="Smith",
        nickname="Al",
        email="alice@example.com",
        email_verified=True,
        phone_number="+1234567890",
        phone_number_verified=True,
        picture="https://example.com/alice.jpg",
        locale="en-US",
        active=False,
        groups=["admin", "users"],
    )
    await user_repo.create(user)

    fetched = await user_repo.get_by_userid("lusab-bansen")
    assert fetched is not None
    assert fetched.preferred_username == "ally"
    assert fetched.given_name == "Alice"
    assert fetched.family_name == "Smith"
    assert fetched.nickname == "Al"
    assert fetched.email == "alice@example.com"
    assert fetched.email_verified is True
    assert fetched.phone_number == "+1234567890"
    assert fetched.phone_number_verified is True
    assert fetched.picture == "https://example.com/alice.jpg"
    assert fetched.locale == "en-US"
    assert fetched.active is False
    assert sorted(fetched.groups) == ["admin", "users"]
    assert fetched.created_at == user.created_at
    assert fetched.updated_at == user.updated_at

Step 3: Run tests to verify they fail

Run: pytest tests/test_store/test_sqlite_user_repo.py -v Expected: FAIL — cannot import SQLiteUserRepository

Step 4: Write the implementation

# src/fastapi_oidc_op/store/sqlite/repositories.py
from datetime import UTC, datetime

import aiosqlite

from fastapi_oidc_op.models import (
    MagicLink,
    PasswordCredential,
    User,
    WebAuthnCredential,
)
from fastapi_oidc_op.store.exceptions import DuplicateError


class SQLiteUserRepository:
    def __init__(self, db: aiosqlite.Connection) -> None:
        self._db = db

    async def _get_groups(self, userid: str) -> list[str]:
        async with self._db.execute(
            "SELECT group_name FROM user_groups WHERE userid = ? ORDER BY group_name", (userid,)
        ) as cursor:
            return [row[0] async for row in cursor]

    async def _set_groups(self, userid: str, groups: list[str]) -> None:
        await self._db.execute("DELETE FROM user_groups WHERE userid = ?", (userid,))
        for group in groups:
            await self._db.execute("INSERT INTO user_groups (userid, group_name) VALUES (?, ?)", (userid, group))

    def _row_to_user(self, row: aiosqlite.Row, groups: list[str]) -> User:
        return User(
            userid=row["userid"],
            username=row["username"],
            preferred_username=row["preferred_username"],
            given_name=row["given_name"],
            family_name=row["family_name"],
            nickname=row["nickname"],
            email=row["email"],
            email_verified=bool(row["email_verified"]),
            phone_number=row["phone_number"],
            phone_number_verified=bool(row["phone_number_verified"]),
            picture=row["picture"],
            locale=row["locale"],
            active=bool(row["active"]),
            created_at=datetime.fromisoformat(row["created_at"]),
            updated_at=datetime.fromisoformat(row["updated_at"]),
            groups=groups,
        )

    async def create(self, user: User) -> User:
        try:
            await self._db.execute(
                """
                INSERT INTO users (
                    userid, username, preferred_username, given_name, family_name,
                    nickname, email, email_verified, phone_number, phone_number_verified,
                    picture, locale, active, created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    user.userid,
                    user.username,
                    user.preferred_username,
                    user.given_name,
                    user.family_name,
                    user.nickname,
                    user.email,
                    int(user.email_verified),
                    user.phone_number,
                    int(user.phone_number_verified),
                    user.picture,
                    user.locale,
                    int(user.active),
                    user.created_at.isoformat(),
                    user.updated_at.isoformat(),
                ),
            )
            await self._set_groups(user.userid, user.groups)
            await self._db.commit()
        except aiosqlite.IntegrityError as e:
            raise DuplicateError(str(e)) from e
        return user

    async def get_by_userid(self, userid: str) -> User | None:
        async with self._db.execute("SELECT * FROM users WHERE userid = ?", (userid,)) as cursor:
            row = await cursor.fetchone()
        if row is None:
            return None
        groups = await self._get_groups(userid)
        return self._row_to_user(row, groups)

    async def get_by_username(self, username: str) -> User | None:
        async with self._db.execute("SELECT * FROM users WHERE username = ?", (username,)) as cursor:
            row = await cursor.fetchone()
        if row is None:
            return None
        groups = await self._get_groups(row["userid"])
        return self._row_to_user(row, groups)

    async def update(self, user: User) -> User:
        user.updated_at = datetime.now(UTC)
        await self._db.execute(
            """
            UPDATE users SET
                username = ?, preferred_username = ?, given_name = ?, family_name = ?,
                nickname = ?, email = ?, email_verified = ?, phone_number = ?,
                phone_number_verified = ?, picture = ?, locale = ?, active = ?,
                updated_at = ?
            WHERE userid = ?
            """,
            (
                user.username,
                user.preferred_username,
                user.given_name,
                user.family_name,
                user.nickname,
                user.email,
                int(user.email_verified),
                user.phone_number,
                int(user.phone_number_verified),
                user.picture,
                user.locale,
                int(user.active),
                user.updated_at.isoformat(),
                user.userid,
            ),
        )
        await self._set_groups(user.userid, user.groups)
        await self._db.commit()
        return user

    async def list_users(self, offset: int = 0, limit: int = 100) -> list[User]:
        async with self._db.execute(
            "SELECT * FROM users ORDER BY username LIMIT ? OFFSET ?", (limit, offset)
        ) as cursor:
            rows = await cursor.fetchall()
        users = []
        for row in rows:
            groups = await self._get_groups(row["userid"])
            users.append(self._row_to_user(row, groups))
        return users

    async def delete(self, userid: str) -> bool:
        cursor = await self._db.execute("DELETE FROM users WHERE userid = ?", (userid,))
        await self._db.commit()
        return cursor.rowcount > 0

Step 5: Run tests to verify they pass

Run: pytest tests/test_store/test_sqlite_user_repo.py -v Expected: All PASSED

Step 6: Run quality gate

Run: ./scripts/check.sh Expected: All green

Step 7: Commit

git add src/fastapi_oidc_op/store/sqlite/repositories.py tests/test_store/conftest.py tests/test_store/test_sqlite_user_repo.py
git commit -m "feat: add SQLiteUserRepository with tests"

Task 5: SQLiteCredentialRepository

Files:

  • Modify: src/fastapi_oidc_op/store/sqlite/repositories.py
  • Create: tests/test_store/test_sqlite_credential_repo.py

Step 1: Write the failing tests

# tests/test_store/test_sqlite_credential_repo.py
import aiosqlite
import pytest

from fastapi_oidc_op.models import PasswordCredential, User, WebAuthnCredential
from fastapi_oidc_op.store.exceptions import DuplicateError
from fastapi_oidc_op.store.protocols import CredentialRepository
from fastapi_oidc_op.store.sqlite.repositories import (
    SQLiteCredentialRepository,
    SQLiteUserRepository,
)


@pytest.fixture
def user_repo(db: aiosqlite.Connection) -> SQLiteUserRepository:
    return SQLiteUserRepository(db)


@pytest.fixture
def credential_repo(db: aiosqlite.Connection) -> SQLiteCredentialRepository:
    return SQLiteCredentialRepository(db)


@pytest.fixture
async def alice(user_repo: SQLiteUserRepository) -> User:
    return await user_repo.create(User(userid="lusab-bansen", username="alice"))


async def test_implements_protocol(credential_repo: SQLiteCredentialRepository) -> None:
    assert isinstance(credential_repo, CredentialRepository)


async def test_create_and_get_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    cred = WebAuthnCredential(
        user_id=alice.userid,
        credential_id=b"\x01\x02\x03",
        public_key=b"\x04\x05\x06",
        device_name="YubiKey",
    )
    created = await credential_repo.create_webauthn(cred)
    assert created.user_id == alice.userid

    creds = await credential_repo.get_webauthn_by_user(alice.userid)
    assert len(creds) == 1
    assert creds[0].credential_id == b"\x01\x02\x03"
    assert creds[0].public_key == b"\x04\x05\x06"
    assert creds[0].device_name == "YubiKey"


async def test_get_webauthn_by_credential_id(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    cred = WebAuthnCredential(
        user_id=alice.userid,
        credential_id=b"\x01\x02\x03",
        public_key=b"\x04\x05\x06",
    )
    await credential_repo.create_webauthn(cred)

    fetched = await credential_repo.get_webauthn_by_credential_id(b"\x01\x02\x03")
    assert fetched is not None
    assert fetched.user_id == alice.userid


async def test_get_webauthn_by_credential_id_not_found(credential_repo: SQLiteCredentialRepository) -> None:
    result = await credential_repo.get_webauthn_by_credential_id(b"\xff\xff")
    assert result is None


async def test_multiple_webauthn_per_user(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    for i in range(3):
        cred = WebAuthnCredential(
            user_id=alice.userid,
            credential_id=bytes([i]),
            public_key=b"\x00",
            device_name=f"Key {i}",
        )
        await credential_repo.create_webauthn(cred)

    creds = await credential_repo.get_webauthn_by_user(alice.userid)
    assert len(creds) == 3


async def test_update_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    cred = WebAuthnCredential(
        user_id=alice.userid,
        credential_id=b"\x01\x02\x03",
        public_key=b"\x04\x05\x06",
        sign_count=0,
        device_name="Old Name",
    )
    await credential_repo.create_webauthn(cred)

    cred.sign_count = 42
    cred.device_name = "New Name"
    updated = await credential_repo.update_webauthn(cred)
    assert updated.sign_count == 42
    assert updated.device_name == "New Name"

    fetched = await credential_repo.get_webauthn_by_credential_id(b"\x01\x02\x03")
    assert fetched is not None
    assert fetched.sign_count == 42
    assert fetched.device_name == "New Name"


async def test_delete_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    cred = WebAuthnCredential(
        user_id=alice.userid,
        credential_id=b"\x01\x02\x03",
        public_key=b"\x04\x05\x06",
    )
    await credential_repo.create_webauthn(cred)

    deleted = await credential_repo.delete_webauthn(alice.userid, b"\x01\x02\x03")
    assert deleted is True

    creds = await credential_repo.get_webauthn_by_user(alice.userid)
    assert len(creds) == 0


async def test_delete_webauthn_not_found(credential_repo: SQLiteCredentialRepository) -> None:
    deleted = await credential_repo.delete_webauthn("nobody", b"\xff")
    assert deleted is False


async def test_create_and_get_password(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    cred = PasswordCredential(
        user_id=alice.userid,
        password_hash="$argon2id$v=19$m=65536,t=3,p=4$hash",
    )
    created = await credential_repo.create_password(cred)
    assert created.user_id == alice.userid

    fetched = await credential_repo.get_password_by_user(alice.userid)
    assert fetched is not None
    assert fetched.password_hash == "$argon2id$v=19$m=65536,t=3,p=4$hash"


async def test_get_password_not_found(credential_repo: SQLiteCredentialRepository) -> None:
    result = await credential_repo.get_password_by_user("nobody")
    assert result is None


async def test_delete_password(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    cred = PasswordCredential(
        user_id=alice.userid,
        password_hash="$argon2id$v=19$hash",
    )
    await credential_repo.create_password(cred)

    deleted = await credential_repo.delete_password(alice.userid)
    assert deleted is True

    fetched = await credential_repo.get_password_by_user(alice.userid)
    assert fetched is None


async def test_delete_password_not_found(credential_repo: SQLiteCredentialRepository) -> None:
    deleted = await credential_repo.delete_password("nobody")
    assert deleted is False


async def test_create_duplicate_password(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    cred = PasswordCredential(user_id=alice.userid, password_hash="hash1")
    await credential_repo.create_password(cred)

    with pytest.raises(DuplicateError):
        cred2 = PasswordCredential(user_id=alice.userid, password_hash="hash2")
        await credential_repo.create_password(cred2)


async def test_create_duplicate_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    cred = WebAuthnCredential(user_id=alice.userid, credential_id=b"\x01", public_key=b"\x02")
    await credential_repo.create_webauthn(cred)

    with pytest.raises(DuplicateError):
        await credential_repo.create_webauthn(cred)

Step 2: Run tests to verify they fail

Run: pytest tests/test_store/test_sqlite_credential_repo.py -v Expected: FAIL — cannot import SQLiteCredentialRepository

Step 3: Add SQLiteCredentialRepository to repositories.py

Append to src/fastapi_oidc_op/store/sqlite/repositories.py:

class SQLiteCredentialRepository:
    def __init__(self, db: aiosqlite.Connection) -> None:
        self._db = db

    def _row_to_webauthn(self, row: aiosqlite.Row) -> WebAuthnCredential:
        return WebAuthnCredential(
            user_id=row["user_id"],
            credential_id=bytes(row["credential_id"]),
            public_key=bytes(row["public_key"]),
            sign_count=row["sign_count"],
            device_name=row["device_name"],
            created_at=datetime.fromisoformat(row["created_at"]),
        )

    def _row_to_password(self, row: aiosqlite.Row) -> PasswordCredential:
        return PasswordCredential(
            user_id=row["user_id"],
            password_hash=row["password_hash"],
            created_at=datetime.fromisoformat(row["created_at"]),
        )

    async def create_webauthn(self, credential: WebAuthnCredential) -> WebAuthnCredential:
        try:
            await self._db.execute(
                """
                INSERT INTO webauthn_credentials (user_id, credential_id, public_key, sign_count, device_name, created_at)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    credential.user_id,
                    credential.credential_id,
                    credential.public_key,
                    credential.sign_count,
                    credential.device_name,
                    credential.created_at.isoformat(),
                ),
            )
            await self._db.commit()
        except aiosqlite.IntegrityError as e:
            raise DuplicateError(str(e)) from e
        return credential

    async def create_password(self, credential: PasswordCredential) -> PasswordCredential:
        try:
            await self._db.execute(
                "INSERT INTO password_credentials (user_id, password_hash, created_at) VALUES (?, ?, ?)",
                (credential.user_id, credential.password_hash, credential.created_at.isoformat()),
            )
            await self._db.commit()
        except aiosqlite.IntegrityError as e:
            raise DuplicateError(str(e)) from e
        return credential

    async def get_webauthn_by_user(self, user_id: str) -> list[WebAuthnCredential]:
        async with self._db.execute(
            "SELECT * FROM webauthn_credentials WHERE user_id = ?", (user_id,)
        ) as cursor:
            rows = await cursor.fetchall()
        return [self._row_to_webauthn(row) for row in rows]

    async def get_webauthn_by_credential_id(self, credential_id: bytes) -> WebAuthnCredential | None:
        async with self._db.execute(
            "SELECT * FROM webauthn_credentials WHERE credential_id = ?", (credential_id,)
        ) as cursor:
            row = await cursor.fetchone()
        if row is None:
            return None
        return self._row_to_webauthn(row)

    async def get_password_by_user(self, user_id: str) -> PasswordCredential | None:
        async with self._db.execute(
            "SELECT * FROM password_credentials WHERE user_id = ?", (user_id,)
        ) as cursor:
            row = await cursor.fetchone()
        if row is None:
            return None
        return self._row_to_password(row)

    async def update_webauthn(self, credential: WebAuthnCredential) -> WebAuthnCredential:
        await self._db.execute(
            "UPDATE webauthn_credentials SET sign_count = ?, device_name = ? WHERE user_id = ? AND credential_id = ?",
            (credential.sign_count, credential.device_name, credential.user_id, credential.credential_id),
        )
        await self._db.commit()
        return credential

    async def delete_webauthn(self, user_id: str, credential_id: bytes) -> bool:
        cursor = await self._db.execute(
            "DELETE FROM webauthn_credentials WHERE user_id = ? AND credential_id = ?",
            (user_id, credential_id),
        )
        await self._db.commit()
        return cursor.rowcount > 0

    async def delete_password(self, user_id: str) -> bool:
        cursor = await self._db.execute(
            "DELETE FROM password_credentials WHERE user_id = ?", (user_id,)
        )
        await self._db.commit()
        return cursor.rowcount > 0

Step 4: Run tests to verify they pass

Run: pytest tests/test_store/test_sqlite_credential_repo.py -v Expected: All PASSED

Step 5: Run quality gate

Run: ./scripts/check.sh Expected: All green

Step 6: Commit

git add src/fastapi_oidc_op/store/sqlite/repositories.py tests/test_store/test_sqlite_credential_repo.py
git commit -m "feat: add SQLiteCredentialRepository with tests"

Task 6: SQLiteMagicLinkRepository

Files:

  • Modify: src/fastapi_oidc_op/store/sqlite/repositories.py
  • Create: tests/test_store/test_sqlite_magic_link_repo.py

Step 1: Write the failing tests

# tests/test_store/test_sqlite_magic_link_repo.py
from datetime import UTC, datetime, timedelta

import aiosqlite
import pytest

from fastapi_oidc_op.models import MagicLink
from fastapi_oidc_op.store.exceptions import DuplicateError
from fastapi_oidc_op.store.protocols import MagicLinkRepository
from fastapi_oidc_op.store.sqlite.repositories import SQLiteMagicLinkRepository


@pytest.fixture
def magic_link_repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository:
    return SQLiteMagicLinkRepository(db)


def _make_link(**overrides) -> MagicLink:
    defaults = {
        "token": "abc123",
        "username": "alice",
        "expires_at": datetime.now(UTC) + timedelta(hours=24),
    }
    defaults.update(overrides)
    return MagicLink(**defaults)


async def test_implements_protocol(magic_link_repo: SQLiteMagicLinkRepository) -> None:
    assert isinstance(magic_link_repo, MagicLinkRepository)


async def test_create_and_get_by_token(magic_link_repo: SQLiteMagicLinkRepository) -> None:
    link = _make_link()
    created = await magic_link_repo.create(link)
    assert created.token == "abc123"

    fetched = await magic_link_repo.get_by_token("abc123")
    assert fetched is not None
    assert fetched.token == "abc123"
    assert fetched.username == "alice"
    assert fetched.used is False


async def test_get_by_token_not_found(magic_link_repo: SQLiteMagicLinkRepository) -> None:
    result = await magic_link_repo.get_by_token("nonexistent")
    assert result is None


async def test_mark_used(magic_link_repo: SQLiteMagicLinkRepository) -> None:
    link = _make_link()
    await magic_link_repo.create(link)

    marked = await magic_link_repo.mark_used("abc123")
    assert marked is True

    fetched = await magic_link_repo.get_by_token("abc123")
    assert fetched is not None
    assert fetched.used is True


async def test_mark_used_not_found(magic_link_repo: SQLiteMagicLinkRepository) -> None:
    marked = await magic_link_repo.mark_used("nonexistent")
    assert marked is False


async def test_delete_expired(magic_link_repo: SQLiteMagicLinkRepository) -> None:
    # Create an expired link
    expired = _make_link(token="expired", expires_at=datetime.now(UTC) - timedelta(hours=1))
    await magic_link_repo.create(expired)

    # Create a valid link
    valid = _make_link(token="valid", expires_at=datetime.now(UTC) + timedelta(hours=24))
    await magic_link_repo.create(valid)

    count = await magic_link_repo.delete_expired()
    assert count == 1

    # Expired should be gone
    assert await magic_link_repo.get_by_token("expired") is None
    # Valid should remain
    assert await magic_link_repo.get_by_token("valid") is not None


async def test_delete_expired_skips_used(magic_link_repo: SQLiteMagicLinkRepository) -> None:
    # Create an expired but used link
    link = _make_link(token="used-expired", expires_at=datetime.now(UTC) - timedelta(hours=1))
    await magic_link_repo.create(link)
    await magic_link_repo.mark_used("used-expired")

    count = await magic_link_repo.delete_expired()
    assert count == 0


async def test_create_with_optional_fields(magic_link_repo: SQLiteMagicLinkRepository) -> None:
    link = _make_link(created_by="admin", note="Welcome aboard")
    await magic_link_repo.create(link)

    fetched = await magic_link_repo.get_by_token("abc123")
    assert fetched is not None
    assert fetched.created_by == "admin"
    assert fetched.note == "Welcome aboard"


async def test_create_duplicate_token(magic_link_repo: SQLiteMagicLinkRepository) -> None:
    await magic_link_repo.create(_make_link())

    with pytest.raises(DuplicateError):
        await magic_link_repo.create(_make_link())

Step 2: Run tests to verify they fail

Run: pytest tests/test_store/test_sqlite_magic_link_repo.py -v Expected: FAIL — cannot import SQLiteMagicLinkRepository

Step 3: Add SQLiteMagicLinkRepository to repositories.py

Append to src/fastapi_oidc_op/store/sqlite/repositories.py:

class SQLiteMagicLinkRepository:
    def __init__(self, db: aiosqlite.Connection) -> None:
        self._db = db

    def _row_to_magic_link(self, row: aiosqlite.Row) -> MagicLink:
        return MagicLink(
            token=row["token"],
            username=row["username"],
            expires_at=datetime.fromisoformat(row["expires_at"]),
            used=bool(row["used"]),
            created_by=row["created_by"],
            note=row["note"],
        )

    async def create(self, link: MagicLink) -> MagicLink:
        try:
            await self._db.execute(
                "INSERT INTO magic_links (token, username, expires_at, used, created_by, note) VALUES (?, ?, ?, ?, ?, ?)",
                (
                    link.token,
                    link.username,
                    link.expires_at.isoformat(),
                    int(link.used),
                    link.created_by,
                    link.note,
                ),
            )
            await self._db.commit()
        except aiosqlite.IntegrityError as e:
            raise DuplicateError(str(e)) from e
        return link

    async def get_by_token(self, token: str) -> MagicLink | None:
        async with self._db.execute("SELECT * FROM magic_links WHERE token = ?", (token,)) as cursor:
            row = await cursor.fetchone()
        if row is None:
            return None
        return self._row_to_magic_link(row)

    async def mark_used(self, token: str) -> bool:
        cursor = await self._db.execute(
            "UPDATE magic_links SET used = 1 WHERE token = ?", (token,)
        )
        await self._db.commit()
        return cursor.rowcount > 0

    async def delete_expired(self) -> int:
        now = datetime.now(UTC).isoformat()
        cursor = await self._db.execute(
            "DELETE FROM magic_links WHERE expires_at < ? AND used = 0", (now,)
        )
        await self._db.commit()
        return cursor.rowcount

Step 4: Run tests to verify they pass

Run: pytest tests/test_store/test_sqlite_magic_link_repo.py -v Expected: All PASSED

Step 5: Run quality gate

Run: ./scripts/check.sh Expected: All green

Step 6: Commit

git add src/fastapi_oidc_op/store/sqlite/repositories.py tests/test_store/test_sqlite_magic_link_repo.py
git commit -m "feat: add SQLiteMagicLinkRepository with tests"

Task 7: Lifespan Integration and Dependencies

Files:

  • Modify: src/fastapi_oidc_op/app.py
  • Create: src/fastapi_oidc_op/dependencies.py
  • Modify: tests/conftest.py
  • Modify: tests/test_app.py

Step 1: Write the failing tests

Add to tests/test_app.py:

# Add these tests to the existing file

async def test_app_has_repos_on_state(client: AsyncClient) -> None:
    """Repos should be available on app.state after lifespan startup."""
    from fastapi_oidc_op.store.protocols import (
        CredentialRepository,
        MagicLinkRepository,
        UserRepository,
    )

    app = client._transport.app  # type: ignore[union-attr]
    assert isinstance(app.state.user_repo, UserRepository)
    assert isinstance(app.state.credential_repo, CredentialRepository)
    assert isinstance(app.state.magic_link_repo, MagicLinkRepository)


async def test_dependency_functions() -> None:
    """Dependency functions should return Protocol-typed repos."""
    from unittest.mock import MagicMock

    from fastapi_oidc_op.dependencies import (
        get_credential_repo,
        get_magic_link_repo,
        get_user_repo,
    )

    request = MagicMock()
    request.app.state.user_repo = "user_repo_sentinel"
    request.app.state.credential_repo = "credential_repo_sentinel"
    request.app.state.magic_link_repo = "magic_link_repo_sentinel"

    assert get_user_repo(request) == "user_repo_sentinel"
    assert get_credential_repo(request) == "credential_repo_sentinel"
    assert get_magic_link_repo(request) == "magic_link_repo_sentinel"

Update tests/conftest.py so the client fixture uses lifespan (the app needs to run its lifespan to initialize repos). The settings fixture should use :memory: for SQLite:

# tests/conftest.py
from collections.abc import AsyncIterator

import pytest
from httpx import ASGITransport, AsyncClient

from fastapi_oidc_op.app import create_app
from fastapi_oidc_op.config import Settings


@pytest.fixture
def settings() -> Settings:
    return Settings(issuer="http://localhost:8000", sqlite_path=":memory:")


@pytest.fixture
async def client(settings: Settings) -> AsyncIterator[AsyncClient]:
    app = create_app(settings)
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url=settings.issuer) as ac:
        yield ac

Step 2: Run tests to verify they fail

Run: pytest tests/test_app.py -v Expected: FAIL — app has no lifespan, no repos on state

Step 3: Implement the lifespan in app.py

# src/fastapi_oidc_op/app.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path

import aiosqlite
from fastapi import FastAPI

from fastapi_oidc_op.config import Settings, StorageBackend
from fastapi_oidc_op.store.sqlite.migrations import run_migrations
from fastapi_oidc_op.store.sqlite.repositories import (
    SQLiteCredentialRepository,
    SQLiteMagicLinkRepository,
    SQLiteUserRepository,
)

MIGRATIONS_DIR = Path(__file__).parent / "store" / "sqlite" / "migrations"


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    settings: Settings = app.state.settings
    if settings.storage_backend == StorageBackend.SQLITE:
        # Ensure parent directory exists (skip for :memory:)
        if settings.sqlite_path != ":memory:":
            Path(settings.sqlite_path).parent.mkdir(parents=True, exist_ok=True)
        db = await aiosqlite.connect(settings.sqlite_path)
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA journal_mode=WAL")
        await db.execute("PRAGMA foreign_keys=ON")
        await run_migrations(db, MIGRATIONS_DIR)
        app.state.user_repo = SQLiteUserRepository(db)
        app.state.credential_repo = SQLiteCredentialRepository(db)
        app.state.magic_link_repo = SQLiteMagicLinkRepository(db)
        yield
        await db.close()
    else:
        raise NotImplementedError("MongoDB backend not yet implemented")


def create_app(settings: Settings | None = None) -> FastAPI:
    if settings is None:
        settings = Settings()  # type: ignore[call-arg]

    app = FastAPI(
        title="FastAPI OIDC OP",
        version="0.1.0",
        docs_url="/docs" if settings.debug else None,
        redoc_url=None,
        lifespan=lifespan,
    )

    app.state.settings = settings

    @app.get("/health")
    async def health() -> dict[str, str]:
        return {"status": "ok"}

    return app

Step 4: Implement dependencies.py

# src/fastapi_oidc_op/dependencies.py
from fastapi import Request

from fastapi_oidc_op.store.protocols import (
    CredentialRepository,
    MagicLinkRepository,
    UserRepository,
)


def get_user_repo(request: Request) -> UserRepository:
    return request.app.state.user_repo


def get_credential_repo(request: Request) -> CredentialRepository:
    return request.app.state.credential_repo


def get_magic_link_repo(request: Request) -> MagicLinkRepository:
    return request.app.state.magic_link_repo

Step 5: Run tests to verify they pass

Run: pytest tests/test_app.py -v Expected: All PASSED

Step 6: Run full quality gate

Run: ./scripts/check.sh Expected: All green (all existing tests still pass with the updated conftest)

Step 7: Commit

git add src/fastapi_oidc_op/app.py src/fastapi_oidc_op/dependencies.py tests/conftest.py tests/test_app.py
git commit -m "feat: add lifespan integration and dependency injection"

Task 8: Update Design Document

Files:

  • Modify: docs/plans/2026-02-12-sqlite-repositories-design.md

Step 1: Update the schema status and next steps

Change the schema status line from:

Status: Schema section was presented to user and NOT YET explicitly approved. User requested shutdown before responding. Ask for confirmation before proceeding.

To:

Status: Schema approved. Implementation complete.

Update the "Next Steps" section to reflect completion and point to the next phase (Authentication).

Step 2: Commit

git add docs/plans/2026-02-12-sqlite-repositories-design.md
git commit -m "docs: update sqlite design doc to reflect completed implementation"