Merge branch 'feature/admin-pages'
# Conflicts: # src/porchlight/app.py
This commit is contained in:
commit
33a61ecc2a
20 changed files with 1542 additions and 0 deletions
238
tests/e2e/admin.spec.js
Normal file
238
tests/e2e/admin.spec.js
Normal file
|
|
@ -0,0 +1,238 @@
|
|||
// @ts-check
|
||||
const { test, expect } = require('@playwright/test');
|
||||
|
||||
const fixtures = JSON.parse(process.env.E2E_FIXTURES || '{}');
|
||||
|
||||
/** Log in as the admin user and land on /manage/credentials. */
|
||||
async function loginAsAdmin(page) {
|
||||
await page.goto('/login');
|
||||
await page.fill('#username', fixtures.admin_username);
|
||||
await page.fill('#password', fixtures.admin_password);
|
||||
await page.click('form[hx-post="/login/password"] button[type="submit"]');
|
||||
await page.waitForURL('**/manage/credentials', { timeout: 5000 });
|
||||
}
|
||||
|
||||
/** Log in as a regular (non-admin) user. */
|
||||
async function loginAsRegularUser(page) {
|
||||
await page.goto('/login');
|
||||
await page.fill('#username', fixtures.login_username);
|
||||
await page.fill('#password', fixtures.login_password);
|
||||
await page.click('form[hx-post="/login/password"] button[type="submit"]');
|
||||
await page.waitForURL('**/manage/credentials', { timeout: 5000 });
|
||||
}
|
||||
|
||||
test.describe('Admin pages', () => {
|
||||
// ---------------------------------------------------------------
|
||||
// 1 & 2. Auth guards
|
||||
// ---------------------------------------------------------------
|
||||
test.describe('Auth guard', () => {
|
||||
test('unauthenticated user visiting /admin/users is redirected to /login', async ({ page }) => {
|
||||
await page.goto('/admin/users');
|
||||
await page.waitForURL('**/login', { timeout: 5000 });
|
||||
expect(page.url()).toContain('/login');
|
||||
});
|
||||
|
||||
test('non-admin logged-in user gets 403', async ({ page }) => {
|
||||
await loginAsRegularUser(page);
|
||||
const response = await page.goto('/admin/users');
|
||||
expect(response.status()).toBe(403);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// 3 & 4 & 5. User list page
|
||||
// ---------------------------------------------------------------
|
||||
test.describe('User list page', () => {
|
||||
test.beforeEach(async ({ page }) => {
|
||||
await loginAsAdmin(page);
|
||||
await page.goto('/admin/users');
|
||||
});
|
||||
|
||||
test('has correct page structure', async ({ page }) => {
|
||||
await expect(page.locator('h1')).toHaveText('Users');
|
||||
await expect(page.locator('.admin-table thead th')).toHaveCount(6);
|
||||
await expect(page.locator('input[name="q"]')).toBeVisible();
|
||||
await expect(page.locator('form[hx-post="/admin/invite"]')).toBeVisible();
|
||||
});
|
||||
|
||||
test('table headers are correct', async ({ page }) => {
|
||||
const headers = page.locator('.admin-table thead th');
|
||||
await expect(headers.nth(0)).toHaveText('Username');
|
||||
await expect(headers.nth(1)).toHaveText('Name');
|
||||
await expect(headers.nth(2)).toHaveText('Email');
|
||||
await expect(headers.nth(3)).toHaveText('Groups');
|
||||
await expect(headers.nth(4)).toHaveText('Status');
|
||||
await expect(headers.nth(5)).toHaveText('Created');
|
||||
});
|
||||
|
||||
test('shows seeded users', async ({ page }) => {
|
||||
const tableBody = page.locator('#user-table-body');
|
||||
await expect(tableBody).toContainText('testuser');
|
||||
await expect(tableBody).toContainText('adminuser');
|
||||
});
|
||||
|
||||
test('search filters results', async ({ page }) => {
|
||||
const searchInput = page.locator('input[name="q"]');
|
||||
await searchInput.fill('admin');
|
||||
// Wait for htmx debounce (300ms) and response
|
||||
await expect(page.locator('#user-table-body')).toContainText('adminuser', { timeout: 5000 });
|
||||
// Other users should be filtered out
|
||||
await expect(page.locator('#user-table-body')).not.toContainText('testuser', { timeout: 3000 });
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// 6. User detail page structure
|
||||
// ---------------------------------------------------------------
|
||||
test.describe('User detail page', () => {
|
||||
test.beforeEach(async ({ page }) => {
|
||||
await loginAsAdmin(page);
|
||||
await page.goto('/admin/users');
|
||||
});
|
||||
|
||||
test('clicking a user link shows detail page', async ({ page }) => {
|
||||
await page.click(`a:has-text("adminuser")`);
|
||||
await page.waitForURL('**/admin/users/**', { timeout: 5000 });
|
||||
|
||||
await expect(page.locator('h1')).toHaveText('adminuser');
|
||||
// Profile section
|
||||
await expect(page.locator('h2:has-text("Profile")')).toBeVisible();
|
||||
await expect(page.locator('#given_name')).toBeVisible();
|
||||
// Groups section
|
||||
await expect(page.locator('h2:has-text("Groups")')).toBeVisible();
|
||||
await expect(page.locator('#groups')).toBeVisible();
|
||||
// Credentials section
|
||||
await expect(page.locator('h2:has-text("Credentials")')).toBeVisible();
|
||||
// Actions section
|
||||
await expect(page.locator('h2:has-text("Actions")')).toBeVisible();
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// 7. Profile update
|
||||
// ---------------------------------------------------------------
|
||||
test.describe('Profile update', () => {
|
||||
test('fill in profile fields and save', async ({ page }) => {
|
||||
await loginAsAdmin(page);
|
||||
await page.goto(`/admin/users/${fixtures.admin_userid}`);
|
||||
|
||||
await page.fill('#given_name', 'Updated');
|
||||
await page.fill('#family_name', 'Name');
|
||||
await page.fill('#email', 'updated@example.com');
|
||||
|
||||
await page.click('section:has(h2:has-text("Profile")) button[type="submit"]');
|
||||
|
||||
const status = page.locator('#profile-status [role="status"]');
|
||||
await expect(status).toBeVisible({ timeout: 5000 });
|
||||
await expect(status).toContainText('Profile updated');
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// 8. Groups update
|
||||
// ---------------------------------------------------------------
|
||||
test.describe('Groups update', () => {
|
||||
test('change groups and save', async ({ page }) => {
|
||||
await loginAsAdmin(page);
|
||||
await page.goto(`/admin/users/${fixtures.admin_userid}`);
|
||||
|
||||
await page.fill('#groups', 'admin, users, editors');
|
||||
await page.click('section:has(h2:has-text("Groups")) button[type="submit"]');
|
||||
|
||||
const status = page.locator('#groups-status [role="status"]');
|
||||
await expect(status).toBeVisible({ timeout: 5000 });
|
||||
await expect(status).toContainText('Groups updated');
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// 9. Activate/deactivate toggle
|
||||
// ---------------------------------------------------------------
|
||||
test.describe('Activate/deactivate toggle', () => {
|
||||
test('deactivate then activate user', async ({ page }) => {
|
||||
await loginAsAdmin(page);
|
||||
// Use a non-admin user for this test
|
||||
await page.goto(`/admin/users/${fixtures.admin_userid}`);
|
||||
|
||||
// Click deactivate
|
||||
await page.click('button:has-text("Deactivate user")');
|
||||
const deactivatedStatus = page.locator('#actions-section [role="status"]');
|
||||
await expect(deactivatedStatus).toBeVisible({ timeout: 5000 });
|
||||
await expect(deactivatedStatus).toContainText('User deactivated');
|
||||
|
||||
// Now an activate button should appear
|
||||
await expect(page.locator('button:has-text("Activate user")')).toBeVisible();
|
||||
|
||||
// Click activate
|
||||
await page.click('button:has-text("Activate user")');
|
||||
const activatedStatus = page.locator('#actions-section [role="status"]');
|
||||
await expect(activatedStatus).toBeVisible({ timeout: 5000 });
|
||||
await expect(activatedStatus).toContainText('User activated');
|
||||
|
||||
// Deactivate button should reappear
|
||||
await expect(page.locator('button:has-text("Deactivate user")')).toBeVisible();
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// 10. Create invite from user list
|
||||
// ---------------------------------------------------------------
|
||||
test.describe('Create invite', () => {
|
||||
test('fill username and submit to get invite URL', async ({ page }) => {
|
||||
await loginAsAdmin(page);
|
||||
await page.goto('/admin/users');
|
||||
|
||||
await page.fill('form[hx-post="/admin/invite"] input[name="username"]', 'inviteduser');
|
||||
await page.click('form[hx-post="/admin/invite"] button[type="submit"]');
|
||||
|
||||
const inviteStatus = page.locator('#invite-status');
|
||||
await expect(inviteStatus.locator('.invite-url')).toBeVisible({ timeout: 5000 });
|
||||
const inviteUrl = await inviteStatus.locator('.invite-url').textContent();
|
||||
expect(inviteUrl).toContain('/register/');
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// 11. Re-invite from user detail
|
||||
// ---------------------------------------------------------------
|
||||
test.describe('Re-invite from user detail', () => {
|
||||
test('generate invite link from user detail page', async ({ page }) => {
|
||||
await loginAsAdmin(page);
|
||||
await page.goto(`/admin/users/${fixtures.admin_userid}`);
|
||||
|
||||
await page.click('button:has-text("Generate invite link")');
|
||||
|
||||
const inviteResult = page.locator('#invite-result');
|
||||
await expect(inviteResult.locator('.invite-url')).toBeVisible({ timeout: 5000 });
|
||||
const inviteUrl = await inviteResult.locator('.invite-url').textContent();
|
||||
expect(inviteUrl).toContain('/register/');
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// 12. Delete user
|
||||
// ---------------------------------------------------------------
|
||||
test.describe('Delete user', () => {
|
||||
test('delete a user and verify redirect to user list', async ({ page }) => {
|
||||
await loginAsAdmin(page);
|
||||
|
||||
// Use a disposable user seeded specifically for this test
|
||||
await page.goto(`/admin/users/${fixtures.disposable_userid}`);
|
||||
await expect(page.locator('h1')).toHaveText('disposableuser');
|
||||
|
||||
// Set up dialog handler before clicking delete
|
||||
page.on('dialog', async (dialog) => {
|
||||
await dialog.accept();
|
||||
});
|
||||
|
||||
await page.click('button:has-text("Delete user")');
|
||||
|
||||
// htmx processes the HX-Redirect header and navigates to /admin/users
|
||||
await page.waitForURL('**/admin/users', { timeout: 5000 });
|
||||
await expect(page.locator('h1')).toHaveText('Users');
|
||||
|
||||
// The deleted user should no longer appear in the list
|
||||
await expect(page.locator('#user-table-body')).not.toContainText('disposableuser');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -100,6 +100,28 @@ async def seed() -> None:
|
|||
result["profile_username"] = "profileuser"
|
||||
result["profile_password"] = "profilepass123"
|
||||
|
||||
# 6. Admin user for admin page tests
|
||||
admin_user = User(
|
||||
userid="test-user-05",
|
||||
username="adminuser",
|
||||
given_name="Admin",
|
||||
family_name="User",
|
||||
email="admin@example.com",
|
||||
groups=["admin", "users"],
|
||||
)
|
||||
await user_repo.create(admin_user)
|
||||
admin_password_hash = password_service.hash("adminpass123")
|
||||
await cred_repo.create_password(PasswordCredential(user_id=admin_user.userid, password_hash=admin_password_hash))
|
||||
result["admin_username"] = "adminuser"
|
||||
result["admin_password"] = "adminpass123"
|
||||
result["admin_userid"] = "test-user-05"
|
||||
|
||||
# 7. Disposable user for admin delete test (not used by any other tests)
|
||||
disposable_user = User(userid="test-user-06", username="disposableuser", groups=["users"])
|
||||
await user_repo.create(disposable_user)
|
||||
result["disposable_userid"] = "test-user-06"
|
||||
result["disposable_username"] = "disposableuser"
|
||||
|
||||
await db.commit()
|
||||
await db.close()
|
||||
print(json.dumps(result))
|
||||
|
|
|
|||
0
tests/test_admin/__init__.py
Normal file
0
tests/test_admin/__init__.py
Normal file
53
tests/test_admin/test_admin_guard.py
Normal file
53
tests/test_admin/test_admin_guard.py
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
from datetime import UTC, datetime
|
||||
|
||||
import pytest
|
||||
from argon2 import PasswordHasher
|
||||
from httpx import AsyncClient
|
||||
|
||||
from porchlight.authn.password import PasswordService
|
||||
from porchlight.models import PasswordCredential, User
|
||||
|
||||
|
||||
async def _login(
|
||||
client: AsyncClient, username: str = "alice", password: str = "testpass", *, groups: list[str] | None = None
|
||||
) -> None:
|
||||
"""Helper: create user + password credential and log in via POST /login/password."""
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user_repo = app.state.user_repo
|
||||
cred_repo = app.state.credential_repo
|
||||
|
||||
user = await user_repo.get_by_username(username)
|
||||
if user is None:
|
||||
user = User(
|
||||
userid="test-user-01",
|
||||
username=username,
|
||||
groups=groups or [],
|
||||
created_at=datetime.now(UTC),
|
||||
updated_at=datetime.now(UTC),
|
||||
)
|
||||
await user_repo.create(user)
|
||||
|
||||
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
existing = await cred_repo.get_password_by_user(user.userid)
|
||||
if existing is None:
|
||||
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash(password)))
|
||||
|
||||
await client.post(
|
||||
"/login/password",
|
||||
data={"username": username, "password": password},
|
||||
headers={"HX-Request": "true"},
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_users_redirects_unauthenticated(client: AsyncClient) -> None:
|
||||
response = await client.get("/admin/users", follow_redirects=False)
|
||||
assert response.status_code == 303
|
||||
assert response.headers["location"] == "/login"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_users_403_for_non_admin(client: AsyncClient) -> None:
|
||||
await _login(client, username="regularuser", password="password123", groups=["users"])
|
||||
response = await client.get("/admin/users", follow_redirects=False)
|
||||
assert response.status_code == 403
|
||||
391
tests/test_admin/test_admin_routes.py
Normal file
391
tests/test_admin/test_admin_routes.py
Normal file
|
|
@ -0,0 +1,391 @@
|
|||
from datetime import UTC, datetime
|
||||
|
||||
import pytest
|
||||
from argon2 import PasswordHasher
|
||||
from httpx import AsyncClient
|
||||
|
||||
from porchlight.authn.password import PasswordService
|
||||
from porchlight.models import PasswordCredential, User, WebAuthnCredential
|
||||
|
||||
|
||||
async def _login(
|
||||
client: AsyncClient,
|
||||
username: str = "admin",
|
||||
password: str = "adminpass",
|
||||
*,
|
||||
userid: str = "admin-user-01",
|
||||
groups: list[str] | None = None,
|
||||
) -> None:
|
||||
"""Helper: create user + password credential and log in via POST /login/password."""
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user_repo = app.state.user_repo
|
||||
cred_repo = app.state.credential_repo
|
||||
|
||||
user = await user_repo.get_by_username(username)
|
||||
if user is None:
|
||||
user = User(
|
||||
userid=userid,
|
||||
username=username,
|
||||
groups=groups if groups is not None else ["admin"],
|
||||
created_at=datetime.now(UTC),
|
||||
updated_at=datetime.now(UTC),
|
||||
)
|
||||
await user_repo.create(user)
|
||||
|
||||
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
existing = await cred_repo.get_password_by_user(user.userid)
|
||||
if existing is None:
|
||||
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash(password)))
|
||||
|
||||
await client.post(
|
||||
"/login/password",
|
||||
data={"username": username, "password": password},
|
||||
headers={"HX-Request": "true"},
|
||||
)
|
||||
|
||||
|
||||
async def _create_target_user(
|
||||
client: AsyncClient,
|
||||
*,
|
||||
userid: str = "target-user-01",
|
||||
username: str = "bob",
|
||||
email: str | None = None,
|
||||
groups: list[str] | None = None,
|
||||
active: bool = True,
|
||||
) -> User:
|
||||
"""Helper: create a target user in the database (does NOT log in)."""
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user_repo = app.state.user_repo
|
||||
user = User(
|
||||
userid=userid,
|
||||
username=username,
|
||||
email=email,
|
||||
groups=groups or [],
|
||||
active=active,
|
||||
created_at=datetime.now(UTC),
|
||||
updated_at=datetime.now(UTC),
|
||||
)
|
||||
return await user_repo.create(user)
|
||||
|
||||
|
||||
# --- User list ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_users_list_returns_html(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
response = await client.get("/admin/users")
|
||||
assert response.status_code == 200
|
||||
assert "text/html" in response.headers["content-type"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_users_list_shows_users(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
await _create_target_user(client, username="bob")
|
||||
response = await client.get("/admin/users")
|
||||
assert response.status_code == 200
|
||||
assert "bob" in response.text
|
||||
assert "admin" in response.text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_users_list_search(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
await _create_target_user(client, userid="target-user-01", username="bob")
|
||||
await _create_target_user(client, userid="target-user-02", username="carol")
|
||||
response = await client.get("/admin/users?q=bob")
|
||||
assert response.status_code == 200
|
||||
assert "bob" in response.text
|
||||
assert "carol" not in response.text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_users_list_htmx_search_returns_partial(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
await _create_target_user(client, username="bob")
|
||||
response = await client.get(
|
||||
"/admin/users?q=bob",
|
||||
headers={"HX-Request": "true", "HX-Trigger-Name": "q"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
# Partial should contain user row data but not the full page layout
|
||||
assert "bob" in response.text
|
||||
assert "<thead>" not in response.text
|
||||
assert "<html" not in response.text
|
||||
|
||||
|
||||
# --- User detail ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_detail_returns_html(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
response = await client.get(f"/admin/users/{target.userid}")
|
||||
assert response.status_code == 200
|
||||
assert "text/html" in response.headers["content-type"]
|
||||
assert "bob" in response.text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_user_detail_404_for_nonexistent(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
response = await client.get("/admin/users/nonexistent-id")
|
||||
assert response.status_code == 404
|
||||
assert "not found" in response.text.lower()
|
||||
|
||||
|
||||
# --- Profile update ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_profile(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
response = await client.post(
|
||||
f"/admin/users/{target.userid}/profile",
|
||||
data={
|
||||
"given_name": "Bob",
|
||||
"family_name": "Smith",
|
||||
"preferred_username": "bobby",
|
||||
"email": "bob@example.com",
|
||||
"phone_number": "+1234567890",
|
||||
"picture": "https://example.com/bob.jpg",
|
||||
"locale": "en-US",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "Profile updated" in response.text
|
||||
|
||||
# Verify user was actually updated
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user = await app.state.user_repo.get_by_userid(target.userid)
|
||||
assert user is not None
|
||||
assert user.given_name == "Bob"
|
||||
assert user.family_name == "Smith"
|
||||
assert user.email == "bob@example.com"
|
||||
assert user.picture == "https://example.com/bob.jpg"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_profile_invalid_email(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
response = await client.post(
|
||||
f"/admin/users/{target.userid}/profile",
|
||||
data={"email": "not-an-email"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "Invalid email" in response.text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_profile_invalid_picture_url(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
response = await client.post(
|
||||
f"/admin/users/{target.userid}/profile",
|
||||
data={"picture": "not-a-url"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "Picture URL must be a valid HTTP or HTTPS URL" in response.text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_profile_404_for_nonexistent(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
response = await client.post(
|
||||
"/admin/users/nonexistent-id/profile",
|
||||
data={"given_name": "Ghost"},
|
||||
)
|
||||
assert response.status_code == 404
|
||||
assert "not found" in response.text.lower()
|
||||
|
||||
|
||||
# --- Groups update ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_groups(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
response = await client.post(
|
||||
f"/admin/users/{target.userid}/groups",
|
||||
data={"groups": "admin, users, editors"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "Groups updated" in response.text
|
||||
|
||||
# Verify groups were actually updated
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user = await app.state.user_repo.get_by_userid(target.userid)
|
||||
assert user is not None
|
||||
assert set(user.groups) == {"admin", "users", "editors"}
|
||||
|
||||
|
||||
# --- Activate / deactivate ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_activate_user(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob", active=False)
|
||||
response = await client.post(f"/admin/users/{target.userid}/activate")
|
||||
assert response.status_code == 200
|
||||
assert "User activated" in response.text
|
||||
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user = await app.state.user_repo.get_by_userid(target.userid)
|
||||
assert user is not None
|
||||
assert user.active is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deactivate_user(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob", active=True)
|
||||
response = await client.post(f"/admin/users/{target.userid}/deactivate")
|
||||
assert response.status_code == 200
|
||||
assert "User deactivated" in response.text
|
||||
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user = await app.state.user_repo.get_by_userid(target.userid)
|
||||
assert user is not None
|
||||
assert user.active is False
|
||||
|
||||
|
||||
# --- Delete user ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_user(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
response = await client.request("DELETE", f"/admin/users/{target.userid}")
|
||||
assert response.status_code == 200
|
||||
assert "User deleted" in response.text
|
||||
assert response.headers.get("hx-redirect") == "/admin/users"
|
||||
|
||||
# Verify user was actually deleted
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user = await app.state.user_repo.get_by_userid(target.userid)
|
||||
assert user is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_user_self_prevention(client: AsyncClient) -> None:
|
||||
await _login(client, userid="admin-user-01")
|
||||
# Try to delete ourselves
|
||||
response = await client.request("DELETE", "/admin/users/admin-user-01")
|
||||
assert response.status_code == 200
|
||||
assert "Cannot delete your own account" in response.text
|
||||
|
||||
# Verify admin user was NOT deleted
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
user = await app.state.user_repo.get_by_userid("admin-user-01")
|
||||
assert user is not None
|
||||
|
||||
|
||||
# --- Delete credentials ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_password_credential(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
|
||||
# Create a password credential for the target user
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
cred_repo = app.state.credential_repo
|
||||
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
await cred_repo.create_password(PasswordCredential(user_id=target.userid, password_hash=svc.hash("bobpass")))
|
||||
|
||||
response = await client.request("DELETE", f"/admin/users/{target.userid}/credentials/password")
|
||||
assert response.status_code == 200
|
||||
assert "text/html" in response.headers["content-type"]
|
||||
|
||||
# Verify password was deleted
|
||||
pw = await cred_repo.get_password_by_user(target.userid)
|
||||
assert pw is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_webauthn_credential(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
|
||||
# Create a webauthn credential for the target user
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
cred_repo = app.state.credential_repo
|
||||
credential_id = b"\x01\x02\x03\x04\x05\x06\x07\x08"
|
||||
await cred_repo.create_webauthn(
|
||||
WebAuthnCredential(
|
||||
user_id=target.userid,
|
||||
credential_id=credential_id,
|
||||
public_key=b"\x00" * 32,
|
||||
sign_count=0,
|
||||
device_name="test-key",
|
||||
)
|
||||
)
|
||||
|
||||
# URL uses base64url without padding
|
||||
from base64 import urlsafe_b64encode
|
||||
|
||||
credential_id_b64 = urlsafe_b64encode(credential_id).decode().rstrip("=")
|
||||
|
||||
response = await client.request("DELETE", f"/admin/users/{target.userid}/credentials/webauthn/{credential_id_b64}")
|
||||
assert response.status_code == 200
|
||||
assert "text/html" in response.headers["content-type"]
|
||||
|
||||
# Verify credential was deleted
|
||||
creds = await cred_repo.get_webauthn_by_user(target.userid)
|
||||
assert len(creds) == 0
|
||||
|
||||
|
||||
# --- Create invite ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_invite(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
response = await client.post(
|
||||
"/admin/invite",
|
||||
data={"username": "newuser"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "Invite created" in response.text
|
||||
assert "newuser" in response.text
|
||||
assert "/register/" in response.text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_invite_empty_username(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
response = await client.post(
|
||||
"/admin/invite",
|
||||
data={"username": " "},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "Username is required" in response.text
|
||||
|
||||
|
||||
# --- Re-invite ---
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reinvite_user(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
target = await _create_target_user(client, username="bob")
|
||||
response = await client.post(f"/admin/users/{target.userid}/invite")
|
||||
assert response.status_code == 200
|
||||
assert "Invite link generated" in response.text
|
||||
assert "/register/" in response.text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reinvite_user_404_for_nonexistent(client: AsyncClient) -> None:
|
||||
await _login(client)
|
||||
response = await client.post("/admin/users/nonexistent-id/invite")
|
||||
assert response.status_code == 404
|
||||
assert "not found" in response.text.lower()
|
||||
|
|
@ -159,6 +159,49 @@ async def test_create_duplicate_username(user_repo: SQLiteUserRepository) -> Non
|
|||
await user_repo.create(_make_user(userid="different-id", username="alice"))
|
||||
|
||||
|
||||
async def test_search_users_by_username(user_repo: SQLiteUserRepository) -> None:
|
||||
await user_repo.create(_make_user(userid="id-1", username="sample_user"))
|
||||
results = await user_repo.search_users("sample", offset=0, limit=100)
|
||||
assert len(results) == 1
|
||||
assert results[0].userid == "id-1"
|
||||
|
||||
|
||||
async def test_search_users_by_email(user_repo: SQLiteUserRepository) -> None:
|
||||
await user_repo.create(_make_user(email="alice@example.com"))
|
||||
results = await user_repo.search_users("alice", offset=0, limit=100)
|
||||
assert len(results) == 1
|
||||
|
||||
|
||||
async def test_search_users_no_match(user_repo: SQLiteUserRepository) -> None:
|
||||
await user_repo.create(_make_user())
|
||||
results = await user_repo.search_users("nonexistent", offset=0, limit=100)
|
||||
assert len(results) == 0
|
||||
|
||||
|
||||
async def test_search_users_pagination(user_repo: SQLiteUserRepository) -> None:
|
||||
for i in range(5):
|
||||
await user_repo.create(_make_user(userid=f"id-{i}", username=f"user{i}", groups=["users"]))
|
||||
page1 = await user_repo.search_users("user", offset=0, limit=2)
|
||||
page2 = await user_repo.search_users("user", offset=2, limit=2)
|
||||
assert len(page1) == 2
|
||||
assert len(page2) == 2
|
||||
assert page1[0].username != page2[0].username
|
||||
|
||||
|
||||
async def test_count_users_no_query(user_repo: SQLiteUserRepository) -> None:
|
||||
await user_repo.create(_make_user())
|
||||
count = await user_repo.count_users()
|
||||
assert count == 1
|
||||
|
||||
|
||||
async def test_count_users_with_query(user_repo: SQLiteUserRepository) -> None:
|
||||
await user_repo.create(_make_user())
|
||||
count = await user_repo.count_users(query="alice")
|
||||
assert count == 1
|
||||
count = await user_repo.count_users(query="nonexistent")
|
||||
assert count == 0
|
||||
|
||||
|
||||
async def test_roundtrip_preserves_all_fields(user_repo: SQLiteUserRepository) -> None:
|
||||
user = _make_user(
|
||||
preferred_username="ally",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue