MosswartOverlord/inventory-service/suitbuilder.py
erik 25e5dd32a4 Remove unused scripts and add missing modules to inventory-service
Removed from git (moved to unused/inventoryservice locally):
- extract_*.py scripts (one-time enum extraction tools)
- init_db.py, debug_ratings.py, test_suitbuilder*.py
- Old JSON outputs superseded by comprehensive_enum_database_v2.json

Added previously untracked required files:
- helpers.py (shared enum state for suitbuilder)
- suitbuilder.py (equipment optimization router)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 15:47:51 +00:00

1848 lines
No EOL
85 KiB
Python

"""
Suitbuilder - Equipment optimization system for Asheron's Call.
Implements constraint satisfaction solver to find optimal equipment combinations
across multiple characters' inventories based on armor sets, spell coverage, and ratings.
"""
import json
import logging
import asyncio
from enum import IntFlag, Enum
from typing import Dict, List, Optional, Any, Set, Tuple, AsyncGenerator
from dataclasses import dataclass, field
from datetime import datetime
import time
from fastapi import APIRouter, HTTPException, Query, Depends, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
import databases
import sqlalchemy as sa
from database import (
DATABASE_URL, Item, ItemCombatStats, ItemRequirements,
ItemEnhancements, ItemRatings, ItemSpells, ItemRawData
)
# Import shared helper functions
import json as json_module
# Removed circular import - will implement locally if needed
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared database handle. It stays None until the host service injects one
# through set_database_connection().
database = None
# Equipment set ID -> display name for the sets this module can translate.
SET_NAMES = {
    14: "Adept's",
    16: "Defender's",
    13: "Soldier's",
    21: "Wise",
    40: "Heroic Protector",
    41: "Heroic Destroyer",
    46: "Relic Alduressa",
    47: "Ancient Relic",
    48: "Noble Relic",
    15: "Archer's",
    19: "Hearty",
    20: "Dexterous",
    22: "Swift",
    24: "Reinforced",
    26: "Flame Proof",
    29: "Lightning Proof"
}


def get_set_name(set_id: Optional[int]) -> str:
    """Translate a set ID to a readable set name.

    Args:
        set_id: Numeric set ID. Strings are tolerated as well: a string that
            already contains "Set" is treated as an already-translated name,
            and a purely numeric string such as "14" is looked up like the
            integer it spells.

    Returns:
        "" for None, the name from SET_NAMES when the ID is known, otherwise
        the fallback label "Set <set_id>".
    """
    if set_id is None:
        return ""
    lookup_key = set_id
    if isinstance(set_id, str):
        # Already-translated names pass through untouched.
        if "Set" in set_id:
            return set_id
        # A numeric ID that arrives as text must hit the same table entry as
        # its integer form instead of falling through to the fallback label.
        stripped = set_id.strip()
        if stripped.isdecimal():
            lookup_key = int(stripped)
    # The fallback keeps the caller's original spelling of the ID.
    return SET_NAMES.get(lookup_key, f"Set {set_id}")
def set_database_connection(db_instance):
    """Set the database connection from main service.

    Rebinds the module-level ``database`` global to ``db_instance`` and logs
    that the connection is in place. Nothing is returned.
    """
    global database
    database = db_instance
    logger.info("Suitbuilder database connection established")
# Router object for the suitbuilder endpoints.
router = APIRouter()
class CoverageMask(IntFlag):
    """
    Bit flags naming the body areas a piece of equipment covers.

    The numeric values mirror Mag-SuitBuilder exactly so masks stay compatible.
    """
    NONE = 0x0
    # Underwear layer (shirts/pants)
    UNDERWEAR_UPPER_LEGS = 0x00000002
    UNDERWEAR_LOWER_LEGS = 0x00000004
    UNDERWEAR_CHEST = 0x00000008
    UNDERWEAR_ABDOMEN = 0x00000010
    UNDERWEAR_UPPER_ARMS = 0x00000020
    UNDERWEAR_LOWER_ARMS = 0x00000040
    # Outerwear/armor layer (the areas that matter for armor suits)
    OUTERWEAR_UPPER_LEGS = 0x00000100
    OUTERWEAR_LOWER_LEGS = 0x00000200
    OUTERWEAR_CHEST = 0x00000400
    OUTERWEAR_ABDOMEN = 0x00000800
    OUTERWEAR_UPPER_ARMS = 0x00001000
    OUTERWEAR_LOWER_ARMS = 0x00002000
    # Extremities
    HEAD = 0x00004000
    HANDS = 0x00008000
    FEET = 0x00010000
    # Short aliases that line up with the equipment slot names
    CHEST = OUTERWEAR_CHEST
    ABDOMEN = OUTERWEAR_ABDOMEN
    UPPER_ARMS = OUTERWEAR_UPPER_ARMS
    LOWER_ARMS = OUTERWEAR_LOWER_ARMS
    UPPER_LEGS = OUTERWEAR_UPPER_LEGS
    LOWER_LEGS = OUTERWEAR_LOWER_LEGS

    def reduction_options(self) -> List['CoverageMask']:
        """
        List the masks this multi-area coverage can be tailored down to.

        Follows the Mag-SuitBuilder tailoring patterns. Single-area masks,
        robes and unrecognised combinations all yield an empty list.
        """
        # One covered area (or none) leaves nothing to cut away.
        if bin(self.value).count('1') <= 1:
            return []
        # Robes are never reduced.
        if self.is_robe():
            return []

        cm = CoverageMask
        chest_only = (cm.CHEST,)
        # Exact mask value -> masks it may be reduced to.
        tailoring = {
            # Arm guards: either arm section
            (cm.UPPER_ARMS | cm.LOWER_ARMS).value: (cm.UPPER_ARMS, cm.LOWER_ARMS),
            # Leg guards: either leg section
            (cm.UPPER_LEGS | cm.LOWER_LEGS).value: (cm.UPPER_LEGS, cm.LOWER_LEGS),
            # Boots over lower legs + feet: feet only
            (cm.LOWER_LEGS | cm.FEET).value: (cm.FEET,),
            # Every chest-based combination collapses to the chest
            (cm.CHEST | cm.ABDOMEN).value: chest_only,
            (cm.CHEST | cm.ABDOMEN | cm.UPPER_ARMS).value: chest_only,
            (cm.CHEST | cm.UPPER_ARMS | cm.LOWER_ARMS).value: chest_only,
            (cm.CHEST | cm.UPPER_ARMS).value: chest_only,
            # Hauberks
            (cm.CHEST | cm.ABDOMEN | cm.UPPER_ARMS | cm.LOWER_ARMS).value: chest_only,
            # Tassets over abdomen + legs: any one of the three
            (cm.ABDOMEN | cm.UPPER_LEGS | cm.LOWER_LEGS).value:
                (cm.ABDOMEN, cm.UPPER_LEGS, cm.LOWER_LEGS),
            # Pre-2010 retail pieces: abdomen only
            (cm.ABDOMEN | cm.UPPER_LEGS).value: (cm.ABDOMEN,),
        }
        return list(tailoring.get(self.value, ()))

    def is_robe(self) -> bool:
        """
        Report whether this coverage looks like a robe (Mag-SuitBuilder logic).

        The canonical robe mask is 0x00013F00: the six outerwear areas plus
        feet, without head or hands. Any mask with six or more bits set is
        also treated as a robe, and that case is logged for investigation.
        """
        MAG_ROBE_PATTERN = 0x00013F00
        mask_value = self.value
        # The same pattern spelled out from its component flags.
        spelled_out = (
            CoverageMask.FEET |
            CoverageMask.OUTERWEAR_UPPER_LEGS |
            CoverageMask.OUTERWEAR_LOWER_LEGS |
            CoverageMask.OUTERWEAR_CHEST |
            CoverageMask.OUTERWEAR_ABDOMEN |
            CoverageMask.OUTERWEAR_UPPER_ARMS |
            CoverageMask.OUTERWEAR_LOWER_ARMS
        )
        if mask_value == MAG_ROBE_PATTERN or mask_value == spelled_out.value:
            return True

        # Fallback heuristic: six or more covered areas.
        area_count = bin(mask_value).count('1')
        if area_count < 6:
            return False
        logger.info(f"Potential robe detected with {area_count} coverage areas: 0x{mask_value:08X} (not exact pattern 0x{MAG_ROBE_PATTERN:08X})")
        return True

    def to_slot_name(self) -> Optional[str]:
        """Return the equipment slot name for a single-area mask, else None."""
        slot_names = {
            CoverageMask.HEAD: "Head",
            CoverageMask.CHEST: "Chest",
            CoverageMask.UPPER_ARMS: "Upper Arms",
            CoverageMask.LOWER_ARMS: "Lower Arms",
            CoverageMask.HANDS: "Hands",
            CoverageMask.ABDOMEN: "Abdomen",
            CoverageMask.UPPER_LEGS: "Upper Legs",
            CoverageMask.LOWER_LEGS: "Lower Legs",
            CoverageMask.FEET: "Feet"
        }
        # Combined masks are not keys of the table, so they come back as None.
        return slot_names.get(self)
@dataclass
class SuitItem:
    """Processed item ready for optimization.

    Instances hash on ``id`` only (see ``__hash__``); equality is the
    field-by-field comparison generated by ``@dataclass``.
    """
    id: int
    name: str
    character_name: str
    slot: str  # Equipment slot name
    coverage: Optional[CoverageMask] = None  # For armor items
    set_id: Optional[int] = None
    armor_level: int = 0
    ratings: Dict[str, int] = field(default_factory=dict)  # rating name -> value
    spell_bitmap: int = 0  # OR of the item's spell bits; 0 until computed
    spell_names: List[str] = field(default_factory=list)
    is_locked: bool = False  # For user-locked items
    material: Optional[str] = None  # Material type for reduction eligibility

    def __hash__(self):
        """Make item hashable for set operations (hashes ``id`` alone)."""
        return hash(self.id)

    @property
    def is_armor(self) -> bool:
        """Check if this item provides armor (for Mag-SuitBuilder compatibility).

        True exactly when ``armor_level`` is greater than zero.
        """
        return self.armor_level > 0
@dataclass
class ItemBucket:
    """All candidate items for one equipment slot."""
    slot: str
    items: List[SuitItem] = field(default_factory=list)
    is_armor: bool = False
    is_required: bool = False  # A constraint may mark the slot as mandatory

    def sort_items(self):
        """Order ``items`` in place, best first, using a key chosen by slot type."""

        def underclothes_rank(candidate):
            # Damage rating leads; armor level is ignored for shirts and pants.
            other_ratings = sum(
                value for name, value in candidate.ratings.items()
                if name != 'damage_rating'
            )
            return (
                candidate.ratings.get('damage_rating', 0),
                len(candidate.spell_names),
                other_ratings,
            )

        def armor_rank(candidate):
            # Armor level, then crit damage, then spell count, then rating total.
            return (
                candidate.armor_level,
                candidate.ratings.get('crit_damage_rating', 0),
                len(candidate.spell_names),
                sum(candidate.ratings.values()),
            )

        def jewelry_rank(candidate):
            # Spell count first, rating total as the tie-breaker.
            return (len(candidate.spell_names), sum(candidate.ratings.values()))

        if self.slot in ('Shirt', 'Pants'):
            rank = underclothes_rank
        elif self.is_armor:
            rank = armor_rank
        else:
            rank = jewelry_rank
        self.items.sort(key=rank, reverse=True)
class SpellBitmapIndex:
    """Assigns every spell name its own bit so overlap checks become integer ops."""

    def __init__(self):
        self.spell_to_bit: Dict[str, int] = {}
        self.bit_to_spell: Dict[int, str] = {}
        self._next_bit = 0

    def register_spell(self, spell_name: str) -> int:
        """Return the bit mask for ``spell_name``, allocating a new bit on first use."""
        try:
            return self.spell_to_bit[spell_name]
        except KeyError:
            pass
        if self._next_bit >= 64:
            # Only a warning: the spell is still registered below.
            logger.warning(f"More than 64 unique spells detected. Spell: {spell_name}")
        mask = 1 << self._next_bit
        self.spell_to_bit[spell_name] = mask
        self.bit_to_spell[mask] = spell_name
        self._next_bit += 1
        return mask

    def get_bitmap(self, spells: List[str]) -> int:
        """OR together the bits of ``spells``; unknown names are registered on the fly."""
        combined = 0
        for name in spells:
            combined |= self.register_spell(name)
        return combined

    def get_spell_names(self, bitmap: int) -> List[str]:
        """List the registered spells whose bit is set in ``bitmap``, in registration order."""
        return [name for mask, name in self.bit_to_spell.items() if bitmap & mask]

    def would_add_needed_spell(self, item_bitmap: int, needed_bitmap: int, current_bitmap: int) -> bool:
        """True when the item brings at least one needed spell that is not covered yet."""
        return bool(item_bitmap & ~current_bitmap & needed_bitmap)
@dataclass
class SuitState:
    """Mutable state during search.

    Holds the items chosen so far (one per slot) together with aggregates
    derived from them: the combined spell bitmap, per-set piece counts, total
    armor and per-rating totals. ``push`` and ``pop`` keep those aggregates
    in step with ``items``.
    """
    items: Dict[str, "SuitItem"] = field(default_factory=dict)  # slot -> item
    spell_bitmap: int = 0
    set_counts: Dict[int, int] = field(default_factory=dict)  # set_id -> count
    total_armor: int = 0
    total_ratings: Dict[str, int] = field(default_factory=dict)
    occupied_slots: Set[str] = field(default_factory=set)

    def push(self, item: "SuitItem") -> None:
        """Add item to suit (modifies state).

        If the item's slot is already occupied, the previous occupant is
        removed first so its armor, ratings, set count and spells do not stay
        behind in the aggregates.
        """
        if item.slot in self.items:
            self.pop(item.slot)
        self.items[item.slot] = item
        self.occupied_slots.add(item.slot)
        self.spell_bitmap |= item.spell_bitmap
        # Update set counts
        if item.set_id:
            self.set_counts[item.set_id] = self.set_counts.get(item.set_id, 0) + 1
            # Lazy %-formatting: the message is only built when DEBUG is enabled.
            logger.debug("[STATE] Added item with set_id %s, set_counts now: %s", item.set_id, self.set_counts)
        # Update totals
        self.total_armor += item.armor_level
        for rating_name, value in item.ratings.items():
            self.total_ratings[rating_name] = self.total_ratings.get(rating_name, 0) + value

    def pop(self, slot: str) -> Optional["SuitItem"]:
        """Remove item from slot (modifies state).

        Returns:
            The removed item, or None when the slot was empty.
        """
        if slot not in self.items:
            return None
        item = self.items.pop(slot)
        self.occupied_slots.discard(slot)
        # Rebuild spell bitmap (can't just subtract due to overlaps)
        self.spell_bitmap = 0
        for remaining_item in self.items.values():
            self.spell_bitmap |= remaining_item.spell_bitmap
        # Update set counts; entries that reach zero are dropped.
        if item.set_id:
            remaining_pieces = self.set_counts.get(item.set_id, 0) - 1
            if remaining_pieces > 0:
                self.set_counts[item.set_id] = remaining_pieces
            else:
                self.set_counts.pop(item.set_id, None)
        # Update totals; ratings that fall to zero or below are dropped.
        self.total_armor -= item.armor_level
        for rating_name, value in item.ratings.items():
            if rating_name in self.total_ratings:
                self.total_ratings[rating_name] -= value
                if self.total_ratings[rating_name] <= 0:
                    del self.total_ratings[rating_name]
        return item

    def clone(self) -> 'SuitState':
        """Copy the state for branching.

        The containers are copied, so the clone can be pushed/popped
        independently; the item objects themselves are shared.
        """
        new_state = SuitState()
        new_state.items = self.items.copy()
        new_state.spell_bitmap = self.spell_bitmap
        new_state.set_counts = self.set_counts.copy()
        new_state.total_armor = self.total_armor
        new_state.total_ratings = self.total_ratings.copy()
        new_state.occupied_slots = self.occupied_slots.copy()
        return new_state
class ScoringWeights(BaseModel):
    """Configurable scoring weights.

    A SearchConstraints may carry an instance of this model; the solver falls
    back to these defaults when none is supplied.
    """
    armor_set_complete: int = 1000  # Complete sets (primary/secondary)
    missing_set_penalty: int = -200  # Missing set pieces penalty
    crit_damage_1: int = 10  # CD1 rating points
    crit_damage_2: int = 20  # CD2 rating points
    damage_rating_1: int = 10  # DR1 on clothes
    damage_rating_2: int = 20  # DR2 on clothes
    damage_rating_3: int = 30  # DR3 on clothes
class SearchConstraints(BaseModel):
    """User-defined search constraints.

    Only ``characters`` is required; every other field has a default.
    """
    characters: List[str]
    primary_set: Optional[int] = None
    secondary_set: Optional[int] = None
    # Plain literals on purpose. dataclasses.field() means nothing to a
    # pydantic model: it would make the default a dataclasses.Field object
    # instead of an empty container. Pydantic copies mutable defaults for
    # each instance, so the literals are not shared between models.
    required_spells: List[str] = []
    locked_items: Dict[str, int] = {}  # slot -> item_id
    include_equipped: bool = True
    include_inventory: bool = True
    min_armor: Optional[int] = None
    max_armor: Optional[int] = None
    min_crit_damage: Optional[int] = None
    max_crit_damage: Optional[int] = None
    min_damage_rating: Optional[int] = None
    max_damage_rating: Optional[int] = None
    scoring_weights: Optional[ScoringWeights] = None
    max_results: int = 50
    search_timeout: int = 300  # seconds
@dataclass
class CompletedSuit:
    """Final suit result."""
    items: Dict[str, SuitItem]
    score: int
    total_armor: int
    total_ratings: Dict[str, int]
    set_counts: Dict[int, int]
    fulfilled_spells: List[str]
    missing_spells: List[str]

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization.

        Returns:
            A dict with the suit id, score, per-slot item details, aggregate
            stats, missing spells, a per-character transfer summary and
            numbered transfer instructions.
        """
        # Group item names by the character that currently holds them,
        # keeping the order in which the slots appear in ``items``.
        transfer_by_character = {}
        for item in self.items.values():
            transfer_by_character.setdefault(item.character_name, []).append(item.name)
        total_items = len(self.items)

        # One numbered step per item (characters in sorted order), then a
        # final "equip" step.
        instructions = []
        step = 1
        for character, item_names in sorted(transfer_by_character.items()):
            for item_name in item_names:
                instructions.append(f"{step}. Transfer {item_name} from {character} to new character")
                step += 1
        instructions.append(f"{step}. Equip all transferred items on new character")

        # The id covers which item sits in which slot. Hashing the slot names
        # alone would give every suit that fills the same slots the same id.
        suit_id = hash(tuple(sorted((slot, item.id) for slot, item in self.items.items())))

        return {
            "id": suit_id,
            "score": self.score,
            "items": {
                slot: {
                    "id": item.id,
                    "name": item.name,
                    "source_character": item.character_name,
                    "armor_level": item.armor_level,
                    "ratings": item.ratings,
                    "spells": item.spell_names,
                    "set_id": item.set_id,
                    "set_name": get_set_name(item.set_id)  # Translate set ID to name
                }
                for slot, item in self.items.items()
            },
            "stats": {
                "total_armor": self.total_armor,
                "total_crit_damage": self.total_ratings.get('crit_damage_rating', 0),
                "total_damage_rating": self.total_ratings.get('damage_rating', 0),
                # Always emitted as 0 here; this class does not know which
                # sets are primary/secondary.
                "primary_set_count": 0,
                "secondary_set_count": 0,
                "spell_coverage": len(self.fulfilled_spells)
            },
            "missing": self.missing_spells,
            "notes": [],
            "transfer_summary": {
                "total_items": total_items,
                "from_characters": transfer_by_character
            },
            "instructions": instructions
        }
class SearchResult(BaseModel):
    """Result yielded during search.

    ``type`` tags the event and ``data`` carries its payload.
    """
    type: str  # "suit", "progress", "complete", "error"
    data: Any
class ItemPreFilter:
    """Pre-filtering system to remove dominated items before search."""

    @staticmethod
    def remove_surpassed_items(items: List["SuitItem"]) -> List["SuitItem"]:
        """Remove items dominated by better alternatives (LeanMyWorldObjectExtensions.cs:9-24)

        Args:
            items: Candidate items; the list itself is not modified.

        Returns:
            The items no other item surpasses, in their original order.
        """
        # Dominance needs an identical slot and set_id (see _is_surpassed_by),
        # so each item is only compared against its own (slot, set_id) group
        # rather than against every other item.
        groups = {}
        for candidate in items:
            groups.setdefault((candidate.slot, candidate.set_id), []).append(candidate)

        filtered_items = [
            item for item in items
            if not any(
                # Identity check: an item never competes with itself. A
                # distinct but identical copy cannot surpass it either,
                # because "better in something" is a strict comparison.
                rival is not item and ItemPreFilter._is_surpassed_by(item, rival)
                for rival in groups[(item.slot, item.set_id)]
            )
        ]
        logger.info(f"Pre-filter: {len(items)} -> {len(filtered_items)} items (removed {len(items) - len(filtered_items)} surpassed)")
        return filtered_items

    @staticmethod
    def _is_surpassed_by(item: "SuitItem", compare_item: "SuitItem") -> bool:
        """Check if item is dominated by compare_item (LeanMyWorldObject.cs:90-147)

        compare_item dominates when it shares slot and set, matches or beats
        every spell, is never worse in crit damage rating, damage rating or
        (when both have armor) armor level, and is strictly better in at
        least one of those.
        """
        # Items must be same slot to be comparable
        if item.slot != compare_item.slot:
            return False
        # Items must be same set to be comparable (or both no-set)
        if item.set_id != compare_item.set_id:
            return False
        # Compare spells (higher level cantrips surpass lower)
        if not ItemPreFilter._spells_surpass_or_equal(compare_item.spell_names, item.spell_names):
            return False
        # Compare ratings - compare_item must be better in at least one category
        better_in_something = False
        for rating_key in ('crit_damage_rating', 'damage_rating'):
            item_rating = item.ratings.get(rating_key, 0)
            compare_rating = compare_item.ratings.get(rating_key, 0)
            if compare_rating > item_rating:
                better_in_something = True
            elif item_rating > compare_rating:
                return False  # Item is better in this category
        # Also compare armor level for armor pieces
        if item.armor_level > 0 and compare_item.armor_level > 0:
            if compare_item.armor_level > item.armor_level:
                better_in_something = True
            elif item.armor_level > compare_item.armor_level:
                return False
        return better_in_something

    @staticmethod
    def _spells_surpass_or_equal(spells1: List[str], spells2: List[str]) -> bool:
        """Check if spells1 surpass or equal spells2

        True when every spell in spells2 has an equal or surpassing
        counterpart in spells1 (trivially true for an empty spells2).
        """
        return all(
            any(
                spell1 == spell2 or ItemPreFilter._spell_surpasses(spell1, spell2)
                for spell1 in spells1
            )
            for spell2 in spells2
        )

    @staticmethod
    def _spell_surpasses(spell1: str, spell2: str) -> bool:
        """Check if spell1 surpasses spell2 (higher level of same type)

        Only Legendary > Epic/Major and Epic > Major are recognised, and only
        when the names match once the tier word is removed.
        """
        if "Legendary" in spell1 and ("Epic" in spell2 or "Major" in spell2):
            base1 = spell1.replace("Legendary ", "")
            base2 = spell2.replace("Epic ", "").replace("Major ", "")
            return base1 == base2
        if "Epic" in spell1 and "Major" in spell2:
            base1 = spell1.replace("Epic ", "")
            base2 = spell2.replace("Major ", "")
            return base1 == base2
        return False
class ConstraintSatisfactionSolver:
"""Main optimization solver."""
def __init__(self, constraints: SearchConstraints, is_cancelled=None):
    """Prepare a solver run for ``constraints``.

    Args:
        constraints: The search request; its ``scoring_weights`` replace the
            default ScoringWeights when set.
        is_cancelled: Optional callback stored for checking whether the
            search should stop.
    """
    # Request inputs
    self.constraints = constraints
    self.is_cancelled = is_cancelled
    configured_weights = constraints.scoring_weights
    self.scoring_weights = configured_weights if configured_weights else ScoringWeights()

    # Run bookkeeping
    self.start_time = time.time()
    self.suits_evaluated = 0
    self.search_completed = False
    self.best_suits: List[CompletedSuit] = []

    # Spell index, plus the bitmap of required spells computed up front
    self.spell_index = SpellBitmapIndex()
    required = constraints.required_spells
    self.needed_spell_bitmap = self.spell_index.get_bitmap(required)
    logger.info(f"[SPELL_CONSTRAINTS_DEBUG] Required spells: {required}")
    logger.info(f"[SPELL_CONSTRAINTS_DEBUG] Needed spell bitmap: {self.needed_spell_bitmap}")
async def search(self) -> AsyncGenerator[SearchResult, None]:
    """Run the whole pipeline and stream SearchResult events.

    Loads items, builds/reduces/sorts the buckets, seeds the state with the
    locked items, then relays everything the recursive search yields. A
    "complete" event closes a normal run; any exception is logged and
    reported as a single "error" event.
    """
    try:
        # Load and preprocess items
        candidates = await self.load_items()
        logger.info(f"Loaded {len(candidates)} items for optimization")
        if not candidates:
            yield SearchResult(type="error", data={"message": "No items found for specified characters"})
            return

        # Bucket the items, then apply armor reductions and ordering
        buckets = self.create_buckets(candidates)
        logger.info(f"Created {len(buckets)} equipment buckets")
        buckets = self.sort_buckets(self.apply_reduction_options(buckets))

        # Seed the starting state with the user's locked items
        initial_state = SuitState()
        for locked_slot, locked_id in self.constraints.locked_items.items():
            for bucket in buckets:
                if bucket.slot != locked_slot:
                    continue
                # First item in this bucket carrying the locked id, if any
                locked_item = next(
                    (entry for entry in bucket.items if entry.id == locked_id),
                    None,
                )
                if locked_item is not None:
                    locked_item.is_locked = True
                    initial_state.push(locked_item)

        logger.info(f"Starting recursive search with {len(buckets)} buckets")
        search_started = {
            "message": "Search started",
            "buckets": len(buckets),
            "evaluated": 0,
            "found": 0,
            "elapsed": 0.0
        }
        yield SearchResult(type="progress", data=search_started)

        logger.info("Starting async iteration over recursive search")
        async for event in self.recursive_search(buckets, 0, initial_state):
            yield event
        logger.info(f"Recursive search completed, sending final results. Found {len(self.best_suits)} suits")

        # The closing summary is sent whether or not any suit was found
        summary = {
            "suits_found": len(self.best_suits),
            "duration": round(time.time() - self.start_time, 2)
        }
        yield SearchResult(type="complete", data=summary)
    except Exception as exc:
        logger.error(f"Search error: {exc}", exc_info=True)
        yield SearchResult(type="error", data={"message": str(exc)})
async def load_items_OLD_BROKEN(self) -> List[SuitItem]:
    """Retired loader kept as an empty stub; load_items() is the API-based replacement.

    Does nothing and returns None.
    """
    return None
async def load_items(self) -> List[SuitItem]:
    """Load items using the working inventory API with HTTP calls.

    Fetches the primary/secondary set items, DR3 shirts and pants, and all
    jewelry from the service's own ``/search/items`` endpoint (blocking
    requests run on the default executor). The rows are converted to
    SuitItem objects, spell bitmaps are computed, dominated items are
    removed, and the remainder is returned ordered armor -> jewelry ->
    clothing.

    Returns:
        The pre-filtered, ordered items. Multi-slot items ("A, B") are kept
        when any of their slots is a known slot; items with no known slot
        are left out.

    Raises:
        HTTPException: status 500 when loading fails. Clothing and jewelry
            fetch errors are logged and skipped, not raised.
    """
    logger.info("[DEBUG] load_items() method called - starting item loading process")
    try:
        # Imported at call time, as the original code did.
        # NOTE(review): presumably deferred to avoid a circular import with
        # the main module - confirm.
        from main import translate_equipment_set_id

        primary_set = self.constraints.primary_set
        secondary_set = self.constraints.secondary_set
        primary_set_name = translate_equipment_set_id(str(primary_set)) if primary_set else None
        secondary_set_name = translate_equipment_set_id(str(secondary_set)) if secondary_set else None
        logger.info(f"LOADING ITEMS VIA API: Primary='{primary_set_name}', Secondary='{secondary_set_name}'")

        # urllib is blocking, so every fetch runs on the default executor.
        loop = asyncio.get_running_loop()
        all_api_items = []

        # Set items: a failure here aborts the load (no local handler).
        for set_name in (primary_set_name, secondary_set_name):
            if set_name:
                set_items = await loop.run_in_executor(None, self._load_items_fetch_set, set_name)
                all_api_items.extend(set_items)

        # Clothing items (shirts and pants)
        clothing_items = await loop.run_in_executor(None, self._load_items_fetch_clothing)
        all_api_items.extend(clothing_items)

        # Jewelry items: best effort, the load continues without them on error
        logger.info("[DEBUG] About to call fetch_all_jewelry_items() with asyncio executor")
        try:
            jewelry_items = await loop.run_in_executor(None, self._load_items_fetch_jewelry)
            logger.info(f"[DEBUG] fetch_all_jewelry_items() completed successfully, returned {len(jewelry_items)} items")
            all_api_items.extend(jewelry_items)
        except Exception as e:
            logger.error(f"[DEBUG] Exception in fetch_all_jewelry_items() executor: {e}", exc_info=True)

        logger.info(f"Total items from inventory API: {len(all_api_items)}")

        # Convert to SuitItem objects
        items = [self._load_items_to_suit_item(api_item) for api_item in all_api_items]

        # Log comprehensive stats
        slot_counts = {}
        set_counts = {}
        for item in items:
            slot_counts[item.slot] = slot_counts.get(item.slot, 0) + 1
            if item.set_id:
                set_counts[item.set_id] = set_counts.get(item.set_id, 0) + 1
        logger.info(f"LOADED FROM API: {len(items)} total items")
        logger.info(f"SLOT DISTRIBUTION: {slot_counts}")
        logger.info(f"SET DISTRIBUTION: {set_counts}")

        # Calculate spell bitmaps for all items
        spell_items_count = 0
        for item in items:
            if item.spell_names:
                item.spell_bitmap = self.spell_index.get_bitmap(item.spell_names)
                spell_items_count += 1
                logger.info(f"[SPELL] {item.name}: {item.spell_names} -> bitmap {item.spell_bitmap}")
                # CRITICAL DEBUG: Check for Legendary Two Handed Combat specifically
                if "Legendary Two Handed Combat" in item.spell_names:
                    logger.info(f"[LEGENDARY_TWO_HANDED_DEBUG] Found item with Legendary Two Handed Combat: {item.name} (set_id: {item.set_id}, spell_bitmap: {item.spell_bitmap})")
        logger.info(f"SPELL PROCESSING: {spell_items_count} items with spells processed")

        # Apply pre-filtering to remove dominated items
        filtered_items = ItemPreFilter.remove_surpassed_items(items)
        return self._load_items_order_for_search(filtered_items)
    except Exception as e:
        logger.error(f"Error calling inventory API: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to load items from API: {str(e)}") from e

def _load_items_equipment_status(self) -> Tuple[Optional[str], str]:
    """Return (equipment_status query value or None, description for log lines)."""
    wants_equipped = self.constraints.include_equipped
    wants_inventory = self.constraints.include_inventory
    if wants_equipped and wants_inventory:
        # Both - no filter needed (default API behavior)
        return None, "both equipped and inventory items"
    if wants_equipped:
        return "equipped", "equipped items only"
    if wants_inventory:
        return "unequipped", "inventory items only"
    # NOTE(review): with neither flag set no filter is sent, so the API still
    # returns every item even though the description says "no items".
    # Behaviour kept as-is; confirm what is intended.
    return None, "no items (neither equipped nor inventory selected)"

def _load_items_base_params(self, equipment_status: Optional[str], item_set: Optional[str] = None) -> List[str]:
    """Build the query parameters shared by all fetches.

    Order: character selection, optional item_set, optional
    equipment_status, then the result limit.
    """
    import urllib.parse
    params = []
    if self.constraints.characters:
        # Use comma-separated list in single characters parameter
        characters_str = ",".join(self.constraints.characters)
        params.append(f"characters={urllib.parse.quote(characters_str)}")
    else:
        params.append("include_all_characters=true")
    if item_set is not None:
        params.append(f"item_set={urllib.parse.quote(item_set)}")
    if equipment_status:
        params.append(f"equipment_status={equipment_status}")
    params.append("limit=1000")
    return params

def _load_items_search_url(self, params: List[str]) -> str:
    """Join params into a /search/items URL on the service's internal port 8000."""
    return f"http://localhost:8000/search/items?{'&'.join(params)}"

def _load_items_get(self, url: str) -> list:
    """Blocking GET of ``url``; return the 'items' list of the JSON response."""
    import urllib.request
    # Bound the request by the configured search timeout so a stalled call
    # cannot hang the executor thread forever. A non-positive value means
    # "no limit", as before.
    timeout = self.constraints.search_timeout
    if not timeout or timeout <= 0:
        timeout = None
    with urllib.request.urlopen(url, timeout=timeout) as response:
        data = json_module.load(response)
    return data.get('items', [])

def _load_items_fetch_set(self, set_name: str) -> list:
    """Synchronous helper to fetch items for a set. Errors propagate to the caller."""
    equipment_status, equipment_status_log = self._load_items_equipment_status()
    if self.constraints.characters:
        logger.info(f"Fetching set items for {len(self.constraints.characters)} selected characters: {self.constraints.characters}")
    else:
        logger.info("Fetching set items for ALL characters")
    url = self._load_items_search_url(self._load_items_base_params(equipment_status, set_name))
    logger.info(f"Equipment status filter: {equipment_status_log}")
    logger.info(f"Fetching set items from: {url}")
    items = self._load_items_get(url)
    logger.info(f"Set ({set_name}) with {equipment_status_log}: {len(items)} items returned")
    return items

def _load_items_fetch_clothing(self) -> list:
    """Synchronous helper to fetch DR3 shirts and pants; each failure is logged and skipped."""
    equipment_status, equipment_status_log = self._load_items_equipment_status()
    all_clothing_items = []
    # (API flag, singular word for logs, plural word for logs)
    for api_flag, singular, plural in (("shirt_only", "shirt", "shirts"), ("pants_only", "pants", "pants")):
        params = self._load_items_base_params(equipment_status)
        params.append(f"{api_flag}=true")
        params.append("min_damage_rating=3")  # Only load DR3 pieces
        url = self._load_items_search_url(params)
        logger.info(f"Fetching {singular} items with {equipment_status_log}")
        logger.info(f"Fetching {plural} from: {url}")
        try:
            found = self._load_items_get(url)
            logger.info(f"{singular.capitalize()} items with {equipment_status_log}: {len(found)} items returned")
            all_clothing_items.extend(found)
        except Exception as e:
            logger.error(f"Error fetching {plural}: {e}")
    logger.info(f"Total clothing items fetched: {len(all_clothing_items)}")
    return all_clothing_items

def _load_items_fetch_jewelry(self) -> list:
    """Synchronous helper to fetch all jewelry (object_class=4); a failure is logged and skipped."""
    logger.info("[DEBUG] fetch_all_jewelry_items() function called - starting execution")
    equipment_status, equipment_status_log = self._load_items_equipment_status()
    all_jewelry_items = []
    logger.info("[DEBUG] Building jewelry API parameters")
    jewelry_params = self._load_items_base_params(equipment_status)
    jewelry_params.append("object_class=4")  # Jewelry
    # Note: has_spells filter doesn't seem to work, so load all jewelry for now
    jewelry_url = self._load_items_search_url(jewelry_params)
    logger.info(f"[DEBUG] Constructed jewelry URL: {jewelry_url}")
    logger.info(f"Fetching jewelry items with {equipment_status_log}")
    logger.info(f"Fetching jewelry from: {jewelry_url}")
    try:
        jewelry_items = self._load_items_get(jewelry_url)
        logger.info(f"Jewelry items with {equipment_status_log}: {len(jewelry_items)} items returned")
        all_jewelry_items.extend(jewelry_items)
    except Exception as e:
        logger.error(f"Error fetching jewelry: {e}")
    logger.info(f"Total jewelry items fetched: {len(all_jewelry_items)}")
    return all_jewelry_items

def _load_items_to_suit_item(self, api_item: dict) -> SuitItem:
    """Convert one inventory-API row into a SuitItem (spell bitmap left at 0)."""
    # Use character_name + name as unique identifier since API doesn't return ID
    # NOTE(review): hash() of a str is salted per process, so these ids are
    # not stable across restarts, and two same-named items on one character
    # share an id. Left unchanged because other code may rely on the scheme.
    unique_id = f"{api_item['character_name']}_{api_item['name']}"

    # Parse coverage data for armor reduction
    coverage_value = api_item.get('coverage_mask', 0)
    coverage = CoverageMask(coverage_value) if coverage_value else None

    # Prefer the computed slot, then the raw slot name; a missing or null
    # value becomes 'Unknown' so later string operations cannot hit None.
    slot_name = api_item.get('computed_slot_name') or api_item.get('slot_name') or 'Unknown'

    # For underclothes, ensure we get simple slot names
    if api_item.get('object_class') == 3:  # Clothing
        if coverage_value == 104:  # Shirt pattern
            slot_name = "Shirt"
        elif coverage_value in (19, 22):  # Pants/breeches patterns
            slot_name = "Pants"

    def rating(key: str) -> int:
        # Missing and explicit-null ratings both count as 0.
        value = api_item.get(key)
        return value if value is not None else 0

    return SuitItem(
        id=hash(unique_id),
        name=api_item['name'],
        character_name=api_item['character_name'],
        slot=slot_name,
        coverage=coverage,
        set_id=self._convert_set_name_to_id(api_item.get('item_set')),  # Convert set name to numeric ID
        # Explicit nulls are normalised like the ratings: a None armor level
        # or spell list would break the later sorting and totalling.
        armor_level=api_item.get('armor_level') or 0,
        ratings={
            'crit_damage_rating': rating('crit_damage_rating'),
            'damage_rating': rating('damage_rating'),
            'damage_resist_rating': rating('damage_resist_rating'),
            'crit_damage_resist_rating': rating('crit_damage_resist_rating'),
            'heal_boost_rating': rating('heal_boost_rating'),
            'vitality_rating': rating('vitality_rating')
        },
        spell_bitmap=0,  # Calculated later by load_items()
        spell_names=api_item.get('spell_names') or [],
        material=api_item.get('material_name', '')
    )

def _load_items_order_for_search(self, items: List[SuitItem]) -> List[SuitItem]:
    """Order items armor -> jewelry -> clothing, each group sorted best first."""
    armor_slots = {
        "Head", "Chest", "Upper Arms", "Lower Arms", "Hands",
        "Abdomen", "Upper Legs", "Lower Legs", "Feet"
    }
    jewelry_slots = {
        "Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"
    }
    clothing_slots = {"Shirt", "Pants"}

    armor_items, jewelry_items, clothing_items = [], [], []
    unrecognised = 0
    for item in items:
        # A multi-slot item ("A, B") is classified by its component slots, so
        # it still reaches the bucket builder, which expands such items.
        if ', ' in item.slot:
            slots = {part.strip() for part in item.slot.split(', ')}
        else:
            slots = {item.slot}
        if slots & armor_slots:
            armor_items.append(item)
        elif slots & jewelry_slots:
            jewelry_items.append(item)
        elif slots & clothing_slots:
            clothing_items.append(item)
        else:
            unrecognised += 1
    if unrecognised:
        logger.info(f"ITEM SORTING: skipped {unrecognised} items with no recognised equipment slot")

    # Sort armor by spell count (most spells first) since armor level deprioritized
    armor_items.sort(key=lambda x: len(x.spell_names), reverse=True)
    # Sort jewelry by spell count (most spells first)
    jewelry_items.sort(key=lambda x: len(x.spell_names), reverse=True)
    # Sort clothing by damage rating (highest first)
    clothing_items.sort(key=lambda x: x.ratings.get('damage_rating', 0), reverse=True)

    logger.info(f"ITEM SORTING: {len(armor_items)} armor, {len(jewelry_items)} jewelry, {len(clothing_items)} clothing")
    return armor_items + jewelry_items + clothing_items
# Removed _determine_equipment_slot - now using SQL computed slots from main service
def create_buckets(self, items: List[SuitItem]) -> List[ItemBucket]:
    """Group items into one bucket per equipment slot.

    A bucket is created for every known slot, including slots that no item
    fits, so the search can still step through (and skip) them.  An item
    whose ``slot`` names several slots is cloned once per matching slot;
    every clone keeps the source item's ``id``.

    Args:
        items: Candidate items.  ``item.slot`` may be a single slot name, a
            ``", "``-separated list of slot names, or free text that
            mentions slot names.  A missing slot is reported as unknown
            instead of raising.

    Returns:
        One bucket per known slot, armor slots first and then ordered by
        ascending item count.
    """
    # Slot catalogue: 9 armor + 6 jewelry + 2 clothing.
    armor_order = (
        "Head", "Chest", "Upper Arms", "Lower Arms", "Hands",
        "Abdomen", "Upper Legs", "Lower Legs", "Feet",
    )
    jewelry_order = (
        "Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket",
    )
    clothing_order = ("Shirt", "Pants")
    all_slots = armor_order + jewelry_order + clothing_order
    armor_slots = frozenset(armor_order)

    def single_slot_variant(source, slot):
        """Clone ``source`` so that it occupies exactly ``slot``."""
        return SuitItem(
            id=source.id,
            name=source.name,
            character_name=source.character_name,
            slot=slot,
            coverage=source.coverage,
            set_id=source.set_id,
            armor_level=source.armor_level,
            ratings=source.ratings.copy(),
            spell_bitmap=source.spell_bitmap,
            spell_names=source.spell_names.copy(),
            # Carry the lock flag over: clones used to be built without it
            # and therefore fell back to the constructor default.
            is_locked=source.is_locked,
            material=source.material,
        )

    # Group items by slot
    slot_items = {slot: [] for slot in all_slots}
    for item in items:
        # Special debugging for problematic robe
        if "Empowered Robe of the Perfect Light" in item.name:
            logger.info(f"[DEBUG] Analyzing problematic robe: {item.name}")
            if item.coverage:
                logger.info(f"[DEBUG] - Coverage mask: 0x{item.coverage.value:08X}")
                logger.info(f"[DEBUG] - Coverage bits count: {bin(item.coverage.value).count('1')}")
                logger.info(f"[DEBUG] - Matches exact robe pattern (0x00013F00): {item.coverage.value == 0x00013F00}")
                logger.info(f"[DEBUG] - is_robe() result: {item.coverage.is_robe()}")
            else:
                logger.info(f"[DEBUG] - Coverage mask: None")
            logger.info(f"[DEBUG] - Slot: {item.slot}")
            logger.info(f"[DEBUG] - Coverage object: {item.coverage}")

        # A missing slot is handled like an unknown one rather than
        # crashing on the substring tests below.
        slot_text = item.slot or ""

        if slot_text in slot_items:
            # Single slot item - direct assignment
            slot_items[slot_text].append(item)
            continue

        if ', ' in slot_text:
            # Multi-slot item - one single-slot variant per applicable slot
            requested = [part.strip() for part in slot_text.split(', ')]
            added_to_slots = [slot for slot in requested if slot in slot_items]
            for slot in added_to_slots:
                slot_items[slot].append(single_slot_variant(item, slot))
            if added_to_slots:
                logger.debug(f"Multi-slot item {item.name} split into slots: {added_to_slots}")
            else:
                logger.warning(f"Multi-slot item {item.name} with slots '{item.slot}' couldn't be mapped to any valid slots")
            continue

        # Complex slot descriptions without comma separation: accept every
        # known slot name that is mentioned anywhere in the text.
        lowered = slot_text.lower()
        mapped_slots = [slot for slot in all_slots if slot.lower() in lowered]
        for slot in mapped_slots:
            slot_items[slot].append(single_slot_variant(item, slot))
        if mapped_slots:
            logger.debug(f"Complex slot item {item.name} split into slots: {mapped_slots} (original: '{item.slot}')")
        else:
            logger.warning(f"Unknown slot '{item.slot}' for item {item.name} - could not map to any known slots")

    # Create buckets - CRITICAL: create ALL buckets even if empty (MagSuitBuilder behavior)
    buckets = []
    for slot in all_slots:
        bucket = ItemBucket(
            slot=slot,
            items=slot_items[slot],
            is_armor=(slot in armor_slots),
            is_required=False  # Required slots are marked later based on constraints
        )
        # Sort items within bucket by priority
        bucket.sort_items()
        buckets.append(bucket)
        if bucket.items:
            logger.info(f"Created bucket for {slot} with {len(bucket.items)} items")
        else:
            logger.info(f"Created EMPTY bucket for {slot} (will allow incomplete suits)")

    # Sort buckets: armor first, then by item count (MagSuitBuilder ArmorSearcher.cs:95-100)
    buckets.sort(key=lambda b: (
        0 if b.is_armor else 1,  # Armor buckets first
        len(b.items)             # Smallest first for better pruning
    ))
    logger.info(f"CREATED {len(buckets)} total buckets (including {sum(1 for b in buckets if not b.items)} empty)")
    logger.info(f"BUCKET ORDER: {[f'{b.slot}({len(b.items)})' for b in buckets]}")
    return buckets
def apply_reduction_options(self, buckets: List[ItemBucket]) -> List[ItemBucket]:
    """Expand tailorable multi-coverage armor into single-slot candidates.

    An armor item counts as reducible when it has coverage data, a material
    (Mag-SuitBuilder rule: only loot-generated items can be tailored) and at
    least one entry from ``coverage.reduction_options()``.  A reducible item
    leaves its current bucket and is replaced by one "tailored" clone per
    reduction option, filed under the slot that option maps to.  Non-armor
    buckets are passed through unchanged.

    Each armor slot ends up in exactly one bucket: if a reduction targets a
    slot whose own bucket has not been processed yet, the bucket created for
    the reduction is reused when that slot is reached.  The same physical
    item (same ``id``) is tailored to a given slot only once, even when it
    appears in several input buckets.

    Args:
        buckets: Buckets as produced by ``create_buckets``.

    Returns:
        The resulting buckets with their items re-sorted.
    """
    new_buckets = []
    bucket_by_slot = {}      # slot name -> first bucket registered for it
    seen_reductions = set()  # (item id, reduced slot) pairs already emitted

    def bucket_for(slot, is_required=False):
        """Return the armor bucket for ``slot``, creating it on first use."""
        found = bucket_by_slot.get(slot)
        if found is None:
            found = ItemBucket(
                slot=slot,
                items=[],
                is_armor=True,
                is_required=is_required
            )
            bucket_by_slot[slot] = found
            new_buckets.append(found)
        elif is_required and not found.is_required:
            # The bucket was first created as a reduction target; keep the
            # stricter flag of the slot's own bucket.
            found.is_required = True
        return found

    for bucket in buckets:
        if not bucket.is_armor:
            # Non-armor items don't need reduction
            new_buckets.append(bucket)
            bucket_by_slot.setdefault(bucket.slot, bucket)
            continue

        # Split armor items into those kept as-is and those to be tailored.
        original_items = []
        reducible_items = []  # (item, reduction options) pairs
        for item in bucket.items:
            reduction_options = item.coverage.reduction_options() if item.coverage else []
            logger.info(f"Reduction check for '{item.name}' in {bucket.slot}: "
                        f"coverage={item.coverage.value if item.coverage else None}, "
                        f"material='{item.material}', "
                        f"reductions={len(reduction_options)}")
            if reduction_options and item.material:
                logger.info(f"Item '{item.name}' is reducible to: {[r.to_slot_name() for r in reduction_options]}")
                reducible_items.append((item, reduction_options))
            else:
                # Item fits exactly in this slot or cannot be reduced
                original_items.append(item)

        # Always keep a bucket for the slot itself, even if it ends up empty.
        bucket_for(bucket.slot, bucket.is_required).items.extend(original_items)

        # File one tailored clone per reduction option under its target slot.
        for item, reduction_options in reducible_items:
            for reduced_coverage in reduction_options:
                reduced_slot = reduced_coverage.to_slot_name()
                if not reduced_slot:
                    continue
                reduction_key = (item.id, reduced_slot)
                if reduction_key in seen_reductions:
                    continue
                seen_reductions.add(reduction_key)
                reduced_item = SuitItem(
                    id=item.id,
                    name=f"{item.name} (tailored to {reduced_slot})",
                    character_name=item.character_name,
                    slot=reduced_slot,
                    coverage=reduced_coverage,
                    set_id=item.set_id,
                    armor_level=item.armor_level,
                    ratings=item.ratings.copy(),
                    spell_bitmap=item.spell_bitmap,
                    spell_names=item.spell_names.copy(),
                    is_locked=item.is_locked,
                    material=item.material
                )
                bucket_for(reduced_slot).items.append(reduced_item)

    # Re-sort all buckets after adding reduced items
    for bucket in new_buckets:
        bucket.sort_items()

    # Reduction statistics
    original_items_count = sum(len(bucket.items) for bucket in buckets)
    new_items_count = sum(len(bucket.items) for bucket in new_buckets)
    logger.info(f"Applied reductions: {len(buckets)} original buckets -> {len(new_buckets)} buckets")
    logger.info(f"Item count: {original_items_count} original -> {new_items_count} total (including reductions)")
    return new_buckets
def sort_buckets(self, buckets: List[ItemBucket]) -> List[ItemBucket]:
    """Order items inside every bucket, then return the buckets in search order.

    Items are sorted in place: primary-set pieces first, secondary-set
    pieces next, everything else last; ties are broken by higher crit
    damage rating, then higher damage rating, then higher armor level.
    Buckets are returned as a new list: core armor slots in a fixed order,
    then jewelry, then clothing, then any other slot, with the item count
    as the final tie-breaker.
    """
    def item_rank(item):
        # Tier 0: primary set, tier 1: secondary set, tier 2: anything else.
        if item.set_id == self.constraints.primary_set:
            tier = 0
        elif item.set_id == self.constraints.secondary_set:
            tier = 1
        else:
            tier = 2
        stats = item.ratings
        return (
            tier,
            -stats.get('crit_damage_rating', 0),  # CD2 > CD1 > CD0
            -stats.get('damage_rating', 0),
            -item.armor_level,
        )

    for current in buckets:
        current.items.sort(key=item_rank)

    # (group, position) for every known slot; unknown slots fall into group 3.
    slot_groups = (
        ['Chest', 'Head', 'Hands', 'Feet', 'Upper Arms', 'Lower Arms', 'Abdomen', 'Upper Legs', 'Lower Legs'],
        ['Neck', 'Left Ring', 'Right Ring', 'Left Wrist', 'Right Wrist', 'Trinket'],
        ['Shirt', 'Pants'],
    )
    slot_rank = {}
    for group, names in enumerate(slot_groups):
        for position, name in enumerate(names):
            slot_rank[name] = (group, position)

    def bucket_rank(candidate):
        group, position = slot_rank.get(candidate.slot, (3, 0))
        return (group, position, len(candidate.items))

    ordered = list(buckets)
    ordered.sort(key=bucket_rank)
    logger.info(f"Bucket search order: {[f'{b.slot}({len(b.items)})' for b in ordered[:10]]}")
    return ordered
async def recursive_search(self, buckets: List[ItemBucket], bucket_idx: int,
                           state: SuitState) -> AsyncGenerator[SearchResult, None]:
    """Depth-first search over the slot buckets, streaming results as found.

    For the bucket at ``bucket_idx`` every item accepted by
    ``can_add_item`` is pushed onto ``state``, the remaining buckets are
    searched recursively, and the item is popped again (backtracking).
    Afterwards the bucket is also skipped once so that incomplete suits are
    explored.  When all buckets have been visited the state is finalized
    and, if ``is_better_than_existing`` accepts it, recorded in
    ``self.best_suits`` and yielded.

    Args:
        buckets: Buckets in search order.
        bucket_idx: Index of the bucket to process in this call.
        state: Mutable suit state shared by the whole recursion.

    Yields:
        ``SearchResult`` objects of type ``"suit"`` for accepted suits and
        ``"progress"`` on every tenth evaluated node.
    """
    # Check for cancellation
    if self.is_cancelled and await self.is_cancelled():
        logger.info("Search cancelled by client")
        return

    # No timeout and no early success detection: the search runs until it is
    # exhausted or the client cancels.

    # Base case: all buckets processed
    if bucket_idx >= len(buckets):
        logger.info(f"[DEBUG] BASE CASE: All {len(buckets)} buckets processed, state has {len(state.items)} items")
        suit = self.finalize_suit(state)
        if not suit:
            logger.info(f"[DEBUG] No suit created from current state")
            return
        logger.info(f"[DEBUG] Suit created with score {suit.score}, {len(suit.items)} items")
        if not self.is_better_than_existing(suit):
            logger.info(f"[DEBUG] Suit REJECTED: score {suit.score} not better than existing")
            return

        logger.info(f"[DEBUG] Suit ACCEPTED: score {suit.score} is better than existing")
        logger.info(f"Found suit with score {suit.score}: {len(suit.items)} items")
        self.best_suits.append(suit)
        self.best_suits.sort(key=lambda s: s.score, reverse=True)
        self.best_suits = self.best_suits[:self.constraints.max_results]

        # Attach the requested sets and their piece counts to the payload.
        suit_data = suit.to_dict()
        from main import translate_equipment_set_id  # local import: avoids a circular import
        primary_id = self.constraints.primary_set
        secondary_id = self.constraints.secondary_set
        primary_set_name = translate_equipment_set_id(str(primary_id)) if primary_id else None
        secondary_set_name = translate_equipment_set_id(str(secondary_id)) if secondary_id else None
        # The rest of the solver keys set_counts by set ID, so look the ID
        # up first; the translated name is kept only as a fallback.
        counts = suit.set_counts
        suit_data['stats']['primary_set_count'] = (
            counts.get(primary_id, counts.get(primary_set_name, 0)) if primary_id else 0
        )
        suit_data['stats']['secondary_set_count'] = (
            counts.get(secondary_id, counts.get(secondary_set_name, 0)) if secondary_id else 0
        )
        suit_data['stats']['primary_set'] = primary_set_name
        suit_data['stats']['secondary_set'] = secondary_set_name
        yield SearchResult(type="suit", data=suit_data)
        return

    # Progress update and debug info
    self.suits_evaluated += 1
    if self.suits_evaluated % 10 == 0:  # Every 10 evaluations for better granularity
        # Check for cancellation during progress update
        if self.is_cancelled and await self.is_cancelled():
            logger.info("Search cancelled during progress update")
            return
        logger.info(f"Search progress: evaluated {self.suits_evaluated}, depth {bucket_idx}/{len(buckets)}, found {len(self.best_suits)} suits, current state: {len(state.items)} items")
        yield SearchResult(
            type="progress",
            data={
                "evaluated": self.suits_evaluated,
                "found": len(self.best_suits),
                "current_depth": bucket_idx,
                "total_buckets": len(buckets),
                "current_items": len(state.items),
                "elapsed": time.time() - self.start_time
            }
        )

    bucket = buckets[bucket_idx]
    logger.info(f"[DEBUG] Processing bucket {bucket_idx}: {bucket.slot} with {len(bucket.items)} items")

    # Try each item in current bucket
    items_tried = 0
    items_accepted = 0
    for item in bucket.items:
        items_tried += 1
        logger.info(f"[DEBUG] Trying item {items_tried}/{len(bucket.items)} in {bucket.slot}: {item.name}")
        if not self.can_add_item(item, state):
            logger.info(f"[DEBUG] Item REJECTED: {item.name}")
            continue

        items_accepted += 1
        logger.info(f"[DEBUG] Item ACCEPTED: {item.name} (#{items_accepted})")
        # Add item to state
        state.push(item)
        logger.info(f"[DEBUG] State after push: {len(state.items)} items, going to bucket {bucket_idx + 1}")
        # Continue search with next bucket
        recursion_count = 0
        async for result in self.recursive_search(buckets, bucket_idx + 1, state):
            recursion_count += 1
            logger.info(f"[DEBUG] Received result #{recursion_count} from recursion")
            yield result
        # Remove item from state (backtrack)
        state.pop(item.slot)
        logger.info(f"[DEBUG] Backtracked from {item.name}, state now: {len(state.items)} items")

    logger.info(f"[DEBUG] Bucket {bucket.slot} summary: {items_tried} tried, {items_accepted} accepted")

    # ALWAYS try skipping buckets to allow incomplete suits
    logger.info(f"[DEBUG] Trying skip bucket {bucket.slot} (bucket {bucket_idx})")
    skip_recursion_count = 0
    async for result in self.recursive_search(buckets, bucket_idx + 1, state):
        skip_recursion_count += 1
        logger.info(f"[DEBUG] Skip-bucket result #{skip_recursion_count}")
        yield result
    logger.info(f"[DEBUG] Skip bucket {bucket.slot} yielded {skip_recursion_count} results")
def can_add_item(self, item: SuitItem, state: SuitState) -> bool:
    """Return True when ``item`` may be added to ``state``.

    Checks, in order:
      1. The item's slot is not occupied yet.
      2. No item with the same ``id`` is already part of the suit.
      3. Set rules: at most 5 primary-set and 4 secondary-set pieces.  An
         item from any other set is accepted only in a jewelry slot; an
         item without a set is accepted only in a jewelry or clothing slot.
      4. Spell overlap: when required spells are configured and the item
         has spells but ``_can_get_beneficial_spell_from`` finds no benefit,
         it is rejected only if it also has armor below 300, no crit damage
         or damage rating, and is not from one of the requested sets.

    Args:
        item: Candidate item.
        state: Current partial suit.

    Returns:
        True if the item passes every check, otherwise False.
    """
    jewelry_slots = {"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"}
    clothing_slots = ('Shirt', 'Pants')
    primary_set = self.constraints.primary_set
    secondary_set = self.constraints.secondary_set

    # 1. Slot availability
    if item.slot in state.occupied_slots:
        logger.info(f"[DEBUG] REJECT {item.name}: slot {item.slot} already occupied")
        return False

    # 2. Item uniqueness - same physical item can't be used in multiple slots
    if any(existing.id == item.id for existing in state.items.values()):
        logger.info(f"[DEBUG] REJECT {item.name}: item already used (duplicate ID)")
        return False

    # Normalise the set ID once (it may arrive as a digit string or an int)
    # so that every comparison below uses the same representation.
    item_set = item.set_id
    if isinstance(item_set, str) and item_set.isdigit():
        try:
            item_set = int(item_set)
        except ValueError:
            # str.isdigit() accepts characters such as superscript digits
            # that int() rejects; keep the raw value in that case.
            item_set = item.set_id

    # 3. Set piece validation
    if item.set_id:
        current_count = state.set_counts.get(item_set, 0)
        if item_set == primary_set:
            # Primary set: max 5 pieces
            if current_count >= 5:
                logger.info(f"[DEBUG] REJECT {item.name}: primary set {item_set} already has {current_count} pieces (max 5)")
                return False
        elif item_set == secondary_set:
            # Secondary set: max 4 pieces
            if current_count >= 4:
                logger.info(f"[DEBUG] REJECT {item.name}: secondary set {item_set} already has {current_count} pieces (max 4)")
                return False
        elif item.slot in jewelry_slots:
            # Jewelry is always allowed regardless of set ID (it provides spells)
            logger.info(f"[DEBUG] ACCEPT {item.name}: jewelry item from set '{item_set}', allowed for spells")
        else:
            # STRICT: only armor from the two user-selected sets is allowed
            logger.info(f"[DEBUG] REJECT {item.name}: armor from other set '{item_set}', only primary '{primary_set}' and secondary '{secondary_set}' allowed")
            return False
    elif item.slot in clothing_slots:
        logger.info(f"[DEBUG] ACCEPT {item.name}: clothing item without set ID, allowed")
    elif item.slot in jewelry_slots:
        # Jewelry without a set still provides cantrips/wards
        logger.info(f"[DEBUG] ACCEPT {item.name}: jewelry item without set ID, allowed")
    else:
        logger.info(f"[DEBUG] REJECT {item.name}: no set ID and not clothing/jewelry")
        return False

    # 4. Spell overlap constraints - only reject if the item adds no value
    if self.constraints.required_spells and item.spell_names:
        if not self._can_get_beneficial_spell_from(item, state):
            adds_nothing = (
                item.armor_level < 300 and                          # Low armor
                item.ratings.get('crit_damage_rating', 0) == 0 and  # No crit damage
                item.ratings.get('damage_rating', 0) == 0 and       # No damage rating
                item_set not in (primary_set, secondary_set)        # Not from target sets
            )
            if adds_nothing:
                logger.info(f"[DEBUG] REJECT {item.name}: no beneficial spells and low stats")
                return False
            logger.info(f"[DEBUG] ACCEPT {item.name}: overlapping spells but good stats/set")

    logger.info(f"[DEBUG] ACCEPT {item.name}: passed all constraints")
    return True
def _is_double_spell_acceptable(self, item: SuitItem, overlap: int) -> bool:
    """Tell whether a two-spell item carries only spells that are needed.

    Returns False unless the item has exactly two spells; otherwise returns
    True only when each spell's bitmap intersects
    ``self.needed_spell_bitmap``.  ``overlap`` is accepted but not used.
    """
    if len(item.spell_names) != 2:
        return False
    wanted = self.needed_spell_bitmap
    return all(
        self.spell_index.get_bitmap([name]) & wanted
        for name in item.spell_names
    )
def finalize_suit(self, state: SuitState) -> Optional[CompletedSuit]:
    """Turn the current search state into a scored ``CompletedSuit``.

    Returns None when the state holds no items.  When required spells are
    configured, the needed spells are split into those the state already
    covers and those still missing; otherwise both lists stay empty.
    """
    if not state.items:
        return None

    suit_score = self._calculate_score(state)

    covered_names, uncovered_names = [], []
    if self.constraints.required_spells:
        wanted = self.needed_spell_bitmap
        covered_names = self.spell_index.get_spell_names(state.spell_bitmap & wanted)
        uncovered_names = self.spell_index.get_spell_names(wanted & ~state.spell_bitmap)

    # Copies keep the finished suit independent of later backtracking.
    return CompletedSuit(
        items=state.items.copy(),
        score=suit_score,
        total_armor=state.total_armor,
        total_ratings=state.total_ratings.copy(),
        set_counts=state.set_counts.copy(),
        fulfilled_spells=covered_names,
        missing_spells=uncovered_names,
    )
def _calculate_score(self, state: SuitState) -> int:
    """Score a candidate suit; the result is clamped so it is never negative.

    Contributions, in the order they are applied:
      * Sets: ``armor_set_complete`` when the primary set has 5+ pieces and
        again when the secondary set has 4+.  An incomplete set that is
        requested and has at least one piece adds ``missing_set_penalty``
        per missing piece (the weight is expected to be negative).
      * Crit damage rating 1 or 2 on any item.
      * Damage rating 1, 2 or 3 on Shirt/Pants only.
      * 100 per required spell the suit covers.
      * 5 per item, so non-empty suits do not score 0.
    """
    weights = self.scoring_weights
    worn = state.items
    score = 0
    logger.debug(f"[SCORING] Starting score calculation for suit with {len(worn)} items")
    logger.debug(f"[SCORING] Set counts: {state.set_counts}")
    logger.debug(f"[SCORING] Total ratings: {state.total_ratings}")

    # 1. Armor sets. The translated names are used for logging only;
    #    set_counts is looked up by set ID.
    from main import translate_equipment_set_id
    primary_id = self.constraints.primary_set
    secondary_id = self.constraints.secondary_set
    primary_label = translate_equipment_set_id(str(primary_id)) if primary_id else None
    secondary_label = translate_equipment_set_id(str(secondary_id)) if secondary_id else None
    logger.debug(f"[SCORING] Looking for primary set: {primary_label} (ID: {primary_id})")
    logger.debug(f"[SCORING] Looking for secondary set: {secondary_label} (ID: {secondary_id})")

    primary_worn = state.set_counts.get(primary_id, 0) if primary_id else 0
    secondary_worn = state.set_counts.get(secondary_id, 0) if secondary_id else 0
    logger.debug(f"[SCORING] Primary set {primary_id} has {primary_worn} pieces")
    logger.debug(f"[SCORING] Secondary set {secondary_id} has {secondary_worn} pieces")

    if primary_worn >= 5:
        score += weights.armor_set_complete
        logger.debug(f"[SCORING] Primary set complete: +{weights.armor_set_complete}")
    elif primary_id and primary_worn > 0:
        shortfall = (5 - primary_worn) * weights.missing_set_penalty
        score += shortfall
        logger.debug(f"[SCORING] Primary set incomplete ({primary_worn}/5): {shortfall}")

    if secondary_worn >= 4:
        score += weights.armor_set_complete
        logger.debug(f"[SCORING] Secondary set complete: +{weights.armor_set_complete}")
    elif secondary_id and secondary_worn > 0:
        shortfall = (4 - secondary_worn) * weights.missing_set_penalty
        score += shortfall
        logger.debug(f"[SCORING] Secondary set incomplete ({secondary_worn}/4): {shortfall}")

    # 2. Crit damage rating on every piece. The weight attribute is looked
    #    up by name so it is only read for ratings that actually occur.
    crit_weight_names = {1: 'crit_damage_1', 2: 'crit_damage_2'}
    for piece in worn.values():
        weight_name = crit_weight_names.get(piece.ratings.get('crit_damage_rating', 0))
        if weight_name:
            score += getattr(weights, weight_name)

    # 3. Damage rating, clothes only.
    damage_weight_names = {1: 'damage_rating_1', 2: 'damage_rating_2', 3: 'damage_rating_3'}
    for piece in worn.values():
        if piece.slot not in ('Shirt', 'Pants'):
            continue
        weight_name = damage_weight_names.get(piece.ratings.get('damage_rating', 0))
        if weight_name:
            score += getattr(weights, weight_name)

    # 4. Spell coverage: each needed spell counts once.
    if self.constraints.required_spells:
        covered_bits = state.spell_bitmap & self.needed_spell_bitmap
        covered_total = bin(covered_bits).count('1')
        spell_points = covered_total * 100
        score += spell_points
        logger.debug(f"[SCORING] Spell coverage: {covered_total} spells = +{spell_points} points")

    # 5. Small per-item base score.
    item_points = len(worn) * 5
    score += item_points
    logger.debug(f"[SCORING] Base item score: {len(worn)} items = +{item_points} points")
    logger.debug(f"[SCORING] Final score: {score}")
    return score if score > 0 else 0
def is_better_than_existing(self, suit: CompletedSuit) -> bool:
    """Decide whether ``suit`` deserves a place among the kept results.

    A suit is accepted while fewer than ``max_results`` suits are stored.
    After that it is compared with the last entry of ``self.best_suits``:
    having more items wins outright, otherwise the score must be strictly
    higher.
    """
    kept = self.best_suits
    limit = self.constraints.max_results
    logger.info(f"[DEBUG] is_better_than_existing: checking suit with score {suit.score}, current best_suits count: {len(kept)}, max_results: {limit}")

    if len(kept) < limit:
        logger.info(f"[DEBUG] is_better_than_existing: ACCEPTING suit - not at max capacity")
        return True

    # Larger suits are kept even with a lower score - complete suits are the goal.
    weakest = kept[-1]
    logger.info(f"[DEBUG] is_better_than_existing: comparing against lowest suit score {weakest.score}")
    if len(suit.items) > len(weakest.items):
        logger.info(f"[DEBUG] is_better_than_existing: ACCEPTING suit - more items ({len(suit.items)} > {len(weakest.items)})")
        return True

    outscores = suit.score > weakest.score
    logger.info(f"[DEBUG] is_better_than_existing: score comparison result: {outscores}")
    return outscores
def _has_room_for_armor_set(self, item: SuitItem, state: SuitState) -> bool:
    """Check set limits for ``item`` (Mag-SuitBuilder HasRoomForArmorSet).

    Items without a set always fit.  The primary set allows fewer than 5
    pieces already worn, the secondary set fewer than 4.  Items from any
    other set fit only when they occupy a jewelry slot.
    """
    set_id = item.set_id
    if not set_id:
        # Non-set items don't count against limits
        return True

    already_worn = state.set_counts.get(set_id, 0)
    if set_id == self.constraints.primary_set:
        return already_worn < 5
    if set_id == self.constraints.secondary_set:
        return already_worn < 4

    # STRICT: other sets are not allowed for armor, only for jewelry.
    return item.slot in {"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"}
def _convert_set_name_to_id(self, set_name: Optional[str]) -> Optional[int]:
    """Convert an equipment set name to its numeric set ID.

    The name is matched ignoring case and surrounding whitespace, and a
    trailing " Set" is optional, so both "Adept's Set" and "Adept's"
    resolve to 14.

    Args:
        set_name: Set name, or None.

    Returns:
        The numeric set ID, or None when the name is empty, not a string,
        or not a known set.
    """
    if not set_name or not isinstance(set_name, str):
        return None

    # Keys are case-folded and carry no " Set" suffix.
    name_to_id = {
        "adept's": 14,
        "defender's": 16,
        "soldier's": 13,
        "wise": 21,
        "heroic protector": 40,
        "heroic destroyer": 41,
        "relic alduressa": 46,
        "ancient relic": 47,
        "noble relic": 48,
        "archer's": 15,
        "hearty": 19,
        "dexterous": 20,
        "swift": 22,
        "reinforced": 24,
        "flame proof": 26,
        "lightning proof": 29,
    }

    key = set_name.strip().casefold()
    if key.endswith(" set"):
        key = key[:-len(" set")].rstrip()
    return name_to_id.get(key)
def _can_get_beneficial_spell_from(self, item: SuitItem, state: SuitState) -> bool:
    """Check whether ``item`` is worth adding (Mag-SuitBuilder CanGetBeneficialSpellFrom).

    The item is considered beneficial when any of the following holds:
    it has no spells, no spells are required, it belongs to the primary or
    secondary set, it has a positive crit damage or damage rating, its
    armor level exceeds 500, or it provides at least one needed spell that
    the current state does not cover yet.
    """
    # Items without spells, or searches without spell constraints, always pass.
    if not item.spell_names or not self.constraints.required_spells:
        return True

    # Pieces of the requested sets matter for set completion.
    wanted_sets = (self.constraints.primary_set, self.constraints.secondary_set)
    if item.set_id in wanted_sets:
        return True

    # Good ratings or high armor outweigh overlapping spells.
    stats = item.ratings
    if stats.get('crit_damage_rating', 0) > 0:
        return True
    if stats.get('damage_rating', 0) > 0:
        return True
    if item.armor_level > 500:
        return True

    # Otherwise the item must bring a needed spell that is still uncovered.
    uncovered = item.spell_bitmap & self.needed_spell_bitmap & ~state.spell_bitmap
    return uncovered != 0
# API Endpoints
@router.get("/characters")
async def get_available_characters():
    """List the distinct character names found in the items table.

    Returns a dict with a single ``"characters"`` key.  Any failure is
    logged and reported as an HTTP 500 carrying the error text.
    """
    sql = """
        SELECT DISTINCT character_name
        FROM items
        ORDER BY character_name
    """
    try:
        names = []
        for record in await database.fetch_all(sql):
            names.append(record['character_name'])
        return {"characters": names}
    except Exception as e:
        logger.error(f"Error fetching characters: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/search")
async def search_suits(request: Request, constraints: SearchConstraints):
    """Run a suit search and stream its results as server-sent events.

    Each solver result becomes one SSE event named after the result type,
    with the JSON-encoded result data as payload.  A ``cancelled`` event is
    sent when the client is found to be disconnected, and an ``error``
    event when serialization or the search itself fails.
    """
    async def is_cancelled():
        # Lets the solver poll for a dropped client connection.
        return await request.is_disconnected()

    solver = ConstraintSatisfactionSolver(constraints, is_cancelled=is_cancelled)

    def packet(kind, payload):
        """Build one event dict with a JSON-encoded payload."""
        return {"event": kind, "data": json.dumps(payload)}

    async def event_generator():
        try:
            logger.info("Starting SSE event generator")
            async for result in solver.search():
                # Check for client disconnection
                if await request.is_disconnected():
                    logger.info("Client disconnected, stopping search")
                    yield packet("cancelled", {"message": "Search cancelled by client"})
                    return
                logger.info(f"Yielding SSE event: {result.type}")
                try:
                    # Send full data for frontend processing
                    yield packet(result.type, result.data)
                except Exception as e:
                    logger.error(f"Error serializing result data: {e}", exc_info=True)
                    logger.error(f"Result type: {result.type}, Data keys: {list(result.data.keys()) if hasattr(result.data, 'keys') else 'N/A'}")
                    yield packet("error", {"message": f"Serialization error: {str(e)}"})
            logger.info("SSE event generator completed")
        except Exception as e:
            logger.error(f"Error in search generator: {e}", exc_info=True)
            yield packet("error", {"message": f"Search error: {str(e)}"})

    async def sse_generator():
        # Manual SSE framing: an "event:" line, then a "data:" line closed
        # by the blank line that terminates the event.
        async for event in event_generator():
            for chunk in (f"event: {event['event']}\n", f"data: {event['data']}\n\n"):
                yield chunk

    response_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Headers": "Cache-Control"
    }
    return StreamingResponse(
        sse_generator(),
        media_type="text/event-stream",
        headers=response_headers,
    )
@router.post("/search-test")
async def search_suits_test(constraints: SearchConstraints):
    """Run a suit search without SSE and return a compact summary.

    Suit results are collected as score / total armor / item count
    summaries.  The response is returned as soon as a ``complete`` result
    arrives and includes at most the first five summaries.  Errors are
    reported in the response body rather than raised.
    """
    try:
        solver = ConstraintSatisfactionSolver(constraints)
        summaries = []
        async for result in solver.search():
            kind = result.type
            if kind == "suit":
                payload = result.data
                summaries.append({
                    "score": payload.get("score", 0),
                    "total_armor": payload.get("total_armor", 0),
                    "item_count": len(payload.get("items", {})),
                })
            elif kind == "complete":
                return {
                    "success": True,
                    "suits_found": len(summaries),
                    "duration": result.data.get("duration", 0),
                    "results": summaries[:5],  # Return first 5 results
                }
        # The solver finished without ever signalling completion.
        return {"success": False, "error": "No completion event received"}
    except Exception as e:
        logger.error(f"Test search error: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
@router.get("/sets")
async def get_equipment_sets():
    """Return the selectable equipment sets as ``{"sets": [{"id", "name"}, ...]}``."""
    # Hardcoded for now; (set ID, display name) pairs in presentation order.
    catalog = (
        (14, "Adept's"),
        (16, "Defender's"),
        (13, "Soldier's"),
        (21, "Wise"),
        (40, "Heroic Protector"),
        (41, "Heroic Destroyer"),
        (46, "Relic Alduressa"),
        (47, "Ancient Relic"),
        (48, "Noble Relic"),
        (15, "Archer's"),
        (19, "Hearty"),
        (20, "Dexterous"),
        (22, "Swift"),
        (24, "Reinforced"),
        (26, "Flame Proof"),
        (29, "Lightning Proof"),
    )
    return {"sets": [{"id": set_id, "name": label} for set_id, label in catalog]}