MosswartOverlord/agent/mcp_overlord.py
Erik 79cf88d3f7 feat(agent): Phase 1 — chat-window AI assistant via Claude Code subprocess
Adds an in-dashboard AI assistant that answers questions about live game
state. Designed reactively (no background loops) — every user message in
the chat window or via /api/agent/ask runs one `claude -p` invocation.

Architecture:
- New host-side FastAPI service (agent/) on 127.0.0.1:8767, OUTSIDE the
  dereth-tracker Docker container because `claude` and ~/.claude
  credentials live on the host.
- nginx routes /api/agent/* to the host service.
- The same browser session cookie the tracker issues authenticates
  agent requests (shared SECRET_KEY).
- The agent shells out to `claude -p --session-id <uuid>` with
  cwd=/home/erik/MosswartOverlord. Sessions persist as JSONL on disk
  via Claude Code's built-in machinery.
- An MCP stdio server (agent/mcp_overlord.py) exposes tools to Claude:
  get_live_players, get_recent_rares, query_telemetry_db (read-only,
  parsed by sqlglot to reject DML/DDL), get_player_state, get_inventory,
  get_inventory_search, get_combat_stats, get_equipment_cantrips,
  get_quest_status, get_server_health, suitbuilder_search.
- Read-only PG role (overlord_agent_ro) is the second line of defense
  on the SQL tool — even a parser bypass can't mutate.
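
As a rough illustration of the reactive design in the list above, the per-message invocation might look like the following. This is a sketch, not the service's actual code: only `claude -p`, `--session-id`, and the working directory come from this commit message. The helper names `build_claude_command` and `run_claude`, the placement of the prompt as the positional argument after `-p`, and the 180-second timeout are assumptions made for the example.

```python
import asyncio
import uuid

REPO_DIR = "/home/erik/MosswartOverlord"  # cwd named in the commit message


def build_claude_command(prompt: str, session_id: str) -> list[str]:
    """Build the argv for one `claude -p` run (hypothetical helper)."""
    # uuid.UUID raises ValueError for anything that is not a well-formed UUID,
    # so a malformed session id never reaches the command line.
    canonical = str(uuid.UUID(session_id))
    # Assumption: the prompt is passed as the positional argument after -p.
    return ["claude", "-p", prompt, "--session-id", canonical]


async def run_claude(prompt: str, session_id: str, timeout: float = 180.0) -> str:
    """Run one invocation and return its stdout (hypothetical helper)."""
    argv = build_claude_command(prompt, session_id)
    # An argv list with no shell means the user's text is never shell-parsed.
    proc = await asyncio.create_subprocess_exec(
        *argv,
        cwd=REPO_DIR,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # Do not leave a stuck child process behind after a timeout.
        proc.kill()
        await proc.wait()
        raise
    if proc.returncode != 0:
        detail = err.decode(errors="replace")
        raise RuntimeError(f"claude exited with {proc.returncode}: {detail}")
    return out.decode(errors="replace")
```

Separating command construction from execution keeps the argument handling testable without the `claude` binary installed.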

Frontend:
- AgentWindow.tsx — draggable chat window with localStorage-pinned
  session UUID, "New Chat" button, on-mount rehydration from
  /agent/sessions/{id}/history (parses Claude Code's JSONL).
- Wired into WindowRenderer + Sidebar (🤖 Assistant button).
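
The history rehydration mentioned above depends on turning a session's JSONL file into chat messages. The sketch below shows one way that could be done on the server side. The record shape is an assumption, not something this commit documents: it presumes each line is a JSON object with `"type"` set to `"user"` or `"assistant"` and a `"message"` object whose `"content"` is either a string or a list of blocks. `parse_history` and `_text_of` are hypothetical names.

```python
import json
from typing import Any


def _text_of(content: Any) -> str:
    """Flatten a message's content into plain text (assumed shapes only)."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # Assumed: text blocks look like {"type": "text", "text": "..."};
        # any other block type (for example tool calls) is skipped.
        return "".join(
            block.get("text", "")
            for block in content
            if isinstance(block, dict) and block.get("type") == "text"
        )
    return ""


def parse_history(jsonl_text: str) -> list[dict[str, str]]:
    """Return [{"role": ..., "text": ...}] for user/assistant records."""
    history: list[dict[str, str]] = []
    for line in jsonl_text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # tolerate a partially written or corrupt line
        if not isinstance(record, dict):
            continue
        if record.get("type") not in ("user", "assistant"):
            continue
        message = record.get("message")
        if not isinstance(message, dict):
            continue
        text = _text_of(message.get("content"))
        if text:
            history.append({"role": record["type"], "text": text})
    return history
```

Skipping unparseable lines rather than failing means a session file that is still being appended to can be read without an error surfacing in the chat window.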

Operational:
- systemd unit (overlord-agent.service) + install.sh.
- agent/README.md documents env vars, deploy flow, smoke tests.
- nginx/overlord.conf gets a new /api/agent/ location with 180s timeout.
- CLAUDE.md gets an "Overlord Assistant Mode" section briefing the
  agent on which tools to use and how to behave.

NOT YET DEPLOYED — server still needs:
1. Apply agent/sql/0001_overlord_agent_ro.sql + ALTER ROLE password
2. Add AGENT_DB_DSN to /home/erik/MosswartOverlord/.env
3. bash agent/install.sh (creates venv, installs unit, starts service)
4. sudo cp /home/erik/MosswartOverlord/nginx/overlord.conf to nginx + reload

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-25 20:43:59 +02:00

"""MCP stdio server exposing Overlord data to Claude Code.
Configured via .mcp.json at the repo root, which Claude Code auto-loads
when invoked with cwd=/home/erik/MosswartOverlord. Tool implementations
live in tools.py — this file is just MCP protocol plumbing.
Run directly with:
python3 /home/erik/MosswartOverlord/agent/mcp_overlord.py
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
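try:
    from . import tools as T
except ImportError:
    # Run as a plain script (as the docstring describes): there is no parent
    # package, so fall back to the sibling module found via sys.path[0].
    import tools as T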
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s mcp_overlord: %(message)s",
)
logger = logging.getLogger("mcp_overlord")
server: Server = Server("overlord")
# ─── Tool registry ──────────────────────────────────────────────────
#
# Each entry: name → (description, JSON schema, callable async fn).
# We register them with @server.list_tools / @server.call_tool below.
TOOL_DEFS: dict[str, dict[str, Any]] = {
    "get_live_players": {
        "description": (
            "Return active characters seen in the last ~30 seconds with their "
            "current position, kills, KPH, vitae, online time, and VTank state. "
            "Use this for any 'who is online right now / what is X doing' question."
        ),
        "schema": {"type": "object", "properties": {}},
        "fn": lambda _args: T.get_live_players(),
    },
    "get_recent_rares": {
        "description": (
            "Return rare item finds from the last N hours, newest first. "
            "Use for questions about recent drops, who is finding rares, or "
            "rare-rate analysis. Defaults to 24 hours, max 30 days."
        ),
        "schema": {
            "type": "object",
            "properties": {
                "hours": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 720,
                    "default": 24,
                },
                "limit": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 200,
                    "default": 100,
                },
            },
        },
        "fn": lambda args: T.get_recent_rares(
            hours=int(args.get("hours", 24)),
            limit=int(args.get("limit", 100)),
        ),
    },
    "query_telemetry_db": {
        "description": (
            "Run a read-only SQL query against the telemetry database (TimescaleDB). "
            "Only SELECT / WITH statements are accepted; any DML or DDL is rejected. "
            "Useful for questions that aren't covered by the other tools — top-N "
            "lists, custom aggregations, time-window comparisons. "
            "Available tables include: telemetry_events (hypertable, 30d retention), "
            "rare_events, spawn_events (hypertable, 7d retention), portals, "
            "char_stats, rare_stats, rare_stats_sessions, character_stats, "
            "combat_stats, combat_stats_sessions, server_status. "
            "The query has a 10s timeout and returns at most 200 rows."
        ),
        "schema": {
            "type": "object",
            "required": ["sql"],
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "A single PostgreSQL SELECT or WITH ... SELECT statement.",
                }
            },
        },
        "fn": lambda args: T.query_telemetry_db(str(args["sql"])),
    },
    "get_player_state": {
        "description": (
            "Combined snapshot for ONE character: live telemetry (if online) "
            "+ full character stats (attributes, skills, augmentations). "
            "Use this for questions like 'what is X doing right now' or 'show me X's stats'."
        ),
        "schema": {
            "type": "object",
            "required": ["character_name"],
            "properties": {
                "character_name": {"type": "string"},
            },
        },
        "fn": lambda args: T.get_player_state(str(args["character_name"])),
    },
    "get_inventory": {
        "description": (
            "Full inventory listing for one character — every item with name, "
            "icon, container, equipped slot, spells, material, tinkers, etc. "
            "Large response — prefer get_inventory_search for narrow queries."
        ),
        "schema": {
            "type": "object",
            "required": ["character_name"],
            "properties": {"character_name": {"type": "string"}},
        },
        "fn": lambda args: T.get_inventory(str(args["character_name"])),
    },
    "get_inventory_search": {
        "description": (
            "Filtered inventory search for one character. Pass filter query "
            "params as the `filters` object. Common filters: name (substring), "
            "armor_level_min, armor_level_max, material, item_set, has_spell. "
            "Returns matching items in the same shape as get_inventory."
        ),
        "schema": {
            "type": "object",
            "required": ["character_name"],
            "properties": {
                "character_name": {"type": "string"},
                "filters": {
                    "type": "object",
                    "description": "Query params dict, e.g. {\"name\": \"pearl\", \"armor_level_min\": 500}",
                },
            },
        },
        "fn": lambda args: T.get_inventory_search(
            str(args["character_name"]), args.get("filters") or {}
        ),
    },
    "get_combat_stats": {
        "description": (
            "Lifetime + session combat stats for one character. Includes total "
            "damage given/received, per-element offense/defense breakdown, kill "
            "counts, and aetheria surge counts."
        ),
        "schema": {
            "type": "object",
            "required": ["character_name"],
            "properties": {"character_name": {"type": "string"}},
        },
        "fn": lambda args: T.get_combat_stats(str(args["character_name"])),
    },
    "get_equipment_cantrips": {
        "description": (
            "Currently-equipped items for a character along with their active "
            "cantrip/spell state. Useful for 'what is X wearing' or 'is X "
            "running their suit' questions."
        ),
        "schema": {
            "type": "object",
            "required": ["character_name"],
            "properties": {"character_name": {"type": "string"}},
        },
        "fn": lambda args: T.get_equipment_cantrips(str(args["character_name"])),
    },
    "get_quest_status": {
        "description": (
            "Active quest timers and progress across ALL characters. Returns "
            "for each character which quests are READY vs counting down."
        ),
        "schema": {"type": "object", "properties": {}},
        "fn": lambda _args: T.get_quest_status(),
    },
    "get_server_health": {
        "description": (
            "Current Coldeve game-server status: up/down, latency in ms, "
            "current player count from TreeStats.net, total uptime. Updated "
            "every 30 seconds in the background."
        ),
        "schema": {"type": "object", "properties": {}},
        "fn": lambda _args: T.get_server_health(),
    },
    "suitbuilder_search": {
        "description": (
            "Run a constraint-satisfaction armor optimization across all "
            "characters' inventories ('mules'). Drives the same suitbuilder "
            "the /suitbuilder.html page uses. Pass the same params dict the "
            "page sends — see /suitbuilder.html JS for the schema. The search "
            "is SSE-streaming on the backend; this tool collects until done "
            "and returns the final suit(s) plus the last few phase events. "
            "Can take up to 5 minutes for complex constraints — only call "
            "when the user explicitly asks for an optimization run."
        ),
        "schema": {
            "type": "object",
            "required": ["params"],
            "properties": {
                "params": {
                    "type": "object",
                    "description": "Suitbuilder request body (characters, locked slots, set constraints, etc.)",
                },
            },
        },
        "fn": lambda args: T.suitbuilder_search(args.get("params") or {}),
    },
}
# ─── MCP protocol wiring ────────────────────────────────────────────
@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(name=name, description=defn["description"], inputSchema=defn["schema"])
        for name, defn in TOOL_DEFS.items()
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    if name not in TOOL_DEFS:
        return [TextContent(type="text", text=f"unknown tool: {name}")]
    fn = TOOL_DEFS[name]["fn"]
    try:
        result = await fn(arguments or {})
    except T.SqlNotAllowed as e:
        return [TextContent(type="text", text=f"REJECTED: {e}")]
    except Exception as e:  # noqa: BLE001
        logger.exception("tool %s failed", name)
        return [TextContent(type="text", text=f"ERROR: {type(e).__name__}: {e}")]
    text = json.dumps(result, default=str, ensure_ascii=False, indent=2)
    return [TextContent(type="text", text=text)]


async def _run() -> None:
    logger.info("starting MCP stdio server (overlord)")
    try:
        async with stdio_server() as (reader, writer):
            await server.run(reader, writer, server.create_initialization_options())
    finally:
        await T.shutdown()


def main() -> None:
    asyncio.run(_run())


if __name__ == "__main__":
    main()
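
The schemas in the registry declare `minimum` and `maximum` bounds for `hours` and `limit`, while the corresponding lambdas only cast with `int(...)`. Whether out-of-range values are rejected before `call_tool` runs depends on the MCP layer and on `tools.py`, neither of which is shown here. If a defensive clamp were wanted in this plumbing layer, it could look roughly like the sketch below; `bounded_int` is a hypothetical helper and not part of the file.

```python
from typing import Any, Mapping


def bounded_int(args: Mapping[str, Any], key: str, default: int, lo: int, hi: int) -> int:
    """Read an integer argument and clamp it into [lo, hi] (hypothetical helper)."""
    raw = args.get(key, default)
    # bool is a subclass of int; treat it as invalid rather than as 0 or 1.
    if isinstance(raw, bool):
        raise ValueError(f"{key} must be an integer, got a boolean")
    value = int(raw)  # raises ValueError or TypeError for non-numeric input
    return max(lo, min(hi, value))
```

With such a helper, the `get_recent_rares` entry could pass `hours=bounded_int(args, "hours", 24, 1, 720)` and `limit=bounded_int(args, "limit", 100, 1, 200)`, keeping the runtime limits aligned with the declared schema.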