From 645feef9aaf769745957c160b0ddda7f43d649b5 Mon Sep 17 00:00:00 2001 From: Erik Date: Tue, 23 Jun 2026 22:18:34 +0200 Subject: [PATCH] perf(inventory): cap concurrent forwards so flush bursts can't starve telemetry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause of the player-count flapping: the plugin's debounced inventory flush, combined with a fleet-wide relog wave (auto-update) phase-aligning the 60s flush timers, produced a synchronized burst of inventory forwards every cycle. The burst flooded the single event loop + httpx pool (errors in _do_handle_inventory_delta even though inventory-service was idle), periodically starving telemetry ingest (cliff 116→5 rows/10s) so characters aged out of the 30s window and the count flapped. - Global asyncio.Semaphore(8) around inventory forwarding: a burst can never monopolize the loop; telemetry always gets through. - Tighten the shared httpx client (max_connections=10, keepalive=5, 5s timeout) so a stale/slow connection can't hold a slot. Pairs with the plugin-side flush-timer jitter (2–5 min, re-rolled per tick) that de-synchronizes the fleet at the source. Co-Authored-By: Claude Fable 5 --- main.py | 74 +++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/main.py b/main.py index f9958c10..a3dd906e 100644 --- a/main.py +++ b/main.py @@ -2560,6 +2560,14 @@ _inventory_delta_tasks: set[asyncio.Task] = set() # while deltas for DIFFERENT characters still run concurrently. _inventory_char_locks: Dict[str, asyncio.Lock] = {} +# Global cap on concurrent inventory forwards. The plugin debounces inventory +# updates into a periodic per-character flush; if many characters' flush timers +# align (e.g. after a fleet-wide auto-update relog wave) the forwards arrive as +# a synchronized burst. Without a cap that burst floods the single event loop +# and httpx pool, starving telemetry processing → the player count flaps. +# This bounds how many forwards run at once so telemetry always gets the loop. +_inventory_forward_sem = asyncio.Semaphore(8) + def _get_inventory_char_lock(char_name: str) -> asyncio.Lock: lock = _inventory_char_locks.get(char_name) @@ -2592,39 +2600,47 @@ async def _do_handle_inventory_delta(data: dict): char_name = data.get("character_name", "unknown") global _inventory_http_client if _inventory_http_client is None: - _inventory_http_client = httpx.AsyncClient(timeout=10.0) + # Bounded pool + short timeout: a stale/slow connection to + # inventory-service can't tie up a forward slot for long. + _inventory_http_client = httpx.AsyncClient( + timeout=httpx.Timeout(5.0, connect=2.0), + limits=httpx.Limits(max_connections=10, max_keepalive_connections=5), + ) client = _inventory_http_client - if action == "remove": - item_id = data.get("item_id") - if item_id is not None: - resp = await client.delete( - f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item/{item_id}" - ) - if resp.status_code >= 400: - logger.warning( - f"Inventory service returned {resp.status_code} for delta remove item_id={item_id}" + # Cap concurrent forwards (see _inventory_forward_sem) so a synchronized + # flush burst can't monopolize the loop and starve telemetry. + async with _inventory_forward_sem: + if action == "remove": + item_id = data.get("item_id") + if item_id is not None: + resp = await client.delete( + f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item/{item_id}" ) - elif action in ("add", "update"): - item = data.get("item") - if item: - resp = await client.post( - f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item", - json=item, - ) - if resp.status_code < 400: - enriched_item = resp.json().get("item") - if enriched_item: - data = { - "type": "inventory_delta", - "action": action, - "character_name": char_name, - "item": enriched_item, - } - else: - logger.warning( - f"Inventory service returned {resp.status_code} for delta {action}" + if resp.status_code >= 400: + logger.warning( + f"Inventory service returned {resp.status_code} for delta remove item_id={item_id}" + ) + elif action in ("add", "update"): + item = data.get("item") + if item: + resp = await client.post( + f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item", + json=item, ) + if resp.status_code < 400: + enriched_item = resp.json().get("item") + if enriched_item: + data = { + "type": "inventory_delta", + "action": action, + "character_name": char_name, + "item": enriched_item, + } + else: + logger.warning( + f"Inventory service returned {resp.status_code} for delta {action}" + ) # Broadcast delta to all browser clients await _broadcast_to_browser_clients(data)