diff --git a/ruff.toml b/ruff.toml
deleted file mode 100644
index a0fdc61..0000000
--- a/ruff.toml
+++ /dev/null
@@ -1,61 +0,0 @@
-# Set the maximum line length to 120.
-line-length = 120
-target-version = "py313"
-
-[lint]
-select = [
- "A",
- "ANN",
- "ASYNC",
- "B",
- "C4",
- "DTZ",
- "E",
- "ERA",
- "F",
- "FAST",
- "FLY",
- "FURB",
- "I",
- "ISC",
- "PERF",
- "PGH",
- "PIE",
- "PL",
- "PT",
- "RUF",
- "SIM",
- "UP",
- "W",
-]
-
-ignore = [
- "SIM102", # collapsible-if
- "SIM103", # return-bool-condition-directly (keeping explicit if/else for clarity)
- "SIM108", # if-else-block-instead-of-if-exp
-
- # Since we use ruff as a formatter, the following rules should be ignored
- # See: https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
- "W191",
- "E111",
- "E114",
- "E117",
- "E501",
- "D206",
- "D300",
- "Q000",
- "Q001",
- "Q002",
- "Q003",
- "COM812",
- "COM819",
- "ISC002",
-]
-
-allowed-confusables = ["’", "х"]
-
-[lint.flake8-annotations]
-allow-star-arg-any = true
-
-[lint.per-file-ignores]
-"**/test_*.py" = ["PLR2004"] # magic-value-comparison - allow magic numbers in tests
diff --git a/src/porchlight/admin/routes.py b/src/porchlight/admin/routes.py
index 4bcc0a8..b6692cd 100644
--- a/src/porchlight/admin/routes.py
+++ b/src/porchlight/admin/routes.py
@@ -1,5 +1,4 @@
from base64 import urlsafe_b64decode
-from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
@@ -7,7 +6,7 @@ from pydantic import ValidationError
from porchlight.dependencies import get_session_user
from porchlight.models import User
-from porchlight.validation import GroupListInput, ProfileUpdate, UsernameInput, format_validation_errors
+from porchlight.validation import ProfileUpdate
router = APIRouter(prefix="/admin", tags=["admin"])
@@ -100,7 +99,7 @@ async def user_detail(request: Request, userid: str) -> Response:
@router.post("/invite", response_class=HTMLResponse)
async def create_invite(
request: Request,
- username: Annotated[str, Form()],
+ username: str = Form(),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@@ -110,19 +109,17 @@ async def create_invite(
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
- try:
- validated = UsernameInput(username=username)
- except ValidationError as exc:
- return HTMLResponse(format_validation_errors(exc))
+ username = username.strip()
+ if not username:
+ return HTMLResponse('
Username is required
')
magic_link_service = request.app.state.magic_link_service
settings = request.app.state.settings
- link = await magic_link_service.create(username=validated.username, created_by=admin.username, note="admin invite")
+ link = await magic_link_service.create(username=username, created_by=admin.username, note="admin invite")
url = f"{settings.issuer}/register/{link.token}"
return HTMLResponse(
- f'Invite created for {validated.username}:
'
- f'{url}
'
+ f'Invite created for {username}:
{url}
'
)
@@ -131,6 +128,13 @@ async def create_invite(
async def update_user_profile(
request: Request,
userid: str,
+ given_name: str = Form(""),
+ family_name: str = Form(""),
+ preferred_username: str = Form(""),
+ email: str = Form(""),
+ phone_number: str = Form(""),
+ picture: str = Form(""),
+ locale: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@@ -140,19 +144,32 @@ async def update_user_profile(
return HTMLResponse("Forbidden", status_code=403)
# Validate
- form = await request.form()
try:
profile = ProfileUpdate(
- given_name=str(form.get("given_name", "")),
- family_name=str(form.get("family_name", "")),
- preferred_username=str(form.get("preferred_username", "")),
- email=str(form.get("email", "")),
- phone_number=str(form.get("phone_number", "")),
- picture=str(form.get("picture", "")),
- locale=str(form.get("locale", "")),
+ given_name=given_name,
+ family_name=family_name,
+ preferred_username=preferred_username,
+ email=email,
+ phone_number=phone_number,
+ picture=picture,
+ locale=locale,
)
except ValidationError as exc:
- return HTMLResponse(format_validation_errors(exc))
+ error = exc.errors()[0]
+ field = error["loc"][-1] if error["loc"] else "input"
+ msg = error["msg"]
+ labels = {
+ "given_name": "Given name",
+ "family_name": "Family name",
+ "preferred_username": "Display name",
+ "email": "Email",
+ "phone_number": "Phone number",
+ "picture": "Picture URL",
+ "locale": "Locale",
+ }
+ label = labels.get(str(field), str(field))
+ display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}"
+ return HTMLResponse(f'{display_msg}
')
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
@@ -179,7 +196,7 @@ async def update_user_profile(
async def update_user_groups(
request: Request,
userid: str,
- groups: Annotated[str, Form()] = "",
+ groups: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@@ -188,17 +205,13 @@ async def update_user_groups(
if admin is None:
return HTMLResponse("Forbidden", status_code=403)
- try:
- validated = GroupListInput(groups=groups)
- except ValidationError as exc:
- return HTMLResponse(format_validation_errors(exc))
-
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
if user is None:
return HTMLResponse("User not found", status_code=404)
- updated = user.model_copy(update={"groups": validated.group_list})
+ group_list = [g.strip() for g in groups.split(",") if g.strip()]
+ updated = user.model_copy(update={"groups": group_list})
await user_repo.update(updated)
return HTMLResponse('Groups updated
')
@@ -340,7 +353,7 @@ async def delete_user(request: Request, userid: str) -> Response:
return HTMLResponse("Forbidden", status_code=403)
# Prevent self-deletion
- admin_userid, _ = session_user
+ admin_userid, _ = get_session_user(request)
if userid == admin_userid:
return HTMLResponse('Cannot delete your own account
')
diff --git a/src/porchlight/app.py b/src/porchlight/app.py
index 91bb490..acf3a70 100644
--- a/src/porchlight/app.py
+++ b/src/porchlight/app.py
@@ -8,11 +8,8 @@ from urllib.parse import urlparse
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
-from slowapi.errors import RateLimitExceeded
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request
-from starlette.responses import HTMLResponse as StarletteHTMLResponse
-from starlette.responses import Response
from porchlight.admin.routes import router as admin_router
from porchlight.authn.password import PasswordService
@@ -24,7 +21,6 @@ from porchlight.invite.service import MagicLinkService
from porchlight.manage.routes import router as manage_router
from porchlight.oidc.endpoints import router as oidc_router
from porchlight.oidc.provider import create_oidc_server
-from porchlight.rate_limit import limiter
from porchlight.store.sqlite.db import open_db
from porchlight.store.sqlite.repositories import (
SQLiteConsentRepository,
@@ -127,16 +123,6 @@ def create_app(settings: Settings | None = None) -> FastAPI:
https_only=settings.session_https_only,
)
- # Rate limiting
- app.state.limiter = limiter
-
- @app.exception_handler(RateLimitExceeded)
- async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> StarletteHTMLResponse:
- return StarletteHTMLResponse(
- 'Too many attempts. Please try again later.
',
- status_code=429,
- )
-
# Templates
templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates"))
@@ -161,7 +147,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
return {"status": "ok"}
@app.get("/")
- async def landing(request: Request) -> Response:
+ async def landing(request: Request): # type: ignore[no-untyped-def]
return templates.TemplateResponse(request, "index.html")
return app
diff --git a/src/porchlight/authn/routes.py b/src/porchlight/authn/routes.py
index c710f3f..2ef2cea 100644
--- a/src/porchlight/authn/routes.py
+++ b/src/porchlight/authn/routes.py
@@ -1,12 +1,10 @@
from base64 import urlsafe_b64decode
-from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fido2.webauthn import AttestedCredentialData, AuthenticationResponse
from porchlight.models import User
-from porchlight.rate_limit import limiter
from porchlight.userid import generate_unique_userid
router = APIRouter(tags=["authn"])
@@ -30,11 +28,10 @@ async def login_page(request: Request) -> HTMLResponse:
@router.post("/login/password", response_class=HTMLResponse)
-@limiter.limit("5/minute")
async def login_password(
request: Request,
- username: Annotated[str, Form()],
- password: Annotated[str, Form()],
+ username: str = Form(),
+ password: str = Form(),
) -> Response:
user_repo = request.app.state.user_repo
cred_repo = request.app.state.credential_repo
@@ -53,9 +50,6 @@ async def login_password(
if not password_service.verify(credential.password_hash, password):
return HTMLResponse(error_html)
- if not user.active:
- return HTMLResponse(error_html)
-
request.session["userid"] = user.userid
request.session["username"] = user.username
@@ -83,8 +77,6 @@ async def register_magic_link(request: Request, token: str) -> Response:
existing_user = await user_repo.get_by_username(link.username)
if existing_user is not None:
- if not existing_user.active:
- return HTMLResponse("This account has been deactivated.
", status_code=400)
user = existing_user
else:
userid = await generate_unique_userid(user_repo)
@@ -110,7 +102,6 @@ async def login_webauthn_begin(request: Request) -> Response:
@router.post("/login/webauthn/complete")
-@limiter.limit("10/minute")
async def login_webauthn_complete(request: Request) -> Response:
webauthn_service = request.app.state.webauthn_service
user_repo = request.app.state.user_repo
@@ -153,8 +144,8 @@ async def login_webauthn_complete(request: Request) -> Response:
await cred_repo.update_webauthn(stored)
user = await user_repo.get_by_userid(userid)
- if user is None or not user.active:
- return JSONResponse({"error": "Authentication failed"}, status_code=400)
+ if user is None:
+ return JSONResponse({"error": "User not found"}, status_code=400)
request.session["userid"] = user.userid
request.session["username"] = user.username
diff --git a/src/porchlight/csrf.py b/src/porchlight/csrf.py
index f9b8a68..630c723 100644
--- a/src/porchlight/csrf.py
+++ b/src/porchlight/csrf.py
@@ -61,7 +61,7 @@ class CSRFMiddleware:
# Origin check (defense-in-depth)
if self.check_origin is not None:
origin = request.headers.get("origin")
- if origin is not None and origin not in ("null", self.check_origin):
+ if origin is not None and origin != "null" and origin != self.check_origin:
logger.warning("CSRF origin mismatch: expected %s, got %s", self.check_origin, origin)
response = HTMLResponse(
"403 Forbidden
Origin mismatch
",
diff --git a/src/porchlight/manage/routes.py b/src/porchlight/manage/routes.py
index 69afdff..a3c4887 100644
--- a/src/porchlight/manage/routes.py
+++ b/src/porchlight/manage/routes.py
@@ -1,5 +1,4 @@
from base64 import urlsafe_b64decode
-from typing import Annotated
from fastapi import APIRouter, Form, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
@@ -8,7 +7,7 @@ from pydantic import ValidationError
from porchlight.dependencies import get_session_user
from porchlight.models import PasswordCredential, WebAuthnCredential
-from porchlight.validation import PasswordChange, PasswordSet, ProfileUpdate, format_validation_errors
+from porchlight.validation import ProfileUpdate
router = APIRouter(prefix="/manage", tags=["manage"])
@@ -54,9 +53,8 @@ async def credentials_page(request: Request) -> Response:
@router.post("/credentials/password", response_class=HTMLResponse)
async def set_password(
request: Request,
- password: Annotated[str, Form()],
- confirm: Annotated[str, Form()],
- current_password: Annotated[str, Form()] = "",
+ password: str = Form(),
+ confirm: str = Form(),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@@ -66,30 +64,15 @@ async def set_password(
cred_repo = request.app.state.credential_repo
password_service = request.app.state.password_service
+ if password != confirm:
+ return HTMLResponse('Passwords do not match
')
+
+ if len(password) < 8:
+ return HTMLResponse('Password must be at least 8 characters
')
+
+ password_hash = password_service.hash(password)
+
existing = await cred_repo.get_password_by_user(userid)
- has_password = existing is not None
-
- # Validate input
- try:
- if has_password:
- validated = PasswordChange(
- current_password=current_password,
- password=password,
- confirm=confirm,
- )
- else:
- validated = PasswordSet(password=password, confirm=confirm)
- except ValidationError as exc:
- return HTMLResponse(format_validation_errors(exc))
-
- # Verify current password if changing
- if has_password and isinstance(validated, PasswordChange):
- if not password_service.verify(existing.password_hash, validated.current_password):
- return HTMLResponse('Current password is incorrect
')
-
- # Store new password
- password_hash = password_service.hash(validated.password)
-
if existing is not None:
await cred_repo.delete_password(userid)
@@ -217,6 +200,13 @@ async def profile_page(request: Request) -> Response:
@router.post("/profile", response_class=HTMLResponse)
async def update_profile(
request: Request,
+ given_name: str = Form(""),
+ family_name: str = Form(""),
+ preferred_username: str = Form(""),
+ email: str = Form(""),
+ phone_number: str = Form(""),
+ picture: str = Form(""),
+ locale: str = Form(""),
) -> Response:
session_user = get_session_user(request)
if session_user is None:
@@ -224,19 +214,34 @@ async def update_profile(
userid, _username = session_user
- form = await request.form()
try:
profile = ProfileUpdate(
- given_name=str(form.get("given_name", "")),
- family_name=str(form.get("family_name", "")),
- preferred_username=str(form.get("preferred_username", "")),
- email=str(form.get("email", "")),
- phone_number=str(form.get("phone_number", "")),
- picture=str(form.get("picture", "")),
- locale=str(form.get("locale", "")),
+ given_name=given_name,
+ family_name=family_name,
+ preferred_username=preferred_username,
+ email=email,
+ phone_number=phone_number,
+ picture=picture,
+ locale=locale,
)
except ValidationError as exc:
- return HTMLResponse(format_validation_errors(exc))
+ error = exc.errors()[0]
+ field = error["loc"][-1] if error["loc"] else "input"
+ msg = error["msg"]
+ # Produce user-friendly field labels
+ labels = {
+ "given_name": "Given name",
+ "family_name": "Family name",
+ "preferred_username": "Display name",
+ "email": "Email",
+ "phone_number": "Phone number",
+ "picture": "Picture URL",
+ "locale": "Locale",
+ }
+ label = labels.get(str(field), str(field))
+ # Use custom message for value errors (e.g. picture URL), generic pydantic message otherwise
+ display_msg = msg.removeprefix("Value error, ") if error["type"] == "value_error" else f"{label}: {msg}"
+ return HTMLResponse(f'{display_msg}
')
user_repo = request.app.state.user_repo
user = await user_repo.get_by_userid(userid)
diff --git a/src/porchlight/oidc/claims.py b/src/porchlight/oidc/claims.py
index edf2373..45987c0 100644
--- a/src/porchlight/oidc/claims.py
+++ b/src/porchlight/oidc/claims.py
@@ -1,7 +1,5 @@
"""OIDC claims mapping and UserInfo source."""
-from typing import Any
-
from idpyoidc.server.user_info import UserInfo
from porchlight.models import User
@@ -30,7 +28,9 @@ def user_to_claims(user: User) -> dict:
"locale": user.locale,
}
- claims.update({claim_name: value for claim_name, value in optional_fields.items() if value is not None})
+ for claim_name, value in optional_fields.items():
+ if value is not None:
+ claims[claim_name] = value
# updated_at as Unix timestamp (OIDC spec requires number)
if user.updated_at:
@@ -46,7 +46,7 @@ class PorchlightUserInfo(UserInfo):
idpyoidc calls __call__() synchronously to look up claims.
"""
- def __init__(self, **kwargs: Any) -> None:
+ def __init__(self, **kwargs) -> None:
super().__init__(db={}, **kwargs)
def set_user_claims(self, user_id: str, claims: dict) -> None:
diff --git a/src/porchlight/oidc/endpoints.py b/src/porchlight/oidc/endpoints.py
index 67d4968..ee221ee 100644
--- a/src/porchlight/oidc/endpoints.py
+++ b/src/porchlight/oidc/endpoints.py
@@ -106,7 +106,7 @@ async def authorization_complete(request: Request) -> Response:
)
-async def _check_consent_or_complete( # noqa: PLR0913
+async def _check_consent_or_complete(
request: Request,
oidc_server: object,
endpoint: object,
@@ -137,7 +137,7 @@ async def _check_consent_or_complete( # noqa: PLR0913
return RedirectResponse("/consent", status_code=303)
-async def _complete_authorization( # noqa: PLR0913
+async def _complete_authorization(
request: Request,
oidc_server: object,
endpoint: object,
@@ -332,10 +332,11 @@ async def consent_submit(request: Request) -> Response:
redirect_uri = auth_params.get("redirect_uri", "")
state = auth_params.get("state", "")
+ if action == "deny":
+ params = urlencode({"error": "access_denied", "state": state})
+ return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
+
if action != "allow":
- if action == "deny":
- params = urlencode({"error": "access_denied", "state": state})
- return RedirectResponse(f"{redirect_uri}?{params}", status_code=303)
return HTMLResponse("Error
Invalid action
", status_code=400)
# Allow — collect approved scopes
@@ -356,9 +357,11 @@ async def consent_submit(request: Request) -> Response:
try:
parsed = endpoint.parse_request(auth_params)
- if "error" in parsed:
- raise ValueError(parsed.get("error_description", parsed["error"]))
except Exception as exc:
return HTMLResponse(f"Error
{exc}
", status_code=400)
+ if "error" in parsed:
+ error_desc = parsed.get("error_description", parsed["error"])
+ return HTMLResponse(f"Error
{error_desc}
", status_code=400)
+
return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username)
diff --git a/src/porchlight/rate_limit.py b/src/porchlight/rate_limit.py
deleted file mode 100644
index 38404a8..0000000
--- a/src/porchlight/rate_limit.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from slowapi import Limiter
-from slowapi.util import get_remote_address
-
-limiter = Limiter(key_func=get_remote_address)
diff --git a/src/porchlight/store/sqlite/migrations.py b/src/porchlight/store/sqlite/migrations.py
index 2bfbb32..afc5628 100644
--- a/src/porchlight/store/sqlite/migrations.py
+++ b/src/porchlight/store/sqlite/migrations.py
@@ -1,13 +1,11 @@
from pathlib import Path
import aiosqlite
-import anyio
async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int:
"""Apply unapplied SQL migration files in order. Returns count of newly applied migrations."""
- async_dir = anyio.Path(migrations_dir)
- if not await async_dir.is_dir():
+ if not migrations_dir.is_dir():
raise FileNotFoundError(f"Migrations directory not found: {migrations_dir}")
await db.execute(
@@ -24,22 +22,19 @@ async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int:
async with db.execute("SELECT filename FROM _migrations") as cursor:
applied = {row[0] async for row in cursor}
- migration_files = sorted(
- [f async for f in async_dir.iterdir() if f.suffix == ".sql"],
- key=lambda f: f.name,
- )
+ migration_files = sorted(migrations_dir.glob("*.sql"))
count = 0
for migration_file in migration_files:
if migration_file.name in applied:
continue
- sql = await migration_file.read_text(encoding="utf-8")
+ sql = migration_file.read_text(encoding="utf-8")
await db.execute("BEGIN")
try:
for statement in sql.split(";"):
- cleaned = statement.strip()
- if cleaned:
- await db.execute(cleaned)
+ statement = statement.strip()
+ if statement:
+ await db.execute(statement)
await db.execute(
"INSERT INTO _migrations (filename) VALUES (?)",
(migration_file.name,),
diff --git a/src/porchlight/templates/admin/user_detail.html b/src/porchlight/templates/admin/user_detail.html
index 99138db..f65717e 100644
--- a/src/porchlight/templates/admin/user_detail.html
+++ b/src/porchlight/templates/admin/user_detail.html
@@ -11,7 +11,6 @@