# 194 lines · 6.3 KiB · Python
import aiosqlite
|
|
import pytest
|
|
|
|
from porchlight.models import User
|
|
from porchlight.store.exceptions import DuplicateError
|
|
from porchlight.store.protocols import UserRepository
|
|
from porchlight.store.sqlite.repositories import SQLiteUserRepository
|
|
|
|
|
|
@pytest.fixture
def user_repo(db: aiosqlite.Connection) -> SQLiteUserRepository:
    """Provide a ``SQLiteUserRepository`` built on the ``db`` connection fixture."""
    repository = SQLiteUserRepository(db)
    return repository
|
|
|
|
|
|
def _make_user(**overrides) -> User:
    """Build a ``User`` from default ``userid``/``username`` values.

    Any keyword argument replaces the default of the same name or is passed
    through to ``User`` as an additional field.
    """
    # Later entries win in a dict display, so overrides take precedence over defaults.
    fields = {"userid": "lusab-bansen", "username": "alice", **overrides}
    return User(**fields)
|
|
|
|
|
|
async def test_implements_protocol(user_repo: SQLiteUserRepository) -> None:
    """The SQLite repository passes an ``isinstance`` check against ``UserRepository``."""
    satisfies_protocol = isinstance(user_repo, UserRepository)
    assert satisfies_protocol
|
|
|
|
|
|
async def test_create_and_get_by_userid(user_repo: SQLiteUserRepository) -> None:
    """``create`` returns the new user and ``get_by_userid`` reads the same values back."""
    expected_identity = ("lusab-bansen", "alice")

    stored = await user_repo.create(_make_user())
    assert (stored.userid, stored.username) == expected_identity

    loaded = await user_repo.get_by_userid("lusab-bansen")
    assert loaded is not None
    assert (loaded.userid, loaded.username) == expected_identity
|
|
|
|
|
|
async def test_get_by_username(user_repo: SQLiteUserRepository) -> None:
    """A created user can be retrieved through ``get_by_username``."""
    await user_repo.create(_make_user())

    match = await user_repo.get_by_username("alice")
    assert match is not None
    assert match.username == "alice"
|
|
|
|
|
|
async def test_get_by_userid_not_found(user_repo: SQLiteUserRepository) -> None:
    """``get_by_userid`` yields ``None`` for an id that was never created."""
    assert await user_repo.get_by_userid("nonexistent") is None
|
|
|
|
|
|
async def test_get_by_username_not_found(user_repo: SQLiteUserRepository) -> None:
    """``get_by_username`` yields ``None`` for a name that was never created."""
    missing = await user_repo.get_by_username("nonexistent")
    assert missing is None
|
|
|
|
|
|
async def test_create_with_groups(user_repo: SQLiteUserRepository) -> None:
    """Groups supplied at creation are returned when the user is read back."""
    await user_repo.create(_make_user(groups=["admin", "users"]))

    loaded = await user_repo.get_by_userid("lusab-bansen")
    assert loaded is not None

    # Compare in sorted order so the check does not depend on retrieval order.
    group_names = list(loaded.groups)
    group_names.sort()
    assert group_names == ["admin", "users"]
|
|
|
|
|
|
async def test_update(user_repo: SQLiteUserRepository) -> None:
    """``update`` returns the changed fields and persists them.

    Both modified attributes are checked on the value returned by ``update``
    and again on a fresh ``get_by_userid`` read, so an update that echoed its
    input without storing every changed field would be caught.
    """
    user = _make_user()
    await user_repo.create(user)

    user.email = "alice@example.com"
    user.given_name = "Alice"
    updated = await user_repo.update(user)
    assert updated.email == "alice@example.com"
    assert updated.given_name == "Alice"

    # Re-read from the repository to confirm the changes were stored, not just returned.
    fetched = await user_repo.get_by_userid("lusab-bansen")
    assert fetched is not None
    assert fetched.email == "alice@example.com"
    assert fetched.given_name == "Alice"
|
|
|
|
|
|
async def test_update_not_found(user_repo: SQLiteUserRepository) -> None:
    """Updating a user that was never created raises ``ValueError``."""
    never_stored = _make_user()
    expect_error = pytest.raises(ValueError, match="User not found")
    with expect_error:
        await user_repo.update(never_stored)
|
|
|
|
|
|
async def test_update_does_not_mutate_input(user_repo: SQLiteUserRepository) -> None:
    """``update`` leaves the ``updated_at`` of the object passed to it unchanged."""
    subject = _make_user()
    await user_repo.create(subject)

    # Snapshot the timestamp before the call so it can be compared afterwards.
    timestamp_before = subject.updated_at
    await user_repo.update(subject)

    assert subject.updated_at == timestamp_before
|
|
|
|
|
|
async def test_update_groups(user_repo: SQLiteUserRepository) -> None:
    """Replacing a user's groups via ``update`` is reflected on the next read."""
    member = _make_user(groups=["users"])
    await user_repo.create(member)

    member.groups = ["admin", "editors"]
    await user_repo.update(member)

    reloaded = await user_repo.get_by_userid("lusab-bansen")
    assert reloaded is not None

    # Compare in sorted order so the check does not depend on retrieval order.
    group_names = list(reloaded.groups)
    group_names.sort()
    assert group_names == ["admin", "editors"]
|
|
|
|
|
|
async def test_list_users(user_repo: SQLiteUserRepository) -> None:
    """``list_users`` with no arguments returns all three created users."""
    identities = (("id-1", "alice"), ("id-2", "bob"), ("id-3", "charlie"))
    for userid, username in identities:
        await user_repo.create(_make_user(userid=userid, username=username))

    listed = await user_repo.list_users()
    assert len(listed) == 3
|
|
|
|
|
|
async def test_list_users_pagination(user_repo: SQLiteUserRepository) -> None:
    """Five users requested two at a time come back as pages of 2, 2 and 1."""
    for index in range(5):
        await user_repo.create(_make_user(userid=f"id-{index}", username=f"user-{index}"))

    # Fetch the three pages in order, then compare all page sizes at once.
    pages = [await user_repo.list_users(offset=start, limit=2) for start in (0, 2, 4)]
    assert [len(page) for page in pages] == [2, 2, 1]
|
|
|
|
|
|
async def test_delete(user_repo: SQLiteUserRepository) -> None:
    """``delete`` returns ``True`` for an existing user, who is then no longer found."""
    await user_repo.create(_make_user())

    was_removed = await user_repo.delete("lusab-bansen")
    assert was_removed is True

    assert await user_repo.get_by_userid("lusab-bansen") is None
|
|
|
|
|
|
async def test_delete_not_found(user_repo: SQLiteUserRepository) -> None:
    """``delete`` returns ``False`` when the id does not exist."""
    was_removed = await user_repo.delete("nonexistent")
    assert was_removed is False
|
|
|
|
|
|
async def test_delete_cascades_groups(user_repo: SQLiteUserRepository) -> None:
    """Deleting a user leaves no rows for that userid in ``user_groups``.

    The check queries the ``user_groups`` table directly through the
    repository's private ``_db`` connection, because the repository methods
    used in this module only expose groups via a fetched user.
    """
    user = _make_user(groups=["admin"])
    await user_repo.create(user)

    await user_repo.delete("lusab-bansen")

    async with user_repo._db.execute(
        "SELECT COUNT(*) FROM user_groups WHERE userid = ?",
        ("lusab-bansen",),
    ) as cursor:
        row = await cursor.fetchone()

    # fetchone() may return None; guard so a missing row fails as an assertion,
    # not as a TypeError on the subscript below.
    assert row is not None
    assert row[0] == 0
|
|
|
|
|
|
async def test_create_duplicate_username(user_repo: SQLiteUserRepository) -> None:
    """Creating a second user with an existing username raises ``DuplicateError``."""
    await user_repo.create(_make_user())

    # Different userid, same username as the user created above.
    clash_fields = {"userid": "different-id", "username": "alice"}
    with pytest.raises(DuplicateError):
        await user_repo.create(_make_user(**clash_fields))
|
|
|
|
|
|
async def test_roundtrip_preserves_all_fields(user_repo: SQLiteUserRepository) -> None:
    """Every field set on a created user is read back unchanged by ``get_by_userid``."""
    scalar_fields = {
        "preferred_username": "ally",
        "given_name": "Alice",
        "family_name": "Smith",
        "nickname": "Al",
        "email": "alice@example.com",
        "email_verified": True,
        "phone_number": "+1234567890",
        "phone_number_verified": True,
        "picture": "https://example.com/alice.jpg",
        "locale": "en-US",
        "active": False,
    }
    user = _make_user(**scalar_fields, groups=["admin", "users"])
    await user_repo.create(user)

    fetched = await user_repo.get_by_userid("lusab-bansen")
    assert fetched is not None

    for field_name, expected in scalar_fields.items():
        actual = getattr(fetched, field_name)
        if isinstance(expected, bool):
            # Flags must come back as the real bool singletons, not merely equal values.
            assert actual is expected
        else:
            assert actual == expected

    assert sorted(fetched.groups) == ["admin", "users"]
    assert fetched.created_at == user.created_at
    assert fetched.updated_at == user.updated_at
|