feat: add Consent model, migration, and repository

This commit is contained in:
Johan Lundberg 2026-02-18 14:41:32 +01:00
parent 16f3e039d9
commit 9ccc6c885f
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
7 changed files with 200 additions and 3 deletions

View file

@ -56,3 +56,11 @@ class MagicLink(BaseModel):
used: bool = False
created_by: str | None = None
note: str | None = None
class Consent(BaseModel):
userid: str
client_id: str
scopes: list[str]
created_at: datetime = Field(default_factory=_utcnow)
updated_at: datetime = Field(default_factory=_utcnow)

View file

@ -1,6 +1,7 @@
from typing import Protocol, runtime_checkable
from porchlight.models import (
Consent,
MagicLink,
PasswordCredential,
User,
@ -51,3 +52,14 @@ class MagicLinkRepository(Protocol):
async def mark_used(self, token: str) -> bool: ...
async def delete_expired(self) -> int: ...
@runtime_checkable
class ConsentRepository(Protocol):
async def get_consent(self, userid: str, client_id: str) -> Consent | None: ...
async def set_consent(self, userid: str, client_id: str, scopes: list[str]) -> None: ...
async def delete_consent(self, userid: str, client_id: str) -> bool: ...
async def list_consents(self, userid: str) -> list[Consent]: ...

View file

@ -0,0 +1,8 @@
CREATE TABLE user_consents (
userid TEXT NOT NULL REFERENCES users(userid) ON DELETE CASCADE,
client_id TEXT NOT NULL,
scopes TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
PRIMARY KEY (userid, client_id)
);

View file

@ -1,8 +1,9 @@
import json
from datetime import UTC, datetime
import aiosqlite
from porchlight.models import MagicLink, PasswordCredential, User, WebAuthnCredential
from porchlight.models import Consent, MagicLink, PasswordCredential, User, WebAuthnCredential
from porchlight.store.exceptions import DuplicateError
@ -289,3 +290,56 @@ class SQLiteMagicLinkRepository:
cursor = await self._db.execute("DELETE FROM magic_links WHERE expires_at < ? AND used = 0", (now,))
await self._db.commit()
return cursor.rowcount
class SQLiteConsentRepository:
def __init__(self, db: aiosqlite.Connection) -> None:
self._db = db
def _row_to_consent(self, row: aiosqlite.Row) -> Consent:
return Consent(
userid=row["userid"],
client_id=row["client_id"],
scopes=json.loads(row["scopes"]),
created_at=datetime.fromisoformat(row["created_at"]),
updated_at=datetime.fromisoformat(row["updated_at"]),
)
async def get_consent(self, userid: str, client_id: str) -> Consent | None:
async with self._db.execute(
"SELECT * FROM user_consents WHERE userid = ? AND client_id = ?",
(userid, client_id),
) as cursor:
row = await cursor.fetchone()
if row is None:
return None
return self._row_to_consent(row)
async def set_consent(self, userid: str, client_id: str, scopes: list[str]) -> None:
now = datetime.now(UTC).isoformat()
await self._db.execute(
"""
INSERT INTO user_consents (userid, client_id, scopes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (userid, client_id)
DO UPDATE SET scopes = excluded.scopes, updated_at = excluded.updated_at
""",
(userid, client_id, json.dumps(scopes), now, now),
)
await self._db.commit()
async def delete_consent(self, userid: str, client_id: str) -> bool:
cursor = await self._db.execute(
"DELETE FROM user_consents WHERE userid = ? AND client_id = ?",
(userid, client_id),
)
await self._db.commit()
return cursor.rowcount > 0
async def list_consents(self, userid: str) -> list[Consent]:
async with self._db.execute(
"SELECT * FROM user_consents WHERE userid = ? ORDER BY client_id",
(userid,),
) as cursor:
rows = await cursor.fetchall()
return [self._row_to_consent(row) for row in rows]

View file

@ -13,7 +13,7 @@ async def test_run_migrations_applies_initial() -> None:
async with aiosqlite.connect(":memory:") as db:
await db.execute("PRAGMA foreign_keys=ON")
count = await run_migrations(db, MIGRATIONS_DIR)
assert count == 1
assert count == 2
async with db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'") as cursor:
row = await cursor.fetchone()
assert row is not None
@ -24,7 +24,7 @@ async def test_run_migrations_skips_already_applied() -> None:
await db.execute("PRAGMA foreign_keys=ON")
first_count = await run_migrations(db, MIGRATIONS_DIR)
second_count = await run_migrations(db, MIGRATIONS_DIR)
assert first_count == 1
assert first_count == 2
assert second_count == 0
@ -39,4 +39,5 @@ async def test_run_migrations_creates_all_tables() -> None:
assert "webauthn_credentials" in tables
assert "password_credentials" in tables
assert "magic_links" in tables
assert "user_consents" in tables
assert "_migrations" in tables

View file

@ -1,6 +1,7 @@
from typing import runtime_checkable
from porchlight.store.protocols import (
ConsentRepository,
CredentialRepository,
MagicLinkRepository,
UserRepository,
@ -11,3 +12,4 @@ def test_protocols_are_runtime_checkable() -> None:
assert runtime_checkable(UserRepository) # type: ignore[arg-type]
assert runtime_checkable(CredentialRepository) # type: ignore[arg-type]
assert runtime_checkable(MagicLinkRepository) # type: ignore[arg-type]
assert runtime_checkable(ConsentRepository) # type: ignore[arg-type]

View file

@ -0,0 +1,112 @@
from datetime import UTC, datetime
from porchlight.models import User
from porchlight.store.protocols import ConsentRepository
from porchlight.store.sqlite.repositories import SQLiteConsentRepository, SQLiteUserRepository
async def _create_user(db) -> User:
"""Helper to create a test user."""
user_repo = SQLiteUserRepository(db)
user = User(
userid="test-user-id",
username="testuser",
email="test@example.com",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
return await user_repo.create(user)
async def test_implements_protocol(db) -> None:
repo = SQLiteConsentRepository(db)
assert isinstance(repo, ConsentRepository)
async def test_set_and_get_consent(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "test-rp", ["openid", "profile"])
consent = await repo.get_consent(user.userid, "test-rp")
assert consent is not None
assert consent.userid == user.userid
assert consent.client_id == "test-rp"
assert consent.scopes == ["openid", "profile"]
assert isinstance(consent.created_at, datetime)
assert isinstance(consent.updated_at, datetime)
async def test_get_consent_not_found(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
consent = await repo.get_consent(user.userid, "nonexistent")
assert consent is None
async def test_set_consent_upserts(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "test-rp", ["openid"])
original = await repo.get_consent(user.userid, "test-rp")
assert original is not None
await repo.set_consent(user.userid, "test-rp", ["openid", "profile", "email"])
consent = await repo.get_consent(user.userid, "test-rp")
assert consent is not None
assert consent.scopes == ["openid", "profile", "email"]
assert consent.created_at == original.created_at
assert consent.updated_at >= original.updated_at
async def test_delete_consent(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "test-rp", ["openid"])
result = await repo.delete_consent(user.userid, "test-rp")
assert result is True
consent = await repo.get_consent(user.userid, "test-rp")
assert consent is None
async def test_delete_consent_not_found(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
result = await repo.delete_consent(user.userid, "nonexistent")
assert result is False
async def test_list_consents(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "rp-a", ["openid"])
await repo.set_consent(user.userid, "rp-b", ["openid", "profile"])
consents = await repo.list_consents(user.userid)
assert len(consents) == 2
client_ids = {c.client_id for c in consents}
assert client_ids == {"rp-a", "rp-b"}
async def test_list_consents_empty(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
consents = await repo.list_consents(user.userid)
assert consents == []
async def test_consent_deleted_on_user_cascade(db) -> None:
"""Consent records are deleted when the user is deleted (CASCADE)."""
user = await _create_user(db)
user_repo = SQLiteUserRepository(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "test-rp", ["openid"])
await user_repo.delete(user.userid)
consent = await repo.get_consent(user.userid, "test-rp")
assert consent is None