"""Subprocess wrapper around `claude -p` (Claude Code in headless JSON mode). Run from cwd=/home/erik/MosswartOverlord so: • Sessions persist at ~/.claude/projects/-home-erik-MosswartOverlord/.jsonl • Project-level .mcp.json is auto-loaded • CLAUDE.md in the repo root briefs the agent The `--session-id` flag both creates a new session (first call) and resumes an existing one (subsequent calls), so we don't need separate code paths. """ from __future__ import annotations import asyncio import json import logging import os from dataclasses import dataclass from pathlib import Path from typing import Any logger = logging.getLogger(__name__) # These can be overridden via env vars for non-prod testing. CLAUDE_BIN = os.getenv("CLAUDE_BIN", "/home/erik/.local/bin/claude") CLAUDE_CWD = os.getenv("CLAUDE_CWD", "/home/erik/MosswartOverlord") # Hard cap on how long a single agent turn may take. Claude Code can spin a # while when chaining many tool calls; we don't want to leave a zombie # subprocess if something gets stuck. CLAUDE_TIMEOUT_S = int(os.getenv("CLAUDE_TIMEOUT_S", "240")) @dataclass class ClaudeResult: result: str session_id: str duration_ms: int num_turns: int is_error: bool raw: dict[str, Any] class ClaudeError(RuntimeError): """Raised when the claude CLI returns a non-zero exit or unparseable output.""" def _session_exists(session_id: str) -> bool: """True if Claude Code has already persisted a JSONL for this session. Claude Code stores sessions at ~/.claude/projects//.jsonl where non-alphanumerics in the cwd are replaced with hyphens. """ encoded = "".join(c if c.isalnum() else "-" for c in CLAUDE_CWD) path = Path.home() / ".claude" / "projects" / encoded / f"{session_id}.jsonl" return path.is_file() async def ask_claude(message: str, session_id: str) -> ClaudeResult: """Send `message` to `claude -p` for `session_id`; return parsed result. On the FIRST message of a session uses `--session-id ` to create it. On subsequent messages uses `--resume ` because claude rejects `--session-id` on existing sessions ("Session ID ... is already in use"). Raises ClaudeError on subprocess failure, JSON parse failure, or timeout. """ if not Path(CLAUDE_BIN).exists(): raise ClaudeError(f"claude binary not found at {CLAUDE_BIN}") if not Path(CLAUDE_CWD).is_dir(): raise ClaudeError(f"CLAUDE_CWD does not exist: {CLAUDE_CWD}") # Whitelist only our MCP tools so Claude Code can call them without # human approval. Names follow the convention mcp____. # We deliberately omit built-in tools (Bash, Write, Edit, Read, etc.) # — the assistant doesn't need them for live-state Q&A and they'd be a # security/permissions footgun on an unattended service. allowed_tools = ",".join( [ "mcp__overlord__get_live_players", "mcp__overlord__get_recent_rares", "mcp__overlord__query_telemetry_db", "mcp__overlord__get_player_state", "mcp__overlord__get_inventory", "mcp__overlord__get_inventory_search", "mcp__overlord__search_items", "mcp__overlord__get_combat_stats", "mcp__overlord__get_equipment_cantrips", "mcp__overlord__get_quest_status", "mcp__overlord__get_server_health", "mcp__overlord__suitbuilder_search", ] ) # Pick --session-id (creates) vs --resume (continues) based on whether # the session JSONL already exists on disk. is_new = not _session_exists(session_id) session_flag = "--session-id" if is_new else "--resume" args = [ CLAUDE_BIN, "-p", session_flag, session_id, "--output-format", "json", "--allowed-tools", allowed_tools, # CRITICAL: dontAsk auto-DENIES anything outside --allowed-tools. # Do NOT use bypassPermissions here — that mode ignores the whitelist # entirely and lets the model call Bash/Write/Edit/etc. (verified # the hard way: it wrote /tmp/owned.sh when prompted to). # See https://code.claude.com/docs/en/permission-modes.md "--permission-mode", "dontAsk", ] logger.info( "claude exec: session=%s mode=%s msg_len=%d cwd=%s", session_id, "new" if is_new else "resume", len(message), CLAUDE_CWD, ) proc = await asyncio.create_subprocess_exec( *args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=CLAUDE_CWD, ) try: stdout, stderr = await asyncio.wait_for( proc.communicate(input=message.encode("utf-8")), timeout=CLAUDE_TIMEOUT_S, ) except asyncio.TimeoutError: try: proc.kill() except ProcessLookupError: pass raise ClaudeError(f"claude timed out after {CLAUDE_TIMEOUT_S}s") if proc.returncode != 0: stderr_text = stderr.decode("utf-8", "replace") # If we picked the wrong flag (e.g. JSONL deleted from disk between # our check and exec, or a never-flushed session), claude prints # "Session ID … is already in use." Re-issue with --resume. if is_new and "already in use" in stderr_text: logger.info("session %s actually exists; retrying with --resume", session_id) args2 = list(args) args2[2] = "--resume" proc2 = await asyncio.create_subprocess_exec( *args2, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=CLAUDE_CWD, ) try: stdout, stderr = await asyncio.wait_for( proc2.communicate(input=message.encode("utf-8")), timeout=CLAUDE_TIMEOUT_S, ) except asyncio.TimeoutError: try: proc2.kill() except ProcessLookupError: pass raise ClaudeError(f"claude timed out after {CLAUDE_TIMEOUT_S}s") if proc2.returncode != 0: raise ClaudeError( f"claude exited {proc2.returncode} after retry: " f"{stderr.decode('utf-8', 'replace')[:500]}" ) else: raise ClaudeError( f"claude exited {proc.returncode}: {stderr_text[:500]}" ) raw_text = stdout.decode("utf-8", "replace").strip() if not raw_text: raise ClaudeError("claude produced empty stdout") # In --output-format json mode the LAST line is the JSON envelope; some # earlier lines may be progress. Be tolerant. try: envelope = json.loads(raw_text) except json.JSONDecodeError: # Try the last non-empty line last = next( (line for line in reversed(raw_text.splitlines()) if line.strip()), "", ) try: envelope = json.loads(last) except json.JSONDecodeError as e: raise ClaudeError( f"claude stdout was not JSON: {raw_text[:500]}" ) from e return ClaudeResult( result=envelope.get("result", ""), session_id=envelope.get("session_id", session_id), duration_ms=int(envelope.get("duration_ms", 0)), num_turns=int(envelope.get("num_turns", 0)), is_error=bool(envelope.get("is_error", False)), raw=envelope, )