From c49b81c2376c23f3f4c1d923a155768ed0551145 Mon Sep 17 00:00:00 2001 From: Erik Date: Wed, 24 Jun 2026 12:42:26 +0200 Subject: [PATCH] =?UTF-8?q?feat(go-services):=20inventory-go=20Phase=20C?= =?UTF-8?q?=20=E2=80=94=20ingestion=20(validated,=20isolated=20DB)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the validated item-processor into the ingestion endpoints, writing to an isolated inventory-go-db (never production): - schema.go: faithful 7-table replica of inventory-service/database.py. - ingest.go: /process-inventory (full replace), POST/DELETE single item, with the exact delete-then-insert flow, dynamic INSERT builder (quotes reserved "unique"), spell union (is_active), and item_raw_data verbatim. enhancements always inserts. - compose: isolated inventory-go-db (postgres:14, 127.0.0.1:5435) + read-write inventory-go-shadow (:8773) that owns it; schema init on boot. Validated by ingesting a recently-ingested character's items (from production's original_json) into the shadow DB and diffing vs production: byte-identical — items 243, combat 243, enhancements 243, ratings 6, requirements 19, spells 52 all match; 0 per-column mismatches across 243 items. Finding: older production normalized rows can be STALE (predate the code reading Decal keys 218103832/218103835); Go matches the CURRENT Python code, so validate ingestion against recently-ingested characters. Co-Authored-By: Claude Opus 4.8 --- go-services/docker-compose.go.yml | 45 +++++ go-services/inventory-go/ingest.go | 266 ++++++++++++++++++++++++++++ go-services/inventory-go/main.go | 12 ++ go-services/inventory-go/process.go | 19 +- go-services/inventory-go/schema.go | 127 +++++++++++++ 5 files changed, 466 insertions(+), 3 deletions(-) create mode 100644 go-services/inventory-go/ingest.go create mode 100644 go-services/inventory-go/schema.go diff --git a/go-services/docker-compose.go.yml b/go-services/docker-compose.go.yml index c285d4cf..bc7bc3ba 100644 --- a/go-services/docker-compose.go.yml +++ b/go-services/docker-compose.go.yml @@ -152,5 +152,50 @@ services: max-size: "10m" max-file: "3" + # Phase C: isolated inventory DB the Go ingestion writes to (never production). + inventory-go-db: + image: postgres:14 + container_name: inventory-go-db + ports: + - "127.0.0.1:5435:5432" + environment: + POSTGRES_DB: "inventory_db" + POSTGRES_USER: "inventory_user" + POSTGRES_PASSWORD: "${INVENTORY_DB_PASSWORD}" + volumes: + - inventory-go-data:/var/lib/postgresql/data + restart: unless-stopped + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + + # Read-write inventory-go instance: owns inventory-go-db, exposes the ingestion + # endpoints. Used to validate ingestion (POST a character's items, compare the + # resulting normalized rows to production) without touching the production DB. + inventory-go-shadow: + image: inventory-go:local + container_name: inventory-go-shadow + ports: + - "127.0.0.1:8773:8773" + environment: + PORT: "8773" + DATABASE_URL: "postgresql://inventory_user:${INVENTORY_DB_PASSWORD}@inventory-go-db:5432/inventory_db" + READ_ONLY: "false" + ENUM_DB_PATH: "/enums/comprehensive_enum_database_v2.json" + LOG_LEVEL: "INFO" + volumes: + - ./inventory-service/comprehensive_enum_database_v2.json:/enums/comprehensive_enum_database_v2.json:ro + depends_on: + - inventory-go-db + restart: unless-stopped + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + volumes: dereth-go-data: + inventory-go-data: diff --git a/go-services/inventory-go/ingest.go b/go-services/inventory-go/ingest.go new file mode 100644 index 00000000..04ab99e3 --- /dev/null +++ b/go-services/inventory-go/ingest.go @@ -0,0 +1,266 @@ +package main + +import ( + "context" + "encoding/json" + "io" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/jackc/pgx/v5" +) + +// Ingestion endpoints — port of process_inventory / upsert_inventory_item / +// delete_inventory_item. They write to THIS instance's own DB (ingest mode, +// READ_ONLY=false), reusing the validated item-processor. Production is never +// written: an isolated inventory-go-db backs the shadow instance. + +func quoteCol(k string) string { + if k == "unique" { + return `"unique"` + } + return k +} + +func buildInsert(table string, cols map[string]any, returningID bool) (string, []any) { + keys := make([]string, 0, len(cols)) + for k := range cols { + keys = append(keys, k) + } + sort.Strings(keys) + qc := make([]string, len(keys)) + ph := make([]string, len(keys)) + args := make([]any, len(keys)) + for i, k := range keys { + qc[i] = quoteCol(k) + ph[i] = "$" + strconv.Itoa(i+1) + args[i] = cols[k] + } + sql := "INSERT INTO " + table + " (" + strings.Join(qc, ", ") + ") VALUES (" + strings.Join(ph, ", ") + ")" + if returningID { + sql += " RETURNING id" + } + return sql, args +} + +var childTables = []struct{ table, key string }{ + {"item_combat_stats", "combat"}, + {"item_requirements", "requirements"}, + {"item_enhancements", "enhancements"}, + {"item_ratings", "ratings"}, +} + +// ingestItem processes one raw item and inserts it across the 7 tables. +func (s *Server) ingestItem(ctx context.Context, tx pgx.Tx, charName string, ts time.Time, raw map[string]any) error { + p := s.processItem(raw) + items := p["items"].(map[string]any) + items["character_name"] = charName + items["timestamp"] = ts + sql, args := buildInsert("items", items, true) + var id int + if err := tx.QueryRow(ctx, sql, args...).Scan(&id); err != nil { + return err + } + for _, ct := range childTables { + cols, _ := p[ct.key].(map[string]any) + if cols == nil { + continue // table skipped (all-sentinel) + } + cols["item_id"] = id + csql, cargs := buildInsert(ct.table, cols, false) + if _, err := tx.Exec(ctx, csql, cargs...); err != nil { + return err + } + } + if rows, ok := p["spells"].([]map[string]any); ok { + for _, sp := range rows { + if _, err := tx.Exec(ctx, + "INSERT INTO item_spells (item_id, spell_id, is_active) VALUES ($1,$2,$3) ON CONFLICT DO NOTHING", + id, sp["spell_id"], sp["is_active"]); err != nil { + return err + } + } + } + ivb, _ := json.Marshal(bag(raw, "IntValues")) + dvb, _ := json.Marshal(bag(raw, "DoubleValues")) + svb, _ := json.Marshal(bag(raw, "StringValues")) + bvb, _ := json.Marshal(bag(raw, "BoolValues")) + ojb, _ := json.Marshal(raw) + _, err := tx.Exec(ctx, + "INSERT INTO item_raw_data (item_id,int_values,double_values,string_values,bool_values,original_json) VALUES ($1,$2,$3,$4,$5,$6)", + id, ivb, dvb, svb, bvb, ojb) + return err +} + +// deleteCharItems removes a character's rows across all tables (children first). +func deleteCharItems(ctx context.Context, tx pgx.Tx, charName string) error { + var ids []int + rows, err := tx.Query(ctx, "SELECT id FROM items WHERE character_name=$1", charName) + if err != nil { + return err + } + for rows.Next() { + var id int + if err := rows.Scan(&id); err != nil { + rows.Close() + return err + } + ids = append(ids, id) + } + rows.Close() + if len(ids) > 0 { + for _, t := range []string{"item_raw_data", "item_combat_stats", "item_requirements", "item_enhancements", "item_ratings", "item_spells"} { + if _, err := tx.Exec(ctx, "DELETE FROM "+t+" WHERE item_id = ANY($1)", ids); err != nil { + return err + } + } + } + _, err = tx.Exec(ctx, "DELETE FROM items WHERE character_name=$1", charName) + return err +} + +func deleteOneItem(ctx context.Context, tx pgx.Tx, charName string, itemID int64) error { + var id int + err := tx.QueryRow(ctx, "SELECT id FROM items WHERE character_name=$1 AND item_id=$2", charName, itemID).Scan(&id) + if err == pgx.ErrNoRows { + return nil + } + if err != nil { + return err + } + for _, t := range []string{"item_raw_data", "item_combat_stats", "item_requirements", "item_enhancements", "item_ratings", "item_spells"} { + if _, err := tx.Exec(ctx, "DELETE FROM "+t+" WHERE item_id=$1", id); err != nil { + return err + } + } + _, err = tx.Exec(ctx, "DELETE FROM items WHERE id=$1", id) + return err +} + +// POST /process-inventory — full replacement of a character's inventory. +func (s *Server) handleProcessInventory(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(io.LimitReader(r.Body, 64<<20)) + var inv struct { + CharacterName string `json:"character_name"` + Timestamp string `json:"timestamp"` + Items []map[string]any `json:"items"` + } + if json.Unmarshal(body, &inv) != nil || inv.CharacterName == "" { + writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "invalid payload"}) + return + } + ts := parseNaiveTime(inv.Timestamp) + ctx, cancel := context.WithTimeout(r.Context(), 120*time.Second) + defer cancel() + tx, err := s.pool.Begin(ctx) + if err != nil { + s.dbErr(w, "process-inventory begin", err) + return + } + defer tx.Rollback(ctx) + if err := deleteCharItems(ctx, tx, inv.CharacterName); err != nil { + s.dbErr(w, "process-inventory delete", err) + return + } + processed, errs := 0, 0 + for _, raw := range inv.Items { + if raw["Id"] == nil && raw["id"] == nil { + errs++ + continue + } + if err := s.ingestItem(ctx, tx, inv.CharacterName, ts, raw); err != nil { + s.log.Error("ingest item failed", "err", err, "char", inv.CharacterName) + errs++ + continue + } + processed++ + } + if err := tx.Commit(ctx); err != nil { + s.dbErr(w, "process-inventory commit", err) + return + } + writeJSON(w, http.StatusOK, map[string]any{"processed_count": processed, "error_count": errs, "total_items": len(inv.Items)}) +} + +// POST /inventory/{character_name}/item — single-item upsert. +func (s *Server) handleUpsertItem(w http.ResponseWriter, r *http.Request) { + char := r.PathValue("character_name") + body, _ := io.ReadAll(io.LimitReader(r.Body, 16<<20)) + var raw map[string]any + if json.Unmarshal(body, &raw) != nil { + writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "invalid JSON"}) + return + } + if raw["Id"] == nil && raw["id"] == nil { + writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "item missing Id"}) + return + } + itemID := int64(toFloat(firstNonNil(raw["Id"], raw["id"]))) + ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) + defer cancel() + tx, err := s.pool.Begin(ctx) + if err != nil { + s.dbErr(w, "upsert begin", err) + return + } + defer tx.Rollback(ctx) + if err := deleteOneItem(ctx, tx, char, itemID); err != nil { + s.dbErr(w, "upsert delete", err) + return + } + if err := s.ingestItem(ctx, tx, char, time.Now().UTC(), raw); err != nil { + s.dbErr(w, "upsert insert", err) + return + } + if err := tx.Commit(ctx); err != nil { + s.dbErr(w, "upsert commit", err) + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "item_id": itemID}) +} + +// DELETE /inventory/{character_name}/item/{item_id} +func (s *Server) handleDeleteItem(w http.ResponseWriter, r *http.Request) { + char := r.PathValue("character_name") + itemID, _ := strconv.ParseInt(r.PathValue("item_id"), 10, 64) + ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second) + defer cancel() + tx, err := s.pool.Begin(ctx) + if err != nil { + s.dbErr(w, "delete begin", err) + return + } + defer tx.Rollback(ctx) + if err := deleteOneItem(ctx, tx, char, itemID); err != nil { + s.dbErr(w, "delete", err) + return + } + if err := tx.Commit(ctx); err != nil { + s.dbErr(w, "delete commit", err) + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "deleted", "item_id": itemID}) +} + +func parseNaiveTime(s string) time.Time { + if s == "" { + return time.Now().UTC() + } + s = strings.Replace(s, "Z", "+00:00", 1) + for _, l := range []string{time.RFC3339Nano, time.RFC3339, "2006-01-02T15:04:05.999999", "2006-01-02T15:04:05"} { + if t, err := time.Parse(l, s); err == nil { + return t.UTC() + } + } + return time.Now().UTC() +} + +func firstNonNil(a, b any) any { + if a != nil { + return a + } + return b +} diff --git a/go-services/inventory-go/main.go b/go-services/inventory-go/main.go index 2f41cbd0..68576db7 100644 --- a/go-services/inventory-go/main.go +++ b/go-services/inventory-go/main.go @@ -71,12 +71,24 @@ func main() { defer pool.Close() srv.pool = pool + // Ingest mode owns its DB: create the schema on first run. + if !readOnly { + sctx, c := context.WithTimeout(ctx, 60*time.Second) + initSchema(sctx, pool, logger) + c() + } + mux := http.NewServeMux() mux.HandleFunc("GET /health", srv.handleHealth) mux.HandleFunc("GET /sets/list", srv.handleSetsList) mux.HandleFunc("GET /characters/list", srv.handleCharactersList) mux.HandleFunc("GET /search/items", srv.handleSearchItems) mux.HandleFunc("POST /debug/process", srv.handleDebugProcess) + // Ingestion (works in read-write mode; on the read-only prod instance these + // fail the read-only transaction, which is the intended guard). + mux.HandleFunc("POST /process-inventory", srv.handleProcessInventory) + mux.HandleFunc("POST /inventory/{character_name}/item", srv.handleUpsertItem) + mux.HandleFunc("DELETE /inventory/{character_name}/item/{item_id}", srv.handleDeleteItem) httpSrv := &http.Server{Addr: addr, Handler: withLogging(mux), ReadHeaderTimeout: 10 * time.Second} go func() { diff --git a/go-services/inventory-go/process.go b/go-services/inventory-go/process.go index 95bab38a..4a377309 100644 --- a/go-services/inventory-go/process.go +++ b/go-services/inventory-go/process.go @@ -266,7 +266,7 @@ func (s *Server) processItem(raw map[string]any) map[string]any { "items": items, "combat": nullify(combat, sentinelCombat), "requirements": nullify(requirements, sentinelReq), - "enhancements": nullify(enhancements, sentinelEnh), // always present + "enhancements": nullifyKeep(enhancements, sentinelEnh), // ALWAYS inserts a row "ratings": nullify(ratings, sentinelRating), "spells": spellRows, } @@ -334,8 +334,21 @@ func nullify(m map[string]any, isSentinel func(any) bool) map[string]any { } } if !any_ { - // caller distinguishes; combat/req/ratings skip (nil), enhancements keeps. - return nil + return nil // combat/req/ratings: skip the insert when all-sentinel + } + return out +} + +// nullifyKeep is like nullify but ALWAYS returns the map (for item_enhancements, +// which inserts a row even when every value is NULL). +func nullifyKeep(m map[string]any, isSentinel func(any) bool) map[string]any { + out := make(map[string]any, len(m)) + for k, v := range m { + if isSentinel(v) { + out[k] = nil + } else { + out[k] = v + } } return out } diff --git a/go-services/inventory-go/schema.go b/go-services/inventory-go/schema.go new file mode 100644 index 00000000..43950bbb --- /dev/null +++ b/go-services/inventory-go/schema.go @@ -0,0 +1,127 @@ +package main + +import ( + "context" + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// initSchema creates the normalized inventory schema on an ingest-owned database +// (a faithful replica of inventory-service/database.py). Run only when this +// instance owns its DB (READ_ONLY=false) — never against production. Idempotent; +// logs and continues per statement. +func initSchema(ctx context.Context, pool *pgxpool.Pool, log *slog.Logger) { + stmts := []string{ + `CREATE TABLE IF NOT EXISTS items ( + id SERIAL PRIMARY KEY, + character_name VARCHAR(50) NOT NULL, + item_id BIGINT NOT NULL, + timestamp TIMESTAMP NOT NULL, + name VARCHAR(200) NOT NULL, + icon INTEGER NOT NULL, + object_class INTEGER NOT NULL, + value INTEGER DEFAULT 0, + burden INTEGER DEFAULT 0, + current_wielded_location INTEGER DEFAULT 0, + container_id BIGINT DEFAULT 0, + slot INTEGER DEFAULT -1, + bonded INTEGER DEFAULT 0, + attuned INTEGER DEFAULT 0, + "unique" BOOLEAN DEFAULT false, + stack_size INTEGER DEFAULT 1, + max_stack_size INTEGER DEFAULT 1, + items_capacity INTEGER, + containers_capacity INTEGER, + structure INTEGER, + max_structure INTEGER, + rare_id INTEGER, + lifespan INTEGER, + remaining_lifespan INTEGER, + has_id_data BOOLEAN DEFAULT false, + last_id_time BIGINT DEFAULT 0, + CONSTRAINT uq_char_item UNIQUE (character_name, item_id) + )`, + `CREATE INDEX IF NOT EXISTS ix_items_character_name ON items (character_name)`, + `CREATE INDEX IF NOT EXISTS ix_items_name ON items (name)`, + `CREATE INDEX IF NOT EXISTS ix_items_object_class ON items (object_class)`, + `CREATE INDEX IF NOT EXISTS ix_items_current_wielded_location ON items (current_wielded_location)`, + + `CREATE TABLE IF NOT EXISTS item_combat_stats ( + item_id INTEGER PRIMARY KEY REFERENCES items(id), + max_damage INTEGER, damage INTEGER, damage_type INTEGER, damage_bonus DOUBLE PRECISION, + elemental_damage_bonus INTEGER, elemental_damage_vs_monsters DOUBLE PRECISION, variance DOUBLE PRECISION, + cleaving INTEGER, crit_damage_rating INTEGER, damage_over_time INTEGER, + attack_bonus DOUBLE PRECISION, weapon_time INTEGER, weapon_skill INTEGER, + armor_level INTEGER, shield_value INTEGER, melee_defense_bonus DOUBLE PRECISION, + missile_defense_bonus DOUBLE PRECISION, magic_defense_bonus DOUBLE PRECISION, + resist_magic INTEGER, crit_resist_rating INTEGER, crit_damage_resist_rating INTEGER, + dot_resist_rating INTEGER, life_resist_rating INTEGER, nether_resist_rating INTEGER, + heal_over_time INTEGER, healing_resist_rating INTEGER, mana_conversion_bonus DOUBLE PRECISION, + pk_damage_rating INTEGER, pk_damage_resist_rating INTEGER, gear_pk_damage_rating INTEGER, + gear_pk_damage_resist_rating INTEGER, + base_armor_level INTEGER, base_max_damage INTEGER, base_attack_bonus DOUBLE PRECISION, + base_melee_defense_bonus DOUBLE PRECISION, base_elemental_damage_vs_monsters DOUBLE PRECISION, + base_mana_conversion_bonus DOUBLE PRECISION + )`, + `CREATE INDEX IF NOT EXISTS ix_combat_armor ON item_combat_stats (armor_level)`, + + `CREATE TABLE IF NOT EXISTS item_requirements ( + item_id INTEGER PRIMARY KEY REFERENCES items(id), + wield_level INTEGER, wield_requirement INTEGER, skill_level INTEGER, + lore_requirement INTEGER, equip_skill VARCHAR(50), mastery VARCHAR(50) + )`, + `CREATE INDEX IF NOT EXISTS ix_req_level ON item_requirements (wield_level)`, + + `CREATE TABLE IF NOT EXISTS item_enhancements ( + item_id INTEGER PRIMARY KEY REFERENCES items(id), + material VARCHAR(50), imbue VARCHAR(50), tinks INTEGER, workmanship DOUBLE PRECISION, + salvage_workmanship DOUBLE PRECISION, num_times_tinkered INTEGER DEFAULT 0, + free_tinkers_bitfield INTEGER, num_items_in_material INTEGER, + imbue_attempts INTEGER DEFAULT 0, imbue_successes INTEGER DEFAULT 0, + imbued_effect2 INTEGER, imbued_effect3 INTEGER, imbued_effect4 INTEGER, imbued_effect5 INTEGER, + imbue_stacking_bits INTEGER, item_set VARCHAR(100), equipment_set_extra INTEGER, + aetheria_bitfield INTEGER, heritage_specific_armor INTEGER, shared_cooldown INTEGER + )`, + `CREATE INDEX IF NOT EXISTS ix_enh_material_set ON item_enhancements (material, item_set)`, + + `CREATE TABLE IF NOT EXISTS item_ratings ( + item_id INTEGER PRIMARY KEY REFERENCES items(id), + damage_rating INTEGER, damage_resist_rating INTEGER, crit_rating INTEGER, + crit_resist_rating INTEGER, crit_damage_rating INTEGER, crit_damage_resist_rating INTEGER, + heal_boost_rating INTEGER, vitality_rating INTEGER, healing_rating INTEGER, + mana_conversion_rating INTEGER, weakness_rating INTEGER, nether_over_time INTEGER, + healing_resist_rating INTEGER, nether_resist_rating INTEGER, dot_resist_rating INTEGER, + life_resist_rating INTEGER, sneak_attack_rating INTEGER, recklessness_rating INTEGER, + deception_rating INTEGER, pk_damage_rating INTEGER, pk_damage_resist_rating INTEGER, + gear_pk_damage_rating INTEGER, gear_pk_damage_resist_rating INTEGER, + gear_damage INTEGER, gear_damage_resist INTEGER, gear_crit INTEGER, gear_crit_resist INTEGER, + gear_crit_damage INTEGER, gear_crit_damage_resist INTEGER, gear_healing_boost INTEGER, + gear_max_health INTEGER, gear_nether_resist INTEGER, gear_life_resist INTEGER, + gear_overpower INTEGER, gear_overpower_resist INTEGER, total_rating INTEGER + )`, + + `CREATE TABLE IF NOT EXISTS item_spells ( + item_id INTEGER REFERENCES items(id), + spell_id INTEGER, + is_active BOOLEAN DEFAULT false, + PRIMARY KEY (item_id, spell_id) + )`, + + `CREATE TABLE IF NOT EXISTS item_raw_data ( + item_id INTEGER PRIMARY KEY REFERENCES items(id), + int_values JSONB, double_values JSONB, string_values JSONB, bool_values JSONB, + original_json JSONB + )`, + } + ok, failed := 0, 0 + for _, s := range stmts { + if _, err := pool.Exec(ctx, s); err != nil { + failed++ + log.Warn("schema statement failed (continuing)", "err", err) + continue + } + ok++ + } + log.Info("inventory schema init complete", "ok", ok, "failed", failed) +}