fix(ws): non-blocking inventory_delta + better disconnect handling
Two issues causing plugin WS disconnects on heavy-loot characters:
1. inventory_delta processing was awaiting an httpx POST to inventory-
service inline within the WS receive loop. Each delta also created a
fresh httpx.AsyncClient (no connection pool reuse). When inventory-
service was slow under load, the receive loop blocked, keepalives
stopped flowing, and the connection eventually dropped (especially
for characters spamming deltas: Elliot was reconnecting ~every 4 min).
Fix: process each delta in its own background task (asyncio.create_task())
so the WS receive loop returns immediately to read the next message. Use a
shared httpx.AsyncClient with connection pooling.
2. websocket.receive_text() raises RuntimeError ("Need to call accept
first") instead of WebSocketDisconnect in some race conditions when
the connection closes mid-await. The receive loop only caught
WebSocketDisconnect, so RuntimeError propagated up as an exception
traceback in logs.
Fix: catch RuntimeError and log as a clean disconnect.
Also: log close code/reason on WebSocketDisconnect so we can distinguish
clean closes (1000/1001) from network drops (1006) etc.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
f111e5063b
commit
e512c1c296
1 changed files with 89 additions and 48 deletions
137
main.py
137
main.py
|
|
@ -1247,6 +1247,14 @@ async def on_shutdown():
|
|||
|
||||
Ensures the database connection is closed cleanly.
|
||||
"""
|
||||
# Close shared httpx client used for inventory-service forwarding
|
||||
global _inventory_http_client
|
||||
if _inventory_http_client is not None:
|
||||
try:
|
||||
await _inventory_http_client.aclose()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Stop cache refresh tasks
|
||||
global _cache_task, _rares_cache_task, _server_health_task, _cleanup_task
|
||||
if _cache_task:
|
||||
|
|
@ -2474,6 +2482,69 @@ browser_conns: Dict[WebSocket, set[str] | None] = {}
|
|||
# Mapping of plugin clients by character_name to their WebSocket for command forwarding
|
||||
plugin_conns: Dict[str, WebSocket] = {}
|
||||
|
||||
# Shared httpx client for inventory-service forwarding. Reusing one client
|
||||
# pools connections — creating a fresh AsyncClient per delta opened a new
|
||||
# socket for every loot pickup, which contributed to receive-loop stalls.
|
||||
_inventory_http_client: httpx.AsyncClient | None = None
|
||||
|
||||
# Tasks for in-flight inventory_delta processing (fire-and-forget).
|
||||
# Held in a set so they aren't garbage-collected mid-flight.
|
||||
_inventory_delta_tasks: set[asyncio.Task] = set()
|
||||
|
||||
|
||||
async def _handle_inventory_delta(data: dict):
    """Forward an inventory_delta to inventory-service and broadcast to browsers.

    Runs as a background task so a slow inventory-service request never blocks
    the plugin WebSocket receive loop.

    Args:
        data: Parsed ``inventory_delta`` message. The keys read here are
            ``action`` (``"add"``, ``"update"`` or ``"remove"``),
            ``character_name`` (defaults to ``"unknown"``), and either
            ``item_id`` (for ``remove``) or ``item`` (for ``add``/``update``).

    Returns:
        None. Every exception is logged and swallowed so a failed delta never
        surfaces as an unhandled task exception; in that case nothing is
        broadcast for this delta.

    NOTE(review): each delta is handled independently, so two deltas for the
    same item may reach inventory-service in a different order than they were
    received — confirm that is acceptable for add/remove pairs.
    """
    # Local import keeps this block self-contained; urllib.parse is stdlib.
    from urllib.parse import quote

    global _inventory_http_client

    try:
        action = data.get("action")
        char_name = data.get("character_name", "unknown")

        # Lazily create the shared client on first use; later deltas reuse it
        # (and its connection pool) instead of opening a new client each time.
        if _inventory_http_client is None:
            _inventory_http_client = httpx.AsyncClient(timeout=10.0)
        client = _inventory_http_client

        # character_name and item_id come straight from the plugin message.
        # Percent-encode them as single path segments so characters such as
        # "/", "?", "#" or ".." cannot alter which endpoint is requested.
        char_segment = quote(str(char_name), safe="")

        # By default browsers receive the delta exactly as the plugin sent it.
        outgoing = data

        if action == "remove":
            item_id = data.get("item_id")
            if item_id is not None:
                item_segment = quote(str(item_id), safe="")
                resp = await client.delete(
                    f"{INVENTORY_SERVICE_URL}/inventory/{char_segment}/item/{item_segment}"
                )
                if resp.status_code >= 400:
                    logger.warning(
                        f"Inventory service returned {resp.status_code} for delta remove item_id={item_id}"
                    )
        elif action in ("add", "update"):
            item = data.get("item")
            if item:
                resp = await client.post(
                    f"{INVENTORY_SERVICE_URL}/inventory/{char_segment}/item",
                    json=item,
                )
                if resp.status_code < 400:
                    # Prefer the enriched item returned by inventory-service
                    # for the browser broadcast when one is present.
                    enriched_item = resp.json().get("item")
                    if enriched_item:
                        outgoing = {
                            "type": "inventory_delta",
                            "action": action,
                            "character_name": char_name,
                            "item": enriched_item,
                        }
                else:
                    logger.warning(
                        f"Inventory service returned {resp.status_code} for delta {action}"
                    )

        # Broadcast delta to all browser clients
        await _broadcast_to_browser_clients(outgoing)
        logger.debug(f"Inventory delta ({action}) for {char_name}")
    except Exception as e:
        # Task boundary: log with traceback rather than letting the exception
        # escape a fire-and-forget task.
        logger.error(f"Failed to process inventory delta: {e}", exc_info=True)
|
||||
|
||||
|
||||
# --- Vital sharing (cross-machine VTankFellowHeals replacement) ----------
|
||||
# Characters that have opted-in to vital/position/item/cast sharing. Backend
|
||||
# forwards share_* messages only between subscribers and excludes the sender.
|
||||
|
|
@ -2852,8 +2923,19 @@ async def ws_receive_snapshots(
|
|||
raw = await websocket.receive_text()
|
||||
# Debug: log all incoming plugin WebSocket messages
|
||||
logger.debug(f"Plugin WebSocket RX from {websocket.client}: {raw}")
|
||||
except WebSocketDisconnect:
|
||||
logger.info(f"🔌 PLUGIN_DISCONNECTED: {websocket.client}")
|
||||
except WebSocketDisconnect as e:
|
||||
logger.info(
|
||||
f"🔌 PLUGIN_DISCONNECTED: {websocket.client} (code={e.code}, reason={e.reason!r})"
|
||||
)
|
||||
break
|
||||
except RuntimeError as e:
|
||||
# Starlette raises RuntimeError instead of WebSocketDisconnect
|
||||
# in some edge cases (connection closed mid-await before accept
|
||||
# completed, or send happening on a torn-down socket). Treat
|
||||
# the same as a clean disconnect rather than crashing the loop.
|
||||
logger.info(
|
||||
f"🔌 PLUGIN_DISCONNECTED (runtime): {websocket.client} ({e})"
|
||||
)
|
||||
break
|
||||
# Parse JSON payload
|
||||
try:
|
||||
|
|
@ -3162,53 +3244,12 @@ async def ws_receive_snapshots(
|
|||
)
|
||||
continue
|
||||
# --- Inventory delta: single item add/remove/update ---
|
||||
# Fire-and-forget: if inventory-service is slow, do NOT block this
|
||||
# WS receive loop (which would starve keepalives → connection drop).
|
||||
if msg_type == "inventory_delta":
|
||||
try:
|
||||
action = data.get("action")
|
||||
char_name = data.get("character_name", "unknown")
|
||||
|
||||
if action == "remove":
|
||||
item_id = data.get("item_id")
|
||||
if item_id is not None:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
resp = await client.delete(
|
||||
f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item/{item_id}"
|
||||
)
|
||||
if resp.status_code >= 400:
|
||||
logger.warning(
|
||||
f"Inventory service returned {resp.status_code} for delta remove item_id={item_id}"
|
||||
)
|
||||
elif action in ("add", "update"):
|
||||
item = data.get("item")
|
||||
if item:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
resp = await client.post(
|
||||
f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item",
|
||||
json=item,
|
||||
)
|
||||
if resp.status_code < 400:
|
||||
# Use enriched item from inventory-service response for broadcast
|
||||
resp_json = resp.json()
|
||||
enriched_item = resp_json.get("item")
|
||||
if enriched_item:
|
||||
data = {
|
||||
"type": "inventory_delta",
|
||||
"action": action,
|
||||
"character_name": char_name,
|
||||
"item": enriched_item,
|
||||
}
|
||||
else:
|
||||
logger.warning(
|
||||
f"Inventory service returned {resp.status_code} for delta {action}"
|
||||
)
|
||||
|
||||
# Broadcast delta to all browser clients
|
||||
await _broadcast_to_browser_clients(data)
|
||||
logger.debug(f"Inventory delta ({action}) for {char_name}")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to process inventory delta: {e}", exc_info=True
|
||||
)
|
||||
task = asyncio.create_task(_handle_inventory_delta(data))
|
||||
_inventory_delta_tasks.add(task)
|
||||
task.add_done_callback(_inventory_delta_tasks.discard)
|
||||
continue
|
||||
# --- Vitals message: store character health/stamina/mana and broadcast ---
|
||||
if msg_type == "vitals":
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue