diff --git a/main.py b/main.py index 6d718c90..48e108c0 100644 --- a/main.py +++ b/main.py @@ -81,6 +81,7 @@ _cached_total_kills: dict = {"total": 0, "last_updated": None} _cache_task: asyncio.Task | None = None _rares_cache_task: asyncio.Task | None = None _cleanup_task: asyncio.Task | None = None +_broadcast_tasks: set[asyncio.Task] = set() # Player tracking for debug purposes _player_history: list = [] # List of player sets from last 10 refreshes @@ -1196,6 +1197,14 @@ async def on_shutdown(): await _cleanup_task except asyncio.CancelledError: pass + # Cancel any in-flight broadcast tasks + if _broadcast_tasks: + logger.info(f"Cancelling {len(_broadcast_tasks)} in-flight broadcast tasks") + for task in _broadcast_tasks: + task.cancel() + await asyncio.gather(*_broadcast_tasks, return_exceptions=True) + _broadcast_tasks.clear() + logger.info("Disconnecting from database") await database.disconnect() @@ -1992,37 +2001,45 @@ browser_conns: set[WebSocket] = set() plugin_conns: Dict[str, WebSocket] = {} +async def _send_to_browser(ws: WebSocket, data: dict) -> WebSocket | None: + """Send data to a single browser client. Returns the ws if it failed, None if ok.""" + try: + await asyncio.wait_for(ws.send_json(data), timeout=1.0) + except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e: + logger.debug(f"Detected disconnected browser client: {e}") + return ws + except asyncio.TimeoutError: + logger.warning( + "Timed out broadcasting to browser client; removing stale connection" + ) + return ws + except Exception as e: + logger.warning(f"Unexpected error broadcasting to browser client: {e}") + return ws + return None + + +async def _do_broadcast(data: dict): + """Send data to all browser clients concurrently. Runs as a background task.""" + clients = list(browser_conns) + if not clients: + return + results = await asyncio.gather(*(_send_to_browser(ws, data) for ws in clients)) + for ws in results: + if ws is not None: + browser_conns.discard(ws) + + async def _broadcast_to_browser_clients(snapshot: dict): """Broadcast a telemetry or chat message to all connected browser clients. - Converts any non-serializable types (e.g., datetime) before sending. - Handles connection errors gracefully and removes stale connections. + Fires off a background task so the plugin receive loop is never blocked + by slow browser connections. """ - # Convert snapshot payload to JSON-friendly types data = jsonable_encoder(snapshot) - # Use list() to avoid "set changed size during iteration" errors - disconnected_clients = [] - - for ws in list(browser_conns): - try: - await asyncio.wait_for(ws.send_json(data), timeout=1.0) - except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e: - # Collect disconnected clients for cleanup - disconnected_clients.append(ws) - logger.debug(f"Detected disconnected browser client: {e}") - except asyncio.TimeoutError: - disconnected_clients.append(ws) - logger.warning( - "Timed out broadcasting to browser client; removing stale connection" - ) - except Exception as e: - # Handle any other unexpected errors - disconnected_clients.append(ws) - logger.warning(f"Unexpected error broadcasting to browser client: {e}") - - # Clean up disconnected clients - for ws in disconnected_clients: - browser_conns.discard(ws) + task = asyncio.create_task(_do_broadcast(data)) + _broadcast_tasks.add(task) + task.add_done_callback(_broadcast_tasks.discard) async def _forward_to_inventory_service(inventory_msg: FullInventoryMessage):