MosswartOverlord/inventory-service/suitbuilder.py
erik e0265e261c Add suitbuilder backend improvements and SSE streaming fix
- Add dedicated streaming proxy endpoint for real-time suitbuilder SSE updates
- Implement stable sorting with character_name and name tiebreakers for deterministic results
- Refactor locked items to locked slots supporting set_id and spell constraints
- Add Mag-SuitBuilder style branch pruning tracking variables
- Enhance search with phase updates and detailed progress logging
- Update design document with SSE streaming proxy fix details

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 19:14:07 +00:00

2217 lines
No EOL
104 KiB
Python

"""
Suitbuilder - Equipment optimization system for Asheron's Call.
Implements constraint satisfaction solver to find optimal equipment combinations
across multiple characters' inventories based on armor sets, spell coverage, and ratings.
"""
import json
import logging
import asyncio
from enum import IntFlag, Enum
from typing import Dict, List, Optional, Any, Set, Tuple, AsyncGenerator
from dataclasses import dataclass, field
from datetime import datetime
import time
from fastapi import APIRouter, HTTPException, Query, Depends, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
import databases
import sqlalchemy as sa
from database import (
DATABASE_URL, Item, ItemCombatStats, ItemRequirements,
ItemEnhancements, ItemRatings, ItemSpells, ItemRawData
)
# Import shared helper functions
import json as json_module
# Removed circular import - will implement locally if needed
# Configure logging
logger = logging.getLogger(__name__)
# Database connection will be injected from main service
database = None
# Set name translation mapping
SET_NAMES = {
14: "Adept's",
16: "Defender's",
13: "Soldier's",
21: "Wise",
40: "Heroic Protector",
41: "Heroic Destroyer",
46: "Relic Alduressa",
47: "Ancient Relic",
48: "Noble Relic",
15: "Archer's",
19: "Hearty",
20: "Dexterous",
22: "Swift",
24: "Reinforced",
26: "Flame Proof",
29: "Lightning Proof"
}
def get_set_name(set_id: Optional[int]) -> str:
"""Translate set ID to readable set name."""
if set_id is None:
return ""
# If set_id is already a string with "Set" in it, it's already translated
if isinstance(set_id, str) and "Set" in set_id:
return set_id
# Otherwise translate numeric ID
return SET_NAMES.get(set_id, f"Set {set_id}")
def set_database_connection(db_instance):
"""Set the database connection from main service."""
global database
database = db_instance
logger.info("Suitbuilder database connection established")
# Create router for suitbuilder endpoints
router = APIRouter()
class CoverageMask(IntFlag):
"""
Bit flags for armor coverage areas.
Values match Mag-SuitBuilder exactly for compatibility.
"""
NONE = 0x0
# Underwear coverage (shirts/pants)
UNDERWEAR_UPPER_LEGS = 0x00000002
UNDERWEAR_LOWER_LEGS = 0x00000004
UNDERWEAR_CHEST = 0x00000008
UNDERWEAR_ABDOMEN = 0x00000010
UNDERWEAR_UPPER_ARMS = 0x00000020
UNDERWEAR_LOWER_ARMS = 0x00000040
# Outerwear/Armor coverage (the important ones for armor suits)
OUTERWEAR_UPPER_LEGS = 0x00000100
OUTERWEAR_LOWER_LEGS = 0x00000200
OUTERWEAR_CHEST = 0x00000400
OUTERWEAR_ABDOMEN = 0x00000800
OUTERWEAR_UPPER_ARMS = 0x00001000
OUTERWEAR_LOWER_ARMS = 0x00002000
# Extremities
HEAD = 0x00004000
HANDS = 0x00008000
FEET = 0x00010000
# Convenience aliases matching our slot names
CHEST = OUTERWEAR_CHEST
ABDOMEN = OUTERWEAR_ABDOMEN
UPPER_ARMS = OUTERWEAR_UPPER_ARMS
LOWER_ARMS = OUTERWEAR_LOWER_ARMS
UPPER_LEGS = OUTERWEAR_UPPER_LEGS
LOWER_LEGS = OUTERWEAR_LOWER_LEGS
def reduction_options(self) -> List['CoverageMask']:
"""
Returns possible reductions for multi-coverage items.
Based on exact Mag-SuitBuilder logic for armor tailoring.
"""
# Single coverage items cannot be reduced
if bin(self.value).count('1') <= 1:
return []
# Robes cannot be reduced (exclude from suits entirely)
if self.is_robe():
return []
reductions = []
# Specific reduction patterns from Mag-SuitBuilder
if self == (CoverageMask.UPPER_ARMS | CoverageMask.LOWER_ARMS):
# Arm guards can be reduced to either upper or lower arms
reductions.extend([CoverageMask.UPPER_ARMS, CoverageMask.LOWER_ARMS])
elif self == (CoverageMask.UPPER_LEGS | CoverageMask.LOWER_LEGS):
# Leg guards can be reduced to either upper or lower legs
reductions.extend([CoverageMask.UPPER_LEGS, CoverageMask.LOWER_LEGS])
elif self == (CoverageMask.LOWER_LEGS | CoverageMask.FEET):
# Boots that cover lower legs and feet can be reduced to feet only
reductions.append(CoverageMask.FEET)
elif self == (CoverageMask.CHEST | CoverageMask.ABDOMEN):
# Chest+abdomen pieces can be reduced to chest only
reductions.append(CoverageMask.CHEST)
elif self == (CoverageMask.CHEST | CoverageMask.ABDOMEN | CoverageMask.UPPER_ARMS):
# Chest+abdomen+upper arms can be reduced to chest
reductions.append(CoverageMask.CHEST)
elif self == (CoverageMask.CHEST | CoverageMask.UPPER_ARMS | CoverageMask.LOWER_ARMS):
# Chest+arms can be reduced to chest
reductions.append(CoverageMask.CHEST)
elif self == (CoverageMask.CHEST | CoverageMask.UPPER_ARMS):
# Chest+upper arms can be reduced to chest
reductions.append(CoverageMask.CHEST)
elif self == (CoverageMask.ABDOMEN | CoverageMask.UPPER_LEGS | CoverageMask.LOWER_LEGS):
# Tassets covering abdomen and legs can be reduced to any of the three
reductions.extend([CoverageMask.ABDOMEN, CoverageMask.UPPER_LEGS, CoverageMask.LOWER_LEGS])
elif self == (CoverageMask.CHEST | CoverageMask.ABDOMEN | CoverageMask.UPPER_ARMS | CoverageMask.LOWER_ARMS):
# Hauberks can be reduced to chest
reductions.append(CoverageMask.CHEST)
elif self == (CoverageMask.ABDOMEN | CoverageMask.UPPER_LEGS):
# Pre-2010 retail pieces - reduce to abdomen only
reductions.append(CoverageMask.ABDOMEN)
return reductions
def is_robe(self) -> bool:
"""
Check if this coverage represents a robe using exact Mag-SuitBuilder logic.
Robe pattern: 0x00013F00 (7 specific coverage areas excluding head and hands)
"""
# Exact robe pattern from Mag-SuitBuilder: 0x00013F00
MAG_ROBE_PATTERN = 0x00013F00
# Check for exact match first (most reliable)
if self.value == MAG_ROBE_PATTERN:
return True
# Also check for the component pattern manually in case of slight variations
robe_components = (
CoverageMask.FEET |
CoverageMask.OUTERWEAR_UPPER_LEGS |
CoverageMask.OUTERWEAR_LOWER_LEGS |
CoverageMask.OUTERWEAR_CHEST |
CoverageMask.OUTERWEAR_ABDOMEN |
CoverageMask.OUTERWEAR_UPPER_ARMS |
CoverageMask.OUTERWEAR_LOWER_ARMS
)
if self.value == robe_components.value:
return True
# Fallback: 6+ coverage areas as general indicator
# but log it for investigation
coverage_count = bin(self.value).count('1')
if coverage_count >= 6:
logger.info(f"Potential robe detected with {coverage_count} coverage areas: 0x{self.value:08X} (not exact pattern 0x{MAG_ROBE_PATTERN:08X})")
return True
return False
def to_slot_name(self) -> Optional[str]:
"""Convert single coverage mask to equipment slot name."""
mapping = {
CoverageMask.HEAD: "Head",
CoverageMask.CHEST: "Chest",
CoverageMask.UPPER_ARMS: "Upper Arms",
CoverageMask.LOWER_ARMS: "Lower Arms",
CoverageMask.HANDS: "Hands",
CoverageMask.ABDOMEN: "Abdomen",
CoverageMask.UPPER_LEGS: "Upper Legs",
CoverageMask.LOWER_LEGS: "Lower Legs",
CoverageMask.FEET: "Feet"
}
# Only works for single coverage
if self in mapping:
return mapping[self]
return None
@dataclass
class SuitItem:
"""Processed item ready for optimization."""
id: int
name: str
character_name: str
slot: str # Equipment slot name
coverage: Optional[CoverageMask] = None # For armor items
set_id: Optional[int] = None
armor_level: int = 0
ratings: Dict[str, int] = field(default_factory=dict)
spell_bitmap: int = 0
spell_names: List[str] = field(default_factory=list)
is_locked: bool = False # For user-locked items
material: Optional[str] = None # Material type for reduction eligibility
def __hash__(self):
"""Make item hashable for set operations."""
return hash(self.id)
@property
def is_armor(self) -> bool:
"""Check if this item provides armor (for Mag-SuitBuilder compatibility)."""
return self.armor_level > 0
@dataclass
class ItemBucket:
"""Container for items that can fit in a specific slot."""
slot: str
items: List[SuitItem] = field(default_factory=list)
is_armor: bool = False
is_required: bool = False # Some slots might be required by constraints
def sort_items(self):
"""Sort items by priority based on slot type.
All sorts include (character_name, name) as stable tiebreakers
to ensure deterministic ordering for reproducible search results.
"""
if self.slot in ['Shirt', 'Pants']:
# Underclothes: damage_rating first, ignore armor_level (buffed armor irrelevant)
self.items.sort(
key=lambda item: (
item.ratings.get('damage_rating', 0),
len(item.spell_names),
sum(r for k, r in item.ratings.items() if k != 'damage_rating'),
item.character_name,
item.name
),
reverse=True
)
elif self.is_armor:
# Armor: armor_level first, then crit damage, then spells
self.items.sort(
key=lambda item: (
item.armor_level,
item.ratings.get('crit_damage_rating', 0),
len(item.spell_names),
sum(item.ratings.values()),
item.character_name,
item.name
),
reverse=True
)
else:
# Jewelry: spells first, then total ratings
self.items.sort(
key=lambda item: (
len(item.spell_names),
sum(item.ratings.values()),
item.character_name,
item.name
),
reverse=True
)
class SpellBitmapIndex:
"""Maps spell names to bit positions for O(1) overlap detection."""
def __init__(self):
self.spell_to_bit: Dict[str, int] = {}
self.bit_to_spell: Dict[int, str] = {}
self._next_bit = 0
def register_spell(self, spell_name: str) -> int:
"""Register a spell and return its bit position."""
if spell_name not in self.spell_to_bit:
if self._next_bit >= 64:
# For more than 64 spells, we'd need to use multiple integers
logger.warning(f"More than 64 unique spells detected. Spell: {spell_name}")
bit_position = 1 << self._next_bit
self.spell_to_bit[spell_name] = bit_position
self.bit_to_spell[bit_position] = spell_name
self._next_bit += 1
return self.spell_to_bit[spell_name]
def get_bitmap(self, spells: List[str]) -> int:
"""Convert spell list to bitmap representation."""
bitmap = 0
for spell in spells:
bitmap |= self.register_spell(spell)
return bitmap
def get_spell_names(self, bitmap: int) -> List[str]:
"""Convert bitmap back to spell names."""
spells = []
for bit, spell in self.bit_to_spell.items():
if bitmap & bit:
spells.append(spell)
return spells
def would_add_needed_spell(self, item_bitmap: int, needed_bitmap: int, current_bitmap: int) -> bool:
"""Check if item adds any needed spell not already covered."""
# New spells the item would add
new_spells = item_bitmap & ~current_bitmap
# Check if any new spells are needed
return bool(new_spells & needed_bitmap)
@dataclass
class SuitState:
"""Mutable state during search."""
items: Dict[str, SuitItem] = field(default_factory=dict) # slot -> item
spell_bitmap: int = 0
set_counts: Dict[int, int] = field(default_factory=dict) # set_id -> count
total_armor: int = 0
total_ratings: Dict[str, int] = field(default_factory=dict)
occupied_slots: Set[str] = field(default_factory=set)
def push(self, item: SuitItem) -> None:
"""Add item to suit (modifies state)."""
self.items[item.slot] = item
self.occupied_slots.add(item.slot)
self.spell_bitmap |= item.spell_bitmap
# Update set counts
if item.set_id:
self.set_counts[item.set_id] = self.set_counts.get(item.set_id, 0) + 1
logger.debug(f"[STATE] Added item with set_id {item.set_id}, set_counts now: {self.set_counts}")
# Update totals
self.total_armor += item.armor_level
for rating_name, value in item.ratings.items():
self.total_ratings[rating_name] = self.total_ratings.get(rating_name, 0) + value
def pop(self, slot: str) -> Optional[SuitItem]:
"""Remove item from slot (modifies state)."""
if slot not in self.items:
return None
item = self.items.pop(slot)
self.occupied_slots.remove(slot)
# Rebuild spell bitmap (can't just subtract due to overlaps)
self.spell_bitmap = 0
for remaining_item in self.items.values():
self.spell_bitmap |= remaining_item.spell_bitmap
# Update set counts
if item.set_id:
self.set_counts[item.set_id] -= 1
if self.set_counts[item.set_id] == 0:
del self.set_counts[item.set_id]
# Update totals
self.total_armor -= item.armor_level
for rating_name, value in item.ratings.items():
if rating_name in self.total_ratings:
self.total_ratings[rating_name] -= value
if self.total_ratings[rating_name] <= 0:
del self.total_ratings[rating_name]
return item
def clone(self) -> 'SuitState':
"""Deep copy for branching."""
new_state = SuitState()
new_state.items = self.items.copy()
new_state.spell_bitmap = self.spell_bitmap
new_state.set_counts = self.set_counts.copy()
new_state.total_armor = self.total_armor
new_state.total_ratings = self.total_ratings.copy()
new_state.occupied_slots = self.occupied_slots.copy()
return new_state
class ScoringWeights(BaseModel):
"""Configurable scoring weights."""
armor_set_complete: int = 1000 # Complete sets (primary/secondary)
missing_set_penalty: int = -200 # Missing set pieces penalty
crit_damage_1: int = 10 # CD1 rating points
crit_damage_2: int = 20 # CD2 rating points
damage_rating_1: int = 10 # DR1 on clothes
damage_rating_2: int = 20 # DR2 on clothes
damage_rating_3: int = 30 # DR3 on clothes
class LockedSlotInfo(BaseModel):
"""Information about a locked slot."""
set_id: Optional[int] = None
spells: List[str] = []
class SearchConstraints(BaseModel):
"""User-defined search constraints."""
characters: List[str]
primary_set: Optional[int] = None
secondary_set: Optional[int] = None
required_spells: List[str] = field(default_factory=list)
locked_slots: Dict[str, LockedSlotInfo] = field(default_factory=dict) # slot -> lock info
include_equipped: bool = True
include_inventory: bool = True
min_armor: Optional[int] = None
max_armor: Optional[int] = None
min_crit_damage: Optional[int] = None
max_crit_damage: Optional[int] = None
min_damage_rating: Optional[int] = None
max_damage_rating: Optional[int] = None
scoring_weights: Optional[ScoringWeights] = None
max_results: int = 50
search_timeout: int = 300 # seconds
@dataclass
class CompletedSuit:
"""Final suit result."""
items: Dict[str, SuitItem]
score: int
total_armor: int
total_ratings: Dict[str, int]
set_counts: Dict[int, int]
fulfilled_spells: List[str]
missing_spells: List[str]
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
# Build transfer summary
transfer_by_character = {}
total_items = 0
for slot, item in self.items.items():
character = item.character_name
if character not in transfer_by_character:
transfer_by_character[character] = []
transfer_by_character[character].append(item.name)
total_items += 1
# Build transfer instructions
instructions = []
step = 1
for character, items in sorted(transfer_by_character.items()):
for item_name in items:
instructions.append(f"{step}. Transfer {item_name} from {character} to new character")
step += 1
instructions.append(f"{step}. Equip all transferred items on new character")
return {
"id": hash(tuple(sorted(self.items.keys()))), # Generate ID from slots
"score": self.score,
"items": {
slot: {
"id": item.id,
"name": item.name,
"source_character": item.character_name,
"armor_level": item.armor_level,
"ratings": item.ratings,
"spells": item.spell_names,
"set_id": item.set_id,
"set_name": get_set_name(item.set_id) # Translate set ID to name
}
for slot, item in self.items.items()
},
"stats": {
"total_armor": self.total_armor,
"total_crit_damage": self.total_ratings.get('crit_damage_rating', 0),
"total_damage_rating": self.total_ratings.get('damage_rating', 0),
"primary_set_count": 0,
"secondary_set_count": 0,
"spell_coverage": len(self.fulfilled_spells)
},
"missing": self.missing_spells,
"notes": [],
"transfer_summary": {
"total_items": total_items,
"from_characters": transfer_by_character
},
"instructions": instructions
}
class SearchResult(BaseModel):
"""Result yielded during search."""
type: str # "suit", "progress", "complete", "error"
data: Any
class ItemPreFilter:
"""Pre-filtering system to remove dominated items before search."""
@staticmethod
def remove_surpassed_items(items: List[SuitItem]) -> List[SuitItem]:
"""Remove items dominated by better alternatives (LeanMyWorldObjectExtensions.cs:9-24)"""
filtered_items = []
for item in items:
is_surpassed = False
for compare_item in items:
if compare_item == item:
continue
if ItemPreFilter._is_surpassed_by(item, compare_item):
is_surpassed = True
break
if not is_surpassed:
filtered_items.append(item)
logger.info(f"Pre-filter: {len(items)} -> {len(filtered_items)} items (removed {len(items) - len(filtered_items)} surpassed)")
return filtered_items
@staticmethod
def _is_surpassed_by(item: SuitItem, compare_item: SuitItem) -> bool:
"""Check if item is dominated by compare_item (LeanMyWorldObject.cs:90-147)"""
# Items must be same slot to be comparable
if item.slot != compare_item.slot:
return False
# Items must be same set to be comparable (or both no-set)
if item.set_id != compare_item.set_id:
return False
# Compare spells (higher level cantrips surpass lower)
if not ItemPreFilter._spells_surpass_or_equal(compare_item.spell_names, item.spell_names):
return False
# Compare ratings - compare_item must be better in at least one category
better_in_something = False
for rating_key in ['crit_damage_rating', 'damage_rating']:
item_rating = item.ratings.get(rating_key, 0)
compare_rating = compare_item.ratings.get(rating_key, 0)
if compare_rating > item_rating:
better_in_something = True
elif item_rating > compare_rating:
return False # Item is better in this category
# Also compare armor level for armor pieces
if item.armor_level > 0 and compare_item.armor_level > 0:
if compare_item.armor_level > item.armor_level:
better_in_something = True
elif item.armor_level > compare_item.armor_level:
return False
return better_in_something
@staticmethod
def _spells_surpass_or_equal(spells1: List[str], spells2: List[str]) -> bool:
"""Check if spells1 surpass or equal spells2"""
# For each spell in spells2, find equal or better in spells1
for spell2 in spells2:
found_surpassing = False
for spell1 in spells1:
if spell1 == spell2 or ItemPreFilter._spell_surpasses(spell1, spell2):
found_surpassing = True
break
if not found_surpassing:
return False
return True
@staticmethod
def _spell_surpasses(spell1: str, spell2: str) -> bool:
"""Check if spell1 surpasses spell2 (higher level of same type)"""
# Epic surpasses Major, Legendary surpasses Epic, etc.
if "Legendary" in spell1 and ("Epic" in spell2 or "Major" in spell2):
base1 = spell1.replace("Legendary ", "")
base2 = spell2.replace("Epic ", "").replace("Major ", "")
return base1 == base2
if "Epic" in spell1 and "Major" in spell2:
base1 = spell1.replace("Epic ", "")
base2 = spell2.replace("Major ", "")
return base1 == base2
return False
class ConstraintSatisfactionSolver:
"""Main optimization solver."""
def __init__(self, constraints: SearchConstraints, is_cancelled=None):
self.constraints = constraints
self.spell_index = SpellBitmapIndex()
self.best_suits: List[CompletedSuit] = []
self.suits_evaluated = 0
self.start_time = time.time()
self.scoring_weights = constraints.scoring_weights or ScoringWeights()
self.search_completed = False
self.is_cancelled = is_cancelled # Callback to check if search should stop
# Branch pruning: track best suit found so far (Mag-SuitBuilder style)
self.best_suit_item_count = 0
self.highest_armor_count_suit_built = 0 # Track highest armor piece count seen
self.total_armor_buckets_with_items = 0 # Will be set during bucket creation
# Pre-compute needed spell bitmap
self.needed_spell_bitmap = self.spell_index.get_bitmap(constraints.required_spells)
logger.info(f"[SPELL_CONSTRAINTS_DEBUG] Required spells: {constraints.required_spells}")
logger.info(f"[SPELL_CONSTRAINTS_DEBUG] Needed spell bitmap: {self.needed_spell_bitmap}")
async def search(self) -> AsyncGenerator[SearchResult, None]:
"""Main search entry point with streaming results."""
try:
# Phase 1: Loading items
yield SearchResult(type="phase", data={
"phase": "loading",
"message": "Loading items from database...",
"phase_number": 1,
"total_phases": 5
})
# Load and preprocess items
items = await self.load_items()
logger.info(f"Loaded {len(items)} items for optimization")
yield SearchResult(type="phase", data={
"phase": "loaded",
"message": f"Loaded {len(items)} items",
"items_count": len(items),
"phase_number": 1,
"total_phases": 5
})
yield SearchResult(type="log", data={
"level": "info",
"message": f"Loaded {len(items)} items from {len(self.constraints.characters)} characters",
"timestamp": time.time() - self.start_time
})
if not items:
yield SearchResult(type="error", data={"message": "No items found for specified characters"})
return
# Phase 2: Creating buckets
yield SearchResult(type="phase", data={
"phase": "buckets",
"message": "Creating equipment buckets...",
"phase_number": 2,
"total_phases": 5
})
# Create buckets
buckets = self.create_buckets(items)
logger.info(f"Created {len(buckets)} equipment buckets")
# Build bucket summary
bucket_summary = {b.slot: len(b.items) for b in buckets}
yield SearchResult(type="phase", data={
"phase": "buckets_done",
"message": f"Created {len(buckets)} buckets",
"bucket_count": len(buckets),
"bucket_summary": bucket_summary,
"phase_number": 2,
"total_phases": 5
})
# Log bucket details
bucket_details = ", ".join([f"{b.slot}: {len(b.items)}" for b in buckets[:5]])
yield SearchResult(type="log", data={
"level": "info",
"message": f"Buckets created: {bucket_details}{'...' if len(buckets) > 5 else ''}",
"timestamp": time.time() - self.start_time
})
# Phase 3: Applying reduction rules
yield SearchResult(type="phase", data={
"phase": "reducing",
"message": "Applying armor reduction rules...",
"phase_number": 3,
"total_phases": 5
})
# Apply armor reduction rules
buckets = self.apply_reduction_options(buckets)
# Phase 4: Sorting buckets
yield SearchResult(type="phase", data={
"phase": "sorting",
"message": "Optimizing search order...",
"phase_number": 4,
"total_phases": 5
})
# Sort buckets
buckets = self.sort_buckets(buckets)
# Start recursive search
initial_state = SuitState()
# Handle locked slots - filter out locked slots from buckets
if self.constraints.locked_slots:
# Debug: log what we received
logger.info(f"[LOCKED_SLOTS] Received locked_slots: {self.constraints.locked_slots}")
for slot, lock_info in self.constraints.locked_slots.items():
logger.info(f"[LOCKED_SLOTS] Slot '{slot}': set_id={lock_info.set_id}, spells={lock_info.spells}")
locked_slot_names = set(self.constraints.locked_slots.keys())
original_bucket_count = len(buckets)
buckets = [b for b in buckets if b.slot not in locked_slot_names]
logger.info(f"Filtered out {original_bucket_count - len(buckets)} locked slots: {locked_slot_names}")
# Calculate locked set contributions (using numeric set IDs)
self.locked_set_counts = {}
for slot, lock_info in self.constraints.locked_slots.items():
if lock_info.set_id:
# Use numeric set_id for consistency with state.set_counts
self.locked_set_counts[lock_info.set_id] = self.locked_set_counts.get(lock_info.set_id, 0) + 1
logger.info(f"[LOCKED_SLOTS] Added set_id {lock_info.set_id} from slot {slot}")
logger.info(f"Locked set contributions (by ID): {self.locked_set_counts}")
logger.info(f"[LOCKED_SLOTS] Primary set ID: {self.constraints.primary_set}, locked count for it: {self.locked_set_counts.get(self.constraints.primary_set, 0)}")
# Calculate locked spells to exclude from required
self.locked_spells = set()
for lock_info in self.constraints.locked_slots.values():
self.locked_spells.update(lock_info.spells)
logger.info(f"Locked spells (already covered): {self.locked_spells}")
# Log locked slots info
yield SearchResult(type="log", data={
"level": "info",
"message": f"Locked {len(locked_slot_names)} slots: {', '.join(locked_slot_names)}",
"timestamp": time.time() - self.start_time
})
else:
self.locked_set_counts = {}
self.locked_spells = set()
# Calculate effective set requirements (subtract locked pieces)
self.effective_primary_needed = 5 # Default for primary set
self.effective_secondary_needed = 4 # Default for secondary set
if self.constraints.primary_set:
locked_primary = self.locked_set_counts.get(self.constraints.primary_set, 0)
self.effective_primary_needed = max(0, 5 - locked_primary)
if self.constraints.secondary_set:
locked_secondary = self.locked_set_counts.get(self.constraints.secondary_set, 0)
self.effective_secondary_needed = max(0, 4 - locked_secondary)
logger.info(f"Effective requirements: {self.effective_primary_needed} primary, {self.effective_secondary_needed} secondary (after locked)")
# Log effective requirements
if self.locked_set_counts:
yield SearchResult(type="log", data={
"level": "info",
"message": f"Need: {self.effective_primary_needed} primary + {self.effective_secondary_needed} secondary pieces",
"timestamp": time.time() - self.start_time
})
# Phase 5: Searching
logger.info(f"Starting recursive search with {len(buckets)} buckets")
yield SearchResult(type="phase", data={
"phase": "searching",
"message": "Searching for optimal suits...",
"total_buckets": len(buckets),
"phase_number": 5,
"total_phases": 5
})
# Log search start summary
total_items = sum(len(b.items) for b in buckets)
yield SearchResult(type="log", data={
"level": "info",
"message": f"Starting search: {len(buckets)} buckets, {total_items} total items",
"timestamp": time.time() - self.start_time
})
logger.info("Starting async iteration over recursive search")
async for result in self.recursive_search(buckets, 0, initial_state):
yield result
logger.info(f"Recursive search completed, sending final results. Found {len(self.best_suits)} suits")
# Always send final results
yield SearchResult(
type="complete",
data={
"suits_found": len(self.best_suits),
"duration": round(time.time() - self.start_time, 2)
}
)
except Exception as e:
logger.error(f"Search error: {e}", exc_info=True)
yield SearchResult(type="error", data={"message": str(e)})
async def load_items_OLD_BROKEN(self) -> List[SuitItem]:
"""OLD BROKEN METHOD - REPLACED WITH API CALL"""
pass
async def load_items(self) -> List[SuitItem]:
"""Load items using the working inventory API with HTTP calls."""
logger.info("[DEBUG] load_items() method called - starting item loading process")
try:
import urllib.request
import urllib.parse
import json as json_module
import asyncio
# Get user's set names for filtering
from main import translate_equipment_set_id
primary_set_name = translate_equipment_set_id(str(self.constraints.primary_set)) if self.constraints.primary_set else None
secondary_set_name = translate_equipment_set_id(str(self.constraints.secondary_set)) if self.constraints.secondary_set else None
logger.info(f"LOADING ITEMS VIA API: Primary='{primary_set_name}', Secondary='{secondary_set_name}'")
def fetch_set_items(set_name: str) -> list:
"""Synchronous helper to fetch items for a set."""
# Determine equipment status filter
equipment_status = None
if self.constraints.include_equipped and self.constraints.include_inventory:
# Both - no filter needed (default API behavior)
equipment_status_log = "both equipped and inventory items"
elif self.constraints.include_equipped:
equipment_status = "equipped"
equipment_status_log = "equipped items only"
elif self.constraints.include_inventory:
equipment_status = "unequipped"
equipment_status_log = "inventory items only"
else:
equipment_status_log = "no items (neither equipped nor inventory selected)"
# Build URL with selected characters or all characters
base_params = []
if self.constraints.characters:
# Use comma-separated list in single characters parameter
characters_str = ",".join(self.constraints.characters)
base_params.append(f"characters={urllib.parse.quote(characters_str)}")
logger.info(f"Fetching set items for {len(self.constraints.characters)} selected characters: {self.constraints.characters}")
else:
base_params.append("include_all_characters=true")
logger.info(f"Fetching set items for ALL characters")
# Add item set filter
base_params.append(f"item_set={urllib.parse.quote(set_name)}")
# Add equipment status filter if needed
if equipment_status:
base_params.append(f"equipment_status={equipment_status}")
# Add limit
base_params.append("limit=1000")
# Build final URL - use internal port 8000 since we're calling ourselves
url = f"http://localhost:8000/search/items?{'&'.join(base_params)}"
logger.info(f"Equipment status filter: {equipment_status_log}")
logger.info(f"Fetching set items from: {url}")
with urllib.request.urlopen(url) as response:
data = json_module.load(response)
items = data.get('items', [])
logger.info(f"Set ({set_name}) with {equipment_status_log}: {len(items)} items returned")
return items
# Use the working inventory API to get items for both sets
all_api_items = []
# Primary set items
if primary_set_name:
primary_items = await asyncio.get_event_loop().run_in_executor(None, fetch_set_items, primary_set_name)
all_api_items.extend(primary_items)
# Secondary set items
if secondary_set_name:
secondary_items = await asyncio.get_event_loop().run_in_executor(None, fetch_set_items, secondary_set_name)
all_api_items.extend(secondary_items)
# Clothing items (shirts and pants) - Use separate API endpoints
def fetch_clothing_items() -> list:
"""Synchronous helper to fetch clothing items using shirt_only and pants_only endpoints."""
# Determine equipment status filter
equipment_status = None
if self.constraints.include_equipped and self.constraints.include_inventory:
equipment_status_log = "both equipped and inventory items"
elif self.constraints.include_equipped:
equipment_status = "equipped"
equipment_status_log = "equipped items only"
elif self.constraints.include_inventory:
equipment_status = "unequipped"
equipment_status_log = "inventory items only"
else:
equipment_status_log = "no items (neither equipped nor inventory selected)"
# Build base params with selected characters or all characters
def build_base_params():
base_params = []
if self.constraints.characters:
characters_str = ",".join(self.constraints.characters)
base_params.append(f"characters={urllib.parse.quote(characters_str)}")
else:
base_params.append("include_all_characters=true")
# Add equipment status filter if needed
if equipment_status:
base_params.append(f"equipment_status={equipment_status}")
# Add limit
base_params.append("limit=1000")
return base_params
all_clothing_items = []
# Fetch shirts using shirt_only endpoint - only DR3 for optimization
shirt_params = build_base_params()
shirt_params.append("shirt_only=true")
shirt_params.append("min_damage_rating=3") # Only load DR3 shirts
shirt_url = f"http://localhost:8000/search/items?{'&'.join(shirt_params)}"
logger.info(f"Fetching shirt items with {equipment_status_log}")
logger.info(f"Fetching shirts from: {shirt_url}")
try:
with urllib.request.urlopen(shirt_url) as response:
data = json_module.load(response)
shirt_items = data.get('items', [])
logger.info(f"Shirt items with {equipment_status_log}: {len(shirt_items)} items returned")
all_clothing_items.extend(shirt_items)
except Exception as e:
logger.error(f"Error fetching shirts: {e}")
# Fetch pants using pants_only endpoint - only DR3 for optimization
pants_params = build_base_params()
pants_params.append("pants_only=true")
pants_params.append("min_damage_rating=3") # Only load DR3 pants
pants_url = f"http://localhost:8000/search/items?{'&'.join(pants_params)}"
logger.info(f"Fetching pants items with {equipment_status_log}")
logger.info(f"Fetching pants from: {pants_url}")
try:
with urllib.request.urlopen(pants_url) as response:
data = json_module.load(response)
pants_items = data.get('items', [])
logger.info(f"Pants items with {equipment_status_log}: {len(pants_items)} items returned")
all_clothing_items.extend(pants_items)
except Exception as e:
logger.error(f"Error fetching pants: {e}")
logger.info(f"Total clothing items fetched: {len(all_clothing_items)}")
return all_clothing_items
# Jewelry items (rings, necklaces, bracelets, trinkets)
def fetch_all_jewelry_items() -> list:
"""Synchronous helper to fetch jewelry items with cantrips/wards."""
logger.info("[DEBUG] fetch_all_jewelry_items() function called - starting execution")
# Determine equipment status filter
equipment_status = None
if self.constraints.include_equipped and self.constraints.include_inventory:
equipment_status_log = "both equipped and inventory items"
elif self.constraints.include_equipped:
equipment_status = "equipped"
equipment_status_log = "equipped items only"
elif self.constraints.include_inventory:
equipment_status = "unequipped"
equipment_status_log = "inventory items only"
else:
equipment_status_log = "no items (neither equipped nor inventory selected)"
# Build base params with selected characters or all characters
def build_base_params():
base_params = []
if self.constraints.characters:
characters_str = ",".join(self.constraints.characters)
base_params.append(f"characters={urllib.parse.quote(characters_str)}")
else:
base_params.append("include_all_characters=true")
# Add equipment status filter if needed
if equipment_status:
base_params.append(f"equipment_status={equipment_status}")
# Add limit
base_params.append("limit=1000")
return base_params
all_jewelry_items = []
# Fetch each jewelry type separately using slot_names filter
# This ensures we get all rings, bracelets, etc. instead of just the first page of all jewelry
jewelry_slot_types = [
("Ring", "rings"),
("Bracelet", "bracelets"),
("Neck", "necklaces/amulets"),
("Trinket", "trinkets")
]
for slot_filter, slot_description in jewelry_slot_types:
jewelry_params = build_base_params()
jewelry_params.append("jewelry_only=true")
jewelry_params.append(f"slot_names={slot_filter}")
jewelry_url = f"http://localhost:8000/search/items?{'&'.join(jewelry_params)}"
logger.info(f"Fetching {slot_description} with {equipment_status_log}")
try:
with urllib.request.urlopen(jewelry_url) as response:
data = json_module.load(response)
items = data.get('items', [])
logger.info(f"Fetched {len(items)} {slot_description}")
all_jewelry_items.extend(items)
except Exception as e:
logger.error(f"Error fetching {slot_description}: {e}")
logger.info(f"Total jewelry items fetched: {len(all_jewelry_items)}")
return all_jewelry_items
# Fetch clothing items
clothing_items = await asyncio.get_event_loop().run_in_executor(None, fetch_clothing_items)
all_api_items.extend(clothing_items)
# Fetch jewelry items
logger.info("[DEBUG] About to call fetch_all_jewelry_items() with asyncio executor")
try:
jewelry_items = await asyncio.get_event_loop().run_in_executor(None, fetch_all_jewelry_items)
logger.info(f"[DEBUG] fetch_all_jewelry_items() completed successfully, returned {len(jewelry_items)} items")
all_api_items.extend(jewelry_items)
except Exception as e:
logger.error(f"[DEBUG] Exception in fetch_all_jewelry_items() executor: {e}", exc_info=True)
jewelry_items = [] # Continue with empty list if jewelry fetch fails
logger.info(f"Total items from inventory API: {len(all_api_items)}")
# Helper function to normalize spell_names to list format
def normalize_spell_names(spell_data):
"""Convert spell_names to list format regardless of input type.
The API may return spell_names as:
- A list of spell names (correct format)
- A comma-separated string of spell names or IDs
- None or empty
This ensures we always work with a list.
"""
if spell_data is None:
return []
if isinstance(spell_data, list):
return spell_data
if isinstance(spell_data, str) and spell_data.strip():
# Split comma-separated values and clean up
return [s.strip() for s in spell_data.split(',') if s.strip()]
return []
# Convert to SuitItem objects
items = []
for api_item in all_api_items:
# The inventory API provides rich data with all fields we need!
# Use character_name + name as unique identifier since API doesn't return ID
unique_id = f"{api_item['character_name']}_{api_item['name']}"
# Parse coverage data for armor reduction
coverage_value = api_item.get('coverage_mask', 0)
coverage = CoverageMask(coverage_value) if coverage_value else None
# Use computed_slot_name from API if available, otherwise fallback to slot_name
slot_name = api_item.get('computed_slot_name') or api_item.get('slot_name', 'Unknown')
# For underclothes, ensure we get simple slot names
if api_item.get('object_class') == 3: # Clothing
coverage_mask = api_item.get('coverage_mask', 0)
if coverage_mask == 104: # Shirt pattern
slot_name = "Shirt"
elif coverage_mask in [19, 22]: # Pants/breeches patterns
slot_name = "Pants"
suit_item = SuitItem(
id=hash(unique_id), # Generate ID from character + item name
name=api_item['name'],
character_name=api_item['character_name'],
slot=slot_name, # Use computed slot or coverage-based slot for underclothes
coverage=coverage, # Now properly loaded from API
set_id=self._convert_set_name_to_id(api_item.get('item_set')), # Convert set name to numeric ID
armor_level=api_item.get('armor_level', 0),
ratings={
'crit_damage_rating': api_item.get('crit_damage_rating') if api_item.get('crit_damage_rating') is not None else 0,
'damage_rating': api_item.get('damage_rating') if api_item.get('damage_rating') is not None else 0,
'damage_resist_rating': api_item.get('damage_resist_rating') if api_item.get('damage_resist_rating') is not None else 0,
'crit_damage_resist_rating': api_item.get('crit_damage_resist_rating') if api_item.get('crit_damage_resist_rating') is not None else 0,
'heal_boost_rating': api_item.get('heal_boost_rating') if api_item.get('heal_boost_rating') is not None else 0,
'vitality_rating': api_item.get('vitality_rating') if api_item.get('vitality_rating') is not None else 0
},
spell_bitmap=0, # Will calculate if needed
spell_names=normalize_spell_names(api_item.get('spell_names')),
material=api_item.get('material_name', '')
)
items.append(suit_item)
# Log comprehensive stats
slot_counts = {}
set_counts = {}
for item in items:
slot_counts[item.slot] = slot_counts.get(item.slot, 0) + 1
if item.set_id:
set_counts[item.set_id] = set_counts.get(item.set_id, 0) + 1
logger.info(f"LOADED FROM API: {len(items)} total items")
logger.info(f"SLOT DISTRIBUTION: {slot_counts}")
logger.info(f"SET DISTRIBUTION: {set_counts}")
# Calculate spell bitmaps for all items
spell_items_count = 0
for item in items:
if item.spell_names:
item.spell_bitmap = self.spell_index.get_bitmap(item.spell_names)
spell_items_count += 1
logger.info(f"[SPELL] {item.name}: {item.spell_names} -> bitmap {item.spell_bitmap}")
# CRITICAL DEBUG: Check for Legendary Two Handed Combat specifically
if "Legendary Two Handed Combat" in item.spell_names:
logger.info(f"[LEGENDARY_TWO_HANDED_DEBUG] Found item with Legendary Two Handed Combat: {item.name} (set_id: {item.set_id}, spell_bitmap: {item.spell_bitmap})")
logger.info(f"SPELL PROCESSING: {spell_items_count} items with spells processed")
# Apply pre-filtering to remove dominated items
filtered_items = ItemPreFilter.remove_surpassed_items(items)
# Sort items for optimal search order
# Define slot sets
armor_slot_set = {
"Head", "Chest", "Upper Arms", "Lower Arms", "Hands",
"Abdomen", "Upper Legs", "Lower Legs", "Feet"
}
jewelry_slot_set = {
"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"
}
# Also match generic jewelry slot names that might come from API
jewelry_fallback_slots = {"Ring", "Bracelet", "Jewelry", "Necklace", "Amulet"}
clothing_slot_set = {"Shirt", "Pants"}
# Helper to check if item matches any slot in a set (handles multi-slot items)
def matches_slot_set(item_slot: str, slot_set: set, fallback_set: set = None) -> bool:
if item_slot in slot_set:
return True
# Handle multi-slot items like "Left Wrist, Right Wrist"
if ', ' in item_slot:
return any(s.strip() in slot_set for s in item_slot.split(', '))
# Check fallback set for generic names like "Ring", "Jewelry"
if fallback_set and item_slot in fallback_set:
return True
return False
armor_items = [item for item in filtered_items if matches_slot_set(item.slot, armor_slot_set)]
jewelry_items = [item for item in filtered_items if matches_slot_set(item.slot, jewelry_slot_set, jewelry_fallback_slots)]
clothing_items = [item for item in filtered_items if matches_slot_set(item.slot, clothing_slot_set)]
# Sort armor by spell count (most spells first) since armor level deprioritized
# Include (character_name, name) as stable tiebreakers for deterministic ordering
armor_items.sort(key=lambda x: (len(x.spell_names), x.character_name, x.name), reverse=True)
# Sort jewelry by spell count (most spells first)
jewelry_items.sort(key=lambda x: (len(x.spell_names), x.character_name, x.name), reverse=True)
# Sort clothing by damage rating (highest first)
clothing_items.sort(key=lambda x: (x.ratings.get('damage_rating', 0), x.character_name, x.name), reverse=True)
# Recombine in optimized order
optimized_items = armor_items + jewelry_items + clothing_items
# DETERMINISM CHECK - Log first 5 items of each type to verify consistent ordering
logger.info("DETERMINISM CHECK - First 5 armor items:")
for i, item in enumerate(armor_items[:5]):
logger.info(f" {i}: {item.character_name}/{item.name} spells={len(item.spell_names)}")
logger.info("DETERMINISM CHECK - First 5 jewelry items:")
for i, item in enumerate(jewelry_items[:5]):
logger.info(f" {i}: {item.character_name}/{item.name} spells={len(item.spell_names)}")
logger.info(f"ITEM SORTING: {len(armor_items)} armor, {len(jewelry_items)} jewelry, {len(clothing_items)} clothing")
# Debug: Log jewelry items with spells
jewelry_with_spells = [item for item in jewelry_items if item.spell_names]
logger.info(f"JEWELRY WITH SPELLS: {len(jewelry_with_spells)} items")
for item in jewelry_with_spells[:5]: # Log first 5
logger.info(f" - {item.name} (slot: {item.slot}): {item.spell_names}")
return optimized_items
except Exception as e:
logger.error(f"Error calling inventory API: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to load items from API: {str(e)}")
# Removed _determine_equipment_slot - now using SQL computed slots from main service
def create_buckets(self, items: List[SuitItem]) -> List[ItemBucket]:
"""Group items by equipment slot."""
# Define all possible slots (ARMOR + JEWELRY + CLOTHING)
all_slots = [
# Armor slots (9)
"Head", "Chest", "Upper Arms", "Lower Arms", "Hands",
"Abdomen", "Upper Legs", "Lower Legs", "Feet",
# Jewelry slots (6)
"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket",
# Clothing slots (2)
"Shirt", "Pants"
]
armor_slots = {
"Head", "Chest", "Upper Arms", "Lower Arms", "Hands",
"Abdomen", "Upper Legs", "Lower Legs", "Feet"
}
jewelry_slots = {
"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"
}
# Group items by slot
slot_items = {slot: [] for slot in all_slots}
for item in items:
# Special debugging for problematic robe
if "Empowered Robe of the Perfect Light" in item.name:
logger.info(f"[DEBUG] Analyzing problematic robe: {item.name}")
if item.coverage:
logger.info(f"[DEBUG] - Coverage mask: 0x{item.coverage.value:08X}")
logger.info(f"[DEBUG] - Coverage bits count: {bin(item.coverage.value).count('1')}")
logger.info(f"[DEBUG] - Matches exact robe pattern (0x00013F00): {item.coverage.value == 0x00013F00}")
logger.info(f"[DEBUG] - is_robe() result: {item.coverage.is_robe()}")
else:
logger.info(f"[DEBUG] - Coverage mask: None")
logger.info(f"[DEBUG] - Slot: {item.slot}")
logger.info(f"[DEBUG] - Coverage object: {item.coverage}")
# Skip robe detection - user's chosen armor sets don't include robes
# If user specifically chooses a set that contains robes, they should get those items
# Handle both single-slot and multi-slot items
if item.slot in slot_items:
# Single slot item - direct assignment
slot_items[item.slot].append(item)
elif ', ' in item.slot:
# Multi-slot item - create single-slot variants for each applicable slot
possible_slots = [s.strip() for s in item.slot.split(', ')]
added_to_slots = []
for possible_slot in possible_slots:
if possible_slot in slot_items:
# Create a single-slot variant of the item
single_slot_item = SuitItem(
id=item.id,
name=item.name,
character_name=item.character_name,
slot=possible_slot, # Single slot assignment
coverage=item.coverage,
set_id=item.set_id,
armor_level=item.armor_level,
ratings=item.ratings.copy(),
spell_bitmap=item.spell_bitmap,
spell_names=item.spell_names.copy(),
material=item.material
)
slot_items[possible_slot].append(single_slot_item)
added_to_slots.append(possible_slot)
if added_to_slots:
logger.debug(f"Multi-slot item {item.name} split into slots: {added_to_slots}")
else:
logger.warning(f"Multi-slot item {item.name} with slots '{item.slot}' couldn't be mapped to any valid slots")
else:
# Handle generic jewelry slot names that need expansion
generic_jewelry_expansion = {
"Ring": ["Left Ring", "Right Ring"],
"Bracelet": ["Left Wrist", "Right Wrist"],
"Jewelry": ["Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"],
"Necklace": ["Neck"],
"Amulet": ["Neck"],
}
mapped_slots = []
# Check for generic jewelry slot names first
if item.slot in generic_jewelry_expansion:
for target_slot in generic_jewelry_expansion[item.slot]:
single_slot_item = SuitItem(
id=item.id,
name=item.name,
character_name=item.character_name,
slot=target_slot,
coverage=item.coverage,
set_id=item.set_id,
armor_level=item.armor_level,
ratings=item.ratings.copy(),
spell_bitmap=item.spell_bitmap,
spell_names=item.spell_names.copy(),
material=item.material
)
slot_items[target_slot].append(single_slot_item)
mapped_slots.append(target_slot)
logger.debug(f"Generic jewelry slot '{item.slot}' expanded to: {mapped_slots} for {item.name}")
else:
# Check for complex slot patterns that might not use comma separation
# Handle items with complex slot descriptions that the SQL computed incorrectly
for known_slot in all_slots:
if known_slot.lower() in item.slot.lower():
# Create a single-slot variant of the item
single_slot_item = SuitItem(
id=item.id,
name=item.name,
character_name=item.character_name,
slot=known_slot, # Single slot assignment
coverage=item.coverage,
set_id=item.set_id,
armor_level=item.armor_level,
ratings=item.ratings.copy(),
spell_bitmap=item.spell_bitmap,
spell_names=item.spell_names.copy(),
material=item.material
)
slot_items[known_slot].append(single_slot_item)
mapped_slots.append(known_slot)
if mapped_slots:
logger.debug(f"Complex slot item {item.name} split into slots: {mapped_slots} (original: '{item.slot}')")
else:
logger.warning(f"Unknown slot '{item.slot}' for item {item.name} - could not map to any known slots")
# Create buckets - CRITICAL: Create ALL buckets even if empty (MagSuitBuilder behavior)
buckets = []
for slot in all_slots:
bucket = ItemBucket(
slot=slot,
items=slot_items[slot],
is_armor=(slot in armor_slots),
is_required=False # We'll mark required slots later based on constraints
)
# Sort items within bucket by priority
bucket.sort_items()
# ALWAYS add bucket - even empty ones (required for complete search)
buckets.append(bucket)
if len(bucket.items) > 0:
logger.info(f"Created bucket for {slot} with {len(bucket.items)} items")
else:
logger.info(f"Created EMPTY bucket for {slot} (will allow incomplete suits)")
# Sort buckets: armor first, then by item count (MagSuitBuilder ArmorSearcher.cs:95-100)
buckets.sort(key=lambda b: (
0 if b.is_armor else 1, # Armor buckets first
len(b.items) # Then by item count (smallest first for better pruning)
))
logger.info(f"CREATED {len(buckets)} total buckets (including {len([b for b in buckets if len(b.items) == 0])} empty)")
logger.info(f"BUCKET ORDER: {[f'{b.slot}({len(b.items)})' for b in buckets]}")
# Calculate total armor buckets with items for Mag-SuitBuilder pruning
self.total_armor_buckets_with_items = sum(
1 for b in buckets if b.is_armor and len(b.items) > 0
)
logger.info(f"ARMOR BUCKETS WITH ITEMS: {self.total_armor_buckets_with_items}")
# Log first 3 items of first non-empty bucket for determinism verification
for bucket in buckets:
if len(bucket.items) > 0:
logger.info(f"FIRST BUCKET ({bucket.slot}) - First 3 items:")
for i, item in enumerate(bucket.items[:3]):
logger.info(f" {i}: {item.character_name}/{item.name}")
break
return buckets
def apply_reduction_options(self, buckets: List[ItemBucket]) -> List[ItemBucket]:
"""Apply armor reduction rules for multi-coverage items."""
new_buckets = []
for bucket in buckets:
if not bucket.is_armor:
# Non-armor items don't need reduction
new_buckets.append(bucket)
continue
# Process armor items for potential reduction
original_items = []
reducible_items = []
for item in bucket.items:
# Debug logging for reduction evaluation
has_coverage = item.coverage is not None
has_material = hasattr(item, 'material') and item.material
reduction_options = item.coverage.reduction_options() if item.coverage else []
logger.info(f"Reduction check for '{item.name}' in {bucket.slot}: "
f"coverage={item.coverage.value if item.coverage else None}, "
f"material='{item.material}', "
f"reductions={len(reduction_options)}")
# Check if item can be reduced based on Mag-SuitBuilder rules:
# 1. Must have coverage data
# 2. Must have material (only loot-generated items can be tailored)
# 3. Must have valid reduction options
if (item.coverage and
hasattr(item, 'material') and item.material and # Only items with materials can be tailored
item.coverage.reduction_options()):
# Item can be reduced - we'll add it to multiple buckets
logger.info(f"Item '{item.name}' is reducible to: {[r.to_slot_name() for r in reduction_options]}")
reducible_items.append(item)
else:
# Item fits exactly in this slot or cannot be reduced
original_items.append(item)
# Keep original items in this bucket
if original_items or not reducible_items:
new_bucket = ItemBucket(
slot=bucket.slot,
items=original_items,
is_armor=bucket.is_armor,
is_required=bucket.is_required
)
new_bucket.sort_items()
new_buckets.append(new_bucket)
# Add reducible items to appropriate buckets
for item in reducible_items:
reduction_options = item.coverage.reduction_options()
for reduced_coverage in reduction_options:
reduced_slot = reduced_coverage.to_slot_name()
if not reduced_slot:
continue
# Create a reduced version of the item
reduced_item = SuitItem(
id=item.id,
name=f"{item.name} (tailored to {reduced_slot})",
character_name=item.character_name,
slot=reduced_slot,
coverage=reduced_coverage,
set_id=item.set_id,
armor_level=item.armor_level,
ratings=item.ratings.copy(),
spell_bitmap=item.spell_bitmap,
spell_names=item.spell_names.copy(),
is_locked=item.is_locked,
material=item.material
)
# Find or create bucket for this slot
target_bucket = None
for existing_bucket in new_buckets:
if existing_bucket.slot == reduced_slot:
target_bucket = existing_bucket
break
if not target_bucket:
target_bucket = ItemBucket(
slot=reduced_slot,
items=[],
is_armor=True,
is_required=False
)
new_buckets.append(target_bucket)
target_bucket.items.append(reduced_item)
# Re-sort all buckets after adding reduced items
for bucket in new_buckets:
bucket.sort_items()
# Count reduction statistics
original_items_count = sum(len(bucket.items) for bucket in buckets)
new_items_count = sum(len(bucket.items) for bucket in new_buckets)
logger.info(f"Applied reductions: {len(buckets)} original buckets -> {len(new_buckets)} buckets")
logger.info(f"Item count: {original_items_count} original -> {new_items_count} total (including reductions)")
return new_buckets
def sort_buckets(self, buckets: List[ItemBucket]) -> List[ItemBucket]:
"""Sort buckets for optimal search order and prioritize user's chosen sets."""
# First, sort items within each bucket to prioritize user's chosen sets
for bucket in buckets:
bucket.items.sort(key=lambda item: (
# Priority 1: User's primary set
0 if item.set_id == self.constraints.primary_set else
# Priority 2: User's secondary set
1 if item.set_id == self.constraints.secondary_set else
# Priority 3: Other items
2,
-item.ratings.get('crit_damage_rating', 0), # Higher crit damage first (CD2 > CD1 > CD0)
-item.ratings.get('damage_rating', 0), # Higher damage rating next
-item.armor_level # Higher armor within same priority
))
# Prioritize core armor slots, then jewelry, then clothing
core_armor_priority = ['Chest', 'Head', 'Hands', 'Feet', 'Upper Arms', 'Lower Arms', 'Abdomen', 'Upper Legs', 'Lower Legs']
jewelry_slots = ['Neck', 'Left Ring', 'Right Ring', 'Left Wrist', 'Right Wrist', 'Trinket']
clothing_slots = ['Shirt', 'Pants']
# Sort buckets by priority order and item count
def bucket_priority(bucket):
if bucket.slot in core_armor_priority:
return (0, core_armor_priority.index(bucket.slot), len(bucket.items))
elif bucket.slot in jewelry_slots:
return (1, jewelry_slots.index(bucket.slot), len(bucket.items))
elif bucket.slot in clothing_slots:
return (2, clothing_slots.index(bucket.slot), len(bucket.items))
else:
return (3, 0, len(bucket.items))
sorted_buckets = sorted(buckets, key=bucket_priority)
logger.info(f"Bucket search order: {[f'{b.slot}({len(b.items)})' for b in sorted_buckets[:10]]}")
return sorted_buckets
async def recursive_search(self, buckets: List[ItemBucket], bucket_idx: int,
state: SuitState) -> AsyncGenerator[SearchResult, None]:
"""Depth-first search with pruning and streaming."""
# Check for cancellation
if self.is_cancelled and await self.is_cancelled():
logger.info("Search cancelled by client")
return
# Early success detection - stop when user's set goals are achieved
primary_count = state.set_counts.get(self.constraints.primary_set, 0) if self.constraints.primary_set else 0
secondary_count = state.set_counts.get(self.constraints.secondary_set, 0) if self.constraints.secondary_set else 0
# Branch pruning - Mag-SuitBuilder style aggressive pruning (ArmorSearcher.cs:138)
# Formula: if (builder.Count + 1 < highestArmorCountSuitBuilt - (totalArmorBucketsWithItems - min(index, totalArmorBucketsWithItems)))
# This prunes branches where the best possible suit can't compete with what we've found
if self.highest_armor_count_suit_built > 0:
current_count = len(state.items)
# Calculate remaining armor slots that could be filled
remaining_armor_potential = self.total_armor_buckets_with_items - min(bucket_idx, self.total_armor_buckets_with_items)
# Minimum required: highest seen minus potential remaining (gives us 1-piece buffer)
min_required = self.highest_armor_count_suit_built - remaining_armor_potential
# If we can't even hit minimum required with 1 more piece, prune
if current_count + 1 < min_required:
return
# Also keep the simpler max-items pruning as backup
remaining_buckets = len(buckets) - bucket_idx
max_possible_items = len(state.items) + remaining_buckets
if self.best_suit_item_count > 0 and max_possible_items < self.best_suit_item_count:
return
# Base case: all buckets processed
if bucket_idx >= len(buckets):
logger.debug(f"[DEBUG] BASE CASE: All {len(buckets)} buckets processed, state has {len(state.items)} items")
suit = self.finalize_suit(state)
if suit:
logger.debug(f"[DEBUG] Suit created with score {suit.score}, {len(suit.items)} items")
if self.is_better_than_existing(suit):
logger.debug(f"[DEBUG] Suit ACCEPTED: score {suit.score} is better than existing")
logger.info(f"Found suit with score {suit.score}: {len(suit.items)} items")
self.best_suits.append(suit)
# Update best suit item count for pruning
if len(suit.items) > self.best_suit_item_count:
self.best_suit_item_count = len(suit.items)
# Update armor piece count for Mag-SuitBuilder style pruning
armor_slots = {"Head", "Chest", "Upper Arms", "Lower Arms", "Hands",
"Abdomen", "Upper Legs", "Lower Legs", "Feet"}
armor_piece_count = sum(1 for slot in suit.items.keys() if slot in armor_slots)
if armor_piece_count > self.highest_armor_count_suit_built:
self.highest_armor_count_suit_built = armor_piece_count
logger.info(f"[PRUNING] New highest armor count: {armor_piece_count}")
self.best_suits.sort(key=lambda s: s.score, reverse=True)
self.best_suits = self.best_suits[:self.constraints.max_results]
# Pass constraint info to to_dict for proper set counts
suit_data = suit.to_dict()
from main import translate_equipment_set_id
primary_set_name = translate_equipment_set_id(str(self.constraints.primary_set)) if self.constraints.primary_set else None
secondary_set_name = translate_equipment_set_id(str(self.constraints.secondary_set)) if self.constraints.secondary_set else None
# FIXED: set_counts uses numeric keys, not string names
primary_count = suit.set_counts.get(self.constraints.primary_set, 0) if self.constraints.primary_set else 0
secondary_count = suit.set_counts.get(self.constraints.secondary_set, 0) if self.constraints.secondary_set else 0
# Add locked slot contributions to the counts
if hasattr(self, 'locked_set_counts'):
primary_count += self.locked_set_counts.get(self.constraints.primary_set, 0) if self.constraints.primary_set else 0
secondary_count += self.locked_set_counts.get(self.constraints.secondary_set, 0) if self.constraints.secondary_set else 0
# Get locked counts for breakdown
locked_primary = self.locked_set_counts.get(self.constraints.primary_set, 0) if hasattr(self, 'locked_set_counts') and self.constraints.primary_set else 0
locked_secondary = self.locked_set_counts.get(self.constraints.secondary_set, 0) if hasattr(self, 'locked_set_counts') and self.constraints.secondary_set else 0
suit_data['stats']['primary_set_count'] = primary_count # Total (found + locked)
suit_data['stats']['secondary_set_count'] = secondary_count # Total (found + locked)
suit_data['stats']['primary_set'] = primary_set_name
suit_data['stats']['secondary_set'] = secondary_set_name
suit_data['stats']['locked_slots'] = len(self.constraints.locked_slots) if self.constraints.locked_slots else 0
suit_data['stats']['primary_locked'] = locked_primary
suit_data['stats']['secondary_locked'] = locked_secondary
yield SearchResult(type="suit", data=suit_data)
# Send log event for suit found
yield SearchResult(type="log", data={
"level": "success",
"message": f"Found suit #{len(self.best_suits)} with score {suit.score} ({len(suit.items)} items)",
"timestamp": time.time() - self.start_time
})
else:
logger.debug(f"[DEBUG] Suit REJECTED: score {suit.score} not better than existing")
else:
logger.debug(f"[DEBUG] No suit created from current state")
return
# Progress update and debug info
self.suits_evaluated += 1
if self.suits_evaluated % 100 == 0: # Every 100 evaluations for reduced log spam
# Check for cancellation during progress update
if self.is_cancelled and await self.is_cancelled():
logger.info("Search cancelled during progress update")
return
elapsed = time.time() - self.start_time
rate = round(self.suits_evaluated / elapsed, 1) if elapsed > 0 else 0
best_score = self.best_suits[0].score if self.best_suits else 0
current_bucket_name = buckets[bucket_idx].slot if bucket_idx < len(buckets) else None
logger.info(f"Search progress: evaluated {self.suits_evaluated}, depth {bucket_idx}/{len(buckets)}, found {len(self.best_suits)} suits, best has {self.best_suit_item_count} items")
yield SearchResult(
type="progress",
data={
"evaluated": self.suits_evaluated,
"found": len(self.best_suits),
"current_depth": bucket_idx,
"total_buckets": len(buckets),
"current_items": len(state.items),
"elapsed": elapsed,
"rate": rate,
"current_bucket": current_bucket_name,
"best_score": best_score
}
)
# Send verbose log every 500 evaluations
if self.suits_evaluated % 500 == 0:
yield SearchResult(type="log", data={
"level": "info",
"message": f"Evaluated {self.suits_evaluated:,} combinations | Bucket: {current_bucket_name} ({bucket_idx+1}/{len(buckets)}) | Rate: {rate}/s",
"timestamp": elapsed
})
bucket = buckets[bucket_idx]
# DEBUG: Log bucket processing (reduced verbosity)
logger.debug(f"[DEBUG] Processing bucket {bucket_idx}: {bucket.slot} with {len(bucket.items)} items")
# Try each item in current bucket
items_tried = 0
items_accepted = 0
for item in bucket.items:
items_tried += 1
logger.debug(f"[DEBUG] Trying item {items_tried}/{len(bucket.items)} in {bucket.slot}: {item.name}")
if self.can_add_item(item, state):
items_accepted += 1
logger.debug(f"[DEBUG] Item ACCEPTED: {item.name} (#{items_accepted})")
# Add item to state
state.push(item)
# Continue search with next bucket
async for result in self.recursive_search(buckets, bucket_idx + 1, state):
yield result
# Remove item from state (backtrack)
state.pop(item.slot)
else:
logger.debug(f"[DEBUG] Item REJECTED: {item.name}")
logger.debug(f"[DEBUG] Bucket {bucket.slot} summary: {items_tried} tried, {items_accepted} accepted")
# Only skip if no items were accepted (allows incomplete suits when no valid items exist)
# If items were accepted, we already explored those paths - don't also explore skip
if items_accepted == 0:
logger.debug(f"[DEBUG] No items accepted for {bucket.slot}, trying skip")
async for result in self.recursive_search(buckets, bucket_idx + 1, state):
yield result
def can_add_item(self, item: SuitItem, state: SuitState) -> bool:
"""Check if item can be added without violating constraints."""
# Import translation function
def translate_equipment_set_id(set_id: str) -> str:
import main
if not hasattr(main, 'ENUM_MAPPINGS') or main.ENUM_MAPPINGS is None:
return f"Set {set_id}"
dictionaries = main.ENUM_MAPPINGS.get('dictionaries', {})
attribute_set_info = dictionaries.get('AttributeSetInfo', {}).get('values', {})
set_name = attribute_set_info.get(str(set_id))
return set_name if set_name else str(set_id)
# 1. Slot availability
if item.slot in state.occupied_slots:
logger.debug(f"[DEBUG] REJECT {item.name}: slot {item.slot} already occupied")
return False
# 2. Item uniqueness - same physical item can't be used in multiple slots
for existing_item in state.items.values():
if existing_item.id == item.id:
logger.debug(f"[DEBUG] REJECT {item.name}: item already used (duplicate ID)")
return False
# 3. Set piece validation - Use EFFECTIVE limits (account for locked slots)
if item.set_id:
# Convert item.set_id to numeric for comparison (it might be string or int)
try:
item_set_numeric = int(item.set_id) if isinstance(item.set_id, str) and item.set_id.isdigit() else item.set_id
except:
item_set_numeric = item.set_id
current_count = state.set_counts.get(item_set_numeric, 0)
# Use effective limits which account for locked slots
eff_primary = getattr(self, 'effective_primary_needed', 5)
eff_secondary = getattr(self, 'effective_secondary_needed', 4)
if item_set_numeric == self.constraints.primary_set:
# Primary set: use effective limit (accounts for locked pieces)
if current_count >= eff_primary:
logger.info(f"[SET_LIMIT] REJECT {item.name}: primary set {item_set_numeric} already has {current_count} pieces (effective max {eff_primary})")
return False
elif item_set_numeric == self.constraints.secondary_set:
# Secondary set: use effective limit (accounts for locked pieces)
if current_count >= eff_secondary:
logger.info(f"[SET_LIMIT] REJECT {item.name}: secondary set {item_set_numeric} already has {current_count} pieces (effective max {eff_secondary})")
return False
else:
# Check if this is a jewelry item - only allow if it contributes required spells
jewelry_slots = {"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"}
if item.slot in jewelry_slots:
# Jewelry MUST contribute to required spells to be accepted
if not self._jewelry_contributes_required_spell(item, state):
logger.debug(f"[DEBUG] REJECT {item.name}: jewelry doesn't contribute any required spells")
return False
logger.debug(f"[DEBUG] ACCEPT {item.name}: jewelry from set '{item_set_numeric}' contributes required spells")
else:
# STRICT: Reject armor items from other sets
# Only allow armor from the two user-selected sets
logger.debug(f"[DEBUG] REJECT {item.name}: armor from other set '{item_set_numeric}', only primary '{self.constraints.primary_set}' and secondary '{self.constraints.secondary_set}' allowed")
return False
else:
# For set optimization, reject items with no set ID unless they're clothing or jewelry
jewelry_slots = {"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"}
if item.slot in ['Shirt', 'Pants']:
# Allow clothing items even without set ID
logger.debug(f"[DEBUG] ACCEPT {item.name}: clothing item without set ID, allowed")
elif item.slot in jewelry_slots:
# Jewelry MUST contribute to required spells to be accepted
if not self._jewelry_contributes_required_spell(item, state):
logger.debug(f"[DEBUG] REJECT {item.name}: jewelry doesn't contribute any required spells")
return False
logger.debug(f"[DEBUG] ACCEPT {item.name}: jewelry without set ID contributes required spells")
else:
# Reject armor items without set ID for set optimization
logger.debug(f"[DEBUG] REJECT {item.name}: no set ID and not clothing/jewelry")
return False
# 4. Spell overlap constraints - STRICT: Reject items that don't contribute new spells
if self.constraints.required_spells and item.spell_names:
if not self._can_get_beneficial_spell_from(item, state):
# STRICT MODE: No fallback for target sets or good stats
# Items with ALL duplicate spells are rejected
# (Items with SOME new spells + some duplicates are accepted by _can_get_beneficial_spell_from)
logger.debug(f"[DEBUG] REJECT {item.name}: all spells are duplicates")
return False
logger.debug(f"[DEBUG] ACCEPT {item.name}: passed all constraints")
return True
def _is_double_spell_acceptable(self, item: SuitItem, overlap: int) -> bool:
"""Check if overlapping spell is acceptable (double spell both needed)."""
# Item must have exactly 2 spells for double spell logic
if len(item.spell_names) != 2:
return False
# Both spells must be in needed constraints
needed_bitmap = self.needed_spell_bitmap
for spell in item.spell_names:
spell_bit = self.spell_index.get_bitmap([spell])
if not (spell_bit & needed_bitmap):
return False
return True
def finalize_suit(self, state: SuitState) -> Optional[CompletedSuit]:
"""Convert state to completed suit with scoring."""
if not state.items:
return None
# Calculate score based on constraints and priorities
score = self._calculate_score(state)
# Determine fulfilled and missing spells
fulfilled_spells = []
missing_spells = []
if self.constraints.required_spells:
needed_bitmap = self.needed_spell_bitmap
fulfilled_bitmap = state.spell_bitmap & needed_bitmap
missing_bitmap = needed_bitmap & ~state.spell_bitmap
fulfilled_spells = self.spell_index.get_spell_names(fulfilled_bitmap)
missing_spells = self.spell_index.get_spell_names(missing_bitmap)
# Add locked spells to fulfilled and remove from missing
if hasattr(self, 'locked_spells') and self.locked_spells:
for spell in self.locked_spells:
if spell in missing_spells:
missing_spells.remove(spell)
fulfilled_spells.append(spell)
elif spell not in fulfilled_spells:
fulfilled_spells.append(spell)
return CompletedSuit(
items=state.items.copy(),
score=score,
total_armor=state.total_armor,
total_ratings=state.total_ratings.copy(),
set_counts=state.set_counts.copy(),
fulfilled_spells=fulfilled_spells,
missing_spells=missing_spells
)
def _calculate_score(self, state: SuitState) -> int:
"""Calculate suit score based on user specifications."""
score = 0
weights = self.scoring_weights
logger.debug(f"[SCORING] Starting score calculation for suit with {len(state.items)} items")
logger.debug(f"[SCORING] Set counts: {state.set_counts}")
logger.debug(f"[SCORING] Total ratings: {state.total_ratings}")
# 1. Complete armor sets (highest priority)
from main import translate_equipment_set_id
primary_set_name = translate_equipment_set_id(str(self.constraints.primary_set)) if self.constraints.primary_set else None
secondary_set_name = translate_equipment_set_id(str(self.constraints.secondary_set)) if self.constraints.secondary_set else None
logger.debug(f"[SCORING] Looking for primary set: {primary_set_name} (ID: {self.constraints.primary_set})")
logger.debug(f"[SCORING] Looking for secondary set: {secondary_set_name} (ID: {self.constraints.secondary_set})")
# Get FOUND counts (items in this suit, not including locked)
found_primary = state.set_counts.get(self.constraints.primary_set, 0) if self.constraints.primary_set else 0
found_secondary = state.set_counts.get(self.constraints.secondary_set, 0) if self.constraints.secondary_set else 0
# Get locked counts for display
locked_primary = self.locked_set_counts.get(self.constraints.primary_set, 0) if hasattr(self, 'locked_set_counts') and self.constraints.primary_set else 0
locked_secondary = self.locked_set_counts.get(self.constraints.secondary_set, 0) if hasattr(self, 'locked_set_counts') and self.constraints.secondary_set else 0
# Total counts (found + locked) for display
total_primary = found_primary + locked_primary
total_secondary = found_secondary + locked_secondary
# Get effective requirements with fallback
eff_primary = getattr(self, 'effective_primary_needed', 5)
eff_secondary = getattr(self, 'effective_secondary_needed', 4)
logger.debug(f"[SCORING] Primary: {found_primary} found + {locked_primary} locked = {total_primary} total (need {eff_primary} more)")
logger.debug(f"[SCORING] Secondary: {found_secondary} found + {locked_secondary} locked = {total_secondary} total (need {eff_secondary} more)")
# Complete set bonuses: compare FOUND against EFFECTIVE requirements
if found_primary >= eff_primary:
score += weights.armor_set_complete
logger.debug(f"[SCORING] Primary set complete: +{weights.armor_set_complete}")
# Penalty for EXCESS primary pieces (should have gone to secondary)
if found_primary > eff_primary:
excess = found_primary - eff_primary
excess_penalty = excess * 500 # STRONG penalty per excess piece
score -= excess_penalty
logger.debug(f"[SCORING] Primary set EXCESS ({excess} extra): -{excess_penalty}")
else:
# Missing set penalty: -200 per missing piece
if self.constraints.primary_set and found_primary > 0:
missing_pieces = eff_primary - found_primary
penalty = missing_pieces * weights.missing_set_penalty
score += penalty # negative penalty
logger.debug(f"[SCORING] Primary set incomplete ({found_primary}/{eff_primary}): {penalty}")
if found_secondary >= eff_secondary:
score += weights.armor_set_complete
logger.debug(f"[SCORING] Secondary set complete: +{weights.armor_set_complete}")
# Penalty for EXCESS secondary pieces
if found_secondary > eff_secondary:
excess = found_secondary - eff_secondary
excess_penalty = excess * 500 # STRONG penalty
score -= excess_penalty
logger.debug(f"[SCORING] Secondary set EXCESS ({excess} extra): -{excess_penalty}")
else:
# Missing set penalty: -200 per missing piece
if self.constraints.secondary_set and found_secondary > 0:
missing_pieces = eff_secondary - found_secondary
penalty = missing_pieces * weights.missing_set_penalty
score += penalty # negative penalty
logger.debug(f"[SCORING] Secondary set incomplete ({found_secondary}/{eff_secondary}): {penalty}")
# 2. Crit Damage Rating: CD1 = +10, CD2 = +20 per piece
for item in state.items.values():
crit_rating = item.ratings.get('crit_damage_rating', 0)
if crit_rating == 1:
score += weights.crit_damage_1
elif crit_rating == 2:
score += weights.crit_damage_2
# 3. Damage Rating on clothes only: DR1=+10, DR2=+20, DR3=+30 per piece
for item in state.items.values():
if item.slot in ['Shirt', 'Pants']: # Only clothes
damage_rating = item.ratings.get('damage_rating', 0)
if damage_rating == 1:
score += weights.damage_rating_1
elif damage_rating == 2:
score += weights.damage_rating_2
elif damage_rating == 3:
score += weights.damage_rating_3
# 4. Spell Coverage: +100 per fulfilled cantrip/ward (no duplicates)
if self.constraints.required_spells:
fulfilled_spells = state.spell_bitmap & self.needed_spell_bitmap
fulfilled_count = bin(fulfilled_spells).count('1')
spell_score = fulfilled_count * 100
score += spell_score
logger.debug(f"[SCORING] Spell coverage: {fulfilled_count} spells = +{spell_score} points")
# 5. Base score for having items (so suits aren't rejected as 0)
# Add small base score per item to avoid 0 scores
base_item_score = len(state.items) * 5
score += base_item_score
logger.debug(f"[SCORING] Base item score: {len(state.items)} items = +{base_item_score} points")
# 6. Armor level as tiebreaker (LOWEST PRIORITY)
# Scale down significantly so it only matters when other scores are equal
armor_score = state.total_armor // 100 # ~5 points per 500 AL
score += armor_score
logger.debug(f"[SCORING] Armor level tiebreaker: {state.total_armor} AL = +{armor_score} points")
logger.debug(f"[SCORING] Final score: {score}")
return max(0, score) # Never negative
def is_better_than_existing(self, suit: CompletedSuit) -> bool:
"""Check if suit is worth keeping."""
logger.info(f"[DEBUG] is_better_than_existing: checking suit with score {suit.score}, current best_suits count: {len(self.best_suits)}, max_results: {self.constraints.max_results}")
if len(self.best_suits) < self.constraints.max_results:
logger.info(f"[DEBUG] is_better_than_existing: ACCEPTING suit - not at max capacity")
return True
# Keep suits with more items even if score is lower - we want complete suits
lowest_suit = self.best_suits[-1]
logger.info(f"[DEBUG] is_better_than_existing: comparing against lowest suit score {lowest_suit.score}")
if len(suit.items) > len(lowest_suit.items):
logger.info(f"[DEBUG] is_better_than_existing: ACCEPTING suit - more items ({len(suit.items)} > {len(lowest_suit.items)})")
return True
result = suit.score > lowest_suit.score
logger.info(f"[DEBUG] is_better_than_existing: score comparison result: {result}")
return result
def _has_room_for_armor_set(self, item: SuitItem, state: SuitState) -> bool:
"""Check if adding this armor piece violates set limits (Mag-SuitBuilder HasRoomForArmorSet).
Uses effective limits which account for locked slots.
"""
if not item.set_id:
return True # Non-set items don't count against limits
current_count = state.set_counts.get(item.set_id, 0)
# Use effective limits (which account for locked slots)
eff_primary = getattr(self, 'effective_primary_needed', 5)
eff_secondary = getattr(self, 'effective_secondary_needed', 4)
if item.set_id == self.constraints.primary_set:
# Hard limit: don't add more primary set pieces than needed
has_room = current_count < eff_primary
if not has_room:
logger.debug(f"[SET_LIMIT] Rejecting {item.name} - already have {current_count}/{eff_primary} primary set pieces")
return has_room
elif item.set_id == self.constraints.secondary_set:
has_room = current_count < eff_secondary
if not has_room:
logger.debug(f"[SET_LIMIT] Rejecting {item.name} - already have {current_count}/{eff_secondary} secondary set pieces")
return has_room
else:
# STRICT: Other sets not allowed for armor
# Only jewelry can be from other sets
jewelry_slots = {"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"}
return item.slot in jewelry_slots
def _convert_set_name_to_id(self, set_name: Optional[str]) -> Optional[int]:
"""Convert set name to numeric ID for constraint comparison."""
if not set_name:
return None
# Reverse mapping of SET_NAMES
name_to_id = {
"Adept's Set": 14,
"Defender's Set": 16,
"Soldier's Set": 13,
"Wise Set": 21,
"Heroic Protector Set": 40,
"Heroic Destroyer Set": 41,
"Relic Alduressa Set": 46,
"Ancient Relic Set": 47,
"Noble Relic Set": 48,
"Archer's Set": 15,
"Hearty Set": 19,
"Dexterous Set": 20,
"Swift Set": 22,
"Reinforced Set": 24,
"Flame Proof Set": 26,
"Lightning Proof Set": 29
}
return name_to_id.get(set_name)
def _can_get_beneficial_spell_from(self, item: SuitItem, state: SuitState) -> bool:
"""Check if item provides beneficial spells without duplicates.
STRICT MODE: Reject items that only have duplicate spells, even from target sets.
This prevents wasted spell slots (e.g., Flame Ward on 3 armor pieces).
"""
# Non-spell items are always beneficial (armor/ratings only)
if not item.spell_names:
return True
# If no spell constraints specified, allow any item
if not self.constraints.required_spells:
return True
# STRICT: Check if item provides ANY new required spell not already covered
needed_bitmap = self.needed_spell_bitmap
current_bitmap = state.spell_bitmap
item_bitmap = item.spell_bitmap
# Item MUST provide at least one new required spell
new_beneficial_spells = item_bitmap & needed_bitmap & ~current_bitmap
if new_beneficial_spells != 0:
return True # Has new required spells - beneficial
# STRICT REJECTION: No new required spells = reject
# This applies even to primary/secondary set pieces
# Rationale: Duplicate spells waste valuable spell slots
return False
def _jewelry_contributes_required_spell(self, item: SuitItem, state: SuitState) -> bool:
"""Check if jewelry item contributes at least one required spell not already covered.
Jewelry should ONLY be added if it fulfills spell constraints. Empty slots are
preferred over jewelry that doesn't contribute to required spells.
"""
# If no spell constraints, don't add jewelry (nothing to contribute)
if not self.constraints.required_spells:
logger.debug(f"[JEWELRY] REJECT {item.name}: no required spells specified")
return False
# Item must have spells to contribute
if not item.spell_names:
logger.debug(f"[JEWELRY] REJECT {item.name}: item has no spells")
return False
# Check if item has ANY required spell that's not already in the suit
needed_bitmap = self.needed_spell_bitmap
current_bitmap = state.spell_bitmap
for spell in item.spell_names:
spell_bit = self.spell_index.get_bitmap([spell])
# Check if this spell is required AND not already covered
if (spell_bit & needed_bitmap) and not (current_bitmap & spell_bit):
logger.debug(f"[JEWELRY] ACCEPT {item.name}: contributes uncovered spell '{spell}'")
return True
logger.debug(f"[JEWELRY] REJECT {item.name}: no new required spells contributed")
return False
# API Endpoints
@router.get("/characters")
async def get_available_characters():
"""Get list of characters with inventory data."""
query = """
SELECT DISTINCT character_name
FROM items
ORDER BY character_name
"""
try:
rows = await database.fetch_all(query)
characters = [row['character_name'] for row in rows]
return {"characters": characters}
except Exception as e:
logger.error(f"Error fetching characters: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/search")
async def search_suits(request: Request, constraints: SearchConstraints):
"""Start suit search with streaming results."""
# Create cancellation check function
async def is_cancelled():
return await request.is_disconnected()
solver = ConstraintSatisfactionSolver(constraints, is_cancelled=is_cancelled)
async def event_generator():
try:
logger.info("Starting SSE event generator")
async for result in solver.search():
# Check for client disconnection
if await request.is_disconnected():
logger.info("Client disconnected, stopping search")
yield {
"event": "cancelled",
"data": json.dumps({"message": "Search cancelled by client"})
}
return
logger.info(f"Yielding SSE event: {result.type}")
try:
# Send full data for frontend processing
data_json = json.dumps(result.data)
yield {
"event": result.type,
"data": data_json
}
except Exception as e:
logger.error(f"Error serializing result data: {e}", exc_info=True)
logger.error(f"Result type: {result.type}, Data keys: {list(result.data.keys()) if hasattr(result.data, 'keys') else 'N/A'}")
yield {
"event": "error",
"data": json.dumps({"message": f"Serialization error: {str(e)}"})
}
logger.info("SSE event generator completed")
except Exception as e:
logger.error(f"Error in search generator: {e}", exc_info=True)
yield {
"event": "error",
"data": json.dumps({"message": f"Search error: {str(e)}"})
}
async def sse_generator():
async for event in event_generator():
# Manual SSE format
yield f"event: {event['event']}\n"
yield f"data: {event['data']}\n\n"
return StreamingResponse(
sse_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Cache-Control"
}
)
@router.post("/search-test")
async def search_suits_test(constraints: SearchConstraints):
"""Test endpoint for suit search without SSE."""
try:
solver = ConstraintSatisfactionSolver(constraints)
results = []
async for result in solver.search():
if result.type == "suit":
results.append({
"score": result.data.get("score", 0),
"total_armor": result.data.get("total_armor", 0),
"item_count": len(result.data.get("items", {}))
})
elif result.type == "complete":
return {
"success": True,
"suits_found": len(results),
"duration": result.data.get("duration", 0),
"results": results[:5] # Return first 5 results
}
return {"success": False, "error": "No completion event received"}
except Exception as e:
logger.error(f"Test search error: {e}", exc_info=True)
return {"success": False, "error": str(e)}
@router.get("/sets")
async def get_equipment_sets():
"""Get available equipment sets."""
# Return hardcoded sets for now
sets = [
{"id": 14, "name": "Adept's"},
{"id": 16, "name": "Defender's"},
{"id": 13, "name": "Soldier's"},
{"id": 21, "name": "Wise"},
{"id": 40, "name": "Heroic Protector"},
{"id": 41, "name": "Heroic Destroyer"},
{"id": 46, "name": "Relic Alduressa"},
{"id": 47, "name": "Ancient Relic"},
{"id": 48, "name": "Noble Relic"},
{"id": 15, "name": "Archer's"},
{"id": 19, "name": "Hearty"},
{"id": 20, "name": "Dexterous"},
{"id": 22, "name": "Swift"},
{"id": 24, "name": "Reinforced"},
{"id": 26, "name": "Flame Proof"},
{"id": 29, "name": "Lightning Proof"}
]
return {"sets": sets}