# Tests for the SQLite-backed credential repository (listing: 184 lines, 6.4 KiB, Python).
import aiosqlite
|
|
import pytest
|
|
|
|
from porchlight.models import PasswordCredential, User, WebAuthnCredential
|
|
from porchlight.store.exceptions import DuplicateError
|
|
from porchlight.store.protocols import CredentialRepository
|
|
from porchlight.store.sqlite.repositories import (
|
|
SQLiteCredentialRepository,
|
|
SQLiteUserRepository,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def user_repo(db: aiosqlite.Connection) -> SQLiteUserRepository:
    """Provide a ``SQLiteUserRepository`` built on the ``db`` fixture's connection."""
    repo = SQLiteUserRepository(db)
    return repo
|
|
|
|
|
|
@pytest.fixture
def credential_repo(db: aiosqlite.Connection) -> SQLiteCredentialRepository:
    """Provide a ``SQLiteCredentialRepository`` built on the ``db`` fixture's connection."""
    repo = SQLiteCredentialRepository(db)
    return repo
|
|
|
|
|
|
@pytest.fixture
async def alice(user_repo: SQLiteUserRepository) -> User:
    """Create the user "alice" through the user repository and return the result."""
    new_user = User(userid="lusab-bansen", username="alice")
    stored_user = await user_repo.create(new_user)
    return stored_user
|
|
|
|
|
|
async def test_implements_protocol(credential_repo: SQLiteCredentialRepository) -> None:
    """The SQLite repository passes an ``isinstance`` check against ``CredentialRepository``."""
    satisfies_protocol = isinstance(credential_repo, CredentialRepository)
    assert satisfies_protocol
|
|
|
|
|
|
async def test_create_and_get_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    """A created WebAuthn credential is returned with its fields intact when listed by user."""
    key_id = b"\x01\x02\x03"
    key_material = b"\x04\x05\x06"
    label = "YubiKey"

    stored = await credential_repo.create_webauthn(
        WebAuthnCredential(
            user_id=alice.userid,
            credential_id=key_id,
            public_key=key_material,
            device_name=label,
        )
    )
    assert stored.user_id == alice.userid

    # Listing by user must yield exactly the credential created above.
    listed = await credential_repo.get_webauthn_by_user(alice.userid)
    assert len(listed) == 1
    only = listed[0]
    assert only.credential_id == key_id
    assert only.public_key == key_material
    assert only.device_name == label
|
|
|
|
|
|
async def test_get_webauthn_by_credential_id(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    """A stored WebAuthn credential can be looked up by its credential id."""
    key_id = b"\x01\x02\x03"
    await credential_repo.create_webauthn(
        WebAuthnCredential(
            user_id=alice.userid,
            credential_id=key_id,
            public_key=b"\x04\x05\x06",
        )
    )

    found = await credential_repo.get_webauthn_by_credential_id(key_id)
    assert found is not None
    assert found.user_id == alice.userid
|
|
|
|
|
|
async def test_get_webauthn_by_credential_id_not_found(credential_repo: SQLiteCredentialRepository) -> None:
    """Looking up a credential id that this test never stored yields ``None``."""
    missing_id = b"\xff\xff"
    lookup = await credential_repo.get_webauthn_by_credential_id(missing_id)
    assert lookup is None
|
|
|
|
|
|
async def test_multiple_webauthn_per_user(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    """A single user can hold several WebAuthn credentials, and all of them are listed.

    Three credentials with distinct one-byte ids are created for ``alice``.
    Besides the count, the set of returned credential ids is compared with the
    set that was created, so a repository that returned three copies of one
    credential would fail this test.
    """
    expected_ids = {bytes([i]) for i in range(3)}

    for i in range(3):
        cred = WebAuthnCredential(
            user_id=alice.userid,
            credential_id=bytes([i]),
            public_key=b"\x00",
            device_name=f"Key {i}",
        )
        await credential_repo.create_webauthn(cred)

    creds = await credential_repo.get_webauthn_by_user(alice.userid)
    assert len(creds) == 3
    # Order of the listing is not assumed; compare as sets.
    assert {c.credential_id for c in creds} == expected_ids
|
|
|
|
|
|
async def test_update_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    """Updating a stored WebAuthn credential changes its sign count and device name."""
    key_id = b"\x01\x02\x03"
    new_count = 42
    new_label = "New Name"

    credential = WebAuthnCredential(
        user_id=alice.userid,
        credential_id=key_id,
        public_key=b"\x04\x05\x06",
        sign_count=0,
        device_name="Old Name",
    )
    await credential_repo.create_webauthn(credential)

    # Mutate the in-memory object, then push the changes through the repository.
    credential.sign_count = new_count
    credential.device_name = new_label
    result = await credential_repo.update_webauthn(credential)
    assert result.sign_count == new_count
    assert result.device_name == new_label

    # Re-read through the repository to check what a later lookup returns.
    reloaded = await credential_repo.get_webauthn_by_credential_id(key_id)
    assert reloaded is not None
    assert reloaded.sign_count == new_count
    assert reloaded.device_name == new_label
|
|
|
|
|
|
async def test_update_webauthn_not_found(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    """Updating a WebAuthn credential that was never created raises ``ValueError``."""
    unsaved = WebAuthnCredential(
        user_id=alice.userid,
        credential_id=b"\x01\x02\x03",
        public_key=b"\x04\x05\x06",
    )

    expected_failure = pytest.raises(ValueError, match="WebAuthn credential not found")
    with expected_failure:
        await credential_repo.update_webauthn(unsaved)
|
|
|
|
|
|
async def test_delete_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    """Deleting a stored WebAuthn credential returns ``True`` and empties the user's list."""
    key_id = b"\x01\x02\x03"
    await credential_repo.create_webauthn(
        WebAuthnCredential(
            user_id=alice.userid,
            credential_id=key_id,
            public_key=b"\x04\x05\x06",
        )
    )

    was_removed = await credential_repo.delete_webauthn(alice.userid, key_id)
    assert was_removed is True

    remaining = await credential_repo.get_webauthn_by_user(alice.userid)
    assert len(remaining) == 0
|
|
|
|
|
|
async def test_delete_webauthn_not_found(credential_repo: SQLiteCredentialRepository) -> None:
    """Deleting a WebAuthn credential that this test never stored returns ``False``."""
    unknown_user = "nobody"
    unknown_key = b"\xff"
    was_removed = await credential_repo.delete_webauthn(unknown_user, unknown_key)
    assert was_removed is False
|
|
|
|
|
|
async def test_create_and_get_password(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    """A created password credential can be fetched by user with its hash unchanged."""
    stored_hash = "$argon2id$v=19$m=65536,t=3,p=4$hash"

    saved = await credential_repo.create_password(
        PasswordCredential(
            user_id=alice.userid,
            password_hash=stored_hash,
        )
    )
    assert saved.user_id == alice.userid

    loaded = await credential_repo.get_password_by_user(alice.userid)
    assert loaded is not None
    assert loaded.password_hash == stored_hash
|
|
|
|
|
|
async def test_get_password_not_found(credential_repo: SQLiteCredentialRepository) -> None:
    """Fetching the password for a user id this test never stored yields ``None``."""
    unknown_user = "nobody"
    lookup = await credential_repo.get_password_by_user(unknown_user)
    assert lookup is None
|
|
|
|
|
|
async def test_delete_password(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    """Deleting a stored password returns ``True`` and later lookups yield ``None``."""
    await credential_repo.create_password(
        PasswordCredential(
            user_id=alice.userid,
            password_hash="$argon2id$v=19$hash",
        )
    )

    was_removed = await credential_repo.delete_password(alice.userid)
    assert was_removed is True

    after_delete = await credential_repo.get_password_by_user(alice.userid)
    assert after_delete is None
|
|
|
|
|
|
async def test_delete_password_not_found(credential_repo: SQLiteCredentialRepository) -> None:
    """Deleting the password of a user id this test never stored returns ``False``."""
    unknown_user = "nobody"
    was_removed = await credential_repo.delete_password(unknown_user)
    assert was_removed is False
|
|
|
|
|
|
async def test_create_duplicate_password(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    """Creating a second password credential for the same user raises ``DuplicateError``.

    The second credential is built outside the ``pytest.raises`` block so that
    only the repository call can satisfy the expectation; an error raised while
    constructing the model would fail the test instead of passing it.
    """
    cred = PasswordCredential(user_id=alice.userid, password_hash="hash1")
    await credential_repo.create_password(cred)

    cred2 = PasswordCredential(user_id=alice.userid, password_hash="hash2")
    with pytest.raises(DuplicateError):
        await credential_repo.create_password(cred2)
|
|
|
|
|
|
async def test_create_duplicate_webauthn(credential_repo: SQLiteCredentialRepository, alice: User) -> None:
    """Creating the same WebAuthn credential twice raises ``DuplicateError``."""
    credential = WebAuthnCredential(
        user_id=alice.userid,
        credential_id=b"\x01",
        public_key=b"\x02",
    )
    await credential_repo.create_webauthn(credential)

    # The second insert reuses the very same object, hence the same credential id.
    expected_failure = pytest.raises(DuplicateError)
    with expected_failure:
        await credential_repo.create_webauthn(credential)
|