package main import ( "context" "encoding/json" "time" "github.com/coder/websocket" ) // runShadowConsumer connects to the Python tracker's /ws/live, receives the full // broadcast firehose (no subscribe = all types), and replays each message // through the ingest handlers into THIS instance's own DB. This validates the // Go ingest path against real traffic without touching production or stealing // plugin connections. Reconnects with exponential backoff. // // Note: telemetry broadcasts carry no "type" field (dispatch matches by shape); // spawn and full_inventory are NOT broadcast, so they don't arrive here (covered // by unit tests / the future /ws/position path). func (s *Server) runShadowConsumer(ctx context.Context, wsURL string) { backoff := time.Second const maxBackoff = 60 * time.Second for ctx.Err() == nil { err := s.shadowConnect(ctx, wsURL) if ctx.Err() != nil { return } s.log.Warn("shadow consumer disconnected; reconnecting", "err", err, "backoff", backoff.String()) select { case <-ctx.Done(): return case <-time.After(backoff): } backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } } } func (s *Server) shadowConnect(ctx context.Context, wsURL string) error { c, _, err := websocket.Dial(ctx, wsURL, nil) if err != nil { return err } defer c.CloseNow() c.SetReadLimit(32 << 20) // nearby_objects / dungeon_map payloads can be large connCtx, cancel := context.WithCancel(ctx) defer cancel() // No outbound keepalive ping: the firehose is constant, so the connection is // never idle, and the read-deadline watchdog below handles dead connections. // Decouple socket read from ALL processing, including JSON parsing: the read // loop only copies raw frames onto a queue, so it drains the socket as fast // as the network delivers. If parsing or DB-bound dispatch ran inline, the // read would stall, the upstream /ws/live broadcast send would error, and // Python would evict us (Read then blocks forever). A single worker // unmarshals + dispatches in order, preserving per-char kill-delta / combat // accumulation. queue := make(chan []byte, 16384) done := make(chan struct{}) go func() { defer close(done) for raw := range queue { var m map[string]any if json.Unmarshal(raw, &m) != nil { continue } s.ingestor.dispatch(connCtx, m) } }() s.log.Info("shadow consumer connected; replaying /ws/live into ingest", "url", wsURL) var n, dropped int loopErr := s.shadowReadLoop(ctx, c, queue, &n, &dropped) close(queue) <-done return loopErr } func (s *Server) shadowReadLoop(ctx context.Context, c *websocket.Conn, queue chan []byte, n, dropped *int) error { for { // Read deadline acts as a liveness watchdog: the firehose is constant, so // a multi-second silence means the upstream evicted us without closing — // time out quickly and let runShadowConsumer reconnect (high duty cycle). rctx, rcancel := context.WithTimeout(ctx, 12*time.Second) _, raw, err := c.Read(rctx) rcancel() if err != nil { return err } select { case queue <- raw: default: *dropped++ if *dropped%1000 == 1 { s.log.Warn("shadow queue full; dropping messages", "dropped", *dropped) } } *n++ if *n%5000 == 0 { s.log.Info("shadow consumer progress", "messages", *n, "queued", len(queue), "dropped", *dropped) } } }