Wires the validated item-processor into the ingestion endpoints, writing to an isolated inventory-go-db (never production): - schema.go: faithful 7-table replica of inventory-service/database.py. - ingest.go: /process-inventory (full replace), POST/DELETE single item, with the exact delete-then-insert flow, dynamic INSERT builder (quotes reserved "unique"), spell union (is_active), and item_raw_data verbatim. enhancements always inserts. - compose: isolated inventory-go-db (postgres:14, 127.0.0.1:5435) + read-write inventory-go-shadow (:8773) that owns it; schema init on boot. Validated by ingesting a recently-ingested character's items (from production's original_json) into the shadow DB and diffing vs production: byte-identical — items 243, combat 243, enhancements 243, ratings 6, requirements 19, spells 52 all match; 0 per-column mismatches across 243 items. Finding: older production normalized rows can be STALE (predate the code reading Decal keys 218103832/218103835); Go matches the CURRENT Python code, so validate ingestion against recently-ingested characters. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
266 lines
7.8 KiB
Go
266 lines
7.8 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"net/http"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
// Ingestion endpoints — port of process_inventory / upsert_inventory_item /
|
|
// delete_inventory_item. They write to THIS instance's own DB (ingest mode,
|
|
// READ_ONLY=false), reusing the validated item-processor. Production is never
|
|
// written: an isolated inventory-go-db backs the shadow instance.
|
|
|
|
func quoteCol(k string) string {
|
|
if k == "unique" {
|
|
return `"unique"`
|
|
}
|
|
return k
|
|
}
|
|
|
|
func buildInsert(table string, cols map[string]any, returningID bool) (string, []any) {
|
|
keys := make([]string, 0, len(cols))
|
|
for k := range cols {
|
|
keys = append(keys, k)
|
|
}
|
|
sort.Strings(keys)
|
|
qc := make([]string, len(keys))
|
|
ph := make([]string, len(keys))
|
|
args := make([]any, len(keys))
|
|
for i, k := range keys {
|
|
qc[i] = quoteCol(k)
|
|
ph[i] = "$" + strconv.Itoa(i+1)
|
|
args[i] = cols[k]
|
|
}
|
|
sql := "INSERT INTO " + table + " (" + strings.Join(qc, ", ") + ") VALUES (" + strings.Join(ph, ", ") + ")"
|
|
if returningID {
|
|
sql += " RETURNING id"
|
|
}
|
|
return sql, args
|
|
}
|
|
|
|
var childTables = []struct{ table, key string }{
|
|
{"item_combat_stats", "combat"},
|
|
{"item_requirements", "requirements"},
|
|
{"item_enhancements", "enhancements"},
|
|
{"item_ratings", "ratings"},
|
|
}
|
|
|
|
// ingestItem processes one raw item and inserts it across the 7 tables.
|
|
func (s *Server) ingestItem(ctx context.Context, tx pgx.Tx, charName string, ts time.Time, raw map[string]any) error {
|
|
p := s.processItem(raw)
|
|
items := p["items"].(map[string]any)
|
|
items["character_name"] = charName
|
|
items["timestamp"] = ts
|
|
sql, args := buildInsert("items", items, true)
|
|
var id int
|
|
if err := tx.QueryRow(ctx, sql, args...).Scan(&id); err != nil {
|
|
return err
|
|
}
|
|
for _, ct := range childTables {
|
|
cols, _ := p[ct.key].(map[string]any)
|
|
if cols == nil {
|
|
continue // table skipped (all-sentinel)
|
|
}
|
|
cols["item_id"] = id
|
|
csql, cargs := buildInsert(ct.table, cols, false)
|
|
if _, err := tx.Exec(ctx, csql, cargs...); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if rows, ok := p["spells"].([]map[string]any); ok {
|
|
for _, sp := range rows {
|
|
if _, err := tx.Exec(ctx,
|
|
"INSERT INTO item_spells (item_id, spell_id, is_active) VALUES ($1,$2,$3) ON CONFLICT DO NOTHING",
|
|
id, sp["spell_id"], sp["is_active"]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
ivb, _ := json.Marshal(bag(raw, "IntValues"))
|
|
dvb, _ := json.Marshal(bag(raw, "DoubleValues"))
|
|
svb, _ := json.Marshal(bag(raw, "StringValues"))
|
|
bvb, _ := json.Marshal(bag(raw, "BoolValues"))
|
|
ojb, _ := json.Marshal(raw)
|
|
_, err := tx.Exec(ctx,
|
|
"INSERT INTO item_raw_data (item_id,int_values,double_values,string_values,bool_values,original_json) VALUES ($1,$2,$3,$4,$5,$6)",
|
|
id, ivb, dvb, svb, bvb, ojb)
|
|
return err
|
|
}
|
|
|
|
// deleteCharItems removes a character's rows across all tables (children first).
|
|
func deleteCharItems(ctx context.Context, tx pgx.Tx, charName string) error {
|
|
var ids []int
|
|
rows, err := tx.Query(ctx, "SELECT id FROM items WHERE character_name=$1", charName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for rows.Next() {
|
|
var id int
|
|
if err := rows.Scan(&id); err != nil {
|
|
rows.Close()
|
|
return err
|
|
}
|
|
ids = append(ids, id)
|
|
}
|
|
rows.Close()
|
|
if len(ids) > 0 {
|
|
for _, t := range []string{"item_raw_data", "item_combat_stats", "item_requirements", "item_enhancements", "item_ratings", "item_spells"} {
|
|
if _, err := tx.Exec(ctx, "DELETE FROM "+t+" WHERE item_id = ANY($1)", ids); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
_, err = tx.Exec(ctx, "DELETE FROM items WHERE character_name=$1", charName)
|
|
return err
|
|
}
|
|
|
|
func deleteOneItem(ctx context.Context, tx pgx.Tx, charName string, itemID int64) error {
|
|
var id int
|
|
err := tx.QueryRow(ctx, "SELECT id FROM items WHERE character_name=$1 AND item_id=$2", charName, itemID).Scan(&id)
|
|
if err == pgx.ErrNoRows {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, t := range []string{"item_raw_data", "item_combat_stats", "item_requirements", "item_enhancements", "item_ratings", "item_spells"} {
|
|
if _, err := tx.Exec(ctx, "DELETE FROM "+t+" WHERE item_id=$1", id); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
_, err = tx.Exec(ctx, "DELETE FROM items WHERE id=$1", id)
|
|
return err
|
|
}
|
|
|
|
// POST /process-inventory — full replacement of a character's inventory.
|
|
func (s *Server) handleProcessInventory(w http.ResponseWriter, r *http.Request) {
|
|
body, _ := io.ReadAll(io.LimitReader(r.Body, 64<<20))
|
|
var inv struct {
|
|
CharacterName string `json:"character_name"`
|
|
Timestamp string `json:"timestamp"`
|
|
Items []map[string]any `json:"items"`
|
|
}
|
|
if json.Unmarshal(body, &inv) != nil || inv.CharacterName == "" {
|
|
writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "invalid payload"})
|
|
return
|
|
}
|
|
ts := parseNaiveTime(inv.Timestamp)
|
|
ctx, cancel := context.WithTimeout(r.Context(), 120*time.Second)
|
|
defer cancel()
|
|
tx, err := s.pool.Begin(ctx)
|
|
if err != nil {
|
|
s.dbErr(w, "process-inventory begin", err)
|
|
return
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
if err := deleteCharItems(ctx, tx, inv.CharacterName); err != nil {
|
|
s.dbErr(w, "process-inventory delete", err)
|
|
return
|
|
}
|
|
processed, errs := 0, 0
|
|
for _, raw := range inv.Items {
|
|
if raw["Id"] == nil && raw["id"] == nil {
|
|
errs++
|
|
continue
|
|
}
|
|
if err := s.ingestItem(ctx, tx, inv.CharacterName, ts, raw); err != nil {
|
|
s.log.Error("ingest item failed", "err", err, "char", inv.CharacterName)
|
|
errs++
|
|
continue
|
|
}
|
|
processed++
|
|
}
|
|
if err := tx.Commit(ctx); err != nil {
|
|
s.dbErr(w, "process-inventory commit", err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"processed_count": processed, "error_count": errs, "total_items": len(inv.Items)})
|
|
}
|
|
|
|
// POST /inventory/{character_name}/item — single-item upsert.
|
|
func (s *Server) handleUpsertItem(w http.ResponseWriter, r *http.Request) {
|
|
char := r.PathValue("character_name")
|
|
body, _ := io.ReadAll(io.LimitReader(r.Body, 16<<20))
|
|
var raw map[string]any
|
|
if json.Unmarshal(body, &raw) != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "invalid JSON"})
|
|
return
|
|
}
|
|
if raw["Id"] == nil && raw["id"] == nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "item missing Id"})
|
|
return
|
|
}
|
|
itemID := int64(toFloat(firstNonNil(raw["Id"], raw["id"])))
|
|
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
|
|
defer cancel()
|
|
tx, err := s.pool.Begin(ctx)
|
|
if err != nil {
|
|
s.dbErr(w, "upsert begin", err)
|
|
return
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
if err := deleteOneItem(ctx, tx, char, itemID); err != nil {
|
|
s.dbErr(w, "upsert delete", err)
|
|
return
|
|
}
|
|
if err := s.ingestItem(ctx, tx, char, time.Now().UTC(), raw); err != nil {
|
|
s.dbErr(w, "upsert insert", err)
|
|
return
|
|
}
|
|
if err := tx.Commit(ctx); err != nil {
|
|
s.dbErr(w, "upsert commit", err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "item_id": itemID})
|
|
}
|
|
|
|
// DELETE /inventory/{character_name}/item/{item_id}
|
|
func (s *Server) handleDeleteItem(w http.ResponseWriter, r *http.Request) {
|
|
char := r.PathValue("character_name")
|
|
itemID, _ := strconv.ParseInt(r.PathValue("item_id"), 10, 64)
|
|
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
|
|
defer cancel()
|
|
tx, err := s.pool.Begin(ctx)
|
|
if err != nil {
|
|
s.dbErr(w, "delete begin", err)
|
|
return
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
if err := deleteOneItem(ctx, tx, char, itemID); err != nil {
|
|
s.dbErr(w, "delete", err)
|
|
return
|
|
}
|
|
if err := tx.Commit(ctx); err != nil {
|
|
s.dbErr(w, "delete commit", err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"status": "deleted", "item_id": itemID})
|
|
}
|
|
|
|
func parseNaiveTime(s string) time.Time {
|
|
if s == "" {
|
|
return time.Now().UTC()
|
|
}
|
|
s = strings.Replace(s, "Z", "+00:00", 1)
|
|
for _, l := range []string{time.RFC3339Nano, time.RFC3339, "2006-01-02T15:04:05.999999", "2006-01-02T15:04:05"} {
|
|
if t, err := time.Parse(l, s); err == nil {
|
|
return t.UTC()
|
|
}
|
|
}
|
|
return time.Now().UTC()
|
|
}
|
|
|
|
func firstNonNil(a, b any) any {
|
|
if a != nil {
|
|
return a
|
|
}
|
|
return b
|
|
}
|