from argon2 import PasswordHasher from httpx import AsyncClient from porchlight.authn.password import PasswordService from porchlight.invite.service import MagicLinkService from porchlight.models import PasswordCredential, User from tests.conftest import get_csrf_token async def test_register_invalid_token_returns_error_page(client: AsyncClient) -> None: res = await client.get("/register/nope", follow_redirects=False) assert res.status_code == 400 assert "Invalid or expired" in res.text async def test_register_expired_token_returns_error_page(client: AsyncClient) -> None: app = client._transport.app # type: ignore[union-attr] # An already-expired link (negative TTL), stored hashed like the real service. expired_service = MagicLinkService(repo=app.state.magic_link_repo, ttl=-3600) link = await expired_service.create(username="newuser") res = await client.get(f"/register/{link.token}", follow_redirects=False) assert res.status_code == 400 assert "Invalid or expired" in res.text async def test_register_get_does_not_consume_token(client: AsyncClient) -> None: """GET is side-effect free: it shows a confirmation form and must not consume the token or create a session.""" app = client._transport.app # type: ignore[union-attr] magic_link_service = app.state.magic_link_service link = await magic_link_service.create(username="newuser") res = await client.get(f"/register/{link.token}", follow_redirects=False) assert res.status_code == 200 assert f'action="/register/{link.token}"' in res.text # Token still valid (not consumed by the GET). assert await magic_link_service.validate(link.token) is not None async def test_register_post_creates_user_and_redirects(client: AsyncClient) -> None: app = client._transport.app # type: ignore[union-attr] magic_link_service = app.state.magic_link_service user_repo = app.state.user_repo link = await magic_link_service.create(username="newuser") csrf = await get_csrf_token(client) res = await client.post( f"/register/{link.token}", headers={"X-CSRF-Token": csrf}, follow_redirects=False, ) assert res.status_code in (302, 303) assert "/manage/credentials" in res.headers["location"] assert "setup=1" in res.headers["location"] # Token should be consumed (no longer valid) assert await magic_link_service.validate(link.token) is None # User should exist user = await user_repo.get_by_username("newuser") assert user is not None assert "users" in user.groups async def test_register_post_requires_csrf(client: AsyncClient) -> None: app = client._transport.app # type: ignore[union-attr] magic_link_service = app.state.magic_link_service link = await magic_link_service.create(username="newuser") res = await client.post(f"/register/{link.token}", follow_redirects=False) assert res.status_code == 403 # Token must not have been consumed by the rejected request. assert await magic_link_service.validate(link.token) is not None async def test_register_used_token_returns_error(client: AsyncClient) -> None: app = client._transport.app # type: ignore[union-attr] magic_link_service = app.state.magic_link_service link = await magic_link_service.create(username="newuser") await magic_link_service.mark_used(link.token) res = await client.get(f"/register/{link.token}", follow_redirects=False) assert res.status_code == 400 async def test_register_existing_user_logs_in_and_redirects(client: AsyncClient) -> None: """When initial-admin creates a user (no credentials yet), the invite link should let them set up their account.""" app = client._transport.app # type: ignore[union-attr] magic_link_service = app.state.magic_link_service user_repo = app.state.user_repo # Pre-create the user (as initial-admin would), with no credentials. user = User(userid="lusab-bansen", username="admin", groups=["admin", "users"]) await user_repo.create(user) link = await magic_link_service.create(username="admin") csrf = await get_csrf_token(client) res = await client.post( f"/register/{link.token}", headers={"X-CSRF-Token": csrf}, follow_redirects=False, ) assert res.status_code in (302, 303) assert "/manage/credentials" in res.headers["location"] assert "setup=1" in res.headers["location"] # Token should be consumed assert await magic_link_service.validate(link.token) is None # Original user should still exist with original groups existing = await user_repo.get_by_username("admin") assert existing is not None assert existing.userid == "lusab-bansen" assert "admin" in existing.groups async def test_register_rejects_user_that_already_has_credentials(client: AsyncClient) -> None: """An invite/re-invite link must not act as a passwordless login for an account that already has credentials. Recovery is a separate flow.""" app = client._transport.app # type: ignore[union-attr] magic_link_service = app.state.magic_link_service user_repo = app.state.user_repo cred_repo = app.state.credential_repo user = User(userid="lusab-hascreds", username="hascreds", groups=["users"]) await user_repo.create(user) svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("existing-pass"))) link = await magic_link_service.create(username="hascreds") csrf = await get_csrf_token(client) res = await client.post( f"/register/{link.token}", headers={"X-CSRF-Token": csrf}, follow_redirects=False, ) # No passwordless login: rejected, not redirected to setup. assert res.status_code == 400 assert "setup=1" not in res.headers.get("location", "")