package main import ( "context" "encoding/json" "log/slog" "strings" "sync" "time" "github.com/jackc/pgx/v5/pgxpool" ) // Ingestor implements the plugin event handlers (the /ws/position logic), // faithfully mirroring main.py's write semantics. It owns the in-memory live // state and writes to a read-write pool (its own DB in shadow/cutover mode). // // It is fed either by the real /ws/position server (cutover) or by the shadow // consumer replaying Python's /ws/live broadcast firehose. broadcast is invoked // after each handled event (nil = no browser fan-out, e.g. shadow mode). type Ingestor struct { pool *pgxpool.Pool log *slog.Logger broadcast func(map[string]any) mu sync.RWMutex liveSnapshots map[string]map[string]any liveVitals map[string]map[string]any liveCharacterStats map[string]map[string]any liveEquipmentCantrip map[string]map[string]any liveNearbyObjects map[string]map[string]any liveCombatStats map[string]map[string]any dungeonMapCache map[string]map[string]any questStatus map[string]map[string]string lastKills map[string]int // "session_id|character_name" -> kills combatLastSession map[string]map[string]any // "char:session_id" -> last cumulative session combatLifetimeCache map[string]map[string]any // character_name -> accumulated lifetime vitalSubscribers map[string]bool vitalPeerState map[string]map[string]any plugins *pluginRegistry // for share_* fan-out + plugin_connected status invFwd *invForwarder // inventory forwarding (cutover only; nil in shadow/read) aclog *aclogPoster // death/idle Discord alerts (cutover only; nil otherwise) } func newIngestor(pool *pgxpool.Pool, log *slog.Logger, broadcast func(map[string]any), plugins *pluginRegistry) *Ingestor { return &Ingestor{ pool: pool, log: log, broadcast: broadcast, plugins: plugins, liveSnapshots: map[string]map[string]any{}, liveVitals: map[string]map[string]any{}, liveCharacterStats: map[string]map[string]any{}, liveEquipmentCantrip: map[string]map[string]any{}, liveNearbyObjects: map[string]map[string]any{}, liveCombatStats: map[string]map[string]any{}, dungeonMapCache: map[string]map[string]any{}, questStatus: map[string]map[string]string{}, lastKills: map[string]int{}, combatLastSession: map[string]map[string]any{}, combatLifetimeCache: map[string]map[string]any{}, vitalSubscribers: map[string]bool{}, vitalPeerState: map[string]map[string]any{}, } } // dispatch routes a parsed message to the right handler. Over /ws/position the // discriminator is the "type" field; over the /ws/live broadcast, telemetry has // NO type (it's the raw snapshot), so we also match it by shape. func (i *Ingestor) dispatch(ctx context.Context, data map[string]any) { t := toStr(data["type"]) switch { case t == "telemetry" || (t == "" && hasTelemetryShape(data)): i.handleTelemetry(ctx, data) // Python broadcasts telemetry as a TYPELESS snapshot (snap.dict()); the // browser intentionally ignores typeless messages (useLiveData drops // `if (!msg.type) return`) and takes player data from the 5s /live poll // instead. Broadcasting it WITH a type makes the UI overwrite the // /live-derived telemetry (which has total_kills/total_rares/session_rares) // with the raw plugin payload (which lacks them), flapping those counters // 0<->value. Strip the type to match. if i.broadcast != nil { i.broadcast(stripType(data)) } return case t == "rare": i.handleRare(ctx, data) case t == "portal": i.handlePortal(ctx, data) case t == "character_stats": i.handleCharacterStats(ctx, data) case t == "spawn": i.handleSpawn(ctx, data) case t == "vitals": i.handleVitals(data) case t == "quest": i.handleQuest(data) case t == "equipment_cantrip_state": i.handleEquipmentCantrip(data) case t == "nearby_objects": i.handleNearbyObjects(data) case t == "dungeon_map": i.handleDungeonMap(data) case t == "combat_stats": i.handleCombatStats(ctx, data) case t == "full_inventory": // Forward the full snapshot to the inventory service; not browser-broadcast. if i.invFwd != nil { i.invFwd.forwardFullInventory(data) } return case t == "inventory_delta": // Fire-and-forget forward; the forwarder broadcasts the enriched delta. if i.invFwd != nil { i.invFwd.handleInventoryDelta(data) } return case t == "share_subscribe": i.handleShareSubscribe(data) case t == "share_unsubscribe": i.handleShareUnsubscribe(data) return // unsubscribe broadcasts its own share_peer_removed; don't re-broadcast case strings.HasPrefix(t, "share_"): i.handleShareUpdate(t, data) case t == "register": // no DB / no broadcast; plugin_conns belongs to the /ws/position server case t == "chat": // broadcast-only } if i.broadcast != nil { i.broadcast(data) } } // stripType returns a shallow copy of the message without its "type" key, so the // browser treats it as a typeless snapshot (and ignores it, deferring to /live). func stripType(data map[string]any) map[string]any { cp := make(map[string]any, len(data)) for k, v := range data { if k != "type" { cp[k] = v } } return cp } func hasTelemetryShape(d map[string]any) bool { _, a := d["session_id"] _, b := d["ew"] _, c := d["kills"] return a && b && c } // --- telemetry: INSERT telemetry_events + kill-delta into char_stats (main.py:3124) --- const insTelemetry = `INSERT INTO telemetry_events (character_name,char_tag,session_id,timestamp,ew,ns,z,kills,kills_per_hour,onlinetime, deaths,total_deaths,rares_found,prismatic_taper_count,vt_state,mem_mb,cpu_pct,mem_handles,latency_ms,received_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,0,$13,$14,$15,$16,$17,$18,$19)` const upsertCharKills = `INSERT INTO char_stats (character_name,total_kills) VALUES ($1,$2) ON CONFLICT (character_name) DO UPDATE SET total_kills = char_stats.total_kills + $2` func (i *Ingestor) handleTelemetry(ctx context.Context, data map[string]any) { name := toStr(data["character_name"]) sessionID := toStr(data["session_id"]) if name == "" || sessionID == "" { return } kills := toInt(data["kills"]) received := time.Now().UTC() key := sessionID + "|" + name i.mu.RLock() last, ok := i.lastKills[key] i.mu.RUnlock() if !ok { if row, err := queryRowAsMap(ctx, i.pool, `SELECT kills FROM telemetry_events WHERE character_name=$1 AND session_id=$2 ORDER BY timestamp DESC LIMIT 1`, name, sessionID); err == nil && row != nil { last = toInt(row["kills"]) } } delta := kills - last tx, err := i.pool.Begin(ctx) if err != nil { i.log.Error("telemetry tx begin failed", "err", err) return } defer tx.Rollback(ctx) if _, err := tx.Exec(ctx, insTelemetry, name, nstr(data["char_tag"]), sessionID, parseTSAny(data["timestamp"]), toFloat(data["ew"]), toFloat(data["ns"]), toFloat(data["z"]), kills, nfloat(data["kills_per_hour"]), nstr(data["onlinetime"]), toInt(data["deaths"]), nint(data["total_deaths"]), toInt(data["prismatic_taper_count"]), nstr(data["vt_state"]), nfloat(data["mem_mb"]), nfloat(data["cpu_pct"]), nint(data["mem_handles"]), nfloat(data["latency_ms"]), received, ); err != nil { i.log.Error("telemetry insert failed", "err", err, "char", name) return } if delta > 0 { if _, err := tx.Exec(ctx, upsertCharKills, name, delta); err != nil { i.log.Error("char_stats upsert failed", "err", err, "char", name) return } } if err := tx.Commit(ctx); err != nil { i.log.Error("telemetry commit failed", "err", err, "char", name) return } i.mu.Lock() i.lastKills[key] = kills i.liveSnapshots[name] = data i.mu.Unlock() } // --- rare: rare_stats + rare_stats_sessions + rare_events (main.py:3234) --- const upsertRareStats = `INSERT INTO rare_stats (character_name,total_rares) VALUES ($1,1) ON CONFLICT (character_name) DO UPDATE SET total_rares = rare_stats.total_rares + 1` const upsertRareSession = `INSERT INTO rare_stats_sessions (character_name,session_id,session_rares) VALUES ($1,$2,1) ON CONFLICT (character_name,session_id) DO UPDATE SET session_rares = rare_stats_sessions.session_rares + 1` const insRareEvent = `INSERT INTO rare_events (character_name,name,timestamp,ew,ns,z) VALUES ($1,$2,$3,$4,$5,$6)` func (i *Ingestor) handleRare(ctx context.Context, data map[string]any) { name := toStr(data["character_name"]) if strings.TrimSpace(name) == "" { return } if _, err := i.pool.Exec(ctx, upsertRareStats, name); err != nil { i.log.Error("rare_stats upsert failed", "err", err, "char", name) return } // Session id: live snapshot first, else latest telemetry row. i.mu.RLock() sessionID := toStr(i.liveSnapshots[name]["session_id"]) i.mu.RUnlock() if sessionID == "" { if row, err := queryRowAsMap(ctx, i.pool, `SELECT session_id FROM telemetry_events WHERE character_name=$1 ORDER BY timestamp DESC LIMIT 1`, name); err == nil && row != nil { sessionID = toStr(row["session_id"]) } } if sessionID != "" { if _, err := i.pool.Exec(ctx, upsertRareSession, name, sessionID); err != nil { i.log.Error("rare_stats_sessions upsert failed", "err", err, "char", name) } } if _, err := i.pool.Exec(ctx, insRareEvent, name, toStr(data["name"]), parseTSAny(data["timestamp"]), toFloat(data["ew"]), toFloat(data["ns"]), toFloatOr(data["z"], 0), ); err != nil { i.log.Error("rare_events insert failed", "err", err, "char", name) } } // --- portal: upsert on rounded coords (main.py:3567) --- const upsertPortal = `INSERT INTO portals (portal_name,ns,ew,z,discovered_at,discovered_by) VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT (ROUND(ns::numeric,1), ROUND(ew::numeric,1)) DO UPDATE SET discovered_at = EXCLUDED.discovered_at, discovered_by = EXCLUDED.discovered_by, portal_name = EXCLUDED.portal_name` func (i *Ingestor) handlePortal(ctx context.Context, data map[string]any) { name := toStr(data["character_name"]) portalName := toStr(data["portal_name"]) ts := data["timestamp"] if name == "" || portalName == "" || data["ns"] == nil || data["ew"] == nil || data["z"] == nil || ts == nil { return } if _, err := i.pool.Exec(ctx, upsertPortal, portalName, toFloat(data["ns"]), toFloat(data["ew"]), toFloat(data["z"]), parseTSAny(ts), name, ); err != nil { i.log.Error("portal upsert failed", "err", err, "char", name) } } // --- character_stats: build stats_data subset + upsert (main.py:3443) --- const upsertCharacterStats = `INSERT INTO character_stats (character_name,timestamp,level,total_xp,unassigned_xp,luminance_earned,luminance_total,deaths,stats_data) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) ON CONFLICT (character_name) DO UPDATE SET timestamp=EXCLUDED.timestamp, level=EXCLUDED.level, total_xp=EXCLUDED.total_xp, unassigned_xp=EXCLUDED.unassigned_xp, luminance_earned=EXCLUDED.luminance_earned, luminance_total=EXCLUDED.luminance_total, deaths=EXCLUDED.deaths, stats_data=EXCLUDED.stats_data` var statsDataKeys = []string{ "attributes", "vitals", "skills", "allegiance", "active_item_enchantments", "race", "gender", "birth", "current_title", "skill_credits", "burden", "burden_units", "encumbrance_capacity", "properties", "titles", } func (i *Ingestor) handleCharacterStats(ctx context.Context, data map[string]any) { name := toStr(data["character_name"]) if name == "" { return } statsData := map[string]any{} for _, k := range statsDataKeys { if v, ok := data[k]; ok && v != nil { statsData[k] = v } } sdJSON, _ := json.Marshal(statsData) if _, err := i.pool.Exec(ctx, upsertCharacterStats, name, parseTSAny(data["timestamp"]), nint(data["level"]), nint(data["total_xp"]), nint(data["unassigned_xp"]), nint(data["luminance_earned"]), nint(data["luminance_total"]), nint(data["deaths"]), sdJSON, ); err != nil { i.log.Error("character_stats upsert failed", "err", err, "char", name) return } i.mu.Lock() i.liveCharacterStats[name] = data i.mu.Unlock() } // --- spawn: INSERT spawn_events (main.py:3110). Not broadcast, so only the real // /ws/position path feeds this; covered by ingest_test.go. --- const insSpawn = `INSERT INTO spawn_events (character_name,mob,timestamp,ew,ns,z) VALUES ($1,$2,$3,$4,$5,$6)` func (i *Ingestor) handleSpawn(ctx context.Context, data map[string]any) { name := toStr(data["character_name"]) mob := toStr(data["mob"]) if name == "" || mob == "" { return } if _, err := i.pool.Exec(ctx, insSpawn, name, mob, parseTSAny(data["timestamp"]), toFloat(data["ew"]), toFloat(data["ns"]), toFloatOr(data["z"], 0), ); err != nil { i.log.Error("spawn insert failed", "err", err, "char", name) } } // --- memory-only handlers --- func (i *Ingestor) handleVitals(data map[string]any) { name := toStr(data["character_name"]) if name == "" { return } // Death detection (main.py:3419): vitae crossing 0 -> >0. Only in cutover // (i.aclog != nil); in shadow mode it stays off to avoid duplicating the // production alert. if i.aclog != nil { i.mu.RLock() prev := i.liveVitals[name] i.mu.RUnlock() var prevVitae float64 if prev != nil { prevVitae = toFloat(prev["vitae"]) } if newVitae := toFloat(data["vitae"]); prevVitae == 0 && newVitae > 0 { i.aclog.maybeDeath(name, newVitae) } } i.mu.Lock() i.liveVitals[name] = data i.mu.Unlock() } var allowedQuests = map[string]bool{ "Stipend Collection Timer": true, "Blank Augmentation Gem Pickup Timer": true, "Insatiable Eater Jaw": true, } func (i *Ingestor) handleQuest(data map[string]any) { name := toStr(data["character_name"]) quest := toStr(data["quest_name"]) countdown, ok := data["countdown"] if name == "" || quest == "" || !ok || countdown == nil || !allowedQuests[quest] { return } i.mu.Lock() if i.questStatus[name] == nil { i.questStatus[name] = map[string]string{} } i.questStatus[name][quest] = toStr(countdown) i.mu.Unlock() } func (i *Ingestor) handleEquipmentCantrip(data map[string]any) { name := toStr(data["character_name"]) if name == "" { return } i.mu.Lock() i.liveEquipmentCantrip[name] = data i.mu.Unlock() } // clearEquipmentCantrip drops a character's cantrip overlay on plugin register // (main.py:3106). func (i *Ingestor) clearEquipmentCantrip(name string) { i.mu.Lock() delete(i.liveEquipmentCantrip, name) i.mu.Unlock() } func (i *Ingestor) handleNearbyObjects(data map[string]any) { name := toStr(data["character_name"]) if name == "" { return } i.mu.Lock() i.liveNearbyObjects[name] = data i.mu.Unlock() } func (i *Ingestor) handleDungeonMap(data map[string]any) { lb := toStr(data["landblock"]) if lb == "" { return } i.mu.Lock() i.dungeonMapCache[lb] = data i.mu.Unlock() } // --- read-side overlay accessors (used by the HTTP handlers when an ingestor // is present, mirroring Python's "live cache first, DB fallback") --- func (i *Ingestor) snapshot(m map[string]map[string]any, name string) (map[string]any, bool) { i.mu.RLock() defer i.mu.RUnlock() v, ok := m[name] return v, ok } func (i *Ingestor) getCharacterStats(name string) (map[string]any, bool) { return i.snapshot(i.liveCharacterStats, name) } func (i *Ingestor) getEquipmentCantrip(name string) (map[string]any, bool) { return i.snapshot(i.liveEquipmentCantrip, name) } func (i *Ingestor) getCombatStats(name string) (map[string]any, bool) { return i.snapshot(i.liveCombatStats, name) } func (i *Ingestor) allCombatStats() map[string]map[string]any { i.mu.RLock() defer i.mu.RUnlock() out := make(map[string]map[string]any, len(i.liveCombatStats)) for k, v := range i.liveCombatStats { out[k] = v } return out } func (i *Ingestor) questData() (map[string]map[string]string, int) { i.mu.RLock() defer i.mu.RUnlock() out := make(map[string]map[string]string, len(i.questStatus)) for c, qs := range i.questStatus { cp := make(map[string]string, len(qs)) for k, v := range qs { cp[k] = v } out[c] = cp } return out, len(i.questStatus) } // --- small value helpers (JSON numbers decode as float64) --- func nstr(v any) any { if s, ok := v.(string); ok { return s } return nil } // nint/nfloat return a typed number or nil (for nullable columns), coercing // string-encoded numbers the plugin sends (see coerceNum). func nint(v any) any { if f, ok := coerceNum(v); ok { return int64(f) } return nil } func nfloat(v any) any { if f, ok := coerceNum(v); ok { return f } return nil } func toFloatOr(v any, def float64) float64 { if f, ok := coerceNum(v); ok { return f } return def } func parseTSAny(v any) time.Time { s, ok := v.(string) if !ok { return time.Now().UTC() } s = strings.Replace(s, "Z", "+00:00", 1) for _, l := range []string{ time.RFC3339Nano, time.RFC3339, "2006-01-02T15:04:05.999999-07:00", "2006-01-02T15:04:05-07:00", "2006-01-02T15:04:05.999999", "2006-01-02T15:04:05", } { if t, err := time.Parse(l, s); err == nil { return t } } return time.Now().UTC() }