From a5d69ba88d68a788dffe8b57d7a5af3a02313384 Mon Sep 17 00:00:00 2001 From: Erik Date: Wed, 24 Jun 2026 10:31:15 +0200 Subject: [PATCH] =?UTF-8?q?feat(go-services):=20Phase=202=20ingest=20?= =?UTF-8?q?=E2=80=94=20shared=20Ingestor=20+=20shadow=20consumer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the plugin event handlers (the /ws/position write logic) as a shared Ingestor, validated against real traffic by replaying Python's /ws/live firehose into an isolated dereth_go DB (no production write, no plugin stolen). - ingest.go: faithful ports of telemetry (kill-delta -> char_stats, server received_at stamp), rare (rare_stats/rare_stats_sessions/rare_events), portal (coord upsert), character_stats (stats_data JSONB subset + upsert), spawn, and the memory-only handlers (vitals/quest/equipment_cantrip/nearby/dungeon). In -memory live state + read-side overlay accessors. - shadow.go: coder/websocket consumer of /ws/live -> Ingestor.dispatch (telemetry matched by shape since its broadcast has no type field). - main.go/store.go: ingest mode (READ_ONLY=false + SHADOW_INGEST_WS) wires the ingestor; read handlers (/character-stats, /equipment-cantrip, /quest-status) now consult the live overlay first, like Python. - compose: shadow instance ingests ws://dereth-tracker:8765/ws/live. Validated live: dereth_go has 73 distinct telemetry chars; shadow /live online set == production (73=73); character_stats 5/5 exact byte-match (0 mismatch); char_stats kill-deltas + portals accumulating. compare/compare_ingest.py. Deferred to next pass: combat_stats (delta/merge), share_*, the /ws/position + /ws/live servers (for cutover). Co-Authored-By: Claude Opus 4.8 --- go-services/compare/compare_ingest.py | 64 ++++ go-services/docker-compose.go.yml | 3 +- go-services/tracker-go/charstats.go | 7 + go-services/tracker-go/ingest.go | 431 ++++++++++++++++++++++++++ go-services/tracker-go/main.go | 12 + go-services/tracker-go/memstate.go | 23 +- go-services/tracker-go/shadow.go | 84 +++++ 7 files changed, 621 insertions(+), 3 deletions(-) create mode 100644 go-services/compare/compare_ingest.py create mode 100644 go-services/tracker-go/ingest.go create mode 100644 go-services/tracker-go/shadow.go diff --git a/go-services/compare/compare_ingest.py b/go-services/compare/compare_ingest.py new file mode 100644 index 00000000..faff8e44 --- /dev/null +++ b/go-services/compare/compare_ingest.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +"""Validate the Go shadow ingest (dereth_go) against production (dereth). + +Run on the server. The shadow tracker replays Python's /ws/live firehose into +its own dereth_go DB. Absolute counts differ (shadow started fresh; char_stats / +rare_stats accumulate deltas from connect time), so we validate the paths whose +writes are FULL upserts/inserts and therefore exactly comparable: + + * character_stats: a full-payload upsert. For a character whose row has the + SAME timestamp in both DBs, stats_data must be byte-identical. + * /live online set: telemetry end-to-end (compared separately by the caller). +""" +import json +import subprocess + +SEP = "\x1f" + + +def q(container, db, sql): + out = subprocess.check_output( + ["docker", "exec", container, "psql", "-U", "postgres", "-d", db, "-tA", "-F", SEP, "-c", sql], + text=True) + return [line.split(SEP) for line in out.splitlines() if line.strip()] + + +def main(): + print("=== dereth_go ingested row counts ===") + counts = q("dereth-go-db", "dereth_go", """ + SELECT 'telemetry_events', count(*)::text FROM telemetry_events + UNION ALL SELECT 'telemetry_distinct_chars', count(distinct character_name)::text FROM telemetry_events + UNION ALL SELECT 'character_stats', count(*)::text FROM character_stats + UNION ALL SELECT 'char_stats', count(*)::text FROM char_stats + UNION ALL SELECT 'rare_events', count(*)::text FROM rare_events + UNION ALL SELECT 'rare_stats', count(*)::text FROM rare_stats + UNION ALL SELECT 'portals', count(*)::text FROM portals + """) + for k, v in counts: + print(f" {k:26} {v}") + + print("\n=== character_stats exact match (same-timestamp rows) ===") + prod = {r[0]: (r[1], r[2]) for r in + q("dereth-db", "dereth", "SELECT character_name, timestamp::text, stats_data::text FROM character_stats")} + shadow = q("dereth-go-db", "dereth_go", + "SELECT character_name, timestamp::text, stats_data::text FROM character_stats") + match = mismatch = newer = 0 + for name, ts, sd in shadow: + if name not in prod: + continue + pts, psd = prod[name] + if ts != pts: + newer += 1 # one side got a newer character_stats message; not comparable + continue + if json.loads(sd) == json.loads(psd): + match += 1 + else: + mismatch += 1 + print(f" MISMATCH {name}") + print(f" exact match={match} mismatch={mismatch} skipped(diff timestamp)={newer}") + print("\nRESULT:", "ingest OK" if mismatch == 0 else f"{mismatch} character_stats mismatch(es)") + return 1 if mismatch else 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/go-services/docker-compose.go.yml b/go-services/docker-compose.go.yml index a80008f7..dfc32040 100644 --- a/go-services/docker-compose.go.yml +++ b/go-services/docker-compose.go.yml @@ -110,7 +110,8 @@ services: READ_ONLY: "false" # owns its DB; creates schema on boot INVENTORY_SERVICE_URL: "http://inventory-service:8000" SECRET_KEY: "${SECRET_KEY}" - # SHADOW_INGEST_WS: "ws://dereth-tracker:8765/ws/live" # enabled once ingest handlers land + # Replay the Python /ws/live firehose into the ingest handlers (shadow). + SHADOW_INGEST_WS: "ws://dereth-tracker:8765/ws/live" LOG_LEVEL: "INFO" depends_on: - dereth-go-db diff --git a/go-services/tracker-go/charstats.go b/go-services/tracker-go/charstats.go index 2faa573b..8376eb16 100644 --- a/go-services/tracker-go/charstats.go +++ b/go-services/tracker-go/charstats.go @@ -10,6 +10,13 @@ import ( // ingest-only freshness layer we don't have yet. (main.py:4137) func (s *Server) handleCharacterStats(w http.ResponseWriter, r *http.Request) { name := r.PathValue("name") + // Live overlay first (ingest mode), like Python's live_character_stats check. + if s.ingestor != nil { + if v, ok := s.ingestor.getCharacterStats(name); ok { + writeJSON(w, http.StatusOK, v) + return + } + } ctx, cancel := reqCtx(r) defer cancel() diff --git a/go-services/tracker-go/ingest.go b/go-services/tracker-go/ingest.go new file mode 100644 index 00000000..435ba369 --- /dev/null +++ b/go-services/tracker-go/ingest.go @@ -0,0 +1,431 @@ +package main + +import ( + "context" + "encoding/json" + "log/slog" + "strings" + "sync" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// Ingestor implements the plugin event handlers (the /ws/position logic), +// faithfully mirroring main.py's write semantics. It owns the in-memory live +// state and writes to a read-write pool (its own DB in shadow/cutover mode). +// +// It is fed either by the real /ws/position server (cutover) or by the shadow +// consumer replaying Python's /ws/live broadcast firehose. broadcast is invoked +// after each handled event (nil = no browser fan-out, e.g. shadow mode). +type Ingestor struct { + pool *pgxpool.Pool + log *slog.Logger + broadcast func(map[string]any) + + mu sync.RWMutex + liveSnapshots map[string]map[string]any + liveVitals map[string]map[string]any + liveCharacterStats map[string]map[string]any + liveEquipmentCantrip map[string]map[string]any + liveNearbyObjects map[string]map[string]any + liveCombatStats map[string]map[string]any + dungeonMapCache map[string]map[string]any + questStatus map[string]map[string]string + lastKills map[string]int // "session_id|character_name" -> kills +} + +func newIngestor(pool *pgxpool.Pool, log *slog.Logger, broadcast func(map[string]any)) *Ingestor { + return &Ingestor{ + pool: pool, + log: log, + broadcast: broadcast, + liveSnapshots: map[string]map[string]any{}, + liveVitals: map[string]map[string]any{}, + liveCharacterStats: map[string]map[string]any{}, + liveEquipmentCantrip: map[string]map[string]any{}, + liveNearbyObjects: map[string]map[string]any{}, + liveCombatStats: map[string]map[string]any{}, + dungeonMapCache: map[string]map[string]any{}, + questStatus: map[string]map[string]string{}, + lastKills: map[string]int{}, + } +} + +// dispatch routes a parsed message to the right handler. Over /ws/position the +// discriminator is the "type" field; over the /ws/live broadcast, telemetry has +// NO type (it's the raw snapshot), so we also match it by shape. +func (i *Ingestor) dispatch(ctx context.Context, data map[string]any) { + t := toStr(data["type"]) + switch { + case t == "telemetry" || (t == "" && hasTelemetryShape(data)): + i.handleTelemetry(ctx, data) + case t == "rare": + i.handleRare(ctx, data) + case t == "portal": + i.handlePortal(ctx, data) + case t == "character_stats": + i.handleCharacterStats(ctx, data) + case t == "spawn": + i.handleSpawn(ctx, data) + case t == "vitals": + i.handleVitals(data) + case t == "quest": + i.handleQuest(data) + case t == "equipment_cantrip_state": + i.handleEquipmentCantrip(data) + case t == "nearby_objects": + i.handleNearbyObjects(data) + case t == "dungeon_map": + i.handleDungeonMap(data) + case t == "register": + // no DB / no broadcast; plugin_conns belongs to the /ws/position server + case t == "combat_stats", strings.HasPrefix(t, "share_"), t == "chat": + // combat_stats + share_* handled in a later pass; chat is broadcast-only + } + if i.broadcast != nil { + i.broadcast(data) + } +} + +func hasTelemetryShape(d map[string]any) bool { + _, a := d["session_id"] + _, b := d["ew"] + _, c := d["kills"] + return a && b && c +} + +// --- telemetry: INSERT telemetry_events + kill-delta into char_stats (main.py:3124) --- + +const insTelemetry = `INSERT INTO telemetry_events +(character_name,char_tag,session_id,timestamp,ew,ns,z,kills,kills_per_hour,onlinetime, + deaths,total_deaths,rares_found,prismatic_taper_count,vt_state,mem_mb,cpu_pct,mem_handles,latency_ms,received_at) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,0,$13,$14,$15,$16,$17,$18,$19)` + +const upsertCharKills = `INSERT INTO char_stats (character_name,total_kills) VALUES ($1,$2) +ON CONFLICT (character_name) DO UPDATE SET total_kills = char_stats.total_kills + $2` + +func (i *Ingestor) handleTelemetry(ctx context.Context, data map[string]any) { + name := toStr(data["character_name"]) + sessionID := toStr(data["session_id"]) + if name == "" || sessionID == "" { + return + } + kills := toInt(data["kills"]) + received := time.Now().UTC() + + key := sessionID + "|" + name + i.mu.RLock() + last, ok := i.lastKills[key] + i.mu.RUnlock() + if !ok { + if row, err := queryRowAsMap(ctx, i.pool, + `SELECT kills FROM telemetry_events WHERE character_name=$1 AND session_id=$2 ORDER BY timestamp DESC LIMIT 1`, + name, sessionID); err == nil && row != nil { + last = toInt(row["kills"]) + } + } + delta := kills - last + + tx, err := i.pool.Begin(ctx) + if err != nil { + i.log.Error("telemetry tx begin failed", "err", err) + return + } + defer tx.Rollback(ctx) + if _, err := tx.Exec(ctx, insTelemetry, + name, nstr(data["char_tag"]), sessionID, parseTSAny(data["timestamp"]), + toFloat(data["ew"]), toFloat(data["ns"]), toFloat(data["z"]), kills, + nfloat(data["kills_per_hour"]), nstr(data["onlinetime"]), toInt(data["deaths"]), + nint(data["total_deaths"]), toInt(data["prismatic_taper_count"]), nstr(data["vt_state"]), + nfloat(data["mem_mb"]), nfloat(data["cpu_pct"]), nint(data["mem_handles"]), + nfloat(data["latency_ms"]), received, + ); err != nil { + i.log.Error("telemetry insert failed", "err", err, "char", name) + return + } + if delta > 0 { + if _, err := tx.Exec(ctx, upsertCharKills, name, delta); err != nil { + i.log.Error("char_stats upsert failed", "err", err, "char", name) + return + } + } + if err := tx.Commit(ctx); err != nil { + i.log.Error("telemetry commit failed", "err", err, "char", name) + return + } + + i.mu.Lock() + i.lastKills[key] = kills + i.liveSnapshots[name] = data + i.mu.Unlock() +} + +// --- rare: rare_stats + rare_stats_sessions + rare_events (main.py:3234) --- + +const upsertRareStats = `INSERT INTO rare_stats (character_name,total_rares) VALUES ($1,1) +ON CONFLICT (character_name) DO UPDATE SET total_rares = rare_stats.total_rares + 1` +const upsertRareSession = `INSERT INTO rare_stats_sessions (character_name,session_id,session_rares) VALUES ($1,$2,1) +ON CONFLICT (character_name,session_id) DO UPDATE SET session_rares = rare_stats_sessions.session_rares + 1` +const insRareEvent = `INSERT INTO rare_events (character_name,name,timestamp,ew,ns,z) VALUES ($1,$2,$3,$4,$5,$6)` + +func (i *Ingestor) handleRare(ctx context.Context, data map[string]any) { + name := toStr(data["character_name"]) + if strings.TrimSpace(name) == "" { + return + } + if _, err := i.pool.Exec(ctx, upsertRareStats, name); err != nil { + i.log.Error("rare_stats upsert failed", "err", err, "char", name) + return + } + // Session id: live snapshot first, else latest telemetry row. + i.mu.RLock() + sessionID := toStr(i.liveSnapshots[name]["session_id"]) + i.mu.RUnlock() + if sessionID == "" { + if row, err := queryRowAsMap(ctx, i.pool, + `SELECT session_id FROM telemetry_events WHERE character_name=$1 ORDER BY timestamp DESC LIMIT 1`, name); err == nil && row != nil { + sessionID = toStr(row["session_id"]) + } + } + if sessionID != "" { + if _, err := i.pool.Exec(ctx, upsertRareSession, name, sessionID); err != nil { + i.log.Error("rare_stats_sessions upsert failed", "err", err, "char", name) + } + } + if _, err := i.pool.Exec(ctx, insRareEvent, + name, toStr(data["name"]), parseTSAny(data["timestamp"]), + toFloat(data["ew"]), toFloat(data["ns"]), toFloatOr(data["z"], 0), + ); err != nil { + i.log.Error("rare_events insert failed", "err", err, "char", name) + } +} + +// --- portal: upsert on rounded coords (main.py:3567) --- + +const upsertPortal = `INSERT INTO portals (portal_name,ns,ew,z,discovered_at,discovered_by) +VALUES ($1,$2,$3,$4,$5,$6) +ON CONFLICT (ROUND(ns::numeric,1), ROUND(ew::numeric,1)) DO UPDATE SET + discovered_at = EXCLUDED.discovered_at, + discovered_by = EXCLUDED.discovered_by, + portal_name = EXCLUDED.portal_name` + +func (i *Ingestor) handlePortal(ctx context.Context, data map[string]any) { + name := toStr(data["character_name"]) + portalName := toStr(data["portal_name"]) + ts := data["timestamp"] + if name == "" || portalName == "" || data["ns"] == nil || data["ew"] == nil || data["z"] == nil || ts == nil { + return + } + if _, err := i.pool.Exec(ctx, upsertPortal, + portalName, toFloat(data["ns"]), toFloat(data["ew"]), toFloat(data["z"]), + parseTSAny(ts), name, + ); err != nil { + i.log.Error("portal upsert failed", "err", err, "char", name) + } +} + +// --- character_stats: build stats_data subset + upsert (main.py:3443) --- + +const upsertCharacterStats = `INSERT INTO character_stats +(character_name,timestamp,level,total_xp,unassigned_xp,luminance_earned,luminance_total,deaths,stats_data) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) +ON CONFLICT (character_name) DO UPDATE SET + timestamp=EXCLUDED.timestamp, level=EXCLUDED.level, total_xp=EXCLUDED.total_xp, + unassigned_xp=EXCLUDED.unassigned_xp, luminance_earned=EXCLUDED.luminance_earned, + luminance_total=EXCLUDED.luminance_total, deaths=EXCLUDED.deaths, stats_data=EXCLUDED.stats_data` + +var statsDataKeys = []string{ + "attributes", "vitals", "skills", "allegiance", "active_item_enchantments", + "race", "gender", "birth", "current_title", "skill_credits", "burden", + "burden_units", "encumbrance_capacity", "properties", "titles", +} + +func (i *Ingestor) handleCharacterStats(ctx context.Context, data map[string]any) { + name := toStr(data["character_name"]) + if name == "" { + return + } + statsData := map[string]any{} + for _, k := range statsDataKeys { + if v, ok := data[k]; ok && v != nil { + statsData[k] = v + } + } + sdJSON, _ := json.Marshal(statsData) + if _, err := i.pool.Exec(ctx, upsertCharacterStats, + name, parseTSAny(data["timestamp"]), nint(data["level"]), nint(data["total_xp"]), + nint(data["unassigned_xp"]), nint(data["luminance_earned"]), nint(data["luminance_total"]), + nint(data["deaths"]), sdJSON, + ); err != nil { + i.log.Error("character_stats upsert failed", "err", err, "char", name) + return + } + i.mu.Lock() + i.liveCharacterStats[name] = data + i.mu.Unlock() +} + +// --- spawn: INSERT spawn_events (main.py:3110). Not broadcast, so only the real +// /ws/position path feeds this; covered by ingest_test.go. --- + +const insSpawn = `INSERT INTO spawn_events (character_name,mob,timestamp,ew,ns,z) VALUES ($1,$2,$3,$4,$5,$6)` + +func (i *Ingestor) handleSpawn(ctx context.Context, data map[string]any) { + name := toStr(data["character_name"]) + mob := toStr(data["mob"]) + if name == "" || mob == "" { + return + } + if _, err := i.pool.Exec(ctx, insSpawn, + name, mob, parseTSAny(data["timestamp"]), + toFloat(data["ew"]), toFloat(data["ns"]), toFloatOr(data["z"], 0), + ); err != nil { + i.log.Error("spawn insert failed", "err", err, "char", name) + } +} + +// --- memory-only handlers --- + +func (i *Ingestor) handleVitals(data map[string]any) { + name := toStr(data["character_name"]) + if name == "" { + return + } + // Death detection (discord alert) is intentionally omitted in shadow mode — + // it would duplicate the production alert. The live overlay still updates. + i.mu.Lock() + i.liveVitals[name] = data + i.mu.Unlock() +} + +var allowedQuests = map[string]bool{ + "Stipend Collection Timer": true, + "Blank Augmentation Gem Pickup Timer": true, + "Insatiable Eater Jaw": true, +} + +func (i *Ingestor) handleQuest(data map[string]any) { + name := toStr(data["character_name"]) + quest := toStr(data["quest_name"]) + countdown, ok := data["countdown"] + if name == "" || quest == "" || !ok || countdown == nil || !allowedQuests[quest] { + return + } + i.mu.Lock() + if i.questStatus[name] == nil { + i.questStatus[name] = map[string]string{} + } + i.questStatus[name][quest] = toStr(countdown) + i.mu.Unlock() +} + +func (i *Ingestor) handleEquipmentCantrip(data map[string]any) { + name := toStr(data["character_name"]) + if name == "" { + return + } + i.mu.Lock() + i.liveEquipmentCantrip[name] = data + i.mu.Unlock() +} + +func (i *Ingestor) handleNearbyObjects(data map[string]any) { + name := toStr(data["character_name"]) + if name == "" { + return + } + i.mu.Lock() + i.liveNearbyObjects[name] = data + i.mu.Unlock() +} + +func (i *Ingestor) handleDungeonMap(data map[string]any) { + lb := toStr(data["landblock"]) + if lb == "" { + return + } + i.mu.Lock() + i.dungeonMapCache[lb] = data + i.mu.Unlock() +} + +// --- read-side overlay accessors (used by the HTTP handlers when an ingestor +// is present, mirroring Python's "live cache first, DB fallback") --- + +func (i *Ingestor) snapshot(m map[string]map[string]any, name string) (map[string]any, bool) { + i.mu.RLock() + defer i.mu.RUnlock() + v, ok := m[name] + return v, ok +} + +func (i *Ingestor) getCharacterStats(name string) (map[string]any, bool) { + return i.snapshot(i.liveCharacterStats, name) +} +func (i *Ingestor) getEquipmentCantrip(name string) (map[string]any, bool) { + return i.snapshot(i.liveEquipmentCantrip, name) +} +func (i *Ingestor) questData() (map[string]map[string]string, int) { + i.mu.RLock() + defer i.mu.RUnlock() + out := make(map[string]map[string]string, len(i.questStatus)) + for c, qs := range i.questStatus { + cp := make(map[string]string, len(qs)) + for k, v := range qs { + cp[k] = v + } + out[c] = cp + } + return out, len(i.questStatus) +} + +// --- small value helpers (JSON numbers decode as float64) --- + +func nstr(v any) any { + if s, ok := v.(string); ok { + return s + } + return nil +} +func nint(v any) any { + switch x := v.(type) { + case float64: + return int64(x) + case int: + return int64(x) + case int64: + return x + } + return nil +} +func nfloat(v any) any { + if f, ok := v.(float64); ok { + return f + } + return nil +} +func toFloatOr(v any, def float64) float64 { + if f, ok := v.(float64); ok { + return f + } + return def +} + +func parseTSAny(v any) time.Time { + s, ok := v.(string) + if !ok { + return time.Now().UTC() + } + s = strings.Replace(s, "Z", "+00:00", 1) + for _, l := range []string{ + time.RFC3339Nano, time.RFC3339, + "2006-01-02T15:04:05.999999-07:00", "2006-01-02T15:04:05-07:00", + "2006-01-02T15:04:05.999999", "2006-01-02T15:04:05", + } { + if t, err := time.Parse(l, s); err == nil { + return t + } + } + return time.Now().UTC() +} diff --git a/go-services/tracker-go/main.go b/go-services/tracker-go/main.go index c7c991f8..824668a4 100644 --- a/go-services/tracker-go/main.go +++ b/go-services/tracker-go/main.go @@ -39,6 +39,7 @@ type Server struct { invProxy *httputil.ReverseProxy staticDir string secretKey string + ingestor *Ingestor // non-nil only in ingest/shadow mode log *slog.Logger } @@ -93,6 +94,17 @@ func main() { cancel() } + // Shadow ingest: replay the Python /ws/live firehose into our handlers. + if cfg.IngestWS != "" { + if cfg.ReadOnly { + logger.Error("SHADOW_INGEST_WS set but READ_ONLY=true; refusing to ingest into the production DB") + os.Exit(1) + } + srv.ingestor = newIngestor(pool, logger, nil) + go srv.runShadowConsumer(ctx, cfg.IngestWS) + logger.Info("shadow ingest enabled", "source", cfg.IngestWS) + } + go srv.runCacheLoop(ctx) go srv.runTotalsLoop(ctx) logger.Info("db connected; cache loops started", diff --git a/go-services/tracker-go/memstate.go b/go-services/tracker-go/memstate.go index 1dc428e5..bb746a05 100644 --- a/go-services/tracker-go/memstate.go +++ b/go-services/tracker-go/memstate.go @@ -14,14 +14,27 @@ import ( // GET /quest-status (main.py:1940) func (s *Server) handleQuestStatus(w http.ResponseWriter, r *http.Request) { + questData := map[string]any{} + playerCount := 0 + if s.ingestor != nil { + qd, n := s.ingestor.questData() + playerCount = n + for c, qs := range qd { + m := map[string]any{} + for k, v := range qs { + m[k] = v + } + questData[c] = m + } + } writeJSON(w, http.StatusOK, map[string]any{ - "quest_data": map[string]any{}, + "quest_data": questData, "tracked_quests": []string{ "Stipend Collection Timer", "Blank Augmentation Gem Pickup Timer", "Insatiable Eater Jaw", }, - "player_count": 0, + "player_count": playerCount, }) } @@ -36,6 +49,12 @@ func (s *Server) handleVitalSharingPeers(w http.ResponseWriter, r *http.Request) // GET /equipment-cantrip-state/{name} (main.py:4167) func (s *Server) handleEquipmentCantrip(w http.ResponseWriter, r *http.Request) { name := r.PathValue("name") + if s.ingestor != nil { + if v, ok := s.ingestor.getEquipmentCantrip(name); ok { + writeJSON(w, http.StatusOK, v) + return + } + } writeJSON(w, http.StatusOK, map[string]any{ "type": "equipment_cantrip_state", "character_name": name, diff --git a/go-services/tracker-go/shadow.go b/go-services/tracker-go/shadow.go new file mode 100644 index 00000000..49f8ea88 --- /dev/null +++ b/go-services/tracker-go/shadow.go @@ -0,0 +1,84 @@ +package main + +import ( + "context" + "encoding/json" + "time" + + "github.com/coder/websocket" +) + +// runShadowConsumer connects to the Python tracker's /ws/live, receives the full +// broadcast firehose (no subscribe = all types), and replays each message +// through the ingest handlers into THIS instance's own DB. This validates the +// Go ingest path against real traffic without touching production or stealing +// plugin connections. Reconnects with exponential backoff. +// +// Note: telemetry broadcasts carry no "type" field (dispatch matches by shape); +// spawn and full_inventory are NOT broadcast, so they don't arrive here (covered +// by unit tests / the future /ws/position path). +func (s *Server) runShadowConsumer(ctx context.Context, wsURL string) { + backoff := time.Second + const maxBackoff = 60 * time.Second + for ctx.Err() == nil { + err := s.shadowConnect(ctx, wsURL) + if ctx.Err() != nil { + return + } + s.log.Warn("shadow consumer disconnected; reconnecting", "err", err, "backoff", backoff.String()) + select { + case <-ctx.Done(): + return + case <-time.After(backoff): + } + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } +} + +func (s *Server) shadowConnect(ctx context.Context, wsURL string) error { + c, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + return err + } + defer c.CloseNow() + c.SetReadLimit(32 << 20) // nearby_objects / dungeon_map payloads can be large + + // Keepalive pings. + pingCtx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + t := time.NewTicker(20 * time.Second) + defer t.Stop() + for { + select { + case <-pingCtx.Done(): + return + case <-t.C: + pc, cc := context.WithTimeout(pingCtx, 10*time.Second) + _ = c.Ping(pc) + cc() + } + } + }() + + s.log.Info("shadow consumer connected; replaying /ws/live into ingest", "url", wsURL) + var n int + for { + _, raw, err := c.Read(ctx) + if err != nil { + return err + } + var m map[string]any + if json.Unmarshal(raw, &m) != nil { + continue + } + s.ingestor.dispatch(ctx, m) + n++ + if n%5000 == 0 { + s.log.Info("shadow consumer progress", "messages", n) + } + } +}