"""Overlord Agent host-side FastAPI service. Runs OUTSIDE Docker (host-side) on port 8767. Endpoints: GET /agent/health — liveness check POST /agent/sessions/new — returns a fresh session UUID POST /agent/ask — runs claude -p with given session GET /agent/sessions/{session_id}/history — replays a session's JSONL on disk Auth: every endpoint except /health requires the same browser session cookie that dereth-tracker issues. """ from __future__ import annotations import asyncio import json import logging import os import time import uuid from collections import deque from pathlib import Path from typing import Any from fastapi import Depends, FastAPI, HTTPException from fastapi.responses import JSONResponse from pydantic import BaseModel, Field from . import auth from .claude_wrapper import CLAUDE_CWD, ClaudeError, ask_claude logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) logger = logging.getLogger("agent") # Audit log — every /agent/ask request gets a JSONL line here, separate # from journald so the operator can grep without root. Set to /dev/null # to disable. Rotated externally (logrotate) if it gets big. AUDIT_LOG_PATH = Path(os.getenv("AGENT_AUDIT_LOG", "/var/log/overlord-agent/audit.jsonl")) audit_logger = logging.getLogger("agent.audit") try: AUDIT_LOG_PATH.parent.mkdir(parents=True, exist_ok=True) _h = logging.FileHandler(AUDIT_LOG_PATH) _h.setFormatter(logging.Formatter("%(message)s")) audit_logger.addHandler(_h) audit_logger.propagate = False audit_logger.setLevel(logging.INFO) except OSError as e: logger.warning("audit log path %s not writable (%s); logging only via journal", AUDIT_LOG_PATH, e) # Rate limit: per-user count over a rolling window. Defaults are generous # for a single human at a keyboard but block automated abuse. 
RATE_LIMIT_WINDOW_S = int(os.getenv("AGENT_RATE_WINDOW_S", "3600"))
RATE_LIMIT_MAX = int(os.getenv("AGENT_RATE_MAX", "60"))

# Per-user concurrent request cap (no fanning out 50 calls in parallel).
CONCURRENCY_LIMIT_PER_USER = int(os.getenv("AGENT_CONCURRENCY_PER_USER", "1"))

# Rolling timestamps of recent /agent/ask calls per user.
_rate_state: dict[str, deque[float]] = {}

# Per-user semaphores so a single user can't run multiple concurrent claude
# subprocesses (each is expensive).
_user_semaphores: dict[str, asyncio.Semaphore] = {}


def _check_rate_limit(username: str) -> tuple[bool, int]:
    """Record one request for *username* if the rolling window allows it.

    Returns ``(allowed, retry_after_seconds)``. When the request is allowed
    its timestamp is appended to the user's window and ``retry_after`` is 0.
    When it is rejected nothing is recorded and ``retry_after`` is the number
    of whole seconds until the oldest recorded request leaves the window.
    """
    now = time.monotonic()
    window = _rate_state.setdefault(username, deque())
    cutoff = now - RATE_LIMIT_WINDOW_S
    # Drop timestamps that have aged out of the rolling window.
    while window and window[0] < cutoff:
        window.popleft()
    if len(window) >= RATE_LIMIT_MAX:
        # The window can only be empty here when RATE_LIMIT_MAX <= 0; fall
        # back to "now" so that configuration rejects instead of raising
        # IndexError.
        oldest = window[0] if window else now
        retry_after = int(oldest + RATE_LIMIT_WINDOW_S - now) + 1
        return False, retry_after
    window.append(now)
    return True, 0


def _user_semaphore(username: str) -> asyncio.Semaphore:
    """Return the semaphore for *username*, creating it on first use."""
    sem = _user_semaphores.get(username)
    if sem is None:
        sem = asyncio.Semaphore(CONCURRENCY_LIMIT_PER_USER)
        _user_semaphores[username] = sem
    return sem


def _audit(event: dict[str, Any]) -> None:
    """Emit one JSONL line to the audit log.

    A UTC ``timestamp`` field is added to a copy of *event*, so the caller's
    dict is left untouched. Auditing is best-effort: a failure never
    propagates to the request, but it is reported on the service logger
    rather than silently dropped.
    """
    record = {
        **event,
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
    try:
        audit_logger.info(json.dumps(record, ensure_ascii=False))
    except Exception:  # noqa: BLE001 - auditing must never break a request
        logger.warning(
            "failed to write audit event %r", event.get("event"), exc_info=True
        )


app = FastAPI(title="Overlord Agent", version="0.1.0")


# ─── Models ──────────────────────────────────────────────────────────


class AskRequest(BaseModel):
    """Request body for POST /agent/ask."""

    session_id: str = Field(
        ..., description="Stable per-conversation UUID stored in browser localStorage"
    )
    message: str = Field(..., min_length=1, max_length=10_000)


class AskResponse(BaseModel):
    """Response body for POST /agent/ask, copied from the claude result."""

    result: str
    session_id: str
    duration_ms: int
    num_turns: int
    is_error: bool


class NewSessionResponse(BaseModel):
    """Response body for POST /agent/sessions/new."""

    session_id: str


# ─── Helpers ─────────────────────────────────────────────────────────


def _encode_cwd(cwd: str) -> str:
    """Match Claude Code's on-disk encoding for cwd → directory name.

    Claude Code stores sessions at
    ~/.claude/projects/<encoded-cwd>/<session-id>.jsonl where
    non-alphanumerics in the cwd are replaced with hyphens.

    Example: /home/erik/MosswartOverlord → -home-erik-MosswartOverlord
    """
    return "".join(c if c.isalnum() else "-" for c in cwd)


def _sessions_dir() -> Path:
    """Return the directory holding session JSONL files for CLAUDE_CWD."""
    return Path.home() / ".claude" / "projects" / _encode_cwd(CLAUDE_CWD)


def _extract_text(content: Any) -> str:
    """Flatten a message ``content`` value into plain text.

    ``content`` may be a string or a list of ``{type, text}`` parts. Only
    parts with ``type == "text"`` and a string ``text`` contribute; anything
    else yields an empty string.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        texts = (
            part.get("text")
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        )
        # Skip null / non-string "text" values so join() cannot raise.
        return "".join(t for t in texts if isinstance(t, str))
    return ""


def _read_session_messages(path: Path) -> list[dict[str, Any]]:
    """Parse a session JSONL file into ``{role, text, timestamp}`` dicts.

    Blank lines, lines that are not valid JSON objects, records whose type
    is not user/assistant, and records without any text are skipped.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    messages: list[dict[str, Any]] = []
    # errors="replace": one bad byte should not fail the whole history.
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A line can be valid JSON without being an object.
            if not isinstance(obj, dict):
                continue
            # Claude Code records turns with type=user / type=assistant.
            # Tool-use traffic is verbose; skip it for the chat UI.
            msg_type = obj.get("type")
            if msg_type not in ("user", "assistant"):
                continue
            msg = obj.get("message")
            if not isinstance(msg, dict):
                continue
            text = _extract_text(msg.get("content"))
            if not text:
                continue
            messages.append(
                {
                    "role": msg_type,
                    "text": text,
                    "timestamp": obj.get("timestamp"),
                }
            )
    return messages


# ─── Endpoints ───────────────────────────────────────────────────────


@app.get("/agent/health")
async def health() -> dict:
    """Liveness probe — no auth, used by deployment scripts."""
    return {
        "status": "ok",
        "claude_cwd": CLAUDE_CWD,
        "sessions_dir_exists": _sessions_dir().exists(),
    }


@app.post("/agent/sessions/new", response_model=NewSessionResponse)
async def new_session(_user: dict = Depends(auth.require_user)) -> NewSessionResponse:
    """Generate a fresh session UUID.

    Doesn't touch disk — claude creates the JSONL file when the first
    message lands.
    """
    return NewSessionResponse(session_id=str(uuid.uuid4()))


@app.post("/agent/ask", response_model=AskResponse)
async def agent_ask(
    req: AskRequest, user: dict = Depends(auth.require_user)
) -> AskResponse:
    """Forward a message to claude -p resuming the given session.

    Enforces:
      * Per-user rate limit (60 requests/hour by default).
      * Per-user concurrency cap (1 in-flight at a time by default).
      * Audit log of every request (JSONL).

    Raises:
        HTTPException: 429 when rate limited or when the user already has
            the maximum number of requests in flight; 502 when claude fails.
    """
    username = user["username"]

    # Rate limit BEFORE acquiring the user semaphore — cheaper to reject.
    allowed, retry_after = _check_rate_limit(username)
    if not allowed:
        _audit(
            {
                "event": "rate_limited",
                "user": username,
                "session_id": req.session_id,
                "retry_after_s": retry_after,
            }
        )
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded; retry in {retry_after}s",
            headers={"Retry-After": str(retry_after)},
        )

    # Reject rather than queue: a locked semaphore means this user already
    # has the maximum number of requests in flight.
    sem = _user_semaphore(username)
    if sem.locked():
        _audit(
            {
                "event": "concurrency_blocked",
                "user": username,
                "session_id": req.session_id,
            }
        )
        raise HTTPException(
            status_code=429, detail="A previous question is still being processed"
        )

    started = time.monotonic()
    async with sem:
        _audit(
            {
                "event": "ask_start",
                "user": username,
                "session_id": req.session_id,
                # Only a preview is stored; the full length is kept alongside.
                "message": req.message[:500],
                "message_len": len(req.message),
            }
        )
        try:
            result = await ask_claude(req.message, req.session_id)
        except ClaudeError as e:
            elapsed_ms = int((time.monotonic() - started) * 1000)
            logger.warning(
                "claude failed user=%s session=%s err=%s", username, req.session_id, e
            )
            _audit(
                {
                    "event": "ask_error",
                    "user": username,
                    "session_id": req.session_id,
                    "error": str(e)[:500],
                    "elapsed_ms": elapsed_ms,
                }
            )
            raise HTTPException(status_code=502, detail=str(e)) from e

    elapsed_ms = int((time.monotonic() - started) * 1000)
    logger.info(
        "ask user=%s session=%s turns=%d duration_ms=%d (subprocess=%dms)",
        username,
        result.session_id,
        result.num_turns,
        elapsed_ms,
        result.duration_ms,
    )
    _audit(
        {
            "event": "ask_ok",
            "user": username,
            "session_id": result.session_id,
            "result_preview": (result.result or "")[:300],
            "result_len": len(result.result or ""),
            "turns": result.num_turns,
            "elapsed_ms": elapsed_ms,
            "subprocess_ms": result.duration_ms,
            "is_error": result.is_error,
        }
    )
    return AskResponse(
        result=result.result,
        session_id=result.session_id,
        duration_ms=result.duration_ms,
        num_turns=result.num_turns,
        is_error=result.is_error,
    )


@app.get("/agent/sessions/{session_id}/history")
async def session_history(
    session_id: str, _user: dict = Depends(auth.require_user)
) -> JSONResponse:
    """Replay a session's JSONL from ~/.claude/projects/<encoded-cwd>/<session-id>.jsonl.

    Returns a flat array of {role, text, timestamp} for the chat window.
    Returns an empty array if the session file doesn't exist yet.

    Raises:
        HTTPException: 400 if session_id is not a UUID; 500 if the session
            file exists but cannot be read.
    """
    # UUID sanity check to prevent path traversal. uuid.UUID() also accepts
    # braces, a urn:uuid: prefix and uppercase hex, so use the canonical
    # lowercase hyphenated form (what uuid.uuid4() yields in new_session)
    # for both the filename and the log line, never the raw input.
    try:
        canonical_id = str(uuid.UUID(session_id))
    except ValueError:
        raise HTTPException(status_code=400, detail="invalid session_id") from None

    path = _sessions_dir() / f"{canonical_id}.jsonl"
    if not path.is_file():
        return JSONResponse({"messages": []})

    # Reading and parsing the file is blocking work; run it in the default
    # executor so the event loop stays free for other requests.
    loop = asyncio.get_running_loop()
    try:
        messages = await loop.run_in_executor(None, _read_session_messages, path)
    except OSError as e:
        logger.warning("failed to read session %s: %s", canonical_id, e)
        raise HTTPException(status_code=500, detail="failed to read session") from e

    return JSONResponse({"messages": messages})


# ─── Entrypoint ──────────────────────────────────────────────────────


def main() -> None:
    """Run via `python -m agent.service` for local testing."""
    import uvicorn

    uvicorn.run(
        "agent.service:app",
        host="127.0.0.1",
        port=8767,
        log_level="info",
    )


if __name__ == "__main__":
    main()