diff --git a/src/porchlight/authn/routes.py b/src/porchlight/authn/routes.py index d99057d..2ef2cea 100644 --- a/src/porchlight/authn/routes.py +++ b/src/porchlight/authn/routes.py @@ -75,9 +75,13 @@ async def register_magic_link(request: Request, token: str) -> Response: if link is None: return HTMLResponse("

Invalid or expired registration link.

", status_code=400) - userid = await generate_unique_userid(user_repo) - user = User(userid=userid, username=link.username, groups=["users"]) - await user_repo.create(user) + existing_user = await user_repo.get_by_username(link.username) + if existing_user is not None: + user = existing_user + else: + userid = await generate_unique_userid(user_repo) + user = User(userid=userid, username=link.username, groups=["users"]) + await user_repo.create(user) await magic_link_service.mark_used(token) diff --git a/src/porchlight/cli.py b/src/porchlight/cli.py new file mode 100644 index 0000000..6bc5f7a --- /dev/null +++ b/src/porchlight/cli.py @@ -0,0 +1,74 @@ +import asyncio +from pathlib import Path +from typing import Annotated + +import typer + +from porchlight.config import Settings +from porchlight.invite.service import MagicLinkService +from porchlight.models import User +from porchlight.store.sqlite.db import open_db +from porchlight.store.sqlite.repositories import SQLiteMagicLinkRepository, SQLiteUserRepository +from porchlight.userid import generate_unique_userid + +PACKAGE_DIR = Path(__file__).resolve().parent +MIGRATIONS_DIR = PACKAGE_DIR / "store" / "sqlite" / "migrations" + +app = typer.Typer() + + +async def _create_invite(settings: Settings, username: str, ttl: int, note: str | None) -> str: + """Create an invite link and return the full registration URL.""" + async with open_db(settings.sqlite_path, MIGRATIONS_DIR) as db: + repo = SQLiteMagicLinkRepository(db) + service = MagicLinkService(repo, ttl=ttl) + link = await service.create(username=username, created_by="cli", note=note) + return f"{settings.issuer}/register/{link.token}" + + +async def _initial_admin(settings: Settings, username: str, groups: list[str]) -> str: + """Create an admin user and invite link, return the full registration URL.""" + async with open_db(settings.sqlite_path, MIGRATIONS_DIR) as db: + user_repo = SQLiteUserRepository(db) + + existing = await user_repo.get_by_username(username) + if existing is not None: + raise typer.BadParameter(f"User '{username}' already exists.") + + userid = await generate_unique_userid(user_repo) + user = User(userid=userid, username=username, groups=groups) + await user_repo.create(user) + + magic_link_repo = SQLiteMagicLinkRepository(db) + service = MagicLinkService(magic_link_repo, ttl=settings.invite_ttl) + link = await service.create(username=username, created_by="cli", note="initial admin setup") + return f"{settings.issuer}/register/{link.token}" + + +@app.command() +def create_invite( + username: str, + ttl: Annotated[int | None, typer.Option(help="Link expiration in seconds")] = None, + note: Annotated[str | None, typer.Option(help="Optional note stored with the link")] = None, +) -> None: + """Generate a magic link registration URL for a new user.""" + settings = Settings() # type: ignore[call-arg] + effective_ttl = ttl if ttl is not None else settings.invite_ttl + url = asyncio.run(_create_invite(settings, username, effective_ttl, note)) + typer.echo(url) + + +@app.command() +def initial_admin( + username: str, + group: Annotated[list[str] | None, typer.Option(help="Groups to assign (repeatable)")] = None, +) -> None: + """Bootstrap the first admin user with a registration link.""" + settings = Settings() # type: ignore[call-arg] + groups = group if group is not None else ["admin", "users"] + url = asyncio.run(_initial_admin(settings, username, groups)) + typer.echo(url) + + +def main() -> None: + app() diff --git a/tests/test_auth_routes/test_register_magic_link.py b/tests/test_auth_routes/test_register_magic_link.py index 6a0cbd6..1a03fed 100644 --- a/tests/test_auth_routes/test_register_magic_link.py +++ b/tests/test_auth_routes/test_register_magic_link.py @@ -2,7 +2,7 @@ from datetime import UTC, datetime, timedelta from httpx import AsyncClient -from porchlight.models import MagicLink +from porchlight.models import MagicLink, User async def test_register_invalid_token_returns_error_page(client: AsyncClient) -> None: @@ -70,3 +70,39 @@ async def test_register_used_token_returns_error(client: AsyncClient) -> None: res = await client.get("/register/used", follow_redirects=False) assert res.status_code == 400 + + +async def test_register_existing_user_logs_in_and_redirects(client: AsyncClient) -> None: + """When initial-admin creates a user, the invite link should log them in.""" + app = client._transport.app # type: ignore[union-attr] + magic_link_repo = app.state.magic_link_repo + user_repo = app.state.user_repo + + # Pre-create the user (as initial-admin would) + user = User(userid="lusab-bansen", username="admin", groups=["admin", "users"]) + await user_repo.create(user) + + # Create invite for the same username + await magic_link_repo.create( + MagicLink( + token="admin-setup", + username="admin", + expires_at=datetime.now(UTC) + timedelta(hours=1), + ) + ) + + res = await client.get("/register/admin-setup", follow_redirects=False) + assert res.status_code in (302, 303) + assert "/manage/credentials" in res.headers["location"] + assert "setup=1" in res.headers["location"] + + # Token should be marked used + link = await magic_link_repo.get_by_token("admin-setup") + assert link is not None + assert link.used is True + + # Original user should still exist with original groups + existing = await user_repo.get_by_username("admin") + assert existing is not None + assert existing.userid == "lusab-bansen" + assert "admin" in existing.groups diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..f22b61d --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,86 @@ +import os +import tempfile + +from typer.testing import CliRunner + +from porchlight.cli import app + +runner = CliRunner() + +ENV = {"OIDC_OP_ISSUER": "https://example.com"} + + +def _env(tmpdir: str) -> dict[str, str]: + return {**ENV, "OIDC_OP_SQLITE_PATH": os.path.join(tmpdir, "test.db")} + + +# -- create-invite ----------------------------------------------------------- + + +def test_create_invite_prints_registration_url() -> None: + """create-invite should print a URL containing /register/.""" + with tempfile.TemporaryDirectory() as tmpdir: + result = runner.invoke(app, ["create-invite", "testuser"], env=_env(tmpdir)) + assert result.exit_code == 0, result.output + assert "https://example.com/register/" in result.output + + +def test_create_invite_with_note() -> None: + """create-invite with --note should work.""" + with tempfile.TemporaryDirectory() as tmpdir: + result = runner.invoke(app, ["create-invite", "testuser", "--note", "Welcome aboard"], env=_env(tmpdir)) + assert result.exit_code == 0, result.output + assert "https://example.com/register/" in result.output + + +def test_create_invite_with_custom_ttl() -> None: + """create-invite with --ttl should use the custom TTL.""" + with tempfile.TemporaryDirectory() as tmpdir: + result = runner.invoke(app, ["create-invite", "testuser", "--ttl", "3600"], env=_env(tmpdir)) + assert result.exit_code == 0, result.output + assert "https://example.com/register/" in result.output + + +def test_create_invite_missing_username_shows_error() -> None: + """create-invite without a username should show an error.""" + result = runner.invoke(app, ["create-invite"]) + assert result.exit_code != 0 + + +# -- initial-admin ------------------------------------------------------------ + + +def test_initial_admin_creates_user_and_prints_url() -> None: + """initial-admin should create a user and print a registration URL.""" + with tempfile.TemporaryDirectory() as tmpdir: + result = runner.invoke(app, ["initial-admin", "admin"], env=_env(tmpdir)) + assert result.exit_code == 0, result.output + assert "https://example.com/register/" in result.output + + +def test_initial_admin_with_custom_groups() -> None: + """initial-admin with --group should assign those groups.""" + with tempfile.TemporaryDirectory() as tmpdir: + result = runner.invoke( + app, ["initial-admin", "admin", "--group", "admin", "--group", "superusers"], env=_env(tmpdir) + ) + assert result.exit_code == 0, result.output + assert "https://example.com/register/" in result.output + + +def test_initial_admin_duplicate_username_fails() -> None: + """initial-admin should fail if username already exists.""" + with tempfile.TemporaryDirectory() as tmpdir: + env = _env(tmpdir) + # Create the user first + result1 = runner.invoke(app, ["initial-admin", "admin"], env=env) + assert result1.exit_code == 0, result1.output + # Try again — should fail + result2 = runner.invoke(app, ["initial-admin", "admin"], env=env) + assert result2.exit_code != 0 + + +def test_initial_admin_missing_username_shows_error() -> None: + """initial-admin without a username should show an error.""" + result = runner.invoke(app, ["initial-admin"]) + assert result.exit_code != 0