chore: remove completed implementation plan docs
This commit is contained in:
parent
dee1b1e3bc
commit
c726ae18d3
8 changed files with 0 additions and 7758 deletions
|
|
@ -1,860 +0,0 @@
|
|||
# Project Scaffolding Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Set up the project foundation: tooling (ruff, ty), package structure, configuration, Pydantic models, repository protocols, app factory, and initial test infrastructure.
|
||||
|
||||
**Architecture:** FastAPI app factory pattern with src layout. Pydantic models for data, Protocol-based repository interfaces, env-based configuration via pydantic-settings. All code checked by ruff (broad ruleset) and ty (strict mode).
|
||||
|
||||
**Tech Stack:** Python 3.13, FastAPI, pydantic-settings, ruff, ty, pytest, pytest-asyncio
|
||||
|
||||
**Design Document:** `docs/plans/2026-02-11-oidc-op-design.md`
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Configure pyproject.toml with dependencies and tooling
|
||||
|
||||
**Files:**
|
||||
- Modify: `pyproject.toml`
|
||||
|
||||
**Step 1: Update pyproject.toml with all sections**
|
||||
|
||||
```toml
|
||||
[project]
|
||||
name = "fastapi-oidc-op"
|
||||
version = "0.1.0"
|
||||
description = "OIDC OpenID Provider with user management"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
"fastapi>=0.115",
|
||||
"uvicorn[standard]>=0.34",
|
||||
"idpyoidc>=5.0",
|
||||
"pydantic-settings>=2.7",
|
||||
"jinja2>=3.1",
|
||||
"fido2>=2.1",
|
||||
"argon2-cffi>=25.1",
|
||||
"motor>=3.7",
|
||||
"aiosqlite>=0.21",
|
||||
"proquint>=0.2",
|
||||
"python-multipart>=0.0.20",
|
||||
"httpx>=0.28",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
fastapi-oidc-op = "fastapi_oidc_op.cli:main"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/fastapi_oidc_op"]
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"pytest>=8.0",
|
||||
"pytest-asyncio>=0.25",
|
||||
"ruff>=0.15",
|
||||
"ty>=0.0.16",
|
||||
]
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 120
|
||||
target-version = "py311"
|
||||
src = ["src", "tests"]
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["E", "F", "UP", "B", "SIM", "I", "C4", "RUF"]
|
||||
ignore = ["E501"]
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"tests/*" = ["S101"]
|
||||
|
||||
[tool.ruff.format]
|
||||
quote-style = "double"
|
||||
indent-style = "space"
|
||||
docstring-code-format = true
|
||||
|
||||
[tool.ty]
|
||||
python-version = "3.13"
|
||||
src = ["src"]
|
||||
|
||||
[tool.ty.rules]
|
||||
possibly-unresolved-reference = "error"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
asyncio_mode = "auto"
|
||||
testpaths = ["tests"]
|
||||
```
|
||||
|
||||
**Step 2: Install dependencies**
|
||||
|
||||
Run: `uv sync`
|
||||
Expected: All dependencies resolve and install successfully.
|
||||
|
||||
**Step 3: Verify tools work**
|
||||
|
||||
Run: `uv run ruff check --version && uv run ty --version`
|
||||
Expected: Both print version numbers.
|
||||
|
||||
**Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add pyproject.toml uv.lock
|
||||
git commit -m "chore: configure project dependencies and tooling (ruff, ty, pytest)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Create package structure with __init__.py files
|
||||
|
||||
**Files:**
|
||||
- Create: `src/fastapi_oidc_op/__init__.py`
|
||||
- Create: `src/fastapi_oidc_op/oidc/__init__.py`
|
||||
- Create: `src/fastapi_oidc_op/authn/__init__.py`
|
||||
- Create: `src/fastapi_oidc_op/manage/__init__.py`
|
||||
- Create: `src/fastapi_oidc_op/store/__init__.py`
|
||||
- Create: `src/fastapi_oidc_op/store/mongodb/__init__.py`
|
||||
- Create: `src/fastapi_oidc_op/store/sqlite/__init__.py`
|
||||
- Create: `src/fastapi_oidc_op/invite/__init__.py`
|
||||
- Create: `tests/__init__.py`
|
||||
- Create: `tests/test_store/__init__.py`
|
||||
- Create: `tests/test_authn/__init__.py`
|
||||
- Create: `tests/test_oidc/__init__.py`
|
||||
- Create: `tests/test_manage/__init__.py`
|
||||
|
||||
**Step 1: Create all directories and __init__.py files**
|
||||
|
||||
All `__init__.py` files are empty. Create the directory tree:
|
||||
|
||||
```
|
||||
src/
|
||||
└── fastapi_oidc_op/
|
||||
├── __init__.py
|
||||
├── oidc/
|
||||
│ └── __init__.py
|
||||
├── authn/
|
||||
│ └── __init__.py
|
||||
├── manage/
|
||||
│ └── __init__.py
|
||||
├── store/
|
||||
│ ├── __init__.py
|
||||
│ ├── mongodb/
|
||||
│ │ └── __init__.py
|
||||
│ └── sqlite/
|
||||
│ └── __init__.py
|
||||
└── invite/
|
||||
└── __init__.py
|
||||
tests/
|
||||
├── __init__.py
|
||||
├── test_store/
|
||||
│ └── __init__.py
|
||||
├── test_authn/
|
||||
│ └── __init__.py
|
||||
├── test_oidc/
|
||||
│ └── __init__.py
|
||||
└── test_manage/
|
||||
└── __init__.py
|
||||
```
|
||||
|
||||
**Step 2: Verify package is importable**
|
||||
|
||||
Run: `uv run python -c "import fastapi_oidc_op; print('OK')"`
|
||||
Expected: Prints `OK`
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add src/ tests/
|
||||
git commit -m "chore: create package structure with src layout"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Implement configuration module
|
||||
|
||||
**Files:**
|
||||
- Create: `src/fastapi_oidc_op/config.py`
|
||||
- Create: `tests/test_config.py`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```python
|
||||
# tests/test_config.py
|
||||
from fastapi_oidc_op.config import Settings, StorageBackend
|
||||
|
||||
|
||||
def test_default_settings() -> None:
|
||||
settings = Settings(
|
||||
issuer="http://localhost:8000",
|
||||
)
|
||||
assert settings.issuer == "http://localhost:8000"
|
||||
assert settings.storage_backend == StorageBackend.SQLITE
|
||||
assert settings.sqlite_path == "data/oidc_op.db"
|
||||
assert settings.manage_client_id == "manage-app"
|
||||
assert settings.invite_ttl == 86400
|
||||
assert settings.theme == "default"
|
||||
|
||||
|
||||
def test_mongodb_settings() -> None:
|
||||
settings = Settings(
|
||||
issuer="http://localhost:8000",
|
||||
storage_backend=StorageBackend.MONGODB,
|
||||
mongodb_uri="mongodb://mongo:27017",
|
||||
mongodb_database="test_db",
|
||||
)
|
||||
assert settings.storage_backend == StorageBackend.MONGODB
|
||||
assert settings.mongodb_uri == "mongodb://mongo:27017"
|
||||
assert settings.mongodb_database == "test_db"
|
||||
|
||||
|
||||
def test_settings_from_env(monkeypatch: "pytest.MonkeyPatch") -> None:
|
||||
import pytest
|
||||
|
||||
monkeypatch.setenv("OIDC_OP_ISSUER", "https://op.example.org")
|
||||
monkeypatch.setenv("OIDC_OP_STORAGE_BACKEND", "mongodb")
|
||||
monkeypatch.setenv("OIDC_OP_MONGODB_URI", "mongodb://remote:27017")
|
||||
settings = Settings() # type: ignore[call-arg]
|
||||
assert settings.issuer == "https://op.example.org"
|
||||
assert settings.storage_backend == StorageBackend.MONGODB
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `uv run pytest tests/test_config.py -v`
|
||||
Expected: FAIL - ImportError
|
||||
|
||||
**Step 3: Write the implementation**
|
||||
|
||||
```python
|
||||
# src/fastapi_oidc_op/config.py
|
||||
from enum import StrEnum
|
||||
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class StorageBackend(StrEnum):
|
||||
SQLITE = "sqlite"
|
||||
MONGODB = "mongodb"
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
model_config = {"env_prefix": "OIDC_OP_"}
|
||||
|
||||
# Core
|
||||
issuer: str
|
||||
debug: bool = False
|
||||
|
||||
# Storage
|
||||
storage_backend: StorageBackend = StorageBackend.SQLITE
|
||||
|
||||
# SQLite
|
||||
sqlite_path: str = "data/oidc_op.db"
|
||||
|
||||
# MongoDB
|
||||
mongodb_uri: str = "mongodb://localhost:27017"
|
||||
mongodb_database: str = "oidc_op"
|
||||
|
||||
# Management RP
|
||||
manage_client_id: str = "manage-app"
|
||||
|
||||
# Magic links
|
||||
invite_ttl: int = 86400 # seconds
|
||||
|
||||
# Theme
|
||||
theme: str = "default"
|
||||
```
|
||||
|
||||
**Step 4: Run tests to verify they pass**
|
||||
|
||||
Run: `uv run pytest tests/test_config.py -v`
|
||||
Expected: All 3 tests PASS
|
||||
|
||||
**Step 5: Run ruff and ty**
|
||||
|
||||
Run: `uv run ruff check src/fastapi_oidc_op/config.py tests/test_config.py && uv run ruff format --check src/fastapi_oidc_op/config.py tests/test_config.py && uv run ty check src/fastapi_oidc_op/config.py`
|
||||
Expected: No errors
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/fastapi_oidc_op/config.py tests/test_config.py
|
||||
git commit -m "feat: add configuration module with env-based settings"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 4: Implement Pydantic models
|
||||
|
||||
**Files:**
|
||||
- Create: `src/fastapi_oidc_op/models.py`
|
||||
- Create: `tests/test_models.py`
|
||||
|
||||
**Step 1: Write the failing tests**
|
||||
|
||||
```python
|
||||
# tests/test_models.py
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from fastapi_oidc_op.models import (
|
||||
CredentialType,
|
||||
MagicLink,
|
||||
PasswordCredential,
|
||||
User,
|
||||
WebAuthnCredential,
|
||||
)
|
||||
|
||||
|
||||
def test_user_creation() -> None:
|
||||
user = User(
|
||||
userid="lusab-bansen",
|
||||
username="alice",
|
||||
)
|
||||
assert user.userid == "lusab-bansen"
|
||||
assert user.username == "alice"
|
||||
assert user.preferred_username is None
|
||||
assert user.email is None
|
||||
assert user.active is True
|
||||
assert user.groups == []
|
||||
assert user.created_at is not None
|
||||
assert user.updated_at is not None
|
||||
|
||||
|
||||
def test_user_with_all_fields() -> None:
|
||||
user = User(
|
||||
userid="lusab-bansen",
|
||||
username="alice",
|
||||
preferred_username="Alice W.",
|
||||
given_name="Alice",
|
||||
family_name="Wonderland",
|
||||
nickname="ally",
|
||||
email="alice@example.com",
|
||||
email_verified=True,
|
||||
phone_number="+1234567890",
|
||||
phone_number_verified=False,
|
||||
picture="https://example.com/alice.jpg",
|
||||
locale="en-US",
|
||||
active=True,
|
||||
groups=["admin", "users"],
|
||||
)
|
||||
assert user.given_name == "Alice"
|
||||
assert user.groups == ["admin", "users"]
|
||||
|
||||
|
||||
def test_webauthn_credential() -> None:
|
||||
cred = WebAuthnCredential(
|
||||
user_id="lusab-bansen",
|
||||
credential_id=b"\x01\x02\x03",
|
||||
public_key=b"\x04\x05\x06",
|
||||
sign_count=0,
|
||||
device_name="YubiKey 5",
|
||||
)
|
||||
assert cred.type == CredentialType.WEBAUTHN
|
||||
assert cred.credential_id == b"\x01\x02\x03"
|
||||
assert cred.device_name == "YubiKey 5"
|
||||
|
||||
|
||||
def test_password_credential() -> None:
|
||||
cred = PasswordCredential(
|
||||
user_id="lusab-bansen",
|
||||
password_hash="$argon2id$v=19$m=65536,t=3,p=4$hash",
|
||||
)
|
||||
assert cred.type == CredentialType.PASSWORD
|
||||
assert cred.password_hash.startswith("$argon2")
|
||||
|
||||
|
||||
def test_magic_link() -> None:
|
||||
link = MagicLink(
|
||||
token="abc123def456",
|
||||
username="newuser",
|
||||
)
|
||||
assert link.token == "abc123def456"
|
||||
assert link.username == "newuser"
|
||||
assert link.used is False
|
||||
assert link.created_by is None
|
||||
assert link.expires_at > datetime.now(timezone.utc)
|
||||
```
|
||||
|
||||
**Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `uv run pytest tests/test_models.py -v`
|
||||
Expected: FAIL - ImportError
|
||||
|
||||
**Step 3: Write the implementation**
|
||||
|
||||
```python
|
||||
# src/fastapi_oidc_op/models.py
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from enum import StrEnum
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
def _utcnow() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def _default_expiry() -> datetime:
|
||||
return datetime.now(timezone.utc) + timedelta(hours=24)
|
||||
|
||||
|
||||
class CredentialType(StrEnum):
|
||||
WEBAUTHN = "webauthn"
|
||||
PASSWORD = "password"
|
||||
|
||||
|
||||
class User(BaseModel):
|
||||
userid: str
|
||||
username: str
|
||||
preferred_username: str | None = None
|
||||
given_name: str | None = None
|
||||
family_name: str | None = None
|
||||
nickname: str | None = None
|
||||
email: str | None = None
|
||||
email_verified: bool = False
|
||||
phone_number: str | None = None
|
||||
phone_number_verified: bool = False
|
||||
picture: str | None = None
|
||||
locale: str | None = None
|
||||
active: bool = True
|
||||
created_at: datetime = Field(default_factory=_utcnow)
|
||||
updated_at: datetime = Field(default_factory=_utcnow)
|
||||
groups: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class WebAuthnCredential(BaseModel):
|
||||
user_id: str
|
||||
type: CredentialType = CredentialType.WEBAUTHN
|
||||
credential_id: bytes
|
||||
public_key: bytes
|
||||
sign_count: int = 0
|
||||
device_name: str = ""
|
||||
created_at: datetime = Field(default_factory=_utcnow)
|
||||
|
||||
|
||||
class PasswordCredential(BaseModel):
|
||||
user_id: str
|
||||
type: CredentialType = CredentialType.PASSWORD
|
||||
password_hash: str
|
||||
created_at: datetime = Field(default_factory=_utcnow)
|
||||
|
||||
|
||||
class MagicLink(BaseModel):
|
||||
token: str
|
||||
username: str
|
||||
expires_at: datetime = Field(default_factory=_default_expiry)
|
||||
used: bool = False
|
||||
created_by: str | None = None
|
||||
note: str | None = None
|
||||
```
|
||||
|
||||
**Step 4: Run tests to verify they pass**
|
||||
|
||||
Run: `uv run pytest tests/test_models.py -v`
|
||||
Expected: All 5 tests PASS
|
||||
|
||||
**Step 5: Run ruff and ty**
|
||||
|
||||
Run: `uv run ruff check src/fastapi_oidc_op/models.py tests/test_models.py && uv run ruff format --check src/fastapi_oidc_op/models.py tests/test_models.py && uv run ty check src/fastapi_oidc_op/models.py`
|
||||
Expected: No errors
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/fastapi_oidc_op/models.py tests/test_models.py
|
||||
git commit -m "feat: add Pydantic models for User, Credential, and MagicLink"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 5: Implement repository protocols
|
||||
|
||||
**Files:**
|
||||
- Create: `src/fastapi_oidc_op/store/protocols.py`
|
||||
- Create: `tests/test_store/test_protocols.py`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```python
|
||||
# tests/test_store/test_protocols.py
|
||||
from typing import runtime_checkable
|
||||
|
||||
from fastapi_oidc_op.store.protocols import (
|
||||
CredentialRepository,
|
||||
MagicLinkRepository,
|
||||
UserRepository,
|
||||
)
|
||||
|
||||
|
||||
def test_protocols_are_runtime_checkable() -> None:
|
||||
assert runtime_checkable(UserRepository) # type: ignore[arg-type]
|
||||
assert runtime_checkable(CredentialRepository) # type: ignore[arg-type]
|
||||
assert runtime_checkable(MagicLinkRepository) # type: ignore[arg-type]
|
||||
```
|
||||
|
||||
Note: This test just verifies the protocols are importable and runtime-checkable. The actual conformance tests come when we implement the SQLite/MongoDB repositories.
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `uv run pytest tests/test_store/test_protocols.py -v`
|
||||
Expected: FAIL - ImportError
|
||||
|
||||
**Step 3: Write the implementation**
|
||||
|
||||
```python
|
||||
# src/fastapi_oidc_op/store/protocols.py
|
||||
from typing import Protocol, runtime_checkable
|
||||
|
||||
from fastapi_oidc_op.models import (
|
||||
MagicLink,
|
||||
PasswordCredential,
|
||||
User,
|
||||
WebAuthnCredential,
|
||||
)
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class UserRepository(Protocol):
|
||||
async def create(self, user: User) -> User: ...
|
||||
|
||||
async def get_by_userid(self, userid: str) -> User | None: ...
|
||||
|
||||
async def get_by_username(self, username: str) -> User | None: ...
|
||||
|
||||
async def update(self, user: User) -> User: ...
|
||||
|
||||
async def list_users(self, offset: int = 0, limit: int = 100) -> list[User]: ...
|
||||
|
||||
async def delete(self, userid: str) -> bool: ...
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class CredentialRepository(Protocol):
|
||||
async def create_webauthn(self, credential: WebAuthnCredential) -> WebAuthnCredential: ...
|
||||
|
||||
async def create_password(self, credential: PasswordCredential) -> PasswordCredential: ...
|
||||
|
||||
async def get_webauthn_by_user(self, user_id: str) -> list[WebAuthnCredential]: ...
|
||||
|
||||
async def get_webauthn_by_credential_id(self, credential_id: bytes) -> WebAuthnCredential | None: ...
|
||||
|
||||
async def get_password_by_user(self, user_id: str) -> PasswordCredential | None: ...
|
||||
|
||||
async def update_webauthn(self, credential: WebAuthnCredential) -> WebAuthnCredential: ...
|
||||
|
||||
async def delete_webauthn(self, user_id: str, credential_id: bytes) -> bool: ...
|
||||
|
||||
async def delete_password(self, user_id: str) -> bool: ...
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class MagicLinkRepository(Protocol):
|
||||
async def create(self, link: MagicLink) -> MagicLink: ...
|
||||
|
||||
async def get_by_token(self, token: str) -> MagicLink | None: ...
|
||||
|
||||
async def mark_used(self, token: str) -> bool: ...
|
||||
|
||||
async def delete_expired(self) -> int: ...
|
||||
```
|
||||
|
||||
**Step 4: Run tests to verify they pass**
|
||||
|
||||
Run: `uv run pytest tests/test_store/test_protocols.py -v`
|
||||
Expected: PASS
|
||||
|
||||
**Step 5: Run ruff and ty**
|
||||
|
||||
Run: `uv run ruff check src/fastapi_oidc_op/store/protocols.py && uv run ruff format --check src/fastapi_oidc_op/store/protocols.py && uv run ty check src/fastapi_oidc_op/store/protocols.py`
|
||||
Expected: No errors
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/fastapi_oidc_op/store/protocols.py tests/test_store/test_protocols.py
|
||||
git commit -m "feat: add repository Protocol interfaces for User, Credential, MagicLink"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 6: Implement userid generation utility
|
||||
|
||||
**Files:**
|
||||
- Create: `src/fastapi_oidc_op/userid.py`
|
||||
- Create: `tests/test_userid.py`
|
||||
|
||||
**Step 1: Write the failing tests**
|
||||
|
||||
```python
|
||||
# tests/test_userid.py
|
||||
import re
|
||||
|
||||
from fastapi_oidc_op.userid import generate_userid
|
||||
|
||||
|
||||
def test_generate_userid_format() -> None:
|
||||
userid = generate_userid()
|
||||
# 32-bit proquint format: xxxxx-xxxxx
|
||||
parts = userid.split("-")
|
||||
assert len(parts) == 2
|
||||
for part in parts:
|
||||
assert len(part) == 5
|
||||
|
||||
|
||||
def test_generate_userid_uniqueness() -> None:
|
||||
ids = {generate_userid() for _ in range(100)}
|
||||
assert len(ids) == 100 # All unique
|
||||
|
||||
|
||||
def test_generate_userid_is_lowercase() -> None:
|
||||
userid = generate_userid()
|
||||
assert userid == userid.lower()
|
||||
```
|
||||
|
||||
**Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `uv run pytest tests/test_userid.py -v`
|
||||
Expected: FAIL - ImportError
|
||||
|
||||
**Step 3: Write the implementation**
|
||||
|
||||
```python
|
||||
# src/fastapi_oidc_op/userid.py
|
||||
import secrets
|
||||
|
||||
from proquint import uint2quint
|
||||
|
||||
|
||||
def generate_userid() -> str:
|
||||
"""Generate a unique user identifier in proquint format.
|
||||
|
||||
Returns a 32-bit proquint string like 'lusab-bansen'.
|
||||
"""
|
||||
return uint2quint(secrets.randbits(32))
|
||||
```
|
||||
|
||||
**Step 4: Run tests to verify they pass**
|
||||
|
||||
Run: `uv run pytest tests/test_userid.py -v`
|
||||
Expected: All 3 tests PASS
|
||||
|
||||
**Step 5: Run ruff and ty**
|
||||
|
||||
Run: `uv run ruff check src/fastapi_oidc_op/userid.py tests/test_userid.py && uv run ruff format --check src/fastapi_oidc_op/userid.py tests/test_userid.py && uv run ty check src/fastapi_oidc_op/userid.py`
|
||||
Expected: No errors (ty may warn about proquint missing type stubs - suppress with `# type: ignore[import-untyped]` if needed)
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/fastapi_oidc_op/userid.py tests/test_userid.py
|
||||
git commit -m "feat: add proquint-based userid generation"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 7: Implement app factory and health endpoint
|
||||
|
||||
**Files:**
|
||||
- Create: `src/fastapi_oidc_op/app.py`
|
||||
- Create: `tests/conftest.py`
|
||||
- Create: `tests/test_app.py`
|
||||
- Delete: `main.py` (replaced by app.py)
|
||||
|
||||
**Step 1: Write the failing tests**
|
||||
|
||||
```python
|
||||
# tests/conftest.py
|
||||
from collections.abc import AsyncIterator
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
from fastapi_oidc_op.app import create_app
|
||||
from fastapi_oidc_op.config import Settings
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def settings() -> Settings:
|
||||
return Settings(issuer="http://localhost:8000")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client(settings: Settings) -> AsyncIterator[AsyncClient]:
|
||||
app = create_app(settings)
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url=settings.issuer) as ac:
|
||||
yield ac
|
||||
```
|
||||
|
||||
```python
|
||||
# tests/test_app.py
|
||||
from httpx import AsyncClient
|
||||
|
||||
|
||||
async def test_health_endpoint(client: AsyncClient) -> None:
|
||||
response = await client.get("/health")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["status"] == "ok"
|
||||
|
||||
|
||||
async def test_app_has_title(client: AsyncClient) -> None:
|
||||
response = await client.get("/openapi.json")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["info"]["title"] == "FastAPI OIDC OP"
|
||||
```
|
||||
|
||||
**Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `uv run pytest tests/test_app.py -v`
|
||||
Expected: FAIL - ImportError
|
||||
|
||||
**Step 3: Write the implementation**
|
||||
|
||||
```python
|
||||
# src/fastapi_oidc_op/app.py
|
||||
from fastapi import FastAPI
|
||||
|
||||
from fastapi_oidc_op.config import Settings
|
||||
|
||||
|
||||
def create_app(settings: Settings | None = None) -> FastAPI:
|
||||
if settings is None:
|
||||
settings = Settings() # type: ignore[call-arg]
|
||||
|
||||
app = FastAPI(
|
||||
title="FastAPI OIDC OP",
|
||||
version="0.1.0",
|
||||
docs_url="/docs" if settings.debug else None,
|
||||
redoc_url=None,
|
||||
)
|
||||
|
||||
app.state.settings = settings
|
||||
|
||||
@app.get("/health")
|
||||
async def health() -> dict[str, str]:
|
||||
return {"status": "ok"}
|
||||
|
||||
return app
|
||||
```
|
||||
|
||||
**Step 4: Run tests to verify they pass**
|
||||
|
||||
Run: `uv run pytest tests/test_app.py -v`
|
||||
Expected: All 2 tests PASS
|
||||
|
||||
**Step 5: Delete the old main.py**
|
||||
|
||||
Run: `rm main.py`
|
||||
|
||||
**Step 6: Run ruff and ty on all source files**
|
||||
|
||||
Run: `uv run ruff check src/ tests/ && uv run ruff format --check src/ tests/ && uv run ty check src/`
|
||||
Expected: No errors
|
||||
|
||||
**Step 7: Commit**
|
||||
|
||||
```bash
|
||||
git rm main.py
|
||||
git add src/fastapi_oidc_op/app.py tests/conftest.py tests/test_app.py
|
||||
git commit -m "feat: add app factory with health endpoint and test infrastructure"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 8: Run full quality checks and format
|
||||
|
||||
**Files:**
|
||||
- All files in `src/` and `tests/`
|
||||
|
||||
**Step 1: Format all code with ruff**
|
||||
|
||||
Run: `uv run ruff format src/ tests/`
|
||||
Expected: Files formatted (or already formatted)
|
||||
|
||||
**Step 2: Run ruff lint with auto-fix**
|
||||
|
||||
Run: `uv run ruff check src/ tests/ --fix`
|
||||
Expected: No errors (or safe fixes applied)
|
||||
|
||||
**Step 3: Run ty type check**
|
||||
|
||||
Run: `uv run ty check src/`
|
||||
Expected: No errors
|
||||
|
||||
**Step 4: Run full test suite**
|
||||
|
||||
Run: `uv run pytest -v`
|
||||
Expected: All tests pass
|
||||
|
||||
**Step 5: Commit any formatting changes**
|
||||
|
||||
```bash
|
||||
git add -A
|
||||
git diff --cached --quiet || git commit -m "style: apply ruff formatting"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 9: Add pre-commit quality gate script
|
||||
|
||||
**Files:**
|
||||
- Create: `scripts/check.sh`
|
||||
|
||||
**Step 1: Create the check script**
|
||||
|
||||
```bash
|
||||
#!/usr/bin/env bash
|
||||
# Run all quality checks
|
||||
set -euo pipefail
|
||||
|
||||
echo "==> Formatting..."
|
||||
uv run ruff format src/ tests/
|
||||
|
||||
echo "==> Linting..."
|
||||
uv run ruff check src/ tests/ --fix
|
||||
|
||||
echo "==> Type checking..."
|
||||
uv run ty check src/
|
||||
|
||||
echo "==> Testing..."
|
||||
uv run pytest -v
|
||||
|
||||
echo "==> All checks passed!"
|
||||
```
|
||||
|
||||
**Step 2: Make it executable**
|
||||
|
||||
Run: `chmod +x scripts/check.sh`
|
||||
|
||||
**Step 3: Run the script to verify it works**
|
||||
|
||||
Run: `./scripts/check.sh`
|
||||
Expected: All checks pass
|
||||
|
||||
**Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add scripts/check.sh
|
||||
git commit -m "chore: add quality check script (ruff, ty, pytest)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
After completing these 9 tasks, the project will have:
|
||||
|
||||
1. **pyproject.toml** - Dependencies, ruff config (broad ruleset), ty config (strict), pytest config
|
||||
2. **Package structure** - Full src layout with all subpackages
|
||||
3. **Configuration** - Pydantic Settings with env vars, storage backend selection
|
||||
4. **Models** - User, WebAuthnCredential, PasswordCredential, MagicLink
|
||||
5. **Repository protocols** - Type-safe interfaces for all data access
|
||||
6. **Userid generation** - Proquint-based unique identifiers
|
||||
7. **App factory** - FastAPI app with health endpoint
|
||||
8. **Test infrastructure** - pytest + httpx async client fixtures
|
||||
9. **Quality tooling** - ruff (lint + format), ty (type check), check script
|
||||
|
||||
The next plan will cover the SQLite repository implementation, followed by authentication, OIDC provider integration, and the management UI.
|
||||
|
|
@ -1,702 +0,0 @@
|
|||
# Authentication Services Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Implement backend authentication services: PasswordService (argon2), WebAuthnService (fido2), and MagicLinkService (token lifecycle).
|
||||
|
||||
**Architecture:** Three independent services with no cross-dependencies. PasswordService and WebAuthnService are pure logic (no repo dependency). MagicLinkService depends on MagicLinkRepository. All testable with in-memory SQLite or unit tests.
|
||||
|
||||
**Tech Stack:** argon2-cffi, python-fido2 v2.1, cryptography (for test key generation)
|
||||
|
||||
**Quality gate:** `./scripts/check.sh` (ruff format, ruff check, ty check, pytest)
|
||||
|
||||
---
|
||||
|
||||
### Task 1: PasswordService
|
||||
|
||||
**Files:**
|
||||
- Create: `src/fastapi_oidc_op/authn/password.py`
|
||||
- Create: `tests/test_authn/__init__.py`
|
||||
- Create: `tests/test_authn/test_password.py`
|
||||
|
||||
**Step 1: Write the failing tests**
|
||||
|
||||
```python
|
||||
# tests/test_authn/test_password.py
|
||||
from argon2 import PasswordHasher
|
||||
|
||||
from fastapi_oidc_op.authn.password import PasswordService
|
||||
|
||||
|
||||
def test_hash_returns_argon2_string() -> None:
|
||||
service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
result = service.hash("correcthorse")
|
||||
assert result.startswith("$argon2id$")
|
||||
|
||||
|
||||
def test_verify_correct_password() -> None:
|
||||
service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
hashed = service.hash("correcthorse")
|
||||
assert service.verify(hashed, "correcthorse") is True
|
||||
|
||||
|
||||
def test_verify_wrong_password() -> None:
|
||||
service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
hashed = service.hash("correcthorse")
|
||||
assert service.verify(hashed, "wrongpassword") is False
|
||||
|
||||
|
||||
def test_verify_invalid_hash() -> None:
|
||||
service = PasswordService(hasher=PasswordHasher(time_cost=1, memory_cost=8192))
|
||||
assert service.verify("not-a-hash", "password") is False
|
||||
|
||||
|
||||
def test_default_hasher() -> None:
|
||||
service = PasswordService()
|
||||
hashed = service.hash("test")
|
||||
assert service.verify(hashed, "test") is True
|
||||
```
|
||||
|
||||
**Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `pytest tests/test_authn/test_password.py -v`
|
||||
Expected: FAIL — cannot import `PasswordService`
|
||||
|
||||
**Step 3: Create `__init__.py` and write the implementation**
|
||||
|
||||
```python
|
||||
# tests/test_authn/__init__.py
|
||||
# (empty file)
|
||||
```
|
||||
|
||||
```python
|
||||
# src/fastapi_oidc_op/authn/password.py
|
||||
from argon2 import PasswordHasher
|
||||
from argon2.exceptions import InvalidHashError, VerifyMismatchError
|
||||
|
||||
|
||||
class PasswordService:
|
||||
"""Argon2 password hashing and verification."""
|
||||
|
||||
def __init__(self, hasher: PasswordHasher | None = None) -> None:
|
||||
self._hasher = hasher or PasswordHasher()
|
||||
|
||||
def hash(self, password: str) -> str:
|
||||
"""Hash a password using argon2id. Returns the full hash string."""
|
||||
return self._hasher.hash(password)
|
||||
|
||||
def verify(self, password_hash: str, password: str) -> bool:
|
||||
"""Verify a password against an argon2 hash. Returns True if valid."""
|
||||
try:
|
||||
return self._hasher.verify(password_hash, password)
|
||||
except (VerifyMismatchError, InvalidHashError):
|
||||
return False
|
||||
```
|
||||
|
||||
**Step 4: Run tests to verify they pass**
|
||||
|
||||
Run: `pytest tests/test_authn/test_password.py -v`
|
||||
Expected: 5 PASSED
|
||||
|
||||
**Step 5: Run quality gate**
|
||||
|
||||
Run: `./scripts/check.sh`
|
||||
Expected: All green
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/fastapi_oidc_op/authn/password.py tests/test_authn/__init__.py tests/test_authn/test_password.py
|
||||
git commit -m "feat: add PasswordService with argon2 hash/verify"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: WebAuthnService
|
||||
|
||||
**Files:**
|
||||
- Create: `src/fastapi_oidc_op/authn/webauthn.py`
|
||||
- Create: `tests/test_authn/test_webauthn.py`
|
||||
|
||||
**Step 1: Write the failing tests**
|
||||
|
||||
These tests build real cryptographic registration and authentication responses using fido2's factory methods and the `cryptography` library.
|
||||
|
||||
```python
|
||||
# tests/test_authn/test_webauthn.py
|
||||
import os
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.hashes import SHA256
|
||||
from fido2.cose import ES256
|
||||
from fido2.utils import sha256
|
||||
from fido2.webauthn import (
|
||||
Aaguid,
|
||||
AttestedCredentialData,
|
||||
AttestationObject,
|
||||
AuthenticationResponse,
|
||||
AuthenticatorAssertionResponse,
|
||||
AuthenticatorAttestationResponse,
|
||||
AuthenticatorData,
|
||||
CollectedClientData,
|
||||
PublicKeyCredentialDescriptor,
|
||||
PublicKeyCredentialType,
|
||||
RegistrationResponse,
|
||||
)
|
||||
|
||||
from fastapi_oidc_op.authn.webauthn import WebAuthnService
|
||||
|
||||
RP_ID = "localhost"
|
||||
RP_NAME = "Test RP"
|
||||
ORIGIN = "http://localhost:8000"
|
||||
|
||||
|
||||
def _make_service() -> WebAuthnService:
|
||||
return WebAuthnService(rp_id=RP_ID, rp_name=RP_NAME, origin=ORIGIN)
|
||||
|
||||
|
||||
def _generate_credential() -> tuple[ec.EllipticCurvePrivateKey, bytes, AttestedCredentialData]:
|
||||
"""Generate a test credential: (private_key, credential_id, attested_credential_data)."""
|
||||
private_key = ec.generate_private_key(ec.SECP256R1())
|
||||
cose_key = ES256.from_cryptography_key(private_key.public_key())
|
||||
credential_id = os.urandom(32)
|
||||
attested = AttestedCredentialData.create(
|
||||
aaguid=Aaguid.NONE,
|
||||
credential_id=credential_id,
|
||||
public_key=cose_key,
|
||||
)
|
||||
return private_key, credential_id, attested
|
||||
|
||||
|
||||
def _build_registration_response(
|
||||
credential_id: bytes,
|
||||
attested: AttestedCredentialData,
|
||||
challenge: bytes,
|
||||
) -> RegistrationResponse:
|
||||
"""Build a valid registration response for the given challenge."""
|
||||
auth_data = AuthenticatorData.create(
|
||||
rp_id_hash=sha256(RP_ID.encode()),
|
||||
flags=AuthenticatorData.FLAG.UP | AuthenticatorData.FLAG.AT,
|
||||
counter=0,
|
||||
credential_data=attested,
|
||||
)
|
||||
attestation_object = AttestationObject.create(fmt="none", auth_data=auth_data, att_stmt={})
|
||||
client_data = CollectedClientData.create(
|
||||
type=CollectedClientData.TYPE.CREATE,
|
||||
challenge=challenge,
|
||||
origin=ORIGIN,
|
||||
)
|
||||
return RegistrationResponse(
|
||||
raw_id=credential_id,
|
||||
response=AuthenticatorAttestationResponse(
|
||||
client_data=client_data,
|
||||
attestation_object=attestation_object,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _build_authentication_response(
|
||||
private_key: ec.EllipticCurvePrivateKey,
|
||||
credential_id: bytes,
|
||||
challenge: bytes,
|
||||
counter: int = 1,
|
||||
) -> AuthenticationResponse:
|
||||
"""Build a valid authentication response signed with the private key."""
|
||||
client_data = CollectedClientData.create(
|
||||
type=CollectedClientData.TYPE.GET,
|
||||
challenge=challenge,
|
||||
origin=ORIGIN,
|
||||
)
|
||||
auth_data = AuthenticatorData.create(
|
||||
rp_id_hash=sha256(RP_ID.encode()),
|
||||
flags=AuthenticatorData.FLAG.UP,
|
||||
counter=counter,
|
||||
)
|
||||
signature = private_key.sign(auth_data + client_data.hash, ec.ECDSA(SHA256()))
|
||||
return AuthenticationResponse(
|
||||
raw_id=credential_id,
|
||||
response=AuthenticatorAssertionResponse(
|
||||
client_data=client_data,
|
||||
authenticator_data=auth_data,
|
||||
signature=signature,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# --- Registration tests ---
|
||||
|
||||
def test_begin_registration_returns_options_and_state() -> None:
|
||||
service = _make_service()
|
||||
options, state = service.begin_registration(
|
||||
user_id=b"user-123",
|
||||
username="alice",
|
||||
)
|
||||
# Options should be a dict suitable for JSON serialization
|
||||
assert "publicKey" in options
|
||||
pub_key = options["publicKey"]
|
||||
assert "challenge" in pub_key
|
||||
assert "rp" in pub_key
|
||||
assert "user" in pub_key
|
||||
# State should be a dict with challenge
|
||||
assert "challenge" in state
|
||||
|
||||
|
||||
def test_complete_registration_returns_credential_data() -> None:
|
||||
service = _make_service()
|
||||
private_key, credential_id, attested = _generate_credential()
|
||||
|
||||
options, state = service.begin_registration(
|
||||
user_id=b"user-123",
|
||||
username="alice",
|
||||
)
|
||||
|
||||
# Extract challenge from state to build a matching response
|
||||
challenge = state["challenge"]
|
||||
response = _build_registration_response(credential_id, attested, challenge)
|
||||
|
||||
result = service.complete_registration(state, response)
|
||||
assert result.credential_data is not None
|
||||
assert result.credential_data.credential_id == credential_id
|
||||
|
||||
|
||||
def test_begin_registration_with_existing_credentials() -> None:
|
||||
service = _make_service()
|
||||
_, cred_id, attested = _generate_credential()
|
||||
existing = [
|
||||
PublicKeyCredentialDescriptor(
|
||||
type=PublicKeyCredentialType.PUBLIC_KEY,
|
||||
id=cred_id,
|
||||
)
|
||||
]
|
||||
|
||||
options, state = service.begin_registration(
|
||||
user_id=b"user-123",
|
||||
username="alice",
|
||||
existing_credentials=existing,
|
||||
)
|
||||
pub_key = options["publicKey"]
|
||||
assert "excludeCredentials" in pub_key
|
||||
assert len(pub_key["excludeCredentials"]) == 1
|
||||
|
||||
|
||||
# --- Authentication tests ---
|
||||
|
||||
def test_begin_authentication_returns_options_and_state() -> None:
|
||||
service = _make_service()
|
||||
_, cred_id, attested = _generate_credential()
|
||||
credentials = [
|
||||
PublicKeyCredentialDescriptor(
|
||||
type=PublicKeyCredentialType.PUBLIC_KEY,
|
||||
id=cred_id,
|
||||
)
|
||||
]
|
||||
|
||||
options, state = service.begin_authentication(credentials=credentials)
|
||||
assert "publicKey" in options
|
||||
assert "challenge" in state
|
||||
|
||||
|
||||
def test_complete_authentication_verifies_signature() -> None:
|
||||
service = _make_service()
|
||||
private_key, credential_id, attested = _generate_credential()
|
||||
|
||||
credentials = [
|
||||
PublicKeyCredentialDescriptor(
|
||||
type=PublicKeyCredentialType.PUBLIC_KEY,
|
||||
id=credential_id,
|
||||
)
|
||||
]
|
||||
|
||||
options, state = service.begin_authentication(credentials=credentials)
|
||||
challenge = state["challenge"]
|
||||
|
||||
response = _build_authentication_response(private_key, credential_id, challenge)
|
||||
|
||||
result = service.complete_authentication(
|
||||
state=state,
|
||||
credentials=[attested],
|
||||
response=response,
|
||||
)
|
||||
assert result.credential_id == credential_id
|
||||
|
||||
|
||||
def test_complete_authentication_wrong_signature_raises() -> None:
|
||||
import pytest
|
||||
|
||||
service = _make_service()
|
||||
private_key, credential_id, attested = _generate_credential()
|
||||
|
||||
# Generate a different key to produce a wrong signature
|
||||
wrong_key = ec.generate_private_key(ec.SECP256R1())
|
||||
|
||||
credentials = [
|
||||
PublicKeyCredentialDescriptor(
|
||||
type=PublicKeyCredentialType.PUBLIC_KEY,
|
||||
id=credential_id,
|
||||
)
|
||||
]
|
||||
|
||||
options, state = service.begin_authentication(credentials=credentials)
|
||||
challenge = state["challenge"]
|
||||
|
||||
response = _build_authentication_response(wrong_key, credential_id, challenge)
|
||||
|
||||
with pytest.raises(Exception): # fido2 raises ValueError or InvalidSignature
|
||||
service.complete_authentication(
|
||||
state=state,
|
||||
credentials=[attested],
|
||||
response=response,
|
||||
)
|
||||
```
|
||||
|
||||
**Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `pytest tests/test_authn/test_webauthn.py -v`
|
||||
Expected: FAIL — cannot import `WebAuthnService`
|
||||
|
||||
**Step 3: Write the implementation**
|
||||
|
||||
```python
|
||||
# src/fastapi_oidc_op/authn/webauthn.py
|
||||
from typing import Any, Sequence
|
||||
|
||||
from fido2.server import Fido2Server
|
||||
from fido2.webauthn import (
|
||||
AttestedCredentialData,
|
||||
AuthenticationResponse,
|
||||
AuthenticatorData,
|
||||
PublicKeyCredentialDescriptor,
|
||||
PublicKeyCredentialRpEntity,
|
||||
PublicKeyCredentialUserEntity,
|
||||
RegistrationResponse,
|
||||
)
|
||||
|
||||
|
||||
class WebAuthnService:
|
||||
"""FIDO2 WebAuthn registration and authentication."""
|
||||
|
||||
def __init__(self, rp_id: str, rp_name: str, origin: str) -> None:
|
||||
rp = PublicKeyCredentialRpEntity(name=rp_name, id=rp_id)
|
||||
self._server = Fido2Server(rp, verify_origin=lambda o: o == origin)
|
||||
|
||||
def begin_registration(
|
||||
self,
|
||||
user_id: bytes,
|
||||
username: str,
|
||||
existing_credentials: Sequence[PublicKeyCredentialDescriptor] | None = None,
|
||||
) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
"""Begin WebAuthn registration.
|
||||
|
||||
Returns (options_dict, state_dict).
|
||||
options_dict is JSON-serializable for sending to the browser.
|
||||
state_dict must be stored by the caller and passed to complete_registration.
|
||||
"""
|
||||
user = PublicKeyCredentialUserEntity(id=user_id, name=username, display_name=username)
|
||||
options, state = self._server.register_begin(
|
||||
user=user,
|
||||
credentials=existing_credentials,
|
||||
)
|
||||
return dict(options), state
|
||||
|
||||
def complete_registration(
|
||||
self,
|
||||
state: dict[str, Any],
|
||||
response: RegistrationResponse | dict[str, Any],
|
||||
) -> AuthenticatorData:
|
||||
"""Complete WebAuthn registration.
|
||||
|
||||
Returns AuthenticatorData with credential_data containing the public key.
|
||||
"""
|
||||
return self._server.register_complete(state, response)
|
||||
|
||||
def begin_authentication(
|
||||
self,
|
||||
credentials: Sequence[PublicKeyCredentialDescriptor] | None = None,
|
||||
) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
"""Begin WebAuthn authentication.
|
||||
|
||||
Returns (options_dict, state_dict).
|
||||
"""
|
||||
options, state = self._server.authenticate_begin(credentials=credentials)
|
||||
return dict(options), state
|
||||
|
||||
def complete_authentication(
|
||||
self,
|
||||
state: dict[str, Any],
|
||||
credentials: Sequence[AttestedCredentialData],
|
||||
response: AuthenticationResponse | dict[str, Any],
|
||||
) -> AttestedCredentialData:
|
||||
"""Complete WebAuthn authentication.
|
||||
|
||||
Verifies the assertion signature against stored credentials.
|
||||
Returns the matched AttestedCredentialData.
|
||||
Raises on verification failure.
|
||||
"""
|
||||
return self._server.authenticate_complete(state, credentials, response)
|
||||
```
|
||||
|
||||
**Step 4: Run tests to verify they pass**
|
||||
|
||||
Run: `pytest tests/test_authn/test_webauthn.py -v`
|
||||
Expected: 6 PASSED
|
||||
|
||||
**Step 5: Run quality gate**
|
||||
|
||||
Run: `./scripts/check.sh`
|
||||
Expected: All green
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/fastapi_oidc_op/authn/webauthn.py tests/test_authn/test_webauthn.py
|
||||
git commit -m "feat: add WebAuthnService with fido2 registration and authentication"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 3: MagicLinkService
|
||||
|
||||
**Files:**
|
||||
- Create: `src/fastapi_oidc_op/invite/service.py`
|
||||
- Create: `tests/test_invite/__init__.py`
|
||||
- Create: `tests/test_invite/test_service.py`
|
||||
|
||||
**Step 1: Write the failing tests**
|
||||
|
||||
Uses in-memory SQLite fixtures for the MagicLinkRepository.
|
||||
|
||||
```python
|
||||
# tests/test_invite/test_service.py
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import aiosqlite
|
||||
import pytest
|
||||
|
||||
from fastapi_oidc_op.invite.service import MagicLinkService
|
||||
from fastapi_oidc_op.store.sqlite.migrations import run_migrations
|
||||
from fastapi_oidc_op.store.sqlite.repositories import SQLiteMagicLinkRepository
|
||||
|
||||
MIGRATIONS_DIR = (
|
||||
Path(__file__).resolve().parent.parent.parent / "src" / "fastapi_oidc_op" / "store" / "sqlite" / "migrations"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def db():
|
||||
conn = await aiosqlite.connect(":memory:")
|
||||
conn.row_factory = aiosqlite.Row
|
||||
await conn.execute("PRAGMA foreign_keys=ON")
|
||||
await run_migrations(conn, MIGRATIONS_DIR)
|
||||
yield conn
|
||||
await conn.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def repo(db: aiosqlite.Connection) -> SQLiteMagicLinkRepository:
|
||||
return SQLiteMagicLinkRepository(db)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def service(repo: SQLiteMagicLinkRepository) -> MagicLinkService:
|
||||
return MagicLinkService(repo=repo, ttl=3600)
|
||||
|
||||
|
||||
async def test_create_returns_magic_link(service: MagicLinkService) -> None:
|
||||
link = await service.create(username="alice")
|
||||
assert link.username == "alice"
|
||||
assert link.token # non-empty
|
||||
assert link.used is False
|
||||
assert link.expires_at > datetime.now(UTC)
|
||||
|
||||
|
||||
async def test_create_generates_unique_tokens(service: MagicLinkService) -> None:
|
||||
link1 = await service.create(username="alice")
|
||||
link2 = await service.create(username="bob")
|
||||
assert link1.token != link2.token
|
||||
|
||||
|
||||
async def test_create_with_optional_fields(service: MagicLinkService) -> None:
|
||||
link = await service.create(username="alice", created_by="admin-id", note="Welcome")
|
||||
assert link.created_by == "admin-id"
|
||||
assert link.note == "Welcome"
|
||||
|
||||
|
||||
async def test_create_respects_ttl(service: MagicLinkService) -> None:
|
||||
link = await service.create(username="alice")
|
||||
expected_min = datetime.now(UTC) + timedelta(seconds=3500)
|
||||
expected_max = datetime.now(UTC) + timedelta(seconds=3700)
|
||||
assert expected_min < link.expires_at < expected_max
|
||||
|
||||
|
||||
async def test_validate_valid_token(service: MagicLinkService) -> None:
|
||||
link = await service.create(username="alice")
|
||||
result = await service.validate(link.token)
|
||||
assert result is not None
|
||||
assert result.token == link.token
|
||||
assert result.username == "alice"
|
||||
|
||||
|
||||
async def test_validate_nonexistent_token(service: MagicLinkService) -> None:
|
||||
result = await service.validate("nonexistent-token")
|
||||
assert result is None
|
||||
|
||||
|
||||
async def test_validate_used_token(service: MagicLinkService) -> None:
|
||||
link = await service.create(username="alice")
|
||||
await service.mark_used(link.token)
|
||||
result = await service.validate(link.token)
|
||||
assert result is None
|
||||
|
||||
|
||||
async def test_validate_expired_token(
|
||||
service: MagicLinkService, repo: SQLiteMagicLinkRepository
|
||||
) -> None:
|
||||
# Create a link that's already expired by using a very short TTL service
|
||||
expired_service = MagicLinkService(repo=repo, ttl=0)
|
||||
link = await expired_service.create(username="alice")
|
||||
# The link expires_at is essentially now, so by the time we validate it should be expired
|
||||
# To be safe, patch datetime to ensure expiry
|
||||
result = await service.validate(link.token)
|
||||
assert result is None
|
||||
|
||||
|
||||
async def test_mark_used_returns_true(service: MagicLinkService) -> None:
|
||||
link = await service.create(username="alice")
|
||||
result = await service.mark_used(link.token)
|
||||
assert result is True
|
||||
|
||||
|
||||
async def test_mark_used_nonexistent_returns_false(service: MagicLinkService) -> None:
|
||||
result = await service.mark_used("nonexistent")
|
||||
assert result is False
|
||||
|
||||
|
||||
async def test_cleanup_expired(service: MagicLinkService, repo: SQLiteMagicLinkRepository) -> None:
|
||||
# Create an expired link
|
||||
expired_service = MagicLinkService(repo=repo, ttl=-3600)
|
||||
await expired_service.create(username="expired-user")
|
||||
|
||||
# Create a valid link
|
||||
await service.create(username="valid-user")
|
||||
|
||||
count = await service.cleanup_expired()
|
||||
assert count == 1
|
||||
```
|
||||
|
||||
**Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `pytest tests/test_invite/test_service.py -v`
|
||||
Expected: FAIL — cannot import `MagicLinkService`
|
||||
|
||||
**Step 3: Create `__init__.py` and write the implementation**
|
||||
|
||||
```python
|
||||
# tests/test_invite/__init__.py
|
||||
# (empty file)
|
||||
```
|
||||
|
||||
```python
|
||||
# src/fastapi_oidc_op/invite/service.py
|
||||
import secrets
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
from fastapi_oidc_op.models import MagicLink
|
||||
from fastapi_oidc_op.store.protocols import MagicLinkRepository
|
||||
|
||||
|
||||
class MagicLinkService:
|
||||
"""Magic link token lifecycle management."""
|
||||
|
||||
def __init__(self, repo: MagicLinkRepository, ttl: int = 86400) -> None:
|
||||
self._repo = repo
|
||||
self._ttl = ttl
|
||||
|
||||
async def create(
|
||||
self,
|
||||
username: str,
|
||||
created_by: str | None = None,
|
||||
note: str | None = None,
|
||||
) -> MagicLink:
|
||||
"""Generate and store a new magic link for the given username."""
|
||||
token = secrets.token_urlsafe(32)
|
||||
expires_at = datetime.now(UTC) + timedelta(seconds=self._ttl)
|
||||
link = MagicLink(
|
||||
token=token,
|
||||
username=username,
|
||||
expires_at=expires_at,
|
||||
created_by=created_by,
|
||||
note=note,
|
||||
)
|
||||
return await self._repo.create(link)
|
||||
|
||||
async def validate(self, token: str) -> MagicLink | None:
|
||||
"""Validate a magic link token.
|
||||
|
||||
Returns the MagicLink if it exists, is not used, and has not expired.
|
||||
Returns None otherwise.
|
||||
"""
|
||||
link = await self._repo.get_by_token(token)
|
||||
if link is None or link.used:
|
||||
return None
|
||||
if link.expires_at < datetime.now(UTC):
|
||||
return None
|
||||
return link
|
||||
|
||||
async def mark_used(self, token: str) -> bool:
|
||||
"""Mark a magic link as used. Returns True if found and marked."""
|
||||
return await self._repo.mark_used(token)
|
||||
|
||||
async def cleanup_expired(self) -> int:
|
||||
"""Delete expired unused links. Returns count deleted."""
|
||||
return await self._repo.delete_expired()
|
||||
```
|
||||
|
||||
**Step 4: Run tests to verify they pass**
|
||||
|
||||
Run: `pytest tests/test_invite/test_service.py -v`
|
||||
Expected: All PASSED
|
||||
|
||||
**Step 5: Run quality gate**
|
||||
|
||||
Run: `./scripts/check.sh`
|
||||
Expected: All green
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/fastapi_oidc_op/invite/service.py tests/test_invite/__init__.py tests/test_invite/test_service.py
|
||||
git commit -m "feat: add MagicLinkService with token create/validate/cleanup"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 4: Update Design Documents
|
||||
|
||||
**Files:**
|
||||
- Modify: `docs/plans/2026-02-12-sqlite-repositories-design.md`
|
||||
|
||||
**Step 1: Update the roadmap section**
|
||||
|
||||
In `docs/plans/2026-02-12-sqlite-repositories-design.md`, update the Roadmap section to mark Authentication services as complete:
|
||||
|
||||
Change:
|
||||
```
|
||||
3. **Authentication** (WebAuthn + password) — next phase
|
||||
```
|
||||
|
||||
To:
|
||||
```
|
||||
3. ~~Authentication services~~ (done) — PasswordService, WebAuthnService, MagicLinkService
|
||||
4. **Authentication routes** (login/register endpoints + templates) — next phase
|
||||
```
|
||||
|
||||
And renumber the remaining items.
|
||||
|
||||
**Step 2: Commit**
|
||||
|
||||
```bash
|
||||
git add docs/plans/2026-02-12-sqlite-repositories-design.md
|
||||
git commit -m "docs: update roadmap to reflect completed auth services"
|
||||
```
|
||||
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
|
|
@ -1,820 +0,0 @@
|
|||
# E2E Playwright Tests Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Comprehensive end-to-end browser tests covering all user-facing flows in Porchlight.
|
||||
|
||||
**Architecture:** Each test file covers one functional area. Tests use raw Playwright (no test framework) with a shared assertion helper pattern. Server-side state is set up via direct HTTP calls to the API where the browser UI can't reach (e.g., creating users/magic links). The existing `run.sh` handles app lifecycle.
|
||||
|
||||
**Tech Stack:** Playwright (Node.js), Chromium headless, HTMX-aware testing patterns.
|
||||
|
||||
---
|
||||
|
||||
## Test Inventory
|
||||
|
||||
The app has these testable user flows:
|
||||
|
||||
| Flow | Route(s) | Test File |
|
||||
|---|---|---|
|
||||
| Login page (existing) | `GET /login` | `test_login.js` (done) |
|
||||
| Password auth + error states | `POST /login/password` | `test_password_auth.js` |
|
||||
| Magic link registration | `GET /register/{token}` | `test_registration.js` |
|
||||
| Credential management page | `GET /manage/credentials` | `test_credentials.js` |
|
||||
| Set/change password (HTMX) | `POST /manage/credentials/password` | `test_credentials.js` |
|
||||
| Auth guard (redirect to login) | All `/manage/*` routes | `test_auth_guard.js` |
|
||||
| Health endpoint | `GET /health` | `test_health.js` |
|
||||
|
||||
WebAuthn flows (`/login/webauthn/*`, `/manage/credentials/webauthn/*`) require hardware key simulation which is complex. They are excluded from this plan and noted as a future enhancement.
|
||||
|
||||
---
|
||||
|
||||
## Shared Helper
|
||||
|
||||
All test files duplicate the `assert()` pattern. Before writing new tests, extract a shared helper.
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Extract shared test helper
|
||||
|
||||
**Files:**
|
||||
- Create: `tests/e2e/helpers.js`
|
||||
- Modify: `tests/e2e/test_login.js`
|
||||
|
||||
**Step 1: Create `tests/e2e/helpers.js`**
|
||||
|
||||
```javascript
|
||||
// tests/e2e/helpers.js
|
||||
// Shared utilities for Porchlight e2e tests.
|
||||
|
||||
const { chromium } = require('playwright');
|
||||
|
||||
const TARGET_URL = process.env.TARGET_URL || 'http://localhost:8099';
|
||||
|
||||
/**
|
||||
* Simple test runner with pass/fail counting.
|
||||
*
|
||||
* Usage:
|
||||
* const { run } = require('./helpers');
|
||||
* run(async (page, assert) => {
|
||||
* await page.goto(TARGET_URL + '/login');
|
||||
* assert(true, 'page loaded');
|
||||
* });
|
||||
*/
|
||||
async function run(testFn) {
|
||||
let passed = 0;
|
||||
let failed = 0;
|
||||
|
||||
function assert(condition, description) {
|
||||
if (condition) {
|
||||
console.log(` PASS: ${description}`);
|
||||
passed++;
|
||||
} else {
|
||||
console.log(` FAIL: ${description}`);
|
||||
failed++;
|
||||
}
|
||||
}
|
||||
|
||||
const browser = await chromium.launch({ headless: true });
|
||||
const page = await browser.newPage();
|
||||
|
||||
try {
|
||||
await testFn(page, assert);
|
||||
} finally {
|
||||
await browser.close();
|
||||
}
|
||||
|
||||
console.log(`\n========================================`);
|
||||
console.log(`Results: ${passed} passed, ${failed} failed`);
|
||||
console.log(`========================================\n`);
|
||||
|
||||
process.exit(failed > 0 ? 1 : 0);
|
||||
}
|
||||
|
||||
module.exports = { TARGET_URL, run };
|
||||
```
|
||||
|
||||
**Step 2: Refactor `tests/e2e/test_login.js` to use the helper**
|
||||
|
||||
Replace the boilerplate (chromium launch, assert function, browser.close, summary) with:
|
||||
|
||||
```javascript
|
||||
const { TARGET_URL, run } = require('./helpers');
|
||||
|
||||
run(async (page, assert) => {
|
||||
// ... all existing test logic stays the same, just remove boilerplate ...
|
||||
});
|
||||
```
|
||||
|
||||
Keep all existing assertions unchanged. Only remove duplicated setup/teardown.
|
||||
|
||||
**Step 3: Run test to verify refactor is clean**
|
||||
|
||||
Run: `./tests/e2e/run.sh tests/e2e/test_login.js`
|
||||
Expected: 28 passed, 0 failed
|
||||
|
||||
**Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/e2e/helpers.js tests/e2e/test_login.js
|
||||
git commit -m "refactor: extract shared e2e test helper with assert runner"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Password authentication test
|
||||
|
||||
**Files:**
|
||||
- Create: `tests/e2e/test_password_auth.js`
|
||||
|
||||
This test needs an existing user with a password. Since there's no admin API to create users directly via the browser, we use the magic link registration flow as setup, then set a password, and test login.
|
||||
|
||||
**Step 1: Create `tests/e2e/test_password_auth.js`**
|
||||
|
||||
```javascript
|
||||
// tests/e2e/test_password_auth.js
|
||||
// Tests password login flow: successful login, invalid password, nonexistent user.
|
||||
|
||||
const { TARGET_URL, run } = require('./helpers');
|
||||
|
||||
run(async (page, assert) => {
|
||||
// ---- Setup: Register a user via magic link API ----
|
||||
// Create a magic link by hitting the app's internal service.
|
||||
// We do this by making a direct API call to create a magic link token,
|
||||
// then visiting the registration URL.
|
||||
console.log('\n--- Setup: create test user ---');
|
||||
|
||||
// Use the /register endpoint with a token created via the API.
|
||||
// The app uses MagicLinkService internally. Since we don't have an admin API,
|
||||
// we'll use a helper endpoint approach: insert via direct DB calls isn't possible
|
||||
// in e2e. Instead, we use Playwright's request context to call a setup script.
|
||||
//
|
||||
// Alternative: Create magic link via a Python helper script.
|
||||
// For now, we use a pragmatic approach: start a Python subprocess to create a
|
||||
// magic link and return the token.
|
||||
|
||||
// Execute setup script that creates a user with a password and returns credentials
|
||||
const { execSync } = require('child_process');
|
||||
const setupResult = execSync(
|
||||
`uv run python -c "
|
||||
import asyncio, json, aiosqlite
|
||||
from pathlib import Path
|
||||
from fastapi_oidc_op.config import Settings, StorageBackend
|
||||
from fastapi_oidc_op.store.sqlite.repositories import SQLiteUserRepository, SQLiteCredentialRepository, SQLiteMagicLinkRepository
|
||||
from fastapi_oidc_op.store.sqlite.migrations import run_migrations
|
||||
from fastapi_oidc_op.invite.service import MagicLinkService
|
||||
|
||||
async def setup():
|
||||
# Connect to the same DB the running app uses (in-memory won't work across processes)
|
||||
# So we use the magic link approach: create link, return token
|
||||
# Actually for :memory: DB, we must go through the running app's HTTP interface.
|
||||
# Use the health endpoint to verify the app is up, then we need another approach.
|
||||
#
|
||||
# Best approach: just test the flows we can through the browser.
|
||||
pass
|
||||
|
||||
asyncio.run(setup())
|
||||
"`,
|
||||
{ encoding: 'utf-8' }
|
||||
);
|
||||
|
||||
// REVISED APPROACH: Since the app uses :memory: SQLite, we cannot access the DB
|
||||
// from outside the process. Instead, we test what we can through the browser:
|
||||
//
|
||||
// 1. Go to login page
|
||||
// 2. Try to login with nonexistent user (should show error)
|
||||
// 3. Try to login with empty fields (browser validation)
|
||||
|
||||
// ---- Test: Login with nonexistent user ----
|
||||
console.log('\n--- Login with nonexistent user ---');
|
||||
await page.goto(`${TARGET_URL}/login`);
|
||||
|
||||
// Fill in form and submit via HTMX
|
||||
await page.fill('#username', 'nonexistent');
|
||||
await page.fill('#password', 'wrongpassword');
|
||||
await page.click('form[hx-post="/login/password"] button[type="submit"]');
|
||||
|
||||
// Wait for HTMX response
|
||||
await page.waitForSelector('[role="alert"]', { timeout: 5000 });
|
||||
const errorText = await page.locator('[role="alert"]').textContent();
|
||||
assert(
|
||||
errorText.includes('Invalid username or password'),
|
||||
`Shows error for nonexistent user (got: "${errorText}")`
|
||||
);
|
||||
|
||||
// ---- Test: Login with empty username (browser validation) ----
|
||||
console.log('\n--- Browser form validation ---');
|
||||
await page.goto(`${TARGET_URL}/login`);
|
||||
|
||||
// The username field has `required` attribute, so clicking submit with empty
|
||||
// fields should not send the request. We verify the field has the required attribute.
|
||||
const usernameRequired = await page.locator('#username').getAttribute('required');
|
||||
assert(usernameRequired !== null, 'Username field has required attribute');
|
||||
|
||||
const passwordRequired = await page.locator('#password').getAttribute('required');
|
||||
assert(passwordRequired !== null, 'Password field has required attribute');
|
||||
|
||||
// ---- Test: HTMX attributes are correct ----
|
||||
console.log('\n--- HTMX form configuration ---');
|
||||
const hxPost = await page.locator('form[hx-post="/login/password"]').getAttribute('hx-post');
|
||||
assert(hxPost === '/login/password', `Form posts to /login/password`);
|
||||
|
||||
const hxTarget = await page.locator('form[hx-post="/login/password"]').getAttribute('hx-target');
|
||||
assert(hxTarget === '#login-error', `Form targets #login-error div`);
|
||||
|
||||
const hxSwap = await page.locator('form[hx-post="/login/password"]').getAttribute('hx-swap');
|
||||
assert(hxSwap === 'innerHTML', `Form swaps innerHTML`);
|
||||
});
|
||||
```
|
||||
|
||||
**NOTE:** The `:memory:` SQLite database is only accessible within the running uvicorn process. E2E tests cannot seed data externally. The password auth test is therefore limited to testing error states and form behavior that doesn't require pre-existing users. Full login-success testing requires either:
|
||||
- (a) A test setup endpoint (future)
|
||||
- (b) Using a file-based SQLite path for e2e so a setup script can seed data
|
||||
- (c) Testing through the magic link registration flow first (Task 3)
|
||||
|
||||
We take approach (c): Task 3 will test the full happy path (register via magic link → set password → logout → login with password).
|
||||
|
||||
**Step 2: Run the test**
|
||||
|
||||
Run: `./tests/e2e/run.sh tests/e2e/test_password_auth.js`
|
||||
Expected: All PASS
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/e2e/test_password_auth.js
|
||||
git commit -m "test: add e2e test for password login error states and form validation"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Magic link registration + full login flow
|
||||
|
||||
This is the most important test — it exercises the complete user journey:
|
||||
1. Visit magic link → user created, redirected to credentials page
|
||||
2. Set password on credentials page
|
||||
3. Logout
|
||||
4. Login with the password just set
|
||||
|
||||
Since we can't create magic links from outside the process, we need to change `run.sh` to use a file-based SQLite DB and add a setup script. Alternatively, we add a **test-only setup endpoint** to the app that is only active when `debug=True`.
|
||||
|
||||
**Files:**
|
||||
- Modify: `tests/e2e/run.sh` — use file-based SQLite in a temp directory
|
||||
- Create: `tests/e2e/setup_db.py` — Python script to seed test data into the SQLite DB
|
||||
- Create: `tests/e2e/test_full_flow.js` — full registration → password → logout → login test
|
||||
|
||||
**Step 1: Modify `run.sh` to use file-based SQLite**
|
||||
|
||||
Change the SQLite path from `:memory:` to a temp file:
|
||||
|
||||
```bash
|
||||
# Near the top, after SCRIPT_DIR/PROJECT_ROOT:
|
||||
E2E_TMPDIR="$(mktemp -d)"
|
||||
export OIDC_OP_SQLITE_PATH="${E2E_TMPDIR}/e2e_test.db"
|
||||
|
||||
# In the cleanup function, add:
|
||||
rm -rf "$E2E_TMPDIR"
|
||||
```
|
||||
|
||||
Update the uvicorn start command to use `OIDC_OP_SQLITE_PATH` from the environment (remove the `:memory:` override).
|
||||
|
||||
**Step 2: Create `tests/e2e/setup_db.py`**
|
||||
|
||||
```python
|
||||
#!/usr/bin/env python3
|
||||
"""Seed the e2e test database with test fixtures.
|
||||
|
||||
Outputs JSON with the created test data (magic link tokens, usernames, etc.)
|
||||
so the JS tests can use them.
|
||||
|
||||
Requires OIDC_OP_SQLITE_PATH env var pointing to the app's SQLite DB.
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiosqlite
|
||||
|
||||
from fastapi_oidc_op.authn.password import PasswordService
|
||||
from fastapi_oidc_op.invite.service import MagicLinkService
|
||||
from fastapi_oidc_op.models import PasswordCredential, User
|
||||
from fastapi_oidc_op.store.sqlite.repositories import (
|
||||
SQLiteCredentialRepository,
|
||||
SQLiteMagicLinkRepository,
|
||||
SQLiteUserRepository,
|
||||
)
|
||||
|
||||
|
||||
async def seed():
|
||||
db_path = os.environ.get("OIDC_OP_SQLITE_PATH")
|
||||
if not db_path:
|
||||
print("OIDC_OP_SQLITE_PATH not set", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
db = await aiosqlite.connect(db_path)
|
||||
db.row_factory = aiosqlite.Row
|
||||
|
||||
user_repo = SQLiteUserRepository(db)
|
||||
cred_repo = SQLiteCredentialRepository(db)
|
||||
magic_link_repo = SQLiteMagicLinkRepository(db)
|
||||
password_service = PasswordService()
|
||||
magic_link_service = MagicLinkService(repo=magic_link_repo)
|
||||
|
||||
result = {}
|
||||
|
||||
# 1. Create a magic link for registration test
|
||||
link = await magic_link_service.create(username="newuser")
|
||||
result["register_token"] = link.token
|
||||
result["register_username"] = "newuser"
|
||||
|
||||
# 2. Create a user with a password for login test
|
||||
user = User(userid="test-user-01", username="testuser", groups=["users"])
|
||||
await user_repo.create(user)
|
||||
password_hash = password_service.hash("testpassword123")
|
||||
await cred_repo.create_password(
|
||||
PasswordCredential(user_id=user.userid, password_hash=password_hash)
|
||||
)
|
||||
result["login_username"] = "testuser"
|
||||
result["login_password"] = "testpassword123"
|
||||
|
||||
# 3. Create an expired/used magic link for negative test
|
||||
expired_link = await magic_link_service.create(username="expired")
|
||||
await magic_link_service.mark_used(expired_link.token)
|
||||
result["used_token"] = expired_link.token
|
||||
|
||||
await db.close()
|
||||
print(json.dumps(result))
|
||||
|
||||
|
||||
asyncio.run(seed())
|
||||
```
|
||||
|
||||
**Step 3: Update `run.sh` to run setup script after server is healthy**
|
||||
|
||||
After the "Server ready." line, add:
|
||||
|
||||
```bash
|
||||
# --- Seed test data ---
|
||||
echo "Seeding test data..."
|
||||
E2E_FIXTURES=$(uv run --directory "$PROJECT_ROOT" python tests/e2e/setup_db.py)
|
||||
export E2E_FIXTURES
|
||||
echo "Test fixtures: ${E2E_FIXTURES}"
|
||||
```
|
||||
|
||||
**Step 4: Create `tests/e2e/test_full_flow.js`**
|
||||
|
||||
```javascript
|
||||
// tests/e2e/test_full_flow.js
|
||||
// Full user journey: magic link registration → set password → logout → login.
|
||||
|
||||
const { TARGET_URL, run } = require('./helpers');
|
||||
|
||||
run(async (page, assert) => {
|
||||
const fixtures = JSON.parse(process.env.E2E_FIXTURES || '{}');
|
||||
assert(fixtures.register_token, 'Test fixtures loaded (register_token present)');
|
||||
|
||||
// ---- Step 1: Register via magic link ----
|
||||
console.log('\n--- Magic link registration ---');
|
||||
await page.goto(`${TARGET_URL}/register/${fixtures.register_token}`);
|
||||
|
||||
// Should redirect to /manage/credentials?setup=1
|
||||
await page.waitForURL('**/manage/credentials?setup=1', { timeout: 5000 });
|
||||
assert(
|
||||
page.url().includes('/manage/credentials'),
|
||||
`Redirected to credentials page (url: ${page.url()})`
|
||||
);
|
||||
|
||||
// Should show welcome message
|
||||
const welcome = page.locator('[role="status"]');
|
||||
assert(await welcome.isVisible(), 'Welcome/setup message is visible');
|
||||
const welcomeText = await welcome.textContent();
|
||||
assert(
|
||||
welcomeText.includes('Welcome'),
|
||||
`Welcome message shown (got: "${welcomeText}")`
|
||||
);
|
||||
|
||||
// Page title should be Porchlight
|
||||
const title = await page.title();
|
||||
assert(title.includes('Porchlight'), `Credentials page title contains Porchlight`);
|
||||
|
||||
// ---- Step 2: Set password ----
|
||||
console.log('\n--- Set password ---');
|
||||
const passwordInput = page.locator('#password');
|
||||
const confirmInput = page.locator('#confirm');
|
||||
assert(await passwordInput.isVisible(), 'Password input is visible');
|
||||
assert(await confirmInput.isVisible(), 'Confirm password input is visible');
|
||||
|
||||
await passwordInput.fill('mypassword123');
|
||||
await confirmInput.fill('mypassword123');
|
||||
await page.click('#password-section button[type="submit"]');
|
||||
|
||||
// Wait for HTMX response — should show success
|
||||
await page.waitForSelector('[role="status"]', { timeout: 5000 });
|
||||
// Find the status message inside #password-section
|
||||
const successMsg = page.locator('#password-section [role="status"]');
|
||||
await successMsg.waitFor({ timeout: 5000 });
|
||||
const successText = await successMsg.textContent();
|
||||
assert(
|
||||
successText.includes('Password updated'),
|
||||
`Password set successfully (got: "${successText}")`
|
||||
);
|
||||
|
||||
// ---- Step 3: Logout ----
|
||||
console.log('\n--- Logout ---');
|
||||
// The app uses POST /logout with HX-Redirect. We'll call it via fetch.
|
||||
const logoutResp = await page.request.post(`${TARGET_URL}/logout`);
|
||||
assert(logoutResp.ok() || logoutResp.status() === 200, `Logout returns OK`);
|
||||
|
||||
// Navigate to credentials — should redirect to login
|
||||
await page.goto(`${TARGET_URL}/manage/credentials`);
|
||||
await page.waitForURL('**/login', { timeout: 5000 });
|
||||
assert(page.url().includes('/login'), `Redirected to login after logout`);
|
||||
|
||||
// ---- Step 4: Login with the password we just set ----
|
||||
console.log('\n--- Login with new password ---');
|
||||
await page.fill('#username', fixtures.register_username);
|
||||
await page.fill('#password', 'mypassword123');
|
||||
|
||||
// Submit via HTMX — on success, HX-Redirect header triggers redirect
|
||||
await page.click('form[hx-post="/login/password"] button[type="submit"]');
|
||||
|
||||
// Wait for redirect to credentials page
|
||||
await page.waitForURL('**/manage/credentials', { timeout: 5000 });
|
||||
assert(
|
||||
page.url().includes('/manage/credentials'),
|
||||
`Login succeeded, redirected to credentials (url: ${page.url()})`
|
||||
);
|
||||
|
||||
// Should NOT show setup message (no ?setup=1)
|
||||
const setupMsgCount = await page.locator('[role="status"]:has-text("Welcome")').count();
|
||||
assert(setupMsgCount === 0, 'No welcome/setup message on normal login');
|
||||
});
|
||||
```
|
||||
|
||||
**Step 5: Run tests**
|
||||
|
||||
Run: `./tests/e2e/run.sh tests/e2e/test_full_flow.js`
|
||||
Expected: All PASS
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/e2e/run.sh tests/e2e/setup_db.py tests/e2e/test_full_flow.js
|
||||
git commit -m "test: add full user journey e2e test (register, set password, logout, login)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 4: Auth guard test
|
||||
|
||||
**Files:**
|
||||
- Create: `tests/e2e/test_auth_guard.js`
|
||||
|
||||
Tests that unauthenticated users are redirected to `/login` when accessing protected pages.
|
||||
|
||||
**Step 1: Create `tests/e2e/test_auth_guard.js`**
|
||||
|
||||
```javascript
|
||||
// tests/e2e/test_auth_guard.js
|
||||
// Tests that protected routes redirect unauthenticated users to /login.
|
||||
|
||||
const { TARGET_URL, run } = require('./helpers');
|
||||
|
||||
run(async (page, assert) => {
|
||||
// ---- Unauthenticated access to /manage/credentials ----
|
||||
console.log('\n--- Auth guard: /manage/credentials ---');
|
||||
await page.goto(`${TARGET_URL}/manage/credentials`);
|
||||
await page.waitForURL('**/login', { timeout: 5000 });
|
||||
assert(page.url().includes('/login'), `GET /manage/credentials redirects to /login`);
|
||||
|
||||
// ---- Unauthenticated access to /manage/credentials?setup=1 ----
|
||||
console.log('\n--- Auth guard: /manage/credentials?setup=1 ---');
|
||||
await page.goto(`${TARGET_URL}/manage/credentials?setup=1`);
|
||||
await page.waitForURL('**/login', { timeout: 5000 });
|
||||
assert(page.url().includes('/login'), `GET /manage/credentials?setup=1 redirects to /login`);
|
||||
});
|
||||
```
|
||||
|
||||
**Step 2: Run test**
|
||||
|
||||
Run: `./tests/e2e/run.sh tests/e2e/test_auth_guard.js`
|
||||
Expected: All PASS
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/e2e/test_auth_guard.js
|
||||
git commit -m "test: add e2e auth guard test for protected routes"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 5: Password login error states test
|
||||
|
||||
**Files:**
|
||||
- Create: `tests/e2e/test_password_auth.js`
|
||||
|
||||
**Step 1: Create `tests/e2e/test_password_auth.js`**
|
||||
|
||||
```javascript
|
||||
// tests/e2e/test_password_auth.js
|
||||
// Tests password login error states: wrong password, nonexistent user, form validation.
|
||||
|
||||
const { TARGET_URL, run } = require('./helpers');
|
||||
|
||||
run(async (page, assert) => {
|
||||
const fixtures = JSON.parse(process.env.E2E_FIXTURES || '{}');
|
||||
|
||||
// ---- Test: Nonexistent user ----
|
||||
console.log('\n--- Login: nonexistent user ---');
|
||||
await page.goto(`${TARGET_URL}/login`);
|
||||
await page.fill('#username', 'nobody');
|
||||
await page.fill('#password', 'whatever');
|
||||
await page.click('form[hx-post="/login/password"] button[type="submit"]');
|
||||
|
||||
await page.waitForSelector('[role="alert"]', { timeout: 5000 });
|
||||
const error1 = await page.locator('[role="alert"]').textContent();
|
||||
assert(
|
||||
error1.includes('Invalid username or password'),
|
||||
`Error shown for nonexistent user (got: "${error1}")`
|
||||
);
|
||||
|
||||
// ---- Test: Wrong password for existing user ----
|
||||
console.log('\n--- Login: wrong password ---');
|
||||
await page.goto(`${TARGET_URL}/login`);
|
||||
await page.fill('#username', fixtures.login_username);
|
||||
await page.fill('#password', 'wrongpassword');
|
||||
await page.click('form[hx-post="/login/password"] button[type="submit"]');
|
||||
|
||||
await page.waitForSelector('[role="alert"]', { timeout: 5000 });
|
||||
const error2 = await page.locator('[role="alert"]').textContent();
|
||||
assert(
|
||||
error2.includes('Invalid username or password'),
|
||||
`Error shown for wrong password (got: "${error2}")`
|
||||
);
|
||||
|
||||
// ---- Test: Successful login ----
|
||||
console.log('\n--- Login: correct password ---');
|
||||
await page.goto(`${TARGET_URL}/login`);
|
||||
await page.fill('#username', fixtures.login_username);
|
||||
await page.fill('#password', fixtures.login_password);
|
||||
await page.click('form[hx-post="/login/password"] button[type="submit"]');
|
||||
|
||||
await page.waitForURL('**/manage/credentials', { timeout: 5000 });
|
||||
assert(
|
||||
page.url().includes('/manage/credentials'),
|
||||
`Successful login redirects to credentials (url: ${page.url()})`
|
||||
);
|
||||
|
||||
// ---- Test: Form validation attributes ----
|
||||
console.log('\n--- Form validation attributes ---');
|
||||
await page.goto(`${TARGET_URL}/login`);
|
||||
const usernameRequired = await page.locator('#username').getAttribute('required');
|
||||
assert(usernameRequired !== null, 'Username has required attribute');
|
||||
const passwordRequired = await page.locator('#password').getAttribute('required');
|
||||
assert(passwordRequired !== null, 'Password has required attribute');
|
||||
|
||||
const usernameAutocomplete = await page.locator('#username').getAttribute('autocomplete');
|
||||
assert(usernameAutocomplete === 'username', `Username autocomplete is "username"`);
|
||||
const passwordAutocomplete = await page.locator('#password').getAttribute('autocomplete');
|
||||
assert(passwordAutocomplete === 'current-password', `Password autocomplete is "current-password"`);
|
||||
});
|
||||
```
|
||||
|
||||
**Step 2: Run test**
|
||||
|
||||
Run: `./tests/e2e/run.sh tests/e2e/test_password_auth.js`
|
||||
Expected: All PASS
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/e2e/test_password_auth.js
|
||||
git commit -m "test: add e2e password auth test with error states and successful login"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 6: Credentials page test
|
||||
|
||||
**Files:**
|
||||
- Create: `tests/e2e/test_credentials.js`
|
||||
|
||||
Tests the credential management page when authenticated: page structure, set password (validation errors), change password.
|
||||
|
||||
**Step 1: Create `tests/e2e/test_credentials.js`**
|
||||
|
||||
```javascript
|
||||
// tests/e2e/test_credentials.js
|
||||
// Tests credential management page: structure, password set/change, validation errors.
|
||||
|
||||
const { TARGET_URL, run } = require('./helpers');
|
||||
|
||||
run(async (page, assert) => {
|
||||
const fixtures = JSON.parse(process.env.E2E_FIXTURES || '{}');
|
||||
|
||||
// ---- Setup: Log in first ----
|
||||
console.log('\n--- Setup: login ---');
|
||||
await page.goto(`${TARGET_URL}/login`);
|
||||
await page.fill('#username', fixtures.login_username);
|
||||
await page.fill('#password', fixtures.login_password);
|
||||
await page.click('form[hx-post="/login/password"] button[type="submit"]');
|
||||
await page.waitForURL('**/manage/credentials', { timeout: 5000 });
|
||||
|
||||
// ---- Page structure ----
|
||||
console.log('\n--- Credentials page structure ---');
|
||||
const title = await page.title();
|
||||
assert(title.includes('Credentials'), `Title contains "Credentials" (got: "${title}")`);
|
||||
assert(title.includes('Porchlight'), `Title contains "Porchlight" (got: "${title}")`);
|
||||
|
||||
const h1 = await page.locator('h1').textContent();
|
||||
assert(h1 === 'Credentials', `H1 says "Credentials" (got: "${h1}")`);
|
||||
|
||||
// Security keys section
|
||||
const securityKeysH2 = page.locator('h2:has-text("Security keys")');
|
||||
assert(await securityKeysH2.isVisible(), 'Security keys heading visible');
|
||||
|
||||
const registerBtn = page.locator('#webauthn-register-btn');
|
||||
assert(await registerBtn.isVisible(), 'Add security key button visible');
|
||||
|
||||
// Password section
|
||||
const passwordH2 = page.locator('h2:has-text("Password")');
|
||||
assert(await passwordH2.isVisible(), 'Password heading visible');
|
||||
|
||||
const passwordStatus = page.locator('#password-section');
|
||||
assert(await passwordStatus.isVisible(), 'Password section visible');
|
||||
|
||||
// ---- Password validation: mismatch ----
|
||||
console.log('\n--- Password validation: mismatch ---');
|
||||
await page.fill('#password', 'newpassword1');
|
||||
await page.fill('#confirm', 'newpassword2');
|
||||
await page.click('#password-section button[type="submit"]');
|
||||
|
||||
await page.waitForSelector('#password-section [role="alert"]', { timeout: 5000 });
|
||||
const mismatchErr = await page.locator('#password-section [role="alert"]').textContent();
|
||||
assert(
|
||||
mismatchErr.includes('do not match'),
|
||||
`Shows mismatch error (got: "${mismatchErr}")`
|
||||
);
|
||||
|
||||
// ---- Password validation: too short ----
|
||||
console.log('\n--- Password validation: too short ---');
|
||||
// Reload page to clear HTMX state
|
||||
await page.goto(`${TARGET_URL}/manage/credentials`);
|
||||
await page.fill('#password', 'short');
|
||||
await page.fill('#confirm', 'short');
|
||||
await page.click('#password-section button[type="submit"]');
|
||||
|
||||
await page.waitForSelector('#password-section [role="alert"]', { timeout: 5000 });
|
||||
const shortErr = await page.locator('#password-section [role="alert"]').textContent();
|
||||
assert(
|
||||
shortErr.includes('at least 8 characters'),
|
||||
`Shows too-short error (got: "${shortErr}")`
|
||||
);
|
||||
|
||||
// ---- Password change: success ----
|
||||
console.log('\n--- Password change: success ---');
|
||||
await page.goto(`${TARGET_URL}/manage/credentials`);
|
||||
await page.fill('#password', 'newpassword123');
|
||||
await page.fill('#confirm', 'newpassword123');
|
||||
await page.click('#password-section button[type="submit"]');
|
||||
|
||||
await page.waitForSelector('#password-section [role="status"]', { timeout: 5000 });
|
||||
const successMsg = await page.locator('#password-section [role="status"]').textContent();
|
||||
assert(
|
||||
successMsg.includes('Password updated'),
|
||||
`Shows success message (got: "${successMsg}")`
|
||||
);
|
||||
});
|
||||
```
|
||||
|
||||
**Step 2: Run test**
|
||||
|
||||
Run: `./tests/e2e/run.sh tests/e2e/test_credentials.js`
|
||||
Expected: All PASS
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/e2e/test_credentials.js
|
||||
git commit -m "test: add e2e credential management test with password validation"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 7: Registration error states test
|
||||
|
||||
**Files:**
|
||||
- Create: `tests/e2e/test_registration.js`
|
||||
|
||||
Tests magic link edge cases: used token, invalid token.
|
||||
|
||||
**Step 1: Create `tests/e2e/test_registration.js`**
|
||||
|
||||
```javascript
|
||||
// tests/e2e/test_registration.js
|
||||
// Tests magic link registration error states.
|
||||
|
||||
const { TARGET_URL, run } = require('./helpers');
|
||||
|
||||
run(async (page, assert) => {
|
||||
const fixtures = JSON.parse(process.env.E2E_FIXTURES || '{}');
|
||||
|
||||
// ---- Invalid token ----
|
||||
console.log('\n--- Invalid registration token ---');
|
||||
const resp1 = await page.goto(`${TARGET_URL}/register/invalid-token-12345`);
|
||||
assert(resp1.status() === 400, `Invalid token returns 400 (got: ${resp1.status()})`);
|
||||
const body1 = await page.locator('body').textContent();
|
||||
assert(
|
||||
body1.includes('Invalid or expired'),
|
||||
`Shows invalid/expired message (got: "${body1.trim()}")`
|
||||
);
|
||||
|
||||
// ---- Used token ----
|
||||
console.log('\n--- Used registration token ---');
|
||||
assert(fixtures.used_token, 'Used token fixture exists');
|
||||
const resp2 = await page.goto(`${TARGET_URL}/register/${fixtures.used_token}`);
|
||||
assert(resp2.status() === 400, `Used token returns 400 (got: ${resp2.status()})`);
|
||||
const body2 = await page.locator('body').textContent();
|
||||
assert(
|
||||
body2.includes('Invalid or expired'),
|
||||
`Shows invalid/expired for used token (got: "${body2.trim()}")`
|
||||
);
|
||||
});
|
||||
```
|
||||
|
||||
**Step 2: Run test**
|
||||
|
||||
Run: `./tests/e2e/run.sh tests/e2e/test_registration.js`
|
||||
Expected: All PASS
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/e2e/test_registration.js
|
||||
git commit -m "test: add e2e registration error states test (invalid/used tokens)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 8: Health endpoint test
|
||||
|
||||
**Files:**
|
||||
- Create: `tests/e2e/test_health.js`
|
||||
|
||||
Simple smoke test for the health endpoint.
|
||||
|
||||
**Step 1: Create `tests/e2e/test_health.js`**
|
||||
|
||||
```javascript
|
||||
// tests/e2e/test_health.js
|
||||
// Smoke test: health endpoint returns OK.
|
||||
|
||||
const { TARGET_URL, run } = require('./helpers');
|
||||
|
||||
run(async (page, assert) => {
|
||||
console.log('\n--- Health endpoint ---');
|
||||
const resp = await page.request.get(`${TARGET_URL}/health`);
|
||||
assert(resp.ok(), `Health endpoint returns 200 (status: ${resp.status()})`);
|
||||
|
||||
const body = await resp.json();
|
||||
assert(body.status === 'ok', `Health response is {"status":"ok"} (got: ${JSON.stringify(body)})`);
|
||||
});
|
||||
```
|
||||
|
||||
**Step 2: Run test**
|
||||
|
||||
Run: `./tests/e2e/run.sh tests/e2e/test_health.js`
|
||||
Expected: All PASS
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/e2e/test_health.js
|
||||
git commit -m "test: add e2e health endpoint smoke test"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 9: Run full suite and verify
|
||||
|
||||
**Step 1: Run the complete e2e suite**
|
||||
|
||||
Run: `./tests/e2e/run.sh`
|
||||
Expected: All test files pass, server starts and stops cleanly.
|
||||
|
||||
**Step 2: Run the existing pytest suite to ensure no regressions**
|
||||
|
||||
Run: `uv run pytest -x -q`
|
||||
Expected: All 120 tests pass.
|
||||
|
||||
**Step 3: Final commit (if any cleanup needed)**
|
||||
|
||||
---
|
||||
|
||||
## Future Enhancements (Not in Scope)
|
||||
|
||||
- **WebAuthn tests** — Playwright supports virtual authenticators via CDP. Could test registration and login with `cdpSession.send('WebAuthn.enable')`.
|
||||
- **Dark mode tests** — Use `page.emulateMedia({ colorScheme: 'dark' })` to verify dark theme colors.
|
||||
- **Visual regression** — Screenshot comparison with `expect(page).toHaveScreenshot()` (requires @playwright/test).
|
||||
- **OIDC flow tests** — Once the OIDC provider is implemented, test authorization code flow end-to-end.
|
||||
File diff suppressed because it is too large
Load diff
|
|
@ -1,645 +0,0 @@
|
|||
# Rename to Porchlight + CLI Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Rename the project from `fastapi-oidc-op` / `fastapi_oidc_op` to `porchlight` and add a typer-based CLI with `init-admin` and `create-invite` commands.
|
||||
|
||||
**Architecture:** The rename is a mechanical find-and-replace across the entire codebase (package dir, imports, pyproject.toml, Dockerfile). The CLI adds `typer>=0.15` as a dependency, a shared `open_db()` context manager extracted from the lifespan DB setup, and a `cli.py` module with two async command implementations bridged to sync via `asyncio.run()`.
|
||||
|
||||
**Tech Stack:** Python 3.13, typer, aiosqlite, pydantic-settings
|
||||
|
||||
**Quality gate:** `./scripts/check.sh` (ruff format, ruff check, ty check, pytest — 147 tests)
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Rename package directory
|
||||
|
||||
**Files:**
|
||||
- Rename: `src/fastapi_oidc_op/` → `src/porchlight/`
|
||||
|
||||
**Step 1: Move the directory**
|
||||
|
||||
```bash
|
||||
git mv src/fastapi_oidc_op src/porchlight
|
||||
```
|
||||
|
||||
**Step 2: Verify directory exists**
|
||||
|
||||
```bash
|
||||
ls src/porchlight/app.py
|
||||
```
|
||||
|
||||
Expected: file exists.
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add -A && git commit -m "refactor: rename package directory fastapi_oidc_op to porchlight"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Update pyproject.toml
|
||||
|
||||
**Files:**
|
||||
- Modify: `pyproject.toml` (lines 2, 24, 31)
|
||||
|
||||
**Step 1: Update project name, script entry point, and hatch packages**
|
||||
|
||||
Change these three lines:
|
||||
|
||||
```
|
||||
line 2: name = "fastapi-oidc-op" → name = "porchlight"
|
||||
line 24: fastapi-oidc-op = "fastapi_oidc_op.cli:main" → porchlight = "porchlight.cli:main"
|
||||
line 31: packages = ["src/fastapi_oidc_op"] → packages = ["src/porchlight"]
|
||||
```
|
||||
|
||||
**Step 2: Add typer dependency**
|
||||
|
||||
Add `"typer>=0.15"` to the `dependencies` list.
|
||||
|
||||
**Step 3: Regenerate lock file**
|
||||
|
||||
```bash
|
||||
uv lock
|
||||
```
|
||||
|
||||
**Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add pyproject.toml uv.lock && git commit -m "refactor: update pyproject.toml for porchlight rename and add typer dependency"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Update all imports in source files
|
||||
|
||||
**Files:**
|
||||
- Modify all `.py` files under `src/porchlight/` that contain `fastapi_oidc_op`
|
||||
|
||||
The following files need `fastapi_oidc_op` replaced with `porchlight` in all import statements:
|
||||
|
||||
- `src/porchlight/app.py` (lines 13-22: 10 imports)
|
||||
- `src/porchlight/config.py` (line 1: comment only)
|
||||
- `src/porchlight/dependencies.py` (line 3)
|
||||
- `src/porchlight/authn/routes.py` (lines 10-11)
|
||||
- `src/porchlight/invite/service.py` (lines 4-5)
|
||||
- `src/porchlight/manage/routes.py` (lines 7-8)
|
||||
- `src/porchlight/oidc/claims.py` (line 5)
|
||||
- `src/porchlight/oidc/endpoints.py` (line 13)
|
||||
- `src/porchlight/oidc/provider.py` (lines 7-8)
|
||||
- `src/porchlight/store/protocols.py` (line 3)
|
||||
- `src/porchlight/store/sqlite/repositories.py` (lines 5-6)
|
||||
- `src/porchlight/userid.py` (line 5)
|
||||
|
||||
**Step 1: Replace all occurrences**
|
||||
|
||||
In every `.py` file under `src/porchlight/`, replace `fastapi_oidc_op` with `porchlight`.
|
||||
|
||||
**Step 2: Verify no stale references remain**
|
||||
|
||||
```bash
|
||||
grep -r "fastapi_oidc_op" src/
|
||||
```
|
||||
|
||||
Expected: no output.
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add src/ && git commit -m "refactor: update all source imports from fastapi_oidc_op to porchlight"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 4: Update all imports in test files
|
||||
|
||||
**Files:**
|
||||
- Modify all `.py` files under `tests/` that contain `fastapi_oidc_op`
|
||||
|
||||
Files with import references to update (`fastapi_oidc_op` → `porchlight`):
|
||||
|
||||
- `tests/conftest.py` (lines 6-7)
|
||||
- `tests/test_app.py` (lines 19, 34)
|
||||
- `tests/test_config.py` (line 4)
|
||||
- `tests/test_models.py` (line 3)
|
||||
- `tests/test_userid.py` (lines 5-6)
|
||||
- `tests/e2e/setup_db.py` (lines 17-20)
|
||||
- `tests/test_auth_routes/test_last_credential_guard.py` (lines 7-8)
|
||||
- `tests/test_auth_routes/test_manage_credentials_page.py` (lines 6-7)
|
||||
- `tests/test_auth_routes/test_manage_password_credential.py` (lines 6-7)
|
||||
- `tests/test_auth_routes/test_manage_webauthn_credential.py` (lines 20-21)
|
||||
- `tests/test_auth_routes/test_password_login.py` (lines 6-7)
|
||||
- `tests/test_auth_routes/test_register_magic_link.py` (line 5)
|
||||
- `tests/test_auth_routes/test_session_deps.py` (line 6)
|
||||
- `tests/test_auth_routes/test_webauthn_login.py` (line 12)
|
||||
- `tests/test_authn/test_password.py` (line 3)
|
||||
- `tests/test_authn/test_webauthn.py` (line 21)
|
||||
- `tests/test_invite/test_service.py` (lines 7-9, 12 — note line 12 has a Path string)
|
||||
- `tests/test_oidc/test_claims.py` (lines 3-4)
|
||||
- `tests/test_oidc/test_e2e_flow.py` (lines 12-13)
|
||||
- `tests/test_oidc/test_login_oidc_redirect.py` (lines 7-8)
|
||||
- `tests/test_oidc/test_provider.py` (lines 4-5, 52)
|
||||
- `tests/test_oidc/test_token.py` (lines 9-10)
|
||||
- `tests/test_oidc/test_userinfo.py` (lines 9-10)
|
||||
- `tests/test_store/conftest.py` (lines 6, 9 — note line 9 has a Path string)
|
||||
- `tests/test_store/test_exceptions.py` (line 1)
|
||||
- `tests/test_store/test_migrations.py` (lines 5, 8 — note line 8 has a Path string)
|
||||
- `tests/test_store/test_protocols.py` (line 3)
|
||||
- `tests/test_store/test_sqlite_credential_repo.py` (lines 4-7)
|
||||
- `tests/test_store/test_sqlite_magic_link_repo.py` (lines 6-9)
|
||||
- `tests/test_store/test_sqlite_user_repo.py` (lines 4-7)
|
||||
|
||||
**Important:** Three files have `Path(...)` strings containing `"fastapi_oidc_op"` that also need updating:
|
||||
- `tests/test_invite/test_service.py` line 12: `"src" / "fastapi_oidc_op" / "store"` → `"src" / "porchlight" / "store"`
|
||||
- `tests/test_store/conftest.py` line 9: same pattern
|
||||
- `tests/test_store/test_migrations.py` line 8: same pattern
|
||||
|
||||
**Step 1: Replace all occurrences**
|
||||
|
||||
In every `.py` file under `tests/`, replace `fastapi_oidc_op` with `porchlight` (both in imports and string literals).
|
||||
|
||||
**Step 2: Verify no stale references remain**
|
||||
|
||||
```bash
|
||||
grep -r "fastapi_oidc_op" tests/
|
||||
```
|
||||
|
||||
Expected: no output.
|
||||
|
||||
**Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add tests/ && git commit -m "refactor: update all test imports from fastapi_oidc_op to porchlight"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 5: Update Dockerfile
|
||||
|
||||
**Files:**
|
||||
- Modify: `Dockerfile` (lines 29, 45)
|
||||
|
||||
**Step 1: Replace module references**
|
||||
|
||||
```
|
||||
line 29: "fastapi_oidc_op.app:create_app" → "porchlight.app:create_app"
|
||||
line 45: "fastapi_oidc_op.app:create_app" → "porchlight.app:create_app"
|
||||
```
|
||||
|
||||
**Step 2: Commit**
|
||||
|
||||
```bash
|
||||
git add Dockerfile && git commit -m "refactor: update Dockerfile for porchlight rename"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 6: Run quality gate to verify rename
|
||||
|
||||
**Step 1: Run full quality gate**
|
||||
|
||||
```bash
|
||||
./scripts/check.sh
|
||||
```
|
||||
|
||||
Expected: all 147 tests pass, ruff clean, ty clean.
|
||||
|
||||
**Step 2: Verify no remaining references**
|
||||
|
||||
```bash
|
||||
grep -r "fastapi_oidc_op" src/ tests/ Dockerfile pyproject.toml docker-compose.yml
|
||||
```
|
||||
|
||||
Expected: no output (docker-compose.yml has no package refs, only env vars with `OIDC_OP_` prefix which we keep).
|
||||
|
||||
**Step 3: If anything fails, fix and amend the relevant commit**
|
||||
|
||||
---
|
||||
|
||||
### Task 7: Extract `open_db()` context manager
|
||||
|
||||
**Files:**
|
||||
- Create: `src/porchlight/store/sqlite/db.py`
|
||||
- Test: `tests/test_store/test_db.py`
|
||||
- Modify: `src/porchlight/app.py` (refactor lifespan to use `open_db`)
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```python
|
||||
# tests/test_store/test_db.py
|
||||
from pathlib import Path
|
||||
|
||||
import aiosqlite
|
||||
import pytest
|
||||
|
||||
from porchlight.store.sqlite.db import open_db
|
||||
|
||||
MIGRATIONS_DIR = Path(__file__).resolve().parent.parent.parent / "src" / "porchlight" / "store" / "sqlite" / "migrations"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def migrations_dir() -> Path:
|
||||
return MIGRATIONS_DIR
|
||||
|
||||
|
||||
async def test_open_db_returns_connection(tmp_path: Path, migrations_dir: Path) -> None:
|
||||
db_path = str(tmp_path / "test.db")
|
||||
async with open_db(db_path, migrations_dir) as db:
|
||||
assert isinstance(db, aiosqlite.Connection)
|
||||
|
||||
|
||||
async def test_open_db_runs_migrations(tmp_path: Path, migrations_dir: Path) -> None:
|
||||
db_path = str(tmp_path / "test.db")
|
||||
async with open_db(db_path, migrations_dir) as db:
|
||||
async with db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'") as cursor:
|
||||
row = await cursor.fetchone()
|
||||
assert row is not None
|
||||
|
||||
|
||||
async def test_open_db_sets_wal_mode(tmp_path: Path, migrations_dir: Path) -> None:
|
||||
db_path = str(tmp_path / "test.db")
|
||||
async with open_db(db_path, migrations_dir) as db:
|
||||
async with db.execute("PRAGMA journal_mode") as cursor:
|
||||
row = await cursor.fetchone()
|
||||
assert row[0] == "wal"
|
||||
|
||||
|
||||
async def test_open_db_creates_parent_dirs(tmp_path: Path, migrations_dir: Path) -> None:
|
||||
db_path = str(tmp_path / "sub" / "dir" / "test.db")
|
||||
async with open_db(db_path, migrations_dir) as db:
|
||||
assert isinstance(db, aiosqlite.Connection)
|
||||
assert (tmp_path / "sub" / "dir" / "test.db").exists()
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
```bash
|
||||
pytest tests/test_store/test_db.py -v
|
||||
```
|
||||
|
||||
Expected: FAIL — `porchlight.store.sqlite.db` does not exist.
|
||||
|
||||
**Step 3: Write the implementation**
|
||||
|
||||
```python
|
||||
# src/porchlight/store/sqlite/db.py
|
||||
from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
|
||||
import aiosqlite
|
||||
|
||||
from porchlight.store.sqlite.migrations import run_migrations
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def open_db(db_path: str, migrations_dir: Path) -> AsyncIterator[aiosqlite.Connection]:
|
||||
"""Open a SQLite connection with WAL mode, foreign keys, and migrations applied."""
|
||||
if db_path != ":memory:":
|
||||
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
|
||||
db = await aiosqlite.connect(db_path)
|
||||
try:
|
||||
db.row_factory = aiosqlite.Row
|
||||
await db.execute("PRAGMA journal_mode=WAL")
|
||||
await db.execute("PRAGMA foreign_keys=ON")
|
||||
await run_migrations(db, migrations_dir)
|
||||
yield db
|
||||
finally:
|
||||
await db.close()
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
```bash
|
||||
pytest tests/test_store/test_db.py -v
|
||||
```
|
||||
|
||||
Expected: all 4 tests PASS.
|
||||
|
||||
**Step 5: Refactor `app.py` lifespan to use `open_db`**
|
||||
|
||||
Replace the manual DB setup in `lifespan()` with:
|
||||
|
||||
```python
|
||||
from porchlight.store.sqlite.db import open_db
|
||||
|
||||
# In lifespan(), replace lines 36-43 with:
|
||||
async with open_db(settings.sqlite_path, MIGRATIONS_DIR) as db:
|
||||
# ... rest of setup using db ...
|
||||
yield
|
||||
```
|
||||
|
||||
The `open_db` context manager handles mkdir, connect, row_factory, PRAGMAs, migrations, and close.
|
||||
|
||||
**Step 6: Run full quality gate**
|
||||
|
||||
```bash
|
||||
./scripts/check.sh
|
||||
```
|
||||
|
||||
Expected: all 147 tests pass.
|
||||
|
||||
**Step 7: Commit**
|
||||
|
||||
```bash
|
||||
git add src/porchlight/store/sqlite/db.py tests/test_store/test_db.py src/porchlight/app.py
|
||||
git commit -m "refactor: extract open_db() context manager from lifespan"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 8: Create CLI module with `create-invite` command
|
||||
|
||||
**Files:**
|
||||
- Create: `src/porchlight/cli.py`
|
||||
- Test: `tests/test_cli.py`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```python
|
||||
# tests/test_cli.py
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from porchlight.store.sqlite.db import open_db
|
||||
from porchlight.store.sqlite.repositories import SQLiteMagicLinkRepository
|
||||
|
||||
MIGRATIONS_DIR = Path(__file__).resolve().parent.parent / "src" / "porchlight" / "store" / "sqlite" / "migrations"
|
||||
|
||||
|
||||
async def test_create_invite_creates_magic_link(tmp_path: Path) -> None:
|
||||
from porchlight.cli import _create_invite
|
||||
|
||||
db_path = str(tmp_path / "test.db")
|
||||
async with open_db(db_path, MIGRATIONS_DIR) as db:
|
||||
result = await _create_invite(db=db, username="alice", issuer="http://localhost:8000")
|
||||
|
||||
assert "alice" in result
|
||||
assert "http://localhost:8000/register?token=" in result
|
||||
|
||||
|
||||
async def test_create_invite_with_ttl(tmp_path: Path) -> None:
|
||||
from porchlight.cli import _create_invite
|
||||
|
||||
db_path = str(tmp_path / "test.db")
|
||||
async with open_db(db_path, MIGRATIONS_DIR) as db:
|
||||
result = await _create_invite(db=db, username="bob", issuer="http://localhost:8000", ttl=3600)
|
||||
|
||||
assert "bob" in result
|
||||
assert "http://localhost:8000/register?token=" in result
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
```bash
|
||||
pytest tests/test_cli.py -v
|
||||
```
|
||||
|
||||
Expected: FAIL — `porchlight.cli` does not exist.
|
||||
|
||||
**Step 3: Write the implementation**
|
||||
|
||||
```python
|
||||
# src/porchlight/cli.py
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
|
||||
import aiosqlite
|
||||
import typer
|
||||
|
||||
from porchlight.config import Settings
|
||||
from porchlight.invite.service import MagicLinkService
|
||||
from porchlight.store.sqlite.db import open_db
|
||||
from porchlight.store.sqlite.repositories import SQLiteMagicLinkRepository
|
||||
|
||||
PACKAGE_DIR = Path(__file__).parent
|
||||
MIGRATIONS_DIR = PACKAGE_DIR / "store" / "sqlite" / "migrations"
|
||||
|
||||
app = typer.Typer(name="porchlight", help="Porchlight OIDC Provider management CLI.")
|
||||
|
||||
|
||||
async def _create_invite(
|
||||
db: aiosqlite.Connection,
|
||||
username: str,
|
||||
issuer: str,
|
||||
ttl: int = 86400,
|
||||
) -> str:
|
||||
"""Create a magic link invite. Returns a human-readable message with the URL."""
|
||||
repo = SQLiteMagicLinkRepository(db)
|
||||
service = MagicLinkService(repo=repo, ttl=ttl)
|
||||
link = await service.create(username=username, created_by="cli")
|
||||
url = f"{issuer.rstrip('/')}/register?token={link.token}"
|
||||
return f"Invite created for: {username}\nRegistration link: {url}"
|
||||
|
||||
|
||||
@app.command()
|
||||
def create_invite(
|
||||
username: str = typer.Argument(help="Username for the new invite"),
|
||||
ttl: int = typer.Option(86400, help="Link expiry in seconds"),
|
||||
) -> None:
|
||||
"""Create a magic-link registration invite for a user."""
|
||||
settings = Settings() # type: ignore[call-arg]
|
||||
|
||||
async def _run() -> str:
|
||||
async with open_db(settings.sqlite_path, MIGRATIONS_DIR) as db:
|
||||
return await _create_invite(db=db, username=username, issuer=settings.issuer, ttl=ttl)
|
||||
|
||||
result = asyncio.run(_run())
|
||||
typer.echo(result)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
app()
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
```bash
|
||||
pytest tests/test_cli.py::test_create_invite_creates_magic_link tests/test_cli.py::test_create_invite_with_ttl -v
|
||||
```
|
||||
|
||||
Expected: PASS.
|
||||
|
||||
**Step 5: Run quality gate**
|
||||
|
||||
```bash
|
||||
./scripts/check.sh
|
||||
```
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/porchlight/cli.py tests/test_cli.py
|
||||
git commit -m "feat: add create-invite CLI command"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 9: Add `init-admin` CLI command
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/porchlight/cli.py`
|
||||
- Modify: `tests/test_cli.py`
|
||||
|
||||
**Step 1: Write the failing tests**
|
||||
|
||||
Add to `tests/test_cli.py`:
|
||||
|
||||
```python
|
||||
async def test_init_admin_creates_user_and_link(tmp_path: Path) -> None:
|
||||
from porchlight.cli import _init_admin
|
||||
|
||||
db_path = str(tmp_path / "test.db")
|
||||
async with open_db(db_path, MIGRATIONS_DIR) as db:
|
||||
result = await _init_admin(db=db, username="admin", issuer="http://localhost:8000")
|
||||
|
||||
assert "admin" in result
|
||||
assert "http://localhost:8000/register?token=" in result
|
||||
|
||||
|
||||
async def test_init_admin_user_in_admin_group(tmp_path: Path) -> None:
|
||||
from porchlight.cli import _init_admin
|
||||
|
||||
db_path = str(tmp_path / "test.db")
|
||||
async with open_db(db_path, MIGRATIONS_DIR) as db:
|
||||
await _init_admin(db=db, username="admin", issuer="http://localhost:8000")
|
||||
|
||||
from porchlight.store.sqlite.repositories import SQLiteUserRepository
|
||||
user_repo = SQLiteUserRepository(db)
|
||||
user = await user_repo.get_by_username("admin")
|
||||
|
||||
assert user is not None
|
||||
assert "admin" in user.groups
|
||||
|
||||
|
||||
async def test_init_admin_duplicate_username_fails(tmp_path: Path) -> None:
|
||||
from porchlight.cli import _init_admin
|
||||
|
||||
db_path = str(tmp_path / "test.db")
|
||||
async with open_db(db_path, MIGRATIONS_DIR) as db:
|
||||
await _init_admin(db=db, username="admin", issuer="http://localhost:8000")
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
await _init_admin(db=db, username="admin", issuer="http://localhost:8000")
|
||||
```
|
||||
|
||||
**Step 2: Run tests to verify they fail**
|
||||
|
||||
```bash
|
||||
pytest tests/test_cli.py -v -k "init_admin"
|
||||
```
|
||||
|
||||
Expected: FAIL — `_init_admin` does not exist.
|
||||
|
||||
**Step 3: Write the implementation**
|
||||
|
||||
Add to `src/porchlight/cli.py`:
|
||||
|
||||
```python
|
||||
from porchlight.models import User
|
||||
from porchlight.store.exceptions import DuplicateError
|
||||
from porchlight.store.sqlite.repositories import SQLiteMagicLinkRepository, SQLiteUserRepository
|
||||
from porchlight.userid import generate_userid
|
||||
|
||||
|
||||
async def _init_admin(
|
||||
db: aiosqlite.Connection,
|
||||
username: str,
|
||||
issuer: str,
|
||||
ttl: int = 86400,
|
||||
) -> str:
|
||||
"""Create an admin user and a registration magic link. Returns message with URL."""
|
||||
user_repo = SQLiteUserRepository(db)
|
||||
magic_link_repo = SQLiteMagicLinkRepository(db)
|
||||
service = MagicLinkService(repo=magic_link_repo, ttl=ttl)
|
||||
|
||||
user = User(userid=generate_userid(), username=username, groups=["admin"])
|
||||
try:
|
||||
await user_repo.create(user)
|
||||
except DuplicateError:
|
||||
typer.echo(f"Error: user '{username}' already exists.", err=True)
|
||||
raise typer.Exit(code=1)
|
||||
|
||||
link = await service.create(username=username, created_by="cli")
|
||||
url = f"{issuer.rstrip('/')}/register?token={link.token}"
|
||||
return f"Created admin user: {username}\nRegistration link: {url}"
|
||||
|
||||
|
||||
@app.command()
|
||||
def init_admin(
|
||||
username: str = typer.Argument(help="Username for the admin user"),
|
||||
ttl: int = typer.Option(86400, help="Registration link expiry in seconds"),
|
||||
) -> None:
|
||||
"""Create an initial admin user with a registration link."""
|
||||
settings = Settings() # type: ignore[call-arg]
|
||||
|
||||
async def _run() -> str:
|
||||
async with open_db(settings.sqlite_path, MIGRATIONS_DIR) as db:
|
||||
return await _init_admin(db=db, username=username, issuer=settings.issuer, ttl=ttl)
|
||||
|
||||
result = asyncio.run(_run())
|
||||
typer.echo(result)
|
||||
```
|
||||
|
||||
**Step 4: Run tests to verify they pass**
|
||||
|
||||
```bash
|
||||
pytest tests/test_cli.py -v
|
||||
```
|
||||
|
||||
Expected: all tests PASS.
|
||||
|
||||
**Step 5: Run quality gate**
|
||||
|
||||
```bash
|
||||
./scripts/check.sh
|
||||
```
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add src/porchlight/cli.py tests/test_cli.py
|
||||
git commit -m "feat: add init-admin CLI command"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 10: Final verification and Docker rebuild
|
||||
|
||||
**Step 1: Run full quality gate**
|
||||
|
||||
```bash
|
||||
./scripts/check.sh
|
||||
```
|
||||
|
||||
Expected: all tests pass (147 original + ~7 new CLI tests), ruff clean, ty clean.
|
||||
|
||||
**Step 2: Rebuild Docker images**
|
||||
|
||||
```bash
|
||||
docker build --target prod -t porchlight:prod .
|
||||
docker build --target dev -t porchlight:dev .
|
||||
```
|
||||
|
||||
Expected: both build successfully.
|
||||
|
||||
**Step 3: Verify dev container starts**
|
||||
|
||||
```bash
|
||||
docker compose --profile dev up -d
|
||||
curl -sf http://localhost:8000/health
|
||||
docker compose --profile dev down
|
||||
```
|
||||
|
||||
Expected: `{"status":"ok"}`
|
||||
|
||||
**Step 4: Verify CLI entry point works in prod image**
|
||||
|
||||
```bash
|
||||
docker run --rm -e OIDC_OP_ISSUER=http://localhost:8000 porchlight:prod uv run porchlight --help
|
||||
```
|
||||
|
||||
Expected: shows typer help with `create-invite` and `init-admin` commands.
|
||||
|
|
@ -1,373 +0,0 @@
|
|||
# CLI Module Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Add `create-invite` and `initial-admin` CLI commands so operators can bootstrap users and generate invite links from the terminal.
|
||||
|
||||
**Architecture:** A Typer app in `src/porchlight/cli.py` with two commands. Both open the SQLite DB directly via the existing `open_db()` context manager and use `asyncio.run()` for async operations. The `/register/{token}` route is modified to handle pre-existing users (created by `initial-admin`).
|
||||
|
||||
**Tech Stack:** Typer 0.15+, asyncio, existing SQLite storage layer
|
||||
|
||||
**Design doc:** `docs/plans/2026-02-17-cli-module-design.md`
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Make `/register/{token}` handle existing users
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/porchlight/authn/routes.py:72-90`
|
||||
- Test: `tests/test_auth_routes/test_register_magic_link.py`
|
||||
|
||||
**Step 1: Write failing test for existing user registration**
|
||||
|
||||
Add to `tests/test_auth_routes/test_register_magic_link.py`:
|
||||
|
||||
```python
|
||||
async def test_register_existing_user_logs_in_and_redirects(client: AsyncClient) -> None:
|
||||
"""When initial-admin creates a user, the invite link should log them in."""
|
||||
app = client._transport.app # type: ignore[union-attr]
|
||||
magic_link_repo = app.state.magic_link_repo
|
||||
user_repo = app.state.user_repo
|
||||
|
||||
# Pre-create the user (as initial-admin would)
|
||||
from porchlight.models import User
|
||||
|
||||
user = User(userid="lusab-bansen", username="admin", groups=["admin", "users"])
|
||||
await user_repo.create(user)
|
||||
|
||||
# Create invite for the same username
|
||||
await magic_link_repo.create(
|
||||
MagicLink(
|
||||
token="admin-setup",
|
||||
username="admin",
|
||||
expires_at=datetime.now(UTC) + timedelta(hours=1),
|
||||
)
|
||||
)
|
||||
|
||||
res = await client.get("/register/admin-setup", follow_redirects=False)
|
||||
assert res.status_code in (302, 303)
|
||||
assert "/manage/credentials" in res.headers["location"]
|
||||
assert "setup=1" in res.headers["location"]
|
||||
|
||||
# Token should be marked used
|
||||
link = await magic_link_repo.get_by_token("admin-setup")
|
||||
assert link is not None
|
||||
assert link.used is True
|
||||
|
||||
# Original user should still exist with original groups
|
||||
existing = await user_repo.get_by_username("admin")
|
||||
assert existing is not None
|
||||
assert existing.userid == "lusab-bansen"
|
||||
assert "admin" in existing.groups
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `uv run python -m pytest tests/test_auth_routes/test_register_magic_link.py::test_register_existing_user_logs_in_and_redirects -v`
|
||||
Expected: FAIL -- `DuplicateError` because `user_repo.create()` tries to insert a duplicate username.
|
||||
|
||||
**Step 3: Modify `register_magic_link` to handle existing users**
|
||||
|
||||
In `src/porchlight/authn/routes.py`, replace the `register_magic_link` function:
|
||||
|
||||
```python
|
||||
@router.get("/register/{token}")
|
||||
async def register_magic_link(request: Request, token: str) -> Response:
|
||||
magic_link_service = request.app.state.magic_link_service
|
||||
user_repo = request.app.state.user_repo
|
||||
|
||||
link = await magic_link_service.validate(token)
|
||||
if link is None:
|
||||
return HTMLResponse("<p>Invalid or expired registration link.</p>", status_code=400)
|
||||
|
||||
# Check if user already exists (e.g. created by initial-admin)
|
||||
user = await user_repo.get_by_username(link.username)
|
||||
if user is None:
|
||||
userid = await generate_unique_userid(user_repo)
|
||||
user = User(userid=userid, username=link.username, groups=["users"])
|
||||
await user_repo.create(user)
|
||||
|
||||
await magic_link_service.mark_used(token)
|
||||
|
||||
request.session["userid"] = user.userid
|
||||
request.session["username"] = user.username
|
||||
|
||||
return RedirectResponse("/manage/credentials?setup=1", status_code=303)
|
||||
```
|
||||
|
||||
**Step 4: Run all registration tests**
|
||||
|
||||
Run: `uv run python -m pytest tests/test_auth_routes/test_register_magic_link.py -v`
|
||||
Expected: ALL PASS
|
||||
|
||||
**Step 5: Run full test suite to check for regressions**
|
||||
|
||||
Run: `uv run python -m pytest tests/ --ignore=tests/e2e -v`
|
||||
Expected: ALL PASS (151+ tests)
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```
|
||||
feat: allow /register/{token} to handle pre-existing users
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Create CLI module with `create-invite` command
|
||||
|
||||
**Files:**
|
||||
- Create: `src/porchlight/cli.py`
|
||||
- Test: `tests/test_cli.py`
|
||||
|
||||
**Step 1: Write failing tests for `create-invite`**
|
||||
|
||||
Create `tests/test_cli.py`:
|
||||
|
||||
```python
|
||||
from unittest.mock import patch
|
||||
|
||||
from typer.testing import CliRunner
|
||||
|
||||
from porchlight.cli import app
|
||||
|
||||
runner = CliRunner()
|
||||
|
||||
|
||||
def test_create_invite_prints_url() -> None:
|
||||
"""create-invite should print a registration URL."""
|
||||
with patch.dict(
|
||||
"os.environ",
|
||||
{"OIDC_OP_ISSUER": "http://localhost:8000", "OIDC_OP_SQLITE_PATH": ":memory:"},
|
||||
):
|
||||
result = runner.invoke(app, ["create-invite", "alice"])
|
||||
assert result.exit_code == 0
|
||||
assert "http://localhost:8000/register/" in result.stdout
|
||||
|
||||
|
||||
def test_create_invite_with_note() -> None:
|
||||
"""create-invite should accept a --note flag."""
|
||||
with patch.dict(
|
||||
"os.environ",
|
||||
{"OIDC_OP_ISSUER": "http://localhost:8000", "OIDC_OP_SQLITE_PATH": ":memory:"},
|
||||
):
|
||||
result = runner.invoke(app, ["create-invite", "bob", "--note", "Welcome!"])
|
||||
assert result.exit_code == 0
|
||||
assert "http://localhost:8000/register/" in result.stdout
|
||||
```
|
||||
|
||||
**Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `uv run python -m pytest tests/test_cli.py -v`
|
||||
Expected: FAIL -- `ImportError: cannot import name 'app' from 'porchlight.cli'`
|
||||
|
||||
**Step 3: Implement `cli.py` with `create-invite`**
|
||||
|
||||
Create `src/porchlight/cli.py`:
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
|
||||
import typer
|
||||
|
||||
from porchlight.config import Settings
|
||||
from porchlight.invite.service import MagicLinkService
|
||||
from porchlight.store.sqlite.db import open_db
|
||||
from porchlight.store.sqlite.repositories import SQLiteMagicLinkRepository
|
||||
|
||||
PACKAGE_DIR = Path(__file__).parent
|
||||
MIGRATIONS_DIR = PACKAGE_DIR / "store" / "sqlite" / "migrations"
|
||||
|
||||
app = typer.Typer(help="Porchlight OIDC Provider CLI")
|
||||
|
||||
|
||||
@app.command()
|
||||
def create_invite(
|
||||
username: str,
|
||||
note: Annotated[str | None, typer.Option(help="Optional note for the invite")] = None,
|
||||
ttl: Annotated[int | None, typer.Option(help="Link expiration in seconds")] = None,
|
||||
) -> None:
|
||||
"""Generate a magic link registration URL for a new user."""
|
||||
settings = Settings() # type: ignore[call-arg]
|
||||
asyncio.run(_create_invite(settings, username, note, ttl))
|
||||
|
||||
|
||||
async def _create_invite(settings: Settings, username: str, note: str | None, ttl: int | None) -> None:
|
||||
effective_ttl = ttl if ttl is not None else settings.invite_ttl
|
||||
async with open_db(settings.sqlite_path, MIGRATIONS_DIR) as db:
|
||||
repo = SQLiteMagicLinkRepository(db)
|
||||
service = MagicLinkService(repo=repo, ttl=effective_ttl)
|
||||
link = await service.create(username=username, note=note, created_by="cli")
|
||||
url = f"{settings.issuer}/register/{link.token}"
|
||||
typer.echo(url)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
app()
|
||||
```
|
||||
|
||||
**Step 4: Run tests to verify they pass**
|
||||
|
||||
Run: `uv run python -m pytest tests/test_cli.py -v`
|
||||
Expected: ALL PASS
|
||||
|
||||
**Step 5: Verify the entry point works**
|
||||
|
||||
Run: `uv run python -m porchlight.cli --help`
|
||||
Expected: Shows help with `create-invite` command listed.
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```
|
||||
feat: add create-invite CLI command for generating magic link URLs
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Add `initial-admin` command
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/porchlight/cli.py`
|
||||
- Modify: `tests/test_cli.py`
|
||||
|
||||
**Step 1: Write failing tests for `initial-admin`**
|
||||
|
||||
Add to `tests/test_cli.py`:
|
||||
|
||||
```python
|
||||
def test_initial_admin_prints_url() -> None:
|
||||
"""initial-admin should create user and print a registration URL."""
|
||||
with patch.dict(
|
||||
"os.environ",
|
||||
{"OIDC_OP_ISSUER": "http://localhost:8000", "OIDC_OP_SQLITE_PATH": ":memory:"},
|
||||
):
|
||||
result = runner.invoke(app, ["initial-admin", "admin"])
|
||||
assert result.exit_code == 0
|
||||
assert "http://localhost:8000/register/" in result.stdout
|
||||
|
||||
|
||||
def test_initial_admin_duplicate_username_fails() -> None:
|
||||
"""initial-admin should fail if the username already exists."""
|
||||
with patch.dict(
|
||||
"os.environ",
|
||||
{"OIDC_OP_ISSUER": "http://localhost:8000", "OIDC_OP_SQLITE_PATH": ":memory:"},
|
||||
):
|
||||
# First call succeeds
|
||||
result1 = runner.invoke(app, ["initial-admin", "admin"])
|
||||
assert result1.exit_code == 0
|
||||
|
||||
# Second call with same username fails
|
||||
result2 = runner.invoke(app, ["initial-admin", "admin"])
|
||||
assert result2.exit_code == 1
|
||||
assert "already exists" in result2.stdout
|
||||
|
||||
|
||||
def test_initial_admin_custom_groups() -> None:
|
||||
"""initial-admin should accept --group flags."""
|
||||
with patch.dict(
|
||||
"os.environ",
|
||||
{"OIDC_OP_ISSUER": "http://localhost:8000", "OIDC_OP_SQLITE_PATH": ":memory:"},
|
||||
):
|
||||
result = runner.invoke(app, ["initial-admin", "admin", "--group", "superadmin", "--group", "users"])
|
||||
assert result.exit_code == 0
|
||||
assert "http://localhost:8000/register/" in result.stdout
|
||||
```
|
||||
|
||||
**Step 2: Run tests to verify they fail**
|
||||
|
||||
Run: `uv run python -m pytest tests/test_cli.py::test_initial_admin_prints_url tests/test_cli.py::test_initial_admin_duplicate_username_fails tests/test_cli.py::test_initial_admin_custom_groups -v`
|
||||
Expected: FAIL -- `initial-admin` command not found.
|
||||
|
||||
**Step 3: Implement `initial-admin` command**
|
||||
|
||||
Add imports to `src/porchlight/cli.py`:
|
||||
|
||||
```python
|
||||
from porchlight.store.sqlite.repositories import (
|
||||
SQLiteMagicLinkRepository,
|
||||
SQLiteUserRepository,
|
||||
)
|
||||
from porchlight.models import User
|
||||
from porchlight.userid import generate_unique_userid
|
||||
```
|
||||
|
||||
Add the command:
|
||||
|
||||
```python
|
||||
@app.command()
|
||||
def initial_admin(
|
||||
username: str,
|
||||
group: Annotated[list[str] | None, typer.Option(help="Groups to assign (repeatable)")] = None,
|
||||
) -> None:
|
||||
"""Bootstrap an admin user with a registration link for credential setup."""
|
||||
settings = Settings() # type: ignore[call-arg]
|
||||
asyncio.run(_initial_admin(settings, username, group))
|
||||
|
||||
|
||||
async def _initial_admin(settings: Settings, username: str, groups: list[str] | None) -> None:
|
||||
effective_groups = groups if groups is not None else ["admin", "users"]
|
||||
async with open_db(settings.sqlite_path, MIGRATIONS_DIR) as db:
|
||||
user_repo = SQLiteUserRepository(db)
|
||||
magic_link_repo = SQLiteMagicLinkRepository(db)
|
||||
|
||||
# Check if username already exists
|
||||
existing = await user_repo.get_by_username(username)
|
||||
if existing is not None:
|
||||
typer.echo(f"Error: user '{username}' already exists", err=True)
|
||||
raise typer.Exit(code=1)
|
||||
|
||||
# Create the user
|
||||
userid = await generate_unique_userid(user_repo)
|
||||
user = User(userid=userid, username=username, groups=effective_groups)
|
||||
await user_repo.create(user)
|
||||
|
||||
# Create invite link for credential setup
|
||||
service = MagicLinkService(repo=magic_link_repo, ttl=settings.invite_ttl)
|
||||
link = await service.create(username=username, note="initial admin setup", created_by="cli")
|
||||
url = f"{settings.issuer}/register/{link.token}"
|
||||
typer.echo(url)
|
||||
```
|
||||
|
||||
**Step 4: Run all CLI tests**
|
||||
|
||||
Run: `uv run python -m pytest tests/test_cli.py -v`
|
||||
Expected: ALL PASS
|
||||
|
||||
**Step 5: Run full test suite**
|
||||
|
||||
Run: `uv run python -m pytest tests/ --ignore=tests/e2e -v`
|
||||
Expected: ALL PASS
|
||||
|
||||
**Step 6: Commit**
|
||||
|
||||
```
|
||||
feat: add initial-admin CLI command for bootstrapping admin users
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 4: Run full quality check
|
||||
|
||||
**Step 1: Format and lint**
|
||||
|
||||
Run: `uv run python -m ruff format src/porchlight/cli.py tests/test_cli.py && uv run python -m ruff check src/porchlight/cli.py tests/test_cli.py --fix`
|
||||
|
||||
**Step 2: Type check**
|
||||
|
||||
Run: `uv run ty check src/porchlight/cli.py`
|
||||
|
||||
**Step 3: Run all tests**
|
||||
|
||||
Run: `uv run python -m pytest tests/ --ignore=tests/e2e -v`
|
||||
Expected: ALL PASS
|
||||
|
||||
**Step 4: Fix any issues found**
|
||||
|
||||
If any lint, type, or test failures occur, fix them before committing.
|
||||
|
||||
**Step 5: Final commit if any fixes needed**
|
||||
|
||||
```
|
||||
refactor: fix lint/type issues from CLI module changes
|
||||
```
|
||||
Loading…
Add table
Add a link
Reference in a new issue