From 01e3382aafe5e762cdce5e9331b8ac4b93621134 Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Tue, 31 Mar 2026 15:48:46 +0200 Subject: [PATCH] fix: resolve all ruff lint errors and type checker warnings - Use Annotated[str, Form()] for FastAPI dependencies (FAST002) - Add missing type annotations across src/ and tests/ (ANN001/003/201/202) - Reduce function arguments via request.form() reads (PLR0913) - Combine return paths to reduce return statements (PLR0911) - Use anyio.Path for async-safe filesystem operations (ASYNC240) - Extract constants, helpers, and dict comprehensions for clarity - Move inline imports to top-level (PLC0415) - Use raw strings for regex match patterns (RUF043) - Fix redundant get_session_user call in delete_user (not-iterable) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/porchlight/admin/routes.py | 29 ++- src/porchlight/app.py | 3 +- src/porchlight/authn/routes.py | 10 +- src/porchlight/csrf.py | 2 +- src/porchlight/manage/routes.py | 29 ++- src/porchlight/oidc/claims.py | 8 +- src/porchlight/oidc/endpoints.py | 17 +- src/porchlight/store/sqlite/migrations.py | 17 +- src/porchlight/validation.py | 5 +- tests/e2e/setup_db.py | 179 +++++++++++------- tests/test_admin/test_admin_routes.py | 5 +- tests/test_app.py | 27 ++- tests/test_authn/test_webauthn.py | 3 +- tests/test_invite/test_service.py | 3 +- tests/test_oidc/test_consent_flow.py | 5 +- tests/test_oidc/test_e2e_flow.py | 89 +++++---- tests/test_oidc/test_provider.py | 3 +- tests/test_store/conftest.py | 3 +- tests/test_store/test_sqlite_consent_repo.py | 22 ++- .../test_store/test_sqlite_credential_repo.py | 2 +- .../test_store/test_sqlite_magic_link_repo.py | 3 +- tests/test_store/test_sqlite_user_repo.py | 4 +- tests/test_validation.py | 4 +- 23 files changed, 258 insertions(+), 214 deletions(-) diff --git a/src/porchlight/admin/routes.py b/src/porchlight/admin/routes.py index ef445c9..4bcc0a8 100644 --- a/src/porchlight/admin/routes.py +++ b/src/porchlight/admin/routes.py @@ -1,4 +1,5 @@ from base64 import urlsafe_b64decode +from typing import Annotated from fastapi import APIRouter, Form, Request, Response from fastapi.responses import HTMLResponse, RedirectResponse @@ -99,7 +100,7 @@ async def user_detail(request: Request, userid: str) -> Response: @router.post("/invite", response_class=HTMLResponse) async def create_invite( request: Request, - username: str = Form(), + username: Annotated[str, Form()], ) -> Response: session_user = get_session_user(request) if session_user is None: @@ -130,13 +131,6 @@ async def create_invite( async def update_user_profile( request: Request, userid: str, - given_name: str = Form(""), - family_name: str = Form(""), - preferred_username: str = Form(""), - email: str = Form(""), - phone_number: str = Form(""), - picture: str = Form(""), - locale: str = Form(""), ) -> Response: session_user = get_session_user(request) if session_user is None: @@ -146,15 +140,16 @@ async def update_user_profile( return HTMLResponse("Forbidden", status_code=403) # Validate + form = await request.form() try: profile = ProfileUpdate( - given_name=given_name, - family_name=family_name, - preferred_username=preferred_username, - email=email, - phone_number=phone_number, - picture=picture, - locale=locale, + given_name=str(form.get("given_name", "")), + family_name=str(form.get("family_name", "")), + preferred_username=str(form.get("preferred_username", "")), + email=str(form.get("email", "")), + phone_number=str(form.get("phone_number", "")), + picture=str(form.get("picture", "")), + locale=str(form.get("locale", "")), ) except ValidationError as exc: return HTMLResponse(format_validation_errors(exc)) @@ -184,7 +179,7 @@ async def update_user_profile( async def update_user_groups( request: Request, userid: str, - groups: str = Form(""), + groups: Annotated[str, Form()] = "", ) -> Response: session_user = get_session_user(request) if session_user is None: @@ -345,7 +340,7 @@ async def delete_user(request: Request, userid: str) -> Response: return HTMLResponse("Forbidden", status_code=403) # Prevent self-deletion - admin_userid, _ = get_session_user(request) + admin_userid, _ = session_user if userid == admin_userid: return HTMLResponse('
Cannot delete your own account
') diff --git a/src/porchlight/app.py b/src/porchlight/app.py index f0fb27e..91bb490 100644 --- a/src/porchlight/app.py +++ b/src/porchlight/app.py @@ -12,6 +12,7 @@ from slowapi.errors import RateLimitExceeded from starlette.middleware.sessions import SessionMiddleware from starlette.requests import Request from starlette.responses import HTMLResponse as StarletteHTMLResponse +from starlette.responses import Response from porchlight.admin.routes import router as admin_router from porchlight.authn.password import PasswordService @@ -160,7 +161,7 @@ def create_app(settings: Settings | None = None) -> FastAPI: return {"status": "ok"} @app.get("/") - async def landing(request: Request): # type: ignore[no-untyped-def] + async def landing(request: Request) -> Response: return templates.TemplateResponse(request, "index.html") return app diff --git a/src/porchlight/authn/routes.py b/src/porchlight/authn/routes.py index fa580e3..c710f3f 100644 --- a/src/porchlight/authn/routes.py +++ b/src/porchlight/authn/routes.py @@ -1,4 +1,5 @@ from base64 import urlsafe_b64decode +from typing import Annotated from fastapi import APIRouter, Form, Request, Response from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse @@ -32,8 +33,8 @@ async def login_page(request: Request) -> HTMLResponse: @limiter.limit("5/minute") async def login_password( request: Request, - username: str = Form(), - password: str = Form(), + username: Annotated[str, Form()], + password: Annotated[str, Form()], ) -> Response: user_repo = request.app.state.user_repo cred_repo = request.app.state.credential_repo @@ -152,10 +153,7 @@ async def login_webauthn_complete(request: Request) -> Response: await cred_repo.update_webauthn(stored) user = await user_repo.get_by_userid(userid) - if user is None: - return JSONResponse({"error": "User not found"}, status_code=400) - - if not user.active: + if user is None or not user.active: return JSONResponse({"error": "Authentication failed"}, status_code=400) request.session["userid"] = user.userid diff --git a/src/porchlight/csrf.py b/src/porchlight/csrf.py index 630c723..f9b8a68 100644 --- a/src/porchlight/csrf.py +++ b/src/porchlight/csrf.py @@ -61,7 +61,7 @@ class CSRFMiddleware: # Origin check (defense-in-depth) if self.check_origin is not None: origin = request.headers.get("origin") - if origin is not None and origin != "null" and origin != self.check_origin: + if origin is not None and origin not in ("null", self.check_origin): logger.warning("CSRF origin mismatch: expected %s, got %s", self.check_origin, origin) response = HTMLResponse( "

403 Forbidden

Origin mismatch

", diff --git a/src/porchlight/manage/routes.py b/src/porchlight/manage/routes.py index 8399c8e..69afdff 100644 --- a/src/porchlight/manage/routes.py +++ b/src/porchlight/manage/routes.py @@ -1,4 +1,5 @@ from base64 import urlsafe_b64decode +from typing import Annotated from fastapi import APIRouter, Form, Request, Response from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse @@ -53,9 +54,9 @@ async def credentials_page(request: Request) -> Response: @router.post("/credentials/password", response_class=HTMLResponse) async def set_password( request: Request, - password: str = Form(), - confirm: str = Form(), - current_password: str = Form(""), + password: Annotated[str, Form()], + confirm: Annotated[str, Form()], + current_password: Annotated[str, Form()] = "", ) -> Response: session_user = get_session_user(request) if session_user is None: @@ -216,13 +217,6 @@ async def profile_page(request: Request) -> Response: @router.post("/profile", response_class=HTMLResponse) async def update_profile( request: Request, - given_name: str = Form(""), - family_name: str = Form(""), - preferred_username: str = Form(""), - email: str = Form(""), - phone_number: str = Form(""), - picture: str = Form(""), - locale: str = Form(""), ) -> Response: session_user = get_session_user(request) if session_user is None: @@ -230,15 +224,16 @@ async def update_profile( userid, _username = session_user + form = await request.form() try: profile = ProfileUpdate( - given_name=given_name, - family_name=family_name, - preferred_username=preferred_username, - email=email, - phone_number=phone_number, - picture=picture, - locale=locale, + given_name=str(form.get("given_name", "")), + family_name=str(form.get("family_name", "")), + preferred_username=str(form.get("preferred_username", "")), + email=str(form.get("email", "")), + phone_number=str(form.get("phone_number", "")), + picture=str(form.get("picture", "")), + locale=str(form.get("locale", "")), ) except ValidationError as exc: return HTMLResponse(format_validation_errors(exc)) diff --git a/src/porchlight/oidc/claims.py b/src/porchlight/oidc/claims.py index 45987c0..edf2373 100644 --- a/src/porchlight/oidc/claims.py +++ b/src/porchlight/oidc/claims.py @@ -1,5 +1,7 @@ """OIDC claims mapping and UserInfo source.""" +from typing import Any + from idpyoidc.server.user_info import UserInfo from porchlight.models import User @@ -28,9 +30,7 @@ def user_to_claims(user: User) -> dict: "locale": user.locale, } - for claim_name, value in optional_fields.items(): - if value is not None: - claims[claim_name] = value + claims.update({claim_name: value for claim_name, value in optional_fields.items() if value is not None}) # updated_at as Unix timestamp (OIDC spec requires number) if user.updated_at: @@ -46,7 +46,7 @@ class PorchlightUserInfo(UserInfo): idpyoidc calls __call__() synchronously to look up claims. """ - def __init__(self, **kwargs) -> None: + def __init__(self, **kwargs: Any) -> None: super().__init__(db={}, **kwargs) def set_user_claims(self, user_id: str, claims: dict) -> None: diff --git a/src/porchlight/oidc/endpoints.py b/src/porchlight/oidc/endpoints.py index ee221ee..67d4968 100644 --- a/src/porchlight/oidc/endpoints.py +++ b/src/porchlight/oidc/endpoints.py @@ -106,7 +106,7 @@ async def authorization_complete(request: Request) -> Response: ) -async def _check_consent_or_complete( +async def _check_consent_or_complete( # noqa: PLR0913 request: Request, oidc_server: object, endpoint: object, @@ -137,7 +137,7 @@ async def _check_consent_or_complete( return RedirectResponse("/consent", status_code=303) -async def _complete_authorization( +async def _complete_authorization( # noqa: PLR0913 request: Request, oidc_server: object, endpoint: object, @@ -332,11 +332,10 @@ async def consent_submit(request: Request) -> Response: redirect_uri = auth_params.get("redirect_uri", "") state = auth_params.get("state", "") - if action == "deny": - params = urlencode({"error": "access_denied", "state": state}) - return RedirectResponse(f"{redirect_uri}?{params}", status_code=303) - if action != "allow": + if action == "deny": + params = urlencode({"error": "access_denied", "state": state}) + return RedirectResponse(f"{redirect_uri}?{params}", status_code=303) return HTMLResponse("

Error

Invalid action

", status_code=400) # Allow — collect approved scopes @@ -357,11 +356,9 @@ async def consent_submit(request: Request) -> Response: try: parsed = endpoint.parse_request(auth_params) + if "error" in parsed: + raise ValueError(parsed.get("error_description", parsed["error"])) except Exception as exc: return HTMLResponse(f"

Error

{exc}

", status_code=400) - if "error" in parsed: - error_desc = parsed.get("error_description", parsed["error"]) - return HTMLResponse(f"

Error

{error_desc}

", status_code=400) - return await _complete_authorization(request, oidc_server, endpoint, parsed, userid, username) diff --git a/src/porchlight/store/sqlite/migrations.py b/src/porchlight/store/sqlite/migrations.py index afc5628..2bfbb32 100644 --- a/src/porchlight/store/sqlite/migrations.py +++ b/src/porchlight/store/sqlite/migrations.py @@ -1,11 +1,13 @@ from pathlib import Path import aiosqlite +import anyio async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int: """Apply unapplied SQL migration files in order. Returns count of newly applied migrations.""" - if not migrations_dir.is_dir(): + async_dir = anyio.Path(migrations_dir) + if not await async_dir.is_dir(): raise FileNotFoundError(f"Migrations directory not found: {migrations_dir}") await db.execute( @@ -22,19 +24,22 @@ async def run_migrations(db: aiosqlite.Connection, migrations_dir: Path) -> int: async with db.execute("SELECT filename FROM _migrations") as cursor: applied = {row[0] async for row in cursor} - migration_files = sorted(migrations_dir.glob("*.sql")) + migration_files = sorted( + [f async for f in async_dir.iterdir() if f.suffix == ".sql"], + key=lambda f: f.name, + ) count = 0 for migration_file in migration_files: if migration_file.name in applied: continue - sql = migration_file.read_text(encoding="utf-8") + sql = await migration_file.read_text(encoding="utf-8") await db.execute("BEGIN") try: for statement in sql.split(";"): - statement = statement.strip() - if statement: - await db.execute(statement) + cleaned = statement.strip() + if cleaned: + await db.execute(cleaned) await db.execute( "INSERT INTO _migrations (filename) VALUES (?)", (migration_file.name,), diff --git a/src/porchlight/validation.py b/src/porchlight/validation.py index ebee9d6..5d3b48c 100644 --- a/src/porchlight/validation.py +++ b/src/porchlight/validation.py @@ -98,6 +98,9 @@ class GroupListInput(BaseModel): return v +MIN_PASSWORD_STRENGTH = 2 + + class PasswordSet(BaseModel): password: str = Field(min_length=8, max_length=256) confirm: str @@ -107,7 +110,7 @@ class PasswordSet(BaseModel): if self.password != self.confirm: raise ValueError("Passwords do not match") result = zxcvbn(self.password) - if result["score"] < 2: + if result["score"] < MIN_PASSWORD_STRENGTH: feedback = result.get("feedback", {}) warning = feedback.get("warning", "") suggestions = feedback.get("suggestions", []) diff --git a/tests/e2e/setup_db.py b/tests/e2e/setup_db.py index bb43e8e..5dac7aa 100644 --- a/tests/e2e/setup_db.py +++ b/tests/e2e/setup_db.py @@ -24,6 +24,107 @@ from porchlight.store.sqlite.repositories import ( ) +async def _create_user_with_password( + user_repo: SQLiteUserRepository, + cred_repo: SQLiteCredentialRepository, + password_service: PasswordService, + user: User, + password: str, +) -> None: + """Helper to create a user and set their password credential.""" + await user_repo.create(user) + password_hash = password_service.hash(password) + await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=password_hash)) + + +async def _seed_test_users( + user_repo: SQLiteUserRepository, + cred_repo: SQLiteCredentialRepository, + password_service: PasswordService, + result: dict[str, str], +) -> None: + """Create all test users with passwords.""" + # Login test user + await _create_user_with_password( + user_repo, + cred_repo, + password_service, + User(userid="test-user-01", username="testuser", groups=["users"]), + "testpassword123", + ) + result["login_username"] = "testuser" + result["login_password"] = "testpassword123" + + # Credentials management test user + await _create_user_with_password( + user_repo, + cred_repo, + password_service, + User(userid="test-user-02", username="creduser", groups=["users"]), + "credpassword123", + ) + result["cred_username"] = "creduser" + result["cred_password"] = "credpassword123" + + # WebAuthn registration test user + await _create_user_with_password( + user_repo, + cred_repo, + password_service, + User(userid="test-user-03", username="webauthnuser", groups=["users"]), + "webauthnpass123", + ) + result["webauthn_username"] = "webauthnuser" + result["webauthn_password"] = "webauthnpass123" + result["webauthn_userid"] = "test-user-03" + + # Profile management test user + await _create_user_with_password( + user_repo, + cred_repo, + password_service, + User( + userid="test-user-04", + username="profileuser", + given_name="Alice", + family_name="Smith", + preferred_username="asmith", + email="alice@example.com", + phone_number="+12025551234", + picture="https://example.com/alice.jpg", + locale="en", + groups=["users"], + ), + "profilepass123", + ) + result["profile_username"] = "profileuser" + result["profile_password"] = "profilepass123" + + # Admin user for admin page tests + await _create_user_with_password( + user_repo, + cred_repo, + password_service, + User( + userid="test-user-05", + username="adminuser", + given_name="Admin", + family_name="User", + email="admin@example.com", + groups=["admin", "users"], + ), + "adminpass123", + ) + result["admin_username"] = "adminuser" + result["admin_password"] = "adminpass123" + result["admin_userid"] = "test-user-05" + + # Disposable user for admin delete test + await user_repo.create(User(userid="test-user-06", username="disposableuser", groups=["users"])) + result["disposable_userid"] = "test-user-06" + result["disposable_username"] = "disposableuser" + + async def seed() -> None: db_path = os.environ.get("OIDC_OP_SQLITE_PATH") if not db_path: @@ -39,89 +140,21 @@ async def seed() -> None: password_service = PasswordService() magic_link_service = MagicLinkService(repo=magic_link_repo) - result = {} + result: dict[str, str] = {} - # 1. Create a magic link for registration test + # Create magic link for registration test link = await magic_link_service.create(username="newuser") result["register_token"] = link.token result["register_username"] = "newuser" - # 2. Create a user with a password for login test - user = User(userid="test-user-01", username="testuser", groups=["users"]) - await user_repo.create(user) - password_hash = password_service.hash("testpassword123") - await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=password_hash)) - result["login_username"] = "testuser" - result["login_password"] = "testpassword123" + # Create all test users + await _seed_test_users(user_repo, cred_repo, password_service, result) - # 3. Create a separate user for credentials management test - cred_user = User(userid="test-user-02", username="creduser", groups=["users"]) - await user_repo.create(cred_user) - cred_password_hash = password_service.hash("credpassword123") - await cred_repo.create_password(PasswordCredential(user_id=cred_user.userid, password_hash=cred_password_hash)) - result["cred_username"] = "creduser" - result["cred_password"] = "credpassword123" - - # 5. Create a user with password for WebAuthn registration tests - # (login with password first, then register a passkey) - webauthn_user = User(userid="test-user-03", username="webauthnuser", groups=["users"]) - await user_repo.create(webauthn_user) - webauthn_password_hash = password_service.hash("webauthnpass123") - await cred_repo.create_password( - PasswordCredential(user_id=webauthn_user.userid, password_hash=webauthn_password_hash) - ) - result["webauthn_username"] = "webauthnuser" - result["webauthn_password"] = "webauthnpass123" - result["webauthn_userid"] = "test-user-03" - - # 4. Create an expired/used magic link for negative test + # Create an expired/used magic link for negative test expired_link = await magic_link_service.create(username="expired") await magic_link_service.mark_used(expired_link.token) result["used_token"] = expired_link.token - # 5. Create a user with profile data for profile management tests - profile_user = User( - userid="test-user-04", - username="profileuser", - given_name="Alice", - family_name="Smith", - preferred_username="asmith", - email="alice@example.com", - phone_number="+12025551234", - picture="https://example.com/alice.jpg", - locale="en", - groups=["users"], - ) - await user_repo.create(profile_user) - profile_password_hash = password_service.hash("profilepass123") - await cred_repo.create_password( - PasswordCredential(user_id=profile_user.userid, password_hash=profile_password_hash) - ) - result["profile_username"] = "profileuser" - result["profile_password"] = "profilepass123" - - # 6. Admin user for admin page tests - admin_user = User( - userid="test-user-05", - username="adminuser", - given_name="Admin", - family_name="User", - email="admin@example.com", - groups=["admin", "users"], - ) - await user_repo.create(admin_user) - admin_password_hash = password_service.hash("adminpass123") - await cred_repo.create_password(PasswordCredential(user_id=admin_user.userid, password_hash=admin_password_hash)) - result["admin_username"] = "adminuser" - result["admin_password"] = "adminpass123" - result["admin_userid"] = "test-user-05" - - # 7. Disposable user for admin delete test (not used by any other tests) - disposable_user = User(userid="test-user-06", username="disposableuser", groups=["users"]) - await user_repo.create(disposable_user) - result["disposable_userid"] = "test-user-06" - result["disposable_username"] = "disposableuser" - await db.commit() await db.close() print(json.dumps(result)) diff --git a/tests/test_admin/test_admin_routes.py b/tests/test_admin/test_admin_routes.py index b579a44..603af60 100644 --- a/tests/test_admin/test_admin_routes.py +++ b/tests/test_admin/test_admin_routes.py @@ -1,3 +1,4 @@ +from base64 import urlsafe_b64encode from datetime import UTC, datetime import pytest @@ -46,7 +47,7 @@ async def _login( ) -async def _create_target_user( +async def _create_target_user( # noqa: PLR0913 client: AsyncClient, *, userid: str = "target-user-01", @@ -365,8 +366,6 @@ async def test_delete_webauthn_credential(client: AsyncClient) -> None: ) # URL uses base64url without padding - from base64 import urlsafe_b64encode - credential_id_b64 = urlsafe_b64encode(credential_id).decode().rstrip("=") token = await get_csrf_token(client) diff --git a/tests/test_app.py b/tests/test_app.py index 779c417..82f1358 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -1,5 +1,18 @@ +from unittest.mock import MagicMock + from httpx import AsyncClient +from porchlight.dependencies import ( + get_credential_repo, + get_magic_link_repo, + get_user_repo, +) +from porchlight.store.protocols import ( + CredentialRepository, + MagicLinkRepository, + UserRepository, +) + async def test_health_endpoint(client: AsyncClient) -> None: response = await client.get("/health") @@ -16,12 +29,6 @@ async def test_app_has_title(client: AsyncClient) -> None: async def test_app_has_repos_on_state(client: AsyncClient) -> None: - from porchlight.store.protocols import ( - CredentialRepository, - MagicLinkRepository, - UserRepository, - ) - app = client._transport.app # type: ignore[union-attr] assert isinstance(app.state.user_repo, UserRepository) assert isinstance(app.state.credential_repo, CredentialRepository) @@ -41,14 +48,6 @@ async def test_landing_page(client: AsyncClient) -> None: async def test_dependency_functions() -> None: - from unittest.mock import MagicMock - - from porchlight.dependencies import ( - get_credential_repo, - get_magic_link_repo, - get_user_repo, - ) - request = MagicMock() request.app.state.user_repo = "user_repo_sentinel" request.app.state.credential_repo = "credential_repo_sentinel" diff --git a/tests/test_authn/test_webauthn.py b/tests/test_authn/test_webauthn.py index 207264a..9c3da22 100644 --- a/tests/test_authn/test_webauthn.py +++ b/tests/test_authn/test_webauthn.py @@ -1,5 +1,6 @@ import os +import pytest from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.hashes import SHA256 from fido2.cose import ES256 @@ -233,8 +234,6 @@ def test_complete_authentication_verifies_signature() -> None: def test_complete_authentication_wrong_signature_raises() -> None: - import pytest - service = _make_service() _private_key, credential_id, attested = _generate_credential() diff --git a/tests/test_invite/test_service.py b/tests/test_invite/test_service.py index f2036cc..6464fce 100644 --- a/tests/test_invite/test_service.py +++ b/tests/test_invite/test_service.py @@ -1,3 +1,4 @@ +from collections.abc import AsyncIterator from datetime import UTC, datetime, timedelta from pathlib import Path @@ -14,7 +15,7 @@ MIGRATIONS_DIR = ( @pytest.fixture -async def db(): +async def db() -> AsyncIterator[aiosqlite.Connection]: conn = await aiosqlite.connect(":memory:") conn.row_factory = aiosqlite.Row await conn.execute("PRAGMA foreign_keys=ON") diff --git a/tests/test_oidc/test_consent_flow.py b/tests/test_oidc/test_consent_flow.py index cc5ac9f..91d3d73 100644 --- a/tests/test_oidc/test_consent_flow.py +++ b/tests/test_oidc/test_consent_flow.py @@ -3,6 +3,7 @@ from datetime import UTC, datetime from urllib.parse import parse_qs, urlparse from argon2 import PasswordHasher +from fastapi import FastAPI from httpx import AsyncClient from porchlight.authn.password import PasswordService @@ -225,7 +226,7 @@ async def test_partial_consent_filters_scopes(client: AsyncClient) -> None: # -- Test helpers -- -def _register_test_rp(app) -> None: +def _register_test_rp(app: FastAPI) -> None: oidc_server = app.state.oidc_server if "consent-rp" in oidc_server.context.cdb: return @@ -244,7 +245,7 @@ def _register_test_rp(app) -> None: oidc_server.keyjar.add_symmetric(client_id, client_secret) -async def _create_test_user(app) -> None: +async def _create_test_user(app: FastAPI) -> None: user_repo = app.state.user_repo existing = await user_repo.get_by_username("consentuser") if existing: diff --git a/tests/test_oidc/test_e2e_flow.py b/tests/test_oidc/test_e2e_flow.py index def10ae..b1b9545 100644 --- a/tests/test_oidc/test_e2e_flow.py +++ b/tests/test_oidc/test_e2e_flow.py @@ -2,48 +2,41 @@ import json import secrets from base64 import b64encode from datetime import UTC, datetime +from typing import Any from urllib.parse import parse_qs, urlparse from argon2 import PasswordHasher from cryptojwt.jwk.jwk import key_from_jwk_dict from cryptojwt.jws.jws import JWS +from fastapi import FastAPI from httpx import AsyncClient from porchlight.authn.password import PasswordService from porchlight.models import PasswordCredential, User from tests.conftest import get_csrf_token +CLIENT_ID = "e2e-rp" +CLIENT_SECRET = "e2e-secret-0123456789abcdef" # 30+ chars +REDIRECT_URI = "http://localhost:9000/callback" -async def test_full_authorization_code_flow(client: AsyncClient) -> None: - """End-to-end test of the complete OIDC Authorization Code flow. - Exercises: discovery, client registration, authorization request, - password login, authorization completion, token exchange, ID token - validation (JWKS + JWT signature), and userinfo. - """ - # -- Setup: register RP client and create user -- - app = client._transport.app # type: ignore[union-attr] +async def _setup_rp_and_user(app: FastAPI) -> None: + """Register an RP client and create a test user with password.""" oidc_server = app.state.oidc_server user_repo = app.state.user_repo cred_repo = app.state.credential_repo - client_id = "e2e-rp" - client_secret = "e2e-secret-0123456789abcdef" # 30+ chars - redirect_uri = "http://localhost:9000/callback" - state = secrets.token_urlsafe(16) - nonce = secrets.token_urlsafe(16) - - oidc_server.context.cdb[client_id] = { - "client_id": client_id, - "client_secret": client_secret, - "redirect_uris": [(redirect_uri, {})], + oidc_server.context.cdb[CLIENT_ID] = { + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "redirect_uris": [(REDIRECT_URI, {})], "response_types_supported": ["code"], "token_endpoint_auth_method": "client_secret_basic", "scope": ["openid", "profile", "email"], "allowed_scopes": ["openid", "profile", "email"], "client_salt": secrets.token_hex(8), } - oidc_server.keyjar.add_symmetric(client_id, client_secret) + oidc_server.keyjar.add_symmetric(CLIENT_ID, CLIENT_SECRET) user = User( userid="lusab-bansen", @@ -60,13 +53,16 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None: svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192)) await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass"))) - # -- Step 1: Authorization request (unauthenticated) → redirect to /login -- + +async def _login_and_authorize(client: AsyncClient, state: str, nonce: str) -> str: + """Perform authorization request, login, consent, and return the auth code.""" + # Step 1: Authorization request (unauthenticated) -> redirect to /login auth_res = await client.get( "/authorization", params={ "response_type": "code", - "client_id": client_id, - "redirect_uri": redirect_uri, + "client_id": CLIENT_ID, + "redirect_uri": REDIRECT_URI, "scope": "openid profile email", "state": state, "nonce": nonce, @@ -76,7 +72,7 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None: assert auth_res.status_code in (302, 303), f"Expected redirect to /login, got {auth_res.status_code}" assert "/login" in auth_res.headers["location"] - # -- Step 2: Password login → HX-Redirect to /authorization/complete -- + # Step 2: Password login -> HX-Redirect to /authorization/complete token = await get_csrf_token(client) login_res = await client.post( "/login/password", @@ -89,14 +85,14 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None: f"Expected HX-Redirect to /authorization/complete, got '{hx_redirect}'" ) - # -- Step 3: Complete authorization → redirect to consent -- + # Step 3: Complete authorization -> redirect to consent complete_res = await client.get("/authorization/complete", follow_redirects=False) assert complete_res.status_code in (302, 303), ( f"Expected redirect to /consent, got {complete_res.status_code}: {complete_res.text}" ) assert "/consent" in complete_res.headers["location"] - # -- Step 3b: Approve consent → redirect to callback with code + state -- + # Step 3b: Approve consent -> redirect to callback with code + state token = await get_csrf_token(client) consent_res = await client.post( "/consent", @@ -117,15 +113,18 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None: code = callback_params["code"][0] assert len(code) > 0 + return code - # -- Step 4: Token exchange → access_token, id_token, token_type -- - auth_header = b64encode(f"{client_id}:{client_secret}".encode()).decode() + +async def _exchange_token(client: AsyncClient, code: str) -> dict[str, Any]: + """Exchange authorization code for tokens.""" + auth_header = b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode() token_res = await client.post( "/token", data={ "grant_type": "authorization_code", "code": code, - "redirect_uri": redirect_uri, + "redirect_uri": REDIRECT_URI, }, headers={ "Authorization": f"Basic {auth_header}", @@ -138,45 +137,59 @@ async def test_full_authorization_code_flow(client: AsyncClient) -> None: assert "access_token" in token_data assert "id_token" in token_data assert token_data["token_type"].lower() == "bearer" + return token_data - access_token = token_data["access_token"] - id_token_jwt = token_data["id_token"] - # -- Step 5: Validate ID token — fetch JWKS, verify JWT signature -- +async def _validate_id_token(client: AsyncClient, id_token_jwt: str, nonce: str) -> str: + """Validate the ID token via JWKS and return the sub claim.""" jwks_res = await client.get("/jwks") assert jwks_res.status_code == 200 jwks = jwks_res.json() assert "keys" in jwks assert len(jwks["keys"]) > 0 - # Decode the ID token header to find the key ID - id_token_header = json.loads(JWS()._decode(id_token_jwt.split(".")[0])) + id_token_header = json.loads(JWS()._decode(id_token_jwt.split(".", maxsplit=1)[0])) kid = id_token_header.get("kid") - # Find the matching key in JWKS matching_keys = [k for k in jwks["keys"] if k.get("kid") == kid] assert len(matching_keys) == 1, f"Expected 1 matching key for kid={kid}, found {len(matching_keys)}" key = key_from_jwk_dict(matching_keys[0]) - # Verify JWT signature and decode payload verifier = JWS() payload = verifier.verify_compact(id_token_jwt, keys=[key]) id_token_payload = json.loads(payload) if isinstance(payload, (str, bytes)) else payload assert id_token_payload["iss"] == "http://localhost:8000" - assert client_id in id_token_payload["aud"] + assert CLIENT_ID in id_token_payload["aud"] assert id_token_payload["nonce"] == nonce - # sub is a pairwise identifier — just verify it exists and is non-empty id_token_sub = id_token_payload["sub"] assert isinstance(id_token_sub, str) assert len(id_token_sub) > 0 + return id_token_sub + + +async def test_full_authorization_code_flow(client: AsyncClient) -> None: + """End-to-end test of the complete OIDC Authorization Code flow. + + Exercises: discovery, client registration, authorization request, + password login, authorization completion, token exchange, ID token + validation (JWKS + JWT signature), and userinfo. + """ + app = client._transport.app # type: ignore[union-attr] + state = secrets.token_urlsafe(16) + nonce = secrets.token_urlsafe(16) + + await _setup_rp_and_user(app) + code = await _login_and_authorize(client, state, nonce) + token_data = await _exchange_token(client, code) + id_token_sub = await _validate_id_token(client, token_data["id_token"], nonce) # -- Step 6: UserInfo — GET /userinfo with Bearer token -- userinfo_res = await client.get( "/userinfo", - headers={"Authorization": f"Bearer {access_token}"}, + headers={"Authorization": f"Bearer {token_data['access_token']}"}, ) assert userinfo_res.status_code == 200, f"UserInfo failed: {userinfo_res.status_code} {userinfo_res.text}" diff --git a/tests/test_oidc/test_provider.py b/tests/test_oidc/test_provider.py index 7f3b0a7..ebfbf97 100644 --- a/tests/test_oidc/test_provider.py +++ b/tests/test_oidc/test_provider.py @@ -2,6 +2,7 @@ import shutil from pathlib import Path from porchlight.config import Settings +from porchlight.oidc.claims import PorchlightUserInfo from porchlight.oidc.provider import create_oidc_server @@ -49,8 +50,6 @@ def test_create_server_userinfo_is_porchlight() -> None: try: settings = Settings(issuer="http://localhost:8000", sqlite_path=":memory:", signing_key_path=str(key_path)) server = create_oidc_server(settings) - from porchlight.oidc.claims import PorchlightUserInfo - assert isinstance(server.context.userinfo, PorchlightUserInfo) finally: shutil.rmtree(key_path, ignore_errors=True) diff --git a/tests/test_store/conftest.py b/tests/test_store/conftest.py index 4ee4100..3eafd33 100644 --- a/tests/test_store/conftest.py +++ b/tests/test_store/conftest.py @@ -1,3 +1,4 @@ +from collections.abc import AsyncIterator from pathlib import Path import aiosqlite @@ -11,7 +12,7 @@ MIGRATIONS_DIR = ( @pytest.fixture -async def db(): +async def db() -> AsyncIterator[aiosqlite.Connection]: conn = await aiosqlite.connect(":memory:") conn.row_factory = aiosqlite.Row await conn.execute("PRAGMA foreign_keys=ON") diff --git a/tests/test_store/test_sqlite_consent_repo.py b/tests/test_store/test_sqlite_consent_repo.py index d009510..4bb851a 100644 --- a/tests/test_store/test_sqlite_consent_repo.py +++ b/tests/test_store/test_sqlite_consent_repo.py @@ -1,11 +1,13 @@ from datetime import UTC, datetime +import aiosqlite + from porchlight.models import User from porchlight.store.protocols import ConsentRepository from porchlight.store.sqlite.repositories import SQLiteConsentRepository, SQLiteUserRepository -async def _create_user(db) -> User: +async def _create_user(db: aiosqlite.Connection) -> User: """Helper to create a test user.""" user_repo = SQLiteUserRepository(db) user = User( @@ -18,12 +20,12 @@ async def _create_user(db) -> User: return await user_repo.create(user) -async def test_implements_protocol(db) -> None: +async def test_implements_protocol(db: aiosqlite.Connection) -> None: repo = SQLiteConsentRepository(db) assert isinstance(repo, ConsentRepository) -async def test_set_and_get_consent(db) -> None: +async def test_set_and_get_consent(db: aiosqlite.Connection) -> None: user = await _create_user(db) repo = SQLiteConsentRepository(db) await repo.set_consent(user.userid, "test-rp", ["openid", "profile"]) @@ -37,14 +39,14 @@ async def test_set_and_get_consent(db) -> None: assert isinstance(consent.updated_at, datetime) -async def test_get_consent_not_found(db) -> None: +async def test_get_consent_not_found(db: aiosqlite.Connection) -> None: user = await _create_user(db) repo = SQLiteConsentRepository(db) consent = await repo.get_consent(user.userid, "nonexistent") assert consent is None -async def test_set_consent_upserts(db) -> None: +async def test_set_consent_upserts(db: aiosqlite.Connection) -> None: user = await _create_user(db) repo = SQLiteConsentRepository(db) await repo.set_consent(user.userid, "test-rp", ["openid"]) @@ -61,7 +63,7 @@ async def test_set_consent_upserts(db) -> None: assert consent.updated_at >= original.updated_at -async def test_delete_consent(db) -> None: +async def test_delete_consent(db: aiosqlite.Connection) -> None: user = await _create_user(db) repo = SQLiteConsentRepository(db) await repo.set_consent(user.userid, "test-rp", ["openid"]) @@ -73,14 +75,14 @@ async def test_delete_consent(db) -> None: assert consent is None -async def test_delete_consent_not_found(db) -> None: +async def test_delete_consent_not_found(db: aiosqlite.Connection) -> None: user = await _create_user(db) repo = SQLiteConsentRepository(db) result = await repo.delete_consent(user.userid, "nonexistent") assert result is False -async def test_list_consents(db) -> None: +async def test_list_consents(db: aiosqlite.Connection) -> None: user = await _create_user(db) repo = SQLiteConsentRepository(db) await repo.set_consent(user.userid, "rp-a", ["openid"]) @@ -92,14 +94,14 @@ async def test_list_consents(db) -> None: assert client_ids == {"rp-a", "rp-b"} -async def test_list_consents_empty(db) -> None: +async def test_list_consents_empty(db: aiosqlite.Connection) -> None: user = await _create_user(db) repo = SQLiteConsentRepository(db) consents = await repo.list_consents(user.userid) assert consents == [] -async def test_consent_deleted_on_user_cascade(db) -> None: +async def test_consent_deleted_on_user_cascade(db: aiosqlite.Connection) -> None: """Consent records are deleted when the user is deleted (CASCADE).""" user = await _create_user(db) user_repo = SQLiteUserRepository(db) diff --git a/tests/test_store/test_sqlite_credential_repo.py b/tests/test_store/test_sqlite_credential_repo.py index f423f8c..2c5d33d 100644 --- a/tests/test_store/test_sqlite_credential_repo.py +++ b/tests/test_store/test_sqlite_credential_repo.py @@ -171,8 +171,8 @@ async def test_create_duplicate_password(credential_repo: SQLiteCredentialReposi cred = PasswordCredential(user_id=alice.userid, password_hash="hash1") await credential_repo.create_password(cred) + cred2 = PasswordCredential(user_id=alice.userid, password_hash="hash2") with pytest.raises(DuplicateError): - cred2 = PasswordCredential(user_id=alice.userid, password_hash="hash2") await credential_repo.create_password(cred2) diff --git a/tests/test_store/test_sqlite_magic_link_repo.py b/tests/test_store/test_sqlite_magic_link_repo.py index 00664cd..c2c9571 100644 --- a/tests/test_store/test_sqlite_magic_link_repo.py +++ b/tests/test_store/test_sqlite_magic_link_repo.py @@ -1,4 +1,5 @@ from datetime import UTC, datetime, timedelta +from typing import Any import aiosqlite import pytest @@ -14,7 +15,7 @@ def magic_link_repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository: return SQLiteMagicLinkRepository(db) -def _make_link(**overrides) -> MagicLink: +def _make_link(**overrides: Any) -> MagicLink: defaults = { "token": "abc123", "username": "alice", diff --git a/tests/test_store/test_sqlite_user_repo.py b/tests/test_store/test_sqlite_user_repo.py index 0239a08..bc6255e 100644 --- a/tests/test_store/test_sqlite_user_repo.py +++ b/tests/test_store/test_sqlite_user_repo.py @@ -1,3 +1,5 @@ +from typing import Any + import aiosqlite import pytest @@ -12,7 +14,7 @@ def user_repo(db: aiosqlite.Connection) -> SQLiteUserRepository: return SQLiteUserRepository(db) -def _make_user(**overrides) -> User: +def _make_user(**overrides: Any) -> User: defaults = {"userid": "lusab-bansen", "username": "alice"} defaults.update(overrides) return User(**defaults) diff --git a/tests/test_validation.py b/tests/test_validation.py index 74e97d5..2be1afc 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -237,11 +237,11 @@ class TestPasswordSet: PasswordSet(password=long_pw, confirm=long_pw) def test_weak_password_rejected(self) -> None: - with pytest.raises(ValidationError, match="[Ww]eak\\b|[Ss]trength|easily guessed"): + with pytest.raises(ValidationError, match=r"[Ww]eak\b|[Ss]trength|easily guessed"): PasswordSet(password="password", confirm="password") def test_common_password_rejected(self) -> None: - with pytest.raises(ValidationError, match="[Ww]eak\\b|[Ss]trength|easily guessed"): + with pytest.raises(ValidationError, match=r"[Ww]eak\b|[Ss]trength|easily guessed"): PasswordSet(password="12345678", confirm="12345678")