diff --git a/ruff.toml b/ruff.toml
new file mode 100644
index 0000000..a0fdc61
--- /dev/null
+++ b/ruff.toml
@@ -0,0 +1,61 @@
+# Set the maximum line length to 120.
+line-length = 120
+target-version = "py313"
+
+[lint]
+select = [
+ "A",
+ "ANN",
+ "ASYNC",
+ "B",
+ "C4",
+ "DTZ",
+ "E",
+ "ERA",
+ "F",
+ "FAST",
+ "FLY",
+ "FURB",
+ "I",
+ "ISC",
+ "PERF",
+ "PGH",
+ "PIE",
+ "PL",
+ "PT",
+ "RUF",
+ "SIM",
+ "UP",
+ "W",
+]
+
+ignore = [
+ "SIM102", # collapsible-if
+ "SIM103", # return-bool-condition-directly (keeping explicit if/else for clarity)
+ "SIM108", # if-else-block-instead-of-if-exp
+
+ # Since we use ruff as a formatter, the following rules should be ignored
+ # See: https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
+ "W191",
+ "E111",
+ "E114",
+ "E117",
+ "E501",
+ "D206",
+ "D300",
+ "Q000",
+ "Q001",
+ "Q002",
+ "Q003",
+ "COM812",
+ "COM819",
+ "ISC002",
+]
+
+allowed-confusables = ["’", "х"]
+
+[lint.flake8-annotations]
+allow-star-arg-any = true
+
+[lint.per-file-ignores]
+"**/test_*.py" = ["PLR2004"] # magic-value-comparison - allow magic numbers in tests
diff --git a/src/porchlight/admin/routes.py b/src/porchlight/admin/routes.py
index b6692cd..4bcc0a8 100644
--- a/src/porchlight/admin/routes.py
+++ b/src/porchlight/admin/routes.py
@@ -1,4 +1,5 @@
from base64 import urlsafe_b64decode
+from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
@@ -6,7 +7,7 @@ from pydantic import ValidationError
from porchlight.dependencies import get_session_user
from porchlight.models import User
-from porchlight.validation import ProfileUpdate
+from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput, format_validation_errors
router = APIRouter(prefix="/admin", tags=["admin"])
@@ -99,7 +100,7 @@ async def user_detail(request: Request, userid: str) -> Response:
@router.post("/invite", response_class=HTMLResponse)
async def create_invite(
request: Request,
- username: str = Form(),
+ username: Annotated[str, Form()],
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@@ -109,17 +110,19 @@ async def create_invite(
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
- username = username.strip()
- if not username:
- return HTMLResponse('
Username is required
')
+ try:
+ validated = UsernameInput(username=username)
+ except ValidationError as exc:
+ return HTMLResponse(format_validation_errors(exc))
magic_link_service = request.app.state.magic_link_service
settings = request.app.state.settings
- link = await magic_link_service.create(username=username, created_by=admin.username, note="admin invite")
+ link = await magic_link_service.create(username=validated.username, created_by=admin.username, note="admin invite")
url = f"{settings.issuer}/register/{link.token}"
return HTMLResponse(
- f'Invite created for {username}:
{url}
'
+ f'Invite created for {validated.username}:
'
+ f'{url}
'
)
@@ -128,13 +131,6 @@ async def create_invite(
async def update_user_profile(
request: Request,
userid: str,
- given_name: str = Form(""),
- family_name: str = Form(""),
- preferred_username: str = Form(""),
- email: str = Form(""),
- phone_number: str = Form(""),
- picture: str = Form(""),
- locale: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@@ -144,32 +140,19 @@ async def update_user_profile(
return HTMLResponse("Forbidden", status_code=403)
# Validate
+ form = await request.form()
try:
profile = ProfileUpdate(
- given_name=given_name,
- family_name=family_name,
- preferred_username=preferred_username,
- email=email,
- phone_number=phone_number,
- picture=picture,
- locale=locale,
+ given_name=str(form.get("given_name", "")),
+ family_name=str(form.get("family_name", "")),
+ preferred_username=str(form.get("preferred_username", "")),
+ email=str(form.get("email", "")),
+ phone_number=str(form.get("phone_number", "")),
+ picture=str(form.get("picture", "")),
+ locale=str(form.get("locale", "")),
)
except ValidationError as exc:
- error = exc.errors()[0]
- field = error["loc"][-1] if error["loc"] else "input"
- msg = error["msg"]
- labels = {
- "given_name": "Given name",
- "family_name": "Family name",
- "preferred_username": "Display name",
- "email": "Email",
- "phone_number": "Phone number",
- "picture": "Picture URL",
- "locale": "Locale",
- }
- label = labels.get(str(field), str(field))
- display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}"
- return HTMLResponse(f'{display_msg}
')
+ return HTMLResponse(format_validation_errors(exc))
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
@@ -196,7 +179,7 @@ async def update_user_profile(
async def update_user_groups(
request: Request,
userid: str,
- groups: str = Form(""),
+ groups: Annotated[str, Form()] = "",
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@@ -205,13 +188,17 @@ async def update_user_groups(
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
+ try:
+ validated = GroupListInput(groups=groups)
+ except ValidationError as exc:
+ return HTMLResponse(format_validation_errors(exc))
+
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("User not found", status_code=404)
- group_list = [g.strip() for g in groups.split(",") if g.strip()]
- updated = user.model_copy(update={"groups": group_list})
+ updated = user.model_copy(update={"groups": validated.group_list})
await user_repo.update(updated)
return HTMLResponse('Groups updated
')
@@ -353,7 +340,7 @@ async def delete_user(request: Request, userid: str) -> Response:
return HTMLResponse("Forbidden", status_code=403)
# Prevent self-deletion
- admin_userid, _ = get_session_user(request)
+ admin_userid, _ = session_user
if userid == admin_userid:
return HTMLResponse('Cannot delete your own account
')
diff --git a/src/porchlight/app.py b/src/porchlight/app.py
index acf3a70..91bb490 100644
--- a/src/porchlight/app.py
+++ b/src/porchlight/app.py
@@ -8,8 +8,11 @@ from urllib.parse import urlparse
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
+from slowapi.errors import RateLimitExceeded
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request
+from starlette.responses import HTMLResponse as StarletteHTMLResponse
+from starlette.responses import Response
from porchlight.admin.routes import router as admin_router
from porchlight.authn.password import PasswordService
@@ -21,6 +24,7 @@ from porchlight.invite.service import MagicLinkService
from porchlight.manage.routes import router as manage_router
from porchlight.oidc.endpoints import router as oidc_router
from porchlight.oidc.provider import create_oidc_server
+from porchlight.rate_limit import limiter
from porchlight.store.sqlite.db import open_db
from porchlight.store.sqlite.repositories import (
SQLiteConsentRepository,
@@ -123,6 +127,16 @@ def create_app(settings: Settings | None = None) -> FastAPI:
https_only=settings.session_https_only,
)
+ # Rate limiting
+ app.state.limiter = limiter
+
+ @app.exception_handler(RateLimitExceeded)
+ async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> StarletteHTMLResponse:
+ return StarletteHTMLResponse(
+ 'Too many attempts. Please try again later.
',
+ status_code=429,
+ )
+
# Templates
templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates"))
@@ -147,7 +161,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
return {"status": "ok"}
@app.get("/")
- async def landing(request: Request): # type: ignore[no-untyped-def]
+ async def landing(request: Request) -> Response:
return templates.TemplateResponse(request, "index.html")
return app
diff --git a/src/porchlight/authn/routes.py b/src/porchlight/authn/routes.py
index 2ef2cea..c710f3f 100644
--- a/src/porchlight/authn/routes.py
+++ b/src/porchlight/authn/routes.py
@@ -1,10 +1,12 @@
from base64 import urlsafe_b64decode
+from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fido2.webauthn import AttestedCredentialData, AuthenticationResponse
from porchlight.models import User
+from porchlight.rate_limit import limiter
from porchlight.userid import generate_unique_userid
router = APIRouter(tags=["authn"])
@@ -28,10 +30,11 @@ async def login_page(request: Request) -> HTMLResponse:
@router.post("/login/password", response_class=HTMLResponse)
+@limiter.limit("5/minute")
async def login_password(
request: Request,
- username: str = Form(),
- password: str = Form(),
+ username: Annotated[str, Form()],
+ password: Annotated[str, Form()],
) -> Response:
user_repo = request.app.state.user_repo
cred_repo = request.app.state.credential_repo
@@ -50,6 +53,9 @@ async def login_password(
if not password_service.verify(credential.password_hash, password):
return HTMLResponse(error_html)
+ if not user.active:
+ return HTMLResponse(error_html)
+
request.session["userid"] = user.userid
request.session["username"] = user.username
@@ -77,6 +83,8 @@ async def register_magic_link(request: Request, token: str) -> Response:
existing_user = await user_repo.get_by_username(link.username)
if existing_user is not None:
+ if not existing_user.active:
+ return HTMLResponse("This account has been deactivated.
", status_code=400)
user = existing_user
else:
userid = await generate_unique_userid(user_repo)
@@ -102,6 +110,7 @@ async def login_webauthn_begin(request: Request) -> Response:
@router.post("/login/webauthn/complete")
+@limiter.limit("10/minute")
async def login_webauthn_complete(request: Request) -> Response:
webauthn_service = request.app.state.webauthn_service
user_repo = request.app.state.user_repo
@@ -144,8 +153,8 @@ async def login_webauthn_complete(request: Request) -> Response:
await cred_repo.update_webauthn(stored)
user = await user_repo.get_by_userid(userid)
- if user is None:
- return JSONResponse({"error": "User not found"}, status_code=400)
+ if user is None or not user.active:
+ return JSONResponse({"error": "Authentication failed"}, status_code=400)
request.session["userid"] = user.userid
request.session["username"] = user.username
diff --git a/src/porchlight/csrf.py b/src/porchlight/csrf.py
index 630c723..f9b8a68 100644
--- a/src/porchlight/csrf.py
+++ b/src/porchlight/csrf.py
@@ -61,7 +61,7 @@ class CSRFMiddleware:
# Origin check (defense-in-depth)
if self.check_origin is not None:
origin = request.headers.get("origin")
- if origin is not None and origin != "null" and origin != self.check_origin:
+ if origin is not None and origin not in ("null", self.check_origin):
logger.warning("CSRF origin mismatch: expected %s, got %s", self.check_origin, origin)
response = HTMLResponse(
"403 Forbidden
Origin mismatch
",
diff --git a/src/porchlight/manage/routes.py b/src/porchlight/manage/routes.py
index a3c4887..69afdff 100644
--- a/src/porchlight/manage/routes.py
+++ b/src/porchlight/manage/routes.py
@@ -1,4 +1,5 @@
from base64 import urlsafe_b64decode
+from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
@@ -7,7 +8,7 @@ from pydantic import ValidationError
from porchlight.dependencies import get_session_user
from porchlight.models import PasswordCredential, WebAuthnCredential
-from porchlight.validation import ProfileUpdate
+from porchlight.validation import PasswordChange, PasswordSet, ProfileUpdate, format_validation_errors
router = APIRouter(prefix="/manage", tags=["manage"])
@@ -53,8 +54,9 @@ async def credentials_page(request: Request) -> Response:
@router.post("/credentials/password", response_class=HTMLResponse)
async def set_password(
request: Request,
- password: str = Form(),
- confirm: str = Form(),
+ password: Annotated[str, Form()],
+ confirm: Annotated[str, Form()],
+ current_password: Annotated[str, Form()] = "",
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@@ -64,15 +66,30 @@ async def set_password(
cred_repo = request.app.state.credential_repo
password_service = request.app.state.password_service
- if password != confirm:
- return HTMLResponse('Passwords do not match
')
-
- if len(password) < 8:
- return HTMLResponse('Password must be at least 8 characters
')
-
- password_hash = password_service.hash(password)
-
existing = await cred_repo.get_password_by_user(userid)
+ has_password = existing is not None
+
+ # Validate input
+ try:
+ if has_password:
+ validated = PasswordChange(
+ current_password=current_password,
+ password=password,
+ confirm=confirm,
+ )
+ else:
+ validated = PasswordSet(password=password, confirm=confirm)
+ except ValidationError as exc:
+ return HTMLResponse(format_validation_errors(exc))
+
+ # Verify current password if changing
+ if has_password and isinstance(validated, PasswordChange):
+ if not password_service.verify(existing.password_hash, validated.current_password):
+ return HTMLResponse('Current password is incorrect
')
+
+ # Store new password
+ password_hash = password_service.hash(validated.password)
+
if existing is not None:
await cred_repo.delete_password(userid)
@@ -200,13 +217,6 @@ async def profile_page(request: Request) -> Response:
@router.post("/profile", response_class=HTMLResponse)
async def update_profile(
request: Request,
- given_name: str = Form(""),
- family_name: str = Form(""),
- preferred_username: str = Form(""),
- email: str = Form(""),
- phone_number: str = Form(""),
- picture: str = Form(""),
- locale: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@@ -214,34 +224,19 @@ async def update_profile(
userid, _username = session_user
+ form = await request.form()
try:
profile = ProfileUpdate(
- given_name=given_name,
- family_name=family_name,
- preferred_username=preferred_username,
- email=email,
- phone_number=phone_number,
- picture=picture,
- locale=locale,
+ given_name=str(form.get("given_name", "")),
+ family_name=str(form.get("family_name", "")),
+ preferred_username=str(form.get("preferred_username", "")),
+ email=str(form.get("email", "")),
+ phone_number=str(form.get("phone_number", "")),
+ picture=str(form.get("picture", "")),
+ locale=str(form.get("locale", "")),
)
except ValidationError as exc:
- error = exc.errors()[0]
- field = error["loc"][-1] if error["loc"] else "input"
- msg = error["msg"]
- # Produce user-friendly field labels
- labels = {
- "given_name": "Given name",
- "family_name": "Family name",
- "preferred_username": "Display name",
- "email": "Email",
- "phone_number": "Phone number",
- "picture": "Picture URL",
- "locale": "Locale",
- }
- label = labels.get(str(field), str(field))
- # Use custom message for value errors (e.g. picture URL), generic pydantic message otherwise
- display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}"
- return HTMLResponse(f'{display_msg}
')
+ return HTMLResponse(format_validation_errors(exc))
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
diff --git a/src/porchlight/oidc/claims.py b/src/porchlight/oidc/claims.py
index 45987c0..edf2373 100644
--- a/src/porchlight/oidc/claims.py
+++ b/src/porchlight/oidc/claims.py
@@ -1,5 +1,7 @@
"""OIDC claims mapping and UserInfo source."""
+from typing import Any
+
from idpyoidc.server.user_info import UserInfo
from porchlight.models import User
@@ -28,9 +30,7 @@ def user_to_claims(user: User) -> dict:
"locale": user.locale,
}
- for claim_name, value in optional_fields.items():
- if value is not None:
- claims[claim_name] = value
+ claims.update({claim_name: value for claim_name, value in optional_fields.items() if value is not None})
# updated_at as Unix timestamp (OIDC spec requires number)
if user.updated_at:
@@ -46,7 +46,7 @@ class PorchlightUserInfo(UserInfo):
idpyoidc calls __call__() synchronously to look up claims.
"""
- def __init__(self, **kwargs) -> None:
+ def __init__(self, **kwargs: Any) -> None:
super().__init__(db={}, **kwargs)
def set_user_claims(self, user_id: str, claims: dict) -> None:
diff --git a/src/porchlight/oidc/endpoints.py b/src/porchlight/oidc/endpoints.py
index ee221ee..67d4968 100644
--- a/src/porchlight/oidc/endpoints.py
+++ b/src/porchlight/oidc/endpoints.py
@@ -106,7 +106,7 @@ async def authorization_complete(request: Request) -> Response:
)
-async def _check_consent_or_complete(
+async def _check_consent_or_complete( # noqa: PLR0913
request: Request,
oidc_server: object,
endpoint: object,
@@ -137,7 +137,7 @@ async def _check_consent_or_complete(
return RedirectResponse("/consent", status_code=303)
-async def _complete_authorization(
+async def _complete_authorization( # noqa: PLR0913
request: Request,
oidc_server: object,
endpoint: object,
@@ -332,11 +332,10 @@ async def consent_submit(request: Request) -> Response:
redirect_uri = auth_params.get("redirect_uri", "")
state = auth_params.get("state", "")
- if action == "deny":
- params = urlencode({"error": "access_denied", "state": state})
- return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
-
if action != "allow":
+ if action == "deny":
+ params = urlencode({"error": "access_denied", "state": state})
+ return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
return HTMLResponse("Error
Invalid action
", status_code=400)
# Allow — collect approved scopes
@@ -357,11 +356,9 @@ async def consent_submit(request: Request) -> Response:
try:
parsed = endpoint.parse_request(auth_params)
+ if "error" in parsed:
+ raise ValueError(parsed.get("error_description", parsed["error"]))
except Exception as exc:
return HTMLResponse(f"Error
{exc}
", status_code=400)
- if "error" in parsed:
- error_desc = parsed.get("error_description", parsed["error"])
- return HTMLResponse(f"Error
{error_desc}
", status_code=400)
-
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)
diff --git a/src/porchlight/rate_limit.py b/src/porchlight/rate_limit.py
new file mode 100644
index 0000000..38404a8
--- /dev/null
+++ b/src/porchlight/rate_limit.py
@@ -0,0 +1,4 @@
+from slowapi import Limiter
+from slowapi.util import get_remote_address
+
+limiter = Limiter(key_func=get_remote_address)
diff --git a/src/porchlight/store/sqlite/migrations.py b/src/porchlight/store/sqlite/migrations.py
index afc5628..2bfbb32 100644
--- a/src/porchlight/store/sqlite/migrations.py
+++ b/src/porchlight/store/sqlite/migrations.py
@@ -1,11 +1,13 @@
from pathlib import Path
import aiosqlite
+import anyio
async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int:
"""Apply unapplied SQL migration files in order. Returns count of newly applied migrations."""
- if not migrations_dir.is_dir():
+ async_dir = anyio.Path(migrations_dir)
+ if not await async_dir.is_dir():
raise FileNotFoundError(f"Migrations directory not found: {migrations_dir}")
await db.execute(
@@ -22,19 +24,22 @@ async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int:
async with db.execute("SELECT filename FROM _migrations") as cursor:
applied = {row[0] async for row in cursor}
- migration_files = sorted(migrations_dir.glob("*.sql"))
+ migration_files = sorted(
+ [f async for f in async_dir.iterdir() if f.suffix == ".sql"],
+ key=lambda f: f.name,
+ )
count = 0
for migration_file in migration_files:
if migration_file.name in applied:
continue
- sql = migration_file.read_text(encoding="utf-8")
+ sql = await migration_file.read_text(encoding="utf-8")
await db.execute("BEGIN")
try:
for statement in sql.split(";"):
- statement = statement.strip()
- if statement:
- await db.execute(statement)
+ cleaned = statement.strip()
+ if cleaned:
+ await db.execute(cleaned)
await db.execute(
"INSERT INTO _migrations (filename) VALUES (?)",
(migration_file.name,),
diff --git a/src/porchlight/templates/admin/user_detail.html b/src/porchlight/templates/admin/user_detail.html
index f65717e..99138db 100644
--- a/src/porchlight/templates/admin/user_detail.html
+++ b/src/porchlight/templates/admin/user_detail.html
@@ -11,6 +11,7 @@