Merge branch 'feature/cli-module'

This commit is contained in:
Johan Lundberg 2026-02-18 11:35:15 +01:00
commit 80960d5a1f
No known key found for this signature in database
GPG key ID: A6C152738D03C7D1
4 changed files with 204 additions and 4 deletions

View file

@ -75,9 +75,13 @@ async def register_magic_link(request: Request, token: str) -> Response:
if link is None:
return HTMLResponse("<p>Invalid or expired registration link.</p>", status_code=400)
userid = await generate_unique_userid(user_repo)
user = User(userid=userid, username=link.username, groups=["users"])
await user_repo.create(user)
existing_user = await user_repo.get_by_username(link.username)
if existing_user is not None:
user = existing_user
else:
userid = await generate_unique_userid(user_repo)
user = User(userid=userid, username=link.username, groups=["users"])
await user_repo.create(user)
await magic_link_service.mark_used(token)

74
src/porchlight/cli.py Normal file
View file

@ -0,0 +1,74 @@
import asyncio
from pathlib import Path
from typing import Annotated
import typer
from porchlight.config import Settings
from porchlight.invite.service import MagicLinkService
from porchlight.models import User
from porchlight.store.sqlite.db import open_db
from porchlight.store.sqlite.repositories import SQLiteMagicLinkRepository, SQLiteUserRepository
from porchlight.userid import generate_unique_userid
PACKAGE_DIR = Path(__file__).resolve().parent
MIGRATIONS_DIR = PACKAGE_DIR / "store" / "sqlite" / "migrations"
app = typer.Typer()
async def _create_invite(settings: Settings, username: str, ttl: int, note: str | None) -> str:
"""Create an invite link and return the full registration URL."""
async with open_db(settings.sqlite_path, MIGRATIONS_DIR) as db:
repo = SQLiteMagicLinkRepository(db)
service = MagicLinkService(repo, ttl=ttl)
link = await service.create(username=username, created_by="cli", note=note)
return f"{settings.issuer}/register/{link.token}"
async def _initial_admin(settings: Settings, username: str, groups: list[str]) -> str:
"""Create an admin user and invite link, return the full registration URL."""
async with open_db(settings.sqlite_path, MIGRATIONS_DIR) as db:
user_repo = SQLiteUserRepository(db)
existing = await user_repo.get_by_username(username)
if existing is not None:
raise typer.BadParameter(f"User '{username}' already exists.")
userid = await generate_unique_userid(user_repo)
user = User(userid=userid, username=username, groups=groups)
await user_repo.create(user)
magic_link_repo = SQLiteMagicLinkRepository(db)
service = MagicLinkService(magic_link_repo, ttl=settings.invite_ttl)
link = await service.create(username=username, created_by="cli", note="initial admin setup")
return f"{settings.issuer}/register/{link.token}"
@app.command()
def create_invite(
username: str,
ttl: Annotated[int | None, typer.Option(help="Link expiration in seconds")] = None,
note: Annotated[str | None, typer.Option(help="Optional note stored with the link")] = None,
) -> None:
"""Generate a magic link registration URL for a new user."""
settings = Settings() # type: ignore[call-arg]
effective_ttl = ttl if ttl is not None else settings.invite_ttl
url = asyncio.run(_create_invite(settings, username, effective_ttl, note))
typer.echo(url)
@app.command()
def initial_admin(
username: str,
group: Annotated[list[str] | None, typer.Option(help="Groups to assign (repeatable)")] = None,
) -> None:
"""Bootstrap the first admin user with a registration link."""
settings = Settings() # type: ignore[call-arg]
groups = group if group is not None else ["admin", "users"]
url = asyncio.run(_initial_admin(settings, username, groups))
typer.echo(url)
def main() -> None:
app()

View file

@ -2,7 +2,7 @@ from datetime import UTC, datetime, timedelta
from httpx import AsyncClient
from porchlight.models import MagicLink
from porchlight.models import MagicLink, User
async def test_register_invalid_token_returns_error_page(client: AsyncClient) -> None:
@ -70,3 +70,39 @@ async def test_register_used_token_returns_error(client: AsyncClient) -> None:
res = await client.get("/register/used", follow_redirects=False)
assert res.status_code == 400
async def test_register_existing_user_logs_in_and_redirects(client: AsyncClient) -> None:
"""When initial-admin creates a user, the invite link should log them in."""
app = client._transport.app # type: ignore[union-attr]
magic_link_repo = app.state.magic_link_repo
user_repo = app.state.user_repo
# Pre-create the user (as initial-admin would)
user = User(userid="lusab-bansen", username="admin", groups=["admin", "users"])
await user_repo.create(user)
# Create invite for the same username
await magic_link_repo.create(
MagicLink(
token="admin-setup",
username="admin",
expires_at=datetime.now(UTC) + timedelta(hours=1),
)
)
res = await client.get("/register/admin-setup", follow_redirects=False)
assert res.status_code in (302, 303)
assert "/manage/credentials" in res.headers["location"]
assert "setup=1" in res.headers["location"]
# Token should be marked used
link = await magic_link_repo.get_by_token("admin-setup")
assert link is not None
assert link.used is True
# Original user should still exist with original groups
existing = await user_repo.get_by_username("admin")
assert existing is not None
assert existing.userid == "lusab-bansen"
assert "admin" in existing.groups

86
tests/test_cli.py Normal file
View file

@ -0,0 +1,86 @@
import os
import tempfile
from typer.testing import CliRunner
from porchlight.cli import app
runner = CliRunner()
ENV = {"OIDC_OP_ISSUER": "https://example.com"}
def _env(tmpdir: str) -> dict[str, str]:
return {**ENV, "OIDC_OP_SQLITE_PATH": os.path.join(tmpdir, "test.db")}
# -- create-invite -----------------------------------------------------------
def test_create_invite_prints_registration_url() -> None:
"""create-invite should print a URL containing /register/."""
with tempfile.TemporaryDirectory() as tmpdir:
result = runner.invoke(app, ["create-invite", "testuser"], env=_env(tmpdir))
assert result.exit_code == 0, result.output
assert "https://example.com/register/" in result.output
def test_create_invite_with_note() -> None:
"""create-invite with --note should work."""
with tempfile.TemporaryDirectory() as tmpdir:
result = runner.invoke(app, ["create-invite", "testuser", "--note", "Welcome aboard"], env=_env(tmpdir))
assert result.exit_code == 0, result.output
assert "https://example.com/register/" in result.output
def test_create_invite_with_custom_ttl() -> None:
"""create-invite with --ttl should use the custom TTL."""
with tempfile.TemporaryDirectory() as tmpdir:
result = runner.invoke(app, ["create-invite", "testuser", "--ttl", "3600"], env=_env(tmpdir))
assert result.exit_code == 0, result.output
assert "https://example.com/register/" in result.output
def test_create_invite_missing_username_shows_error() -> None:
"""create-invite without a username should show an error."""
result = runner.invoke(app, ["create-invite"])
assert result.exit_code != 0
# -- initial-admin ------------------------------------------------------------
def test_initial_admin_creates_user_and_prints_url() -> None:
"""initial-admin should create a user and print a registration URL."""
with tempfile.TemporaryDirectory() as tmpdir:
result = runner.invoke(app, ["initial-admin", "admin"], env=_env(tmpdir))
assert result.exit_code == 0, result.output
assert "https://example.com/register/" in result.output
def test_initial_admin_with_custom_groups() -> None:
"""initial-admin with --group should assign those groups."""
with tempfile.TemporaryDirectory() as tmpdir:
result = runner.invoke(
app, ["initial-admin", "admin", "--group", "admin", "--group", "superusers"], env=_env(tmpdir)
)
assert result.exit_code == 0, result.output
assert "https://example.com/register/" in result.output
def test_initial_admin_duplicate_username_fails() -> None:
"""initial-admin should fail if username already exists."""
with tempfile.TemporaryDirectory() as tmpdir:
env = _env(tmpdir)
# Create the user first
result1 = runner.invoke(app, ["initial-admin", "admin"], env=env)
assert result1.exit_code == 0, result1.output
# Try again — should fail
result2 = runner.invoke(app, ["initial-admin", "admin"], env=env)
assert result2.exit_code != 0
def test_initial_admin_missing_username_shows_error() -> None:
"""initial-admin without a username should show an error."""
result = runner.invoke(app, ["initial-admin"])
assert result.exit_code != 0