package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"
)

// invForwarder relays plugin inventory events to the inventory service. It is
// the port of main.py's _forward_to_inventory_service and
// _handle_inventory_delta. It is only active in cutover (write) mode; in
// shadow/read-only mode it is nil, and the plugin firehose never carries
// inventory there anyway.
//
//	full_inventory             -> POST   {url}/process-inventory  (full replace)
//	inventory_delta add/update -> POST   {url}/inventory/{char}/item
//	inventory_delta remove     -> DELETE {url}/inventory/{char}/item/{item_id}
//
// Deltas are fire-and-forget so they never block the /ws/position read loop.
// They are serialized per character, so one character's rapid deltas do not
// race the inventory DELETE+INSERT, and at most 8 delta forwards run at once
// across all characters.
type invForwarder struct {
	url       string                 // service base URL, trailing slashes stripped
	client    *http.Client           // shared client with a 30s timeout
	sem       chan struct{}          // counting semaphore bounding concurrent delta forwards
	mu        sync.Mutex             // guards locks
	locks     map[string]*sync.Mutex // per-character mutexes, created on demand
	log       *slog.Logger
	broadcast func(map[string]any) // optional; called with each delta after forwarding
}

// newInvForwarder builds a forwarder targeting rawURL (trailing slashes are
// trimmed), logging through log and re-broadcasting deltas through broadcast.
func newInvForwarder(rawURL string, log *slog.Logger, broadcast func(map[string]any)) *invForwarder {
	fwd := new(invForwarder)
	fwd.url = strings.TrimRight(rawURL, "/")
	fwd.client = &http.Client{Timeout: 30 * time.Second}
	fwd.sem = make(chan struct{}, 8)
	fwd.locks = make(map[string]*sync.Mutex)
	fwd.log = log
	fwd.broadcast = broadcast
	return fwd
}

// charLock returns the mutex dedicated to the character called name, creating
// and registering it on first use. The registry itself is protected by f.mu.
func (f *invForwarder) charLock(name string) *sync.Mutex {
	f.mu.Lock()
	defer f.mu.Unlock()

	if existing := f.locks[name]; existing != nil {
		return existing
	}
	created := new(sync.Mutex)
	f.locks[name] = created
	return created
}

// forwardFullInventory POSTs a full inventory snapshot (full replace). Runs
// inline on the /ws/position handler — main.py awaits _store_inventory too.
func (f *invForwarder) forwardFullInventory(data map[string]any) { char := toStr(data["character_name"]) body, _ := json.Marshal(map[string]any{ "character_name": char, "timestamp": data["timestamp"], "items": data["items"], }) resp, err := f.client.Post(f.url+"/process-inventory", "application/json", bytes.NewReader(body)) if err != nil { f.log.Error("full_inventory forward failed", "err", err, "char", char) return } defer drain(resp) if resp.StatusCode >= 400 { f.log.Warn("inventory service error (full_inventory)", "status", resp.StatusCode, "char", char) } } // handleInventoryDelta forwards a single add/update/remove. Fire-and-forget. func (f *invForwarder) handleInventoryDelta(data map[string]any) { go func() { char := toStr(data["character_name"]) lock := f.charLock(char) lock.Lock() defer lock.Unlock() f.sem <- struct{}{} defer func() { <-f.sem }() out := data switch toStr(data["action"]) { case "remove": if itemID := data["item_id"]; itemID != nil { req, _ := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/inventory/%s/item/%v", f.url, url.PathEscape(char), itemID), nil) if resp, err := f.client.Do(req); err != nil { f.log.Warn("inventory delta remove failed", "err", err, "char", char) } else { if resp.StatusCode >= 400 { f.log.Warn("inventory service error (delta remove)", "status", resp.StatusCode, "char", char) } drain(resp) } } case "add", "update": if item := data["item"]; item != nil { b, _ := json.Marshal(item) resp, err := f.client.Post(fmt.Sprintf("%s/inventory/%s/item", f.url, url.PathEscape(char)), "application/json", bytes.NewReader(b)) if err != nil { f.log.Warn("inventory delta add/update failed", "err", err, "char", char) } else { if resp.StatusCode < 400 { // Re-broadcast the enriched item the service returns. 
var r map[string]any if json.NewDecoder(resp.Body).Decode(&r) == nil { if enriched, ok := r["item"]; ok && enriched != nil { out = map[string]any{ "type": "inventory_delta", "action": toStr(data["action"]), "character_name": char, "item": enriched, } } } } else { f.log.Warn("inventory service error (delta add/update)", "status", resp.StatusCode, "char", char) } drain(resp) } } } if f.broadcast != nil { f.broadcast(out) } }() } func drain(resp *http.Response) { if resp != nil && resp.Body != nil { _, _ = io.Copy(io.Discard, resp.Body) _ = resp.Body.Close() } }