Compare commits

..

No commits in common. "01e3382aafe5e762cdce5e9331b8ac4b93621134" and "2f8cca3f4142af2fa092d78e31de788b63957dd3" have entirely different histories.

35 changed files with 266 additions and 1109 deletions

View file

@ -1,61 +0,0 @@
# Set the maximum line length to 120.
line-length = 120
target-version = "py313"
[lint]
select = [
"A",
"ANN",
"ASYNC",
"B",
"C4",
"DTZ",
"E",
"ERA",
"F",
"FAST",
"FLY",
"FURB",
"I",
"ISC",
"PERF",
"PGH",
"PIE",
"PL",
"PT",
"RUF",
"SIM",
"UP",
"W",
]
ignore = [
"SIM102", # collapsible-if
"SIM103", # return-bool-condition-directly (keeping explicit if/else for clarity)
"SIM108", # if-else-block-instead-of-if-exp
# Since we use ruff as a formatter, the following rules should be ignored
# See: https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
"W191",
"E111",
"E114",
"E117",
"E501",
"D206",
"D300",
"Q000",
"Q001",
"Q002",
"Q003",
"COM812",
"COM819",
"ISC002",
]
allowed-confusables = ["", "х"]
[lint.flake8-annotations]
allow-star-arg-any = true
[lint.per-file-ignores]
"**/test_*.py" = ["PLR2004"] # magic-value-comparison - allow magic numbers in tests

View file

@ -1,5 +1,4 @@
from base64 import urlsafe_b64decode
from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
@ -7,7 +6,7 @@ from pydantic import ValidationError
from porchlight.dependencies import get_session_user
from porchlight.models import User
from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput, format_validation_errors
from porchlight.validation import ProfileUpdate
router = APIRouter(prefix="/admin", tags=["admin"])
@ -100,7 +99,7 @@ async def user_detail(request: Request, userid: str) -> Response:
@router.post("/invite", response_class=HTMLResponse)
async def create_invite(
request: Request,
username: Annotated[str, Form()],
username: str = Form(),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -110,19 +109,17 @@ async def create_invite(
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
try:
validated = UsernameInput(username=username)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
username = username.strip()
if not username:
return HTMLResponse('<div role="alert">Username is required</div>')
magic_link_service = request.app.state.magic_link_service
settings = request.app.state.settings
link = await magic_link_service.create(username=validated.username, created_by=admin.username, note="admin invite")
link = await magic_link_service.create(username=username, created_by=admin.username, note="admin invite")
url = f"{settings.issuer}/register/{link.token}"
return HTMLResponse(
f'<div role="status">Invite created for <strong>{validated.username}</strong>:</div>'
f'<div class="invite-url">{url}</div>'
f'<div role="status">Invite created for <strong>{username}</strong>:</div><div class="invite-url">{url}</div>'
)
@ -131,6 +128,13 @@ async def create_invite(
async def update_user_profile(
request: Request,
userid: str,
given_name: str = Form(""),
family_name: str = Form(""),
preferred_username: str = Form(""),
email: str = Form(""),
phone_number: str = Form(""),
picture: str = Form(""),
locale: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -140,19 +144,32 @@ async def update_user_profile(
return HTMLResponse("Forbidden", status_code=403)
# Validate
form = await request.form()
try:
profile = ProfileUpdate(
given_name=str(form.get("given_name", "")),
family_name=str(form.get("family_name", "")),
preferred_username=str(form.get("preferred_username", "")),
email=str(form.get("email", "")),
phone_number=str(form.get("phone_number", "")),
picture=str(form.get("picture", "")),
locale=str(form.get("locale", "")),
given_name=given_name,
family_name=family_name,
preferred_username=preferred_username,
email=email,
phone_number=phone_number,
picture=picture,
locale=locale,
)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
error = exc.errors()[0]
field = error["loc"][-1] if error["loc"] else "input"
msg = error["msg"]
labels = {
"given_name": "Given name",
"family_name": "Family name",
"preferred_username": "Display name",
"email": "Email",
"phone_number": "Phone number",
"picture": "Picture URL",
"locale": "Locale",
}
label = labels.get(str(field), str(field))
display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}"
return HTMLResponse(f'<div role="alert">{display_msg}</div>')
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
@ -179,7 +196,7 @@ async def update_user_profile(
async def update_user_groups(
request: Request,
userid: str,
groups: Annotated[str, Form()] = "",
groups: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -188,17 +205,13 @@ async def update_user_groups(
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
try:
validated = GroupListInput(groups=groups)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("User not found", status_code=404)
updated = user.model_copy(update={"groups": validated.group_list})
group_list = [g.strip() for g in groups.split(",") if g.strip()]
updated = user.model_copy(update={"groups": group_list})
await user_repo.update(updated)
return HTMLResponse('<div role="status">Groups updated</div>')
@ -340,7 +353,7 @@ async def delete_user(request: Request, userid: str) -> Response:
return HTMLResponse("Forbidden", status_code=403)
# Prevent self-deletion
admin_userid, _ = session_user
admin_userid, _ = get_session_user(request)
if userid == admin_userid:
return HTMLResponse('<div role="alert">Cannot delete your own account</div>')

View file

@ -8,11 +8,8 @@ from urllib.parse import urlparse
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from slowapi.errors import RateLimitExceeded
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request
from starlette.responses import HTMLResponse as StarletteHTMLResponse
from starlette.responses import Response
from porchlight.admin.routes import router as admin_router
from porchlight.authn.password import PasswordService
@ -24,7 +21,6 @@ from porchlight.invite.service import MagicLinkService
from porchlight.manage.routes import router as manage_router
from porchlight.oidc.endpoints import router as oidc_router
from porchlight.oidc.provider import create_oidc_server
from porchlight.rate_limit import limiter
from porchlight.store.sqlite.db import open_db
from porchlight.store.sqlite.repositories import (
SQLiteConsentRepository,
@ -127,16 +123,6 @@ def create_app(settings: Settings | None = None) -> FastAPI:
https_only=settings.session_https_only,
)
# Rate limiting
app.state.limiter = limiter
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> StarletteHTMLResponse:
return StarletteHTMLResponse(
'<div role="alert">Too many attempts. Please try again later.</div>',
status_code=429,
)
# Templates
templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates"))
@ -161,7 +147,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
return {"status": "ok"}
@app.get("/")
async def landing(request: Request) -> Response:
async def landing(request: Request): # type: ignore[no-untyped-def]
return templates.TemplateResponse(request, "index.html")
return app

View file

@ -1,12 +1,10 @@
from base64 import urlsafe_b64decode
from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fido2.webauthn import AttestedCredentialData, AuthenticationResponse
from porchlight.models import User
from porchlight.rate_limit import limiter
from porchlight.userid import generate_unique_userid
router = APIRouter(tags=["authn"])
@ -30,11 +28,10 @@ async def login_page(request: Request) -> HTMLResponse:
@router.post("/login/password", response_class=HTMLResponse)
@limiter.limit("5/minute")
async def login_password(
request: Request,
username: Annotated[str, Form()],
password: Annotated[str, Form()],
username: str = Form(),
password: str = Form(),
) -> Response:
user_repo = request.app.state.user_repo
cred_repo = request.app.state.credential_repo
@ -53,9 +50,6 @@ async def login_password(
if not password_service.verify(credential.password_hash, password):
return HTMLResponse(error_html)
if not user.active:
return HTMLResponse(error_html)
request.session["userid"] = user.userid
request.session["username"] = user.username
@ -83,8 +77,6 @@ async def register_magic_link(request: Request, token: str) -> Response:
existing_user = await user_repo.get_by_username(link.username)
if existing_user is not None:
if not existing_user.active:
return HTMLResponse("<p>This account has been deactivated.</p>", status_code=400)
user = existing_user
else:
userid = await generate_unique_userid(user_repo)
@ -110,7 +102,6 @@ async def login_webauthn_begin(request: Request) -> Response:
@router.post("/login/webauthn/complete")
@limiter.limit("10/minute")
async def login_webauthn_complete(request: Request) -> Response:
webauthn_service = request.app.state.webauthn_service
user_repo = request.app.state.user_repo
@ -153,8 +144,8 @@ async def login_webauthn_complete(request: Request) -> Response:
await cred_repo.update_webauthn(stored)
user = await user_repo.get_by_userid(userid)
if user is None or not user.active:
return JSONResponse({"error": "Authentication failed"}, status_code=400)
if user is None:
return JSONResponse({"error": "User not found"}, status_code=400)
request.session["userid"] = user.userid
request.session["username"] = user.username

View file

@ -61,7 +61,7 @@ class CSRFMiddleware:
# Origin check (defense-in-depth)
if self.check_origin is not None:
origin = request.headers.get("origin")
if origin is not None and origin not in ("null", self.check_origin):
if origin is not None and origin != "null" and origin != self.check_origin:
logger.warning("CSRF origin mismatch: expected %s, got %s", self.check_origin, origin)
response = HTMLResponse(
"<h1>403 Forbidden</h1><p>Origin mismatch</p>",

View file

@ -1,5 +1,4 @@
from base64 import urlsafe_b64decode
from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
@ -8,7 +7,7 @@ from pydantic import ValidationError
from porchlight.dependencies import get_session_user
from porchlight.models import PasswordCredential, WebAuthnCredential
from porchlight.validation import PasswordChange, PasswordSet, ProfileUpdate, format_validation_errors
from porchlight.validation import ProfileUpdate
router = APIRouter(prefix="/manage", tags=["manage"])
@ -54,9 +53,8 @@ async def credentials_page(request: Request) -> Response:
@router.post("/credentials/password", response_class=HTMLResponse)
async def set_password(
request: Request,
password: Annotated[str, Form()],
confirm: Annotated[str, Form()],
current_password: Annotated[str, Form()] = "",
password: str = Form(),
confirm: str = Form(),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -66,30 +64,15 @@ async def set_password(
cred_repo = request.app.state.credential_repo
password_service = request.app.state.password_service
if password != confirm:
return HTMLResponse('<div role="alert">Passwords do not match</div>')
if len(password) < 8:
return HTMLResponse('<div role="alert">Password must be at least 8 characters</div>')
password_hash = password_service.hash(password)
existing = await cred_repo.get_password_by_user(userid)
has_password = existing is not None
# Validate input
try:
if has_password:
validated = PasswordChange(
current_password=current_password,
password=password,
confirm=confirm,
)
else:
validated = PasswordSet(password=password, confirm=confirm)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
# Verify current password if changing
if has_password and isinstance(validated, PasswordChange):
if not password_service.verify(existing.password_hash, validated.current_password):
return HTMLResponse('<div role="alert">Current password is incorrect</div>')
# Store new password
password_hash = password_service.hash(validated.password)
if existing is not None:
await cred_repo.delete_password(userid)
@ -217,6 +200,13 @@ async def profile_page(request: Request) -> Response:
@router.post("/profile", response_class=HTMLResponse)
async def update_profile(
request: Request,
given_name: str = Form(""),
family_name: str = Form(""),
preferred_username: str = Form(""),
email: str = Form(""),
phone_number: str = Form(""),
picture: str = Form(""),
locale: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -224,19 +214,34 @@ async def update_profile(
userid, _username = session_user
form = await request.form()
try:
profile = ProfileUpdate(
given_name=str(form.get("given_name", "")),
family_name=str(form.get("family_name", "")),
preferred_username=str(form.get("preferred_username", "")),
email=str(form.get("email", "")),
phone_number=str(form.get("phone_number", "")),
picture=str(form.get("picture", "")),
locale=str(form.get("locale", "")),
given_name=given_name,
family_name=family_name,
preferred_username=preferred_username,
email=email,
phone_number=phone_number,
picture=picture,
locale=locale,
)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
error = exc.errors()[0]
field = error["loc"][-1] if error["loc"] else "input"
msg = error["msg"]
# Produce user-friendly field labels
labels = {
"given_name": "Given name",
"family_name": "Family name",
"preferred_username": "Display name",
"email": "Email",
"phone_number": "Phone number",
"picture": "Picture URL",
"locale": "Locale",
}
label = labels.get(str(field), str(field))
# Use custom message for value errors (e.g. picture URL), generic pydantic message otherwise
display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}"
return HTMLResponse(f'<div role="alert">{display_msg}</div>')
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)

View file

@ -1,7 +1,5 @@
"""OIDC claims mapping and UserInfo source."""
from typing import Any
from idpyoidc.server.user_info import UserInfo
from porchlight.models import User
@ -30,7 +28,9 @@ def user_to_claims(user: User) -> dict:
"locale": user.locale,
}
claims.update({claim_name: value for claim_name, value in optional_fields.items() if value is not None})
for claim_name, value in optional_fields.items():
if value is not None:
claims[claim_name] = value
# updated_at as Unix timestamp (OIDC spec requires number)
if user.updated_at:
@ -46,7 +46,7 @@ class PorchlightUserInfo(UserInfo):
idpyoidc calls __call__() synchronously to look up claims.
"""
def __init__(self, **kwargs: Any) -> None:
def __init__(self, **kwargs) -> None:
super().__init__(db={}, **kwargs)
def set_user_claims(self, user_id: str, claims: dict) -> None:

View file

@ -106,7 +106,7 @@ async def authorization_complete(request: Request) -> Response:
)
async def _check_consent_or_complete( # noqa: PLR0913
async def _check_consent_or_complete(
request: Request,
oidc_server: object,
endpoint: object,
@ -137,7 +137,7 @@ async def _check_consent_or_complete( # noqa: PLR0913
return RedirectResponse("/consent", status_code=303)
async def _complete_authorization( # noqa: PLR0913
async def _complete_authorization(
request: Request,
oidc_server: object,
endpoint: object,
@ -332,10 +332,11 @@ async def consent_submit(request: Request) -> Response:
redirect_uri = auth_params.get("redirect_uri", "")
state = auth_params.get("state", "")
if action == "deny":
params = urlencode({"error": "access_denied", "state": state})
return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
if action != "allow":
if action == "deny":
params = urlencode({"error": "access_denied", "state": state})
return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
return HTMLResponse("<h1>Error</h1><p>Invalid action</p>", status_code=400)
# Allow — collect approved scopes
@ -356,9 +357,11 @@ async def consent_submit(request: Request) -> Response:
try:
parsed = endpoint.parse_request(auth_params)
if "error" in parsed:
raise ValueError(parsed.get("error_description", parsed["error"]))
except Exception as exc:
return HTMLResponse(f"<h1>Error</h1><p>{exc}</p>", status_code=400)
if "error" in parsed:
error_desc = parsed.get("error_description", parsed["error"])
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)

View file

@ -1,4 +0,0 @@
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)

View file

@ -1,13 +1,11 @@
from pathlib import Path
import aiosqlite
import anyio
async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int:
"""Apply unapplied SQL migration files in order. Returns count of newly applied migrations."""
async_dir = anyio.Path(migrations_dir)
if not await async_dir.is_dir():
if not migrations_dir.is_dir():
raise FileNotFoundError(f"Migrations directory not found: {migrations_dir}")
await db.execute(
@ -24,22 +22,19 @@ async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int:
async with db.execute("SELECT filename FROM _migrations") as cursor:
applied = {row[0] async for row in cursor}
migration_files = sorted(
[f async for f in async_dir.iterdir() if f.suffix == ".sql"],
key=lambda f: f.name,
)
migration_files = sorted(migrations_dir.glob("*.sql"))
count = 0
for migration_file in migration_files:
if migration_file.name in applied:
continue
sql = await migration_file.read_text(encoding="utf-8")
sql = migration_file.read_text(encoding="utf-8")
await db.execute("BEGIN")
try:
for statement in sql.split(";"):
cleaned = statement.strip()
if cleaned:
await db.execute(cleaned)
statement = statement.strip()
if statement:
await db.execute(statement)
await db.execute(
"INSERT INTO _migrations (filename) VALUES (?)",
(migration_file.name,),

View file

@ -11,7 +11,6 @@
<section>
<h2>Profile</h2>
<form hx-post="/admin/users/{{ target_user.userid }}/profile" hx-target="#profile-status" hx-swap="innerHTML">
<input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">
<div>
<label for="given_name">Given name</label>
<input type="text" id="given_name" name="given_name" value="{{ target_user.given_name or '' }}" maxlength="255">
@ -49,7 +48,6 @@
<h2>Groups</h2>
<div id="groups-section">
<form hx-post="/admin/users/{{ target_user.userid }}/groups" hx-target="#groups-status" hx-swap="innerHTML">
<input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">
<div id="group-list">
{% for group in target_user.groups %}
<span class="group-tag">{{ group }}</span>

View file

@ -8,11 +8,8 @@
<section>
<h2>Create invite</h2>
<form hx-post="/admin/invite" hx-target="#invite-status" hx-swap="innerHTML">
<input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">
<div class="admin-search">
<input type="text" name="username" placeholder="Username or email for new invite" required
maxlength="255" pattern="[a-zA-Z0-9_.@-]+"
title="Letters, digits, dots, hyphens, underscores, and @">
<input type="text" name="username" placeholder="Username for new invite" required>
<button type="submit">Create invite</button>
</div>
</form>

View file

@ -40,19 +40,13 @@
{% endif %}
<form hx-post="/manage/credentials/password" hx-target="#password-section" hx-swap="innerHTML">
<input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">
{% if has_password %}
<div>
<label for="current_password">Current password</label>
<input type="password" id="current_password" name="current_password" required autocomplete="current-password">
</div>
{% endif %}
<div>
<label for="password">{{ "New password" if has_password else "Set password" }}</label>
<input type="password" id="password" name="password" required minlength="8" maxlength="256" autocomplete="new-password">
<input type="password" id="password" name="password" required minlength="8" autocomplete="new-password">
</div>
<div>
<label for="confirm">Confirm password</label>
<input type="password" id="confirm" name="confirm" required minlength="8" maxlength="256" autocomplete="new-password">
<input type="password" id="confirm" name="confirm" required minlength="8" autocomplete="new-password">
</div>
<button type="submit">{{ "Change password" if has_password else "Set password" }}</button>
</form>

View file

@ -1,10 +1,8 @@
import re
from typing import Annotated
from urllib.parse import urlparse
from pydantic import BaseModel, EmailStr, Field, ValidationError, field_validator, model_validator
from pydantic import BaseModel, EmailStr, Field, field_validator
from pydantic_extra_types.phone_numbers import PhoneNumberValidator
from zxcvbn import zxcvbn
E164Phone = Annotated[str, PhoneNumberValidator(number_format="E164")]
@ -42,134 +40,3 @@ class ProfileUpdate(BaseModel):
if parsed.scheme not in ("http", "https") or not parsed.netloc:
raise ValueError("Picture URL must be a valid HTTP or HTTPS URL")
return v
@field_validator("locale", mode="before")
@classmethod
def validate_locale(cls, v: str) -> str:
if isinstance(v, str):
v = v.strip()
if v == "":
return ""
if not re.match(r"^[a-z]{2,3}(-[A-Z][a-z]{3})?(-[A-Z]{2})?$", v):
raise ValueError("Locale must be a valid BCP 47 language tag (e.g. en, sv-SE, zh-Hans-CN)")
return v
class UsernameInput(BaseModel):
username: str = Field(max_length=255)
@field_validator("username", mode="before")
@classmethod
def validate_username(cls, v: str) -> str:
if isinstance(v, str):
v = v.strip()
if not v:
raise ValueError("Username is required")
if not re.match(r"^[a-zA-Z0-9_.@-]+$", v):
raise ValueError("Username may only contain letters, digits, dots, hyphens, underscores, and @")
return v
class GroupListInput(BaseModel):
groups: str = ""
@property
def group_list(self) -> list[str]:
"""Parse comma-separated groups into a deduplicated list."""
seen: set[str] = set()
result: list[str] = []
for g in (g.strip() for g in self.groups.split(",") if g.strip()):
if g not in seen:
seen.add(g)
result.append(g)
return result
@field_validator("groups", mode="before")
@classmethod
def validate_groups(cls, v: str) -> str:
if isinstance(v, str):
names = [g.strip() for g in v.split(",") if g.strip()]
for name in names:
if not re.match(r"^[a-z0-9_-]{1,64}$", name):
raise ValueError(
f"Invalid group name '{name}'. "
"Groups must be 1-64 lowercase letters, digits, hyphens, or underscores."
)
return v
MIN_PASSWORD_STRENGTH = 2
class PasswordSet(BaseModel):
password: str = Field(min_length=8, max_length=256)
confirm: str
@model_validator(mode="after")
def validate_password(self) -> "PasswordSet":
if self.password != self.confirm:
raise ValueError("Passwords do not match")
result = zxcvbn(self.password)
if result["score"] < MIN_PASSWORD_STRENGTH:
feedback = result.get("feedback", {})
warning = feedback.get("warning", "")
suggestions = feedback.get("suggestions", [])
msg = "Password is too easily guessed."
if warning:
msg += f" {warning}."
if suggestions:
msg += " " + " ".join(suggestions)
raise ValueError(msg)
return self
class PasswordChange(PasswordSet):
current_password: str
@field_validator("current_password", mode="before")
@classmethod
def validate_current_password(cls, v: str) -> str:
if isinstance(v, str) and v.strip() == "":
raise ValueError("Current password is required")
return v
FIELD_LABELS: dict[str, str] = {
"given_name": "Given name",
"family_name": "Family name",
"preferred_username": "Display name",
"email": "Email",
"phone_number": "Phone number",
"picture": "Picture URL",
"locale": "Locale",
"username": "Username",
"groups": "Groups",
"password": "Password",
"confirm": "Confirm password",
"current_password": "Current password",
}
def format_validation_errors(exc: ValidationError) -> str:
"""Format Pydantic ValidationError into user-friendly HTML."""
messages: list[str] = []
for error in exc.errors():
field = str(error["loc"][-1]) if error["loc"] else "input"
label = FIELD_LABELS.get(field, field)
msg = error["msg"]
if error["type"] == "value_error":
raw = msg.removeprefix("Value error, ")
# If the message already starts with the label, don't duplicate it
if raw.startswith(label):
display_msg = raw
else:
display_msg = f"{label}: {raw}"
else:
display_msg = f"{label}: {msg}"
messages.append(display_msg)
if len(messages) == 1:
return f'<div role="alert">{messages[0]}</div>'
items = "".join(f"<li>{m}</li>" for m in messages)
return f'<div role="alert"><ul>{items}</ul></div>'

View file

@ -6,7 +6,6 @@ from httpx import ASGITransport, AsyncClient
from porchlight.app import create_app
from porchlight.config import Settings
from porchlight.rate_limit import limiter
@pytest.fixture
@ -22,12 +21,6 @@ async def client(settings: Settings) -> AsyncIterator[AsyncClient]:
yield ac
@pytest.fixture(autouse=True)
def _reset_rate_limiter() -> None:
"""Reset the rate limiter storage before each test."""
limiter.reset()
async def get_csrf_token(client: AsyncClient) -> str:
"""Get a CSRF token by visiting the login page.

View file

@ -24,107 +24,6 @@ from porchlight.store.sqlite.repositories import (
)
async def _create_user_with_password(
user_repo: SQLiteUserRepository,
cred_repo: SQLiteCredentialRepository,
password_service: PasswordService,
user: User,
password: str,
) -> None:
"""Helper to create a user and set their password credential."""
await user_repo.create(user)
password_hash = password_service.hash(password)
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=password_hash))
async def _seed_test_users(
user_repo: SQLiteUserRepository,
cred_repo: SQLiteCredentialRepository,
password_service: PasswordService,
result: dict[str, str],
) -> None:
"""Create all test users with passwords."""
# Login test user
await _create_user_with_password(
user_repo,
cred_repo,
password_service,
User(userid="test-user-01", username="testuser", groups=["users"]),
"testpassword123",
)
result["login_username"] = "testuser"
result["login_password"] = "testpassword123"
# Credentials management test user
await _create_user_with_password(
user_repo,
cred_repo,
password_service,
User(userid="test-user-02", username="creduser", groups=["users"]),
"credpassword123",
)
result["cred_username"] = "creduser"
result["cred_password"] = "credpassword123"
# WebAuthn registration test user
await _create_user_with_password(
user_repo,
cred_repo,
password_service,
User(userid="test-user-03", username="webauthnuser", groups=["users"]),
"webauthnpass123",
)
result["webauthn_username"] = "webauthnuser"
result["webauthn_password"] = "webauthnpass123"
result["webauthn_userid"] = "test-user-03"
# Profile management test user
await _create_user_with_password(
user_repo,
cred_repo,
password_service,
User(
userid="test-user-04",
username="profileuser",
given_name="Alice",
family_name="Smith",
preferred_username="asmith",
email="alice@example.com",
phone_number="+12025551234",
picture="https://example.com/alice.jpg",
locale="en",
groups=["users"],
),
"profilepass123",
)
result["profile_username"] = "profileuser"
result["profile_password"] = "profilepass123"
# Admin user for admin page tests
await _create_user_with_password(
user_repo,
cred_repo,
password_service,
User(
userid="test-user-05",
username="adminuser",
given_name="Admin",
family_name="User",
email="admin@example.com",
groups=["admin", "users"],
),
"adminpass123",
)
result["admin_username"] = "adminuser"
result["admin_password"] = "adminpass123"
result["admin_userid"] = "test-user-05"
# Disposable user for admin delete test
await user_repo.create(User(userid="test-user-06", username="disposableuser", groups=["users"]))
result["disposable_userid"] = "test-user-06"
result["disposable_username"] = "disposableuser"
async def seed() -> None:
db_path = os.environ.get("OIDC_OP_SQLITE_PATH")
if not db_path:
@ -140,21 +39,89 @@ async def seed() -> None:
password_service = PasswordService()
magic_link_service = MagicLinkService(repo=magic_link_repo)
result: dict[str, str] = {}
result = {}
# Create magic link for registration test
# 1. Create a magic link for registration test
link = await magic_link_service.create(username="newuser")
result["register_token"] = link.token
result["register_username"] = "newuser"
# Create all test users
await _seed_test_users(user_repo, cred_repo, password_service, result)
# 2. Create a user with a password for login test
user = User(userid="test-user-01", username="testuser", groups=["users"])
await user_repo.create(user)
password_hash = password_service.hash("testpassword123")
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=password_hash))
result["login_username"] = "testuser"
result["login_password"] = "testpassword123"
# Create an expired/used magic link for negative test
# 3. Create a separate user for credentials management test
cred_user = User(userid="test-user-02", username="creduser", groups=["users"])
await user_repo.create(cred_user)
cred_password_hash = password_service.hash("credpassword123")
await cred_repo.create_password(PasswordCredential(user_id=cred_user.userid, password_hash=cred_password_hash))
result["cred_username"] = "creduser"
result["cred_password"] = "credpassword123"
# 5. Create a user with password for WebAuthn registration tests
# (login with password first, then register a passkey)
webauthn_user = User(userid="test-user-03", username="webauthnuser", groups=["users"])
await user_repo.create(webauthn_user)
webauthn_password_hash = password_service.hash("webauthnpass123")
await cred_repo.create_password(
PasswordCredential(user_id=webauthn_user.userid, password_hash=webauthn_password_hash)
)
result["webauthn_username"] = "webauthnuser"
result["webauthn_password"] = "webauthnpass123"
result["webauthn_userid"] = "test-user-03"
# 4. Create an expired/used magic link for negative test
expired_link = await magic_link_service.create(username="expired")
await magic_link_service.mark_used(expired_link.token)
result["used_token"] = expired_link.token
# 5. Create a user with profile data for profile management tests
profile_user = User(
userid="test-user-04",
username="profileuser",
given_name="Alice",
family_name="Smith",
preferred_username="asmith",
email="alice@example.com",
phone_number="+12025551234",
picture="https://example.com/alice.jpg",
locale="en",
groups=["users"],
)
await user_repo.create(profile_user)
profile_password_hash = password_service.hash("profilepass123")
await cred_repo.create_password(
PasswordCredential(user_id=profile_user.userid, password_hash=profile_password_hash)
)
result["profile_username"] = "profileuser"
result["profile_password"] = "profilepass123"
# 6. Admin user for admin page tests
admin_user = User(
userid="test-user-05",
username="adminuser",
given_name="Admin",
family_name="User",
email="admin@example.com",
groups=["admin", "users"],
)
await user_repo.create(admin_user)
admin_password_hash = password_service.hash("adminpass123")
await cred_repo.create_password(PasswordCredential(user_id=admin_user.userid, password_hash=admin_password_hash))
result["admin_username"] = "adminuser"
result["admin_password"] = "adminpass123"
result["admin_userid"] = "test-user-05"
# 7. Disposable user for admin delete test (not used by any other tests)
disposable_user = User(userid="test-user-06", username="disposableuser", groups=["users"])
await user_repo.create(disposable_user)
result["disposable_userid"] = "test-user-06"
result["disposable_username"] = "disposableuser"
await db.commit()
await db.close()
print(json.dumps(result))

View file

@ -1,4 +1,3 @@
from base64 import urlsafe_b64encode
from datetime import UTC, datetime
import pytest
@ -47,7 +46,7 @@ async def _login(
)
async def _create_target_user( # noqa: PLR0913
async def _create_target_user(
client: AsyncClient,
*,
userid: str = "target-user-01",
@ -366,6 +365,8 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None:
)
# URL uses base64url without padding
from base64 import urlsafe_b64encode
credential_id_b64 = urlsafe_b64encode(credential_id).decode().rstrip("=")
token = await get_csrf_token(client)

View file

@ -1,85 +0,0 @@
from datetime import UTC, datetime
import pytest
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _setup_admin_and_target(client: AsyncClient) -> tuple[str, str]:
"""Create admin + target user, login as admin, return (token, target_userid)."""
app = client._transport.app
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
admin = User(
userid="admin-g01",
username="admin_g",
groups=["admin", "users"],
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(admin)
target = User(
userid="target-g01",
username="target_g",
groups=["users"],
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(target)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=admin.userid, password_hash=svc.hash("AdminPass123!")))
token = await get_csrf_token(client)
await client.post(
"/login/password",
data={"username": "admin_g", "password": "AdminPass123!"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
return token, target.userid
@pytest.mark.asyncio
async def test_valid_groups(client: AsyncClient) -> None:
token, userid = await _setup_admin_and_target(client)
response = await client.post(
f"/admin/users/{userid}/groups",
data={"groups": "users, staff"},
headers={"X-CSRF-Token": token},
)
assert "Groups updated" in response.text
app = client._transport.app
user = await app.state.user_repo.get_by_userid(userid)
assert sorted(user.groups) == ["staff", "users"]
@pytest.mark.asyncio
async def test_invalid_group_name_rejected(client: AsyncClient) -> None:
token, userid = await _setup_admin_and_target(client)
response = await client.post(
f"/admin/users/{userid}/groups",
data={"groups": "users, Bad Group!"},
headers={"X-CSRF-Token": token},
)
assert "alert" in response.text
@pytest.mark.asyncio
async def test_empty_groups_clears(client: AsyncClient) -> None:
token, userid = await _setup_admin_and_target(client)
response = await client.post(
f"/admin/users/{userid}/groups",
data={"groups": ""},
headers={"X-CSRF-Token": token},
)
assert "Groups updated" in response.text
app = client._transport.app
user = await app.state.user_repo.get_by_userid(userid)
assert user.groups == []

View file

@ -1,71 +0,0 @@
from datetime import UTC, datetime
import pytest
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _login_admin(client: AsyncClient) -> str:
"""Create and login as admin user, return CSRF token."""
app = client._transport.app
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
user = User(
userid="admin-01",
username="admin",
groups=["admin", "users"],
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("AdminPass123!")))
token = await get_csrf_token(client)
await client.post(
"/login/password",
data={"username": "admin", "password": "AdminPass123!"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
return token
@pytest.mark.asyncio
async def test_invite_valid_username(client: AsyncClient) -> None:
token = await _login_admin(client)
response = await client.post(
"/admin/invite",
data={"username": "newuser@example.com"},
headers={"X-CSRF-Token": token},
)
assert response.status_code == 200
assert "Invite created" in response.text
@pytest.mark.asyncio
async def test_invite_empty_username_rejected(client: AsyncClient) -> None:
token = await _login_admin(client)
response = await client.post(
"/admin/invite",
data={"username": ""},
headers={"X-CSRF-Token": token},
)
# Empty username is rejected — either by FastAPI (422) or validation (alert)
assert response.status_code == 422 or "alert" in response.text
@pytest.mark.asyncio
async def test_invite_invalid_username_rejected(client: AsyncClient) -> None:
token = await _login_admin(client)
response = await client.post(
"/admin/invite",
data={"username": "bad user<script>"},
headers={"X-CSRF-Token": token},
)
assert "alert" in response.text
assert "letters" in response.text.lower() or "Username" in response.text

View file

@ -1,18 +1,5 @@
from unittest.mock import MagicMock
from httpx import AsyncClient
from porchlight.dependencies import (
get_credential_repo,
get_magic_link_repo,
get_user_repo,
)
from porchlight.store.protocols import (
CredentialRepository,
MagicLinkRepository,
UserRepository,
)
async def test_health_endpoint(client: AsyncClient) -> None:
response = await client.get("/health")
@ -29,6 +16,12 @@ async def test_app_has_title(client: AsyncClient) -> None:
async def test_app_has_repos_on_state(client: AsyncClient) -> None:
from porchlight.store.protocols import (
CredentialRepository,
MagicLinkRepository,
UserRepository,
)
app = client._transport.app # type: ignore[union-attr]
assert isinstance(app.state.user_repo, UserRepository)
assert isinstance(app.state.credential_repo, CredentialRepository)
@ -48,6 +41,14 @@ async def test_landing_page(client: AsyncClient) -> None:
async def test_dependency_functions() -> None:
from unittest.mock import MagicMock
from porchlight.dependencies import (
get_credential_repo,
get_magic_link_repo,
get_user_repo,
)
request = MagicMock()
request.app.state.user_repo = "user_repo_sentinel"
request.app.state.credential_repo = "credential_repo_sentinel"

View file

@ -46,7 +46,7 @@ async def test_set_password_mismatch_returns_error(client: AsyncClient) -> None:
token = await get_csrf_token(client)
res = await client.post(
"/manage/credentials/password",
data={"current_password": "old", "password": "newpassword", "confirm": "different"},
data={"password": "newpassword", "confirm": "different"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
assert res.status_code == 200
@ -59,7 +59,7 @@ async def test_set_password_too_short_returns_error(client: AsyncClient) -> None
token = await get_csrf_token(client)
res = await client.post(
"/manage/credentials/password",
data={"current_password": "old", "password": "short", "confirm": "short"},
data={"password": "short", "confirm": "short"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
assert res.status_code == 200
@ -74,7 +74,7 @@ async def test_set_password_creates_or_replaces_password(client: AsyncClient) ->
token = await get_csrf_token(client)
res = await client.post(
"/manage/credentials/password",
data={"current_password": "old", "password": "NewStr0ng!Pass99", "confirm": "NewStr0ng!Pass99"},
data={"password": "newpassword123", "confirm": "newpassword123"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
assert res.status_code == 200
@ -83,7 +83,7 @@ async def test_set_password_creates_or_replaces_password(client: AsyncClient) ->
updated = await cred_repo.get_password_by_user(userid)
assert updated is not None
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
assert svc.verify(updated.password_hash, "NewStr0ng!Pass99") is True
assert svc.verify(updated.password_hash, "newpassword123") is True
async def test_delete_password_requires_session(client: AsyncClient) -> None:

View file

@ -1,6 +1,5 @@
import os
import pytest
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.hashes import SHA256
from fido2.cose import ES256
@ -234,6 +233,8 @@ def test_complete_authentication_verifies_signature() -> None:
def test_complete_authentication_wrong_signature_raises() -> None:
import pytest
service = _make_service()
_private_key, credential_id, attested = _generate_credential()

View file

@ -1,66 +0,0 @@
from datetime import UTC, datetime
import pytest
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _create_inactive_user_with_password(client: AsyncClient) -> None:
"""Create an inactive user with a password credential."""
app = client._transport.app
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
user = User(
userid="inactive-01",
username="inactive",
active=False,
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(
PasswordCredential(user_id=user.userid, password_hash=svc.hash("password123!Secure"))
)
@pytest.mark.asyncio
async def test_inactive_user_cannot_login_password(client: AsyncClient) -> None:
await _create_inactive_user_with_password(client)
token = await get_csrf_token(client)
response = await client.post(
"/login/password",
data={"username": "inactive", "password": "password123!Secure"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
assert "Invalid username or password" in response.text
@pytest.mark.asyncio
async def test_inactive_user_cannot_register_magic_link(client: AsyncClient) -> None:
"""If an inactive user exists, magic link registration should reject them."""
app = client._transport.app
user_repo = app.state.user_repo
magic_link_service = app.state.magic_link_service
user = User(
userid="inactive-02",
username="deactivated",
active=False,
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
link = await magic_link_service.create(username="deactivated", created_by="admin", note="test")
response = await client.get(f"/register/{link.token}", follow_redirects=False)
assert response.status_code == 400 or "deactivated" in response.text.lower() or "Invalid" in response.text

View file

@ -1,4 +1,3 @@
from collections.abc import AsyncIterator
from datetime import UTC, datetime, timedelta
from pathlib import Path
@ -15,7 +14,7 @@ MIGRATIONS_DIR = (
@pytest.fixture
async def db() -> AsyncIterator[aiosqlite.Connection]:
async def db():
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA foreign_keys=ON")

View file

@ -3,7 +3,6 @@ from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse
from argon2 import PasswordHasher
from fastapi import FastAPI
from httpx import AsyncClient
from porchlight.authn.password import PasswordService
@ -226,7 +225,7 @@ async def test_partial_consent_filters_scopes(client: AsyncClient) -> None:
# -- Test helpers --
def _register_test_rp(app: FastAPI) -> None:
def _register_test_rp(app) -> None:
oidc_server = app.state.oidc_server
if "consent-rp" in oidc_server.context.cdb:
return
@ -245,7 +244,7 @@ def _register_test_rp(app: FastAPI) -> None:
oidc_server.keyjar.add_symmetric(client_id, client_secret)
async def _create_test_user(app: FastAPI) -> None:
async def _create_test_user(app) -> None:
user_repo = app.state.user_repo
existing = await user_repo.get_by_username("consentuser")
if existing:

View file

@ -2,41 +2,48 @@ import json
import secrets
from base64 import b64encode
from datetime import UTC, datetime
from typing import Any
from urllib.parse import parse_qs, urlparse
from argon2 import PasswordHasher
from cryptojwt.jwk.jwk import key_from_jwk_dict
from cryptojwt.jws.jws import JWS
from fastapi import FastAPI
from httpx import AsyncClient
from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
CLIENT_ID = "e2e-rp"
CLIENT_SECRET = "e2e-secret-0123456789abcdef" # 30+ chars
REDIRECT_URI = "http://localhost:9000/callback"
async def test_full_authorization_code_flow(client: AsyncClient) -> None:
"""End-to-end test of the complete OIDC Authorization Code flow.
async def _setup_rp_and_user(app: FastAPI) -> None:
"""Register an RP client and create a test user with password."""
Exercises: discovery, client registration, authorization request,
password login, authorization completion, token exchange, ID token
validation (JWKS + JWT signature), and userinfo.
"""
# -- Setup: register RP client and create user --
app = client._transport.app # type: ignore[union-attr]
oidc_server = app.state.oidc_server
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
oidc_server.context.cdb[CLIENT_ID] = {
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"redirect_uris": [(REDIRECT_URI, {})],
client_id = "e2e-rp"
client_secret = "e2e-secret-0123456789abcdef" # 30+ chars
redirect_uri = "http://localhost:9000/callback"
state = secrets.token_urlsafe(16)
nonce = secrets.token_urlsafe(16)
oidc_server.context.cdb[client_id] = {
"client_id": client_id,
"client_secret": client_secret,
"redirect_uris": [(redirect_uri, {})],
"response_types_supported": ["code"],
"token_endpoint_auth_method": "client_secret_basic",
"scope": ["openid", "profile", "email"],
"allowed_scopes": ["openid", "profile", "email"],
"client_salt": secrets.token_hex(8),
}
oidc_server.keyjar.add_symmetric(CLIENT_ID, CLIENT_SECRET)
oidc_server.keyjar.add_symmetric(client_id, client_secret)
user = User(
userid="lusab-bansen",
@ -53,16 +60,13 @@ async def _setup_rp_and_user(app: FastAPI) -> None:
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))
async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> str:
"""Perform authorization request, login, consent, and return the auth code."""
# Step 1: Authorization request (unauthenticated) -> redirect to /login
# -- Step 1: Authorization request (unauthenticated) → redirect to /login --
auth_res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": "openid profile email",
"state": state,
"nonce": nonce,
@ -72,7 +76,7 @@ async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> s
assert auth_res.status_code in (302, 303), f"Expected redirect to /login, got {auth_res.status_code}"
assert "/login" in auth_res.headers["location"]
# Step 2: Password login -> HX-Redirect to /authorization/complete
# -- Step 2: Password login → HX-Redirect to /authorization/complete --
token = await get_csrf_token(client)
login_res = await client.post(
"/login/password",
@ -85,14 +89,14 @@ async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> s
f"Expected HX-Redirect to /authorization/complete, got '{hx_redirect}'"
)
# Step 3: Complete authorization -> redirect to consent
# -- Step 3: Complete authorization → redirect to consent --
complete_res = await client.get("/authorization/complete", follow_redirects=False)
assert complete_res.status_code in (302, 303), (
f"Expected redirect to /consent, got {complete_res.status_code}: {complete_res.text}"
)
assert "/consent" in complete_res.headers["location"]
# Step 3b: Approve consent -> redirect to callback with code + state
# -- Step 3b: Approve consent → redirect to callback with code + state --
token = await get_csrf_token(client)
consent_res = await client.post(
"/consent",
@ -113,18 +117,15 @@ async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> s
code = callback_params["code"][0]
assert len(code) > 0
return code
async def _exchange_token(client: AsyncClient, code: str) -> dict[str, Any]:
"""Exchange authorization code for tokens."""
auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
# -- Step 4: Token exchange → access_token, id_token, token_type --
auth_header = b64encode(f"{client_id}:{client_secret}".encode()).decode()
token_res = await client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"redirect_uri": redirect_uri,
},
headers={
"Authorization": f"Basic {auth_header}",
@ -137,59 +138,45 @@ async def _exchange_token(client: AsyncClient, code: str) -> dict[str, Any]:
assert "access_token" in token_data
assert "id_token" in token_data
assert token_data["token_type"].lower() == "bearer"
return token_data
access_token = token_data["access_token"]
id_token_jwt = token_data["id_token"]
async def _validate_id_token(client: AsyncClient, id_token_jwt: str, nonce: str) -> str:
"""Validate the ID token via JWKS and return the sub claim."""
# -- Step 5: Validate ID token — fetch JWKS, verify JWT signature --
jwks_res = await client.get("/jwks")
assert jwks_res.status_code == 200
jwks = jwks_res.json()
assert "keys" in jwks
assert len(jwks["keys"]) > 0
id_token_header = json.loads(JWS()._decode(id_token_jwt.split(".", maxsplit=1)[0]))
# Decode the ID token header to find the key ID
id_token_header = json.loads(JWS()._decode(id_token_jwt.split(".")[0]))
kid = id_token_header.get("kid")
# Find the matching key in JWKS
matching_keys = [k for k in jwks["keys"] if k.get("kid") == kid]
assert len(matching_keys) == 1, f"Expected 1 matching key for kid={kid}, found {len(matching_keys)}"
key = key_from_jwk_dict(matching_keys[0])
# Verify JWT signature and decode payload
verifier = JWS()
payload = verifier.verify_compact(id_token_jwt, keys=[key])
id_token_payload = json.loads(payload) if isinstance(payload, (str, bytes)) else payload
assert id_token_payload["iss"] == "http://localhost:8000"
assert CLIENT_ID in id_token_payload["aud"]
assert client_id in id_token_payload["aud"]
assert id_token_payload["nonce"] == nonce
# sub is a pairwise identifier — just verify it exists and is non-empty
id_token_sub = id_token_payload["sub"]
assert isinstance(id_token_sub, str)
assert len(id_token_sub) > 0
return id_token_sub
async def test_full_authorization_code_flow(client: AsyncClient) -> None:
"""End-to-end test of the complete OIDC Authorization Code flow.
Exercises: discovery, client registration, authorization request,
password login, authorization completion, token exchange, ID token
validation (JWKS + JWT signature), and userinfo.
"""
app = client._transport.app # type: ignore[union-attr]
state = secrets.token_urlsafe(16)
nonce = secrets.token_urlsafe(16)
await _setup_rp_and_user(app)
code = await _login_and_authorize(client, state, nonce)
token_data = await _exchange_token(client, code)
id_token_sub = await _validate_id_token(client, token_data["id_token"], nonce)
# -- Step 6: UserInfo — GET /userinfo with Bearer token --
userinfo_res = await client.get(
"/userinfo",
headers={"Authorization": f"Bearer {token_data['access_token']}"},
headers={"Authorization": f"Bearer {access_token}"},
)
assert userinfo_res.status_code == 200, f"UserInfo failed: {userinfo_res.status_code} {userinfo_res.text}"

View file

@ -2,7 +2,6 @@ import shutil
from pathlib import Path
from porchlight.config import Settings
from porchlight.oidc.claims import PorchlightUserInfo
from porchlight.oidc.provider import create_oidc_server
@ -50,6 +49,8 @@ def test_create_server_userinfo_is_porchlight() -> None:
try:
settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path))
server = create_oidc_server(settings)
from porchlight.oidc.claims import PorchlightUserInfo
assert isinstance(server.context.userinfo, PorchlightUserInfo)
finally:
shutil.rmtree(key_path, ignore_errors=True)

View file

@ -1,122 +0,0 @@
from datetime import UTC, datetime
import pytest
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _login_user_with_password(client: AsyncClient) -> str:
"""Create user with password, login, return CSRF token."""
app = client._transport.app
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
user = User(
userid="pw-user-01",
username="pwuser",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("OldPass123!ok")))
token = await get_csrf_token(client)
await client.post(
"/login/password",
data={"username": "pwuser", "password": "OldPass123!ok"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
return token
async def _login_user_without_password(client: AsyncClient) -> str:
"""Create user without password, simulate session login, return CSRF token."""
app = client._transport.app
user_repo = app.state.user_repo
user = User(
userid="pw-user-02",
username="newuser",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
# Simulate session login via magic link
token = await get_csrf_token(client)
magic_link_service = app.state.magic_link_service
link = await magic_link_service.create(username="newuser", created_by="admin", note="test")
await client.get(f"/register/{link.token}", follow_redirects=False)
# Re-fetch CSRF token after session change
token = await get_csrf_token(client)
return token
@pytest.mark.asyncio
async def test_change_password_requires_current(client: AsyncClient) -> None:
token = await _login_user_with_password(client)
response = await client.post(
"/manage/credentials/password",
data={
"password": "NewStrong!Pass99",
"confirm": "NewStrong!Pass99",
},
headers={"X-CSRF-Token": token},
)
assert "alert" in response.text
assert "urrent password" in response.text
@pytest.mark.asyncio
async def test_change_password_wrong_current_rejected(client: AsyncClient) -> None:
token = await _login_user_with_password(client)
response = await client.post(
"/manage/credentials/password",
data={
"current_password": "WrongPassword",
"password": "NewStrong!Pass99",
"confirm": "NewStrong!Pass99",
},
headers={"X-CSRF-Token": token},
)
assert "alert" in response.text
assert "incorrect" in response.text.lower() or "Invalid" in response.text or "current" in response.text.lower()
@pytest.mark.asyncio
async def test_change_password_correct_current_succeeds(client: AsyncClient) -> None:
token = await _login_user_with_password(client)
response = await client.post(
"/manage/credentials/password",
data={
"current_password": "OldPass123!ok",
"password": "NewStrong!Pass99",
"confirm": "NewStrong!Pass99",
},
headers={"X-CSRF-Token": token},
)
assert "updated" in response.text.lower() or "status" in response.text
@pytest.mark.asyncio
async def test_set_password_no_current_needed(client: AsyncClient) -> None:
"""First-time password setup should not require current password."""
token = await _login_user_without_password(client)
response = await client.post(
"/manage/credentials/password",
data={
"password": "FirstPass!Strong99",
"confirm": "FirstPass!Strong99",
},
headers={"X-CSRF-Token": token},
)
assert "updated" in response.text.lower() or "status" in response.text

View file

@ -1,25 +0,0 @@
import pytest
from httpx import AsyncClient
from tests.conftest import get_csrf_token
@pytest.mark.asyncio
async def test_password_login_rate_limited(client: AsyncClient) -> None:
"""After 5 failed attempts, the 6th should be rate-limited."""
token = await get_csrf_token(client)
for _ in range(5):
await client.post(
"/login/password",
data={"username": "nobody", "password": "wrong"},
headers={"X-CSRF-Token": token},
)
response = await client.post(
"/login/password",
data={"username": "nobody", "password": "wrong"},
headers={"X-CSRF-Token": token},
)
assert response.status_code == 429

View file

@ -1,4 +1,3 @@
from collections.abc import AsyncIterator
from pathlib import Path
import aiosqlite
@ -12,7 +11,7 @@ MIGRATIONS_DIR = (
@pytest.fixture
async def db() -> AsyncIterator[aiosqlite.Connection]:
async def db():
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA foreign_keys=ON")

View file

@ -1,13 +1,11 @@
from datetime import UTC, datetime
import aiosqlite
from porchlight.models import User
from porchlight.store.protocols import ConsentRepository
from porchlight.store.sqlite.repositories import SQLiteConsentRepository, SQLiteUserRepository
async def _create_user(db: aiosqlite.Connection) -> User:
async def _create_user(db) -> User:
"""Helper to create a test user."""
user_repo = SQLiteUserRepository(db)
user = User(
@ -20,12 +18,12 @@ async def _create_user(db: aiosqlite.Connection) -> User:
return await user_repo.create(user)
async def test_implements_protocol(db: aiosqlite.Connection) -> None:
async def test_implements_protocol(db) -> None:
repo = SQLiteConsentRepository(db)
assert isinstance(repo, ConsentRepository)
async def test_set_and_get_consent(db: aiosqlite.Connection) -> None:
async def test_set_and_get_consent(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "test-rp", ["openid", "profile"])
@ -39,14 +37,14 @@ async def test_set_and_get_consent(db: aiosqlite.Connection) -> None:
assert isinstance(consent.updated_at, datetime)
async def test_get_consent_not_found(db: aiosqlite.Connection) -> None:
async def test_get_consent_not_found(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
consent = await repo.get_consent(user.userid, "nonexistent")
assert consent is None
async def test_set_consent_upserts(db: aiosqlite.Connection) -> None:
async def test_set_consent_upserts(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "test-rp", ["openid"])
@ -63,7 +61,7 @@ async def test_set_consent_upserts(db: aiosqlite.Connection) -> None:
assert consent.updated_at >= original.updated_at
async def test_delete_consent(db: aiosqlite.Connection) -> None:
async def test_delete_consent(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "test-rp", ["openid"])
@ -75,14 +73,14 @@ async def test_delete_consent(db: aiosqlite.Connection) -> None:
assert consent is None
async def test_delete_consent_not_found(db: aiosqlite.Connection) -> None:
async def test_delete_consent_not_found(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
result = await repo.delete_consent(user.userid, "nonexistent")
assert result is False
async def test_list_consents(db: aiosqlite.Connection) -> None:
async def test_list_consents(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "rp-a", ["openid"])
@ -94,14 +92,14 @@ async def test_list_consents(db: aiosqlite.Connection) -> None:
assert client_ids == {"rp-a", "rp-b"}
async def test_list_consents_empty(db: aiosqlite.Connection) -> None:
async def test_list_consents_empty(db) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
consents = await repo.list_consents(user.userid)
assert consents == []
async def test_consent_deleted_on_user_cascade(db: aiosqlite.Connection) -> None:
async def test_consent_deleted_on_user_cascade(db) -> None:
"""Consent records are deleted when the user is deleted (CASCADE)."""
user = await _create_user(db)
user_repo = SQLiteUserRepository(db)

View file

@ -171,8 +171,8 @@ async def test_create_duplicate_password(credential_repo: SQLiteCredentialReposi
cred = PasswordCredential(user_id=alice.userid, password_hash="hash1")
await credential_repo.create_password(cred)
cred2 = PasswordCredential(user_id=alice.userid, password_hash="hash2")
with pytest.raises(DuplicateError):
cred2 = PasswordCredential(user_id=alice.userid, password_hash="hash2")
await credential_repo.create_password(cred2)

View file

@ -1,5 +1,4 @@
from datetime import UTC, datetime, timedelta
from typing import Any
import aiosqlite
import pytest
@ -15,7 +14,7 @@ def magic_link_repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository:
return SQLiteMagicLinkRepository(db)
def _make_link(**overrides: Any) -> MagicLink:
def _make_link(**overrides) -> MagicLink:
defaults = {
"token": "abc123",
"username": "alice",

View file

@ -1,5 +1,3 @@
from typing import Any
import aiosqlite
import pytest
@ -14,7 +12,7 @@ def user_repo(db: aiosqlite.Connection) -> SQLiteUserRepository:
return SQLiteUserRepository(db)
def _make_user(**overrides: Any) -> User:
def _make_user(**overrides) -> User:
defaults = {"userid": "lusab-bansen", "username": "alice"}
defaults.update(overrides)
return User(**defaults)

View file

@ -1,14 +1,7 @@
import pytest
from pydantic import ValidationError
from porchlight.validation import (
GroupListInput,
PasswordChange,
PasswordSet,
ProfileUpdate,
UsernameInput,
format_validation_errors,
)
from porchlight.validation import ProfileUpdate
class TestProfileUpdateEmail:
@ -98,197 +91,3 @@ class TestProfileUpdateDefaults:
assert p.phone_number is None
assert p.picture is None
assert p.locale == ""
class TestProfileUpdateLocale:
def test_valid_locale_language_only(self) -> None:
p = ProfileUpdate(locale="en")
assert p.locale == "en"
def test_valid_locale_language_region(self) -> None:
p = ProfileUpdate(locale="sv-SE")
assert p.locale == "sv-SE"
def test_valid_locale_language_script_region(self) -> None:
p = ProfileUpdate(locale="zh-Hans-CN")
assert p.locale == "zh-Hans-CN"
def test_valid_locale_three_letter(self) -> None:
p = ProfileUpdate(locale="gsw")
assert p.locale == "gsw"
def test_empty_locale_allowed(self) -> None:
p = ProfileUpdate(locale="")
assert p.locale == ""
def test_invalid_locale_rejected(self) -> None:
with pytest.raises(ValidationError, match="locale"):
ProfileUpdate(locale="not_a_locale!")
def test_invalid_locale_numbers(self) -> None:
with pytest.raises(ValidationError, match="locale"):
ProfileUpdate(locale="12345")
def test_whitespace_locale_becomes_empty(self) -> None:
p = ProfileUpdate(locale=" ")
assert p.locale == ""
class TestUsernameInput:
def test_valid_simple(self) -> None:
u = UsernameInput(username="alice")
assert u.username == "alice"
def test_valid_with_dots_dashes_underscores(self) -> None:
u = UsernameInput(username="alice.bob_charlie-1")
assert u.username == "alice.bob_charlie-1"
def test_valid_email_style(self) -> None:
u = UsernameInput(username="user@example.com")
assert u.username == "user@example.com"
def test_valid_uppercase(self) -> None:
u = UsernameInput(username="Alice")
assert u.username == "Alice"
def test_empty_rejected(self) -> None:
with pytest.raises(ValidationError, match="username"):
UsernameInput(username="")
def test_whitespace_only_rejected(self) -> None:
with pytest.raises(ValidationError, match="username"):
UsernameInput(username=" ")
def test_too_long_rejected(self) -> None:
with pytest.raises(ValidationError):
UsernameInput(username="a" * 256)
def test_spaces_rejected(self) -> None:
with pytest.raises(ValidationError, match="username"):
UsernameInput(username="alice bob")
def test_special_chars_rejected(self) -> None:
with pytest.raises(ValidationError, match="username"):
UsernameInput(username="alice<script>")
def test_strips_whitespace(self) -> None:
u = UsernameInput(username=" alice ")
assert u.username == "alice"
class TestGroupListInput:
def test_valid_single_group(self) -> None:
g = GroupListInput(groups="users")
assert g.group_list == ["users"]
def test_valid_multiple_groups(self) -> None:
g = GroupListInput(groups="users, admin, staff")
assert g.group_list == ["users", "admin", "staff"]
def test_empty_string_gives_empty_list(self) -> None:
g = GroupListInput(groups="")
assert g.group_list == []
def test_whitespace_only_gives_empty_list(self) -> None:
g = GroupListInput(groups=" , , ")
assert g.group_list == []
def test_valid_with_hyphens_underscores(self) -> None:
g = GroupListInput(groups="my-group, my_group")
assert g.group_list == ["my-group", "my_group"]
def test_invalid_group_name_spaces(self) -> None:
with pytest.raises(ValidationError, match="group"):
GroupListInput(groups="bad group")
def test_invalid_group_name_special_chars(self) -> None:
with pytest.raises(ValidationError, match="group"):
GroupListInput(groups="admin, bad!group")
def test_invalid_group_name_uppercase(self) -> None:
with pytest.raises(ValidationError, match="group"):
GroupListInput(groups="Admin")
def test_group_name_too_long(self) -> None:
with pytest.raises(ValidationError, match="group"):
GroupListInput(groups="a" * 65)
def test_deduplicates(self) -> None:
g = GroupListInput(groups="users, users, admin")
assert g.group_list == ["users", "admin"]
class TestPasswordSet:
def test_valid_password(self) -> None:
p = PasswordSet(password="strongP@ss123", confirm="strongP@ss123")
assert p.password == "strongP@ss123"
def test_mismatch_rejected(self) -> None:
with pytest.raises(ValidationError, match="match"):
PasswordSet(password="strongP@ss123", confirm="different")
def test_too_short_rejected(self) -> None:
with pytest.raises(ValidationError):
PasswordSet(password="short", confirm="short")
def test_too_long_rejected(self) -> None:
long_pw = "a" * 257
with pytest.raises(ValidationError):
PasswordSet(password=long_pw, confirm=long_pw)
def test_weak_password_rejected(self) -> None:
with pytest.raises(ValidationError, match=r"[Ww]eak\b|[Ss]trength|easily guessed"):
PasswordSet(password="password", confirm="password")
def test_common_password_rejected(self) -> None:
with pytest.raises(ValidationError, match=r"[Ww]eak\b|[Ss]trength|easily guessed"):
PasswordSet(password="12345678", confirm="12345678")
class TestPasswordChange:
def test_valid_change(self) -> None:
p = PasswordChange(
current_password="oldPassword1",
password="newStrongP@ss99",
confirm="newStrongP@ss99",
)
assert p.current_password == "oldPassword1"
assert p.password == "newStrongP@ss99"
def test_missing_current_password(self) -> None:
with pytest.raises(ValidationError, match="current"):
PasswordChange(
current_password="",
password="newStrongP@ss99",
confirm="newStrongP@ss99",
)
class TestFormatValidationErrors:
def test_single_error_no_list(self) -> None:
try:
ProfileUpdate(email="bad")
except ValidationError as exc:
result = format_validation_errors(exc)
assert '<div role="alert">' in result
assert "<ul>" not in result
assert "Email" in result
def test_multiple_errors_uses_list(self) -> None:
try:
ProfileUpdate(email="bad", phone_number="bad")
except ValidationError as exc:
result = format_validation_errors(exc)
assert '<div role="alert">' in result
assert "<ul>" in result
assert "<li>" in result
assert "Email" in result
assert "Phone" in result
def test_value_error_strips_prefix(self) -> None:
try:
ProfileUpdate(picture="ftp://bad.url")
except ValidationError as exc:
result = format_validation_errors(exc)
assert "Picture URL must be a valid HTTP or HTTPS URL" in result
assert "Value error" not in result