Implements the plugin event handlers (the /ws/position write logic) as a shared Ingestor, validated against real traffic by replaying Python's /ws/live firehose into an isolated dereth_go DB (no production write, no plugin stolen). - ingest.go: faithful ports of telemetry (kill-delta -> char_stats, server received_at stamp), rare (rare_stats/rare_stats_sessions/rare_events), portal (coord upsert), character_stats (stats_data JSONB subset + upsert), spawn, and the memory-only handlers (vitals/quest/equipment_cantrip/nearby/dungeon). In-memory live state + read-side overlay accessors. - shadow.go: coder/websocket consumer of /ws/live -> Ingestor.dispatch (telemetry matched by shape since its broadcast has no type field). - main.go/store.go: ingest mode (READ_ONLY=false + SHADOW_INGEST_WS) wires the ingestor; read handlers (/character-stats, /equipment-cantrip, /quest-status) now consult the live overlay first, like Python. - compose: shadow instance ingests ws://dereth-tracker:8765/ws/live. Validated live: dereth_go has 73 distinct telemetry chars; shadow /live online set == production (73=73); character_stats 5/5 exact byte-match (0 mismatch); char_stats kill-deltas + portals accumulating. compare/compare_ingest.py. Deferred to next pass: combat_stats (delta/merge), share_*, the /ws/position + /ws/live servers (for cutover). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
84 lines
2.1 KiB
Go
84 lines
2.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/coder/websocket"
|
|
)
|
|
|
|
// runShadowConsumer connects to the Python tracker's /ws/live, receives the full
|
|
// broadcast firehose (no subscribe = all types), and replays each message
|
|
// through the ingest handlers into THIS instance's own DB. This validates the
|
|
// Go ingest path against real traffic without touching production or stealing
|
|
// plugin connections. Reconnects with exponential backoff.
|
|
//
|
|
// Note: telemetry broadcasts carry no "type" field (dispatch matches by shape);
|
|
// spawn and full_inventory are NOT broadcast, so they don't arrive here (covered
|
|
// by unit tests / the future /ws/position path).
|
|
func (s *Server) runShadowConsumer(ctx context.Context, wsURL string) {
|
|
backoff := time.Second
|
|
const maxBackoff = 60 * time.Second
|
|
for ctx.Err() == nil {
|
|
err := s.shadowConnect(ctx, wsURL)
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
s.log.Warn("shadow consumer disconnected; reconnecting", "err", err, "backoff", backoff.String())
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(backoff):
|
|
}
|
|
backoff *= 2
|
|
if backoff > maxBackoff {
|
|
backoff = maxBackoff
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) shadowConnect(ctx context.Context, wsURL string) error {
|
|
c, _, err := websocket.Dial(ctx, wsURL, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer c.CloseNow()
|
|
c.SetReadLimit(32 << 20) // nearby_objects / dungeon_map payloads can be large
|
|
|
|
// Keepalive pings.
|
|
pingCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
go func() {
|
|
t := time.NewTicker(20 * time.Second)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-pingCtx.Done():
|
|
return
|
|
case <-t.C:
|
|
pc, cc := context.WithTimeout(pingCtx, 10*time.Second)
|
|
_ = c.Ping(pc)
|
|
cc()
|
|
}
|
|
}
|
|
}()
|
|
|
|
s.log.Info("shadow consumer connected; replaying /ws/live into ingest", "url", wsURL)
|
|
var n int
|
|
for {
|
|
_, raw, err := c.Read(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var m map[string]any
|
|
if json.Unmarshal(raw, &m) != nil {
|
|
continue
|
|
}
|
|
s.ingestor.dispatch(ctx, m)
|
|
n++
|
|
if n%5000 == 0 {
|
|
s.log.Info("shadow consumer progress", "messages", n)
|
|
}
|
|
}
|
|
}
|