diff --git a/go-services/docker-compose.go.yml b/go-services/docker-compose.go.yml index dfc32040..c699f522 100644 --- a/go-services/docker-compose.go.yml +++ b/go-services/docker-compose.go.yml @@ -110,6 +110,8 @@ services: READ_ONLY: "false" # owns its DB; creates schema on boot INVENTORY_SERVICE_URL: "http://inventory-service:8000" SECRET_KEY: "${SECRET_KEY}" + SHARED_SECRET: "${SHARED_SECRET}" # /ws/position plugin auth (cutover-ready) + SHARED_SECRET_LEGACY: "${SHARED_SECRET_LEGACY:-}" # Replay the Python /ws/live firehose into the ingest handlers (shadow). SHADOW_INGEST_WS: "ws://dereth-tracker:8765/ws/live" LOG_LEVEL: "INFO" diff --git a/go-services/tracker-go/auth.go b/go-services/tracker-go/auth.go index 979f3a4e..439ba27c 100644 --- a/go-services/tracker-go/auth.go +++ b/go-services/tracker-go/auth.go @@ -147,7 +147,8 @@ func isPublicPath(p string) bool { case "/login", "/logout", "/login.html", "/login-style.css", "/health": return true } - return strings.HasPrefix(p, "/icons/") + // WS endpoints authenticate inside their own handlers. + return strings.HasPrefix(p, "/icons/") || strings.HasPrefix(p, "/ws/") } func clientIP(r *http.Request) string { diff --git a/go-services/tracker-go/ingest.go b/go-services/tracker-go/ingest.go index 1ecc21af..c825094e 100644 --- a/go-services/tracker-go/ingest.go +++ b/go-services/tracker-go/ingest.go @@ -336,6 +336,14 @@ func (i *Ingestor) handleEquipmentCantrip(data map[string]any) { i.mu.Unlock() } +// clearEquipmentCantrip drops a character's cantrip overlay on plugin register +// (main.py:3106). +func (i *Ingestor) clearEquipmentCantrip(name string) { + i.mu.Lock() + delete(i.liveEquipmentCantrip, name) + i.mu.Unlock() +} + func (i *Ingestor) handleNearbyObjects(data map[string]any) { name := toStr(data["character_name"]) if name == "" { diff --git a/go-services/tracker-go/main.go b/go-services/tracker-go/main.go index bcdfdf43..652cfa20 100644 --- a/go-services/tracker-go/main.go +++ b/go-services/tracker-go/main.go @@ -36,11 +36,15 @@ type Server struct { pool *pgxpool.Pool cache *liveCache totals *totalsCache - invProxy *httputil.ReverseProxy - staticDir string - secretKey string - ingestor *Ingestor // non-nil only in ingest/shadow mode - log *slog.Logger + invProxy *httputil.ReverseProxy + staticDir string + secretKey string + sharedSecret string + sharedSecretLegacy string + ingestor *Ingestor // non-nil only in ingest/shadow mode + hub *Hub // browser /ws/live fan-out + plugins *pluginRegistry + log *slog.Logger } func main() { @@ -62,11 +66,15 @@ func main() { defer stop() srv := &Server{ - cache: newLiveCache(), - totals: newTotalsCache(), - staticDir: cfg.StaticDir, - secretKey: cfg.SecretKey, - log: logger, + cache: newLiveCache(), + totals: newTotalsCache(), + staticDir: cfg.StaticDir, + secretKey: cfg.SecretKey, + sharedSecret: cfg.SharedSecret, + sharedSecretLegacy: cfg.SharedSecretLegacy, + hub: newHub(), + plugins: newPluginRegistry(logger), + log: logger, } if cfg.SecretKey == "" { // Fail closed like the Python service: with no key, no external cookie @@ -108,7 +116,7 @@ func main() { logger.Error("SHADOW_INGEST_WS set but READ_ONLY=true; refusing to ingest into the production DB") os.Exit(1) } - srv.ingestor = newIngestor(pool, logger, nil) + srv.ingestor = newIngestor(pool, logger, srv.hub.broadcast) go srv.runShadowConsumer(ctx, cfg.IngestWS) logger.Info("shadow ingest enabled", "source", cfg.IngestWS) } @@ -154,19 +162,23 @@ type config struct { ReadOnly bool // true = read-side parity (force read-only txns); false = ingest/shadow (owns its DB) InventoryURL string // inventory-service base URL StaticDir string // directory for static assets / openissues.json - SecretKey string // session-cookie signing key (must match the Python service) - IngestWS string // optional: a /ws/live URL to shadow-ingest from (Python tracker) + SecretKey string // session-cookie signing key (must match the Python service) + SharedSecret string // plugin /ws/position auth + SharedSecretLegacy string // plugin auth rotation fallback + IngestWS string // optional: a /ws/live URL to shadow-ingest from (Python tracker) } func loadConfig() config { return config{ - Addr: ":" + envOr("PORT", "8770"), - DatabaseURL: os.Getenv("DATABASE_URL"), - ReadOnly: envOr("READ_ONLY", "true") != "false", - InventoryURL: envOr("INVENTORY_SERVICE_URL", "http://inventory-service:8000"), - StaticDir: envOr("STATIC_DIR", "static"), - SecretKey: os.Getenv("SECRET_KEY"), - IngestWS: os.Getenv("SHADOW_INGEST_WS"), + Addr: ":" + envOr("PORT", "8770"), + DatabaseURL: os.Getenv("DATABASE_URL"), + ReadOnly: envOr("READ_ONLY", "true") != "false", + InventoryURL: envOr("INVENTORY_SERVICE_URL", "http://inventory-service:8000"), + StaticDir: envOr("STATIC_DIR", "static"), + SecretKey: os.Getenv("SECRET_KEY"), + SharedSecret: os.Getenv("SHARED_SECRET"), + SharedSecretLegacy: os.Getenv("SHARED_SECRET_LEGACY"), + IngestWS: os.Getenv("SHADOW_INGEST_WS"), } } @@ -212,6 +224,10 @@ func (s *Server) registerRoutes(mux *http.ServeMux) { mux.HandleFunc("GET /issues", s.handleIssues) mux.HandleFunc("GET /me", s.handleMe) + // WebSocket servers (cutover-ready): browser fan-out + plugin ingest. + mux.HandleFunc("GET /ws/live", s.handleWSLive) + mux.HandleFunc("GET /ws/position", s.handleWSPosition) + // Inventory-service reverse proxies. s.registerProxyRoutes(mux) } @@ -261,3 +277,10 @@ func (s *statusRecorder) WriteHeader(code int) { s.status = code s.ResponseWriter.WriteHeader(code) } + +// Unwrap lets http.ResponseController (used by coder/websocket to hijack the +// connection for /ws upgrades) reach the underlying ResponseWriter through this +// logging wrapper. Without it, WebSocket handshakes fail. +func (s *statusRecorder) Unwrap() http.ResponseWriter { + return s.ResponseWriter +} diff --git a/go-services/tracker-go/shadow.go b/go-services/tracker-go/shadow.go index 49f8ea88..0545c4ed 100644 --- a/go-services/tracker-go/shadow.go +++ b/go-services/tracker-go/shadow.go @@ -46,39 +46,75 @@ func (s *Server) shadowConnect(ctx context.Context, wsURL string) error { defer c.CloseNow() c.SetReadLimit(32 << 20) // nearby_objects / dungeon_map payloads can be large - // Keepalive pings. - pingCtx, cancel := context.WithCancel(ctx) + connCtx, cancel := context.WithCancel(ctx) defer cancel() + + // Keepalive pings. go func() { t := time.NewTicker(20 * time.Second) defer t.Stop() for { select { - case <-pingCtx.Done(): + case <-connCtx.Done(): return case <-t.C: - pc, cc := context.WithTimeout(pingCtx, 10*time.Second) + pc, cc := context.WithTimeout(connCtx, 10*time.Second) _ = c.Ping(pc) cc() } } }() + // Decouple socket read from ALL processing, including JSON parsing: the read + // loop only copies raw frames onto a queue, so it drains the socket as fast + // as the network delivers. If parsing or DB-bound dispatch ran inline, the + // read would stall, the upstream /ws/live broadcast send would error, and + // Python would evict us (Read then blocks forever). A single worker + // unmarshals + dispatches in order, preserving per-char kill-delta / combat + // accumulation. + queue := make(chan []byte, 16384) + done := make(chan struct{}) + go func() { + defer close(done) + for raw := range queue { + var m map[string]any + if json.Unmarshal(raw, &m) != nil { + continue + } + s.ingestor.dispatch(connCtx, m) + } + }() + s.log.Info("shadow consumer connected; replaying /ws/live into ingest", "url", wsURL) - var n int + var n, dropped int + loopErr := s.shadowReadLoop(ctx, c, queue, &n, &dropped) + close(queue) + <-done + return loopErr +} + +func (s *Server) shadowReadLoop(ctx context.Context, c *websocket.Conn, queue chan []byte, n, dropped *int) error { for { - _, raw, err := c.Read(ctx) + // Read deadline acts as a liveness watchdog: the firehose is constant, so + // a long silence means the upstream evicted us without closing — time out + // and let runShadowConsumer reconnect. + rctx, rcancel := context.WithTimeout(ctx, 25*time.Second) + _, raw, err := c.Read(rctx) + rcancel() if err != nil { return err } - var m map[string]any - if json.Unmarshal(raw, &m) != nil { - continue + select { + case queue <- raw: + default: + *dropped++ + if *dropped%1000 == 1 { + s.log.Warn("shadow queue full; dropping messages", "dropped", *dropped) + } } - s.ingestor.dispatch(ctx, m) - n++ - if n%5000 == 0 { - s.log.Info("shadow consumer progress", "messages", n) + *n++ + if *n%5000 == 0 { + s.log.Info("shadow consumer progress", "messages", *n, "queued", len(queue), "dropped", *dropped) } } } diff --git a/go-services/tracker-go/wslive.go b/go-services/tracker-go/wslive.go new file mode 100644 index 00000000..0fe1eb27 --- /dev/null +++ b/go-services/tracker-go/wslive.go @@ -0,0 +1,185 @@ +package main + +import ( + "context" + "encoding/json" + "net/http" + "sync" + "time" + + "github.com/coder/websocket" +) + +// Hub is the browser broadcast fan-out for /ws/live, mirroring main.py's +// browser_conns + _do_broadcast: each client has an optional message-type +// filter (nil = all); a message is delivered when the filter is nil or contains +// the message's "type". Telemetry broadcasts carry no type, so only unfiltered +// clients receive them (matching Python — which is why the React map polls /live +// over HTTP rather than relying on the WS for positions). +type Hub struct { + mu sync.RWMutex + clients map[*browserClient]bool +} + +type browserClient struct { + filter map[string]bool // nil = all types + send chan []byte +} + +func newHub() *Hub { return &Hub{clients: map[*browserClient]bool{}} } + +func (h *Hub) add(c *browserClient) { + h.mu.Lock() + h.clients[c] = true + h.mu.Unlock() +} + +func (h *Hub) remove(c *browserClient) { + h.mu.Lock() + if h.clients[c] { + delete(h.clients, c) + close(c.send) + } + h.mu.Unlock() +} + +func (h *Hub) count() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) +} + +// broadcast serializes once and delivers to matching clients. A slow client +// (full send buffer) is skipped for this message rather than blocking the +// ingest path, matching the spirit of Python's per-send timeout + eviction. +func (h *Hub) broadcast(data map[string]any) { + h.mu.RLock() + empty := len(h.clients) == 0 + h.mu.RUnlock() + if empty { + return // no browsers: skip the marshal entirely + } + msg, err := json.Marshal(data) + if err != nil { + return + } + msgType, _ := data["type"].(string) + h.mu.RLock() + for c := range h.clients { + if c.filter != nil && (msgType == "" || !c.filter[msgType]) { + continue + } + select { + case c.send <- msg: + default: + } + } + h.mu.RUnlock() +} + +func (s *Server) handleWSLive(w http.ResponseWriter, r *http.Request) { + // Auth: internal-trust (private peer + no XFF) OR a valid session cookie. + if !(r.Header.Get("X-Forwarded-For") == "" && isPrivateAddr(clientIP(r))) { + c, err := r.Cookie("session") + if err != nil || verifySessionCookie(s.secretKey, c.Value) == nil { + http.Error(w, "Not authenticated", http.StatusUnauthorized) + return + } + } + conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{InsecureSkipVerify: true}) + if err != nil { + return + } + defer conn.CloseNow() + conn.SetReadLimit(8 << 20) + + client := &browserClient{send: make(chan []byte, 256)} + s.hub.add(client) + defer s.hub.remove(client) + + ctx := r.Context() + // Writer goroutine: the only writer for this conn (serializes writes). + go func() { + for msg := range client.send { + wctx, cancel := context.WithTimeout(ctx, 5*time.Second) + err := conn.Write(wctx, websocket.MessageText, msg) + cancel() + if err != nil { + conn.CloseNow() + return + } + } + }() + + for { + _, raw, err := conn.Read(ctx) + if err != nil { + return + } + var m map[string]any + if json.Unmarshal(raw, &m) != nil { + continue + } + s.handleBrowserMessage(client, m) + } +} + +// handleBrowserMessage handles subscribe / request_dungeon_map / command +// envelopes from a browser client (main.py:3846). +func (s *Server) handleBrowserMessage(c *browserClient, m map[string]any) { + switch toStr(m["type"]) { + case "subscribe": + types := toStringSlice(m["message_types"]) + if len(types) == 0 { + c.filter = nil // all + return + } + f := make(map[string]bool, len(types)) + for _, t := range types { + f[t] = true + } + c.filter = f + return + case "request_dungeon_map": + lb := toStr(m["landblock"]) + if lb != "" && s.ingestor != nil { + if dm, ok := s.ingestor.snapshot(s.ingestor.dungeonMapCache, lb); ok { + if b, err := json.Marshal(dm); err == nil { + select { + case c.send <- b: + default: + } + } + } + } + return + } + // Command envelopes: new {player_name, command} or legacy {type:command, character_name, text}. + if pn, ok := m["player_name"].(string); ok { + if cmd, ok := m["command"].(string); ok { + s.plugins.send(pn, map[string]any{"player_name": pn, "command": cmd}) + return + } + } + if toStr(m["type"]) == "command" { + pn := toStr(m["character_name"]) + text := toStr(m["text"]) + if pn != "" { + s.plugins.send(pn, map[string]any{"player_name": pn, "command": text}) + } + } +} + +func toStringSlice(v any) []string { + arr, ok := v.([]any) + if !ok { + return nil + } + out := make([]string, 0, len(arr)) + for _, e := range arr { + if s, ok := e.(string); ok { + out = append(out, s) + } + } + return out +} diff --git a/go-services/tracker-go/wsposition.go b/go-services/tracker-go/wsposition.go new file mode 100644 index 00000000..73a075d9 --- /dev/null +++ b/go-services/tracker-go/wsposition.go @@ -0,0 +1,129 @@ +package main + +import ( + "context" + "crypto/hmac" + "encoding/json" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/coder/websocket" +) + +// pluginRegistry maps character_name -> plugin connection for backend->plugin +// command routing (main.py plugin_conns). +type pluginRegistry struct { + mu sync.RWMutex + conns map[string]*websocket.Conn + log *slog.Logger +} + +func newPluginRegistry(log *slog.Logger) *pluginRegistry { + return &pluginRegistry{conns: map[string]*websocket.Conn{}, log: log} +} + +func (p *pluginRegistry) register(name string, c *websocket.Conn) { + p.mu.Lock() + p.conns[name] = c + p.mu.Unlock() +} + +// removeConn drops every name bound to this connection (on disconnect). +func (p *pluginRegistry) removeConn(c *websocket.Conn) { + p.mu.Lock() + for n, cc := range p.conns { + if cc == c { + delete(p.conns, n) + } + } + p.mu.Unlock() +} + +func (p *pluginRegistry) isConnected(name string) bool { + p.mu.RLock() + defer p.mu.RUnlock() + _, ok := p.conns[name] + return ok +} + +// send routes an opaque {player_name, command} envelope to a plugin; evicts the +// connection on write failure (main.py command-forward semantics). +func (p *pluginRegistry) send(name string, payload map[string]any) { + p.mu.RLock() + c := p.conns[name] + p.mu.RUnlock() + if c == nil { + return + } + b, _ := json.Marshal(payload) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := c.Write(ctx, websocket.MessageText, b); err != nil { + p.mu.Lock() + if p.conns[name] == c { + delete(p.conns, name) + } + p.mu.Unlock() + } +} + +// pluginAuthOK constant-time-compares the supplied secret to SHARED_SECRET (and +// the optional rotation fallback). Fails closed when unset or left at the +// placeholder, matching main.py. +func (s *Server) pluginAuthOK(key string) bool { + ok := s.sharedSecret != "" && s.sharedSecret != "your_shared_secret" && + hmac.Equal([]byte(key), []byte(s.sharedSecret)) + if !ok && s.sharedSecretLegacy != "" { + ok = hmac.Equal([]byte(key), []byte(s.sharedSecretLegacy)) + } + return ok +} + +func (s *Server) handleWSPosition(w http.ResponseWriter, r *http.Request) { + if s.ingestor == nil { + http.Error(w, "ingest disabled on this instance", http.StatusServiceUnavailable) + return + } + key := r.URL.Query().Get("secret") + if key == "" { + key = r.Header.Get("X-Plugin-Secret") + } + if !s.pluginAuthOK(key) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{InsecureSkipVerify: true}) + if err != nil { + return + } + defer conn.CloseNow() + defer s.plugins.removeConn(conn) + conn.SetReadLimit(32 << 20) + + ctx := r.Context() + for { + _, raw, err := conn.Read(ctx) + if err != nil { + return + } + var m map[string]any + if json.Unmarshal(raw, &m) != nil { + continue + } + if toStr(m["type"]) == "register" { + name := toStr(m["character_name"]) + if name == "" { + name = toStr(m["player_name"]) + } + if name != "" { + s.plugins.register(name, conn) + s.ingestor.clearEquipmentCantrip(name) + s.log.Info("plugin registered", "character", name) + } + continue + } + s.ingestor.dispatch(ctx, m) + } +}