fix: decouple browser broadcast from plugin WebSocket receive loop
🧝♂️ Fix 1: The Mailman Story Imagine you're a mailman delivering letters. Before, you had to knock on every door and wait for someone to answer before you could go back to your truck and get more letters. But your truck keeps getting MORE letters dumped in it! And if you're stuck waiting at grandma's door... your truck overflows and letters fall everywhere! 📬💥 The fix: Now you have a magic helper elf! You hand the letters to the elf and say "you go deliver these!" while you run back to the truck to grab more. The elf handles the doors, you handle the truck. Nobody waits! 🧝♂️✨ 🍕 Fix 2: The Pizza Party Story Now let's talk about that elf. The elf had a problem too! Imagine you have 5 friends at a pizza party and you're handing out slices. Before, the elf would: 1. Give pizza to Tommy → wait for him to take a bite 🍕 2. Give pizza to Sally → wait for her to take a bite 🍕 3. Give pizza to Bobby → wait for him to take a bite 🍕 So boring! Everyone's just sitting there hungry! The fix: Now the elf throws ALL the pizza slices at the same time! 🍕🍕🍕🍕🍕 Everyone gets their pizza at once and nobody has to wait for Tommy to finish chewing! Yay! 🎉 Technical details: - Use asyncio.create_task() to fire-and-forget broadcasts - Use asyncio.gather() to send to all browsers concurrently - Plugin receive loop no longer blocks on slow browser clients Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
5c20d40f1f
commit
c0da36c280
1 changed files with 43 additions and 26 deletions
55
main.py
55
main.py
|
|
@ -81,6 +81,7 @@ _cached_total_kills: dict = {"total": 0, "last_updated": None}
|
||||||
_cache_task: asyncio.Task | None = None
|
_cache_task: asyncio.Task | None = None
|
||||||
_rares_cache_task: asyncio.Task | None = None
|
_rares_cache_task: asyncio.Task | None = None
|
||||||
_cleanup_task: asyncio.Task | None = None
|
_cleanup_task: asyncio.Task | None = None
|
||||||
|
_broadcast_tasks: set[asyncio.Task] = set()
|
||||||
|
|
||||||
# Player tracking for debug purposes
|
# Player tracking for debug purposes
|
||||||
_player_history: list = [] # List of player sets from last 10 refreshes
|
_player_history: list = [] # List of player sets from last 10 refreshes
|
||||||
|
|
@ -1196,6 +1197,14 @@ async def on_shutdown():
|
||||||
await _cleanup_task
|
await _cleanup_task
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
|
# Cancel any in-flight broadcast tasks
|
||||||
|
if _broadcast_tasks:
|
||||||
|
logger.info(f"Cancelling {len(_broadcast_tasks)} in-flight broadcast tasks")
|
||||||
|
for task in _broadcast_tasks:
|
||||||
|
task.cancel()
|
||||||
|
await asyncio.gather(*_broadcast_tasks, return_exceptions=True)
|
||||||
|
_broadcast_tasks.clear()
|
||||||
|
|
||||||
logger.info("Disconnecting from database")
|
logger.info("Disconnecting from database")
|
||||||
await database.disconnect()
|
await database.disconnect()
|
||||||
|
|
||||||
|
|
@ -1992,39 +2001,47 @@ browser_conns: set[WebSocket] = set()
|
||||||
plugin_conns: Dict[str, WebSocket] = {}
|
plugin_conns: Dict[str, WebSocket] = {}
|
||||||
|
|
||||||
|
|
||||||
async def _broadcast_to_browser_clients(snapshot: dict):
|
async def _send_to_browser(ws: WebSocket, data: dict) -> WebSocket | None:
|
||||||
"""Broadcast a telemetry or chat message to all connected browser clients.
|
"""Send data to a single browser client. Returns the ws if it failed, None if ok."""
|
||||||
|
|
||||||
Converts any non-serializable types (e.g., datetime) before sending.
|
|
||||||
Handles connection errors gracefully and removes stale connections.
|
|
||||||
"""
|
|
||||||
# Convert snapshot payload to JSON-friendly types
|
|
||||||
data = jsonable_encoder(snapshot)
|
|
||||||
# Use list() to avoid "set changed size during iteration" errors
|
|
||||||
disconnected_clients = []
|
|
||||||
|
|
||||||
for ws in list(browser_conns):
|
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(ws.send_json(data), timeout=1.0)
|
await asyncio.wait_for(ws.send_json(data), timeout=1.0)
|
||||||
except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e:
|
except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e:
|
||||||
# Collect disconnected clients for cleanup
|
|
||||||
disconnected_clients.append(ws)
|
|
||||||
logger.debug(f"Detected disconnected browser client: {e}")
|
logger.debug(f"Detected disconnected browser client: {e}")
|
||||||
|
return ws
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
disconnected_clients.append(ws)
|
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Timed out broadcasting to browser client; removing stale connection"
|
"Timed out broadcasting to browser client; removing stale connection"
|
||||||
)
|
)
|
||||||
|
return ws
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Handle any other unexpected errors
|
|
||||||
disconnected_clients.append(ws)
|
|
||||||
logger.warning(f"Unexpected error broadcasting to browser client: {e}")
|
logger.warning(f"Unexpected error broadcasting to browser client: {e}")
|
||||||
|
return ws
|
||||||
|
return None
|
||||||
|
|
||||||
# Clean up disconnected clients
|
|
||||||
for ws in disconnected_clients:
|
async def _do_broadcast(data: dict):
|
||||||
|
"""Send data to all browser clients concurrently. Runs as a background task."""
|
||||||
|
clients = list(browser_conns)
|
||||||
|
if not clients:
|
||||||
|
return
|
||||||
|
results = await asyncio.gather(*(_send_to_browser(ws, data) for ws in clients))
|
||||||
|
for ws in results:
|
||||||
|
if ws is not None:
|
||||||
browser_conns.discard(ws)
|
browser_conns.discard(ws)
|
||||||
|
|
||||||
|
|
||||||
|
async def _broadcast_to_browser_clients(snapshot: dict):
|
||||||
|
"""Broadcast a telemetry or chat message to all connected browser clients.
|
||||||
|
|
||||||
|
Fires off a background task so the plugin receive loop is never blocked
|
||||||
|
by slow browser connections.
|
||||||
|
"""
|
||||||
|
data = jsonable_encoder(snapshot)
|
||||||
|
task = asyncio.create_task(_do_broadcast(data))
|
||||||
|
_broadcast_tasks.add(task)
|
||||||
|
task.add_done_callback(_broadcast_tasks.discard)
|
||||||
|
|
||||||
|
|
||||||
async def _forward_to_inventory_service(inventory_msg: FullInventoryMessage):
|
async def _forward_to_inventory_service(inventory_msg: FullInventoryMessage):
|
||||||
"""Forward inventory data to the inventory microservice for processing."""
|
"""Forward inventory data to the inventory microservice for processing."""
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue