Completes the Go tracker as a cutover-ready drop-in: - wslive.go: browser broadcast hub with per-client subscribe filters (nil=all), request_dungeon_map replies, and command routing; auth = internal-trust or session cookie. The ingestor broadcasts every handled event to it. - wsposition.go: plugin ingest server with X-Plugin-Secret/SHARED_SECRET auth (constant-time, fails closed, legacy fallback), register -> plugin_conns, and dispatch into the shared Ingestor. plugin registry for backend->plugin commands. - main.go: statusRecorder.Unwrap() so coder/websocket can hijack through the logging middleware (WS handshakes failed without it); /ws/ bypasses HTTP auth. Shadow consumer robustness (the harness was being evicted under the full firehose): decouple socket read from processing — the read loop only copies raw frames to a queue; a worker unmarshals + dispatches. JSON parsing in the read loop was slowing it enough that Python's broadcast send errored and evicted us (Read then blocked forever). Added a 25s read-deadline watchdog to self-heal. Validated live: shadow /live online = 73 = production; telemetry sustained ~12/s, 0 drops, no eviction; and the shadow's /ws/live re-broadcast stream is IDENTICAL to production's (TOTAL 2150=2150, every event type exact). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
185 lines
4.4 KiB
Go
185 lines
4.4 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/coder/websocket"
|
|
)
|
|
|
|
// Hub is the browser broadcast fan-out for /ws/live, mirroring main.py's
// browser_conns + _do_broadcast: each client has an optional message-type
// filter (nil = all); a message is delivered when the filter is nil or contains
// the message's "type". Telemetry broadcasts carry no type, so only unfiltered
// clients receive them (matching Python — which is why the React map polls /live
// over HTTP rather than relying on the WS for positions).
type Hub struct {
	// mu guards clients: add and remove take the write lock, count and
	// broadcast take the read lock.
	mu sync.RWMutex
	// clients is the set of registered browser connections; an entry is
	// present (with value true) from add until remove.
	clients map[*browserClient]bool
}
|
|
|
|
// browserClient is the hub's per-connection state for one /ws/live browser.
type browserClient struct {
	// filter is the set of message types this client subscribed to.
	// broadcast reads it while holding the hub's read lock.
	filter map[string]bool // nil = all types
	// send queues serialized messages for the connection's writer goroutine.
	// Producers use non-blocking sends, and Hub.remove closes the channel.
	send chan []byte
}
|
|
|
|
// newHub returns a Hub with an empty, ready-to-use client set.
func newHub() *Hub {
	return &Hub{
		clients: make(map[*browserClient]bool),
	}
}
|
|
|
|
func (h *Hub) add(c *browserClient) {
|
|
h.mu.Lock()
|
|
h.clients[c] = true
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *Hub) remove(c *browserClient) {
|
|
h.mu.Lock()
|
|
if h.clients[c] {
|
|
delete(h.clients, c)
|
|
close(c.send)
|
|
}
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *Hub) count() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.clients)
|
|
}
|
|
|
|
// broadcast serializes once and delivers to matching clients. A slow client
|
|
// (full send buffer) is skipped for this message rather than blocking the
|
|
// ingest path, matching the spirit of Python's per-send timeout + eviction.
|
|
func (h *Hub) broadcast(data map[string]any) {
|
|
h.mu.RLock()
|
|
empty := len(h.clients) == 0
|
|
h.mu.RUnlock()
|
|
if empty {
|
|
return // no browsers: skip the marshal entirely
|
|
}
|
|
msg, err := json.Marshal(data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
msgType, _ := data["type"].(string)
|
|
h.mu.RLock()
|
|
for c := range h.clients {
|
|
if c.filter != nil && (msgType == "" || !c.filter[msgType]) {
|
|
continue
|
|
}
|
|
select {
|
|
case c.send <- msg:
|
|
default:
|
|
}
|
|
}
|
|
h.mu.RUnlock()
|
|
}
|
|
|
|
func (s *Server) handleWSLive(w http.ResponseWriter, r *http.Request) {
|
|
// Auth: internal-trust (private peer + no XFF) OR a valid session cookie.
|
|
if !(r.Header.Get("X-Forwarded-For") == "" && isPrivateAddr(clientIP(r))) {
|
|
c, err := r.Cookie("session")
|
|
if err != nil || verifySessionCookie(s.secretKey, c.Value) == nil {
|
|
http.Error(w, "Not authenticated", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
}
|
|
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{InsecureSkipVerify: true})
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.CloseNow()
|
|
conn.SetReadLimit(8 << 20)
|
|
|
|
client := &browserClient{send: make(chan []byte, 256)}
|
|
s.hub.add(client)
|
|
defer s.hub.remove(client)
|
|
|
|
ctx := r.Context()
|
|
// Writer goroutine: the only writer for this conn (serializes writes).
|
|
go func() {
|
|
for msg := range client.send {
|
|
wctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
err := conn.Write(wctx, websocket.MessageText, msg)
|
|
cancel()
|
|
if err != nil {
|
|
conn.CloseNow()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
_, raw, err := conn.Read(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
var m map[string]any
|
|
if json.Unmarshal(raw, &m) != nil {
|
|
continue
|
|
}
|
|
s.handleBrowserMessage(client, m)
|
|
}
|
|
}
|
|
|
|
// handleBrowserMessage handles subscribe / request_dungeon_map / command
|
|
// envelopes from a browser client (main.py:3846).
|
|
func (s *Server) handleBrowserMessage(c *browserClient, m map[string]any) {
|
|
switch toStr(m["type"]) {
|
|
case "subscribe":
|
|
types := toStringSlice(m["message_types"])
|
|
if len(types) == 0 {
|
|
c.filter = nil // all
|
|
return
|
|
}
|
|
f := make(map[string]bool, len(types))
|
|
for _, t := range types {
|
|
f[t] = true
|
|
}
|
|
c.filter = f
|
|
return
|
|
case "request_dungeon_map":
|
|
lb := toStr(m["landblock"])
|
|
if lb != "" && s.ingestor != nil {
|
|
if dm, ok := s.ingestor.snapshot(s.ingestor.dungeonMapCache, lb); ok {
|
|
if b, err := json.Marshal(dm); err == nil {
|
|
select {
|
|
case c.send <- b:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
// Command envelopes: new {player_name, command} or legacy {type:command, character_name, text}.
|
|
if pn, ok := m["player_name"].(string); ok {
|
|
if cmd, ok := m["command"].(string); ok {
|
|
s.plugins.send(pn, map[string]any{"player_name": pn, "command": cmd})
|
|
return
|
|
}
|
|
}
|
|
if toStr(m["type"]) == "command" {
|
|
pn := toStr(m["character_name"])
|
|
text := toStr(m["text"])
|
|
if pn != "" {
|
|
s.plugins.send(pn, map[string]any{"player_name": pn, "command": text})
|
|
}
|
|
}
|
|
}
|
|
|
|
// toStringSlice extracts the string elements of a decoded JSON array.
// It returns nil when v is not a []any; otherwise it returns a non-nil slice
// holding the elements that are strings, in order, skipping everything else.
func toStringSlice(v any) []string {
	items, isList := v.([]any)
	if !isList {
		return nil
	}
	strs := make([]string, 0, len(items))
	for i := range items {
		str, isString := items[i].(string)
		if !isString {
			continue
		}
		strs = append(strs, str)
	}
	return strs
}
|