132 lines
4.9 KiB
Python
132 lines
4.9 KiB
Python
import os
|
|
from base64 import urlsafe_b64encode
|
|
from datetime import UTC, datetime
|
|
|
|
from argon2 import PasswordHasher
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
from fido2.cose import ES256
|
|
from fido2.utils import sha256
|
|
from fido2.webauthn import (
|
|
Aaguid,
|
|
AttestationObject,
|
|
AttestedCredentialData,
|
|
AuthenticatorAttestationResponse,
|
|
AuthenticatorData,
|
|
CollectedClientData,
|
|
RegistrationResponse,
|
|
)
|
|
from httpx import AsyncClient
|
|
|
|
from porchlight.authn.password import PasswordService
|
|
from porchlight.models import PasswordCredential, User, WebAuthnCredential
|
|
|
|
# Relying-party ID used by the synthetic authenticator in this module: its SHA-256
# hash is what _build_registration_response places in the authenticator data.
# NOTE(review): presumably has to match the RP ID the app under test is configured with — confirm.
RP_ID = "localhost"
|
|
# Origin written into the collected client data of the synthetic registration response.
# NOTE(review): presumably has to be an origin the app under test accepts for RP_ID — confirm.
ORIGIN = "http://localhost:8000"
|
|
|
|
|
|
async def _create_user_and_login(client: AsyncClient) -> str:
    """Create a user with a password credential, log in through the app, return the userid.

    The user and credential repositories are taken from the state of the ASGI
    app behind ``client``'s transport, so the records are written to the same
    repositories the application under test reads from.

    Args:
        client: Test client bound to the application; the login request is
            sent through it.

    Returns:
        The ``userid`` of the newly created user.

    Raises:
        AssertionError: If the login request is answered with a 4xx/5xx status.
    """
    app = client._transport.app  # type: ignore[union-attr]
    user_repo = app.state.user_repo
    cred_repo = app.state.credential_repo

    # One timestamp for both fields so a freshly created user has
    # created_at == updated_at instead of two values microseconds apart.
    now = datetime.now(UTC)
    user = User(userid="lusab-bansen", username="alice", created_at=now, updated_at=now)
    await user_repo.create(user)

    # Low time/memory cost for the hasher (presumably to keep test hashing fast).
    svc = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
    await cred_repo.create_password(PasswordCredential(user_id=user.userid, password_hash=svc.hash("testpass")))

    res = await client.post(
        "/login/password",
        data={"username": "alice", "password": "testpass"},
        headers={"HX-Request": "true"},
    )
    # Fail here, with a clear message, rather than letting a broken login show
    # up later as an unrelated assertion failure in the calling test.
    assert res.status_code < 400, f"login failed with status {res.status_code}"
    return user.userid
|
|
|
|
|
|
def _generate_credential() -> tuple[ec.EllipticCurvePrivateKey, bytes, AttestedCredentialData]:
    """Create a throwaway credential: a SECP256R1 key pair, a random id and its attested data.

    Returns:
        A ``(private_key, credential_id, attested)`` tuple where ``credential_id``
        is 32 random bytes and ``attested`` bundles that id with the ES256 COSE
        form of the public key under ``Aaguid.NONE``.
    """
    signing_key = ec.generate_private_key(ec.SECP256R1())
    raw_id = os.urandom(32)
    attested_data = AttestedCredentialData.create(
        aaguid=Aaguid.NONE,
        credential_id=raw_id,
        public_key=ES256.from_cryptography_key(signing_key.public_key()),
    )
    return signing_key, raw_id, attested_data
|
|
|
|
|
|
def _build_registration_response(
    credential_id: bytes, attested: AttestedCredentialData, challenge: bytes
) -> RegistrationResponse:
    """Assemble a registration response for ``credential_id`` that answers ``challenge``.

    The authenticator data carries the SHA-256 of ``RP_ID``, the UP and AT
    flags, a counter of 0 and ``attested``. It is wrapped in an attestation
    object with format ``"none"`` and an empty attestation statement. The
    client data is of type CREATE and names ``ORIGIN`` as its origin.

    Args:
        credential_id: Raw credential id reported as the response's ``raw_id``.
        attested: Attested credential data embedded in the authenticator data.
        challenge: Challenge echoed back in the client data.

    Returns:
        The assembled ``RegistrationResponse``.
    """
    client_data = CollectedClientData.create(
        type=CollectedClientData.TYPE.CREATE,
        challenge=challenge,
        origin=ORIGIN,
    )

    flags = AuthenticatorData.FLAG.UP | AuthenticatorData.FLAG.AT
    authenticator_data = AuthenticatorData.create(
        rp_id_hash=sha256(RP_ID.encode()),
        flags=flags,
        counter=0,
        credential_data=attested,
    )

    attestation_response = AuthenticatorAttestationResponse(
        client_data=client_data,
        attestation_object=AttestationObject.create(fmt="none", auth_data=authenticator_data, att_stmt={}),
    )
    return RegistrationResponse(raw_id=credential_id, response=attestation_response)
|
|
|
|
|
|
async def test_webauthn_begin_requires_session(client: AsyncClient) -> None:
    """Posting to the begin endpoint without logging in first yields 302, 303 or 401."""
    response = await client.post(
        "/manage/credentials/webauthn/begin",
        follow_redirects=False,  # inspect the immediate answer, not the redirect target
    )

    accepted_statuses = (302, 303, 401)
    assert response.status_code in accepted_statuses
|
|
|
|
|
|
async def test_webauthn_begin_returns_options(client: AsyncClient) -> None:
    """After login, the begin endpoint answers 200 with JSON containing ``publicKey.challenge``."""
    await _create_user_and_login(client)

    response = await client.post("/manage/credentials/webauthn/begin")

    assert response.status_code == 200
    payload = response.json()
    assert "publicKey" in payload
    assert "challenge" in payload["publicKey"]
|
|
|
|
|
|
async def test_webauthn_complete_creates_credential(client: AsyncClient) -> None:
    """Complete a registration via the WebAuthn service and store the resulting credential.

    The registration is driven through ``app.state.webauthn_service`` rather
    than the HTTP endpoints, and the verified credential is then written to the
    credential repository by the test itself. Afterwards the user must have
    exactly one WebAuthn credential, carrying the generated credential id.
    """
    userid = await _create_user_and_login(client)
    app_state = client._transport.app.state  # type: ignore[union-attr]
    repo = app_state.credential_repo
    service = app_state.webauthn_service

    # Talk to the service directly to get hold of the raw registration state;
    # the session-based flow is hard to test end to end because that state
    # cannot be extracted from it.
    _signing_key, raw_id, attested = _generate_credential()
    _options, state = service.begin_registration(user_id=userid.encode(), username="alice")

    registration = _build_registration_response(raw_id, attested, state["challenge"])
    verified = service.complete_registration(state, registration)

    # Persist the credential by hand to check the repository round trip.
    credential_data = verified.credential_data
    stored = WebAuthnCredential(
        user_id=userid,
        credential_id=credential_data.credential_id,
        public_key=bytes(credential_data),
    )
    await repo.create_webauthn(stored)

    found = await repo.get_webauthn_by_user(userid)
    assert len(found) == 1
    assert found[0].credential_id == raw_id
|
|
|
|
|
|
async def test_delete_webauthn_credential(client: AsyncClient) -> None:
    """DELETE on a stored WebAuthn credential answers 200 and leaves the user with none."""
    userid = await _create_user_and_login(client)
    repo = client._transport.app.state.credential_repo  # type: ignore[union-attr]

    # The login helper already stored a password credential for this user;
    # add a WebAuthn credential next to it.
    raw_id = b"cred1"
    await repo.create_webauthn(WebAuthnCredential(user_id=userid, credential_id=raw_id, public_key=b"key1"))

    # The URL carries the credential id as URL-safe base64 with the padding stripped.
    encoded_id = urlsafe_b64encode(raw_id).decode().rstrip("=")
    response = await client.delete(
        f"/manage/credentials/webauthn/{encoded_id}",
        headers={"HX-Request": "true"},
    )
    assert response.status_code == 200

    remaining = await repo.get_webauthn_by_user(userid)
    assert len(remaining) == 0
|