diff --git a/main.py b/main.py index ff312607..04db6054 100644 --- a/main.py +++ b/main.py @@ -1247,6 +1247,14 @@ async def on_shutdown(): Ensures the database connection is closed cleanly. """ + # Close shared httpx client used for inventory-service forwarding + global _inventory_http_client + if _inventory_http_client is not None: + try: + await _inventory_http_client.aclose() + except Exception: + pass + # Stop cache refresh tasks global _cache_task, _rares_cache_task, _server_health_task, _cleanup_task if _cache_task: @@ -2474,6 +2482,69 @@ browser_conns: Dict[WebSocket, set[str] | None] = {} # Mapping of plugin clients by character_name to their WebSocket for command forwarding plugin_conns: Dict[str, WebSocket] = {} +# Shared httpx client for inventory-service forwarding. Reusing one client +# pools connections — creating a fresh AsyncClient per delta blew open a new +# socket for every loot pickup, which contributed to receive-loop stalls. +_inventory_http_client: httpx.AsyncClient | None = None + +# Tasks for in-flight inventory_delta processing (fire-and-forget). +# Held in a set so they aren't garbage-collected mid-flight. +_inventory_delta_tasks: set[asyncio.Task] = set() + + +async def _handle_inventory_delta(data: dict): + """Forward an inventory_delta to inventory-service and broadcast to browsers. + + Runs as a background task so a slow inventory-service POST never blocks + the plugin WebSocket receive loop. If the receive loop blocks long enough, + Starlette stops processing keepalives and the connection drops. + """ + try: + action = data.get("action") + char_name = data.get("character_name", "unknown") + global _inventory_http_client + if _inventory_http_client is None: + _inventory_http_client = httpx.AsyncClient(timeout=10.0) + client = _inventory_http_client + + if action == "remove": + item_id = data.get("item_id") + if item_id is not None: + resp = await client.delete( + f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item/{item_id}" + ) + if resp.status_code >= 400: + logger.warning( + f"Inventory service returned {resp.status_code} for delta remove item_id={item_id}" + ) + elif action in ("add", "update"): + item = data.get("item") + if item: + resp = await client.post( + f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item", + json=item, + ) + if resp.status_code < 400: + enriched_item = resp.json().get("item") + if enriched_item: + data = { + "type": "inventory_delta", + "action": action, + "character_name": char_name, + "item": enriched_item, + } + else: + logger.warning( + f"Inventory service returned {resp.status_code} for delta {action}" + ) + + # Broadcast delta to all browser clients + await _broadcast_to_browser_clients(data) + logger.debug(f"Inventory delta ({action}) for {char_name}") + except Exception as e: + logger.error(f"Failed to process inventory delta: {e}", exc_info=True) + + # --- Vital sharing (cross-machine VTankFellowHeals replacement) ---------- # Characters that have opted-in to vital/position/item/cast sharing. Backend # forwards share_* messages only between subscribers and excludes the sender. @@ -2852,8 +2923,19 @@ async def ws_receive_snapshots( raw = await websocket.receive_text() # Debug: log all incoming plugin WebSocket messages logger.debug(f"Plugin WebSocket RX from {websocket.client}: {raw}") - except WebSocketDisconnect: - logger.info(f"🔌 PLUGIN_DISCONNECTED: {websocket.client}") + except WebSocketDisconnect as e: + logger.info( + f"🔌 PLUGIN_DISCONNECTED: {websocket.client} (code={e.code}, reason={e.reason!r})" + ) + break + except RuntimeError as e: + # Starlette raises RuntimeError instead of WebSocketDisconnect + # in some edge cases (connection closed mid-await before accept + # completed, or send happening on a torn-down socket). Treat + # the same as a clean disconnect rather than crashing the loop. + logger.info( + f"🔌 PLUGIN_DISCONNECTED (runtime): {websocket.client} ({e})" + ) break # Parse JSON payload try: @@ -3162,53 +3244,12 @@ async def ws_receive_snapshots( ) continue # --- Inventory delta: single item add/remove/update --- + # Fire-and-forget: if inventory-service is slow, do NOT block this + # WS receive loop (which would starve keepalives → connection drop). if msg_type == "inventory_delta": - try: - action = data.get("action") - char_name = data.get("character_name", "unknown") - - if action == "remove": - item_id = data.get("item_id") - if item_id is not None: - async with httpx.AsyncClient(timeout=10.0) as client: - resp = await client.delete( - f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item/{item_id}" - ) - if resp.status_code >= 400: - logger.warning( - f"Inventory service returned {resp.status_code} for delta remove item_id={item_id}" - ) - elif action in ("add", "update"): - item = data.get("item") - if item: - async with httpx.AsyncClient(timeout=10.0) as client: - resp = await client.post( - f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item", - json=item, - ) - if resp.status_code < 400: - # Use enriched item from inventory-service response for broadcast - resp_json = resp.json() - enriched_item = resp_json.get("item") - if enriched_item: - data = { - "type": "inventory_delta", - "action": action, - "character_name": char_name, - "item": enriched_item, - } - else: - logger.warning( - f"Inventory service returned {resp.status_code} for delta {action}" - ) - - # Broadcast delta to all browser clients - await _broadcast_to_browser_clients(data) - logger.debug(f"Inventory delta ({action}) for {char_name}") - except Exception as e: - logger.error( - f"Failed to process inventory delta: {e}", exc_info=True - ) + task = asyncio.create_task(_handle_inventory_delta(data)) + _inventory_delta_tasks.add(task) + task.add_done_callback(_inventory_delta_tasks.discard) continue # --- Vitals message: store character health/stamina/mana and broadcast --- if msg_type == "vitals":