systemd unit now applies defense-in-depth:
- ProtectSystem=strict + ProtectHome=read-only (rest of FS sealed)
- ReadWritePaths only for ~/.claude (session JSONLs) and venv + audit log
- InaccessiblePaths blocks /etc/shadow, /etc/ssh, /root, ~/.ssh, shell history
- NoNewPrivileges + dropped capabilities (no setuid escalation, no caps)
- PrivateTmp, PrivateDevices, ProtectKernel*, MemoryDenyWriteExecute
- SystemCallFilter @system-service ~@privileged ~@debug ~@mount etc.
- RestrictAddressFamilies blocks raw/packet sockets

Application layer:
- Per-user rate limit 60/hour (configurable via AGENT_RATE_MAX)
- Per-user concurrency cap of 1 in-flight (no parallel claude burns)
- JSONL audit log of every /agent/ask to /var/log/overlord-agent/audit.jsonl. Logs username, message preview, result preview, timing, errors.

Plus secrets migration: EnvironmentFile now prefers /etc/overlord/agent.env (root:erik 0640) over /home/erik/MosswartOverlord/.env, so even the read-only /home doesn't expose them. Falls back to the old path during the transition.
347 lines
12 KiB
Python
347 lines
12 KiB
Python
"""Overlord Agent host-side FastAPI service.
|
|
|
|
Runs OUTSIDE Docker (host-side) on port 8767.
|
|
|
|
Endpoints:
|
|
GET /agent/health — liveness check
|
|
POST /agent/sessions/new — returns a fresh session UUID
|
|
POST /agent/ask — runs claude -p with given session
|
|
GET /agent/sessions/{session_id}/history
|
|
— replays a session's JSONL on disk
|
|
|
|
Auth: every endpoint except /health requires the same browser session
|
|
cookie that dereth-tracker issues.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import uuid
|
|
from collections import deque
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from fastapi import Depends, FastAPI, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
from pydantic import BaseModel, Field
|
|
|
|
from . import auth
|
|
from .claude_wrapper import CLAUDE_CWD, ClaudeError, ask_claude
|
|
|
|
# Root logging setup: INFO and above, each record prefixed with time, level
# and logger name.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    level=logging.INFO,
)
# Logger for the service's operational (non-audit) messages.
logger = logging.getLogger("agent")
|
|
|
|
# Audit log — every /agent/ask request gets a JSONL line here, separate
# from journald so the operator can grep without root. Set to /dev/null
# to disable. Rotated externally (logrotate) if it gets big.
AUDIT_LOG_PATH = Path(os.getenv("AGENT_AUDIT_LOG", "/var/log/overlord-agent/audit.jsonl"))
audit_logger = logging.getLogger("agent.audit")
try:
    AUDIT_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    # Audit lines are serialized with ensure_ascii=False, so they can contain
    # raw non-ASCII text. Pin the file encoding rather than inheriting
    # whatever locale the service happens to start with.
    _h = logging.FileHandler(AUDIT_LOG_PATH, encoding="utf-8")
    # The message is already a complete JSON document; write it verbatim.
    _h.setFormatter(logging.Formatter("%(message)s"))
    audit_logger.addHandler(_h)
    # Keep audit lines out of the root handlers once the file is attached.
    audit_logger.propagate = False
    audit_logger.setLevel(logging.INFO)
except OSError as e:
    # Directory or file not writable: the logger keeps its defaults, so
    # records still propagate to the root logger.
    logger.warning("audit log path %s not writable (%s); logging only via journal", AUDIT_LOG_PATH, e)
|
|
|
|
# Rolling-window rate limit, counted per user. The defaults leave plenty of
# room for one person typing while still stopping automated abuse.
RATE_LIMIT_WINDOW_S = int(os.environ.get("AGENT_RATE_WINDOW_S", "3600"))
RATE_LIMIT_MAX = int(os.environ.get("AGENT_RATE_MAX", "60"))
# How many requests one user may have in flight at once (prevents fanning
# out dozens of parallel calls).
CONCURRENCY_LIMIT_PER_USER = int(os.environ.get("AGENT_CONCURRENCY_PER_USER", "1"))

# username -> monotonic timestamps of that user's recent /agent/ask calls.
_rate_state: dict[str, deque[float]] = {}
# username -> semaphore capping that user's concurrent claude subprocesses
# (each one is expensive).
_user_semaphores: dict[str, asyncio.Semaphore] = {}
|
|
|
|
|
|
def _check_rate_limit(username: str) -> tuple[bool, int]:
    """Record one request for ``username`` against the rolling window.

    Timestamps older than ``RATE_LIMIT_WINDOW_S`` seconds are discarded
    first. If the user still has ``RATE_LIMIT_MAX`` or more requests inside
    the window, the call is refused and nothing is recorded; otherwise the
    current monotonic time is appended to the user's history.

    Args:
        username: Key identifying the caller in ``_rate_state``.

    Returns:
        ``(allowed, retry_after_seconds)``. ``retry_after_seconds`` is 0 when
        the request is allowed. When refused, it is the number of whole
        seconds until the oldest recorded request leaves the window, or the
        window length itself when ``RATE_LIMIT_MAX`` is zero or negative
        (nothing is ever allowed, so there is no oldest request).
    """
    now = time.monotonic()
    window = _rate_state.setdefault(username, deque())
    cutoff = now - RATE_LIMIT_WINDOW_S
    while window and window[0] < cutoff:
        window.popleft()
    if len(window) >= RATE_LIMIT_MAX:
        if not window:
            # Only reachable when RATE_LIMIT_MAX <= 0. Without this guard
            # window[0] raises IndexError and the caller sees a 500 instead
            # of a rate-limit rejection.
            return False, max(RATE_LIMIT_WINDOW_S, 1)
        # +1 rounds the truncated remainder up so the client never retries early.
        retry_after = int(window[0] + RATE_LIMIT_WINDOW_S - now) + 1
        return False, retry_after
    window.append(now)
    return True, 0
|
|
|
|
|
|
def _user_semaphore(username: str) -> asyncio.Semaphore:
    """Return the semaphore tracking ``username``'s in-flight requests.

    The semaphore is created with ``CONCURRENCY_LIMIT_PER_USER`` slots the
    first time a user is seen and cached in ``_user_semaphores`` afterwards.
    """
    try:
        return _user_semaphores[username]
    except KeyError:
        created = asyncio.Semaphore(CONCURRENCY_LIMIT_PER_USER)
        _user_semaphores[username] = created
        return created
|
|
|
|
|
|
def _audit(event: dict[str, Any]) -> None:
    """Emit one JSONL line to the audit log.

    A UTC ``timestamp`` field (``YYYY-MM-DDTHH:MM:SSZ``) is added to a copy
    of ``event``; the caller's dict is left untouched. Writing is
    best-effort: a failure to serialize or log the event is reported on the
    service logger and never propagates to the request handler.

    Args:
        event: JSON-serializable fields describing what happened.
    """
    record = {
        **event,
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
    try:
        audit_logger.info(json.dumps(record, ensure_ascii=False))
    except Exception:  # noqa: BLE001
        # Auditing must not break the request, but a dropped audit line
        # should at least be visible in the service log.
        logger.warning(
            "failed to write audit event %r", record.get("event"), exc_info=True
        )
|
|
|
|
|
|
# Application object that the route decorators in this module register on.
app = FastAPI(
    title="Overlord Agent",
    version="0.1.0",
)
|
|
|
|
|
|
# ─── Models ──────────────────────────────────────────────────────────
|
|
|
|
|
|
class AskRequest(BaseModel):
    # Request body accepted by POST /agent/ask.

    # Conversation identifier supplied by the client (required).
    session_id: str = Field(
        ...,
        description="Stable per-conversation UUID stored in browser localStorage",
    )
    # The user's message (required, between 1 and 10,000 characters).
    message: str = Field(
        ...,
        min_length=1,
        max_length=10_000,
    )
|
|
|
|
|
|
class AskResponse(BaseModel):
    # Response body of POST /agent/ask; each field is copied from the
    # object returned by ask_claude.

    # Text of the reply.
    result: str
    # Session id as reported back by the claude call.
    session_id: str
    # Duration reported by the claude call, in milliseconds.
    duration_ms: int
    # Number of turns reported by the claude call.
    num_turns: int
    # Error flag reported by the claude call.
    is_error: bool
|
|
|
|
|
|
class NewSessionResponse(BaseModel):
    # Response body of POST /agent/sessions/new.

    # Newly generated session UUID, as a string.
    session_id: str
|
|
|
|
|
|
# ─── Helpers ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def _encode_cwd(cwd: str) -> str:
|
|
"""Match Claude Code's on-disk encoding for cwd → directory name.
|
|
|
|
Claude Code stores sessions at ~/.claude/projects/<encoded-cwd>/<uuid>.jsonl
|
|
where non-alphanumerics in the cwd are replaced with hyphens.
|
|
Example: /home/erik/MosswartOverlord → -home-erik-MosswartOverlord
|
|
"""
|
|
return "".join(c if c.isalnum() else "-" for c in cwd)
|
|
|
|
|
|
def _sessions_dir() -> Path:
    """Return ~/.claude/projects/<encoded CLAUDE_CWD>, where session JSONLs are looked up."""
    home = Path.home()
    project_folder = _encode_cwd(CLAUDE_CWD)
    return home.joinpath(".claude", "projects", project_folder)
|
|
|
|
|
|
# ─── Endpoints ───────────────────────────────────────────────────────
|
|
|
|
|
|
@app.get("/agent/health")
async def health() -> dict:
    """Unauthenticated liveness check for deployment scripts.

    Reports a fixed "ok" status, the configured claude working directory and
    whether the matching sessions directory exists on disk.
    """
    sessions_present = _sessions_dir().exists()
    payload = {
        "status": "ok",
        "claude_cwd": CLAUDE_CWD,
        "sessions_dir_exists": sessions_present,
    }
    return payload
|
|
|
|
|
|
@app.post("/agent/sessions/new", response_model=NewSessionResponse)
async def new_session(_user: dict = Depends(auth.require_user)) -> NewSessionResponse:
    """Hand an authenticated caller a freshly generated session UUID.

    Nothing is written to disk here — claude creates the JSONL file once the
    first message for the session arrives.
    """
    fresh_id = uuid.uuid4()
    return NewSessionResponse(session_id=str(fresh_id))
|
|
|
|
|
|
@app.post("/agent/ask", response_model=AskResponse)
async def agent_ask(
    req: AskRequest, user: dict = Depends(auth.require_user)
) -> AskResponse:
    """Forward a message to claude -p resuming the given session.

    Enforces:
      * ``session_id`` must parse as a UUID (same check as the history
        endpoint); it is passed on in canonical form.
      * Per-user rate limit (60 requests/hour by default).
      * Per-user concurrency cap (1 in-flight at a time by default).
      * Audit log of every request (JSONL).

    Args:
        req: Validated request body (session id and message).
        user: Authenticated user record; only ``user["username"]`` is read.

    Returns:
        The fields of the claude result, with a missing reply text
        normalised to an empty string.

    Raises:
        HTTPException: 400 for a malformed session id, 429 when rate-limited
            or when the user already has a request in flight, 502 when
            ``ask_claude`` raises ``ClaudeError``.
    """
    username = user["username"]

    # The session id comes straight from the client and is handed to
    # ask_claude (presumably ending up on the claude command line — confirm
    # in claude_wrapper). Only let a canonical UUID string through.
    try:
        session_id = str(uuid.UUID(req.session_id))
    except ValueError:
        raise HTTPException(status_code=400, detail="invalid session_id") from None

    # Rate limit BEFORE acquiring the user semaphore — cheaper to reject.
    allowed, retry_after = _check_rate_limit(username)
    if not allowed:
        _audit(
            {
                "event": "rate_limited",
                "user": username,
                "session_id": session_id,
                "retry_after_s": retry_after,
            }
        )
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded; retry in {retry_after}s",
            headers={"Retry-After": str(retry_after)},
        )

    sem = _user_semaphore(username)
    # Reject instead of queueing: a locked semaphore means this user already
    # has the maximum number of requests running.
    if sem.locked():
        _audit(
            {
                "event": "concurrency_blocked",
                "user": username,
                "session_id": session_id,
            }
        )
        raise HTTPException(
            status_code=429, detail="A previous question is still being processed"
        )

    started = time.monotonic()
    async with sem:
        _audit(
            {
                "event": "ask_start",
                "user": username,
                "session_id": session_id,
                # Preview only; the full length is recorded separately.
                "message": req.message[:500],
                "message_len": len(req.message),
            }
        )
        try:
            result = await ask_claude(req.message, session_id)
        except ClaudeError as e:
            elapsed_ms = int((time.monotonic() - started) * 1000)
            logger.warning(
                "claude failed user=%s session=%s err=%s", username, session_id, e
            )
            _audit(
                {
                    "event": "ask_error",
                    "user": username,
                    "session_id": session_id,
                    "error": str(e)[:500],
                    "elapsed_ms": elapsed_ms,
                }
            )
            raise HTTPException(status_code=502, detail=str(e)) from e

    elapsed_ms = int((time.monotonic() - started) * 1000)
    # The audit preview already tolerates a missing reply; use the same
    # normalised text for the response so a None cannot fail validation of
    # the `result: str` field.
    reply_text = result.result or ""
    logger.info(
        "ask user=%s session=%s turns=%d duration_ms=%d (subprocess=%dms)",
        username,
        result.session_id,
        result.num_turns,
        elapsed_ms,
        result.duration_ms,
    )
    _audit(
        {
            "event": "ask_ok",
            "user": username,
            "session_id": result.session_id,
            "result_preview": reply_text[:300],
            "result_len": len(reply_text),
            "turns": result.num_turns,
            "elapsed_ms": elapsed_ms,
            "subprocess_ms": result.duration_ms,
            "is_error": result.is_error,
        }
    )

    return AskResponse(
        result=reply_text,
        session_id=result.session_id,
        duration_ms=result.duration_ms,
        num_turns=result.num_turns,
        is_error=result.is_error,
    )
|
|
|
|
|
|
def _content_text(content: Any) -> str:
    """Flatten a transcript message's ``content`` into plain text.

    ``content`` may be a string, or a list of parts of which only dicts with
    ``type == "text"`` and a string ``text`` contribute. Anything else
    yields an empty string.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "".join(
            part["text"]
            for part in content
            if isinstance(part, dict)
            and part.get("type") == "text"
            and isinstance(part.get("text"), str)
        )
    return ""


@app.get("/agent/sessions/{session_id}/history")
async def session_history(
    session_id: str, _user: dict = Depends(auth.require_user)
) -> JSONResponse:
    """Replay a session's JSONL from ~/.claude/projects/.../<id>.jsonl.

    Returns ``{"messages": [...]}`` where each entry is
    ``{role, text, timestamp}`` for the chat window. The list is empty if
    the session file doesn't exist yet. Lines that are blank, not valid
    JSON, not JSON objects, not user/assistant turns, or without any text
    are skipped.

    Raises:
        HTTPException: 400 if ``session_id`` is not a UUID, 500 if the
            session file cannot be read.
    """
    # UUID sanity check to prevent path traversal — claude Code uses uuid4.
    # uuid.UUID also accepts braces, a urn: prefix, upper case and missing
    # hyphens, so build the file name from the canonical form.
    try:
        canonical_id = str(uuid.UUID(session_id))
    except ValueError:
        raise HTTPException(status_code=400, detail="invalid session_id") from None

    # NOTE(review): `_user` is only used for authentication; nothing here
    # ties a session id to the requesting user — confirm that is intended.
    path = _sessions_dir() / f"{canonical_id}.jsonl"
    if not path.is_file():
        return JSONResponse({"messages": []})

    messages: list[dict[str, Any]] = []
    try:
        with path.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A line may be valid JSON without being an object.
                if not isinstance(obj, dict):
                    continue
                # Claude Code records turns with type=user / type=assistant.
                # Tool-use traffic is verbose; skip it for the chat UI.
                msg_type = obj.get("type")
                if msg_type not in ("user", "assistant"):
                    continue
                msg = obj.get("message")
                if not isinstance(msg, dict):
                    continue
                text = _content_text(msg.get("content"))
                if not text:
                    continue
                messages.append(
                    {
                        "role": msg_type,
                        "text": text,
                        "timestamp": obj.get("timestamp"),
                    }
                )
    except (OSError, UnicodeDecodeError) as e:
        logger.warning("failed to read session %s: %s", session_id, e)
        raise HTTPException(status_code=500, detail="failed to read session") from e

    return JSONResponse({"messages": messages})
|
|
|
|
|
|
# ─── Entrypoint ──────────────────────────────────────────────────────
|
|
|
|
|
|
def main() -> None:
    """Start the service under uvicorn on 127.0.0.1:8767.

    Intended for local testing via `python -m agent.service`.
    """
    import uvicorn

    server_options = {
        "host": "127.0.0.1",
        "port": 8767,
        "log_level": "info",
    }
    uvicorn.run("agent.service:app", **server_options)


if __name__ == "__main__":
    main()
|