perf(inventory): cap concurrent forwards so flush bursts can't starve telemetry
Root cause of the player-count flapping: the plugin's debounced inventory flush, combined with a fleet-wide relog wave (auto-update) phase-aligning the 60s flush timers, produced a synchronized burst of inventory forwards every cycle. The burst flooded the single event loop + httpx pool (errors in _do_handle_inventory_delta even though inventory-service was idle), periodically starving telemetry ingest (cliff 116→5 rows/10s) so characters aged out of the 30s window and the count flapped. - Global asyncio.Semaphore(8) around inventory forwarding: a burst can never monopolize the loop; telemetry always gets through. - Tighten the shared httpx client (max_connections=10, keepalive=5, 5s timeout) so a stale/slow connection can't hold a slot. Pairs with the plugin-side flush-timer jitter (2–5 min, re-rolled per tick) that de-synchronizes the fleet at the source. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
349c15d944
commit
645feef9aa
1 changed files with 45 additions and 29 deletions
74
main.py
74
main.py
|
|
@ -2560,6 +2560,14 @@ _inventory_delta_tasks: set[asyncio.Task] = set()
|
|||
# while deltas for DIFFERENT characters still run concurrently.
|
||||
_inventory_char_locks: Dict[str, asyncio.Lock] = {}
|
||||
|
||||
# Global cap on concurrent inventory forwards. The plugin debounces inventory
|
||||
# updates into a periodic per-character flush; if many characters' flush timers
|
||||
# align (e.g. after a fleet-wide auto-update relog wave) the forwards arrive as
|
||||
# a synchronized burst. Without a cap that burst floods the single event loop
|
||||
# and httpx pool, starving telemetry processing → the player count flaps.
|
||||
# This bounds how many forwards run at once so telemetry always gets the loop.
|
||||
_inventory_forward_sem = asyncio.Semaphore(8)
|
||||
|
||||
|
||||
def _get_inventory_char_lock(char_name: str) -> asyncio.Lock:
|
||||
lock = _inventory_char_locks.get(char_name)
|
||||
|
|
@ -2592,39 +2600,47 @@ async def _do_handle_inventory_delta(data: dict):
|
|||
char_name = data.get("character_name", "unknown")
|
||||
global _inventory_http_client
|
||||
if _inventory_http_client is None:
|
||||
_inventory_http_client = httpx.AsyncClient(timeout=10.0)
|
||||
# Bounded pool + short timeout: a stale/slow connection to
|
||||
# inventory-service can't tie up a forward slot for long.
|
||||
_inventory_http_client = httpx.AsyncClient(
|
||||
timeout=httpx.Timeout(5.0, connect=2.0),
|
||||
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
||||
)
|
||||
client = _inventory_http_client
|
||||
|
||||
if action == "remove":
|
||||
item_id = data.get("item_id")
|
||||
if item_id is not None:
|
||||
resp = await client.delete(
|
||||
f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item/{item_id}"
|
||||
)
|
||||
if resp.status_code >= 400:
|
||||
logger.warning(
|
||||
f"Inventory service returned {resp.status_code} for delta remove item_id={item_id}"
|
||||
# Cap concurrent forwards (see _inventory_forward_sem) so a synchronized
|
||||
# flush burst can't monopolize the loop and starve telemetry.
|
||||
async with _inventory_forward_sem:
|
||||
if action == "remove":
|
||||
item_id = data.get("item_id")
|
||||
if item_id is not None:
|
||||
resp = await client.delete(
|
||||
f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item/{item_id}"
|
||||
)
|
||||
elif action in ("add", "update"):
|
||||
item = data.get("item")
|
||||
if item:
|
||||
resp = await client.post(
|
||||
f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item",
|
||||
json=item,
|
||||
)
|
||||
if resp.status_code < 400:
|
||||
enriched_item = resp.json().get("item")
|
||||
if enriched_item:
|
||||
data = {
|
||||
"type": "inventory_delta",
|
||||
"action": action,
|
||||
"character_name": char_name,
|
||||
"item": enriched_item,
|
||||
}
|
||||
else:
|
||||
logger.warning(
|
||||
f"Inventory service returned {resp.status_code} for delta {action}"
|
||||
if resp.status_code >= 400:
|
||||
logger.warning(
|
||||
f"Inventory service returned {resp.status_code} for delta remove item_id={item_id}"
|
||||
)
|
||||
elif action in ("add", "update"):
|
||||
item = data.get("item")
|
||||
if item:
|
||||
resp = await client.post(
|
||||
f"{INVENTORY_SERVICE_URL}/inventory/{char_name}/item",
|
||||
json=item,
|
||||
)
|
||||
if resp.status_code < 400:
|
||||
enriched_item = resp.json().get("item")
|
||||
if enriched_item:
|
||||
data = {
|
||||
"type": "inventory_delta",
|
||||
"action": action,
|
||||
"character_name": char_name,
|
||||
"item": enriched_item,
|
||||
}
|
||||
else:
|
||||
logger.warning(
|
||||
f"Inventory service returned {resp.status_code} for delta {action}"
|
||||
)
|
||||
|
||||
# Broadcast delta to all browser clients
|
||||
await _broadcast_to_browser_clients(data)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue