112 lines
3.8 KiB
Python
112 lines
3.8 KiB
Python
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import aiosqlite
|
|
import pytest
|
|
|
|
from fastapi_oidc_op.invite.service import MagicLinkService
|
|
from fastapi_oidc_op.store.sqlite.migrations import run_migrations
|
|
from fastapi_oidc_op.store.sqlite.repositories import SQLiteMagicLinkRepository
|
|
|
|
# Directory holding the SQLite migration files: three directory levels above
# this test module, then down into the package's source tree.
MIGRATIONS_DIR = Path(__file__).resolve().parent.parent.parent.joinpath(
    "src",
    "fastapi_oidc_op",
    "store",
    "sqlite",
    "migrations",
)
|
|
|
|
|
|
@pytest.fixture
async def db():
    """Yield an in-memory SQLite connection prepared for the tests.

    The connection uses ``aiosqlite.Row`` as its row factory, has
    ``PRAGMA foreign_keys=ON`` executed on it, and is passed through
    ``run_migrations`` with ``MIGRATIONS_DIR`` before being handed to the test.
    It is closed once the test is done.
    """
    conn = await aiosqlite.connect(":memory:")
    # pytest does not run the code after ``yield`` when a fixture raises during
    # setup, so the setup steps sit inside the ``try`` as well: the connection
    # is closed even if the PRAGMA or a migration fails.
    try:
        conn.row_factory = aiosqlite.Row
        await conn.execute("PRAGMA foreign_keys=ON")
        await run_migrations(conn, MIGRATIONS_DIR)
        yield conn
    finally:
        await conn.close()
|
|
|
|
|
|
@pytest.fixture
def repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository:
    """Provide a ``SQLiteMagicLinkRepository`` built on the ``db`` fixture's connection."""
    magic_link_repo = SQLiteMagicLinkRepository(db)
    return magic_link_repo
|
|
|
|
|
|
@pytest.fixture
def service(repo: SQLiteMagicLinkRepository) -> MagicLinkService:
    """Provide a ``MagicLinkService`` over ``repo`` configured with ``ttl=3600``."""
    ttl_value = 3600
    magic_link_service = MagicLinkService(repo=repo, ttl=ttl_value)
    return magic_link_service
|
|
|
|
|
|
async def test_create_returns_magic_link(service: MagicLinkService) -> None:
    """A created link has the username, a truthy token, ``used`` False and a future expiry."""
    created = await service.create(username="alice")

    assert created.username == "alice"
    assert created.token  # must not be empty
    assert created.used is False
    assert created.expires_at > datetime.now(UTC)
|
|
|
|
|
|
async def test_create_generates_unique_tokens(service: MagicLinkService) -> None:
    """Two links created one after the other must not share a token."""
    first = await service.create(username="alice")
    second = await service.create(username="bob")

    assert first.token != second.token
|
|
|
|
|
|
async def test_create_with_optional_fields(service: MagicLinkService) -> None:
    """``created_by`` and ``note`` passed to ``create`` are reflected on the returned link."""
    optional_fields = {"created_by": "admin-id", "note": "Welcome"}
    created = await service.create(username="alice", **optional_fields)

    assert created.created_by == "admin-id"
    assert created.note == "Welcome"
|
|
|
|
|
|
async def test_create_respects_ttl(service: MagicLinkService) -> None:
    """The expiry of a new link lies strictly between now+3500s and now+3700s."""
    created = await service.create(username="alice")

    # Bounds are taken after creation; the window brackets the fixture's ttl=3600.
    lower_bound = datetime.now(UTC) + timedelta(seconds=3500)
    upper_bound = datetime.now(UTC) + timedelta(seconds=3700)
    expiry = created.expires_at

    assert lower_bound < expiry
    assert expiry < upper_bound
|
|
|
|
|
|
async def test_validate_valid_token(service: MagicLinkService) -> None:
    """Validating the token of a just-created link returns that link."""
    issued = await service.create(username="alice")

    validated = await service.validate(issued.token)

    assert validated is not None
    assert validated.token == issued.token
    assert validated.username == "alice"
|
|
|
|
|
|
async def test_validate_nonexistent_token(service: MagicLinkService) -> None:
    """Validating a token that was never created yields ``None``."""
    outcome = await service.validate("nonexistent-token")
    assert outcome is None
|
|
|
|
|
|
async def test_validate_used_token(service: MagicLinkService) -> None:
    """A token that has been marked as used no longer validates."""
    issued = await service.create(username="alice")
    await service.mark_used(issued.token)

    outcome = await service.validate(issued.token)
    assert outcome is None
|
|
|
|
|
|
async def test_validate_expired_token(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
    """A link created with a negative TTL is rejected by ``validate``."""
    # A second service on the same repository, with ttl=-1, produces a link
    # that is already expired at the moment it is created.
    already_expired_service = MagicLinkService(repo=repo, ttl=-1)
    stale_link = await already_expired_service.create(username="alice")

    # Validation goes through the regular service fixture.
    outcome = await service.validate(stale_link.token)
    assert outcome is None
|
|
|
|
|
|
async def test_mark_used_returns_true(service: MagicLinkService) -> None:
    """``mark_used`` returns ``True`` for the token of an existing link."""
    issued = await service.create(username="alice")

    was_marked = await service.mark_used(issued.token)
    assert was_marked is True
|
|
|
|
|
|
async def test_mark_used_nonexistent_returns_false(service: MagicLinkService) -> None:
    """``mark_used`` returns ``False`` when no link has the given token."""
    was_marked = await service.mark_used("nonexistent")
    assert was_marked is False
|
|
|
|
|
|
async def test_cleanup_expired(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
    """``cleanup_expired`` reports exactly one link and the unexpired link still validates."""
    # Create an expired link: a negative TTL makes it expired at creation time.
    expired_service = MagicLinkService(repo=repo, ttl=-3600)
    await expired_service.create(username="expired-user")

    # Create a valid link and keep it so its state can be checked afterwards.
    valid_link = await service.create(username="valid-user")

    count = await service.cleanup_expired()
    assert count == 1

    # The count alone would also be 1 if the wrong link had been cleaned up,
    # so confirm the unexpired link is still accepted.
    assert await service.validate(valid_link.token) is not None
|