feat(go-services): inventory-go Phase C — ingestion (validated, isolated DB)

Wires the validated item-processor into the ingestion endpoints, writing to an
isolated inventory-go-db (never production):
- schema.go: faithful 7-table replica of inventory-service/database.py.
- ingest.go: /process-inventory (full replace), POST/DELETE single item, with the
  exact delete-then-insert flow, dynamic INSERT builder (quotes reserved "unique"),
  spell union (is_active), and item_raw_data verbatim. enhancements always inserts.
- compose: isolated inventory-go-db (postgres:14, 127.0.0.1:5435) + read-write
  inventory-go-shadow (:8773) that owns it; schema init on boot.

Validated by ingesting a recently-ingested character's items (from production's
original_json) into the shadow DB and diffing vs production: byte-identical —
items 243, combat 243, enhancements 243, ratings 6, requirements 19, spells 52
all match; 0 per-column mismatches across 243 items.

Finding: older production normalized rows can be STALE (predate the code reading
Decal keys 218103832/218103835); Go matches the CURRENT Python code, so validate
ingestion against recently-ingested characters.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Erik 2026-06-24 12:42:26 +02:00
parent b90b52c515
commit c49b81c237
5 changed files with 466 additions and 3 deletions

View file

@ -152,5 +152,50 @@ services:
max-size: "10m"
max-file: "3"
# Phase C: isolated inventory DB the Go ingestion writes to (never production).
inventory-go-db:
image: postgres:14
container_name: inventory-go-db
ports:
- "127.0.0.1:5435:5432"
environment:
POSTGRES_DB: "inventory_db"
POSTGRES_USER: "inventory_user"
POSTGRES_PASSWORD: "${INVENTORY_DB_PASSWORD}"
volumes:
- inventory-go-data:/var/lib/postgresql/data
restart: unless-stopped
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
# Read-write inventory-go instance: owns inventory-go-db, exposes the ingestion
# endpoints. Used to validate ingestion (POST a character's items, compare the
# resulting normalized rows to production) without touching the production DB.
inventory-go-shadow:
image: inventory-go:local
container_name: inventory-go-shadow
ports:
- "127.0.0.1:8773:8773"
environment:
PORT: "8773"
DATABASE_URL: "postgresql://inventory_user:${INVENTORY_DB_PASSWORD}@inventory-go-db:5432/inventory_db"
READ_ONLY: "false"
ENUM_DB_PATH: "/enums/comprehensive_enum_database_v2.json"
LOG_LEVEL: "INFO"
volumes:
- ./inventory-service/comprehensive_enum_database_v2.json:/enums/comprehensive_enum_database_v2.json:ro
depends_on:
- inventory-go-db
restart: unless-stopped
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
volumes:
dereth-go-data:
inventory-go-data:

View file

@ -0,0 +1,266 @@
package main
import (
"context"
"encoding/json"
"io"
"net/http"
"sort"
"strconv"
"strings"
"time"
"github.com/jackc/pgx/v5"
)
// Ingestion endpoints — port of process_inventory / upsert_inventory_item /
// delete_inventory_item. They write to THIS instance's own DB (ingest mode,
// READ_ONLY=false), reusing the validated item-processor. Production is never
// written: an isolated inventory-go-db backs the shadow instance.
func quoteCol(k string) string {
if k == "unique" {
return `"unique"`
}
return k
}
func buildInsert(table string, cols map[string]any, returningID bool) (string, []any) {
keys := make([]string, 0, len(cols))
for k := range cols {
keys = append(keys, k)
}
sort.Strings(keys)
qc := make([]string, len(keys))
ph := make([]string, len(keys))
args := make([]any, len(keys))
for i, k := range keys {
qc[i] = quoteCol(k)
ph[i] = "$" + strconv.Itoa(i+1)
args[i] = cols[k]
}
sql := "INSERT INTO " + table + " (" + strings.Join(qc, ", ") + ") VALUES (" + strings.Join(ph, ", ") + ")"
if returningID {
sql += " RETURNING id"
}
return sql, args
}
var childTables = []struct{ table, key string }{
{"item_combat_stats", "combat"},
{"item_requirements", "requirements"},
{"item_enhancements", "enhancements"},
{"item_ratings", "ratings"},
}
// ingestItem processes one raw item and inserts it across the 7 tables.
func (s *Server) ingestItem(ctx context.Context, tx pgx.Tx, charName string, ts time.Time, raw map[string]any) error {
p := s.processItem(raw)
items := p["items"].(map[string]any)
items["character_name"] = charName
items["timestamp"] = ts
sql, args := buildInsert("items", items, true)
var id int
if err := tx.QueryRow(ctx, sql, args...).Scan(&id); err != nil {
return err
}
for _, ct := range childTables {
cols, _ := p[ct.key].(map[string]any)
if cols == nil {
continue // table skipped (all-sentinel)
}
cols["item_id"] = id
csql, cargs := buildInsert(ct.table, cols, false)
if _, err := tx.Exec(ctx, csql, cargs...); err != nil {
return err
}
}
if rows, ok := p["spells"].([]map[string]any); ok {
for _, sp := range rows {
if _, err := tx.Exec(ctx,
"INSERT INTO item_spells (item_id, spell_id, is_active) VALUES ($1,$2,$3) ON CONFLICT DO NOTHING",
id, sp["spell_id"], sp["is_active"]); err != nil {
return err
}
}
}
ivb, _ := json.Marshal(bag(raw, "IntValues"))
dvb, _ := json.Marshal(bag(raw, "DoubleValues"))
svb, _ := json.Marshal(bag(raw, "StringValues"))
bvb, _ := json.Marshal(bag(raw, "BoolValues"))
ojb, _ := json.Marshal(raw)
_, err := tx.Exec(ctx,
"INSERT INTO item_raw_data (item_id,int_values,double_values,string_values,bool_values,original_json) VALUES ($1,$2,$3,$4,$5,$6)",
id, ivb, dvb, svb, bvb, ojb)
return err
}
// deleteCharItems removes a character's rows across all tables (children first).
func deleteCharItems(ctx context.Context, tx pgx.Tx, charName string) error {
var ids []int
rows, err := tx.Query(ctx, "SELECT id FROM items WHERE character_name=$1", charName)
if err != nil {
return err
}
for rows.Next() {
var id int
if err := rows.Scan(&id); err != nil {
rows.Close()
return err
}
ids = append(ids, id)
}
rows.Close()
if len(ids) > 0 {
for _, t := range []string{"item_raw_data", "item_combat_stats", "item_requirements", "item_enhancements", "item_ratings", "item_spells"} {
if _, err := tx.Exec(ctx, "DELETE FROM "+t+" WHERE item_id = ANY($1)", ids); err != nil {
return err
}
}
}
_, err = tx.Exec(ctx, "DELETE FROM items WHERE character_name=$1", charName)
return err
}
func deleteOneItem(ctx context.Context, tx pgx.Tx, charName string, itemID int64) error {
var id int
err := tx.QueryRow(ctx, "SELECT id FROM items WHERE character_name=$1 AND item_id=$2", charName, itemID).Scan(&id)
if err == pgx.ErrNoRows {
return nil
}
if err != nil {
return err
}
for _, t := range []string{"item_raw_data", "item_combat_stats", "item_requirements", "item_enhancements", "item_ratings", "item_spells"} {
if _, err := tx.Exec(ctx, "DELETE FROM "+t+" WHERE item_id=$1", id); err != nil {
return err
}
}
_, err = tx.Exec(ctx, "DELETE FROM items WHERE id=$1", id)
return err
}
// POST /process-inventory — full replacement of a character's inventory.
func (s *Server) handleProcessInventory(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(io.LimitReader(r.Body, 64<<20))
var inv struct {
CharacterName string `json:"character_name"`
Timestamp string `json:"timestamp"`
Items []map[string]any `json:"items"`
}
if json.Unmarshal(body, &inv) != nil || inv.CharacterName == "" {
writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "invalid payload"})
return
}
ts := parseNaiveTime(inv.Timestamp)
ctx, cancel := context.WithTimeout(r.Context(), 120*time.Second)
defer cancel()
tx, err := s.pool.Begin(ctx)
if err != nil {
s.dbErr(w, "process-inventory begin", err)
return
}
defer tx.Rollback(ctx)
if err := deleteCharItems(ctx, tx, inv.CharacterName); err != nil {
s.dbErr(w, "process-inventory delete", err)
return
}
processed, errs := 0, 0
for _, raw := range inv.Items {
if raw["Id"] == nil && raw["id"] == nil {
errs++
continue
}
if err := s.ingestItem(ctx, tx, inv.CharacterName, ts, raw); err != nil {
s.log.Error("ingest item failed", "err", err, "char", inv.CharacterName)
errs++
continue
}
processed++
}
if err := tx.Commit(ctx); err != nil {
s.dbErr(w, "process-inventory commit", err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"processed_count": processed, "error_count": errs, "total_items": len(inv.Items)})
}
// POST /inventory/{character_name}/item — single-item upsert.
func (s *Server) handleUpsertItem(w http.ResponseWriter, r *http.Request) {
char := r.PathValue("character_name")
body, _ := io.ReadAll(io.LimitReader(r.Body, 16<<20))
var raw map[string]any
if json.Unmarshal(body, &raw) != nil {
writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "invalid JSON"})
return
}
if raw["Id"] == nil && raw["id"] == nil {
writeJSON(w, http.StatusBadRequest, map[string]any{"detail": "item missing Id"})
return
}
itemID := int64(toFloat(firstNonNil(raw["Id"], raw["id"])))
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
tx, err := s.pool.Begin(ctx)
if err != nil {
s.dbErr(w, "upsert begin", err)
return
}
defer tx.Rollback(ctx)
if err := deleteOneItem(ctx, tx, char, itemID); err != nil {
s.dbErr(w, "upsert delete", err)
return
}
if err := s.ingestItem(ctx, tx, char, time.Now().UTC(), raw); err != nil {
s.dbErr(w, "upsert insert", err)
return
}
if err := tx.Commit(ctx); err != nil {
s.dbErr(w, "upsert commit", err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "item_id": itemID})
}
// DELETE /inventory/{character_name}/item/{item_id}
func (s *Server) handleDeleteItem(w http.ResponseWriter, r *http.Request) {
char := r.PathValue("character_name")
itemID, _ := strconv.ParseInt(r.PathValue("item_id"), 10, 64)
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
defer cancel()
tx, err := s.pool.Begin(ctx)
if err != nil {
s.dbErr(w, "delete begin", err)
return
}
defer tx.Rollback(ctx)
if err := deleteOneItem(ctx, tx, char, itemID); err != nil {
s.dbErr(w, "delete", err)
return
}
if err := tx.Commit(ctx); err != nil {
s.dbErr(w, "delete commit", err)
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "deleted", "item_id": itemID})
}
func parseNaiveTime(s string) time.Time {
if s == "" {
return time.Now().UTC()
}
s = strings.Replace(s, "Z", "+00:00", 1)
for _, l := range []string{time.RFC3339Nano, time.RFC3339, "2006-01-02T15:04:05.999999", "2006-01-02T15:04:05"} {
if t, err := time.Parse(l, s); err == nil {
return t.UTC()
}
}
return time.Now().UTC()
}
func firstNonNil(a, b any) any {
if a != nil {
return a
}
return b
}

View file

@ -71,12 +71,24 @@ func main() {
defer pool.Close()
srv.pool = pool
// Ingest mode owns its DB: create the schema on first run.
if !readOnly {
sctx, c := context.WithTimeout(ctx, 60*time.Second)
initSchema(sctx, pool, logger)
c()
}
mux := http.NewServeMux()
mux.HandleFunc("GET /health", srv.handleHealth)
mux.HandleFunc("GET /sets/list", srv.handleSetsList)
mux.HandleFunc("GET /characters/list", srv.handleCharactersList)
mux.HandleFunc("GET /search/items", srv.handleSearchItems)
mux.HandleFunc("POST /debug/process", srv.handleDebugProcess)
// Ingestion (works in read-write mode; on the read-only prod instance these
// fail the read-only transaction, which is the intended guard).
mux.HandleFunc("POST /process-inventory", srv.handleProcessInventory)
mux.HandleFunc("POST /inventory/{character_name}/item", srv.handleUpsertItem)
mux.HandleFunc("DELETE /inventory/{character_name}/item/{item_id}", srv.handleDeleteItem)
httpSrv := &http.Server{Addr: addr, Handler: withLogging(mux), ReadHeaderTimeout: 10 * time.Second}
go func() {

View file

@ -266,7 +266,7 @@ func (s *Server) processItem(raw map[string]any) map[string]any {
"items": items,
"combat": nullify(combat, sentinelCombat),
"requirements": nullify(requirements, sentinelReq),
"enhancements": nullify(enhancements, sentinelEnh), // always present
"enhancements": nullifyKeep(enhancements, sentinelEnh), // ALWAYS inserts a row
"ratings": nullify(ratings, sentinelRating),
"spells": spellRows,
}
@ -334,8 +334,21 @@ func nullify(m map[string]any, isSentinel func(any) bool) map[string]any {
}
}
if !any_ {
// caller distinguishes; combat/req/ratings skip (nil), enhancements keeps.
return nil
return nil // combat/req/ratings: skip the insert when all-sentinel
}
return out
}
// nullifyKeep is like nullify but ALWAYS returns the map (for item_enhancements,
// which inserts a row even when every value is NULL).
func nullifyKeep(m map[string]any, isSentinel func(any) bool) map[string]any {
out := make(map[string]any, len(m))
for k, v := range m {
if isSentinel(v) {
out[k] = nil
} else {
out[k] = v
}
}
return out
}

View file

@ -0,0 +1,127 @@
package main
import (
"context"
"log/slog"
"github.com/jackc/pgx/v5/pgxpool"
)
// initSchema creates the normalized inventory schema on an ingest-owned database
// (a faithful replica of inventory-service/database.py). Run only when this
// instance owns its DB (READ_ONLY=false) — never against production. Idempotent;
// logs and continues per statement.
func initSchema(ctx context.Context, pool *pgxpool.Pool, log *slog.Logger) {
stmts := []string{
`CREATE TABLE IF NOT EXISTS items (
id SERIAL PRIMARY KEY,
character_name VARCHAR(50) NOT NULL,
item_id BIGINT NOT NULL,
timestamp TIMESTAMP NOT NULL,
name VARCHAR(200) NOT NULL,
icon INTEGER NOT NULL,
object_class INTEGER NOT NULL,
value INTEGER DEFAULT 0,
burden INTEGER DEFAULT 0,
current_wielded_location INTEGER DEFAULT 0,
container_id BIGINT DEFAULT 0,
slot INTEGER DEFAULT -1,
bonded INTEGER DEFAULT 0,
attuned INTEGER DEFAULT 0,
"unique" BOOLEAN DEFAULT false,
stack_size INTEGER DEFAULT 1,
max_stack_size INTEGER DEFAULT 1,
items_capacity INTEGER,
containers_capacity INTEGER,
structure INTEGER,
max_structure INTEGER,
rare_id INTEGER,
lifespan INTEGER,
remaining_lifespan INTEGER,
has_id_data BOOLEAN DEFAULT false,
last_id_time BIGINT DEFAULT 0,
CONSTRAINT uq_char_item UNIQUE (character_name, item_id)
)`,
`CREATE INDEX IF NOT EXISTS ix_items_character_name ON items (character_name)`,
`CREATE INDEX IF NOT EXISTS ix_items_name ON items (name)`,
`CREATE INDEX IF NOT EXISTS ix_items_object_class ON items (object_class)`,
`CREATE INDEX IF NOT EXISTS ix_items_current_wielded_location ON items (current_wielded_location)`,
`CREATE TABLE IF NOT EXISTS item_combat_stats (
item_id INTEGER PRIMARY KEY REFERENCES items(id),
max_damage INTEGER, damage INTEGER, damage_type INTEGER, damage_bonus DOUBLE PRECISION,
elemental_damage_bonus INTEGER, elemental_damage_vs_monsters DOUBLE PRECISION, variance DOUBLE PRECISION,
cleaving INTEGER, crit_damage_rating INTEGER, damage_over_time INTEGER,
attack_bonus DOUBLE PRECISION, weapon_time INTEGER, weapon_skill INTEGER,
armor_level INTEGER, shield_value INTEGER, melee_defense_bonus DOUBLE PRECISION,
missile_defense_bonus DOUBLE PRECISION, magic_defense_bonus DOUBLE PRECISION,
resist_magic INTEGER, crit_resist_rating INTEGER, crit_damage_resist_rating INTEGER,
dot_resist_rating INTEGER, life_resist_rating INTEGER, nether_resist_rating INTEGER,
heal_over_time INTEGER, healing_resist_rating INTEGER, mana_conversion_bonus DOUBLE PRECISION,
pk_damage_rating INTEGER, pk_damage_resist_rating INTEGER, gear_pk_damage_rating INTEGER,
gear_pk_damage_resist_rating INTEGER,
base_armor_level INTEGER, base_max_damage INTEGER, base_attack_bonus DOUBLE PRECISION,
base_melee_defense_bonus DOUBLE PRECISION, base_elemental_damage_vs_monsters DOUBLE PRECISION,
base_mana_conversion_bonus DOUBLE PRECISION
)`,
`CREATE INDEX IF NOT EXISTS ix_combat_armor ON item_combat_stats (armor_level)`,
`CREATE TABLE IF NOT EXISTS item_requirements (
item_id INTEGER PRIMARY KEY REFERENCES items(id),
wield_level INTEGER, wield_requirement INTEGER, skill_level INTEGER,
lore_requirement INTEGER, equip_skill VARCHAR(50), mastery VARCHAR(50)
)`,
`CREATE INDEX IF NOT EXISTS ix_req_level ON item_requirements (wield_level)`,
`CREATE TABLE IF NOT EXISTS item_enhancements (
item_id INTEGER PRIMARY KEY REFERENCES items(id),
material VARCHAR(50), imbue VARCHAR(50), tinks INTEGER, workmanship DOUBLE PRECISION,
salvage_workmanship DOUBLE PRECISION, num_times_tinkered INTEGER DEFAULT 0,
free_tinkers_bitfield INTEGER, num_items_in_material INTEGER,
imbue_attempts INTEGER DEFAULT 0, imbue_successes INTEGER DEFAULT 0,
imbued_effect2 INTEGER, imbued_effect3 INTEGER, imbued_effect4 INTEGER, imbued_effect5 INTEGER,
imbue_stacking_bits INTEGER, item_set VARCHAR(100), equipment_set_extra INTEGER,
aetheria_bitfield INTEGER, heritage_specific_armor INTEGER, shared_cooldown INTEGER
)`,
`CREATE INDEX IF NOT EXISTS ix_enh_material_set ON item_enhancements (material, item_set)`,
`CREATE TABLE IF NOT EXISTS item_ratings (
item_id INTEGER PRIMARY KEY REFERENCES items(id),
damage_rating INTEGER, damage_resist_rating INTEGER, crit_rating INTEGER,
crit_resist_rating INTEGER, crit_damage_rating INTEGER, crit_damage_resist_rating INTEGER,
heal_boost_rating INTEGER, vitality_rating INTEGER, healing_rating INTEGER,
mana_conversion_rating INTEGER, weakness_rating INTEGER, nether_over_time INTEGER,
healing_resist_rating INTEGER, nether_resist_rating INTEGER, dot_resist_rating INTEGER,
life_resist_rating INTEGER, sneak_attack_rating INTEGER, recklessness_rating INTEGER,
deception_rating INTEGER, pk_damage_rating INTEGER, pk_damage_resist_rating INTEGER,
gear_pk_damage_rating INTEGER, gear_pk_damage_resist_rating INTEGER,
gear_damage INTEGER, gear_damage_resist INTEGER, gear_crit INTEGER, gear_crit_resist INTEGER,
gear_crit_damage INTEGER, gear_crit_damage_resist INTEGER, gear_healing_boost INTEGER,
gear_max_health INTEGER, gear_nether_resist INTEGER, gear_life_resist INTEGER,
gear_overpower INTEGER, gear_overpower_resist INTEGER, total_rating INTEGER
)`,
`CREATE TABLE IF NOT EXISTS item_spells (
item_id INTEGER REFERENCES items(id),
spell_id INTEGER,
is_active BOOLEAN DEFAULT false,
PRIMARY KEY (item_id, spell_id)
)`,
`CREATE TABLE IF NOT EXISTS item_raw_data (
item_id INTEGER PRIMARY KEY REFERENCES items(id),
int_values JSONB, double_values JSONB, string_values JSONB, bool_values JSONB,
original_json JSONB
)`,
}
ok, failed := 0, 0
for _, s := range stmts {
if _, err := pool.Exec(ctx, s); err != nil {
failed++
log.Warn("schema statement failed (continuing)", "err", err)
continue
}
ok++
}
log.Info("inventory schema init complete", "ok", ok, "failed", failed)
}