package main import ( "context" "encoding/json" "time" "github.com/coder/websocket" ) // runShadowConsumer connects to the Python tracker's /ws/live, receives the full // broadcast firehose (no subscribe = all types), and replays each message // through the ingest handlers into THIS instance's own DB. This validates the // Go ingest path against real traffic without touching production or stealing // plugin connections. Reconnects with exponential backoff. // // Note: telemetry broadcasts carry no "type" field (dispatch matches by shape); // spawn and full_inventory are NOT broadcast, so they don't arrive here (covered // by unit tests / the future /ws/position path). func (s *Server) runShadowConsumer(ctx context.Context, wsURL string) { backoff := time.Second const maxBackoff = 60 * time.Second for ctx.Err() == nil { err := s.shadowConnect(ctx, wsURL) if ctx.Err() != nil { return } s.log.Warn("shadow consumer disconnected; reconnecting", "err", err, "backoff", backoff.String()) select { case <-ctx.Done(): return case <-time.After(backoff): } backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } } } func (s *Server) shadowConnect(ctx context.Context, wsURL string) error { c, _, err := websocket.Dial(ctx, wsURL, nil) if err != nil { return err } defer c.CloseNow() c.SetReadLimit(32 << 20) // nearby_objects / dungeon_map payloads can be large // Keepalive pings. pingCtx, cancel := context.WithCancel(ctx) defer cancel() go func() { t := time.NewTicker(20 * time.Second) defer t.Stop() for { select { case <-pingCtx.Done(): return case <-t.C: pc, cc := context.WithTimeout(pingCtx, 10*time.Second) _ = c.Ping(pc) cc() } } }() s.log.Info("shadow consumer connected; replaying /ws/live into ingest", "url", wsURL) var n int for { _, raw, err := c.Read(ctx) if err != nil { return err } var m map[string]any if json.Unmarshal(raw, &m) != nil { continue } s.ingestor.dispatch(ctx, m) n++ if n%5000 == 0 { s.log.Info("shadow consumer progress", "messages", n) } } }