feat(go-services): tracker share_* handlers (complete ingest) + shadow tuning

- share.go: cross-machine vital sharing (share_subscribe/unsubscribe/share_*),
  faithful port of the peer-state snapshot + plugin fan-out + /vital-sharing/peers.
  The last ingest handler — the Go tracker now handles every plugin event type.
- shadow consumer: drop the outbound keepalive ping (the firehose is never idle)
  and tighten the read-deadline watchdog to 12s for faster reconnect after the
  upstream's periodic eviction (full-firehose browser clients get evicted ~every
  90s; the watchdog recovers it, ~90% duty cycle). Production-bound /ws/position
  is unaffected (plugins connect to us; no eviction).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Erik 2026-06-24 11:27:25 +02:00
parent 27757636e4
commit 5b2db439a3
6 changed files with 171 additions and 27 deletions

View file

@ -35,13 +35,18 @@ type Ingestor struct {
lastKills map[string]int // "session_id|character_name" -> kills
combatLastSession map[string]map[string]any // "char:session_id" -> last cumulative session
combatLifetimeCache map[string]map[string]any // character_name -> accumulated lifetime
vitalSubscribers map[string]bool
vitalPeerState map[string]map[string]any
plugins *pluginRegistry // for share_* fan-out + plugin_connected status
}
func newIngestor(pool *pgxpool.Pool, log *slog.Logger, broadcast func(map[string]any)) *Ingestor {
func newIngestor(pool *pgxpool.Pool, log *slog.Logger, broadcast func(map[string]any), plugins *pluginRegistry) *Ingestor {
return &Ingestor{
pool: pool,
log: log,
broadcast: broadcast,
plugins: plugins,
liveSnapshots: map[string]map[string]any{},
liveVitals: map[string]map[string]any{},
liveCharacterStats: map[string]map[string]any{},
@ -53,6 +58,8 @@ func newIngestor(pool *pgxpool.Pool, log *slog.Logger, broadcast func(map[string
lastKills: map[string]int{},
combatLastSession: map[string]map[string]any{},
combatLifetimeCache: map[string]map[string]any{},
vitalSubscribers: map[string]bool{},
vitalPeerState: map[string]map[string]any{},
}
}
@ -84,10 +91,17 @@ func (i *Ingestor) dispatch(ctx context.Context, data map[string]any) {
i.handleDungeonMap(data)
case t == "combat_stats":
i.handleCombatStats(ctx, data)
case t == "share_subscribe":
i.handleShareSubscribe(data)
case t == "share_unsubscribe":
i.handleShareUnsubscribe(data)
return // unsubscribe broadcasts its own share_peer_removed; don't re-broadcast
case strings.HasPrefix(t, "share_"):
i.handleShareUpdate(t, data)
case t == "register":
// no DB / no broadcast; plugin_conns belongs to the /ws/position server
case strings.HasPrefix(t, "share_"), t == "chat":
// share_* handled in a later pass; chat is broadcast-only
case t == "chat":
// broadcast-only
}
if i.broadcast != nil {
i.broadcast(data)

View file

@ -116,7 +116,7 @@ func main() {
logger.Error("SHADOW_INGEST_WS set but READ_ONLY=true; refusing to ingest into the production DB")
os.Exit(1)
}
srv.ingestor = newIngestor(pool, logger, srv.hub.broadcast)
srv.ingestor = newIngestor(pool, logger, srv.hub.broadcast, srv.plugins)
go srv.runShadowConsumer(ctx, cfg.IngestWS)
logger.Info("shadow ingest enabled", "source", cfg.IngestWS)
}

View file

@ -5,6 +5,7 @@ import (
"net/http"
"os"
"path/filepath"
"sort"
)
// These endpoints are backed by ingest-only in-memory state in the Python
@ -40,10 +41,15 @@ func (s *Server) handleQuestStatus(w http.ResponseWriter, r *http.Request) {
// GET /vital-sharing/peers (main.py:1800)
func (s *Server) handleVitalSharingPeers(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]any{
"peers": []any{},
"subscriber_count": 0,
if s.ingestor == nil {
writeJSON(w, http.StatusOK, map[string]any{"peers": []any{}, "subscriber_count": 0})
return
}
peers, subCount := s.ingestor.vitalSharingPeers()
sort.Slice(peers, func(i, j int) bool {
return toStr(peers[i]["character_name"]) < toStr(peers[j]["character_name"])
})
writeJSON(w, http.StatusOK, map[string]any{"peers": peers, "subscriber_count": subCount})
}
// GET /equipment-cantrip-state/{name} (main.py:4167)

View file

@ -48,22 +48,8 @@ func (s *Server) shadowConnect(ctx context.Context, wsURL string) error {
connCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Keepalive pings.
go func() {
t := time.NewTicker(20 * time.Second)
defer t.Stop()
for {
select {
case <-connCtx.Done():
return
case <-t.C:
pc, cc := context.WithTimeout(connCtx, 10*time.Second)
_ = c.Ping(pc)
cc()
}
}
}()
// No outbound keepalive ping: the firehose is constant, so the connection is
// never idle, and the read-deadline watchdog below handles dead connections.
// Decouple socket read from ALL processing, including JSON parsing: the read
// loop only copies raw frames onto a queue, so it drains the socket as fast
@ -96,9 +82,9 @@ func (s *Server) shadowConnect(ctx context.Context, wsURL string) error {
func (s *Server) shadowReadLoop(ctx context.Context, c *websocket.Conn, queue chan []byte, n, dropped *int) error {
for {
// Read deadline acts as a liveness watchdog: the firehose is constant, so
// a long silence means the upstream evicted us without closing — time out
// and let runShadowConsumer reconnect.
rctx, rcancel := context.WithTimeout(ctx, 25*time.Second)
// a multi-second silence means the upstream evicted us without closing —
// time out quickly and let runShadowConsumer reconnect (high duty cycle).
rctx, rcancel := context.WithTimeout(ctx, 12*time.Second)
_, raw, err := c.Read(rctx)
rcancel()
if err != nil {

View file

@ -0,0 +1,111 @@
package main
// Cross-machine vital sharing (share_*), a faithful port of main.py:3658-3703 +
// _update_vital_sharing_peer_state / _broadcast_share_to_plugin_clients.
// Memory-only: subscriber set + last-known peer snapshot, fanned out to other
// opted-in plugin clients and to browsers. In shadow mode there are no plugin
// connections, so the fan-out is a no-op; the peer state still drives
// /vital-sharing/peers.
func (i *Ingestor) handleShareSubscribe(data map[string]any) {
char := toStr(data["character_name"])
if char == "" {
return
}
i.mu.Lock()
i.vitalSubscribers[char] = true
entry := i.vitalPeerEntry(char)
if tags, ok := data["tags"].([]any); ok {
entry["tags"] = tags
}
entry["connected"] = true
i.mu.Unlock()
}
func (i *Ingestor) handleShareUnsubscribe(data map[string]any) {
char := toStr(data["character_name"])
if char == "" {
return
}
i.mu.Lock()
delete(i.vitalSubscribers, char)
delete(i.vitalPeerState, char)
i.mu.Unlock()
if i.broadcast != nil {
i.broadcast(map[string]any{"type": "share_peer_removed", "character_name": char})
}
}
func (i *Ingestor) handleShareUpdate(msgType string, data map[string]any) {
origin := toStr(data["character_name"])
i.mu.Lock()
i.updateVitalPeerState(msgType, data)
// Snapshot subscribers for the fan-out.
subs := make(map[string]bool, len(i.vitalSubscribers))
for k := range i.vitalSubscribers {
subs[k] = true
}
i.mu.Unlock()
// Fan out to other opted-in plugin clients (no-op when no plugins connected).
if i.plugins != nil && len(subs) > 0 {
i.plugins.fanoutShare(data, origin, subs)
}
}
// vitalPeerEntry returns (creating if needed) the peer snapshot for char. Caller
// holds i.mu.
func (i *Ingestor) vitalPeerEntry(char string) map[string]any {
entry, ok := i.vitalPeerState[char]
if !ok {
entry = map[string]any{
"character_name": char, "tags": []any{}, "vitals": nil,
"position": nil, "items": nil, "connected": true, "last_update": nil,
}
i.vitalPeerState[char] = entry
}
return entry
}
// updateVitalPeerState mirrors _update_vital_sharing_peer_state. Caller holds i.mu.
func (i *Ingestor) updateVitalPeerState(msgType string, data map[string]any) {
char := toStr(data["character_name"])
if char == "" {
return
}
entry := i.vitalPeerEntry(char)
entry["last_update"] = data["timestamp"]
if tags, ok := data["tags"].([]any); ok {
entry["tags"] = tags
}
switch msgType {
case "share_vital_update":
entry["vitals"] = map[string]any{
"current_health": data["current_health"], "max_health": data["max_health"],
"current_stamina": data["current_stamina"], "max_stamina": data["max_stamina"],
"current_mana": data["current_mana"], "max_mana": data["max_mana"],
}
case "share_position_update":
entry["position"] = map[string]any{
"ew": data["ew"], "ns": data["ns"], "z": data["z"], "heading": data["heading"],
}
case "share_item_update":
entry["items"] = data["items"]
}
}
// vitalSharingPeers returns the peer list for /vital-sharing/peers (main.py:1800).
func (i *Ingestor) vitalSharingPeers() ([]map[string]any, int) {
i.mu.RLock()
defer i.mu.RUnlock()
peers := make([]map[string]any, 0, len(i.vitalPeerState))
for char, entry := range i.vitalPeerState {
p := make(map[string]any, len(entry)+2)
for k, v := range entry {
p[k] = v
}
p["subscribed"] = i.vitalSubscribers[char]
p["plugin_connected"] = i.plugins != nil && i.plugins.isConnected(char)
peers = append(peers, p)
}
return peers, len(i.vitalSubscribers)
}

View file

@ -69,6 +69,33 @@ func (p *pluginRegistry) send(name string, payload map[string]any) {
}
}
// fanoutShare forwards a share_* message to other opted-in plugin clients
// (every connected name that is subscribed and isn't the origin). Send failures
// are logged-and-ignored, not evicted (main.py:2829).
func (p *pluginRegistry) fanoutShare(data map[string]any, origin string, subs map[string]bool) {
p.mu.RLock()
type target struct {
name string
c *websocket.Conn
}
var targets []target
for n, c := range p.conns {
if n != origin && subs[n] {
targets = append(targets, target{n, c})
}
}
p.mu.RUnlock()
if len(targets) == 0 {
return
}
b, _ := json.Marshal(data)
for _, t := range targets {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_ = t.c.Write(ctx, websocket.MessageText, b)
cancel()
}
}
// pluginAuthOK constant-time-compares the supplied secret to SHARED_SECRET (and
// the optional rotation fallback). Fails closed when unset or left at the
// placeholder, matching main.py.