diff --git a/main.py b/main.py index f9958c10..a3dd906e 100644 --- a/main.py +++ b/main.py @@ -2560,6 +2560,14 @@ _inventory_delta_tasks: set[asyncio.Task] = set() # while deltas for DIFFERENT characters still run concurrently. _inventory_char_locks: Dict[str, asyncio.Lock] = {} +# Global cap on concurrent inventory forwards. The plugin debounces inventory +# updates into a periodic per-character flush; if many characters' flush timers +# align (e.g. after a fleet-wide auto-update relog wave) the forwards arrive as +# a synchronized burst. Without a cap that burst floods the single event loop +# and httpx pool, starving telemetry processing → the player count flaps. +# This bounds how many forwards run at once so telemetry always gets the loop. +_inventory_forward_sem = asyncio.Semaphore(8) + def _get_inventory_char_lock(char_name: str) -> asyncio.Lock: lock = _inventory_char_locks.get(char_name) @@ -2592,39 +2600,47 @@ async def _do_handle_inventory_delta(data: dict): char_name = data.get("character_name", "unknown") global _inventory_http_client if _inventory_http_client is None: - _inventory_http_client = httpx.AsyncClient(timeout=10.0) + # Bounded pool + short timeout: a stale/slow connection to + # inventory-service can't tie up a forward slot for long. + _inventory_http_client = httpx.AsyncClient( + timeout=httpx.Timeout(5.0, connect=2.0), + limits=httpx.Limits(max_connections=10, max_keepalive_connections=5), + ) client = _inventory_http_client - if action == "remove": - item_id = data.get("item_id") - if item_id is not None: - resp = await client.delete( - f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item/{item_id}" - ) - if resp.status_code >= 400: - logger.warning( - f"Inventory service returned {resp.status_code} for delta remove item_id={item_id}" + # Cap concurrent forwards (see _inventory_forward_sem) so a synchronized + # flush burst can't monopolize the loop and starve telemetry. + async with _inventory_forward_sem: + if action == "remove": + item_id = data.get("item_id") + if item_id is not None: + resp = await client.delete( + f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item/{item_id}" ) - elif action in ("add", "update"): - item = data.get("item") - if item: - resp = await client.post( - f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item", - json=item, - ) - if resp.status_code < 400: - enriched_item = resp.json().get("item") - if enriched_item: - data = { - "type": "inventory_delta", - "action": action, - "character_name": char_name, - "item": enriched_item, - } - else: - logger.warning( - f"Inventory service returned {resp.status_code} for delta {action}" + if resp.status_code >= 400: + logger.warning( + f"Inventory service returned {resp.status_code} for delta remove item_id={item_id}" + ) + elif action in ("add", "update"): + item = data.get("item") + if item: + resp = await client.post( + f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item", + json=item, ) + if resp.status_code < 400: + enriched_item = resp.json().get("item") + if enriched_item: + data = { + "type": "inventory_delta", + "action": action, + "character_name": char_name, + "item": enriched_item, + } + else: + logger.warning( + f"Inventory service returned {resp.status_code} for delta {action}" + ) # Broadcast delta to all browser clients await _broadcast_to_browser_clients(data)