268 lines
8.7 KiB
Python
268 lines
8.7 KiB
Python
import secrets
|
|
from datetime import UTC, datetime
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
from argon2 import PasswordHasher
|
|
from httpx import AsyncClient
|
|
|
|
from porchlight.authn.password import PasswordService
|
|
from porchlight.models import PasswordCredential, User
|
|
|
|
|
|
async def test_authorization_shows_consent_for_new_client(client: AsyncClient) -> None:
|
|
"""First-time authorization for an RP should redirect to /consent."""
|
|
app = client._transport.app # type: ignore[union-attr]
|
|
_register_test_rp(app)
|
|
await _create_test_user(app)
|
|
|
|
# Login
|
|
await client.post(
|
|
"/login/password",
|
|
data={"username": "consentuser", "password": "testpass"},
|
|
headers={"HX-Request": "true"},
|
|
)
|
|
|
|
# Authorization request
|
|
res = await client.get(
|
|
"/authorization",
|
|
params={
|
|
"response_type": "code",
|
|
"client_id": "consent-rp",
|
|
"redirect_uri": "http://localhost:9000/callback",
|
|
"scope": "openid profile",
|
|
"state": "teststate",
|
|
},
|
|
follow_redirects=False,
|
|
)
|
|
assert res.status_code == 303
|
|
assert "/consent" in res.headers["location"]
|
|
|
|
|
|
async def test_consent_page_renders(client: AsyncClient) -> None:
|
|
"""GET /consent should render the consent form."""
|
|
app = client._transport.app # type: ignore[union-attr]
|
|
_register_test_rp(app)
|
|
await _create_test_user(app)
|
|
await _login_and_start_auth(client)
|
|
|
|
res = await client.get("/consent")
|
|
assert res.status_code == 200
|
|
assert "consent-rp" in res.text
|
|
assert "profile" in res.text.lower()
|
|
|
|
|
|
async def test_consent_allow_redirects_with_code(client: AsyncClient) -> None:
|
|
"""Approving consent should complete the authorization flow."""
|
|
app = client._transport.app # type: ignore[union-attr]
|
|
_register_test_rp(app)
|
|
await _create_test_user(app)
|
|
await _login_and_start_auth(client)
|
|
|
|
res = await client.post(
|
|
"/consent",
|
|
data={"action": "allow", "scope": ["openid", "profile"]},
|
|
follow_redirects=False,
|
|
)
|
|
assert res.status_code == 303
|
|
location = res.headers["location"]
|
|
parsed = urlparse(location)
|
|
params = parse_qs(parsed.query)
|
|
assert "code" in params
|
|
|
|
|
|
async def test_consent_deny_redirects_with_error(client: AsyncClient) -> None:
|
|
"""Denying consent should redirect with access_denied error."""
|
|
app = client._transport.app # type: ignore[union-attr]
|
|
_register_test_rp(app)
|
|
await _create_test_user(app)
|
|
await _login_and_start_auth(client)
|
|
|
|
res = await client.post(
|
|
"/consent",
|
|
data={"action": "deny"},
|
|
follow_redirects=False,
|
|
)
|
|
assert res.status_code == 303
|
|
location = res.headers["location"]
|
|
parsed = urlparse(location)
|
|
params = parse_qs(parsed.query)
|
|
assert params["error"] == ["access_denied"]
|
|
|
|
|
|
async def test_saved_consent_skips_consent_screen(client: AsyncClient) -> None:
|
|
"""Second authorization with same scopes should skip consent."""
|
|
app = client._transport.app # type: ignore[union-attr]
|
|
_register_test_rp(app)
|
|
await _create_test_user(app)
|
|
|
|
# First flow: login, authorize, consent
|
|
await _login_and_start_auth(client)
|
|
await client.post(
|
|
"/consent",
|
|
data={"action": "allow", "scope": ["openid", "profile"]},
|
|
follow_redirects=False,
|
|
)
|
|
|
|
# Second flow: same scopes, should skip consent
|
|
res = await client.get(
|
|
"/authorization",
|
|
params={
|
|
"response_type": "code",
|
|
"client_id": "consent-rp",
|
|
"redirect_uri": "http://localhost:9000/callback",
|
|
"scope": "openid profile",
|
|
"state": "teststate2",
|
|
},
|
|
follow_redirects=False,
|
|
)
|
|
assert res.status_code == 303
|
|
location = res.headers["location"]
|
|
# Should redirect directly to callback, not /consent
|
|
assert "callback" in location
|
|
assert "code" in location
|
|
|
|
|
|
async def test_new_scopes_reshows_consent(client: AsyncClient) -> None:
|
|
"""If RP requests new scopes, consent screen should reappear."""
|
|
app = client._transport.app # type: ignore[union-attr]
|
|
_register_test_rp(app)
|
|
await _create_test_user(app)
|
|
|
|
# First flow: consent to openid only
|
|
await _login_and_start_auth(client, scope="openid")
|
|
await client.post(
|
|
"/consent",
|
|
data={"action": "allow", "scope": ["openid"]},
|
|
follow_redirects=False,
|
|
)
|
|
|
|
# Second flow: request openid + profile (new scope)
|
|
res = await client.get(
|
|
"/authorization",
|
|
params={
|
|
"response_type": "code",
|
|
"client_id": "consent-rp",
|
|
"redirect_uri": "http://localhost:9000/callback",
|
|
"scope": "openid profile",
|
|
"state": "teststate2",
|
|
},
|
|
follow_redirects=False,
|
|
)
|
|
assert res.status_code == 303
|
|
assert "/consent" in res.headers["location"]
|
|
|
|
|
|
async def test_manage_app_skips_consent(client: AsyncClient) -> None:
|
|
"""The manage-app client should bypass consent entirely."""
|
|
app = client._transport.app # type: ignore[union-attr]
|
|
settings = app.state.settings
|
|
await _create_test_user(app)
|
|
|
|
await client.post(
|
|
"/login/password",
|
|
data={"username": "consentuser", "password": "testpass"},
|
|
headers={"HX-Request": "true"},
|
|
)
|
|
|
|
manage_cdb = app.state.oidc_server.context.cdb[settings.manage_client_id]
|
|
redirect_uri = manage_cdb["redirect_uris"][0][0]
|
|
|
|
res = await client.get(
|
|
"/authorization",
|
|
params={
|
|
"response_type": "code",
|
|
"client_id": settings.manage_client_id,
|
|
"redirect_uri": redirect_uri,
|
|
"scope": "openid profile email",
|
|
"state": "teststate",
|
|
},
|
|
follow_redirects=False,
|
|
)
|
|
assert res.status_code == 303
|
|
location = res.headers["location"]
|
|
# Should redirect directly to callback, not /consent
|
|
assert "code" in location
|
|
assert "/consent" not in location
|
|
|
|
|
|
async def test_partial_consent_filters_scopes(client: AsyncClient) -> None:
|
|
"""User can approve only some scopes (partial consent)."""
|
|
app = client._transport.app # type: ignore[union-attr]
|
|
_register_test_rp(app)
|
|
await _create_test_user(app)
|
|
|
|
# Request openid + profile + email, approve only openid + profile
|
|
await _login_and_start_auth(client, scope="openid profile email")
|
|
res = await client.post(
|
|
"/consent",
|
|
data={"action": "allow", "scope": ["openid", "profile"]},
|
|
follow_redirects=False,
|
|
)
|
|
assert res.status_code == 303
|
|
location = res.headers["location"]
|
|
assert "code" in location
|
|
|
|
# Verify consent was saved with only the approved scopes
|
|
consent_repo = app.state.consent_repo
|
|
consent = await consent_repo.get_consent("lusab-consent", "consent-rp")
|
|
assert consent is not None
|
|
assert set(consent.scopes) == {"openid", "profile"}
|
|
|
|
|
|
# -- Test helpers --
|
|
|
|
|
|
def _register_test_rp(app) -> None:
|
|
oidc_server = app.state.oidc_server
|
|
if "consent-rp" in oidc_server.context.cdb:
|
|
return
|
|
client_id = "consent-rp"
|
|
client_secret = "consent-secret-0123456789abcdef"
|
|
oidc_server.context.cdb[client_id] = {
|
|
"client_id": client_id,
|
|
"client_secret": client_secret,
|
|
"redirect_uris": [("http://localhost:9000/callback", {})],
|
|
"response_types_supported": ["code"],
|
|
"token_endpoint_auth_method": "client_secret_basic",
|
|
"scope": ["openid", "profile", "email"],
|
|
"allowed_scopes": ["openid", "profile", "email"],
|
|
"client_salt": secrets.token_hex(8),
|
|
}
|
|
oidc_server.keyjar.add_symmetric(client_id, client_secret)
|
|
|
|
|
|
async def _create_test_user(app) -> None:
|
|
user_repo = app.state.user_repo
|
|
existing = await user_repo.get_by_username("consentuser")
|
|
if existing:
|
|
return
|
|
user = User(
|
|
userid="lusab-consent",
|
|
username="consentuser",
|
|
email="consent@example.com",
|
|
created_at=datetime.now(UTC),
|
|
updated_at=datetime.now(UTC),
|
|
)
|
|
await user_repo.create(user)
|
|
svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
|
cred_repo = app.state.credential_repo
|
|
await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))
|
|
|
|
|
|
async def _login_and_start_auth(client: AsyncClient, scope: str = "openid profile") -> None:
|
|
await client.post(
|
|
"/login/password",
|
|
data={"username": "consentuser", "password": "testpass"},
|
|
headers={"HX-Request": "true"},
|
|
)
|
|
await client.get(
|
|
"/authorization",
|
|
params={
|
|
"response_type": "code",
|
|
"client_id": "consent-rp",
|
|
"redirect_uri": "http://localhost:9000/callback",
|
|
"scope": scope,
|
|
"state": "teststate",
|
|
},
|
|
follow_redirects=False,
|
|
)
|