- Add dedicated streaming proxy endpoint for real-time suitbuilder SSE updates - Implement stable sorting with character_name and name tiebreakers for deterministic results - Refactor locked items to locked slots supporting set_id and spell constraints - Add Mag-SuitBuilder style branch pruning tracking variables - Enhance search with phase updates and detailed progress logging - Update design document with SSE streaming proxy fix details Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2217 lines
No EOL
104 KiB
Python
2217 lines
No EOL
104 KiB
Python
"""
|
|
Suitbuilder - Equipment optimization system for Asheron's Call.
|
|
|
|
Implements constraint satisfaction solver to find optimal equipment combinations
|
|
across multiple characters' inventories based on armor sets, spell coverage, and ratings.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import asyncio
|
|
from enum import IntFlag, Enum
|
|
from typing import Dict, List, Optional, Any, Set, Tuple, AsyncGenerator
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
import time
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, Depends, Request
|
|
from fastapi.responses import StreamingResponse
|
|
from pydantic import BaseModel
|
|
from sse_starlette.sse import EventSourceResponse
|
|
import databases
|
|
import sqlalchemy as sa
|
|
|
|
from database import (
|
|
DATABASE_URL, Item, ItemCombatStats, ItemRequirements,
|
|
ItemEnhancements, ItemRatings, ItemSpells, ItemRawData
|
|
)
|
|
|
|
# Import shared helper functions
|
|
import json as json_module
|
|
# Removed circular import - will implement locally if needed
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Database connection will be injected from main service
|
|
database = None
|
|
|
|
# Set name translation mapping
|
|
SET_NAMES = {
|
|
14: "Adept's",
|
|
16: "Defender's",
|
|
13: "Soldier's",
|
|
21: "Wise",
|
|
40: "Heroic Protector",
|
|
41: "Heroic Destroyer",
|
|
46: "Relic Alduressa",
|
|
47: "Ancient Relic",
|
|
48: "Noble Relic",
|
|
15: "Archer's",
|
|
19: "Hearty",
|
|
20: "Dexterous",
|
|
22: "Swift",
|
|
24: "Reinforced",
|
|
26: "Flame Proof",
|
|
29: "Lightning Proof"
|
|
}
|
|
|
|
def get_set_name(set_id: Optional[int]) -> str:
|
|
"""Translate set ID to readable set name."""
|
|
if set_id is None:
|
|
return ""
|
|
|
|
# If set_id is already a string with "Set" in it, it's already translated
|
|
if isinstance(set_id, str) and "Set" in set_id:
|
|
return set_id
|
|
|
|
# Otherwise translate numeric ID
|
|
return SET_NAMES.get(set_id, f"Set {set_id}")
|
|
|
|
def set_database_connection(db_instance):
|
|
"""Set the database connection from main service."""
|
|
global database
|
|
database = db_instance
|
|
logger.info("Suitbuilder database connection established")
|
|
|
|
# Create router for suitbuilder endpoints
|
|
router = APIRouter()
|
|
|
|
|
|
class CoverageMask(IntFlag):
|
|
"""
|
|
Bit flags for armor coverage areas.
|
|
Values match Mag-SuitBuilder exactly for compatibility.
|
|
"""
|
|
NONE = 0x0
|
|
|
|
# Underwear coverage (shirts/pants)
|
|
UNDERWEAR_UPPER_LEGS = 0x00000002
|
|
UNDERWEAR_LOWER_LEGS = 0x00000004
|
|
UNDERWEAR_CHEST = 0x00000008
|
|
UNDERWEAR_ABDOMEN = 0x00000010
|
|
UNDERWEAR_UPPER_ARMS = 0x00000020
|
|
UNDERWEAR_LOWER_ARMS = 0x00000040
|
|
|
|
# Outerwear/Armor coverage (the important ones for armor suits)
|
|
OUTERWEAR_UPPER_LEGS = 0x00000100
|
|
OUTERWEAR_LOWER_LEGS = 0x00000200
|
|
OUTERWEAR_CHEST = 0x00000400
|
|
OUTERWEAR_ABDOMEN = 0x00000800
|
|
OUTERWEAR_UPPER_ARMS = 0x00001000
|
|
OUTERWEAR_LOWER_ARMS = 0x00002000
|
|
|
|
# Extremities
|
|
HEAD = 0x00004000
|
|
HANDS = 0x00008000
|
|
FEET = 0x00010000
|
|
|
|
# Convenience aliases matching our slot names
|
|
CHEST = OUTERWEAR_CHEST
|
|
ABDOMEN = OUTERWEAR_ABDOMEN
|
|
UPPER_ARMS = OUTERWEAR_UPPER_ARMS
|
|
LOWER_ARMS = OUTERWEAR_LOWER_ARMS
|
|
UPPER_LEGS = OUTERWEAR_UPPER_LEGS
|
|
LOWER_LEGS = OUTERWEAR_LOWER_LEGS
|
|
|
|
def reduction_options(self) -> List['CoverageMask']:
|
|
"""
|
|
Returns possible reductions for multi-coverage items.
|
|
Based on exact Mag-SuitBuilder logic for armor tailoring.
|
|
"""
|
|
# Single coverage items cannot be reduced
|
|
if bin(self.value).count('1') <= 1:
|
|
return []
|
|
|
|
# Robes cannot be reduced (exclude from suits entirely)
|
|
if self.is_robe():
|
|
return []
|
|
|
|
reductions = []
|
|
|
|
# Specific reduction patterns from Mag-SuitBuilder
|
|
if self == (CoverageMask.UPPER_ARMS | CoverageMask.LOWER_ARMS):
|
|
# Arm guards can be reduced to either upper or lower arms
|
|
reductions.extend([CoverageMask.UPPER_ARMS, CoverageMask.LOWER_ARMS])
|
|
elif self == (CoverageMask.UPPER_LEGS | CoverageMask.LOWER_LEGS):
|
|
# Leg guards can be reduced to either upper or lower legs
|
|
reductions.extend([CoverageMask.UPPER_LEGS, CoverageMask.LOWER_LEGS])
|
|
elif self == (CoverageMask.LOWER_LEGS | CoverageMask.FEET):
|
|
# Boots that cover lower legs and feet can be reduced to feet only
|
|
reductions.append(CoverageMask.FEET)
|
|
elif self == (CoverageMask.CHEST | CoverageMask.ABDOMEN):
|
|
# Chest+abdomen pieces can be reduced to chest only
|
|
reductions.append(CoverageMask.CHEST)
|
|
elif self == (CoverageMask.CHEST | CoverageMask.ABDOMEN | CoverageMask.UPPER_ARMS):
|
|
# Chest+abdomen+upper arms can be reduced to chest
|
|
reductions.append(CoverageMask.CHEST)
|
|
elif self == (CoverageMask.CHEST | CoverageMask.UPPER_ARMS | CoverageMask.LOWER_ARMS):
|
|
# Chest+arms can be reduced to chest
|
|
reductions.append(CoverageMask.CHEST)
|
|
elif self == (CoverageMask.CHEST | CoverageMask.UPPER_ARMS):
|
|
# Chest+upper arms can be reduced to chest
|
|
reductions.append(CoverageMask.CHEST)
|
|
elif self == (CoverageMask.ABDOMEN | CoverageMask.UPPER_LEGS | CoverageMask.LOWER_LEGS):
|
|
# Tassets covering abdomen and legs can be reduced to any of the three
|
|
reductions.extend([CoverageMask.ABDOMEN, CoverageMask.UPPER_LEGS, CoverageMask.LOWER_LEGS])
|
|
elif self == (CoverageMask.CHEST | CoverageMask.ABDOMEN | CoverageMask.UPPER_ARMS | CoverageMask.LOWER_ARMS):
|
|
# Hauberks can be reduced to chest
|
|
reductions.append(CoverageMask.CHEST)
|
|
elif self == (CoverageMask.ABDOMEN | CoverageMask.UPPER_LEGS):
|
|
# Pre-2010 retail pieces - reduce to abdomen only
|
|
reductions.append(CoverageMask.ABDOMEN)
|
|
|
|
return reductions
|
|
|
|
def is_robe(self) -> bool:
|
|
"""
|
|
Check if this coverage represents a robe using exact Mag-SuitBuilder logic.
|
|
Robe pattern: 0x00013F00 (7 specific coverage areas excluding head and hands)
|
|
"""
|
|
# Exact robe pattern from Mag-SuitBuilder: 0x00013F00
|
|
MAG_ROBE_PATTERN = 0x00013F00
|
|
|
|
# Check for exact match first (most reliable)
|
|
if self.value == MAG_ROBE_PATTERN:
|
|
return True
|
|
|
|
# Also check for the component pattern manually in case of slight variations
|
|
robe_components = (
|
|
CoverageMask.FEET |
|
|
CoverageMask.OUTERWEAR_UPPER_LEGS |
|
|
CoverageMask.OUTERWEAR_LOWER_LEGS |
|
|
CoverageMask.OUTERWEAR_CHEST |
|
|
CoverageMask.OUTERWEAR_ABDOMEN |
|
|
CoverageMask.OUTERWEAR_UPPER_ARMS |
|
|
CoverageMask.OUTERWEAR_LOWER_ARMS
|
|
)
|
|
|
|
if self.value == robe_components.value:
|
|
return True
|
|
|
|
# Fallback: 6+ coverage areas as general indicator
|
|
# but log it for investigation
|
|
coverage_count = bin(self.value).count('1')
|
|
if coverage_count >= 6:
|
|
logger.info(f"Potential robe detected with {coverage_count} coverage areas: 0x{self.value:08X} (not exact pattern 0x{MAG_ROBE_PATTERN:08X})")
|
|
return True
|
|
|
|
return False
|
|
|
|
def to_slot_name(self) -> Optional[str]:
|
|
"""Convert single coverage mask to equipment slot name."""
|
|
mapping = {
|
|
CoverageMask.HEAD: "Head",
|
|
CoverageMask.CHEST: "Chest",
|
|
CoverageMask.UPPER_ARMS: "Upper Arms",
|
|
CoverageMask.LOWER_ARMS: "Lower Arms",
|
|
CoverageMask.HANDS: "Hands",
|
|
CoverageMask.ABDOMEN: "Abdomen",
|
|
CoverageMask.UPPER_LEGS: "Upper Legs",
|
|
CoverageMask.LOWER_LEGS: "Lower Legs",
|
|
CoverageMask.FEET: "Feet"
|
|
}
|
|
|
|
# Only works for single coverage
|
|
if self in mapping:
|
|
return mapping[self]
|
|
return None
|
|
|
|
|
|
@dataclass
|
|
class SuitItem:
|
|
"""Processed item ready for optimization."""
|
|
id: int
|
|
name: str
|
|
character_name: str
|
|
slot: str # Equipment slot name
|
|
coverage: Optional[CoverageMask] = None # For armor items
|
|
set_id: Optional[int] = None
|
|
armor_level: int = 0
|
|
ratings: Dict[str, int] = field(default_factory=dict)
|
|
spell_bitmap: int = 0
|
|
spell_names: List[str] = field(default_factory=list)
|
|
is_locked: bool = False # For user-locked items
|
|
material: Optional[str] = None # Material type for reduction eligibility
|
|
|
|
def __hash__(self):
|
|
"""Make item hashable for set operations."""
|
|
return hash(self.id)
|
|
|
|
@property
|
|
def is_armor(self) -> bool:
|
|
"""Check if this item provides armor (for Mag-SuitBuilder compatibility)."""
|
|
return self.armor_level > 0
|
|
|
|
|
|
@dataclass
|
|
class ItemBucket:
|
|
"""Container for items that can fit in a specific slot."""
|
|
slot: str
|
|
items: List[SuitItem] = field(default_factory=list)
|
|
is_armor: bool = False
|
|
is_required: bool = False # Some slots might be required by constraints
|
|
|
|
def sort_items(self):
|
|
"""Sort items by priority based on slot type.
|
|
|
|
All sorts include (character_name, name) as stable tiebreakers
|
|
to ensure deterministic ordering for reproducible search results.
|
|
"""
|
|
if self.slot in ['Shirt', 'Pants']:
|
|
# Underclothes: damage_rating first, ignore armor_level (buffed armor irrelevant)
|
|
self.items.sort(
|
|
key=lambda item: (
|
|
item.ratings.get('damage_rating', 0),
|
|
len(item.spell_names),
|
|
sum(r for k, r in item.ratings.items() if k != 'damage_rating'),
|
|
item.character_name,
|
|
item.name
|
|
),
|
|
reverse=True
|
|
)
|
|
elif self.is_armor:
|
|
# Armor: armor_level first, then crit damage, then spells
|
|
self.items.sort(
|
|
key=lambda item: (
|
|
item.armor_level,
|
|
item.ratings.get('crit_damage_rating', 0),
|
|
len(item.spell_names),
|
|
sum(item.ratings.values()),
|
|
item.character_name,
|
|
item.name
|
|
),
|
|
reverse=True
|
|
)
|
|
else:
|
|
# Jewelry: spells first, then total ratings
|
|
self.items.sort(
|
|
key=lambda item: (
|
|
len(item.spell_names),
|
|
sum(item.ratings.values()),
|
|
item.character_name,
|
|
item.name
|
|
),
|
|
reverse=True
|
|
)
|
|
|
|
|
|
class SpellBitmapIndex:
|
|
"""Maps spell names to bit positions for O(1) overlap detection."""
|
|
|
|
def __init__(self):
|
|
self.spell_to_bit: Dict[str, int] = {}
|
|
self.bit_to_spell: Dict[int, str] = {}
|
|
self._next_bit = 0
|
|
|
|
def register_spell(self, spell_name: str) -> int:
|
|
"""Register a spell and return its bit position."""
|
|
if spell_name not in self.spell_to_bit:
|
|
if self._next_bit >= 64:
|
|
# For more than 64 spells, we'd need to use multiple integers
|
|
logger.warning(f"More than 64 unique spells detected. Spell: {spell_name}")
|
|
bit_position = 1 << self._next_bit
|
|
self.spell_to_bit[spell_name] = bit_position
|
|
self.bit_to_spell[bit_position] = spell_name
|
|
self._next_bit += 1
|
|
return self.spell_to_bit[spell_name]
|
|
|
|
def get_bitmap(self, spells: List[str]) -> int:
|
|
"""Convert spell list to bitmap representation."""
|
|
bitmap = 0
|
|
for spell in spells:
|
|
bitmap |= self.register_spell(spell)
|
|
return bitmap
|
|
|
|
def get_spell_names(self, bitmap: int) -> List[str]:
|
|
"""Convert bitmap back to spell names."""
|
|
spells = []
|
|
for bit, spell in self.bit_to_spell.items():
|
|
if bitmap & bit:
|
|
spells.append(spell)
|
|
return spells
|
|
|
|
def would_add_needed_spell(self, item_bitmap: int, needed_bitmap: int, current_bitmap: int) -> bool:
|
|
"""Check if item adds any needed spell not already covered."""
|
|
# New spells the item would add
|
|
new_spells = item_bitmap & ~current_bitmap
|
|
# Check if any new spells are needed
|
|
return bool(new_spells & needed_bitmap)
|
|
|
|
|
|
@dataclass
|
|
class SuitState:
|
|
"""Mutable state during search."""
|
|
items: Dict[str, SuitItem] = field(default_factory=dict) # slot -> item
|
|
spell_bitmap: int = 0
|
|
set_counts: Dict[int, int] = field(default_factory=dict) # set_id -> count
|
|
total_armor: int = 0
|
|
total_ratings: Dict[str, int] = field(default_factory=dict)
|
|
occupied_slots: Set[str] = field(default_factory=set)
|
|
|
|
def push(self, item: SuitItem) -> None:
|
|
"""Add item to suit (modifies state)."""
|
|
self.items[item.slot] = item
|
|
self.occupied_slots.add(item.slot)
|
|
self.spell_bitmap |= item.spell_bitmap
|
|
|
|
# Update set counts
|
|
if item.set_id:
|
|
self.set_counts[item.set_id] = self.set_counts.get(item.set_id, 0) + 1
|
|
logger.debug(f"[STATE] Added item with set_id {item.set_id}, set_counts now: {self.set_counts}")
|
|
|
|
# Update totals
|
|
self.total_armor += item.armor_level
|
|
for rating_name, value in item.ratings.items():
|
|
self.total_ratings[rating_name] = self.total_ratings.get(rating_name, 0) + value
|
|
|
|
def pop(self, slot: str) -> Optional[SuitItem]:
|
|
"""Remove item from slot (modifies state)."""
|
|
if slot not in self.items:
|
|
return None
|
|
|
|
item = self.items.pop(slot)
|
|
self.occupied_slots.remove(slot)
|
|
|
|
# Rebuild spell bitmap (can't just subtract due to overlaps)
|
|
self.spell_bitmap = 0
|
|
for remaining_item in self.items.values():
|
|
self.spell_bitmap |= remaining_item.spell_bitmap
|
|
|
|
# Update set counts
|
|
if item.set_id:
|
|
self.set_counts[item.set_id] -= 1
|
|
if self.set_counts[item.set_id] == 0:
|
|
del self.set_counts[item.set_id]
|
|
|
|
# Update totals
|
|
self.total_armor -= item.armor_level
|
|
for rating_name, value in item.ratings.items():
|
|
if rating_name in self.total_ratings:
|
|
self.total_ratings[rating_name] -= value
|
|
if self.total_ratings[rating_name] <= 0:
|
|
del self.total_ratings[rating_name]
|
|
|
|
return item
|
|
|
|
def clone(self) -> 'SuitState':
|
|
"""Deep copy for branching."""
|
|
new_state = SuitState()
|
|
new_state.items = self.items.copy()
|
|
new_state.spell_bitmap = self.spell_bitmap
|
|
new_state.set_counts = self.set_counts.copy()
|
|
new_state.total_armor = self.total_armor
|
|
new_state.total_ratings = self.total_ratings.copy()
|
|
new_state.occupied_slots = self.occupied_slots.copy()
|
|
return new_state
|
|
|
|
|
|
class ScoringWeights(BaseModel):
|
|
"""Configurable scoring weights."""
|
|
armor_set_complete: int = 1000 # Complete sets (primary/secondary)
|
|
missing_set_penalty: int = -200 # Missing set pieces penalty
|
|
crit_damage_1: int = 10 # CD1 rating points
|
|
crit_damage_2: int = 20 # CD2 rating points
|
|
damage_rating_1: int = 10 # DR1 on clothes
|
|
damage_rating_2: int = 20 # DR2 on clothes
|
|
damage_rating_3: int = 30 # DR3 on clothes
|
|
|
|
|
|
class LockedSlotInfo(BaseModel):
|
|
"""Information about a locked slot."""
|
|
set_id: Optional[int] = None
|
|
spells: List[str] = []
|
|
|
|
|
|
class SearchConstraints(BaseModel):
|
|
"""User-defined search constraints."""
|
|
characters: List[str]
|
|
primary_set: Optional[int] = None
|
|
secondary_set: Optional[int] = None
|
|
required_spells: List[str] = field(default_factory=list)
|
|
locked_slots: Dict[str, LockedSlotInfo] = field(default_factory=dict) # slot -> lock info
|
|
include_equipped: bool = True
|
|
include_inventory: bool = True
|
|
min_armor: Optional[int] = None
|
|
max_armor: Optional[int] = None
|
|
min_crit_damage: Optional[int] = None
|
|
max_crit_damage: Optional[int] = None
|
|
min_damage_rating: Optional[int] = None
|
|
max_damage_rating: Optional[int] = None
|
|
scoring_weights: Optional[ScoringWeights] = None
|
|
max_results: int = 50
|
|
search_timeout: int = 300 # seconds
|
|
|
|
|
|
@dataclass
|
|
class CompletedSuit:
|
|
"""Final suit result."""
|
|
items: Dict[str, SuitItem]
|
|
score: int
|
|
total_armor: int
|
|
total_ratings: Dict[str, int]
|
|
set_counts: Dict[int, int]
|
|
fulfilled_spells: List[str]
|
|
missing_spells: List[str]
|
|
|
|
def to_dict(self) -> dict:
|
|
"""Convert to dictionary for JSON serialization."""
|
|
# Build transfer summary
|
|
transfer_by_character = {}
|
|
total_items = 0
|
|
|
|
for slot, item in self.items.items():
|
|
character = item.character_name
|
|
if character not in transfer_by_character:
|
|
transfer_by_character[character] = []
|
|
transfer_by_character[character].append(item.name)
|
|
total_items += 1
|
|
|
|
# Build transfer instructions
|
|
instructions = []
|
|
step = 1
|
|
for character, items in sorted(transfer_by_character.items()):
|
|
for item_name in items:
|
|
instructions.append(f"{step}. Transfer {item_name} from {character} to new character")
|
|
step += 1
|
|
instructions.append(f"{step}. Equip all transferred items on new character")
|
|
|
|
return {
|
|
"id": hash(tuple(sorted(self.items.keys()))), # Generate ID from slots
|
|
"score": self.score,
|
|
"items": {
|
|
slot: {
|
|
"id": item.id,
|
|
"name": item.name,
|
|
"source_character": item.character_name,
|
|
"armor_level": item.armor_level,
|
|
"ratings": item.ratings,
|
|
"spells": item.spell_names,
|
|
"set_id": item.set_id,
|
|
"set_name": get_set_name(item.set_id) # Translate set ID to name
|
|
}
|
|
for slot, item in self.items.items()
|
|
},
|
|
"stats": {
|
|
"total_armor": self.total_armor,
|
|
"total_crit_damage": self.total_ratings.get('crit_damage_rating', 0),
|
|
"total_damage_rating": self.total_ratings.get('damage_rating', 0),
|
|
"primary_set_count": 0,
|
|
"secondary_set_count": 0,
|
|
"spell_coverage": len(self.fulfilled_spells)
|
|
},
|
|
"missing": self.missing_spells,
|
|
"notes": [],
|
|
"transfer_summary": {
|
|
"total_items": total_items,
|
|
"from_characters": transfer_by_character
|
|
},
|
|
"instructions": instructions
|
|
}
|
|
|
|
|
|
class SearchResult(BaseModel):
|
|
"""Result yielded during search."""
|
|
type: str # "suit", "progress", "complete", "error"
|
|
data: Any
|
|
|
|
|
|
class ItemPreFilter:
|
|
"""Pre-filtering system to remove dominated items before search."""
|
|
|
|
@staticmethod
|
|
def remove_surpassed_items(items: List[SuitItem]) -> List[SuitItem]:
|
|
"""Remove items dominated by better alternatives (LeanMyWorldObjectExtensions.cs:9-24)"""
|
|
filtered_items = []
|
|
for item in items:
|
|
is_surpassed = False
|
|
for compare_item in items:
|
|
if compare_item == item:
|
|
continue
|
|
if ItemPreFilter._is_surpassed_by(item, compare_item):
|
|
is_surpassed = True
|
|
break
|
|
if not is_surpassed:
|
|
filtered_items.append(item)
|
|
logger.info(f"Pre-filter: {len(items)} -> {len(filtered_items)} items (removed {len(items) - len(filtered_items)} surpassed)")
|
|
return filtered_items
|
|
|
|
@staticmethod
|
|
def _is_surpassed_by(item: SuitItem, compare_item: SuitItem) -> bool:
|
|
"""Check if item is dominated by compare_item (LeanMyWorldObject.cs:90-147)"""
|
|
# Items must be same slot to be comparable
|
|
if item.slot != compare_item.slot:
|
|
return False
|
|
|
|
# Items must be same set to be comparable (or both no-set)
|
|
if item.set_id != compare_item.set_id:
|
|
return False
|
|
|
|
# Compare spells (higher level cantrips surpass lower)
|
|
if not ItemPreFilter._spells_surpass_or_equal(compare_item.spell_names, item.spell_names):
|
|
return False
|
|
|
|
# Compare ratings - compare_item must be better in at least one category
|
|
better_in_something = False
|
|
for rating_key in ['crit_damage_rating', 'damage_rating']:
|
|
item_rating = item.ratings.get(rating_key, 0)
|
|
compare_rating = compare_item.ratings.get(rating_key, 0)
|
|
if compare_rating > item_rating:
|
|
better_in_something = True
|
|
elif item_rating > compare_rating:
|
|
return False # Item is better in this category
|
|
|
|
# Also compare armor level for armor pieces
|
|
if item.armor_level > 0 and compare_item.armor_level > 0:
|
|
if compare_item.armor_level > item.armor_level:
|
|
better_in_something = True
|
|
elif item.armor_level > compare_item.armor_level:
|
|
return False
|
|
|
|
return better_in_something
|
|
|
|
@staticmethod
|
|
def _spells_surpass_or_equal(spells1: List[str], spells2: List[str]) -> bool:
|
|
"""Check if spells1 surpass or equal spells2"""
|
|
# For each spell in spells2, find equal or better in spells1
|
|
for spell2 in spells2:
|
|
found_surpassing = False
|
|
for spell1 in spells1:
|
|
if spell1 == spell2 or ItemPreFilter._spell_surpasses(spell1, spell2):
|
|
found_surpassing = True
|
|
break
|
|
if not found_surpassing:
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def _spell_surpasses(spell1: str, spell2: str) -> bool:
|
|
"""Check if spell1 surpasses spell2 (higher level of same type)"""
|
|
# Epic surpasses Major, Legendary surpasses Epic, etc.
|
|
if "Legendary" in spell1 and ("Epic" in spell2 or "Major" in spell2):
|
|
base1 = spell1.replace("Legendary ", "")
|
|
base2 = spell2.replace("Epic ", "").replace("Major ", "")
|
|
return base1 == base2
|
|
if "Epic" in spell1 and "Major" in spell2:
|
|
base1 = spell1.replace("Epic ", "")
|
|
base2 = spell2.replace("Major ", "")
|
|
return base1 == base2
|
|
return False
|
|
|
|
|
|
class ConstraintSatisfactionSolver:
|
|
"""Main optimization solver."""
|
|
|
|
def __init__(self, constraints: SearchConstraints, is_cancelled=None):
|
|
self.constraints = constraints
|
|
self.spell_index = SpellBitmapIndex()
|
|
self.best_suits: List[CompletedSuit] = []
|
|
self.suits_evaluated = 0
|
|
self.start_time = time.time()
|
|
self.scoring_weights = constraints.scoring_weights or ScoringWeights()
|
|
self.search_completed = False
|
|
self.is_cancelled = is_cancelled # Callback to check if search should stop
|
|
|
|
# Branch pruning: track best suit found so far (Mag-SuitBuilder style)
|
|
self.best_suit_item_count = 0
|
|
self.highest_armor_count_suit_built = 0 # Track highest armor piece count seen
|
|
self.total_armor_buckets_with_items = 0 # Will be set during bucket creation
|
|
|
|
# Pre-compute needed spell bitmap
|
|
self.needed_spell_bitmap = self.spell_index.get_bitmap(constraints.required_spells)
|
|
logger.info(f"[SPELL_CONSTRAINTS_DEBUG] Required spells: {constraints.required_spells}")
|
|
logger.info(f"[SPELL_CONSTRAINTS_DEBUG] Needed spell bitmap: {self.needed_spell_bitmap}")
|
|
|
|
async def search(self) -> AsyncGenerator[SearchResult, None]:
|
|
"""Main search entry point with streaming results."""
|
|
try:
|
|
# Phase 1: Loading items
|
|
yield SearchResult(type="phase", data={
|
|
"phase": "loading",
|
|
"message": "Loading items from database...",
|
|
"phase_number": 1,
|
|
"total_phases": 5
|
|
})
|
|
|
|
# Load and preprocess items
|
|
items = await self.load_items()
|
|
logger.info(f"Loaded {len(items)} items for optimization")
|
|
|
|
yield SearchResult(type="phase", data={
|
|
"phase": "loaded",
|
|
"message": f"Loaded {len(items)} items",
|
|
"items_count": len(items),
|
|
"phase_number": 1,
|
|
"total_phases": 5
|
|
})
|
|
yield SearchResult(type="log", data={
|
|
"level": "info",
|
|
"message": f"Loaded {len(items)} items from {len(self.constraints.characters)} characters",
|
|
"timestamp": time.time() - self.start_time
|
|
})
|
|
|
|
if not items:
|
|
yield SearchResult(type="error", data={"message": "No items found for specified characters"})
|
|
return
|
|
|
|
# Phase 2: Creating buckets
|
|
yield SearchResult(type="phase", data={
|
|
"phase": "buckets",
|
|
"message": "Creating equipment buckets...",
|
|
"phase_number": 2,
|
|
"total_phases": 5
|
|
})
|
|
|
|
# Create buckets
|
|
buckets = self.create_buckets(items)
|
|
logger.info(f"Created {len(buckets)} equipment buckets")
|
|
|
|
# Build bucket summary
|
|
bucket_summary = {b.slot: len(b.items) for b in buckets}
|
|
yield SearchResult(type="phase", data={
|
|
"phase": "buckets_done",
|
|
"message": f"Created {len(buckets)} buckets",
|
|
"bucket_count": len(buckets),
|
|
"bucket_summary": bucket_summary,
|
|
"phase_number": 2,
|
|
"total_phases": 5
|
|
})
|
|
# Log bucket details
|
|
bucket_details = ", ".join([f"{b.slot}: {len(b.items)}" for b in buckets[:5]])
|
|
yield SearchResult(type="log", data={
|
|
"level": "info",
|
|
"message": f"Buckets created: {bucket_details}{'...' if len(buckets) > 5 else ''}",
|
|
"timestamp": time.time() - self.start_time
|
|
})
|
|
|
|
# Phase 3: Applying reduction rules
|
|
yield SearchResult(type="phase", data={
|
|
"phase": "reducing",
|
|
"message": "Applying armor reduction rules...",
|
|
"phase_number": 3,
|
|
"total_phases": 5
|
|
})
|
|
|
|
# Apply armor reduction rules
|
|
buckets = self.apply_reduction_options(buckets)
|
|
|
|
# Phase 4: Sorting buckets
|
|
yield SearchResult(type="phase", data={
|
|
"phase": "sorting",
|
|
"message": "Optimizing search order...",
|
|
"phase_number": 4,
|
|
"total_phases": 5
|
|
})
|
|
|
|
# Sort buckets
|
|
buckets = self.sort_buckets(buckets)
|
|
|
|
# Start recursive search
|
|
initial_state = SuitState()
|
|
|
|
# Handle locked slots - filter out locked slots from buckets
|
|
if self.constraints.locked_slots:
|
|
# Debug: log what we received
|
|
logger.info(f"[LOCKED_SLOTS] Received locked_slots: {self.constraints.locked_slots}")
|
|
for slot, lock_info in self.constraints.locked_slots.items():
|
|
logger.info(f"[LOCKED_SLOTS] Slot '{slot}': set_id={lock_info.set_id}, spells={lock_info.spells}")
|
|
|
|
locked_slot_names = set(self.constraints.locked_slots.keys())
|
|
original_bucket_count = len(buckets)
|
|
buckets = [b for b in buckets if b.slot not in locked_slot_names]
|
|
logger.info(f"Filtered out {original_bucket_count - len(buckets)} locked slots: {locked_slot_names}")
|
|
|
|
# Calculate locked set contributions (using numeric set IDs)
|
|
self.locked_set_counts = {}
|
|
for slot, lock_info in self.constraints.locked_slots.items():
|
|
if lock_info.set_id:
|
|
# Use numeric set_id for consistency with state.set_counts
|
|
self.locked_set_counts[lock_info.set_id] = self.locked_set_counts.get(lock_info.set_id, 0) + 1
|
|
logger.info(f"[LOCKED_SLOTS] Added set_id {lock_info.set_id} from slot {slot}")
|
|
logger.info(f"Locked set contributions (by ID): {self.locked_set_counts}")
|
|
logger.info(f"[LOCKED_SLOTS] Primary set ID: {self.constraints.primary_set}, locked count for it: {self.locked_set_counts.get(self.constraints.primary_set, 0)}")
|
|
|
|
# Calculate locked spells to exclude from required
|
|
self.locked_spells = set()
|
|
for lock_info in self.constraints.locked_slots.values():
|
|
self.locked_spells.update(lock_info.spells)
|
|
logger.info(f"Locked spells (already covered): {self.locked_spells}")
|
|
|
|
# Log locked slots info
|
|
yield SearchResult(type="log", data={
|
|
"level": "info",
|
|
"message": f"Locked {len(locked_slot_names)} slots: {', '.join(locked_slot_names)}",
|
|
"timestamp": time.time() - self.start_time
|
|
})
|
|
else:
|
|
self.locked_set_counts = {}
|
|
self.locked_spells = set()
|
|
|
|
# Calculate effective set requirements (subtract locked pieces)
|
|
self.effective_primary_needed = 5 # Default for primary set
|
|
self.effective_secondary_needed = 4 # Default for secondary set
|
|
|
|
if self.constraints.primary_set:
|
|
locked_primary = self.locked_set_counts.get(self.constraints.primary_set, 0)
|
|
self.effective_primary_needed = max(0, 5 - locked_primary)
|
|
|
|
if self.constraints.secondary_set:
|
|
locked_secondary = self.locked_set_counts.get(self.constraints.secondary_set, 0)
|
|
self.effective_secondary_needed = max(0, 4 - locked_secondary)
|
|
|
|
logger.info(f"Effective requirements: {self.effective_primary_needed} primary, {self.effective_secondary_needed} secondary (after locked)")
|
|
|
|
# Log effective requirements
|
|
if self.locked_set_counts:
|
|
yield SearchResult(type="log", data={
|
|
"level": "info",
|
|
"message": f"Need: {self.effective_primary_needed} primary + {self.effective_secondary_needed} secondary pieces",
|
|
"timestamp": time.time() - self.start_time
|
|
})
|
|
|
|
# Phase 5: Searching
|
|
logger.info(f"Starting recursive search with {len(buckets)} buckets")
|
|
yield SearchResult(type="phase", data={
|
|
"phase": "searching",
|
|
"message": "Searching for optimal suits...",
|
|
"total_buckets": len(buckets),
|
|
"phase_number": 5,
|
|
"total_phases": 5
|
|
})
|
|
# Log search start summary
|
|
total_items = sum(len(b.items) for b in buckets)
|
|
yield SearchResult(type="log", data={
|
|
"level": "info",
|
|
"message": f"Starting search: {len(buckets)} buckets, {total_items} total items",
|
|
"timestamp": time.time() - self.start_time
|
|
})
|
|
|
|
logger.info("Starting async iteration over recursive search")
|
|
async for result in self.recursive_search(buckets, 0, initial_state):
|
|
yield result
|
|
|
|
logger.info(f"Recursive search completed, sending final results. Found {len(self.best_suits)} suits")
|
|
# Always send final results
|
|
yield SearchResult(
|
|
type="complete",
|
|
data={
|
|
"suits_found": len(self.best_suits),
|
|
"duration": round(time.time() - self.start_time, 2)
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Search error: {e}", exc_info=True)
|
|
yield SearchResult(type="error", data={"message": str(e)})
|
|
|
|
async def load_items_OLD_BROKEN(self) -> List[SuitItem]:
|
|
"""OLD BROKEN METHOD - REPLACED WITH API CALL"""
|
|
pass
|
|
|
|
async def load_items(self) -> List[SuitItem]:
|
|
"""Load items using the working inventory API with HTTP calls."""
|
|
logger.info("[DEBUG] load_items() method called - starting item loading process")
|
|
try:
|
|
import urllib.request
|
|
import urllib.parse
|
|
import json as json_module
|
|
import asyncio
|
|
|
|
# Get user's set names for filtering
|
|
from main import translate_equipment_set_id
|
|
primary_set_name = translate_equipment_set_id(str(self.constraints.primary_set)) if self.constraints.primary_set else None
|
|
secondary_set_name = translate_equipment_set_id(str(self.constraints.secondary_set)) if self.constraints.secondary_set else None
|
|
|
|
logger.info(f"LOADING ITEMS VIA API: Primary='{primary_set_name}', Secondary='{secondary_set_name}'")
|
|
|
|
def fetch_set_items(set_name: str) -> list:
|
|
"""Synchronous helper to fetch items for a set."""
|
|
# Determine equipment status filter
|
|
equipment_status = None
|
|
if self.constraints.include_equipped and self.constraints.include_inventory:
|
|
# Both - no filter needed (default API behavior)
|
|
equipment_status_log = "both equipped and inventory items"
|
|
elif self.constraints.include_equipped:
|
|
equipment_status = "equipped"
|
|
equipment_status_log = "equipped items only"
|
|
elif self.constraints.include_inventory:
|
|
equipment_status = "unequipped"
|
|
equipment_status_log = "inventory items only"
|
|
else:
|
|
equipment_status_log = "no items (neither equipped nor inventory selected)"
|
|
|
|
# Build URL with selected characters or all characters
|
|
base_params = []
|
|
if self.constraints.characters:
|
|
# Use comma-separated list in single characters parameter
|
|
characters_str = ",".join(self.constraints.characters)
|
|
base_params.append(f"characters={urllib.parse.quote(characters_str)}")
|
|
logger.info(f"Fetching set items for {len(self.constraints.characters)} selected characters: {self.constraints.characters}")
|
|
else:
|
|
base_params.append("include_all_characters=true")
|
|
logger.info(f"Fetching set items for ALL characters")
|
|
|
|
# Add item set filter
|
|
base_params.append(f"item_set={urllib.parse.quote(set_name)}")
|
|
|
|
# Add equipment status filter if needed
|
|
if equipment_status:
|
|
base_params.append(f"equipment_status={equipment_status}")
|
|
|
|
# Add limit
|
|
base_params.append("limit=1000")
|
|
|
|
# Build final URL - use internal port 8000 since we're calling ourselves
|
|
url = f"http://localhost:8000/search/items?{'&'.join(base_params)}"
|
|
|
|
logger.info(f"Equipment status filter: {equipment_status_log}")
|
|
logger.info(f"Fetching set items from: {url}")
|
|
|
|
with urllib.request.urlopen(url) as response:
|
|
data = json_module.load(response)
|
|
items = data.get('items', [])
|
|
logger.info(f"Set ({set_name}) with {equipment_status_log}: {len(items)} items returned")
|
|
return items
|
|
|
|
# Use the working inventory API to get items for both sets
|
|
all_api_items = []
|
|
|
|
# Primary set items
|
|
if primary_set_name:
|
|
primary_items = await asyncio.get_event_loop().run_in_executor(None, fetch_set_items, primary_set_name)
|
|
all_api_items.extend(primary_items)
|
|
|
|
# Secondary set items
|
|
if secondary_set_name:
|
|
secondary_items = await asyncio.get_event_loop().run_in_executor(None, fetch_set_items, secondary_set_name)
|
|
all_api_items.extend(secondary_items)
|
|
|
|
# Clothing items (shirts and pants) - Use separate API endpoints
|
|
def fetch_clothing_items() -> list:
|
|
"""Synchronous helper to fetch clothing items using shirt_only and pants_only endpoints."""
|
|
# Determine equipment status filter
|
|
equipment_status = None
|
|
if self.constraints.include_equipped and self.constraints.include_inventory:
|
|
equipment_status_log = "both equipped and inventory items"
|
|
elif self.constraints.include_equipped:
|
|
equipment_status = "equipped"
|
|
equipment_status_log = "equipped items only"
|
|
elif self.constraints.include_inventory:
|
|
equipment_status = "unequipped"
|
|
equipment_status_log = "inventory items only"
|
|
else:
|
|
equipment_status_log = "no items (neither equipped nor inventory selected)"
|
|
|
|
# Build base params with selected characters or all characters
|
|
def build_base_params():
|
|
base_params = []
|
|
if self.constraints.characters:
|
|
characters_str = ",".join(self.constraints.characters)
|
|
base_params.append(f"characters={urllib.parse.quote(characters_str)}")
|
|
else:
|
|
base_params.append("include_all_characters=true")
|
|
|
|
# Add equipment status filter if needed
|
|
if equipment_status:
|
|
base_params.append(f"equipment_status={equipment_status}")
|
|
|
|
# Add limit
|
|
base_params.append("limit=1000")
|
|
return base_params
|
|
|
|
all_clothing_items = []
|
|
|
|
# Fetch shirts using shirt_only endpoint - only DR3 for optimization
|
|
shirt_params = build_base_params()
|
|
shirt_params.append("shirt_only=true")
|
|
shirt_params.append("min_damage_rating=3") # Only load DR3 shirts
|
|
shirt_url = f"http://localhost:8000/search/items?{'&'.join(shirt_params)}"
|
|
|
|
logger.info(f"Fetching shirt items with {equipment_status_log}")
|
|
logger.info(f"Fetching shirts from: {shirt_url}")
|
|
|
|
try:
|
|
with urllib.request.urlopen(shirt_url) as response:
|
|
data = json_module.load(response)
|
|
shirt_items = data.get('items', [])
|
|
logger.info(f"Shirt items with {equipment_status_log}: {len(shirt_items)} items returned")
|
|
all_clothing_items.extend(shirt_items)
|
|
except Exception as e:
|
|
logger.error(f"Error fetching shirts: {e}")
|
|
|
|
# Fetch pants using pants_only endpoint - only DR3 for optimization
|
|
pants_params = build_base_params()
|
|
pants_params.append("pants_only=true")
|
|
pants_params.append("min_damage_rating=3") # Only load DR3 pants
|
|
pants_url = f"http://localhost:8000/search/items?{'&'.join(pants_params)}"
|
|
|
|
logger.info(f"Fetching pants items with {equipment_status_log}")
|
|
logger.info(f"Fetching pants from: {pants_url}")
|
|
|
|
try:
|
|
with urllib.request.urlopen(pants_url) as response:
|
|
data = json_module.load(response)
|
|
pants_items = data.get('items', [])
|
|
logger.info(f"Pants items with {equipment_status_log}: {len(pants_items)} items returned")
|
|
all_clothing_items.extend(pants_items)
|
|
except Exception as e:
|
|
logger.error(f"Error fetching pants: {e}")
|
|
|
|
logger.info(f"Total clothing items fetched: {len(all_clothing_items)}")
|
|
return all_clothing_items
|
|
|
|
# Jewelry items (rings, necklaces, bracelets, trinkets)
|
|
def fetch_all_jewelry_items() -> list:
|
|
"""Synchronous helper to fetch jewelry items with cantrips/wards."""
|
|
logger.info("[DEBUG] fetch_all_jewelry_items() function called - starting execution")
|
|
|
|
# Determine equipment status filter
|
|
equipment_status = None
|
|
if self.constraints.include_equipped and self.constraints.include_inventory:
|
|
equipment_status_log = "both equipped and inventory items"
|
|
elif self.constraints.include_equipped:
|
|
equipment_status = "equipped"
|
|
equipment_status_log = "equipped items only"
|
|
elif self.constraints.include_inventory:
|
|
equipment_status = "unequipped"
|
|
equipment_status_log = "inventory items only"
|
|
else:
|
|
equipment_status_log = "no items (neither equipped nor inventory selected)"
|
|
|
|
# Build base params with selected characters or all characters
|
|
def build_base_params():
|
|
base_params = []
|
|
if self.constraints.characters:
|
|
characters_str = ",".join(self.constraints.characters)
|
|
base_params.append(f"characters={urllib.parse.quote(characters_str)}")
|
|
else:
|
|
base_params.append("include_all_characters=true")
|
|
|
|
# Add equipment status filter if needed
|
|
if equipment_status:
|
|
base_params.append(f"equipment_status={equipment_status}")
|
|
|
|
# Add limit
|
|
base_params.append("limit=1000")
|
|
return base_params
|
|
|
|
all_jewelry_items = []
|
|
|
|
# Fetch each jewelry type separately using slot_names filter
|
|
# This ensures we get all rings, bracelets, etc. instead of just the first page of all jewelry
|
|
jewelry_slot_types = [
|
|
("Ring", "rings"),
|
|
("Bracelet", "bracelets"),
|
|
("Neck", "necklaces/amulets"),
|
|
("Trinket", "trinkets")
|
|
]
|
|
|
|
for slot_filter, slot_description in jewelry_slot_types:
|
|
jewelry_params = build_base_params()
|
|
jewelry_params.append("jewelry_only=true")
|
|
jewelry_params.append(f"slot_names={slot_filter}")
|
|
jewelry_url = f"http://localhost:8000/search/items?{'&'.join(jewelry_params)}"
|
|
|
|
logger.info(f"Fetching {slot_description} with {equipment_status_log}")
|
|
|
|
try:
|
|
with urllib.request.urlopen(jewelry_url) as response:
|
|
data = json_module.load(response)
|
|
items = data.get('items', [])
|
|
logger.info(f"Fetched {len(items)} {slot_description}")
|
|
all_jewelry_items.extend(items)
|
|
except Exception as e:
|
|
logger.error(f"Error fetching {slot_description}: {e}")
|
|
|
|
logger.info(f"Total jewelry items fetched: {len(all_jewelry_items)}")
|
|
return all_jewelry_items
|
|
|
|
# Fetch clothing items
|
|
clothing_items = await asyncio.get_event_loop().run_in_executor(None, fetch_clothing_items)
|
|
all_api_items.extend(clothing_items)
|
|
|
|
# Fetch jewelry items
|
|
logger.info("[DEBUG] About to call fetch_all_jewelry_items() with asyncio executor")
|
|
try:
|
|
jewelry_items = await asyncio.get_event_loop().run_in_executor(None, fetch_all_jewelry_items)
|
|
logger.info(f"[DEBUG] fetch_all_jewelry_items() completed successfully, returned {len(jewelry_items)} items")
|
|
all_api_items.extend(jewelry_items)
|
|
except Exception as e:
|
|
logger.error(f"[DEBUG] Exception in fetch_all_jewelry_items() executor: {e}", exc_info=True)
|
|
jewelry_items = [] # Continue with empty list if jewelry fetch fails
|
|
|
|
logger.info(f"Total items from inventory API: {len(all_api_items)}")
|
|
|
|
# Helper function to normalize spell_names to list format
|
|
def normalize_spell_names(spell_data):
|
|
"""Convert spell_names to list format regardless of input type.
|
|
|
|
The API may return spell_names as:
|
|
- A list of spell names (correct format)
|
|
- A comma-separated string of spell names or IDs
|
|
- None or empty
|
|
|
|
This ensures we always work with a list.
|
|
"""
|
|
if spell_data is None:
|
|
return []
|
|
if isinstance(spell_data, list):
|
|
return spell_data
|
|
if isinstance(spell_data, str) and spell_data.strip():
|
|
# Split comma-separated values and clean up
|
|
return [s.strip() for s in spell_data.split(',') if s.strip()]
|
|
return []
|
|
|
|
# Convert to SuitItem objects
|
|
items = []
|
|
for api_item in all_api_items:
|
|
# The inventory API provides rich data with all fields we need!
|
|
# Use character_name + name as unique identifier since API doesn't return ID
|
|
unique_id = f"{api_item['character_name']}_{api_item['name']}"
|
|
|
|
# Parse coverage data for armor reduction
|
|
coverage_value = api_item.get('coverage_mask', 0)
|
|
coverage = CoverageMask(coverage_value) if coverage_value else None
|
|
|
|
# Use computed_slot_name from API if available, otherwise fallback to slot_name
|
|
slot_name = api_item.get('computed_slot_name') or api_item.get('slot_name', 'Unknown')
|
|
|
|
# For underclothes, ensure we get simple slot names
|
|
if api_item.get('object_class') == 3: # Clothing
|
|
coverage_mask = api_item.get('coverage_mask', 0)
|
|
if coverage_mask == 104: # Shirt pattern
|
|
slot_name = "Shirt"
|
|
elif coverage_mask in [19, 22]: # Pants/breeches patterns
|
|
slot_name = "Pants"
|
|
|
|
suit_item = SuitItem(
|
|
id=hash(unique_id), # Generate ID from character + item name
|
|
name=api_item['name'],
|
|
character_name=api_item['character_name'],
|
|
slot=slot_name, # Use computed slot or coverage-based slot for underclothes
|
|
coverage=coverage, # Now properly loaded from API
|
|
set_id=self._convert_set_name_to_id(api_item.get('item_set')), # Convert set name to numeric ID
|
|
armor_level=api_item.get('armor_level', 0),
|
|
ratings={
|
|
'crit_damage_rating': api_item.get('crit_damage_rating') if api_item.get('crit_damage_rating') is not None else 0,
|
|
'damage_rating': api_item.get('damage_rating') if api_item.get('damage_rating') is not None else 0,
|
|
'damage_resist_rating': api_item.get('damage_resist_rating') if api_item.get('damage_resist_rating') is not None else 0,
|
|
'crit_damage_resist_rating': api_item.get('crit_damage_resist_rating') if api_item.get('crit_damage_resist_rating') is not None else 0,
|
|
'heal_boost_rating': api_item.get('heal_boost_rating') if api_item.get('heal_boost_rating') is not None else 0,
|
|
'vitality_rating': api_item.get('vitality_rating') if api_item.get('vitality_rating') is not None else 0
|
|
},
|
|
spell_bitmap=0, # Will calculate if needed
|
|
spell_names=normalize_spell_names(api_item.get('spell_names')),
|
|
material=api_item.get('material_name', '')
|
|
)
|
|
items.append(suit_item)
|
|
|
|
# Log comprehensive stats
|
|
slot_counts = {}
|
|
set_counts = {}
|
|
for item in items:
|
|
slot_counts[item.slot] = slot_counts.get(item.slot, 0) + 1
|
|
if item.set_id:
|
|
set_counts[item.set_id] = set_counts.get(item.set_id, 0) + 1
|
|
|
|
logger.info(f"LOADED FROM API: {len(items)} total items")
|
|
logger.info(f"SLOT DISTRIBUTION: {slot_counts}")
|
|
logger.info(f"SET DISTRIBUTION: {set_counts}")
|
|
|
|
# Calculate spell bitmaps for all items
|
|
spell_items_count = 0
|
|
for item in items:
|
|
if item.spell_names:
|
|
item.spell_bitmap = self.spell_index.get_bitmap(item.spell_names)
|
|
spell_items_count += 1
|
|
logger.info(f"[SPELL] {item.name}: {item.spell_names} -> bitmap {item.spell_bitmap}")
|
|
|
|
# CRITICAL DEBUG: Check for Legendary Two Handed Combat specifically
|
|
if "Legendary Two Handed Combat" in item.spell_names:
|
|
logger.info(f"[LEGENDARY_TWO_HANDED_DEBUG] Found item with Legendary Two Handed Combat: {item.name} (set_id: {item.set_id}, spell_bitmap: {item.spell_bitmap})")
|
|
|
|
logger.info(f"SPELL PROCESSING: {spell_items_count} items with spells processed")
|
|
|
|
# Apply pre-filtering to remove dominated items
|
|
filtered_items = ItemPreFilter.remove_surpassed_items(items)
|
|
|
|
# Sort items for optimal search order
|
|
# Define slot sets
|
|
armor_slot_set = {
|
|
"Head", "Chest", "Upper Arms", "Lower Arms", "Hands",
|
|
"Abdomen", "Upper Legs", "Lower Legs", "Feet"
|
|
}
|
|
jewelry_slot_set = {
|
|
"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"
|
|
}
|
|
# Also match generic jewelry slot names that might come from API
|
|
jewelry_fallback_slots = {"Ring", "Bracelet", "Jewelry", "Necklace", "Amulet"}
|
|
clothing_slot_set = {"Shirt", "Pants"}
|
|
|
|
# Helper to check if item matches any slot in a set (handles multi-slot items)
|
|
def matches_slot_set(item_slot: str, slot_set: set, fallback_set: set = None) -> bool:
|
|
if item_slot in slot_set:
|
|
return True
|
|
# Handle multi-slot items like "Left Wrist, Right Wrist"
|
|
if ', ' in item_slot:
|
|
return any(s.strip() in slot_set for s in item_slot.split(', '))
|
|
# Check fallback set for generic names like "Ring", "Jewelry"
|
|
if fallback_set and item_slot in fallback_set:
|
|
return True
|
|
return False
|
|
|
|
armor_items = [item for item in filtered_items if matches_slot_set(item.slot, armor_slot_set)]
|
|
jewelry_items = [item for item in filtered_items if matches_slot_set(item.slot, jewelry_slot_set, jewelry_fallback_slots)]
|
|
clothing_items = [item for item in filtered_items if matches_slot_set(item.slot, clothing_slot_set)]
|
|
|
|
# Sort armor by spell count (most spells first) since armor level deprioritized
|
|
# Include (character_name, name) as stable tiebreakers for deterministic ordering
|
|
armor_items.sort(key=lambda x: (len(x.spell_names), x.character_name, x.name), reverse=True)
|
|
|
|
# Sort jewelry by spell count (most spells first)
|
|
jewelry_items.sort(key=lambda x: (len(x.spell_names), x.character_name, x.name), reverse=True)
|
|
|
|
# Sort clothing by damage rating (highest first)
|
|
clothing_items.sort(key=lambda x: (x.ratings.get('damage_rating', 0), x.character_name, x.name), reverse=True)
|
|
|
|
# Recombine in optimized order
|
|
optimized_items = armor_items + jewelry_items + clothing_items
|
|
|
|
# DETERMINISM CHECK - Log first 5 items of each type to verify consistent ordering
|
|
logger.info("DETERMINISM CHECK - First 5 armor items:")
|
|
for i, item in enumerate(armor_items[:5]):
|
|
logger.info(f" {i}: {item.character_name}/{item.name} spells={len(item.spell_names)}")
|
|
logger.info("DETERMINISM CHECK - First 5 jewelry items:")
|
|
for i, item in enumerate(jewelry_items[:5]):
|
|
logger.info(f" {i}: {item.character_name}/{item.name} spells={len(item.spell_names)}")
|
|
|
|
logger.info(f"ITEM SORTING: {len(armor_items)} armor, {len(jewelry_items)} jewelry, {len(clothing_items)} clothing")
|
|
|
|
# Debug: Log jewelry items with spells
|
|
jewelry_with_spells = [item for item in jewelry_items if item.spell_names]
|
|
logger.info(f"JEWELRY WITH SPELLS: {len(jewelry_with_spells)} items")
|
|
for item in jewelry_with_spells[:5]: # Log first 5
|
|
logger.info(f" - {item.name} (slot: {item.slot}): {item.spell_names}")
|
|
|
|
return optimized_items
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error calling inventory API: {e}", exc_info=True)
|
|
raise HTTPException(status_code=500, detail=f"Failed to load items from API: {str(e)}")
|
|
|
|
# Removed _determine_equipment_slot - now using SQL computed slots from main service
|
|
|
|
def create_buckets(self, items: List[SuitItem]) -> List[ItemBucket]:
|
|
"""Group items by equipment slot."""
|
|
# Define all possible slots (ARMOR + JEWELRY + CLOTHING)
|
|
all_slots = [
|
|
# Armor slots (9)
|
|
"Head", "Chest", "Upper Arms", "Lower Arms", "Hands",
|
|
"Abdomen", "Upper Legs", "Lower Legs", "Feet",
|
|
# Jewelry slots (6)
|
|
"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket",
|
|
# Clothing slots (2)
|
|
"Shirt", "Pants"
|
|
]
|
|
|
|
armor_slots = {
|
|
"Head", "Chest", "Upper Arms", "Lower Arms", "Hands",
|
|
"Abdomen", "Upper Legs", "Lower Legs", "Feet"
|
|
}
|
|
|
|
jewelry_slots = {
|
|
"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"
|
|
}
|
|
|
|
# Group items by slot
|
|
slot_items = {slot: [] for slot in all_slots}
|
|
|
|
for item in items:
|
|
# Special debugging for problematic robe
|
|
if "Empowered Robe of the Perfect Light" in item.name:
|
|
logger.info(f"[DEBUG] Analyzing problematic robe: {item.name}")
|
|
if item.coverage:
|
|
logger.info(f"[DEBUG] - Coverage mask: 0x{item.coverage.value:08X}")
|
|
logger.info(f"[DEBUG] - Coverage bits count: {bin(item.coverage.value).count('1')}")
|
|
logger.info(f"[DEBUG] - Matches exact robe pattern (0x00013F00): {item.coverage.value == 0x00013F00}")
|
|
logger.info(f"[DEBUG] - is_robe() result: {item.coverage.is_robe()}")
|
|
else:
|
|
logger.info(f"[DEBUG] - Coverage mask: None")
|
|
logger.info(f"[DEBUG] - Slot: {item.slot}")
|
|
logger.info(f"[DEBUG] - Coverage object: {item.coverage}")
|
|
|
|
# Skip robe detection - user's chosen armor sets don't include robes
|
|
# If user specifically chooses a set that contains robes, they should get those items
|
|
|
|
# Handle both single-slot and multi-slot items
|
|
if item.slot in slot_items:
|
|
# Single slot item - direct assignment
|
|
slot_items[item.slot].append(item)
|
|
elif ', ' in item.slot:
|
|
# Multi-slot item - create single-slot variants for each applicable slot
|
|
possible_slots = [s.strip() for s in item.slot.split(', ')]
|
|
added_to_slots = []
|
|
for possible_slot in possible_slots:
|
|
if possible_slot in slot_items:
|
|
# Create a single-slot variant of the item
|
|
single_slot_item = SuitItem(
|
|
id=item.id,
|
|
name=item.name,
|
|
character_name=item.character_name,
|
|
slot=possible_slot, # Single slot assignment
|
|
coverage=item.coverage,
|
|
set_id=item.set_id,
|
|
armor_level=item.armor_level,
|
|
ratings=item.ratings.copy(),
|
|
spell_bitmap=item.spell_bitmap,
|
|
spell_names=item.spell_names.copy(),
|
|
material=item.material
|
|
)
|
|
slot_items[possible_slot].append(single_slot_item)
|
|
added_to_slots.append(possible_slot)
|
|
|
|
if added_to_slots:
|
|
logger.debug(f"Multi-slot item {item.name} split into slots: {added_to_slots}")
|
|
else:
|
|
logger.warning(f"Multi-slot item {item.name} with slots '{item.slot}' couldn't be mapped to any valid slots")
|
|
else:
|
|
# Handle generic jewelry slot names that need expansion
|
|
generic_jewelry_expansion = {
|
|
"Ring": ["Left Ring", "Right Ring"],
|
|
"Bracelet": ["Left Wrist", "Right Wrist"],
|
|
"Jewelry": ["Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"],
|
|
"Necklace": ["Neck"],
|
|
"Amulet": ["Neck"],
|
|
}
|
|
|
|
mapped_slots = []
|
|
|
|
# Check for generic jewelry slot names first
|
|
if item.slot in generic_jewelry_expansion:
|
|
for target_slot in generic_jewelry_expansion[item.slot]:
|
|
single_slot_item = SuitItem(
|
|
id=item.id,
|
|
name=item.name,
|
|
character_name=item.character_name,
|
|
slot=target_slot,
|
|
coverage=item.coverage,
|
|
set_id=item.set_id,
|
|
armor_level=item.armor_level,
|
|
ratings=item.ratings.copy(),
|
|
spell_bitmap=item.spell_bitmap,
|
|
spell_names=item.spell_names.copy(),
|
|
material=item.material
|
|
)
|
|
slot_items[target_slot].append(single_slot_item)
|
|
mapped_slots.append(target_slot)
|
|
logger.debug(f"Generic jewelry slot '{item.slot}' expanded to: {mapped_slots} for {item.name}")
|
|
else:
|
|
# Check for complex slot patterns that might not use comma separation
|
|
# Handle items with complex slot descriptions that the SQL computed incorrectly
|
|
for known_slot in all_slots:
|
|
if known_slot.lower() in item.slot.lower():
|
|
# Create a single-slot variant of the item
|
|
single_slot_item = SuitItem(
|
|
id=item.id,
|
|
name=item.name,
|
|
character_name=item.character_name,
|
|
slot=known_slot, # Single slot assignment
|
|
coverage=item.coverage,
|
|
set_id=item.set_id,
|
|
armor_level=item.armor_level,
|
|
ratings=item.ratings.copy(),
|
|
spell_bitmap=item.spell_bitmap,
|
|
spell_names=item.spell_names.copy(),
|
|
material=item.material
|
|
)
|
|
slot_items[known_slot].append(single_slot_item)
|
|
mapped_slots.append(known_slot)
|
|
|
|
if mapped_slots:
|
|
logger.debug(f"Complex slot item {item.name} split into slots: {mapped_slots} (original: '{item.slot}')")
|
|
else:
|
|
logger.warning(f"Unknown slot '{item.slot}' for item {item.name} - could not map to any known slots")
|
|
|
|
# Create buckets - CRITICAL: Create ALL buckets even if empty (MagSuitBuilder behavior)
|
|
buckets = []
|
|
for slot in all_slots:
|
|
bucket = ItemBucket(
|
|
slot=slot,
|
|
items=slot_items[slot],
|
|
is_armor=(slot in armor_slots),
|
|
is_required=False # We'll mark required slots later based on constraints
|
|
)
|
|
|
|
# Sort items within bucket by priority
|
|
bucket.sort_items()
|
|
|
|
# ALWAYS add bucket - even empty ones (required for complete search)
|
|
buckets.append(bucket)
|
|
if len(bucket.items) > 0:
|
|
logger.info(f"Created bucket for {slot} with {len(bucket.items)} items")
|
|
else:
|
|
logger.info(f"Created EMPTY bucket for {slot} (will allow incomplete suits)")
|
|
|
|
# Sort buckets: armor first, then by item count (MagSuitBuilder ArmorSearcher.cs:95-100)
|
|
buckets.sort(key=lambda b: (
|
|
0 if b.is_armor else 1, # Armor buckets first
|
|
len(b.items) # Then by item count (smallest first for better pruning)
|
|
))
|
|
|
|
logger.info(f"CREATED {len(buckets)} total buckets (including {len([b for b in buckets if len(b.items) == 0])} empty)")
|
|
logger.info(f"BUCKET ORDER: {[f'{b.slot}({len(b.items)})' for b in buckets]}")
|
|
|
|
# Calculate total armor buckets with items for Mag-SuitBuilder pruning
|
|
self.total_armor_buckets_with_items = sum(
|
|
1 for b in buckets if b.is_armor and len(b.items) > 0
|
|
)
|
|
logger.info(f"ARMOR BUCKETS WITH ITEMS: {self.total_armor_buckets_with_items}")
|
|
|
|
# Log first 3 items of first non-empty bucket for determinism verification
|
|
for bucket in buckets:
|
|
if len(bucket.items) > 0:
|
|
logger.info(f"FIRST BUCKET ({bucket.slot}) - First 3 items:")
|
|
for i, item in enumerate(bucket.items[:3]):
|
|
logger.info(f" {i}: {item.character_name}/{item.name}")
|
|
break
|
|
|
|
return buckets
|
|
|
|
def apply_reduction_options(self, buckets: List[ItemBucket]) -> List[ItemBucket]:
|
|
"""Apply armor reduction rules for multi-coverage items."""
|
|
new_buckets = []
|
|
|
|
for bucket in buckets:
|
|
if not bucket.is_armor:
|
|
# Non-armor items don't need reduction
|
|
new_buckets.append(bucket)
|
|
continue
|
|
|
|
# Process armor items for potential reduction
|
|
original_items = []
|
|
reducible_items = []
|
|
|
|
for item in bucket.items:
|
|
# Debug logging for reduction evaluation
|
|
has_coverage = item.coverage is not None
|
|
has_material = hasattr(item, 'material') and item.material
|
|
reduction_options = item.coverage.reduction_options() if item.coverage else []
|
|
|
|
logger.info(f"Reduction check for '{item.name}' in {bucket.slot}: "
|
|
f"coverage={item.coverage.value if item.coverage else None}, "
|
|
f"material='{item.material}', "
|
|
f"reductions={len(reduction_options)}")
|
|
|
|
# Check if item can be reduced based on Mag-SuitBuilder rules:
|
|
# 1. Must have coverage data
|
|
# 2. Must have material (only loot-generated items can be tailored)
|
|
# 3. Must have valid reduction options
|
|
if (item.coverage and
|
|
hasattr(item, 'material') and item.material and # Only items with materials can be tailored
|
|
item.coverage.reduction_options()):
|
|
# Item can be reduced - we'll add it to multiple buckets
|
|
logger.info(f"Item '{item.name}' is reducible to: {[r.to_slot_name() for r in reduction_options]}")
|
|
reducible_items.append(item)
|
|
else:
|
|
# Item fits exactly in this slot or cannot be reduced
|
|
original_items.append(item)
|
|
|
|
# Keep original items in this bucket
|
|
if original_items or not reducible_items:
|
|
new_bucket = ItemBucket(
|
|
slot=bucket.slot,
|
|
items=original_items,
|
|
is_armor=bucket.is_armor,
|
|
is_required=bucket.is_required
|
|
)
|
|
new_bucket.sort_items()
|
|
new_buckets.append(new_bucket)
|
|
|
|
# Add reducible items to appropriate buckets
|
|
for item in reducible_items:
|
|
reduction_options = item.coverage.reduction_options()
|
|
|
|
for reduced_coverage in reduction_options:
|
|
reduced_slot = reduced_coverage.to_slot_name()
|
|
if not reduced_slot:
|
|
continue
|
|
|
|
# Create a reduced version of the item
|
|
reduced_item = SuitItem(
|
|
id=item.id,
|
|
name=f"{item.name} (tailored to {reduced_slot})",
|
|
character_name=item.character_name,
|
|
slot=reduced_slot,
|
|
coverage=reduced_coverage,
|
|
set_id=item.set_id,
|
|
armor_level=item.armor_level,
|
|
ratings=item.ratings.copy(),
|
|
spell_bitmap=item.spell_bitmap,
|
|
spell_names=item.spell_names.copy(),
|
|
is_locked=item.is_locked,
|
|
material=item.material
|
|
)
|
|
|
|
# Find or create bucket for this slot
|
|
target_bucket = None
|
|
for existing_bucket in new_buckets:
|
|
if existing_bucket.slot == reduced_slot:
|
|
target_bucket = existing_bucket
|
|
break
|
|
|
|
if not target_bucket:
|
|
target_bucket = ItemBucket(
|
|
slot=reduced_slot,
|
|
items=[],
|
|
is_armor=True,
|
|
is_required=False
|
|
)
|
|
new_buckets.append(target_bucket)
|
|
|
|
target_bucket.items.append(reduced_item)
|
|
|
|
# Re-sort all buckets after adding reduced items
|
|
for bucket in new_buckets:
|
|
bucket.sort_items()
|
|
|
|
# Count reduction statistics
|
|
original_items_count = sum(len(bucket.items) for bucket in buckets)
|
|
new_items_count = sum(len(bucket.items) for bucket in new_buckets)
|
|
|
|
logger.info(f"Applied reductions: {len(buckets)} original buckets -> {len(new_buckets)} buckets")
|
|
logger.info(f"Item count: {original_items_count} original -> {new_items_count} total (including reductions)")
|
|
return new_buckets
|
|
|
|
def sort_buckets(self, buckets: List[ItemBucket]) -> List[ItemBucket]:
|
|
"""Sort buckets for optimal search order and prioritize user's chosen sets."""
|
|
|
|
# First, sort items within each bucket to prioritize user's chosen sets
|
|
for bucket in buckets:
|
|
bucket.items.sort(key=lambda item: (
|
|
# Priority 1: User's primary set
|
|
0 if item.set_id == self.constraints.primary_set else
|
|
# Priority 2: User's secondary set
|
|
1 if item.set_id == self.constraints.secondary_set else
|
|
# Priority 3: Other items
|
|
2,
|
|
-item.ratings.get('crit_damage_rating', 0), # Higher crit damage first (CD2 > CD1 > CD0)
|
|
-item.ratings.get('damage_rating', 0), # Higher damage rating next
|
|
-item.armor_level # Higher armor within same priority
|
|
))
|
|
|
|
# Prioritize core armor slots, then jewelry, then clothing
|
|
core_armor_priority = ['Chest', 'Head', 'Hands', 'Feet', 'Upper Arms', 'Lower Arms', 'Abdomen', 'Upper Legs', 'Lower Legs']
|
|
jewelry_slots = ['Neck', 'Left Ring', 'Right Ring', 'Left Wrist', 'Right Wrist', 'Trinket']
|
|
clothing_slots = ['Shirt', 'Pants']
|
|
|
|
# Sort buckets by priority order and item count
|
|
def bucket_priority(bucket):
|
|
if bucket.slot in core_armor_priority:
|
|
return (0, core_armor_priority.index(bucket.slot), len(bucket.items))
|
|
elif bucket.slot in jewelry_slots:
|
|
return (1, jewelry_slots.index(bucket.slot), len(bucket.items))
|
|
elif bucket.slot in clothing_slots:
|
|
return (2, clothing_slots.index(bucket.slot), len(bucket.items))
|
|
else:
|
|
return (3, 0, len(bucket.items))
|
|
|
|
sorted_buckets = sorted(buckets, key=bucket_priority)
|
|
logger.info(f"Bucket search order: {[f'{b.slot}({len(b.items)})' for b in sorted_buckets[:10]]}")
|
|
return sorted_buckets
|
|
|
|
async def recursive_search(self, buckets: List[ItemBucket], bucket_idx: int,
|
|
state: SuitState) -> AsyncGenerator[SearchResult, None]:
|
|
"""Depth-first search with pruning and streaming."""
|
|
# Check for cancellation
|
|
if self.is_cancelled and await self.is_cancelled():
|
|
logger.info("Search cancelled by client")
|
|
return
|
|
|
|
# Early success detection - stop when user's set goals are achieved
|
|
primary_count = state.set_counts.get(self.constraints.primary_set, 0) if self.constraints.primary_set else 0
|
|
secondary_count = state.set_counts.get(self.constraints.secondary_set, 0) if self.constraints.secondary_set else 0
|
|
|
|
# Branch pruning - Mag-SuitBuilder style aggressive pruning (ArmorSearcher.cs:138)
|
|
# Formula: if (builder.Count + 1 < highestArmorCountSuitBuilt - (totalArmorBucketsWithItems - min(index, totalArmorBucketsWithItems)))
|
|
# This prunes branches where the best possible suit can't compete with what we've found
|
|
if self.highest_armor_count_suit_built > 0:
|
|
current_count = len(state.items)
|
|
# Calculate remaining armor slots that could be filled
|
|
remaining_armor_potential = self.total_armor_buckets_with_items - min(bucket_idx, self.total_armor_buckets_with_items)
|
|
# Minimum required: highest seen minus potential remaining (gives us 1-piece buffer)
|
|
min_required = self.highest_armor_count_suit_built - remaining_armor_potential
|
|
# If we can't even hit minimum required with 1 more piece, prune
|
|
if current_count + 1 < min_required:
|
|
return
|
|
|
|
# Also keep the simpler max-items pruning as backup
|
|
remaining_buckets = len(buckets) - bucket_idx
|
|
max_possible_items = len(state.items) + remaining_buckets
|
|
if self.best_suit_item_count > 0 and max_possible_items < self.best_suit_item_count:
|
|
return
|
|
|
|
# Base case: all buckets processed
|
|
if bucket_idx >= len(buckets):
|
|
logger.debug(f"[DEBUG] BASE CASE: All {len(buckets)} buckets processed, state has {len(state.items)} items")
|
|
suit = self.finalize_suit(state)
|
|
if suit:
|
|
logger.debug(f"[DEBUG] Suit created with score {suit.score}, {len(suit.items)} items")
|
|
if self.is_better_than_existing(suit):
|
|
logger.debug(f"[DEBUG] Suit ACCEPTED: score {suit.score} is better than existing")
|
|
logger.info(f"Found suit with score {suit.score}: {len(suit.items)} items")
|
|
self.best_suits.append(suit)
|
|
# Update best suit item count for pruning
|
|
if len(suit.items) > self.best_suit_item_count:
|
|
self.best_suit_item_count = len(suit.items)
|
|
# Update armor piece count for Mag-SuitBuilder style pruning
|
|
armor_slots = {"Head", "Chest", "Upper Arms", "Lower Arms", "Hands",
|
|
"Abdomen", "Upper Legs", "Lower Legs", "Feet"}
|
|
armor_piece_count = sum(1 for slot in suit.items.keys() if slot in armor_slots)
|
|
if armor_piece_count > self.highest_armor_count_suit_built:
|
|
self.highest_armor_count_suit_built = armor_piece_count
|
|
logger.info(f"[PRUNING] New highest armor count: {armor_piece_count}")
|
|
self.best_suits.sort(key=lambda s: s.score, reverse=True)
|
|
self.best_suits = self.best_suits[:self.constraints.max_results]
|
|
|
|
# Pass constraint info to to_dict for proper set counts
|
|
suit_data = suit.to_dict()
|
|
from main import translate_equipment_set_id
|
|
primary_set_name = translate_equipment_set_id(str(self.constraints.primary_set)) if self.constraints.primary_set else None
|
|
secondary_set_name = translate_equipment_set_id(str(self.constraints.secondary_set)) if self.constraints.secondary_set else None
|
|
|
|
# FIXED: set_counts uses numeric keys, not string names
|
|
primary_count = suit.set_counts.get(self.constraints.primary_set, 0) if self.constraints.primary_set else 0
|
|
secondary_count = suit.set_counts.get(self.constraints.secondary_set, 0) if self.constraints.secondary_set else 0
|
|
|
|
# Add locked slot contributions to the counts
|
|
if hasattr(self, 'locked_set_counts'):
|
|
primary_count += self.locked_set_counts.get(self.constraints.primary_set, 0) if self.constraints.primary_set else 0
|
|
secondary_count += self.locked_set_counts.get(self.constraints.secondary_set, 0) if self.constraints.secondary_set else 0
|
|
|
|
# Get locked counts for breakdown
|
|
locked_primary = self.locked_set_counts.get(self.constraints.primary_set, 0) if hasattr(self, 'locked_set_counts') and self.constraints.primary_set else 0
|
|
locked_secondary = self.locked_set_counts.get(self.constraints.secondary_set, 0) if hasattr(self, 'locked_set_counts') and self.constraints.secondary_set else 0
|
|
|
|
suit_data['stats']['primary_set_count'] = primary_count # Total (found + locked)
|
|
suit_data['stats']['secondary_set_count'] = secondary_count # Total (found + locked)
|
|
suit_data['stats']['primary_set'] = primary_set_name
|
|
suit_data['stats']['secondary_set'] = secondary_set_name
|
|
suit_data['stats']['locked_slots'] = len(self.constraints.locked_slots) if self.constraints.locked_slots else 0
|
|
suit_data['stats']['primary_locked'] = locked_primary
|
|
suit_data['stats']['secondary_locked'] = locked_secondary
|
|
|
|
yield SearchResult(type="suit", data=suit_data)
|
|
|
|
# Send log event for suit found
|
|
yield SearchResult(type="log", data={
|
|
"level": "success",
|
|
"message": f"Found suit #{len(self.best_suits)} with score {suit.score} ({len(suit.items)} items)",
|
|
"timestamp": time.time() - self.start_time
|
|
})
|
|
else:
|
|
logger.debug(f"[DEBUG] Suit REJECTED: score {suit.score} not better than existing")
|
|
else:
|
|
logger.debug(f"[DEBUG] No suit created from current state")
|
|
return
|
|
|
|
# Progress update and debug info
|
|
self.suits_evaluated += 1
|
|
if self.suits_evaluated % 100 == 0: # Every 100 evaluations for reduced log spam
|
|
# Check for cancellation during progress update
|
|
if self.is_cancelled and await self.is_cancelled():
|
|
logger.info("Search cancelled during progress update")
|
|
return
|
|
|
|
elapsed = time.time() - self.start_time
|
|
rate = round(self.suits_evaluated / elapsed, 1) if elapsed > 0 else 0
|
|
best_score = self.best_suits[0].score if self.best_suits else 0
|
|
current_bucket_name = buckets[bucket_idx].slot if bucket_idx < len(buckets) else None
|
|
|
|
logger.info(f"Search progress: evaluated {self.suits_evaluated}, depth {bucket_idx}/{len(buckets)}, found {len(self.best_suits)} suits, best has {self.best_suit_item_count} items")
|
|
yield SearchResult(
|
|
type="progress",
|
|
data={
|
|
"evaluated": self.suits_evaluated,
|
|
"found": len(self.best_suits),
|
|
"current_depth": bucket_idx,
|
|
"total_buckets": len(buckets),
|
|
"current_items": len(state.items),
|
|
"elapsed": elapsed,
|
|
"rate": rate,
|
|
"current_bucket": current_bucket_name,
|
|
"best_score": best_score
|
|
}
|
|
)
|
|
|
|
# Send verbose log every 500 evaluations
|
|
if self.suits_evaluated % 500 == 0:
|
|
yield SearchResult(type="log", data={
|
|
"level": "info",
|
|
"message": f"Evaluated {self.suits_evaluated:,} combinations | Bucket: {current_bucket_name} ({bucket_idx+1}/{len(buckets)}) | Rate: {rate}/s",
|
|
"timestamp": elapsed
|
|
})
|
|
|
|
bucket = buckets[bucket_idx]
|
|
|
|
# DEBUG: Log bucket processing (reduced verbosity)
|
|
logger.debug(f"[DEBUG] Processing bucket {bucket_idx}: {bucket.slot} with {len(bucket.items)} items")
|
|
|
|
# Try each item in current bucket
|
|
items_tried = 0
|
|
items_accepted = 0
|
|
for item in bucket.items:
|
|
items_tried += 1
|
|
logger.debug(f"[DEBUG] Trying item {items_tried}/{len(bucket.items)} in {bucket.slot}: {item.name}")
|
|
|
|
if self.can_add_item(item, state):
|
|
items_accepted += 1
|
|
logger.debug(f"[DEBUG] Item ACCEPTED: {item.name} (#{items_accepted})")
|
|
|
|
# Add item to state
|
|
state.push(item)
|
|
|
|
# Continue search with next bucket
|
|
async for result in self.recursive_search(buckets, bucket_idx + 1, state):
|
|
yield result
|
|
|
|
# Remove item from state (backtrack)
|
|
state.pop(item.slot)
|
|
else:
|
|
logger.debug(f"[DEBUG] Item REJECTED: {item.name}")
|
|
|
|
logger.debug(f"[DEBUG] Bucket {bucket.slot} summary: {items_tried} tried, {items_accepted} accepted")
|
|
|
|
# Only skip if no items were accepted (allows incomplete suits when no valid items exist)
|
|
# If items were accepted, we already explored those paths - don't also explore skip
|
|
if items_accepted == 0:
|
|
logger.debug(f"[DEBUG] No items accepted for {bucket.slot}, trying skip")
|
|
async for result in self.recursive_search(buckets, bucket_idx + 1, state):
|
|
yield result
|
|
|
|
def can_add_item(self, item: SuitItem, state: SuitState) -> bool:
|
|
"""Check if item can be added without violating constraints."""
|
|
# Import translation function
|
|
def translate_equipment_set_id(set_id: str) -> str:
|
|
import main
|
|
if not hasattr(main, 'ENUM_MAPPINGS') or main.ENUM_MAPPINGS is None:
|
|
return f"Set {set_id}"
|
|
dictionaries = main.ENUM_MAPPINGS.get('dictionaries', {})
|
|
attribute_set_info = dictionaries.get('AttributeSetInfo', {}).get('values', {})
|
|
set_name = attribute_set_info.get(str(set_id))
|
|
return set_name if set_name else str(set_id)
|
|
|
|
# 1. Slot availability
|
|
if item.slot in state.occupied_slots:
|
|
logger.debug(f"[DEBUG] REJECT {item.name}: slot {item.slot} already occupied")
|
|
return False
|
|
|
|
# 2. Item uniqueness - same physical item can't be used in multiple slots
|
|
for existing_item in state.items.values():
|
|
if existing_item.id == item.id:
|
|
logger.debug(f"[DEBUG] REJECT {item.name}: item already used (duplicate ID)")
|
|
return False
|
|
|
|
# 3. Set piece validation - Use EFFECTIVE limits (account for locked slots)
|
|
if item.set_id:
|
|
# Convert item.set_id to numeric for comparison (it might be string or int)
|
|
try:
|
|
item_set_numeric = int(item.set_id) if isinstance(item.set_id, str) and item.set_id.isdigit() else item.set_id
|
|
except:
|
|
item_set_numeric = item.set_id
|
|
|
|
current_count = state.set_counts.get(item_set_numeric, 0)
|
|
|
|
# Use effective limits which account for locked slots
|
|
eff_primary = getattr(self, 'effective_primary_needed', 5)
|
|
eff_secondary = getattr(self, 'effective_secondary_needed', 4)
|
|
|
|
if item_set_numeric == self.constraints.primary_set:
|
|
# Primary set: use effective limit (accounts for locked pieces)
|
|
if current_count >= eff_primary:
|
|
logger.info(f"[SET_LIMIT] REJECT {item.name}: primary set {item_set_numeric} already has {current_count} pieces (effective max {eff_primary})")
|
|
return False
|
|
elif item_set_numeric == self.constraints.secondary_set:
|
|
# Secondary set: use effective limit (accounts for locked pieces)
|
|
if current_count >= eff_secondary:
|
|
logger.info(f"[SET_LIMIT] REJECT {item.name}: secondary set {item_set_numeric} already has {current_count} pieces (effective max {eff_secondary})")
|
|
return False
|
|
else:
|
|
# Check if this is a jewelry item - only allow if it contributes required spells
|
|
jewelry_slots = {"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"}
|
|
if item.slot in jewelry_slots:
|
|
# Jewelry MUST contribute to required spells to be accepted
|
|
if not self._jewelry_contributes_required_spell(item, state):
|
|
logger.debug(f"[DEBUG] REJECT {item.name}: jewelry doesn't contribute any required spells")
|
|
return False
|
|
logger.debug(f"[DEBUG] ACCEPT {item.name}: jewelry from set '{item_set_numeric}' contributes required spells")
|
|
else:
|
|
# STRICT: Reject armor items from other sets
|
|
# Only allow armor from the two user-selected sets
|
|
logger.debug(f"[DEBUG] REJECT {item.name}: armor from other set '{item_set_numeric}', only primary '{self.constraints.primary_set}' and secondary '{self.constraints.secondary_set}' allowed")
|
|
return False
|
|
else:
|
|
# For set optimization, reject items with no set ID unless they're clothing or jewelry
|
|
jewelry_slots = {"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"}
|
|
if item.slot in ['Shirt', 'Pants']:
|
|
# Allow clothing items even without set ID
|
|
logger.debug(f"[DEBUG] ACCEPT {item.name}: clothing item without set ID, allowed")
|
|
elif item.slot in jewelry_slots:
|
|
# Jewelry MUST contribute to required spells to be accepted
|
|
if not self._jewelry_contributes_required_spell(item, state):
|
|
logger.debug(f"[DEBUG] REJECT {item.name}: jewelry doesn't contribute any required spells")
|
|
return False
|
|
logger.debug(f"[DEBUG] ACCEPT {item.name}: jewelry without set ID contributes required spells")
|
|
else:
|
|
# Reject armor items without set ID for set optimization
|
|
logger.debug(f"[DEBUG] REJECT {item.name}: no set ID and not clothing/jewelry")
|
|
return False
|
|
|
|
# 4. Spell overlap constraints - STRICT: Reject items that don't contribute new spells
|
|
if self.constraints.required_spells and item.spell_names:
|
|
if not self._can_get_beneficial_spell_from(item, state):
|
|
# STRICT MODE: No fallback for target sets or good stats
|
|
# Items with ALL duplicate spells are rejected
|
|
# (Items with SOME new spells + some duplicates are accepted by _can_get_beneficial_spell_from)
|
|
logger.debug(f"[DEBUG] REJECT {item.name}: all spells are duplicates")
|
|
return False
|
|
|
|
logger.debug(f"[DEBUG] ACCEPT {item.name}: passed all constraints")
|
|
return True
|
|
|
|
def _is_double_spell_acceptable(self, item: SuitItem, overlap: int) -> bool:
|
|
"""Check if overlapping spell is acceptable (double spell both needed)."""
|
|
# Item must have exactly 2 spells for double spell logic
|
|
if len(item.spell_names) != 2:
|
|
return False
|
|
|
|
# Both spells must be in needed constraints
|
|
needed_bitmap = self.needed_spell_bitmap
|
|
for spell in item.spell_names:
|
|
spell_bit = self.spell_index.get_bitmap([spell])
|
|
if not (spell_bit & needed_bitmap):
|
|
return False
|
|
|
|
return True
|
|
|
|
def finalize_suit(self, state: SuitState) -> Optional[CompletedSuit]:
|
|
"""Convert state to completed suit with scoring."""
|
|
if not state.items:
|
|
return None
|
|
|
|
# Calculate score based on constraints and priorities
|
|
score = self._calculate_score(state)
|
|
|
|
# Determine fulfilled and missing spells
|
|
fulfilled_spells = []
|
|
missing_spells = []
|
|
|
|
if self.constraints.required_spells:
|
|
needed_bitmap = self.needed_spell_bitmap
|
|
fulfilled_bitmap = state.spell_bitmap & needed_bitmap
|
|
missing_bitmap = needed_bitmap & ~state.spell_bitmap
|
|
|
|
fulfilled_spells = self.spell_index.get_spell_names(fulfilled_bitmap)
|
|
missing_spells = self.spell_index.get_spell_names(missing_bitmap)
|
|
|
|
# Add locked spells to fulfilled and remove from missing
|
|
if hasattr(self, 'locked_spells') and self.locked_spells:
|
|
for spell in self.locked_spells:
|
|
if spell in missing_spells:
|
|
missing_spells.remove(spell)
|
|
fulfilled_spells.append(spell)
|
|
elif spell not in fulfilled_spells:
|
|
fulfilled_spells.append(spell)
|
|
|
|
return CompletedSuit(
|
|
items=state.items.copy(),
|
|
score=score,
|
|
total_armor=state.total_armor,
|
|
total_ratings=state.total_ratings.copy(),
|
|
set_counts=state.set_counts.copy(),
|
|
fulfilled_spells=fulfilled_spells,
|
|
missing_spells=missing_spells
|
|
)
|
|
|
|
def _calculate_score(self, state: SuitState) -> int:
|
|
"""Calculate suit score based on user specifications."""
|
|
score = 0
|
|
weights = self.scoring_weights
|
|
|
|
logger.debug(f"[SCORING] Starting score calculation for suit with {len(state.items)} items")
|
|
logger.debug(f"[SCORING] Set counts: {state.set_counts}")
|
|
logger.debug(f"[SCORING] Total ratings: {state.total_ratings}")
|
|
|
|
# 1. Complete armor sets (highest priority)
|
|
from main import translate_equipment_set_id
|
|
primary_set_name = translate_equipment_set_id(str(self.constraints.primary_set)) if self.constraints.primary_set else None
|
|
secondary_set_name = translate_equipment_set_id(str(self.constraints.secondary_set)) if self.constraints.secondary_set else None
|
|
|
|
logger.debug(f"[SCORING] Looking for primary set: {primary_set_name} (ID: {self.constraints.primary_set})")
|
|
logger.debug(f"[SCORING] Looking for secondary set: {secondary_set_name} (ID: {self.constraints.secondary_set})")
|
|
|
|
# Get FOUND counts (items in this suit, not including locked)
|
|
found_primary = state.set_counts.get(self.constraints.primary_set, 0) if self.constraints.primary_set else 0
|
|
found_secondary = state.set_counts.get(self.constraints.secondary_set, 0) if self.constraints.secondary_set else 0
|
|
|
|
# Get locked counts for display
|
|
locked_primary = self.locked_set_counts.get(self.constraints.primary_set, 0) if hasattr(self, 'locked_set_counts') and self.constraints.primary_set else 0
|
|
locked_secondary = self.locked_set_counts.get(self.constraints.secondary_set, 0) if hasattr(self, 'locked_set_counts') and self.constraints.secondary_set else 0
|
|
|
|
# Total counts (found + locked) for display
|
|
total_primary = found_primary + locked_primary
|
|
total_secondary = found_secondary + locked_secondary
|
|
|
|
# Get effective requirements with fallback
|
|
eff_primary = getattr(self, 'effective_primary_needed', 5)
|
|
eff_secondary = getattr(self, 'effective_secondary_needed', 4)
|
|
|
|
logger.debug(f"[SCORING] Primary: {found_primary} found + {locked_primary} locked = {total_primary} total (need {eff_primary} more)")
|
|
logger.debug(f"[SCORING] Secondary: {found_secondary} found + {locked_secondary} locked = {total_secondary} total (need {eff_secondary} more)")
|
|
|
|
# Complete set bonuses: compare FOUND against EFFECTIVE requirements
|
|
if found_primary >= eff_primary:
|
|
score += weights.armor_set_complete
|
|
logger.debug(f"[SCORING] Primary set complete: +{weights.armor_set_complete}")
|
|
# Penalty for EXCESS primary pieces (should have gone to secondary)
|
|
if found_primary > eff_primary:
|
|
excess = found_primary - eff_primary
|
|
excess_penalty = excess * 500 # STRONG penalty per excess piece
|
|
score -= excess_penalty
|
|
logger.debug(f"[SCORING] Primary set EXCESS ({excess} extra): -{excess_penalty}")
|
|
else:
|
|
# Missing set penalty: -200 per missing piece
|
|
if self.constraints.primary_set and found_primary > 0:
|
|
missing_pieces = eff_primary - found_primary
|
|
penalty = missing_pieces * weights.missing_set_penalty
|
|
score += penalty # negative penalty
|
|
logger.debug(f"[SCORING] Primary set incomplete ({found_primary}/{eff_primary}): {penalty}")
|
|
|
|
if found_secondary >= eff_secondary:
|
|
score += weights.armor_set_complete
|
|
logger.debug(f"[SCORING] Secondary set complete: +{weights.armor_set_complete}")
|
|
# Penalty for EXCESS secondary pieces
|
|
if found_secondary > eff_secondary:
|
|
excess = found_secondary - eff_secondary
|
|
excess_penalty = excess * 500 # STRONG penalty
|
|
score -= excess_penalty
|
|
logger.debug(f"[SCORING] Secondary set EXCESS ({excess} extra): -{excess_penalty}")
|
|
else:
|
|
# Missing set penalty: -200 per missing piece
|
|
if self.constraints.secondary_set and found_secondary > 0:
|
|
missing_pieces = eff_secondary - found_secondary
|
|
penalty = missing_pieces * weights.missing_set_penalty
|
|
score += penalty # negative penalty
|
|
logger.debug(f"[SCORING] Secondary set incomplete ({found_secondary}/{eff_secondary}): {penalty}")
|
|
|
|
# 2. Crit Damage Rating: CD1 = +10, CD2 = +20 per piece
|
|
for item in state.items.values():
|
|
crit_rating = item.ratings.get('crit_damage_rating', 0)
|
|
if crit_rating == 1:
|
|
score += weights.crit_damage_1
|
|
elif crit_rating == 2:
|
|
score += weights.crit_damage_2
|
|
|
|
# 3. Damage Rating on clothes only: DR1=+10, DR2=+20, DR3=+30 per piece
|
|
for item in state.items.values():
|
|
if item.slot in ['Shirt', 'Pants']: # Only clothes
|
|
damage_rating = item.ratings.get('damage_rating', 0)
|
|
if damage_rating == 1:
|
|
score += weights.damage_rating_1
|
|
elif damage_rating == 2:
|
|
score += weights.damage_rating_2
|
|
elif damage_rating == 3:
|
|
score += weights.damage_rating_3
|
|
|
|
# 4. Spell Coverage: +100 per fulfilled cantrip/ward (no duplicates)
|
|
if self.constraints.required_spells:
|
|
fulfilled_spells = state.spell_bitmap & self.needed_spell_bitmap
|
|
fulfilled_count = bin(fulfilled_spells).count('1')
|
|
spell_score = fulfilled_count * 100
|
|
score += spell_score
|
|
logger.debug(f"[SCORING] Spell coverage: {fulfilled_count} spells = +{spell_score} points")
|
|
|
|
# 5. Base score for having items (so suits aren't rejected as 0)
|
|
# Add small base score per item to avoid 0 scores
|
|
base_item_score = len(state.items) * 5
|
|
score += base_item_score
|
|
logger.debug(f"[SCORING] Base item score: {len(state.items)} items = +{base_item_score} points")
|
|
|
|
# 6. Armor level as tiebreaker (LOWEST PRIORITY)
|
|
# Scale down significantly so it only matters when other scores are equal
|
|
armor_score = state.total_armor // 100 # ~5 points per 500 AL
|
|
score += armor_score
|
|
logger.debug(f"[SCORING] Armor level tiebreaker: {state.total_armor} AL = +{armor_score} points")
|
|
|
|
logger.debug(f"[SCORING] Final score: {score}")
|
|
|
|
return max(0, score) # Never negative
|
|
|
|
def is_better_than_existing(self, suit: CompletedSuit) -> bool:
|
|
"""Check if suit is worth keeping."""
|
|
logger.info(f"[DEBUG] is_better_than_existing: checking suit with score {suit.score}, current best_suits count: {len(self.best_suits)}, max_results: {self.constraints.max_results}")
|
|
if len(self.best_suits) < self.constraints.max_results:
|
|
logger.info(f"[DEBUG] is_better_than_existing: ACCEPTING suit - not at max capacity")
|
|
return True
|
|
# Keep suits with more items even if score is lower - we want complete suits
|
|
lowest_suit = self.best_suits[-1]
|
|
logger.info(f"[DEBUG] is_better_than_existing: comparing against lowest suit score {lowest_suit.score}")
|
|
if len(suit.items) > len(lowest_suit.items):
|
|
logger.info(f"[DEBUG] is_better_than_existing: ACCEPTING suit - more items ({len(suit.items)} > {len(lowest_suit.items)})")
|
|
return True
|
|
result = suit.score > lowest_suit.score
|
|
logger.info(f"[DEBUG] is_better_than_existing: score comparison result: {result}")
|
|
return result
|
|
|
|
def _has_room_for_armor_set(self, item: SuitItem, state: SuitState) -> bool:
|
|
"""Check if adding this armor piece violates set limits (Mag-SuitBuilder HasRoomForArmorSet).
|
|
|
|
Uses effective limits which account for locked slots.
|
|
"""
|
|
if not item.set_id:
|
|
return True # Non-set items don't count against limits
|
|
|
|
current_count = state.set_counts.get(item.set_id, 0)
|
|
|
|
# Use effective limits (which account for locked slots)
|
|
eff_primary = getattr(self, 'effective_primary_needed', 5)
|
|
eff_secondary = getattr(self, 'effective_secondary_needed', 4)
|
|
|
|
if item.set_id == self.constraints.primary_set:
|
|
# Hard limit: don't add more primary set pieces than needed
|
|
has_room = current_count < eff_primary
|
|
if not has_room:
|
|
logger.debug(f"[SET_LIMIT] Rejecting {item.name} - already have {current_count}/{eff_primary} primary set pieces")
|
|
return has_room
|
|
elif item.set_id == self.constraints.secondary_set:
|
|
has_room = current_count < eff_secondary
|
|
if not has_room:
|
|
logger.debug(f"[SET_LIMIT] Rejecting {item.name} - already have {current_count}/{eff_secondary} secondary set pieces")
|
|
return has_room
|
|
else:
|
|
# STRICT: Other sets not allowed for armor
|
|
# Only jewelry can be from other sets
|
|
jewelry_slots = {"Neck", "Left Ring", "Right Ring", "Left Wrist", "Right Wrist", "Trinket"}
|
|
return item.slot in jewelry_slots
|
|
|
|
def _convert_set_name_to_id(self, set_name: Optional[str]) -> Optional[int]:
|
|
"""Convert set name to numeric ID for constraint comparison."""
|
|
if not set_name:
|
|
return None
|
|
|
|
# Reverse mapping of SET_NAMES
|
|
name_to_id = {
|
|
"Adept's Set": 14,
|
|
"Defender's Set": 16,
|
|
"Soldier's Set": 13,
|
|
"Wise Set": 21,
|
|
"Heroic Protector Set": 40,
|
|
"Heroic Destroyer Set": 41,
|
|
"Relic Alduressa Set": 46,
|
|
"Ancient Relic Set": 47,
|
|
"Noble Relic Set": 48,
|
|
"Archer's Set": 15,
|
|
"Hearty Set": 19,
|
|
"Dexterous Set": 20,
|
|
"Swift Set": 22,
|
|
"Reinforced Set": 24,
|
|
"Flame Proof Set": 26,
|
|
"Lightning Proof Set": 29
|
|
}
|
|
|
|
return name_to_id.get(set_name)
|
|
|
|
def _can_get_beneficial_spell_from(self, item: SuitItem, state: SuitState) -> bool:
|
|
"""Check if item provides beneficial spells without duplicates.
|
|
|
|
STRICT MODE: Reject items that only have duplicate spells, even from target sets.
|
|
This prevents wasted spell slots (e.g., Flame Ward on 3 armor pieces).
|
|
"""
|
|
# Non-spell items are always beneficial (armor/ratings only)
|
|
if not item.spell_names:
|
|
return True
|
|
|
|
# If no spell constraints specified, allow any item
|
|
if not self.constraints.required_spells:
|
|
return True
|
|
|
|
# STRICT: Check if item provides ANY new required spell not already covered
|
|
needed_bitmap = self.needed_spell_bitmap
|
|
current_bitmap = state.spell_bitmap
|
|
item_bitmap = item.spell_bitmap
|
|
|
|
# Item MUST provide at least one new required spell
|
|
new_beneficial_spells = item_bitmap & needed_bitmap & ~current_bitmap
|
|
|
|
if new_beneficial_spells != 0:
|
|
return True # Has new required spells - beneficial
|
|
|
|
# STRICT REJECTION: No new required spells = reject
|
|
# This applies even to primary/secondary set pieces
|
|
# Rationale: Duplicate spells waste valuable spell slots
|
|
return False
|
|
|
|
def _jewelry_contributes_required_spell(self, item: SuitItem, state: SuitState) -> bool:
|
|
"""Check if jewelry item contributes at least one required spell not already covered.
|
|
|
|
Jewelry should ONLY be added if it fulfills spell constraints. Empty slots are
|
|
preferred over jewelry that doesn't contribute to required spells.
|
|
"""
|
|
# If no spell constraints, don't add jewelry (nothing to contribute)
|
|
if not self.constraints.required_spells:
|
|
logger.debug(f"[JEWELRY] REJECT {item.name}: no required spells specified")
|
|
return False
|
|
|
|
# Item must have spells to contribute
|
|
if not item.spell_names:
|
|
logger.debug(f"[JEWELRY] REJECT {item.name}: item has no spells")
|
|
return False
|
|
|
|
# Check if item has ANY required spell that's not already in the suit
|
|
needed_bitmap = self.needed_spell_bitmap
|
|
current_bitmap = state.spell_bitmap
|
|
|
|
for spell in item.spell_names:
|
|
spell_bit = self.spell_index.get_bitmap([spell])
|
|
# Check if this spell is required AND not already covered
|
|
if (spell_bit & needed_bitmap) and not (current_bitmap & spell_bit):
|
|
logger.debug(f"[JEWELRY] ACCEPT {item.name}: contributes uncovered spell '{spell}'")
|
|
return True
|
|
|
|
logger.debug(f"[JEWELRY] REJECT {item.name}: no new required spells contributed")
|
|
return False
|
|
|
|
|
|
# API Endpoints
|
|
@router.get("/characters")
|
|
async def get_available_characters():
|
|
"""Get list of characters with inventory data."""
|
|
query = """
|
|
SELECT DISTINCT character_name
|
|
FROM items
|
|
ORDER BY character_name
|
|
"""
|
|
|
|
try:
|
|
rows = await database.fetch_all(query)
|
|
characters = [row['character_name'] for row in rows]
|
|
return {"characters": characters}
|
|
except Exception as e:
|
|
logger.error(f"Error fetching characters: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/search")
|
|
async def search_suits(request: Request, constraints: SearchConstraints):
|
|
"""Start suit search with streaming results."""
|
|
# Create cancellation check function
|
|
async def is_cancelled():
|
|
return await request.is_disconnected()
|
|
|
|
solver = ConstraintSatisfactionSolver(constraints, is_cancelled=is_cancelled)
|
|
|
|
async def event_generator():
|
|
try:
|
|
logger.info("Starting SSE event generator")
|
|
async for result in solver.search():
|
|
# Check for client disconnection
|
|
if await request.is_disconnected():
|
|
logger.info("Client disconnected, stopping search")
|
|
yield {
|
|
"event": "cancelled",
|
|
"data": json.dumps({"message": "Search cancelled by client"})
|
|
}
|
|
return
|
|
|
|
logger.info(f"Yielding SSE event: {result.type}")
|
|
try:
|
|
# Send full data for frontend processing
|
|
data_json = json.dumps(result.data)
|
|
|
|
yield {
|
|
"event": result.type,
|
|
"data": data_json
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error serializing result data: {e}", exc_info=True)
|
|
logger.error(f"Result type: {result.type}, Data keys: {list(result.data.keys()) if hasattr(result.data, 'keys') else 'N/A'}")
|
|
yield {
|
|
"event": "error",
|
|
"data": json.dumps({"message": f"Serialization error: {str(e)}"})
|
|
}
|
|
logger.info("SSE event generator completed")
|
|
except Exception as e:
|
|
logger.error(f"Error in search generator: {e}", exc_info=True)
|
|
yield {
|
|
"event": "error",
|
|
"data": json.dumps({"message": f"Search error: {str(e)}"})
|
|
}
|
|
|
|
async def sse_generator():
|
|
async for event in event_generator():
|
|
# Manual SSE format
|
|
yield f"event: {event['event']}\n"
|
|
yield f"data: {event['data']}\n\n"
|
|
|
|
return StreamingResponse(
|
|
sse_generator(),
|
|
media_type="text/event-stream",
|
|
headers={
|
|
"Cache-Control": "no-cache",
|
|
"Connection": "keep-alive",
|
|
"Access-Control-Allow-Origin": "*",
|
|
"Access-Control-Allow-Headers": "Cache-Control"
|
|
}
|
|
)
|
|
|
|
|
|
@router.post("/search-test")
|
|
async def search_suits_test(constraints: SearchConstraints):
|
|
"""Test endpoint for suit search without SSE."""
|
|
try:
|
|
solver = ConstraintSatisfactionSolver(constraints)
|
|
results = []
|
|
|
|
async for result in solver.search():
|
|
if result.type == "suit":
|
|
results.append({
|
|
"score": result.data.get("score", 0),
|
|
"total_armor": result.data.get("total_armor", 0),
|
|
"item_count": len(result.data.get("items", {}))
|
|
})
|
|
elif result.type == "complete":
|
|
return {
|
|
"success": True,
|
|
"suits_found": len(results),
|
|
"duration": result.data.get("duration", 0),
|
|
"results": results[:5] # Return first 5 results
|
|
}
|
|
|
|
return {"success": False, "error": "No completion event received"}
|
|
except Exception as e:
|
|
logger.error(f"Test search error: {e}", exc_info=True)
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
@router.get("/sets")
|
|
async def get_equipment_sets():
|
|
"""Get available equipment sets."""
|
|
# Return hardcoded sets for now
|
|
sets = [
|
|
{"id": 14, "name": "Adept's"},
|
|
{"id": 16, "name": "Defender's"},
|
|
{"id": 13, "name": "Soldier's"},
|
|
{"id": 21, "name": "Wise"},
|
|
{"id": 40, "name": "Heroic Protector"},
|
|
{"id": 41, "name": "Heroic Destroyer"},
|
|
{"id": 46, "name": "Relic Alduressa"},
|
|
{"id": 47, "name": "Ancient Relic"},
|
|
{"id": 48, "name": "Noble Relic"},
|
|
{"id": 15, "name": "Archer's"},
|
|
{"id": 19, "name": "Hearty"},
|
|
{"id": 20, "name": "Dexterous"},
|
|
{"id": 22, "name": "Swift"},
|
|
{"id": 24, "name": "Reinforced"},
|
|
{"id": 26, "name": "Flame Proof"},
|
|
{"id": 29, "name": "Lightning Proof"}
|
|
]
|
|
return {"sets": sets} |