MosswartOverlord/agent/service.py
Erik 9d4c724b7f feat(agent): security hardening — systemd lockdown, rate limit, audit log
systemd unit now applies defense-in-depth:
- ProtectSystem=strict + ProtectHome=read-only (rest of FS sealed)
- ReadWritePaths only for ~/.claude (session JSONLs) and venv + audit log
- InaccessiblePaths blocks /etc/shadow, /etc/ssh, /root, ~/.ssh, shell history
- NoNewPrivileges + dropped capabilities (no setuid escalation, no caps)
- PrivateTmp, PrivateDevices, ProtectKernel*, MemoryDenyWriteExecute
- SystemCallFilter @system-service ~@privileged ~@debug ~@mount etc.
- RestrictAddressFamilies blocks raw/packet sockets

Application layer:
- Per-user rate limit 60/hour (configurable via AGENT_RATE_MAX)
- Per-user concurrency cap of 1 in-flight (no parallel claude burns)
- JSONL audit log of every /agent/ask to /var/log/overlord-agent/audit.jsonl
  Logs username, message preview, result preview, timing, errors.

Plus secrets migration: EnvironmentFile now prefers /etc/overlord/agent.env
(root:erik 0640) over /home/erik/MosswartOverlord/.env, so the secrets no
longer live under /home, which the service can still read (read-only).
Falls back to the old path during the transition.
2026-04-25 21:25:40 +02:00

347 lines
12 KiB
Python

"""Overlord Agent host-side FastAPI service.
Runs OUTSIDE Docker (host-side) on port 8767.
Endpoints:
GET /agent/health — liveness check
POST /agent/sessions/new — returns a fresh session UUID
POST /agent/ask — runs claude -p with given session
GET /agent/sessions/{session_id}/history
— replays a session's JSONL on disk
Auth: every endpoint except /health requires the same browser session
cookie that dereth-tracker issues.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
import uuid
from collections import deque
from pathlib import Path
from typing import Any
from fastapi import Depends, FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from . import auth
from .claude_wrapper import CLAUDE_CWD, ClaudeError, ask_claude
# Process-wide logging: INFO and above, "<time> <level> <logger>: <message>".
_LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
# Service logger, named "agent".
logger = logging.getLogger("agent")
# Audit log — every /agent/ask request gets a JSONL line here, separate
# from journald so the operator can grep without root. Set to /dev/null
# to disable. Rotated externally (logrotate) if it gets big.
AUDIT_LOG_PATH = Path(os.getenv("AGENT_AUDIT_LOG", "/var/log/overlord-agent/audit.jsonl"))
audit_logger = logging.getLogger("agent.audit")
try:
    AUDIT_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8: audit lines are serialised with ensure_ascii=False, so
    # relying on the locale's default encoding could make non-ASCII user
    # messages fail to encode and silently drop the audit line.
    _h = logging.FileHandler(AUDIT_LOG_PATH, encoding="utf-8")
    _h.setFormatter(logging.Formatter("%(message)s"))  # raw JSONL, no prefix
    audit_logger.addHandler(_h)
    # Keep audit lines out of the root/journal handlers once the file works.
    audit_logger.propagate = False
    audit_logger.setLevel(logging.INFO)
except OSError as e:
    # Without a file handler, audit records propagate to the root logger
    # configured by logging.basicConfig (i.e. stderr/journal).
    logger.warning("audit log path %s not writable (%s); logging only via journal", AUDIT_LOG_PATH, e)
# Rate limit: per-user count over a rolling window. Defaults are generous
# for a single human at a keyboard but block automated abuse.
def _int_env(name: str, default: str) -> int:
    """Read integer setting *name* from the environment, falling back to *default*."""
    return int(os.getenv(name, default))


RATE_LIMIT_WINDOW_S = _int_env("AGENT_RATE_WINDOW_S", "3600")
RATE_LIMIT_MAX = _int_env("AGENT_RATE_MAX", "60")
# Per-user cap on in-flight requests (no fanning out many calls in parallel).
CONCURRENCY_LIMIT_PER_USER = _int_env("AGENT_CONCURRENCY_PER_USER", "1")

# username -> monotonic timestamps of that user's recent /agent/ask calls.
_rate_state: dict[str, deque[float]] = {}
# username -> semaphore so one user can't run several concurrent claude
# subprocesses (each one is expensive).
_user_semaphores: dict[str, asyncio.Semaphore] = {}
def _check_rate_limit(username: str) -> tuple[bool, int]:
    """Admit or refuse one /agent/ask call for *username* under the rolling window.

    Timestamps older than RATE_LIMIT_WINDOW_S are discarded first. If fewer
    than RATE_LIMIT_MAX remain, the call is admitted and the current
    monotonic time is recorded. Otherwise nothing is recorded.

    Returns:
        (allowed, retry_after_seconds). retry_after is 0 when allowed,
        otherwise the whole seconds until the oldest recorded call leaves the
        window (at least 1).
    """
    now = time.monotonic()
    window = _rate_state.setdefault(username, deque())
    cutoff = now - RATE_LIMIT_WINDOW_S
    while window and window[0] < cutoff:
        window.popleft()
    if len(window) >= RATE_LIMIT_MAX:
        if not window:
            # RATE_LIMIT_MAX <= 0 refuses everything. There is no oldest
            # timestamp to wait on (indexing window[0] would raise
            # IndexError), so report one full window.
            return False, max(1, RATE_LIMIT_WINDOW_S)
        retry_after = int(window[0] + RATE_LIMIT_WINDOW_S - now) + 1
        return False, retry_after
    window.append(now)
    return True, 0
def _user_semaphore(username: str) -> asyncio.Semaphore:
    """Return *username*'s concurrency semaphore, creating it on first use.

    New semaphores allow CONCURRENCY_LIMIT_PER_USER simultaneous holders and
    are cached in _user_semaphores, so later calls get the same object.
    """
    try:
        return _user_semaphores[username]
    except KeyError:
        fresh = asyncio.Semaphore(CONCURRENCY_LIMIT_PER_USER)
        _user_semaphores[username] = fresh
        return fresh
def _audit(event: dict[str, Any]) -> None:
    """Emit one JSONL line to the audit log.

    A "timestamp" (UTC, ``YYYY-MM-DDTHH:MM:SSZ``) is added to a copy of
    *event*; the caller's dict is not modified. Auditing is best-effort: a
    failure (e.g. a value json.dumps cannot serialise) must never fail the
    request. Such failures are reported on the service logger rather than
    being silently discarded.
    """
    record = {**event, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())}
    try:
        audit_logger.info(json.dumps(record, ensure_ascii=False))
    except Exception:  # noqa: BLE001 — audit must never break the request path
        logger.warning(
            "audit event %r could not be written", event.get("event"), exc_info=True
        )
# ASGI application; main() serves it via uvicorn as "agent.service:app".
app = FastAPI(version="0.1.0", title="Overlord Agent")
# ─── Models ──────────────────────────────────────────────────────────
class AskRequest(BaseModel):
    # Request body for POST /agent/ask.
    # session_id: per-conversation id kept by the browser; agent_ask passes
    # it to ask_claude() to continue that conversation.
    session_id: str = Field(
        ..., description="Stable per-conversation UUID stored in browser localStorage"
    )
    # message: the user's prompt; pydantic enforces 1..10_000 characters.
    message: str = Field(..., min_length=1, max_length=10_000)
class AskResponse(BaseModel):
    # Response body for POST /agent/ask; agent_ask fills every field from the
    # result object returned by ask_claude().
    result: str  # result text reported by ask_claude
    session_id: str  # session id as reported back by ask_claude
    duration_ms: int  # subprocess duration from ask_claude (not the endpoint's wall time)
    num_turns: int  # number of turns reported by ask_claude
    is_error: bool  # error flag reported by ask_claude
class NewSessionResponse(BaseModel):
    # Response body for POST /agent/sessions/new.
    session_id: str  # freshly generated session UUID string
# ─── Helpers ─────────────────────────────────────────────────────────
def _encode_cwd(cwd: str) -> str:
    """Match Claude Code's on-disk encoding for cwd → directory name.

    Claude Code stores sessions at ~/.claude/projects/<encoded-cwd>/<uuid>.jsonl,
    where every character of the cwd outside ASCII [A-Za-z0-9] is replaced
    with a hyphen.
    Example: /home/erik/MosswartOverlord → -home-erik-MosswartOverlord

    ``str.isalnum()`` alone is Unicode-aware ("é" and "²" count as
    alphanumeric), so it would diverge from an ASCII-only rule for non-ASCII
    paths. The ASCII test is therefore explicit.
    NOTE(review): ASCII-only rule assumed from Claude Code's behaviour;
    confirm against its source if a non-ASCII cwd is ever used.
    """
    return "".join(c if c.isascii() and c.isalnum() else "-" for c in cwd)
def _sessions_dir() -> Path:
    """Directory holding Claude Code's session JSONLs for CLAUDE_CWD."""
    projects_root = Path.home().joinpath(".claude", "projects")
    return projects_root / _encode_cwd(CLAUDE_CWD)
# ─── Endpoints ───────────────────────────────────────────────────────
@app.get("/agent/health")
async def health() -> dict:
    """Unauthenticated liveness probe for deployment scripts.

    Reports the configured Claude working directory and whether its session
    directory exists on disk yet.
    """
    sessions_dir_present = _sessions_dir().exists()
    return dict(
        status="ok",
        claude_cwd=CLAUDE_CWD,
        sessions_dir_exists=sessions_dir_present,
    )
@app.post("/agent/sessions/new", response_model=NewSessionResponse)
async def new_session(_user: dict = Depends(auth.require_user)) -> NewSessionResponse:
    """Hand out a brand-new random session UUID (uuid4).

    Nothing is written to disk here; claude creates the session's JSONL file
    when the first message lands.
    """
    fresh = uuid.uuid4()
    return NewSessionResponse(session_id=f"{fresh}")
@app.post("/agent/ask", response_model=AskResponse)
async def agent_ask(
    req: AskRequest, user: dict = Depends(auth.require_user)
) -> AskResponse:
    """Forward a message to claude -p resuming the given session.

    Enforces, in this order:
      * session_id must be a UUID in canonical 8-4-4-4-12 form (400).
      * Per-user concurrency cap (1 in-flight at a time by default; 429).
      * Per-user rate limit (60 requests/hour by default; 429 + Retry-After).
      * Audit log of every request (JSONL), including rejections and
        unexpected failures.

    Raises:
        HTTPException: 400 for a malformed session_id, 429 when throttled,
            502 when ask_claude raises ClaudeError. Any other exception from
            ask_claude is audited and then re-raised unchanged.
    """
    username = user["username"]
    session_id = req.session_id

    # uuid.UUID() on its own is lenient: it accepts braces and a "urn:uuid:"
    # prefix, and ignores every hyphen, so "-<32 hex digits>" parses.
    # Requiring the canonical spelling keeps arbitrary strings (e.g. ones
    # starting with "-") away from ask_claude, which presumably places the id
    # on the claude command line. session_history validates ids the same way.
    try:
        canonical = str(uuid.UUID(session_id))
    except ValueError:
        canonical = None
    if canonical != session_id.lower():
        _audit(
            {
                "event": "invalid_session",
                "user": username,
                "session_id": session_id[:100],
            }
        )
        raise HTTPException(status_code=400, detail="invalid session_id")

    sem = _user_semaphore(username)
    # Concurrency check BEFORE the rate limit: sem.locked() has no side
    # effects, whereas _check_rate_limit() records a timestamp whenever it
    # admits a call. The old order let a request bounced for concurrency still
    # consume rate-limit quota. Both checks are synchronous and cheap.
    if sem.locked():
        _audit(
            {
                "event": "concurrency_blocked",
                "user": username,
                "session_id": session_id,
            }
        )
        raise HTTPException(
            status_code=429, detail="A previous question is still being processed"
        )

    allowed, retry_after = _check_rate_limit(username)
    if not allowed:
        _audit(
            {
                "event": "rate_limited",
                "user": username,
                "session_id": session_id,
                "retry_after_s": retry_after,
            }
        )
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded; retry in {retry_after}s",
            headers={"Retry-After": str(retry_after)},
        )

    started = time.monotonic()
    async with sem:
        _audit(
            {
                "event": "ask_start",
                "user": username,
                "session_id": session_id,
                "message": req.message[:500],
                "message_len": len(req.message),
            }
        )
        try:
            result = await ask_claude(req.message, session_id)
        except ClaudeError as e:
            elapsed_ms = int((time.monotonic() - started) * 1000)
            logger.warning(
                "claude failed user=%s session=%s err=%s", username, session_id, e
            )
            _audit(
                {
                    "event": "ask_error",
                    "user": username,
                    "session_id": session_id,
                    "error": str(e)[:500],
                    "elapsed_ms": elapsed_ms,
                }
            )
            raise HTTPException(status_code=502, detail=str(e)) from e
        except Exception as e:
            # Anything other than ClaudeError is unexpected. Audit it so every
            # ask_start line has a matching outcome, then let FastAPI turn it
            # into a 500 as before.
            elapsed_ms = int((time.monotonic() - started) * 1000)
            logger.error(
                "claude call crashed user=%s session=%s err=%r", username, session_id, e
            )
            _audit(
                {
                    "event": "ask_error",
                    "user": username,
                    "session_id": session_id,
                    "error": str(e)[:500],
                    "error_type": type(e).__name__,
                    "elapsed_ms": elapsed_ms,
                }
            )
            raise

    elapsed_ms = int((time.monotonic() - started) * 1000)
    # The audit line already tolerates a None result; apply the same to the
    # response, whose `result` field is a required str.
    reply = result.result or ""
    logger.info(
        "ask user=%s session=%s turns=%d duration_ms=%d (subprocess=%dms)",
        username,
        result.session_id,
        result.num_turns,
        elapsed_ms,
        result.duration_ms,
    )
    _audit(
        {
            "event": "ask_ok",
            "user": username,
            "session_id": result.session_id,
            "result_preview": reply[:300],
            "result_len": len(reply),
            "turns": result.num_turns,
            "elapsed_ms": elapsed_ms,
            "subprocess_ms": result.duration_ms,
            "is_error": result.is_error,
        }
    )
    return AskResponse(
        result=reply,
        session_id=result.session_id,
        duration_ms=result.duration_ms,
        num_turns=result.num_turns,
        is_error=result.is_error,
    )
def _history_entry(obj: Any) -> dict[str, Any] | None:
    """Turn one decoded JSONL record into a chat-window entry, or None to skip it.

    Only records whose "type" is "user" or "assistant" are kept; tool-use
    traffic is verbose and not shown in the chat UI. ``message.content`` may
    be a plain string or a list of parts. From a list, only parts with
    type == "text" and a string "text" are concatenated. Records that are not
    JSON objects, or whose "message"/parts have an unexpected shape, are
    skipped instead of raising.
    """
    if not isinstance(obj, dict):
        return None
    msg_type = obj.get("type")
    if msg_type not in ("user", "assistant"):
        return None
    msg = obj.get("message")
    content = msg.get("content") if isinstance(msg, dict) else None
    if isinstance(content, str):
        text = content
    elif isinstance(content, list):
        text = "".join(
            part["text"]
            for part in content
            if isinstance(part, dict)
            and part.get("type") == "text"
            and isinstance(part.get("text"), str)
        )
    else:
        text = ""
    if not text:
        return None
    return {"role": msg_type, "text": text, "timestamp": obj.get("timestamp")}


@app.get("/agent/sessions/{session_id}/history")
def session_history(
    session_id: str, _user: dict = Depends(auth.require_user)
) -> JSONResponse:
    """Replay a session's JSONL from ~/.claude/projects/.../<id>.jsonl.

    Returns {"messages": [{role, text, timestamp}, ...]} for the chat window,
    or an empty list if the session file doesn't exist yet. Blank, undecodable
    or unexpectedly shaped lines are skipped.

    Declared as a plain ``def`` so FastAPI runs it in its threadpool. Reading
    a large session file then doesn't block the event loop that
    /agent/ask requests are awaiting on.

    Raises:
        HTTPException: 400 if session_id is not a UUID, 500 if the file
            cannot be read.
    """
    # UUID sanity check to prevent path traversal; Claude Code uses uuid4.
    try:
        uuid.UUID(session_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="invalid session_id") from None
    path = _sessions_dir() / f"{session_id}.jsonl"
    if not path.is_file():
        return JSONResponse({"messages": []})
    messages: list[dict[str, Any]] = []
    try:
        # errors="replace": a stray invalid byte must not abort the whole
        # replay with an (uncaught) UnicodeDecodeError.
        with path.open("r", encoding="utf-8", errors="replace") as f:
            for raw in f:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    obj = json.loads(raw)
                except json.JSONDecodeError:
                    continue
                entry = _history_entry(obj)
                if entry is not None:
                    messages.append(entry)
    except OSError as e:
        logger.warning("failed to read session %s: %s", session_id, e)
        raise HTTPException(status_code=500, detail="failed to read session") from e
    return JSONResponse({"messages": messages})
# ─── Entrypoint ──────────────────────────────────────────────────────
def main() -> None:
    """Serve the app with uvicorn on 127.0.0.1:8767 (`python -m agent.service`)."""
    import uvicorn

    server_options = {
        "host": "127.0.0.1",
        "port": 8767,
        "log_level": "info",
    }
    uvicorn.run("agent.service:app", **server_options)


if __name__ == "__main__":
    # Only when executed directly, not when imported by uvicorn.
    main()