Compare commits

..

10 commits

Author SHA1 Message Date
Johan Lundberg
01e3382aaf
fix: resolve all ruff lint errors and type checker warnings
- Use Annotated[str, Form()] for FastAPI dependencies (FAST002)
- Add missing type annotations across src/ and tests/ (ANN001/003/201/202)
- Reduce function arguments via request.form() reads (PLR0913)
- Combine return paths to reduce return statements (PLR0911)
- Use anyio.Path for async-safe filesystem operations (ASYNC240)
- Extract constants, helpers, and dict comprehensions for clarity
- Move inline imports to top-level (PLC0415)
- Use raw strings for regex match patterns (RUF043)
- Fix redundant get_session_user call in delete_user (not-iterable)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:48:46 +02:00
Johan Lundberg
2b652ff603
added ruff config 2026-03-31 15:40:37 +02:00
Johan Lundberg
2745471412
fix: narrow type for PasswordChange to satisfy type checker
Use isinstance check instead of bool flag to help ty resolve
the current_password attribute on the validated model.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:37:52 +02:00
Johan Lundberg
d7cdedbd5f
style: apply ruff formatting to new files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:36:08 +02:00
Johan Lundberg
a65af90320
feat: require current password when changing password, add zxcvbn strength check
Use PasswordChange model (requires current password) for users with
existing passwords and PasswordSet for first-time setup. Add zxcvbn
strength validation and current password field to credentials template.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:34:43 +02:00
Johan Lundberg
72a93984f2
feat: wire validation models into admin routes and deduplicate error handling
Replace manual validation error formatting with shared helper in both
admin and manage profile routes. Add UsernameInput validation to invite
route and GroupListInput validation to groups route.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:34:28 +02:00
Johan Lundberg
56c177c817
fix: add CSRF tokens to admin forms and HTML5 validation hints
Add hidden CSRF token inputs to admin profile, groups, and invite
forms. Add maxlength, pattern, and title attributes to invite input.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:24:16 +02:00
Johan Lundberg
d4acb46cf5
feat: add rate limiting middleware for authentication endpoints
Add slowapi-based rate limiting: 5/min on password login, 10/min on
WebAuthn login. Includes shared rate limiter reset fixture for tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:23:51 +02:00
Johan Lundberg
23ca6272a2
fix: block inactive users from all authentication paths
Add active-user checks to password login, WebAuthn login, and magic
link registration to prevent deactivated accounts from authenticating.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:18:51 +02:00
Johan Lundberg
aff6ddb99b
feat: add validation models (locale, username, groups, password) and error helper
Add BCP 47 locale validator to ProfileUpdate, UsernameInput model,
GroupListInput model, PasswordSet/PasswordChange with zxcvbn strength
checking, and shared format_validation_errors HTML helper.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 15:18:24 +02:00
35 changed files with 1109 additions and 266 deletions

61
ruff.toml Normal file
View file

@ -0,0 +1,61 @@
# Set the maximum line length to 120.
line-length = 120
target-version = "py313"
[lint]
select = [
"A",
"ANN",
"ASYNC",
"B",
"C4",
"DTZ",
"E",
"ERA",
"F",
"FAST",
"FLY",
"FURB",
"I",
"ISC",
"PERF",
"PGH",
"PIE",
"PL",
"PT",
"RUF",
"SIM",
"UP",
"W",
]
ignore = [
"SIM102", # collapsible-if
"SIM103", # return-bool-condition-directly (keeping explicit if/else for clarity)
"SIM108", # if-else-block-instead-of-if-exp
# Since we use ruff as a formatter, the following rules should be ignored
# See: https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
"W191",
"E111",
"E114",
"E117",
"E501",
"D206",
"D300",
"Q000",
"Q001",
"Q002",
"Q003",
"COM812",
"COM819",
"ISC002",
]
allowed-confusables = ["", "х"]
[lint.flake8-annotations]
allow-star-arg-any = true
[lint.per-file-ignores]
"**/test_*.py" = ["PLR2004"] # magic-value-comparison - allow magic numbers in tests

View file

@ -1,4 +1,5 @@
from base64 import urlsafe_b64decode
from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
@ -6,7 +7,7 @@ from pydantic import ValidationError
from porchlight.dependencies import get_session_user
from porchlight.models import User
from porchlight.validation import ProfileUpdate
from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput, format_validation_errors
router = APIRouter(prefix="/admin", tags=["admin"])
@ -99,7 +100,7 @@ async def user_detail(request: Request, userid: str) -> Response:
@router.post("/invite", response_class=HTMLResponse)
async def create_invite(
request: Request,
username: str = Form(),
username: Annotated[str, Form()],
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -109,17 +110,19 @@ async def create_invite(
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
username = username.strip()
if not username:
return HTMLResponse('<div role="alert">Username is required</div>')
try:
validated = UsernameInput(username=username)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
magic_link_service = request.app.state.magic_link_service
settings = request.app.state.settings
link = await magic_link_service.create(username=username, created_by=admin.username, note="admin invite")
link = await magic_link_service.create(username=validated.username, created_by=admin.username, note="admin invite")
url = f"{settings.issuer}/register/{link.token}"
return HTMLResponse(
f'<div role="status">Invite created for <strong>{username}</strong>:</div><div class="invite-url">{url}</div>'
f'<div role="status">Invite created for <strong>{validated.username}</strong>:</div>'
f'<div class="invite-url">{url}</div>'
)
@ -128,13 +131,6 @@ async def create_invite(
async def update_user_profile(
request: Request,
userid: str,
given_name: str = Form(""),
family_name: str = Form(""),
preferred_username: str = Form(""),
email: str = Form(""),
phone_number: str = Form(""),
picture: str = Form(""),
locale: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -144,32 +140,19 @@ async def update_user_profile(
return HTMLResponse("Forbidden", status_code=403)
# Validate
form = await request.form()
try:
profile = ProfileUpdate(
given_name=given_name,
family_name=family_name,
preferred_username=preferred_username,
email=email,
phone_number=phone_number,
picture=picture,
locale=locale,
given_name=str(form.get("given_name", "")),
family_name=str(form.get("family_name", "")),
preferred_username=str(form.get("preferred_username", "")),
email=str(form.get("email", "")),
phone_number=str(form.get("phone_number", "")),
picture=str(form.get("picture", "")),
locale=str(form.get("locale", "")),
)
except ValidationError as exc:
error = exc.errors()[0]
field = error["loc"][-1] if error["loc"] else "input"
msg = error["msg"]
labels = {
"given_name": "Given name",
"family_name": "Family name",
"preferred_username": "Display name",
"email": "Email",
"phone_number": "Phone number",
"picture": "Picture URL",
"locale": "Locale",
}
label = labels.get(str(field), str(field))
display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}"
return HTMLResponse(f'<div role="alert">{display_msg}</div>')
return HTMLResponse(format_validation_errors(exc))
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
@ -196,7 +179,7 @@ async def update_user_profile(
async def update_user_groups(
request: Request,
userid: str,
groups: str = Form(""),
groups: Annotated[str, Form()] = "",
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -205,13 +188,17 @@ async def update_user_groups(
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
try:
validated = GroupListInput(groups=groups)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("User not found", status_code=404)
group_list = [g.strip() for g in groups.split(",") if g.strip()]
updated = user.model_copy(update={"groups": group_list})
updated = user.model_copy(update={"groups": validated.group_list})
await user_repo.update(updated)
return HTMLResponse('<div role="status">Groups updated</div>')
@ -353,7 +340,7 @@ async def delete_user(request: Request, userid: str) -> Response:
return HTMLResponse("Forbidden", status_code=403)
# Prevent self-deletion
admin_userid, _ = get_session_user(request)
admin_userid, _ = session_user
if userid == admin_userid:
return HTMLResponse('<div role="alert">Cannot delete your own account</div>')

View file

@ -8,8 +8,11 @@ from urllib.parse import urlparse
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from slowapi.errors import RateLimitExceeded
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request
from starlette.responses import HTMLResponse as StarletteHTMLResponse
from starlette.responses import Response
from porchlight.admin.routes import router as admin_router
from porchlight.authn.password import PasswordService
@ -21,6 +24,7 @@ from porchlight.invite.service import MagicLinkService
from porchlight.manage.routes import router as manage_router
from porchlight.oidc.endpoints import router as oidc_router
from porchlight.oidc.provider import create_oidc_server
from porchlight.rate_limit import limiter
from porchlight.store.sqlite.db import open_db
from porchlight.store.sqlite.repositories import (
SQLiteConsentRepository,
@ -123,6 +127,16 @@ def create_app(settings: Settings | None = None) -> FastAPI:
https_only=settings.session_https_only,
)
# Rate limiting
app.state.limiter = limiter
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> StarletteHTMLResponse:
return StarletteHTMLResponse(
'<div role="alert">Too many attempts. Please try again later.</div>',
status_code=429,
)
# Templates
templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates"))
@ -147,7 +161,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
return {"status": "ok"}
@app.get("/")
async def landing(request: Request): # type: ignore[no-untyped-def]
async def landing(request: Request) -> Response:
return templates.TemplateResponse(request, "index.html")
return app

View file

@ -1,10 +1,12 @@
from base64 import urlsafe_b64decode
from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fido2.webauthn import AttestedCredentialData, AuthenticationResponse
from porchlight.models import User
from porchlight.rate_limit import limiter
from porchlight.userid import generate_unique_userid
router = APIRouter(tags=["authn"])
@ -28,10 +30,11 @@ async def login_page(request: Request) -> HTMLResponse:
@router.post("/login/password", response_class=HTMLResponse)
@limiter.limit("5/minute")
async def login_password(
request: Request,
username: str = Form(),
password: str = Form(),
username: Annotated[str, Form()],
password: Annotated[str, Form()],
) -> Response:
user_repo = request.app.state.user_repo
cred_repo = request.app.state.credential_repo
@ -50,6 +53,9 @@ async def login_password(
if not password_service.verify(credential.password_hash, password):
return HTMLResponse(error_html)
if not user.active:
return HTMLResponse(error_html)
request.session["userid"] = user.userid
request.session["username"] = user.username
@ -77,6 +83,8 @@ async def register_magic_link(request: Request, token: str) -> Response:
existing_user = await user_repo.get_by_username(link.username)
if existing_user is not None:
if not existing_user.active:
return HTMLResponse("<p>This account has been deactivated.</p>", status_code=400)
user = existing_user
else:
userid = await generate_unique_userid(user_repo)
@ -102,6 +110,7 @@ async def login_webauthn_begin(request: Request) -> Response:
@router.post("/login/webauthn/complete")
@limiter.limit("10/minute")
async def login_webauthn_complete(request: Request) -> Response:
webauthn_service = request.app.state.webauthn_service
user_repo = request.app.state.user_repo
@ -144,8 +153,8 @@ async def login_webauthn_complete(request: Request) -> Response:
await cred_repo.update_webauthn(stored)
user = await user_repo.get_by_userid(userid)
if user is None:
return JSONResponse({"error": "User not found"}, status_code=400)
if user is None or not user.active:
return JSONResponse({"error": "Authentication failed"}, status_code=400)
request.session["userid"] = user.userid
request.session["username"] = user.username

View file

@ -61,7 +61,7 @@ class CSRFMiddleware:
# Origin check (defense-in-depth)
if self.check_origin is not None:
origin = request.headers.get("origin")
if origin is not None and origin != "null" and origin != self.check_origin:
if origin is not None and origin not in ("null", self.check_origin):
logger.warning("CSRF origin mismatch: expected %s, got %s", self.check_origin, origin)
response = HTMLResponse(
"<h1>403 Forbidden</h1><p>Origin mismatch</p>",

View file

@ -1,4 +1,5 @@
from base64 import urlsafe_b64decode
from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
@ -7,7 +8,7 @@ from pydantic import ValidationError
from porchlight.dependencies import get_session_user
from porchlight.models import PasswordCredential, WebAuthnCredential
from porchlight.validation import ProfileUpdate
from porchlight.validation import PasswordChange, PasswordSet, ProfileUpdate, format_validation_errors
router = APIRouter(prefix="/manage", tags=["manage"])
@ -53,8 +54,9 @@ async def credentials_page(request: Request) -> Response:
@router.post("/credentials/password", response_class=HTMLResponse)
async def set_password(
request: Request,
password: str = Form(),
confirm: str = Form(),
password: Annotated[str, Form()],
confirm: Annotated[str, Form()],
current_password: Annotated[str, Form()] = "",
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -64,15 +66,30 @@ async def set_password(
cred_repo = request.app.state.credential_repo
password_service = request.app.state.password_service
if password != confirm:
return HTMLResponse('<div role="alert">Passwords do not match</div>')
if len(password) < 8:
return HTMLResponse('<div role="alert">Password must be at least 8 characters</div>')
password_hash = password_service.hash(password)
existing = await cred_repo.get_password_by_user(userid)
has_password = existing is not None
# Validate input
try:
if has_password:
validated = PasswordChange(
current_password=current_password,
password=password,
confirm=confirm,
)
else:
validated = PasswordSet(password=password, confirm=confirm)
except ValidationError as exc:
return HTMLResponse(format_validation_errors(exc))
# Verify current password if changing
if has_password and isinstance(validated, PasswordChange):
if not password_service.verify(existing.password_hash, validated.current_password):
return HTMLResponse('<div role="alert">Current password is incorrect</div>')
# Store new password
password_hash = password_service.hash(validated.password)
if existing is not None:
await cred_repo.delete_password(userid)
@ -200,13 +217,6 @@ async def profile_page(request: Request) -> Response:
@router.post("/profile", response_class=HTMLResponse)
async def update_profile(
request: Request,
given_name: str = Form(""),
family_name: str = Form(""),
preferred_username: str = Form(""),
email: str = Form(""),
phone_number: str = Form(""),
picture: str = Form(""),
locale: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@ -214,34 +224,19 @@ async def update_profile(
userid, _username = session_user
form = await request.form()
try:
profile = ProfileUpdate(
given_name=given_name,
family_name=family_name,
preferred_username=preferred_username,
email=email,
phone_number=phone_number,
picture=picture,
locale=locale,
given_name=str(form.get("given_name", "")),
family_name=str(form.get("family_name", "")),
preferred_username=str(form.get("preferred_username", "")),
email=str(form.get("email", "")),
phone_number=str(form.get("phone_number", "")),
picture=str(form.get("picture", "")),
locale=str(form.get("locale", "")),
)
except ValidationError as exc:
error = exc.errors()[0]
field = error["loc"][-1] if error["loc"] else "input"
msg = error["msg"]
# Produce user-friendly field labels
labels = {
"given_name": "Given name",
"family_name": "Family name",
"preferred_username": "Display name",
"email": "Email",
"phone_number": "Phone number",
"picture": "Picture URL",
"locale": "Locale",
}
label = labels.get(str(field), str(field))
# Use custom message for value errors (e.g. picture URL), generic pydantic message otherwise
display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}"
return HTMLResponse(f'<div role="alert">{display_msg}</div>')
return HTMLResponse(format_validation_errors(exc))
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)

View file

@ -1,5 +1,7 @@
"""OIDC claims mapping and UserInfo source."""
from typing import Any
from idpyoidc.server.user_info import UserInfo
from porchlight.models import User
@ -28,9 +30,7 @@ def user_to_claims(user: User) -> dict:
"locale": user.locale,
}
for claim_name, value in optional_fields.items():
if value is not None:
claims[claim_name] = value
claims.update({claim_name: value for claim_name, value in optional_fields.items() if value is not None})
# updated_at as Unix timestamp (OIDC spec requires number)
if user.updated_at:
@ -46,7 +46,7 @@ class PorchlightUserInfo(UserInfo):
idpyoidc calls __call__() synchronously to look up claims.
"""
def __init__(self, **kwargs) -> None:
def __init__(self, **kwargs: Any) -> None:
super().__init__(db={}, **kwargs)
def set_user_claims(self, user_id: str, claims: dict) -> None:

View file

@ -106,7 +106,7 @@ async def authorization_complete(request: Request) -> Response:
)
async def _check_consent_or_complete(
async def _check_consent_or_complete( # noqa: PLR0913
request: Request,
oidc_server: object,
endpoint: object,
@ -137,7 +137,7 @@ async def _check_consent_or_complete(
return RedirectResponse("/consent", status_code=303)
async def _complete_authorization(
async def _complete_authorization( # noqa: PLR0913
request: Request,
oidc_server: object,
endpoint: object,
@ -332,11 +332,10 @@ async def consent_submit(request: Request) -> Response:
redirect_uri = auth_params.get("redirect_uri", "")
state = auth_params.get("state", "")
if action == "deny":
params = urlencode({"error": "access_denied", "state": state})
return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
if action != "allow":
if action == "deny":
params = urlencode({"error": "access_denied", "state": state})
return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
return HTMLResponse("<h1>Error</h1><p>Invalid action</p>", status_code=400)
# Allow — collect approved scopes
@ -357,11 +356,9 @@ async def consent_submit(request: Request) -> Response:
try:
parsed = endpoint.parse_request(auth_params)
if "error" in parsed:
raise ValueError(parsed.get("error_description", parsed["error"]))
except Exception as exc:
return HTMLResponse(f"<h1>Error</h1><p>{exc}</p>", status_code=400)
if "error" in parsed:
error_desc = parsed.get("error_description", parsed["error"])
return HTMLResponse(f"<h1>Error</h1><p>{error_desc}</p>", status_code=400)
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)

View file

@ -0,0 +1,4 @@
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)

View file

@ -1,11 +1,13 @@
from pathlib import Path
import aiosqlite
import anyio
async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int:
"""Apply unapplied SQL migration files in order. Returns count of newly applied migrations."""
if not migrations_dir.is_dir():
async_dir = anyio.Path(migrations_dir)
if not await async_dir.is_dir():
raise FileNotFoundError(f"Migrations directory not found: {migrations_dir}")
await db.execute(
@ -22,19 +24,22 @@ async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int:
async with db.execute("SELECT filename FROM _migrations") as cursor:
applied = {row[0] async for row in cursor}
migration_files = sorted(migrations_dir.glob("*.sql"))
migration_files = sorted(
[f async for f in async_dir.iterdir() if f.suffix == ".sql"],
key=lambda f: f.name,
)
count = 0
for migration_file in migration_files:
if migration_file.name in applied:
continue
sql = migration_file.read_text(encoding="utf-8")
sql = await migration_file.read_text(encoding="utf-8")
await db.execute("BEGIN")
try:
for statement in sql.split(";"):
statement = statement.strip()
if statement:
await db.execute(statement)
cleaned = statement.strip()
if cleaned:
await db.execute(cleaned)
await db.execute(
"INSERT INTO _migrations (filename) VALUES (?)",
(migration_file.name,),

View file

@ -11,6 +11,7 @@
<section>
<h2>Profile</h2>
<form hx-post="/admin/users/{{ target_user.userid }}/profile" hx-target="#profile-status" hx-swap="innerHTML">
<input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">
<div>
<label for="given_name">Given name</label>
<input type="text" id="given_name" name="given_name" value="{{ target_user.given_name or '' }}" maxlength="255">
@ -48,6 +49,7 @@
<h2>Groups</h2>
<div id="groups-section">
<form hx-post="/admin/users/{{ target_user.userid }}/groups" hx-target="#groups-status" hx-swap="innerHTML">
<input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">
<div id="group-list">
{% for group in target_user.groups %}
<span class="group-tag">{{ group }}</span>

View file

@ -8,8 +8,11 @@
<section>
<h2>Create invite</h2>
<form hx-post="/admin/invite" hx-target="#invite-status" hx-swap="innerHTML">
<input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">
<div class="admin-search">
<input type="text" name="username" placeholder="Username for new invite" required>
<input type="text" name="username" placeholder="Username or email for new invite" required
maxlength="255" pattern="[a-zA-Z0-9_.@-]+"
title="Letters, digits, dots, hyphens, underscores, and @">
<button type="submit">Create invite</button>
</div>
</form>

View file

@ -40,13 +40,19 @@
{% endif %}
<form hx-post="/manage/credentials/password" hx-target="#password-section" hx-swap="innerHTML">
<input type="hidden" name="csrf_token" value="{{ csrf_token_processor(request) }}">
{% if has_password %}
<div>
<label for="current_password">Current password</label>
<input type="password" id="current_password" name="current_password" required autocomplete="current-password">
</div>
{% endif %}
<div>
<label for="password">{{ "New password" if has_password else "Set password" }}</label>
<input type="password" id="password" name="password" required minlength="8" autocomplete="new-password">
<input type="password" id="password" name="password" required minlength="8" maxlength="256" autocomplete="new-password">
</div>
<div>
<label for="confirm">Confirm password</label>
<input type="password" id="confirm" name="confirm" required minlength="8" autocomplete="new-password">
<input type="password" id="confirm" name="confirm" required minlength="8" maxlength="256" autocomplete="new-password">
</div>
<button type="submit">{{ "Change password" if has_password else "Set password" }}</button>
</form>

View file

@ -1,8 +1,10 @@
import re
from typing import Annotated
from urllib.parse import urlparse
from pydantic import BaseModel, EmailStr, Field, field_validator
from pydantic import BaseModel, EmailStr, Field, ValidationError, field_validator, model_validator
from pydantic_extra_types.phone_numbers import PhoneNumberValidator
from zxcvbn import zxcvbn
E164Phone = Annotated[str, PhoneNumberValidator(number_format="E164")]
@ -40,3 +42,134 @@ class ProfileUpdate(BaseModel):
if parsed.scheme not in ("http", "https") or not parsed.netloc:
raise ValueError("Picture URL must be a valid HTTP or HTTPS URL")
return v
@field_validator("locale", mode="before")
@classmethod
def validate_locale(cls, v: str) -> str:
if isinstance(v, str):
v = v.strip()
if v == "":
return ""
if not re.match(r"^[a-z]{2,3}(-[A-Z][a-z]{3})?(-[A-Z]{2})?$", v):
raise ValueError("Locale must be a valid BCP 47 language tag (e.g. en, sv-SE, zh-Hans-CN)")
return v
class UsernameInput(BaseModel):
username: str = Field(max_length=255)
@field_validator("username", mode="before")
@classmethod
def validate_username(cls, v: str) -> str:
if isinstance(v, str):
v = v.strip()
if not v:
raise ValueError("Username is required")
if not re.match(r"^[a-zA-Z0-9_.@-]+$", v):
raise ValueError("Username may only contain letters, digits, dots, hyphens, underscores, and @")
return v
class GroupListInput(BaseModel):
groups: str = ""
@property
def group_list(self) -> list[str]:
"""Parse comma-separated groups into a deduplicated list."""
seen: set[str] = set()
result: list[str] = []
for g in (g.strip() for g in self.groups.split(",") if g.strip()):
if g not in seen:
seen.add(g)
result.append(g)
return result
@field_validator("groups", mode="before")
@classmethod
def validate_groups(cls, v: str) -> str:
if isinstance(v, str):
names = [g.strip() for g in v.split(",") if g.strip()]
for name in names:
if not re.match(r"^[a-z0-9_-]{1,64}$", name):
raise ValueError(
f"Invalid group name '{name}'. "
"Groups must be 1-64 lowercase letters, digits, hyphens, or underscores."
)
return v
MIN_PASSWORD_STRENGTH = 2
class PasswordSet(BaseModel):
password: str = Field(min_length=8, max_length=256)
confirm: str
@model_validator(mode="after")
def validate_password(self) -> "PasswordSet":
if self.password != self.confirm:
raise ValueError("Passwords do not match")
result = zxcvbn(self.password)
if result["score"] < MIN_PASSWORD_STRENGTH:
feedback = result.get("feedback", {})
warning = feedback.get("warning", "")
suggestions = feedback.get("suggestions", [])
msg = "Password is too easily guessed."
if warning:
msg += f" {warning}."
if suggestions:
msg += " " + " ".join(suggestions)
raise ValueError(msg)
return self
class PasswordChange(PasswordSet):
current_password: str
@field_validator("current_password", mode="before")
@classmethod
def validate_current_password(cls, v: str) -> str:
if isinstance(v, str) and v.strip() == "":
raise ValueError("Current password is required")
return v
FIELD_LABELS: dict[str, str] = {
"given_name": "Given name",
"family_name": "Family name",
"preferred_username": "Display name",
"email": "Email",
"phone_number": "Phone number",
"picture": "Picture URL",
"locale": "Locale",
"username": "Username",
"groups": "Groups",
"password": "Password",
"confirm": "Confirm password",
"current_password": "Current password",
}
def format_validation_errors(exc: ValidationError) -> str:
"""Format Pydantic ValidationError into user-friendly HTML."""
messages: list[str] = []
for error in exc.errors():
field = str(error["loc"][-1]) if error["loc"] else "input"
label = FIELD_LABELS.get(field, field)
msg = error["msg"]
if error["type"] == "value_error":
raw = msg.removeprefix("Value error, ")
# If the message already starts with the label, don't duplicate it
if raw.startswith(label):
display_msg = raw
else:
display_msg = f"{label}: {raw}"
else:
display_msg = f"{label}: {msg}"
messages.append(display_msg)
if len(messages) == 1:
return f'<div role="alert">{messages[0]}</div>'
items = "".join(f"<li>{m}</li>" for m in messages)
return f'<div role="alert"><ul>{items}</ul></div>'

View file

@ -6,6 +6,7 @@ from httpx import ASGITransport, AsyncClient
from porchlight.app import create_app
from porchlight.config import Settings
from porchlight.rate_limit import limiter
@pytest.fixture
@ -21,6 +22,12 @@ async def client(settings: Settings) -> AsyncIterator[AsyncClient]:
yield ac
@pytest.fixture(autouse=True)
def _reset_rate_limiter() -> None:
"""Reset the rate limiter storage before each test."""
limiter.reset()
async def get_csrf_token(client: AsyncClient) -> str:
"""Get a CSRF token by visiting the login page.

View file

@ -24,6 +24,107 @@ from porchlight.store.sqlite.repositories import (
)
async def _create_user_with_password(
user_repo: SQLiteUserRepository,
cred_repo: SQLiteCredentialRepository,
password_service: PasswordService,
user: User,
password: str,
) -> None:
"""Helper to create a user and set their password credential."""
await user_repo.create(user)
password_hash = password_service.hash(password)
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=password_hash))
async def _seed_test_users(
user_repo: SQLiteUserRepository,
cred_repo: SQLiteCredentialRepository,
password_service: PasswordService,
result: dict[str, str],
) -> None:
"""Create all test users with passwords."""
# Login test user
await _create_user_with_password(
user_repo,
cred_repo,
password_service,
User(userid="test-user-01", username="testuser", groups=["users"]),
"testpassword123",
)
result["login_username"] = "testuser"
result["login_password"] = "testpassword123"
# Credentials management test user
await _create_user_with_password(
user_repo,
cred_repo,
password_service,
User(userid="test-user-02", username="creduser", groups=["users"]),
"credpassword123",
)
result["cred_username"] = "creduser"
result["cred_password"] = "credpassword123"
# WebAuthn registration test user
await _create_user_with_password(
user_repo,
cred_repo,
password_service,
User(userid="test-user-03", username="webauthnuser", groups=["users"]),
"webauthnpass123",
)
result["webauthn_username"] = "webauthnuser"
result["webauthn_password"] = "webauthnpass123"
result["webauthn_userid"] = "test-user-03"
# Profile management test user
await _create_user_with_password(
user_repo,
cred_repo,
password_service,
User(
userid="test-user-04",
username="profileuser",
given_name="Alice",
family_name="Smith",
preferred_username="asmith",
email="alice@example.com",
phone_number="+12025551234",
picture="https://example.com/alice.jpg",
locale="en",
groups=["users"],
),
"profilepass123",
)
result["profile_username"] = "profileuser"
result["profile_password"] = "profilepass123"
# Admin user for admin page tests
await _create_user_with_password(
user_repo,
cred_repo,
password_service,
User(
userid="test-user-05",
username="adminuser",
given_name="Admin",
family_name="User",
email="admin@example.com",
groups=["admin", "users"],
),
"adminpass123",
)
result["admin_username"] = "adminuser"
result["admin_password"] = "adminpass123"
result["admin_userid"] = "test-user-05"
# Disposable user for admin delete test
await user_repo.create(User(userid="test-user-06", username="disposableuser", groups=["users"]))
result["disposable_userid"] = "test-user-06"
result["disposable_username"] = "disposableuser"
async def seed() -> None:
db_path = os.environ.get("OIDC_OP_SQLITE_PATH")
if not db_path:
@ -39,89 +140,21 @@ async def seed() -> None:
password_service = PasswordService()
magic_link_service = MagicLinkService(repo=magic_link_repo)
result = {}
result: dict[str, str] = {}
# 1. Create a magic link for registration test
# Create magic link for registration test
link = await magic_link_service.create(username="newuser")
result["register_token"] = link.token
result["register_username"] = "newuser"
# 2. Create a user with a password for login test
user = User(userid="test-user-01", username="testuser", groups=["users"])
await user_repo.create(user)
password_hash = password_service.hash("testpassword123")
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=password_hash))
result["login_username"] = "testuser"
result["login_password"] = "testpassword123"
# Create all test users
await _seed_test_users(user_repo, cred_repo, password_service, result)
# 3. Create a separate user for credentials management test
cred_user = User(userid="test-user-02", username="creduser", groups=["users"])
await user_repo.create(cred_user)
cred_password_hash = password_service.hash("credpassword123")
await cred_repo.create_password(PasswordCredential(user_id=cred_user.userid, password_hash=cred_password_hash))
result["cred_username"] = "creduser"
result["cred_password"] = "credpassword123"
# 5. Create a user with password for WebAuthn registration tests
# (login with password first, then register a passkey)
webauthn_user = User(userid="test-user-03", username="webauthnuser", groups=["users"])
await user_repo.create(webauthn_user)
webauthn_password_hash = password_service.hash("webauthnpass123")
await cred_repo.create_password(
PasswordCredential(user_id=webauthn_user.userid, password_hash=webauthn_password_hash)
)
result["webauthn_username"] = "webauthnuser"
result["webauthn_password"] = "webauthnpass123"
result["webauthn_userid"] = "test-user-03"
# 4. Create an expired/used magic link for negative test
# Create an expired/used magic link for negative test
expired_link = await magic_link_service.create(username="expired")
await magic_link_service.mark_used(expired_link.token)
result["used_token"] = expired_link.token
# 5. Create a user with profile data for profile management tests
profile_user = User(
userid="test-user-04",
username="profileuser",
given_name="Alice",
family_name="Smith",
preferred_username="asmith",
email="alice@example.com",
phone_number="+12025551234",
picture="https://example.com/alice.jpg",
locale="en",
groups=["users"],
)
await user_repo.create(profile_user)
profile_password_hash = password_service.hash("profilepass123")
await cred_repo.create_password(
PasswordCredential(user_id=profile_user.userid, password_hash=profile_password_hash)
)
result["profile_username"] = "profileuser"
result["profile_password"] = "profilepass123"
# 6. Admin user for admin page tests
admin_user = User(
userid="test-user-05",
username="adminuser",
given_name="Admin",
family_name="User",
email="admin@example.com",
groups=["admin", "users"],
)
await user_repo.create(admin_user)
admin_password_hash = password_service.hash("adminpass123")
await cred_repo.create_password(PasswordCredential(user_id=admin_user.userid, password_hash=admin_password_hash))
result["admin_username"] = "adminuser"
result["admin_password"] = "adminpass123"
result["admin_userid"] = "test-user-05"
# 7. Disposable user for admin delete test (not used by any other tests)
disposable_user = User(userid="test-user-06", username="disposableuser", groups=["users"])
await user_repo.create(disposable_user)
result["disposable_userid"] = "test-user-06"
result["disposable_username"] = "disposableuser"
await db.commit()
await db.close()
print(json.dumps(result))

View file

@ -1,3 +1,4 @@
from base64 import urlsafe_b64encode
from datetime import UTC, datetime
import pytest
@ -46,7 +47,7 @@ async def _login(
)
async def _create_target_user(
async def _create_target_user( # noqa: PLR0913
client: AsyncClient,
*,
userid: str = "target-user-01",
@ -365,8 +366,6 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None:
)
# URL uses base64url without padding
from base64 import urlsafe_b64encode
credential_id_b64 = urlsafe_b64encode(credential_id).decode().rstrip("=")
token = await get_csrf_token(client)

View file

@ -0,0 +1,85 @@
from datetime import UTC, datetime
import pytest
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _setup_admin_and_target(client: AsyncClient) -> tuple[str, str]:
"""Create admin + target user, login as admin, return (token, target_userid)."""
app = client._transport.app
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
admin = User(
userid="admin-g01",
username="admin_g",
groups=["admin", "users"],
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(admin)
target = User(
userid="target-g01",
username="target_g",
groups=["users"],
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(target)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=admin.userid, password_hash=svc.hash("AdminPass123!")))
token = await get_csrf_token(client)
await client.post(
"/login/password",
data={"username": "admin_g", "password": "AdminPass123!"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
return token, target.userid
@pytest.mark.asyncio
async def test_valid_groups(client: AsyncClient) -> None:
token, userid = await _setup_admin_and_target(client)
response = await client.post(
f"/admin/users/{userid}/groups",
data={"groups": "users, staff"},
headers={"X-CSRF-Token": token},
)
assert "Groups updated" in response.text
app = client._transport.app
user = await app.state.user_repo.get_by_userid(userid)
assert sorted(user.groups) == ["staff", "users"]
@pytest.mark.asyncio
async def test_invalid_group_name_rejected(client: AsyncClient) -> None:
token, userid = await _setup_admin_and_target(client)
response = await client.post(
f"/admin/users/{userid}/groups",
data={"groups": "users, Bad Group!"},
headers={"X-CSRF-Token": token},
)
assert "alert" in response.text
@pytest.mark.asyncio
async def test_empty_groups_clears(client: AsyncClient) -> None:
token, userid = await _setup_admin_and_target(client)
response = await client.post(
f"/admin/users/{userid}/groups",
data={"groups": ""},
headers={"X-CSRF-Token": token},
)
assert "Groups updated" in response.text
app = client._transport.app
user = await app.state.user_repo.get_by_userid(userid)
assert user.groups == []

View file

@ -0,0 +1,71 @@
from datetime import UTC, datetime
import pytest
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _login_admin(client: AsyncClient) -> str:
"""Create and login as admin user, return CSRF token."""
app = client._transport.app
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
user = User(
userid="admin-01",
username="admin",
groups=["admin", "users"],
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("AdminPass123!")))
token = await get_csrf_token(client)
await client.post(
"/login/password",
data={"username": "admin", "password": "AdminPass123!"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
return token
@pytest.mark.asyncio
async def test_invite_valid_username(client: AsyncClient) -> None:
token = await _login_admin(client)
response = await client.post(
"/admin/invite",
data={"username": "newuser@example.com"},
headers={"X-CSRF-Token": token},
)
assert response.status_code == 200
assert "Invite created" in response.text
@pytest.mark.asyncio
async def test_invite_empty_username_rejected(client: AsyncClient) -> None:
token = await _login_admin(client)
response = await client.post(
"/admin/invite",
data={"username": ""},
headers={"X-CSRF-Token": token},
)
# Empty username is rejected — either by FastAPI (422) or validation (alert)
assert response.status_code == 422 or "alert" in response.text
@pytest.mark.asyncio
async def test_invite_invalid_username_rejected(client: AsyncClient) -> None:
token = await _login_admin(client)
response = await client.post(
"/admin/invite",
data={"username": "bad user<script>"},
headers={"X-CSRF-Token": token},
)
assert "alert" in response.text
assert "letters" in response.text.lower() or "Username" in response.text

View file

@ -1,5 +1,18 @@
from unittest.mock import MagicMock
from httpx import AsyncClient
from porchlight.dependencies import (
get_credential_repo,
get_magic_link_repo,
get_user_repo,
)
from porchlight.store.protocols import (
CredentialRepository,
MagicLinkRepository,
UserRepository,
)
async def test_health_endpoint(client: AsyncClient) -> None:
response = await client.get("/health")
@ -16,12 +29,6 @@ async def test_app_has_title(client: AsyncClient) -> None:
async def test_app_has_repos_on_state(client: AsyncClient) -> None:
from porchlight.store.protocols import (
CredentialRepository,
MagicLinkRepository,
UserRepository,
)
app = client._transport.app # type: ignore[union-attr]
assert isinstance(app.state.user_repo, UserRepository)
assert isinstance(app.state.credential_repo, CredentialRepository)
@ -41,14 +48,6 @@ async def test_landing_page(client: AsyncClient) -> None:
async def test_dependency_functions() -> None:
from unittest.mock import MagicMock
from porchlight.dependencies import (
get_credential_repo,
get_magic_link_repo,
get_user_repo,
)
request = MagicMock()
request.app.state.user_repo = "user_repo_sentinel"
request.app.state.credential_repo = "credential_repo_sentinel"

View file

@ -46,7 +46,7 @@ async def test_set_password_mismatch_returns_error(client: AsyncClient) -> None:
token = await get_csrf_token(client)
res = await client.post(
"/manage/credentials/password",
data={"password": "newpassword", "confirm": "different"},
data={"current_password": "old", "password": "newpassword", "confirm": "different"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
assert res.status_code == 200
@ -59,7 +59,7 @@ async def test_set_password_too_short_returns_error(client: AsyncClient) -> None
token = await get_csrf_token(client)
res = await client.post(
"/manage/credentials/password",
data={"password": "short", "confirm": "short"},
data={"current_password": "old", "password": "short", "confirm": "short"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
assert res.status_code == 200
@ -74,7 +74,7 @@ async def test_set_password_creates_or_replaces_password(client: AsyncClient) ->
token = await get_csrf_token(client)
res = await client.post(
"/manage/credentials/password",
data={"password": "newpassword123", "confirm": "newpassword123"},
data={"current_password": "old", "password": "NewStr0ng!Pass99", "confirm": "NewStr0ng!Pass99"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
assert res.status_code == 200
@ -83,7 +83,7 @@ async def test_set_password_creates_or_replaces_password(client: AsyncClient) ->
updated = await cred_repo.get_password_by_user(userid)
assert updated is not None
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
assert svc.verify(updated.password_hash, "newpassword123") is True
assert svc.verify(updated.password_hash, "NewStr0ng!Pass99") is True
async def test_delete_password_requires_session(client: AsyncClient) -> None:

View file

@ -1,5 +1,6 @@
import os
import pytest
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.hashes import SHA256
from fido2.cose import ES256
@ -233,8 +234,6 @@ def test_complete_authentication_verifies_signature() -> None:
def test_complete_authentication_wrong_signature_raises() -> None:
import pytest
service = _make_service()
_private_key, credential_id, attested = _generate_credential()

View file

@ -0,0 +1,66 @@
from datetime import UTC, datetime
import pytest
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _create_inactive_user_with_password(client: AsyncClient) -> None:
"""Create an inactive user with a password credential."""
app = client._transport.app
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
user = User(
userid="inactive-01",
username="inactive",
active=False,
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(
PasswordCredential(user_id=user.userid, password_hash=svc.hash("password123!Secure"))
)
@pytest.mark.asyncio
async def test_inactive_user_cannot_login_password(client: AsyncClient) -> None:
await _create_inactive_user_with_password(client)
token = await get_csrf_token(client)
response = await client.post(
"/login/password",
data={"username": "inactive", "password": "password123!Secure"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
assert "Invalid username or password" in response.text
@pytest.mark.asyncio
async def test_inactive_user_cannot_register_magic_link(client: AsyncClient) -> None:
"""If an inactive user exists, magic link registration should reject them."""
app = client._transport.app
user_repo = app.state.user_repo
magic_link_service = app.state.magic_link_service
user = User(
userid="inactive-02",
username="deactivated",
active=False,
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
link = await magic_link_service.create(username="deactivated", created_by="admin", note="test")
response = await client.get(f"/register/{link.token}", follow_redirects=False)
assert response.status_code == 400 or "deactivated" in response.text.lower() or "Invalid" in response.text

View file

@ -1,3 +1,4 @@
from collections.abc import AsyncIterator
from datetime import UTC, datetime, timedelta
from pathlib import Path
@ -14,7 +15,7 @@ MIGRATIONS_DIR = (
@pytest.fixture
async def db():
async def db() -> AsyncIterator[aiosqlite.Connection]:
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA foreign_keys=ON")

View file

@ -3,6 +3,7 @@ from datetime import UTC, datetime
from urllib.parse import parse_qs, urlparse
from argon2 import PasswordHasher
from fastapi import FastAPI
from httpx import AsyncClient
from porchlight.authn.password import PasswordService
@ -225,7 +226,7 @@ async def test_partial_consent_filters_scopes(client: AsyncClient) -> None:
# -- Test helpers --
def _register_test_rp(app) -> None:
def _register_test_rp(app: FastAPI) -> None:
oidc_server = app.state.oidc_server
if "consent-rp" in oidc_server.context.cdb:
return
@ -244,7 +245,7 @@ def _register_test_rp(app) -> None:
oidc_server.keyjar.add_symmetric(client_id, client_secret)
async def _create_test_user(app) -> None:
async def _create_test_user(app: FastAPI) -> None:
user_repo = app.state.user_repo
existing = await user_repo.get_by_username("consentuser")
if existing:

View file

@ -2,48 +2,41 @@ import json
import secrets
from base64 import b64encode
from datetime import UTC, datetime
from typing import Any
from urllib.parse import parse_qs, urlparse
from argon2 import PasswordHasher
from cryptojwt.jwk.jwk import key_from_jwk_dict
from cryptojwt.jws.jws import JWS
from fastapi import FastAPI
from httpx import AsyncClient
from porchlight.authn.password import PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
CLIENT_ID = "e2e-rp"
CLIENT_SECRET = "e2e-secret-0123456789abcdef" # 30+ chars
REDIRECT_URI = "http://localhost:9000/callback"
async def test_full_authorization_code_flow(client: AsyncClient) -> None:
"""End-to-end test of the complete OIDC Authorization Code flow.
Exercises: discovery, client registration, authorization request,
password login, authorization completion, token exchange, ID token
validation (JWKS + JWT signature), and userinfo.
"""
# -- Setup: register RP client and create user --
app = client._transport.app # type: ignore[union-attr]
async def _setup_rp_and_user(app: FastAPI) -> None:
"""Register an RP client and create a test user with password."""
oidc_server = app.state.oidc_server
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
client_id = "e2e-rp"
client_secret = "e2e-secret-0123456789abcdef" # 30+ chars
redirect_uri = "http://localhost:9000/callback"
state = secrets.token_urlsafe(16)
nonce = secrets.token_urlsafe(16)
oidc_server.context.cdb[client_id] = {
"client_id": client_id,
"client_secret": client_secret,
"redirect_uris": [(redirect_uri, {})],
oidc_server.context.cdb[CLIENT_ID] = {
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"redirect_uris": [(REDIRECT_URI, {})],
"response_types_supported": ["code"],
"token_endpoint_auth_method": "client_secret_basic",
"scope": ["openid", "profile", "email"],
"allowed_scopes": ["openid", "profile", "email"],
"client_salt": secrets.token_hex(8),
}
oidc_server.keyjar.add_symmetric(client_id, client_secret)
oidc_server.keyjar.add_symmetric(CLIENT_ID, CLIENT_SECRET)
user = User(
userid="lusab-bansen",
@ -60,13 +53,16 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None:
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))
# -- Step 1: Authorization request (unauthenticated) → redirect to /login --
async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> str:
"""Perform authorization request, login, consent, and return the auth code."""
# Step 1: Authorization request (unauthenticated) -> redirect to /login
auth_res = await client.get(
"/authorization",
params={
"response_type": "code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"scope": "openid profile email",
"state": state,
"nonce": nonce,
@ -76,7 +72,7 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None:
assert auth_res.status_code in (302, 303), f"Expected redirect to /login, got {auth_res.status_code}"
assert "/login" in auth_res.headers["location"]
# -- Step 2: Password login → HX-Redirect to /authorization/complete --
# Step 2: Password login -> HX-Redirect to /authorization/complete
token = await get_csrf_token(client)
login_res = await client.post(
"/login/password",
@ -89,14 +85,14 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None:
f"Expected HX-Redirect to /authorization/complete, got '{hx_redirect}'"
)
# -- Step 3: Complete authorization → redirect to consent --
# Step 3: Complete authorization -> redirect to consent
complete_res = await client.get("/authorization/complete", follow_redirects=False)
assert complete_res.status_code in (302, 303), (
f"Expected redirect to /consent, got {complete_res.status_code}: {complete_res.text}"
)
assert "/consent" in complete_res.headers["location"]
# -- Step 3b: Approve consent → redirect to callback with code + state --
# Step 3b: Approve consent -> redirect to callback with code + state
token = await get_csrf_token(client)
consent_res = await client.post(
"/consent",
@ -117,15 +113,18 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None:
code = callback_params["code"][0]
assert len(code) > 0
return code
# -- Step 4: Token exchange → access_token, id_token, token_type --
auth_header = b64encode(f"{client_id}:{client_secret}".encode()).decode()
async def _exchange_token(client: AsyncClient, code: str) -> dict[str, Any]:
"""Exchange authorization code for tokens."""
auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
token_res = await client.post(
"/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"redirect_uri": REDIRECT_URI,
},
headers={
"Authorization": f"Basic {auth_header}",
@ -138,45 +137,59 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None:
assert "access_token" in token_data
assert "id_token" in token_data
assert token_data["token_type"].lower() == "bearer"
return token_data
access_token = token_data["access_token"]
id_token_jwt = token_data["id_token"]
# -- Step 5: Validate ID token — fetch JWKS, verify JWT signature --
async def _validate_id_token(client: AsyncClient, id_token_jwt: str, nonce: str) -> str:
"""Validate the ID token via JWKS and return the sub claim."""
jwks_res = await client.get("/jwks")
assert jwks_res.status_code == 200
jwks = jwks_res.json()
assert "keys" in jwks
assert len(jwks["keys"]) > 0
# Decode the ID token header to find the key ID
id_token_header = json.loads(JWS()._decode(id_token_jwt.split(".")[0]))
id_token_header = json.loads(JWS()._decode(id_token_jwt.split(".", maxsplit=1)[0]))
kid = id_token_header.get("kid")
# Find the matching key in JWKS
matching_keys = [k for k in jwks["keys"] if k.get("kid") == kid]
assert len(matching_keys) == 1, f"Expected 1 matching key for kid={kid}, found {len(matching_keys)}"
key = key_from_jwk_dict(matching_keys[0])
# Verify JWT signature and decode payload
verifier = JWS()
payload = verifier.verify_compact(id_token_jwt, keys=[key])
id_token_payload = json.loads(payload) if isinstance(payload, (str, bytes)) else payload
assert id_token_payload["iss"] == "http://localhost:8000"
assert client_id in id_token_payload["aud"]
assert CLIENT_ID in id_token_payload["aud"]
assert id_token_payload["nonce"] == nonce
# sub is a pairwise identifier — just verify it exists and is non-empty
id_token_sub = id_token_payload["sub"]
assert isinstance(id_token_sub, str)
assert len(id_token_sub) > 0
return id_token_sub
async def test_full_authorization_code_flow(client: AsyncClient) -> None:
"""End-to-end test of the complete OIDC Authorization Code flow.
Exercises: discovery, client registration, authorization request,
password login, authorization completion, token exchange, ID token
validation (JWKS + JWT signature), and userinfo.
"""
app = client._transport.app # type: ignore[union-attr]
state = secrets.token_urlsafe(16)
nonce = secrets.token_urlsafe(16)
await _setup_rp_and_user(app)
code = await _login_and_authorize(client, state, nonce)
token_data = await _exchange_token(client, code)
id_token_sub = await _validate_id_token(client, token_data["id_token"], nonce)
# -- Step 6: UserInfo — GET /userinfo with Bearer token --
userinfo_res = await client.get(
"/userinfo",
headers={"Authorization": f"Bearer {access_token}"},
headers={"Authorization": f"Bearer {token_data['access_token']}"},
)
assert userinfo_res.status_code == 200, f"UserInfo failed: {userinfo_res.status_code} {userinfo_res.text}"

View file

@ -2,6 +2,7 @@ import shutil
from pathlib import Path
from porchlight.config import Settings
from porchlight.oidc.claims import PorchlightUserInfo
from porchlight.oidc.provider import create_oidc_server
@ -49,8 +50,6 @@ def test_create_server_userinfo_is_porchlight() -> None:
try:
settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path))
server = create_oidc_server(settings)
from porchlight.oidc.claims import PorchlightUserInfo
assert isinstance(server.context.userinfo, PorchlightUserInfo)
finally:
shutil.rmtree(key_path, ignore_errors=True)

View file

@ -0,0 +1,122 @@
from datetime import UTC, datetime
import pytest
from httpx import AsyncClient
from porchlight.authn.password import PasswordHasher, PasswordService
from porchlight.models import PasswordCredential, User
from tests.conftest import get_csrf_token
async def _login_user_with_password(client: AsyncClient) -> str:
"""Create user with password, login, return CSRF token."""
app = client._transport.app
user_repo = app.state.user_repo
cred_repo = app.state.credential_repo
user = User(
userid="pw-user-01",
username="pwuser",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("OldPass123!ok")))
token = await get_csrf_token(client)
await client.post(
"/login/password",
data={"username": "pwuser", "password": "OldPass123!ok"},
headers={"HX-Request": "true", "X-CSRF-Token": token},
)
return token
async def _login_user_without_password(client: AsyncClient) -> str:
"""Create user without password, simulate session login, return CSRF token."""
app = client._transport.app
user_repo = app.state.user_repo
user = User(
userid="pw-user-02",
username="newuser",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await user_repo.create(user)
# Simulate session login via magic link
token = await get_csrf_token(client)
magic_link_service = app.state.magic_link_service
link = await magic_link_service.create(username="newuser", created_by="admin", note="test")
await client.get(f"/register/{link.token}", follow_redirects=False)
# Re-fetch CSRF token after session change
token = await get_csrf_token(client)
return token
@pytest.mark.asyncio
async def test_change_password_requires_current(client: AsyncClient) -> None:
token = await _login_user_with_password(client)
response = await client.post(
"/manage/credentials/password",
data={
"password": "NewStrong!Pass99",
"confirm": "NewStrong!Pass99",
},
headers={"X-CSRF-Token": token},
)
assert "alert" in response.text
assert "urrent password" in response.text
@pytest.mark.asyncio
async def test_change_password_wrong_current_rejected(client: AsyncClient) -> None:
token = await _login_user_with_password(client)
response = await client.post(
"/manage/credentials/password",
data={
"current_password": "WrongPassword",
"password": "NewStrong!Pass99",
"confirm": "NewStrong!Pass99",
},
headers={"X-CSRF-Token": token},
)
assert "alert" in response.text
assert "incorrect" in response.text.lower() or "Invalid" in response.text or "current" in response.text.lower()
@pytest.mark.asyncio
async def test_change_password_correct_current_succeeds(client: AsyncClient) -> None:
token = await _login_user_with_password(client)
response = await client.post(
"/manage/credentials/password",
data={
"current_password": "OldPass123!ok",
"password": "NewStrong!Pass99",
"confirm": "NewStrong!Pass99",
},
headers={"X-CSRF-Token": token},
)
assert "updated" in response.text.lower() or "status" in response.text
@pytest.mark.asyncio
async def test_set_password_no_current_needed(client: AsyncClient) -> None:
"""First-time password setup should not require current password."""
token = await _login_user_without_password(client)
response = await client.post(
"/manage/credentials/password",
data={
"password": "FirstPass!Strong99",
"confirm": "FirstPass!Strong99",
},
headers={"X-CSRF-Token": token},
)
assert "updated" in response.text.lower() or "status" in response.text

25
tests/test_rate_limit.py Normal file
View file

@ -0,0 +1,25 @@
import pytest
from httpx import AsyncClient
from tests.conftest import get_csrf_token
@pytest.mark.asyncio
async def test_password_login_rate_limited(client: AsyncClient) -> None:
"""After 5 failed attempts, the 6th should be rate-limited."""
token = await get_csrf_token(client)
for _ in range(5):
await client.post(
"/login/password",
data={"username": "nobody", "password": "wrong"},
headers={"X-CSRF-Token": token},
)
response = await client.post(
"/login/password",
data={"username": "nobody", "password": "wrong"},
headers={"X-CSRF-Token": token},
)
assert response.status_code == 429

View file

@ -1,3 +1,4 @@
from collections.abc import AsyncIterator
from pathlib import Path
import aiosqlite
@ -11,7 +12,7 @@ MIGRATIONS_DIR = (
@pytest.fixture
async def db():
async def db() -> AsyncIterator[aiosqlite.Connection]:
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA foreign_keys=ON")

View file

@ -1,11 +1,13 @@
from datetime import UTC, datetime
import aiosqlite
from porchlight.models import User
from porchlight.store.protocols import ConsentRepository
from porchlight.store.sqlite.repositories import SQLiteConsentRepository, SQLiteUserRepository
async def _create_user(db) -> User:
async def _create_user(db: aiosqlite.Connection) -> User:
"""Helper to create a test user."""
user_repo = SQLiteUserRepository(db)
user = User(
@ -18,12 +20,12 @@ async def _create_user(db) -> User:
return await user_repo.create(user)
async def test_implements_protocol(db) -> None:
async def test_implements_protocol(db: aiosqlite.Connection) -> None:
repo = SQLiteConsentRepository(db)
assert isinstance(repo, ConsentRepository)
async def test_set_and_get_consent(db) -> None:
async def test_set_and_get_consent(db: aiosqlite.Connection) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "test-rp", ["openid", "profile"])
@ -37,14 +39,14 @@ async def test_set_and_get_consent(db) -> None:
assert isinstance(consent.updated_at, datetime)
async def test_get_consent_not_found(db) -> None:
async def test_get_consent_not_found(db: aiosqlite.Connection) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
consent = await repo.get_consent(user.userid, "nonexistent")
assert consent is None
async def test_set_consent_upserts(db) -> None:
async def test_set_consent_upserts(db: aiosqlite.Connection) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "test-rp", ["openid"])
@ -61,7 +63,7 @@ async def test_set_consent_upserts(db) -> None:
assert consent.updated_at >= original.updated_at
async def test_delete_consent(db) -> None:
async def test_delete_consent(db: aiosqlite.Connection) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "test-rp", ["openid"])
@ -73,14 +75,14 @@ async def test_delete_consent(db) -> None:
assert consent is None
async def test_delete_consent_not_found(db) -> None:
async def test_delete_consent_not_found(db: aiosqlite.Connection) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
result = await repo.delete_consent(user.userid, "nonexistent")
assert result is False
async def test_list_consents(db) -> None:
async def test_list_consents(db: aiosqlite.Connection) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
await repo.set_consent(user.userid, "rp-a", ["openid"])
@ -92,14 +94,14 @@ async def test_list_consents(db) -> None:
assert client_ids == {"rp-a", "rp-b"}
async def test_list_consents_empty(db) -> None:
async def test_list_consents_empty(db: aiosqlite.Connection) -> None:
user = await _create_user(db)
repo = SQLiteConsentRepository(db)
consents = await repo.list_consents(user.userid)
assert consents == []
async def test_consent_deleted_on_user_cascade(db) -> None:
async def test_consent_deleted_on_user_cascade(db: aiosqlite.Connection) -> None:
"""Consent records are deleted when the user is deleted (CASCADE)."""
user = await _create_user(db)
user_repo = SQLiteUserRepository(db)

View file

@ -171,8 +171,8 @@ async def test_create_duplicate_password(credential_repo: SQLiteCredentialReposi
cred = PasswordCredential(user_id=alice.userid, password_hash="hash1")
await credential_repo.create_password(cred)
cred2 = PasswordCredential(user_id=alice.userid, password_hash="hash2")
with pytest.raises(DuplicateError):
cred2 = PasswordCredential(user_id=alice.userid, password_hash="hash2")
await credential_repo.create_password(cred2)

View file

@ -1,4 +1,5 @@
from datetime import UTC, datetime, timedelta
from typing import Any
import aiosqlite
import pytest
@ -14,7 +15,7 @@ def magic_link_repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository:
return SQLiteMagicLinkRepository(db)
def _make_link(**overrides) -> MagicLink:
def _make_link(**overrides: Any) -> MagicLink:
defaults = {
"token": "abc123",
"username": "alice",

View file

@ -1,3 +1,5 @@
from typing import Any
import aiosqlite
import pytest
@ -12,7 +14,7 @@ def user_repo(db: aiosqlite.Connection) -> SQLiteUserRepository:
return SQLiteUserRepository(db)
def _make_user(**overrides) -> User:
def _make_user(**overrides: Any) -> User:
defaults = {"userid": "lusab-bansen", "username": "alice"}
defaults.update(overrides)
return User(**defaults)

View file

@ -1,7 +1,14 @@
import pytest
from pydantic import ValidationError
from porchlight.validation import ProfileUpdate
from porchlight.validation import (
GroupListInput,
PasswordChange,
PasswordSet,
ProfileUpdate,
UsernameInput,
format_validation_errors,
)
class TestProfileUpdateEmail:
@ -91,3 +98,197 @@ class TestProfileUpdateDefaults:
assert p.phone_number is None
assert p.picture is None
assert p.locale == ""
class TestProfileUpdateLocale:
def test_valid_locale_language_only(self) -> None:
p = ProfileUpdate(locale="en")
assert p.locale == "en"
def test_valid_locale_language_region(self) -> None:
p = ProfileUpdate(locale="sv-SE")
assert p.locale == "sv-SE"
def test_valid_locale_language_script_region(self) -> None:
p = ProfileUpdate(locale="zh-Hans-CN")
assert p.locale == "zh-Hans-CN"
def test_valid_locale_three_letter(self) -> None:
p = ProfileUpdate(locale="gsw")
assert p.locale == "gsw"
def test_empty_locale_allowed(self) -> None:
p = ProfileUpdate(locale="")
assert p.locale == ""
def test_invalid_locale_rejected(self) -> None:
with pytest.raises(ValidationError, match="locale"):
ProfileUpdate(locale="not_a_locale!")
def test_invalid_locale_numbers(self) -> None:
with pytest.raises(ValidationError, match="locale"):
ProfileUpdate(locale="12345")
def test_whitespace_locale_becomes_empty(self) -> None:
p = ProfileUpdate(locale=" ")
assert p.locale == ""
class TestUsernameInput:
def test_valid_simple(self) -> None:
u = UsernameInput(username="alice")
assert u.username == "alice"
def test_valid_with_dots_dashes_underscores(self) -> None:
u = UsernameInput(username="alice.bob_charlie-1")
assert u.username == "alice.bob_charlie-1"
def test_valid_email_style(self) -> None:
u = UsernameInput(username="user@example.com")
assert u.username == "user@example.com"
def test_valid_uppercase(self) -> None:
u = UsernameInput(username="Alice")
assert u.username == "Alice"
def test_empty_rejected(self) -> None:
with pytest.raises(ValidationError, match="username"):
UsernameInput(username="")
def test_whitespace_only_rejected(self) -> None:
with pytest.raises(ValidationError, match="username"):
UsernameInput(username=" ")
def test_too_long_rejected(self) -> None:
with pytest.raises(ValidationError):
UsernameInput(username="a" * 256)
def test_spaces_rejected(self) -> None:
with pytest.raises(ValidationError, match="username"):
UsernameInput(username="alice bob")
def test_special_chars_rejected(self) -> None:
with pytest.raises(ValidationError, match="username"):
UsernameInput(username="alice<script>")
def test_strips_whitespace(self) -> None:
u = UsernameInput(username=" alice ")
assert u.username == "alice"
class TestGroupListInput:
def test_valid_single_group(self) -> None:
g = GroupListInput(groups="users")
assert g.group_list == ["users"]
def test_valid_multiple_groups(self) -> None:
g = GroupListInput(groups="users, admin, staff")
assert g.group_list == ["users", "admin", "staff"]
def test_empty_string_gives_empty_list(self) -> None:
g = GroupListInput(groups="")
assert g.group_list == []
def test_whitespace_only_gives_empty_list(self) -> None:
g = GroupListInput(groups=" , , ")
assert g.group_list == []
def test_valid_with_hyphens_underscores(self) -> None:
g = GroupListInput(groups="my-group, my_group")
assert g.group_list == ["my-group", "my_group"]
def test_invalid_group_name_spaces(self) -> None:
with pytest.raises(ValidationError, match="group"):
GroupListInput(groups="bad group")
def test_invalid_group_name_special_chars(self) -> None:
with pytest.raises(ValidationError, match="group"):
GroupListInput(groups="admin, bad!group")
def test_invalid_group_name_uppercase(self) -> None:
with pytest.raises(ValidationError, match="group"):
GroupListInput(groups="Admin")
def test_group_name_too_long(self) -> None:
with pytest.raises(ValidationError, match="group"):
GroupListInput(groups="a" * 65)
def test_deduplicates(self) -> None:
g = GroupListInput(groups="users, users, admin")
assert g.group_list == ["users", "admin"]
class TestPasswordSet:
def test_valid_password(self) -> None:
p = PasswordSet(password="strongP@ss123", confirm="strongP@ss123")
assert p.password == "strongP@ss123"
def test_mismatch_rejected(self) -> None:
with pytest.raises(ValidationError, match="match"):
PasswordSet(password="strongP@ss123", confirm="different")
def test_too_short_rejected(self) -> None:
with pytest.raises(ValidationError):
PasswordSet(password="short", confirm="short")
def test_too_long_rejected(self) -> None:
long_pw = "a" * 257
with pytest.raises(ValidationError):
PasswordSet(password=long_pw, confirm=long_pw)
def test_weak_password_rejected(self) -> None:
with pytest.raises(ValidationError, match=r"[Ww]eak\b|[Ss]trength|easily guessed"):
PasswordSet(password="password", confirm="password")
def test_common_password_rejected(self) -> None:
with pytest.raises(ValidationError, match=r"[Ww]eak\b|[Ss]trength|easily guessed"):
PasswordSet(password="12345678", confirm="12345678")
class TestPasswordChange:
def test_valid_change(self) -> None:
p = PasswordChange(
current_password="oldPassword1",
password="newStrongP@ss99",
confirm="newStrongP@ss99",
)
assert p.current_password == "oldPassword1"
assert p.password == "newStrongP@ss99"
def test_missing_current_password(self) -> None:
with pytest.raises(ValidationError, match="current"):
PasswordChange(
current_password="",
password="newStrongP@ss99",
confirm="newStrongP@ss99",
)
class TestFormatValidationErrors:
def test_single_error_no_list(self) -> None:
try:
ProfileUpdate(email="bad")
except ValidationError as exc:
result = format_validation_errors(exc)
assert '<div role="alert">' in result
assert "<ul>" not in result
assert "Email" in result
def test_multiple_errors_uses_list(self) -> None:
try:
ProfileUpdate(email="bad", phone_number="bad")
except ValidationError as exc:
result = format_validation_errors(exc)
assert '<div role="alert">' in result
assert "<ul>" in result
assert "<li>" in result
assert "Email" in result
assert "Phone" in result
def test_value_error_strips_prefix(self) -> None:
try:
ProfileUpdate(picture="ftp://bad.url")
except ValidationError as exc:
result = format_validation_errors(exc)
assert "Picture URL must be a valid HTTP or HTTPS URL" in result
assert "Value error" not in result