MosswartOverlord/go-services/tracker-go/shadow.go
Erik 5b2db439a3 feat(go-services): tracker share_* handlers (complete ingest) + shadow tuning
- share.go: cross-machine vital sharing (share_subscribe/unsubscribe/share_*),
  faithful port of the peer-state snapshot + plugin fan-out + /vital-sharing/peers.
  The last ingest handler — the Go tracker now handles every plugin event type.
- shadow consumer: drop the outbound keepalive ping (the firehose is never idle)
  and tighten the read-deadline watchdog to 12s for faster reconnect after the
  upstream's periodic eviction (full-firehose browser clients get evicted ~every
  90s; the watchdog recovers it, ~90% duty cycle). Production-bound /ws/position
  is unaffected (plugins connect to us; no eviction).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-24 11:27:25 +02:00

106 lines
3.3 KiB
Go

package main
import (
"context"
"encoding/json"
"time"
"github.com/coder/websocket"
)
// runShadowConsumer connects to the Python tracker's /ws/live, receives the full
// broadcast firehose (no subscribe = all types), and replays each message
// through the ingest handlers into THIS instance's own DB. This validates the
// Go ingest path against real traffic without touching production or stealing
// plugin connections. It runs until ctx is cancelled.
//
// Reconnects use exponential backoff (1s, doubling, capped at 60s) so a down
// or refusing upstream is not hammered. The backoff is reset to 1s whenever the
// previous session stayed up for at least healthySession. Without that reset,
// every routine upstream eviction would double the delay until each reconnect
// waited the full 60s cap.
//
// Note: telemetry broadcasts carry no "type" field (dispatch matches by shape);
// spawn and full_inventory are NOT broadcast, so they don't arrive here (covered
// by unit tests / the future /ws/position path).
func (s *Server) runShadowConsumer(ctx context.Context, wsURL string) {
	const (
		minBackoff = time.Second
		maxBackoff = 60 * time.Second
		// A session that lasted at least this long is treated as having
		// connected and streamed. Quick failures (e.g. a refused dial) keep
		// growing the backoff.
		healthySession = 30 * time.Second
	)
	backoff := minBackoff
	for ctx.Err() == nil {
		started := time.Now()
		err := s.shadowConnect(ctx, wsURL)
		if ctx.Err() != nil {
			return
		}
		if time.Since(started) >= healthySession {
			backoff = minBackoff
		}
		s.log.Warn("shadow consumer disconnected; reconnecting", "err", err, "backoff", backoff.String())
		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
// shadowConnect runs one shadow session against wsURL. It dials the upstream
// and replays every received frame into s.ingestor in arrival order. It returns
// the error that ended the session: a dial failure, or whatever
// shadowReadLoop returned (read error or read-watchdog timeout). Frames
// already queued when the read loop stops are still dispatched before it
// returns.
func (s *Server) shadowConnect(ctx context.Context, wsURL string) error {
	// Bound the upgrade handshake. The read watchdog in shadowReadLoop only
	// starts once Dial returns. An upstream that accepts the TCP connection
	// but never answers the upgrade would otherwise block this session, and
	// therefore the reconnect loop, indefinitely. coder/websocket uses the
	// Dial context for the handshake, so cancelling it right after a
	// successful Dial leaves the established connection usable.
	const dialTimeout = 15 * time.Second
	dialCtx, dialCancel := context.WithTimeout(ctx, dialTimeout)
	c, _, err := websocket.Dial(dialCtx, wsURL, nil)
	dialCancel()
	if err != nil {
		return err
	}
	defer c.CloseNow()
	c.SetReadLimit(32 << 20) // nearby_objects / dungeon_map payloads can be large
	// connCtx is cancelled only when this function returns, i.e. after the
	// worker below has drained the queue. NOTE(review): presumably this lets
	// any work dispatch ties to connCtx end with the session; confirm against
	// ingestor.dispatch.
	connCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	// No outbound keepalive ping: the firehose is constant, so the connection is
	// never idle, and the read-deadline watchdog in shadowReadLoop handles dead
	// connections.
	//
	// Decouple socket read from ALL processing, including JSON parsing: the read
	// loop only copies raw frames onto a queue, so it drains the socket as fast
	// as the network delivers. If parsing or DB-bound dispatch ran inline, the
	// read would stall, the upstream /ws/live broadcast send would error, and
	// Python would evict us (Read then blocks forever). A single worker
	// unmarshals + dispatches in order, preserving per-char kill-delta / combat
	// accumulation.
	queue := make(chan []byte, 16384)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for raw := range queue {
			var m map[string]any
			if json.Unmarshal(raw, &m) != nil {
				// Best-effort shadow replay: frames that are not a JSON object
				// are skipped rather than ending the session.
				continue
			}
			s.ingestor.dispatch(connCtx, m)
		}
	}()
	s.log.Info("shadow consumer connected; replaying /ws/live into ingest", "url", wsURL)
	var n, dropped int
	loopErr := s.shadowReadLoop(ctx, c, queue, &n, &dropped)
	// The read loop is the only sender, so it is safe to close the queue now;
	// wait for the worker to finish dispatching what is already queued.
	close(queue)
	<-done
	return loopErr
}
// shadowReadLoop reads frames from c until a read fails and returns that error.
// Each frame goes onto queue with a non-blocking send. When the queue is full,
// the frame is discarded and counted in *dropped instead of stalling the
// socket. *n counts every frame read, whether it was queued or dropped.
// Progress and drop warnings are logged periodically.
func (s *Server) shadowReadLoop(ctx context.Context, c *websocket.Conn, queue chan []byte, n, dropped *int) error {
	const (
		// Liveness watchdog: the firehose is constant, so a multi-second
		// silence means the upstream evicted us without closing. Time out
		// quickly and let runShadowConsumer reconnect (high duty cycle).
		silenceLimit  = 12 * time.Second
		dropLogEvery  = 1000
		progressEvery = 5000
	)
	readFrame := func() ([]byte, error) {
		deadlineCtx, release := context.WithTimeout(ctx, silenceLimit)
		defer release()
		_, frame, readErr := c.Read(deadlineCtx)
		return frame, readErr
	}
	for {
		frame, err := readFrame()
		if err != nil {
			return err
		}

		enqueued := false
		select {
		case queue <- frame:
			enqueued = true
		default:
		}
		if !enqueued {
			*dropped++
			// Warn on the first drop and then once per dropLogEvery drops.
			if *dropped%dropLogEvery == 1 {
				s.log.Warn("shadow queue full; dropping messages", "dropped", *dropped)
			}
		}

		*n++
		if *n%progressEvery == 0 {
			s.log.Info("shadow consumer progress", "messages", *n, "queued", len(queue), "dropped", *dropped)
		}
	}
}