Completes the Go tracker as a cutover-ready drop-in:

- wslive.go: browser broadcast hub with per-client subscribe filters (nil = all), request_dungeon_map replies, and command routing; auth = internal-trust or session cookie. The ingestor broadcasts every handled event to it.
- wsposition.go: plugin ingest server with X-Plugin-Secret/SHARED_SECRET auth (constant-time, fails closed, legacy fallback), register -> plugin_conns, and dispatch into the shared Ingestor. Plugin registry for backend->plugin commands.
- main.go: statusRecorder.Unwrap() so coder/websocket can hijack through the logging middleware (WS handshakes failed without it); /ws/ bypasses HTTP auth.

Shadow consumer robustness (the harness was being evicted under the full firehose): decouple the socket read from processing. The read loop now only copies raw frames onto a queue, and a worker unmarshals and dispatches them. JSON parsing in the read loop slowed it enough that Python's broadcast send errored and evicted us (Read then blocked forever). Added a 25s read-deadline watchdog to self-heal.

Validated live: shadow /live online = 73 = production; telemetry sustained ~12/s with 0 drops and no eviction; and the shadow's /ws/live re-broadcast stream is IDENTICAL to production's (TOTAL 2150 = 2150, every event type exact).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
120 lines
3.4 KiB
Go
120 lines
3.4 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/coder/websocket"
|
|
)
|
|
|
|
// runShadowConsumer connects to the Python tracker's /ws/live, receives the full
|
|
// broadcast firehose (no subscribe = all types), and replays each message
|
|
// through the ingest handlers into THIS instance's own DB. This validates the
|
|
// Go ingest path against real traffic without touching production or stealing
|
|
// plugin connections. Reconnects with exponential backoff.
|
|
//
|
|
// Note: telemetry broadcasts carry no "type" field (dispatch matches by shape);
|
|
// spawn and full_inventory are NOT broadcast, so they don't arrive here (covered
|
|
// by unit tests / the future /ws/position path).
|
|
func (s *Server) runShadowConsumer(ctx context.Context, wsURL string) {
|
|
backoff := time.Second
|
|
const maxBackoff = 60 * time.Second
|
|
for ctx.Err() == nil {
|
|
err := s.shadowConnect(ctx, wsURL)
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
s.log.Warn("shadow consumer disconnected; reconnecting", "err", err, "backoff", backoff.String())
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(backoff):
|
|
}
|
|
backoff *= 2
|
|
if backoff > maxBackoff {
|
|
backoff = maxBackoff
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) shadowConnect(ctx context.Context, wsURL string) error {
|
|
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer c.CloseNow()
|
|
c.SetReadLimit(32 << 20) // nearby_objects / dungeon_map payloads can be large
|
|
|
|
connCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// Keepalive pings.
|
|
go func() {
|
|
t := time.NewTicker(20 * time.Second)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-connCtx.Done():
|
|
return
|
|
case <-t.C:
|
|
pc, cc := context.WithTimeout(connCtx, 10*time.Second)
|
|
_ = c.Ping(pc)
|
|
cc()
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Decouple socket read from ALL processing, including JSON parsing: the read
|
|
// loop only copies raw frames onto a queue, so it drains the socket as fast
|
|
// as the network delivers. If parsing or DB-bound dispatch ran inline, the
|
|
// read would stall, the upstream /ws/live broadcast send would error, and
|
|
// Python would evict us (Read then blocks forever). A single worker
|
|
// unmarshals + dispatches in order, preserving per-char kill-delta / combat
|
|
// accumulation.
|
|
queue := make(chan []byte, 16384)
|
|
done := make(chan struct{})
|
|
go func() {
|
|
defer close(done)
|
|
for raw := range queue {
|
|
var m map[string]any
|
|
if json.Unmarshal(raw, &m) != nil {
|
|
continue
|
|
}
|
|
s.ingestor.dispatch(connCtx, m)
|
|
}
|
|
}()
|
|
|
|
s.log.Info("shadow consumer connected; replaying /ws/live into ingest", "url", wsURL)
|
|
var n, dropped int
|
|
loopErr := s.shadowReadLoop(ctx, c, queue, &n, &dropped)
|
|
close(queue)
|
|
<-done
|
|
return loopErr
|
|
}
|
|
|
|
func (s *Server) shadowReadLoop(ctx context.Context, c *websocket.Conn, queue chan []byte, n, dropped *int) error {
|
|
for {
|
|
// Read deadline acts as a liveness watchdog: the firehose is constant, so
|
|
// a long silence means the upstream evicted us without closing — time out
|
|
// and let runShadowConsumer reconnect.
|
|
rctx, rcancel := context.WithTimeout(ctx, 25*time.Second)
|
|
_, raw, err := c.Read(rctx)
|
|
rcancel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
select {
|
|
case queue <- raw:
|
|
default:
|
|
*dropped++
|
|
if *dropped%1000 == 1 {
|
|
s.log.Warn("shadow queue full; dropping messages", "dropped", *dropped)
|
|
}
|
|
}
|
|
*n++
|
|
if *n%5000 == 0 {
|
|
s.log.Info("shadow consumer progress", "messages", *n, "queued", len(queue), "dropped", *dropped)
|
|
}
|
|
}
|
|
}
|