feat: add MagicLinkService with token create/validate/cleanup

This commit is contained in:
Johan Lundberg 2026-02-13 14:58:10 +01:00
parent 872001c6de
commit 4774ae3c2f
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
3 changed files with 164 additions and 0 deletions

View file

@ -0,0 +1,52 @@
import secrets
from datetime import UTC, datetime, timedelta
from fastapi_oidc_op.models import MagicLink
from fastapi_oidc_op.store.protocols import MagicLinkRepository
class MagicLinkService:
"""Magic link token lifecycle management."""
def __init__(self, repo: MagicLinkRepository, ttl: int = 86400) -> None:
self._repo = repo
self._ttl = ttl
async def create(
self,
username: str,
created_by: str | None = None,
note: str | None = None,
) -> MagicLink:
"""Generate and store a new magic link for the given username."""
token = secrets.token_urlsafe(32)
expires_at = datetime.now(UTC) + timedelta(seconds=self._ttl)
link = MagicLink(
token=token,
username=username,
expires_at=expires_at,
created_by=created_by,
note=note,
)
return await self._repo.create(link)
async def validate(self, token: str) -> MagicLink | None:
"""Validate a magic link token.
Returns the MagicLink if it exists, is not used, and has not expired.
Returns None otherwise.
"""
link = await self._repo.get_by_token(token)
if link is None or link.used:
return None
if link.expires_at < datetime.now(UTC):
return None
return link
async def mark_used(self, token: str) -> bool:
"""Mark a magic link as used. Returns True if found and marked."""
return await self._repo.mark_used(token)
async def cleanup_expired(self) -> int:
"""Delete expired unused links. Returns count deleted."""
return await self._repo.delete_expired()

View file

View file

@ -0,0 +1,112 @@
from datetime import UTC, datetime, timedelta
from pathlib import Path
import aiosqlite
import pytest
from fastapi_oidc_op.invite.service import MagicLinkService
from fastapi_oidc_op.store.sqlite.migrations import run_migrations
from fastapi_oidc_op.store.sqlite.repositories import SQLiteMagicLinkRepository
MIGRATIONS_DIR = (
Path(__file__).resolve().parent.parent.parent / "src" / "fastapi_oidc_op" / "store" / "sqlite" / "migrations"
)
@pytest.fixture
async def db():
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA foreign_keys=ON")
await run_migrations(conn, MIGRATIONS_DIR)
yield conn
await conn.close()
@pytest.fixture
def repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository:
return SQLiteMagicLinkRepository(db)
@pytest.fixture
def service(repo: SQLiteMagicLinkRepository) -> MagicLinkService:
return MagicLinkService(repo=repo, ttl=3600)
async def test_create_returns_magic_link(service: MagicLinkService) -> None:
link = await service.create(username="alice")
assert link.username == "alice"
assert link.token # non-empty
assert link.used is False
assert link.expires_at > datetime.now(UTC)
async def test_create_generates_unique_tokens(service: MagicLinkService) -> None:
link1 = await service.create(username="alice")
link2 = await service.create(username="bob")
assert link1.token != link2.token
async def test_create_with_optional_fields(service: MagicLinkService) -> None:
link = await service.create(username="alice", created_by="admin-id", note="Welcome")
assert link.created_by == "admin-id"
assert link.note == "Welcome"
async def test_create_respects_ttl(service: MagicLinkService) -> None:
link = await service.create(username="alice")
expected_min = datetime.now(UTC) + timedelta(seconds=3500)
expected_max = datetime.now(UTC) + timedelta(seconds=3700)
assert expected_min < link.expires_at < expected_max
async def test_validate_valid_token(service: MagicLinkService) -> None:
link = await service.create(username="alice")
result = await service.validate(link.token)
assert result is not None
assert result.token == link.token
assert result.username == "alice"
async def test_validate_nonexistent_token(service: MagicLinkService) -> None:
result = await service.validate("nonexistent-token")
assert result is None
async def test_validate_used_token(service: MagicLinkService) -> None:
link = await service.create(username="alice")
await service.mark_used(link.token)
result = await service.validate(link.token)
assert result is None
async def test_validate_expired_token(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
# Create a link that's already expired by using a negative TTL
expired_service = MagicLinkService(repo=repo, ttl=-1)
link = await expired_service.create(username="alice")
# The link expires_at is essentially now, so by the time we validate it should be expired
result = await service.validate(link.token)
assert result is None
async def test_mark_used_returns_true(service: MagicLinkService) -> None:
link = await service.create(username="alice")
result = await service.mark_used(link.token)
assert result is True
async def test_mark_used_nonexistent_returns_false(service: MagicLinkService) -> None:
result = await service.mark_used("nonexistent")
assert result is False
async def test_cleanup_expired(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
# Create an expired link
expired_service = MagicLinkService(repo=repo, ttl=-3600)
await expired_service.create(username="expired-user")
# Create a valid link
await service.create(username="valid-user")
count = await service.cleanup_expired()
assert count == 1