diff --git a/go-services/tracker-go/ingest.go b/go-services/tracker-go/ingest.go index c825094e..925aaf80 100644 --- a/go-services/tracker-go/ingest.go +++ b/go-services/tracker-go/ingest.go @@ -32,16 +32,21 @@ type Ingestor struct { liveCombatStats map[string]map[string]any dungeonMapCache map[string]map[string]any questStatus map[string]map[string]string - lastKills map[string]int // "session_id|character_name" -> kills + lastKills map[string]int // "session_id|character_name" -> kills combatLastSession map[string]map[string]any // "char:session_id" -> last cumulative session combatLifetimeCache map[string]map[string]any // character_name -> accumulated lifetime + vitalSubscribers map[string]bool + vitalPeerState map[string]map[string]any + + plugins *pluginRegistry // for share_* fan-out + plugin_connected status } -func newIngestor(pool *pgxpool.Pool, log *slog.Logger, broadcast func(map[string]any)) *Ingestor { +func newIngestor(pool *pgxpool.Pool, log *slog.Logger, broadcast func(map[string]any), plugins *pluginRegistry) *Ingestor { return &Ingestor{ pool: pool, log: log, broadcast: broadcast, + plugins: plugins, liveSnapshots: map[string]map[string]any{}, liveVitals: map[string]map[string]any{}, liveCharacterStats: map[string]map[string]any{}, @@ -53,6 +58,8 @@ func newIngestor(pool *pgxpool.Pool, log *slog.Logger, broadcast func(map[string lastKills: map[string]int{}, combatLastSession: map[string]map[string]any{}, combatLifetimeCache: map[string]map[string]any{}, + vitalSubscribers: map[string]bool{}, + vitalPeerState: map[string]map[string]any{}, } } @@ -84,10 +91,17 @@ func (i *Ingestor) dispatch(ctx context.Context, data map[string]any) { i.handleDungeonMap(data) case t == "combat_stats": i.handleCombatStats(ctx, data) + case t == "share_subscribe": + i.handleShareSubscribe(data) + case t == "share_unsubscribe": + i.handleShareUnsubscribe(data) + return // unsubscribe broadcasts its own share_peer_removed; don't re-broadcast + case strings.HasPrefix(t, "share_"): + i.handleShareUpdate(t, data) case t == "register": // no DB / no broadcast; plugin_conns belongs to the /ws/position server - case strings.HasPrefix(t, "share_"), t == "chat": - // share_* handled in a later pass; chat is broadcast-only + case t == "chat": + // broadcast-only } if i.broadcast != nil { i.broadcast(data) diff --git a/go-services/tracker-go/main.go b/go-services/tracker-go/main.go index 652cfa20..742459b1 100644 --- a/go-services/tracker-go/main.go +++ b/go-services/tracker-go/main.go @@ -116,7 +116,7 @@ func main() { logger.Error("SHADOW_INGEST_WS set but READ_ONLY=true; refusing to ingest into the production DB") os.Exit(1) } - srv.ingestor = newIngestor(pool, logger, srv.hub.broadcast) + srv.ingestor = newIngestor(pool, logger, srv.hub.broadcast, srv.plugins) go srv.runShadowConsumer(ctx, cfg.IngestWS) logger.Info("shadow ingest enabled", "source", cfg.IngestWS) } diff --git a/go-services/tracker-go/memstate.go b/go-services/tracker-go/memstate.go index bb746a05..8c0f8853 100644 --- a/go-services/tracker-go/memstate.go +++ b/go-services/tracker-go/memstate.go @@ -5,6 +5,7 @@ import ( "net/http" "os" "path/filepath" + "sort" ) // These endpoints are backed by ingest-only in-memory state in the Python @@ -40,10 +41,15 @@ func (s *Server) handleQuestStatus(w http.ResponseWriter, r *http.Request) { // GET /vital-sharing/peers (main.py:1800) func (s *Server) handleVitalSharingPeers(w http.ResponseWriter, r *http.Request) { - writeJSON(w, http.StatusOK, map[string]any{ - "peers": []any{}, - "subscriber_count": 0, + if s.ingestor == nil { + writeJSON(w, http.StatusOK, map[string]any{"peers": []any{}, "subscriber_count": 0}) + return + } + peers, subCount := s.ingestor.vitalSharingPeers() + sort.Slice(peers, func(i, j int) bool { + return toStr(peers[i]["character_name"]) < toStr(peers[j]["character_name"]) }) + writeJSON(w, http.StatusOK, map[string]any{"peers": peers, "subscriber_count": subCount}) } // GET /equipment-cantrip-state/{name} (main.py:4167) diff --git a/go-services/tracker-go/shadow.go b/go-services/tracker-go/shadow.go index 0545c4ed..619869b6 100644 --- a/go-services/tracker-go/shadow.go +++ b/go-services/tracker-go/shadow.go @@ -48,22 +48,8 @@ func (s *Server) shadowConnect(ctx context.Context, wsURL string) error { connCtx, cancel := context.WithCancel(ctx) defer cancel() - - // Keepalive pings. - go func() { - t := time.NewTicker(20 * time.Second) - defer t.Stop() - for { - select { - case <-connCtx.Done(): - return - case <-t.C: - pc, cc := context.WithTimeout(connCtx, 10*time.Second) - _ = c.Ping(pc) - cc() - } - } - }() + // No outbound keepalive ping: the firehose is constant, so the connection is + // never idle, and the read-deadline watchdog below handles dead connections. // Decouple socket read from ALL processing, including JSON parsing: the read // loop only copies raw frames onto a queue, so it drains the socket as fast @@ -96,9 +82,9 @@ func (s *Server) shadowConnect(ctx context.Context, wsURL string) error { func (s *Server) shadowReadLoop(ctx context.Context, c *websocket.Conn, queue chan []byte, n, dropped *int) error { for { // Read deadline acts as a liveness watchdog: the firehose is constant, so - // a long silence means the upstream evicted us without closing — time out - // and let runShadowConsumer reconnect. - rctx, rcancel := context.WithTimeout(ctx, 25*time.Second) + // a multi-second silence means the upstream evicted us without closing — + // time out quickly and let runShadowConsumer reconnect (high duty cycle). + rctx, rcancel := context.WithTimeout(ctx, 12*time.Second) _, raw, err := c.Read(rctx) rcancel() if err != nil { diff --git a/go-services/tracker-go/share.go b/go-services/tracker-go/share.go new file mode 100644 index 00000000..63090ea0 --- /dev/null +++ b/go-services/tracker-go/share.go @@ -0,0 +1,111 @@ +package main + +// Cross-machine vital sharing (share_*), a faithful port of main.py:3658-3703 + +// _update_vital_sharing_peer_state / _broadcast_share_to_plugin_clients. +// Memory-only: subscriber set + last-known peer snapshot, fanned out to other +// opted-in plugin clients and to browsers. In shadow mode there are no plugin +// connections, so the fan-out is a no-op; the peer state still drives +// /vital-sharing/peers. + +func (i *Ingestor) handleShareSubscribe(data map[string]any) { + char := toStr(data["character_name"]) + if char == "" { + return + } + i.mu.Lock() + i.vitalSubscribers[char] = true + entry := i.vitalPeerEntry(char) + if tags, ok := data["tags"].([]any); ok { + entry["tags"] = tags + } + entry["connected"] = true + i.mu.Unlock() +} + +func (i *Ingestor) handleShareUnsubscribe(data map[string]any) { + char := toStr(data["character_name"]) + if char == "" { + return + } + i.mu.Lock() + delete(i.vitalSubscribers, char) + delete(i.vitalPeerState, char) + i.mu.Unlock() + if i.broadcast != nil { + i.broadcast(map[string]any{"type": "share_peer_removed", "character_name": char}) + } +} + +func (i *Ingestor) handleShareUpdate(msgType string, data map[string]any) { + origin := toStr(data["character_name"]) + i.mu.Lock() + i.updateVitalPeerState(msgType, data) + // Snapshot subscribers for the fan-out. + subs := make(map[string]bool, len(i.vitalSubscribers)) + for k := range i.vitalSubscribers { + subs[k] = true + } + i.mu.Unlock() + // Fan out to other opted-in plugin clients (no-op when no plugins connected). + if i.plugins != nil && len(subs) > 0 { + i.plugins.fanoutShare(data, origin, subs) + } +} + +// vitalPeerEntry returns (creating if needed) the peer snapshot for char. Caller +// holds i.mu. +func (i *Ingestor) vitalPeerEntry(char string) map[string]any { + entry, ok := i.vitalPeerState[char] + if !ok { + entry = map[string]any{ + "character_name": char, "tags": []any{}, "vitals": nil, + "position": nil, "items": nil, "connected": true, "last_update": nil, + } + i.vitalPeerState[char] = entry + } + return entry +} + +// updateVitalPeerState mirrors _update_vital_sharing_peer_state. Caller holds i.mu. +func (i *Ingestor) updateVitalPeerState(msgType string, data map[string]any) { + char := toStr(data["character_name"]) + if char == "" { + return + } + entry := i.vitalPeerEntry(char) + entry["last_update"] = data["timestamp"] + if tags, ok := data["tags"].([]any); ok { + entry["tags"] = tags + } + switch msgType { + case "share_vital_update": + entry["vitals"] = map[string]any{ + "current_health": data["current_health"], "max_health": data["max_health"], + "current_stamina": data["current_stamina"], "max_stamina": data["max_stamina"], + "current_mana": data["current_mana"], "max_mana": data["max_mana"], + } + case "share_position_update": + entry["position"] = map[string]any{ + "ew": data["ew"], "ns": data["ns"], "z": data["z"], "heading": data["heading"], + } + case "share_item_update": + entry["items"] = data["items"] + } +} + +// vitalSharingPeers returns the peer list for /vital-sharing/peers (main.py:1800). +func (i *Ingestor) vitalSharingPeers() ([]map[string]any, int) { + i.mu.RLock() + defer i.mu.RUnlock() + peers := make([]map[string]any, 0, len(i.vitalPeerState)) + for char, entry := range i.vitalPeerState { + p := make(map[string]any, len(entry)+2) + for k, v := range entry { + p[k] = v + } + p["subscribed"] = i.vitalSubscribers[char] + p["plugin_connected"] = i.plugins != nil && i.plugins.isConnected(char) + peers = append(peers, p) + } + return peers, len(i.vitalSubscribers) +} diff --git a/go-services/tracker-go/wsposition.go b/go-services/tracker-go/wsposition.go index 73a075d9..84892558 100644 --- a/go-services/tracker-go/wsposition.go +++ b/go-services/tracker-go/wsposition.go @@ -69,6 +69,33 @@ func (p *pluginRegistry) send(name string, payload map[string]any) { } } +// fanoutShare forwards a share_* message to other opted-in plugin clients +// (every connected name that is subscribed and isn't the origin). Send failures +// are logged-and-ignored, not evicted (main.py:2829). +func (p *pluginRegistry) fanoutShare(data map[string]any, origin string, subs map[string]bool) { + p.mu.RLock() + type target struct { + name string + c *websocket.Conn + } + var targets []target + for n, c := range p.conns { + if n != origin && subs[n] { + targets = append(targets, target{n, c}) + } + } + p.mu.RUnlock() + if len(targets) == 0 { + return + } + b, _ := json.Marshal(data) + for _, t := range targets { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + _ = t.c.Write(ctx, websocket.MessageText, b) + cancel() + } +} + // pluginAuthOK constant-time-compares the supplied secret to SHARED_SECRET (and // the optional rotation fallback). Fails closed when unset or left at the // placeholder, matching main.py.