MosswartOverlord/go-services/inventory-go/ingest.go
Erik c49b81c237 feat(go-services): inventory-go Phase C — ingestion (validated, isolated DB)
Wires the validated item-processor into the ingestion endpoints, writing to an
isolated inventory-go-db (never production):
- schema.go: faithful 7-table replica of inventory-service/database.py.
- ingest.go: /process-inventory (full replace), POST/DELETE single item, with the
  exact delete-then-insert flow, dynamic INSERT builder (quotes reserved "unique"),
  spell union (is_active), and item_raw_data verbatim. enhancements always inserts.
- compose: isolated inventory-go-db (postgres:14, 127.0.0.1:5435) + read-write
  inventory-go-shadow (:8773) that owns it; schema init on boot.

Validated by ingesting a recently-ingested character's items (from production's
original_json) into the shadow DB and diffing vs production: byte-identical —
items 243, combat 243, enhancements 243, ratings 6, requirements 19, spells 52
all match; 0 per-column mismatches across 243 items.

Finding: older production normalized rows can be STALE (predate the code reading
Decal keys 218103832/218103835); Go matches the CURRENT Python code, so validate
ingestion against recently-ingested characters.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-24 12:42:26 +02:00

266 lines
7.8 KiB
Go

package main
import (
"context"
"encoding/json"
"io"
"net/http"
"sort"
"strconv"
"strings"
"time"
"github.com/jackc/pgx/v5"
)
// Ingestion endpoints — port of process_inventory / upsert_inventory_item /
// delete_inventory_item. They write to THIS instance's own DB (ingest mode,
// READ_ONLY=false), reusing the validated item-processor. Production is never
// written: an isolated inventory-go-db backs the shadow instance.
func quoteCol(k string) string {
if k == "unique" {
return `"unique"`
}
return k
}
func buildInsert(table string, cols map[string]any, returningID bool) (string, []any) {
keys := make([]string, 0, len(cols))
for k := range cols {
keys = append(keys, k)
}
sort.Strings(keys)
qc := make([]string, len(keys))
ph := make([]string, len(keys))
args := make([]any, len(keys))
for i, k := range keys {
qc[i] = quoteCol(k)
ph[i] = "$" + strconv.Itoa(i+1)
args[i] = cols[k]
}
sql := "INSERT INTO " + table + " (" + strings.Join(qc, ", ") + ") VALUES (" + strings.Join(ph, ", ") + ")"
if returningID {
sql += " RETURNING id"
}
return sql, args
}
var childTables = []struct{ table, key string }{
{"item_combat_stats", "combat"},
{"item_requirements", "requirements"},
{"item_enhancements", "enhancements"},
{"item_ratings", "ratings"},
}
// ingestItem processes one raw item and inserts it across the 7 tables.
func (s *Server) ingestItem(ctx context.Context, tx pgx.Tx, charName string, ts time.Time, raw map[string]any) error {
p := s.processItem(raw)
items := p["items"].(map[string]any)
items["character_name"] = charName
items["timestamp"] = ts
sql, args := buildInsert("items", items, true)
var id int
if err := tx.QueryRow(ctx, sql, args...).Scan(&id); err != nil {
return err
}
for _, ct := range childTables {
cols, _ := p[ct.key].(map[string]any)
if cols == nil {
continue // table skipped (all-sentinel)
}
cols["item_id"] = id
csql, cargs := buildInsert(ct.table, cols, false)
if _, err := tx.Exec(ctx, csql, cargs...); err != nil {
return err
}
}
if rows, ok := p["spells"].([]map[string]any); ok {
for _, sp := range rows {
if _, err := tx.Exec(ctx,
"INSERT INTO item_spells (item_id, spell_id, is_active) VALUES ($1,$2,$3) ON CONFLICT DO NOTHING",
id, sp["spell_id"], sp["is_active"]); err != nil {
return err
}
}
}
ivb, _ := json.Marshal(bag(raw, "IntValues"))
dvb, _ := json.Marshal(bag(raw, "DoubleValues"))
svb, _ := json.Marshal(bag(raw, "StringValues"))
bvb, _ := json.Marshal(bag(raw, "BoolValues"))
ojb, _ := json.Marshal(raw)
_, err := tx.Exec(ctx,
"INSERT INTO item_raw_data (item_id,int_values,double_values,string_values,bool_values,original_json) VALUES ($1,$2,$3,$4,$5,$6)",
id, ivb, dvb, svb, bvb, ojb)
return err
}
// deleteCharItems removes a character's rows across all tables (children first).
func deleteCharItems(ctx context.Context, tx pgx.Tx, charName string) error {
var ids []int
rows, err := tx.Query(ctx, "SELECT id FROM items WHERE character_name=$1", charName)
if err != nil {
return err
}
for rows.Next() {
var id int
if err := rows.Scan(&id); err != nil {
rows.Close()
return err
}
ids = append(ids, id)
}
rows.Close()
if len(ids) > 0 {
for _, t := range []string{"item_raw_data", "item_combat_stats", "item_requirements", "item_enhancements", "item_ratings", "item_spells"} {
if _, err := tx.Exec(ctx, "DELETE FROM "+t+" WHERE item_id = ANY($1)", ids); err != nil {
return err
}
}
}
_, err = tx.Exec(ctx, "DELETE FROM items WHERE character_name=$1", charName)
return err
}
func deleteOneItem(ctx context.Context, tx pgx.Tx, charName string, itemID int64) error {
var id int
err := tx.QueryRow(ctx, "SELECT id FROM items WHERE character_name=$1 AND item_id=$2", charName, itemID).Scan(&id)
if err == pgx.ErrNoRows {
return nil
}
if err != nil {
return err
}
for _, t := range []string{"item_raw_data", "item_combat_stats", "item_requirements", "item_enhancements", "item_ratings", "item_spells"} {
if _, err := tx.Exec(ctx, "DELETE FROM "+t+" WHERE item_id=$1", id); err != nil {
return err
}
}
_, err = tx.Exec(ctx, "DELETE FROM items WHERE id=$1", id)
return err
}
// POST /process-inventory — full replacement of a character's inventory.
func (s *Server) handleProcessInventory(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(io.LimitReader(r.Body, 64<<20))
var inv struct {
CharacterName string `json:"character_name"`
Timestamp string `json:"timestamp"`
Items []map[string]any `json:"items"`
}
if json.Unmarshal(body, &inv) != nil || inv.CharacterName == "" {
writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "invalid payload"})
return
}
ts := parseNaiveTime(inv.Timestamp)
ctx, cancel := context.WithTimeout(r.Context(), 120*time.Second)
defer cancel()
tx, err := s.pool.Begin(ctx)
if err != nil {
s.dbErr(w, "process-inventory begin", err)
return
}
defer tx.Rollback(ctx)
if err := deleteCharItems(ctx, tx, inv.CharacterName); err != nil {
s.dbErr(w, "process-inventory delete", err)
return
}
processed, errs := 0, 0
for _, raw := range inv.Items {
if raw["Id"] == nil && raw["id"] == nil {
errs++
continue
}
if err := s.ingestItem(ctx, tx, inv.CharacterName, ts, raw); err != nil {
s.log.Error("ingest item failed", "err", err, "char", inv.CharacterName)
errs++
continue
}
processed++
}
if err := tx.Commit(ctx); err != nil {
s.dbErr(w, "process-inventory commit", err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"processed_count": processed, "error_count": errs, "total_items": len(inv.Items)})
}
// POST /inventory/{character_name}/item — single-item upsert.
func (s *Server) handleUpsertItem(w http.ResponseWriter, r *http.Request) {
char := r.PathValue("character_name")
body, _ := io.ReadAll(io.LimitReader(r.Body, 16<<20))
var raw map[string]any
if json.Unmarshal(body, &raw) != nil {
writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "invalid JSON"})
return
}
if raw["Id"] == nil && raw["id"] == nil {
writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "item missing Id"})
return
}
itemID := int64(toFloat(firstNonNil(raw["Id"], raw["id"])))
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
tx, err := s.pool.Begin(ctx)
if err != nil {
s.dbErr(w, "upsert begin", err)
return
}
defer tx.Rollback(ctx)
if err := deleteOneItem(ctx, tx, char, itemID); err != nil {
s.dbErr(w, "upsert delete", err)
return
}
if err := s.ingestItem(ctx, tx, char, time.Now().UTC(), raw); err != nil {
s.dbErr(w, "upsert insert", err)
return
}
if err := tx.Commit(ctx); err != nil {
s.dbErr(w, "upsert commit", err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "item_id": itemID})
}
// DELETE /inventory/{character_name}/item/{item_id}
func (s *Server) handleDeleteItem(w http.ResponseWriter, r *http.Request) {
char := r.PathValue("character_name")
itemID, _ := strconv.ParseInt(r.PathValue("item_id"), 10, 64)
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
defer cancel()
tx, err := s.pool.Begin(ctx)
if err != nil {
s.dbErr(w, "delete begin", err)
return
}
defer tx.Rollback(ctx)
if err := deleteOneItem(ctx, tx, char, itemID); err != nil {
s.dbErr(w, "delete", err)
return
}
if err := tx.Commit(ctx); err != nil {
s.dbErr(w, "delete commit", err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "deleted", "item_id": itemID})
}
func parseNaiveTime(s string) time.Time {
if s == "" {
return time.Now().UTC()
}
s = strings.Replace(s, "Z", "+00:00", 1)
for _, l := range []string{time.RFC3339Nano, time.RFC3339, "2006-01-02T15:04:05.999999", "2006-01-02T15:04:05"} {
if t, err := time.Parse(l, s); err == nil {
return t.UTC()
}
}
return time.Now().UTC()
}
func firstNonNil(a, b any) any {
if a != nil {
return a
}
return b
}