perf(broadcast): serialize once with stdlib json, drop jsonable_encoder from hot path
Every browser broadcast ran jsonable_encoder (slow recursive encode) and then re-serialized per client via send_json — so a payload to N browsers was encoded N+1 times, on the same single event-loop core that the telemetry/ inventory firehose already saturates. Now serialize ONCE with json.dumps + a datetime-aware default (_json_default mirrors jsonable_encoder for the types that actually appear: datetime, Enum, Decimal, set, bytes) and send the prebuilt string to every client via send_text. Verified the wire output parses identically to the old path. Pure backend change — no plugin, no frontend, no schema change; stdlib only so it deploys via restart with no image rebuild / dependency churn. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
d86bc48862
commit
349c15d944
1 changed files with 44 additions and 11 deletions
55
main.py
55
main.py
|
|
@ -8,6 +8,8 @@ endpoints for browser clients to retrieve live and historical data, trails, and
|
|||
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from decimal import Decimal
|
||||
from enum import Enum
|
||||
import hmac
|
||||
import html as _html
|
||||
import ipaddress
|
||||
|
|
@ -2859,10 +2861,33 @@ def _update_vital_sharing_peer_state(msg_type: str, data: dict) -> None:
|
|||
entry["items"] = data.get("items")
|
||||
|
||||
|
||||
async def _send_to_browser(ws: WebSocket, data: dict) -> WebSocket | None:
|
||||
"""Send data to a single browser client. Returns the ws if it failed, None if ok."""
|
||||
def _json_default(o):
|
||||
"""Fallback serializer for json.dumps on the broadcast path.
|
||||
|
||||
Mirrors what jsonable_encoder did for the types that actually appear in
|
||||
broadcast payloads (datetime → ISO string), plus a few defensive cases,
|
||||
but WITHOUT jsonable_encoder's slow recursive walk of the whole structure.
|
||||
"""
|
||||
if isinstance(o, datetime):
|
||||
return o.isoformat()
|
||||
if isinstance(o, Enum):
|
||||
return o.value
|
||||
if isinstance(o, Decimal):
|
||||
return float(o)
|
||||
if isinstance(o, (set, frozenset)):
|
||||
return list(o)
|
||||
if isinstance(o, bytes):
|
||||
return o.decode("utf-8", "replace")
|
||||
return str(o)
|
||||
|
||||
|
||||
async def _send_to_browser(ws: WebSocket, text: str) -> WebSocket | None:
|
||||
"""Send a pre-serialized JSON string to one browser client.
|
||||
|
||||
Returns the ws if the send failed (so the caller drops it), None if ok.
|
||||
"""
|
||||
try:
|
||||
await asyncio.wait_for(ws.send_json(data), timeout=1.0)
|
||||
await asyncio.wait_for(ws.send_text(text), timeout=1.0)
|
||||
except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e:
|
||||
logger.debug(f"Detected disconnected browser client: {e}")
|
||||
return ws
|
||||
|
|
@ -2877,13 +2902,13 @@ async def _send_to_browser(ws: WebSocket, data: dict) -> WebSocket | None:
|
|||
return None
|
||||
|
||||
|
||||
async def _do_broadcast(data: dict):
|
||||
"""Send data to all browser clients concurrently. Runs as a background task.
|
||||
async def _do_broadcast(msg_type, text: str):
|
||||
"""Send a pre-serialized message to all subscribed browser clients
|
||||
concurrently. Runs as a background task.
|
||||
|
||||
Respects per-client message type filters: clients that sent a ``subscribe``
|
||||
message only receive the types they asked for.
|
||||
"""
|
||||
msg_type = data.get("type")
|
||||
# Build list of clients that should receive this message
|
||||
targets = []
|
||||
for ws, allowed_types in list(browser_conns.items()):
|
||||
|
|
@ -2891,7 +2916,7 @@ async def _do_broadcast(data: dict):
|
|||
targets.append(ws)
|
||||
if not targets:
|
||||
return
|
||||
results = await asyncio.gather(*(_send_to_browser(ws, data) for ws in targets))
|
||||
results = await asyncio.gather(*(_send_to_browser(ws, text) for ws in targets))
|
||||
for ws in results:
|
||||
if ws is not None:
|
||||
browser_conns.pop(ws, None)
|
||||
|
|
@ -2900,11 +2925,19 @@ async def _do_broadcast(data: dict):
|
|||
async def _broadcast_to_browser_clients(snapshot: dict):
|
||||
"""Broadcast a telemetry or chat message to all connected browser clients.
|
||||
|
||||
Fires off a background task so the plugin receive loop is never blocked
|
||||
by slow browser connections.
|
||||
Serializes the payload to JSON ONCE here (stdlib json + a datetime-aware
|
||||
default) instead of the old jsonable_encoder + per-client ``send_json``,
|
||||
which re-encoded the whole dict for every connected browser. Fires off a
|
||||
background task so the plugin receive loop is never blocked by slow
|
||||
browser connections.
|
||||
"""
|
||||
data = jsonable_encoder(snapshot)
|
||||
task = asyncio.create_task(_do_broadcast(data))
|
||||
try:
|
||||
text = json.dumps(snapshot, default=_json_default, separators=(",", ":"))
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to encode broadcast payload: {e}")
|
||||
return
|
||||
msg_type = snapshot.get("type")
|
||||
task = asyncio.create_task(_do_broadcast(msg_type, text))
|
||||
_broadcast_tasks.add(task)
|
||||
task.add_done_callback(_broadcast_tasks.discard)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue