Debug and inventory

This commit is contained in:
erik 2025-06-19 17:46:19 +00:00
parent 1febf6e918
commit 80a0a16bab
15 changed files with 2764 additions and 341 deletions

main.py (439 changed lines)

@@ -10,6 +10,7 @@ import json
import logging
import os
import sys
import time
from typing import Dict, List, Any
from pathlib import Path
@@ -60,6 +61,249 @@ _cached_total_rares: dict = {"all_time": 0, "today": 0, "last_updated": None}
_cache_task: asyncio.Task | None = None
_rares_cache_task: asyncio.Task | None = None
# Player tracking for debug purposes
_player_history: list = [] # List of player sets from last 10 refreshes
_player_events: list = [] # List of player enter/exit events
_max_history_size = 10 # Keep last 10 player sets
_max_events_size = 100 # Keep last 100 events
# Telemetry timing tracking for debug purposes
_player_telemetry_times: dict = {} # character_name -> list of timestamps
_max_telemetry_history = 20 # Keep last 20 telemetry timestamps per player
# Simple WebSocket connection counters (Phase 1)
_plugin_connections = 0
_browser_connections = 0
# Simple database query performance counters (Phase 2)
_total_queries = 0
_total_query_time = 0.0
# Simple recent activity tracking (Phase 3)
_recent_telemetry_messages = []
_max_recent_messages = 50
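The three bounded buffers above are trimmed by hand with append() plus pop(0). An equivalent sketch using collections.deque(maxlen=...), which evicts the oldest entry automatically, is shown below; this is an alternative pattern, not what the commit itself does:

from collections import deque

# Hypothetical drop-in for the manual trimming: maxlen evicts the oldest
# entry on append, so no explicit pop(0) is needed.
recent_messages: deque = deque(maxlen=50)
for i in range(60):
    recent_messages.append({"seq": i})
assert len(recent_messages) == 50          # only the newest 50 survive
assert recent_messages[0]["seq"] == 10     # entries 0-9 were evicted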
def _track_player_changes(new_players: list) -> None:
"""Track player changes for debugging flapping issues."""
from datetime import datetime, timezone
# Get current player names
current_players = {p["character_name"] for p in new_players}
timestamp = datetime.now(timezone.utc)
# Track telemetry timing for each player
for player_data in new_players:
player_name = player_data["character_name"]
player_timestamp = player_data.get("timestamp")
# Convert timestamp if it's a string
if isinstance(player_timestamp, str):
try:
player_timestamp = datetime.fromisoformat(player_timestamp.replace('Z', '+00:00'))
except ValueError:  # malformed ISO string; fall back to the refresh time
player_timestamp = timestamp
elif player_timestamp is None:
player_timestamp = timestamp
# Initialize player telemetry tracking if needed
if player_name not in _player_telemetry_times:
_player_telemetry_times[player_name] = []
# Add this telemetry timestamp
_player_telemetry_times[player_name].append(player_timestamp)
# Trim to max history
if len(_player_telemetry_times[player_name]) > _max_telemetry_history:
_player_telemetry_times[player_name].pop(0)
# Get previous player names if we have history
previous_players = set()
if _player_history:
previous_players = {p["character_name"] for p in _player_history[-1]["players"]}
# Find players who entered and exited
entered_players = current_players - previous_players
exited_players = previous_players - current_players
# Log events with telemetry timing analysis
for player in entered_players:
# Check if this re-entry follows a telemetry timing gap
timing_gap = None
if player in _player_telemetry_times and len(_player_telemetry_times[player]) >= 2:
last_two = _player_telemetry_times[player][-2:]
timing_gap = (last_two[1] - last_two[0]).total_seconds()
event = {
"timestamp": timestamp,
"type": "enter",
"character_name": player,
"total_players": len(current_players),
"timing_gap": timing_gap
}
_player_events.append(event)
gap_info = f" (gap: {timing_gap:.1f}s)" if timing_gap and timing_gap > 25 else ""
logger.debug(f"Player entered: {player} (total: {len(current_players)}){gap_info}")
for player in exited_players:
# Calculate time since last telemetry
last_telemetry_age = None
if player in _player_telemetry_times and _player_telemetry_times[player]:
last_telemetry = _player_telemetry_times[player][-1]
last_telemetry_age = (timestamp - last_telemetry).total_seconds()
event = {
"timestamp": timestamp,
"type": "exit",
"character_name": player,
"total_players": len(current_players),
"last_telemetry_age": last_telemetry_age
}
_player_events.append(event)
age_info = f" (last telemetry: {last_telemetry_age:.1f}s ago)" if last_telemetry_age is not None else ""
logger.debug(f"Player exited: {player} (total: {len(current_players)}){age_info}")
# Add current state to history
history_entry = {
"timestamp": timestamp,
"players": new_players,
"player_count": len(new_players),
"player_names": list(current_players)
}
_player_history.append(history_entry)
# Trim history to max size
if len(_player_history) > _max_history_size:
_player_history.pop(0)
# Trim events to max size
if len(_player_events) > _max_events_size:
_player_events.pop(0)
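The enter/exit detection in _track_player_changes is plain set difference between consecutive refreshes. A minimal standalone illustration (player names are made up):

# Players seen on the previous refresh vs. the current one
previous = {"Alice", "Bob"}
current = {"Bob", "Carol"}

entered = current - previous   # players appearing this refresh
exited = previous - current    # players gone since the last refresh

assert entered == {"Carol"}
assert exited == {"Alice"}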
def _analyze_flapping_patterns() -> dict:
"""Analyze player events to identify flapping patterns."""
from collections import Counter, defaultdict
if not _player_events:
return {"flapping_players": [], "frequent_events": [], "analysis": "No events to analyze"}
# Count events per player
player_event_counts = Counter()
player_flap_counts = defaultdict(int)
# Track recent activity per player (last 10 events)
recent_player_activity = defaultdict(list)
for event in _player_events[-50:]: # Analyze last 50 events
player = event["character_name"]
event_type = event["type"]
player_event_counts[player] += 1
recent_player_activity[player].append(event_type)
# Identify flapping players (players with many enter/exit cycles)
flapping_players = []
for player, activity in recent_player_activity.items():
if len(activity) >= 4: # At least 4 events
# Count alternating enter/exit patterns
flap_score = 0
for i in range(1, len(activity)):
if activity[i] != activity[i-1]: # Different from previous
flap_score += 1
if flap_score >= 3: # At least 3 transitions
flapping_players.append({
"character_name": player,
"events": len(activity),
"flap_score": flap_score,
"recent_activity": activity[-10:] # Last 10 events
})
# Sort by flap score
flapping_players.sort(key=lambda x: x["flap_score"], reverse=True)
# Most active players
frequent_events = [
{"character_name": player, "event_count": count}
for player, count in player_event_counts.most_common(10)
]
# Recent activity summary
recent_enters = sum(1 for e in _player_events[-20:] if e["type"] == "enter")
recent_exits = sum(1 for e in _player_events[-20:] if e["type"] == "exit")
return {
"flapping_players": flapping_players,
"frequent_events": frequent_events,
"recent_activity": {
"enters": recent_enters,
"exits": recent_exits,
"net_change": recent_enters - recent_exits
},
"analysis": f"Found {len(flapping_players)} potentially flapping players"
}
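The flap score simply counts enter/exit transitions within a player's recent events: ["enter", "exit", "enter", "exit"] has three adjacent changes, so that player clears the >= 3 threshold, while a steady presence does not. A self-contained check of the same arithmetic:

def flap_score(activity: list) -> int:
    # Count adjacent transitions, exactly as the loop above does.
    return sum(1 for i in range(1, len(activity)) if activity[i] != activity[i - 1])

assert flap_score(["enter", "exit", "enter", "exit"]) == 3   # flagged as flapping
assert flap_score(["enter", "enter", "enter", "exit"]) == 1  # not flagged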
def _analyze_telemetry_timing() -> dict:
"""Analyze telemetry timing patterns for all players."""
from datetime import datetime, timezone
timing_analysis = {}
problem_players = []
for player_name, timestamps in _player_telemetry_times.items():
if len(timestamps) < 2:
continue
# Calculate intervals between telemetry messages
intervals = []
for i in range(1, len(timestamps)):
interval = (timestamps[i] - timestamps[i-1]).total_seconds()
intervals.append(interval)
if not intervals:
continue
# Calculate timing statistics
avg_interval = sum(intervals) / len(intervals)
min_interval = min(intervals)
max_interval = max(intervals)
# Count problematic intervals (>30s)
long_gaps = [i for i in intervals if i > 30]
recent_long_gaps = [i for i in intervals[-5:] if i > 30] # Last 5 intervals
# Determine if this player has timing issues
has_timing_issues = len(long_gaps) > 0 or max_interval > 35
timing_stats = {
"character_name": player_name,
"total_messages": len(timestamps),
"avg_interval": round(avg_interval, 1),
"min_interval": round(min_interval, 1),
"max_interval": round(max_interval, 1),
"long_gaps_count": len(long_gaps),
"recent_long_gaps": len(recent_long_gaps),
"last_message_age": (datetime.now(timezone.utc) - timestamps[-1]).total_seconds() if timestamps else 0,
"has_timing_issues": has_timing_issues,
"recent_intervals": [round(i, 1) for i in intervals[-5:]] # Last 5 intervals
}
timing_analysis[player_name] = timing_stats
if has_timing_issues:
problem_players.append(timing_stats)
# Sort problem players by severity (max interval)
problem_players.sort(key=lambda x: x["max_interval"], reverse=True)
return {
"all_players": timing_analysis,
"problem_players": problem_players,
"summary": {
"total_tracked_players": len(timing_analysis),
"players_with_issues": len(problem_players),
"avg_intervals": [stats["avg_interval"] for stats in timing_analysis.values()],
}
}
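The timing statistics come from pairwise differences of consecutive telemetry timestamps. A small worked example showing how a single >30s gap marks a player as having timing issues:

from datetime import datetime, timedelta, timezone

base = datetime(2025, 6, 19, 17, 0, tzinfo=timezone.utc)
stamps = [base, base + timedelta(seconds=15), base + timedelta(seconds=55)]

intervals = [(stamps[i] - stamps[i - 1]).total_seconds() for i in range(1, len(stamps))]

assert intervals == [15.0, 40.0]
assert any(i > 30 for i in intervals)  # the 40s gap would flag this player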
async def _refresh_cache_loop() -> None:
"""Background task: refresh `/live` and `/trails` caches every 5 seconds."""
consecutive_failures = 0
@@ -92,7 +336,12 @@ async def _refresh_cache_loop() -> None:
# Use a single connection for both queries to reduce connection churn
async with database.connection() as conn:
rows = await conn.fetch_all(sql_live, {"cutoff": cutoff})
_cached_live["players"] = [dict(r) for r in rows]
new_players = [dict(r) for r in rows]
# Track player changes for debugging
_track_player_changes(new_players)
_cached_live["players"] = new_players
# Recompute trails (last 600s)
cutoff2 = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=600)
@@ -347,6 +596,104 @@ async def on_shutdown():
def debug():
return {"status": "OK"}
@app.get("/debug/player-flapping")
async def get_player_flapping_debug():
"""Return player tracking data for debugging flapping issues."""
try:
# Analyze flapping patterns
flapping_analysis = _analyze_flapping_patterns()
# Analyze telemetry timing
timing_analysis = _analyze_telemetry_timing()
# Get recent events (last 50)
recent_events = _player_events[-50:] if len(_player_events) > 50 else _player_events
# Convert timestamps to ISO format for JSON serialization
formatted_events = []
for event in recent_events:
formatted_event = event.copy()
formatted_event["timestamp"] = event["timestamp"].isoformat()
formatted_events.append(formatted_event)
# Format history
formatted_history = []
for entry in _player_history:
formatted_entry = {
"timestamp": entry["timestamp"].isoformat(),
"player_count": entry["player_count"],
"player_names": entry["player_names"]
}
formatted_history.append(formatted_entry)
# Format timing data for JSON serialization
formatted_timing = {}
for player_name, timing_data in timing_analysis["all_players"].items():
formatted_timing[player_name] = timing_data.copy()
# Round last_message_age for readability
formatted_timing[player_name]["last_message_age"] = round(timing_data["last_message_age"], 1)
return {
"current_players": len(_cached_live.get("players", [])),
"history": formatted_history,
"recent_events": formatted_events,
"flapping_analysis": flapping_analysis,
"timing_analysis": {
"all_players": formatted_timing,
"problem_players": timing_analysis["problem_players"],
"summary": timing_analysis["summary"]
},
"tracking_stats": {
"history_entries": len(_player_history),
"total_events": len(_player_events),
"tracked_players": len(_player_telemetry_times),
"max_history_size": _max_history_size,
"max_events_size": _max_events_size
}
}
except Exception as e:
logger.error(f"Failed to get player flapping debug data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/debug/websocket-health")
async def get_websocket_health():
"""Return simple WebSocket connection counts."""
try:
return {
"plugin_connections": _plugin_connections,
"browser_connections": _browser_connections,
"total_connections": _plugin_connections + _browser_connections
}
except Exception as e:
logger.error(f"Failed to get WebSocket health data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/debug/database-performance")
async def get_database_performance():
"""Return simple database query performance statistics."""
try:
avg_query_time = (_total_query_time / _total_queries) if _total_queries > 0 else 0.0
return {
"total_queries": _total_queries,
"total_query_time": round(_total_query_time, 3),
"average_query_time": round(avg_query_time, 3)
}
except Exception as e:
logger.error(f"Failed to get database performance data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/debug/recent-activity")
async def get_recent_activity():
"""Return recent telemetry activity feed."""
try:
return {
"recent_messages": _recent_telemetry_messages.copy(),
"total_messages": len(_recent_telemetry_messages),
"max_messages": _max_recent_messages
}
except Exception as e:
logger.error(f"Failed to get recent activity data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/live", response_model=dict)
@app.get("/live/", response_model=dict)
@@ -899,6 +1246,8 @@ async def ws_receive_snapshots(
- rare: update total and session rare counts, persist event
- chat: broadcast chat messages to browsers
"""
global _plugin_connections
# Authenticate plugin connection using shared secret
key = secret or x_plugin_secret
if key != SHARED_SECRET:
@@ -908,7 +1257,11 @@
return
# Accept the WebSocket connection
await websocket.accept()
logger.info(f"Plugin WebSocket connected: {websocket.client}")
logger.info(f"🔌 PLUGIN_CONNECTED: {websocket.client}")
# Track plugin connection
_plugin_connections += 1
try:
while True:
# Read next text frame
@@ -917,7 +1270,7 @@
# Debug: log all incoming plugin WebSocket messages
logger.debug(f"Plugin WebSocket RX from {websocket.client}: {raw}")
except WebSocketDisconnect:
logger.info(f"Plugin WebSocket disconnected: {websocket.client}")
logger.info(f"🔌 PLUGIN_DISCONNECTED: {websocket.client}")
break
# Parse JSON payload
try:
@@ -931,7 +1284,7 @@
name = data.get("character_name") or data.get("player_name")
if isinstance(name, str):
plugin_conns[name] = websocket
logger.info(f"Registered plugin connection for character: {name}")
logger.info(f"📋 PLUGIN_REGISTERED: {name} from {websocket.client}")
continue
# --- Spawn event: persist to spawn_events table ---
if msg_type == "spawn":
@@ -952,6 +1305,12 @@
# Parse telemetry snapshot and update in-memory state
payload = data.copy()
payload.pop("type", None)
character_name = payload.get('character_name', 'unknown')
# Track message receipt and start timing
telemetry_start_time = time.time()
logger.info(f"📨 TELEMETRY_RECEIVED: {character_name} from {websocket.client}")
try:
snap = TelemetrySnapshot.parse_obj(payload)
live_snapshots[snap.character_name] = snap.dict()
@@ -974,6 +1333,16 @@
delta = snap.kills - last
# Persist snapshot and any kill delta in a single transaction
db_start_time = time.time()
# Log connection pool status before database operation
try:
pool_status = f"pool_size:{database._pool._queue.qsize()}" if hasattr(database, '_pool') and hasattr(database._pool, '_queue') else "pool_status:unknown"
except Exception:
pool_status = "pool_status:error"
logger.info(f"💾 TELEMETRY_DB_WRITE_ATTEMPT: {snap.character_name} session:{snap.session_id[:8]} kills:{snap.kills} delta:{delta} {pool_status}")
try:
async with database.transaction():
await database.execute(
@@ -989,15 +1358,60 @@
)
await database.execute(stmt)
logger.debug(f"Updated kills for {snap.character_name}: +{delta} (total from {last} to {snap.kills})")
# Success: log timing and update cache
db_duration = (time.time() - db_start_time) * 1000
ws_receive_snapshots._last_kills[key] = snap.kills
# Track database performance (Phase 2)
global _total_queries, _total_query_time
_total_queries += 1
_total_query_time += db_duration / 1000.0 # Convert ms to seconds
# Track recent activity (Phase 3)
global _recent_telemetry_messages, _max_recent_messages
activity_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"character_name": snap.character_name,
"kills": snap.kills,
"kill_delta": delta,
"query_time": round(db_duration, 1)
}
_recent_telemetry_messages.append(activity_entry)
if len(_recent_telemetry_messages) > _max_recent_messages:
_recent_telemetry_messages.pop(0)
# Log final pool status after successful operation
try:
final_pool_status = f"pool_size:{database._pool._queue.qsize()}" if hasattr(database, '_pool') and hasattr(database._pool, '_queue') else "pool_status:unknown"
except Exception:
final_pool_status = "pool_status:error"
logger.info(f"✅ TELEMETRY_DB_WRITE_SUCCESS: {snap.character_name} took {db_duration:.1f}ms {final_pool_status}")
except Exception as db_error:
logger.error(f"💾 Database transaction failed for {snap.character_name} (session: {snap.session_id[:8]}): {db_error}", exc_info=True)
db_duration = (time.time() - db_start_time) * 1000
# Log pool status during failure
try:
error_pool_status = f"pool_size:{database._pool._queue.qsize()}" if hasattr(database, '_pool') and hasattr(database._pool, '_queue') else "pool_status:unknown"
except Exception:
error_pool_status = "pool_status:error"
logger.error(f"❌ TELEMETRY_DB_WRITE_FAILED: {snap.character_name} session:{snap.session_id[:8]} took {db_duration:.1f}ms {error_pool_status} error:{db_error}", exc_info=True)
continue
# Broadcast updated snapshot to all browser clients
await _broadcast_to_browser_clients(snap.dict())
logger.debug(f"✅ Processed telemetry from {snap.character_name} (session: {snap.session_id[:8]}, kills: {snap.kills})")
# Log successful processing completion with timing
total_duration = (time.time() - telemetry_start_time) * 1000
logger.info(f"⏱️ TELEMETRY_PROCESSING_COMPLETE: {snap.character_name} took {total_duration:.1f}ms total")
except Exception as e:
logger.error(f"❌ Failed to process telemetry event from {data.get('character_name', 'unknown')}: {e}", exc_info=True)
total_duration = (time.time() - telemetry_start_time) * 1000
logger.error(f"❌ TELEMETRY_PROCESSING_FAILED: {character_name} took {total_duration:.1f}ms error:{e}", exc_info=True)
continue
# --- Rare event: update total and session counters and persist ---
if msg_type == "rare":
@@ -1082,6 +1496,9 @@
if msg_type:
logger.warning(f"Unknown message type '{msg_type}' from {websocket.client}")
finally:
# Track plugin disconnection
_plugin_connections = max(0, _plugin_connections - 1)
# Clean up any plugin registrations for this socket
to_remove = [n for n, ws in plugin_conns.items() if ws is websocket]
for n in to_remove:
@@ -1153,10 +1570,15 @@
Manages a set of connected browser clients; listens for incoming command messages
and forwards them to the appropriate plugin client WebSocket.
"""
global _browser_connections
# Add new browser client to the set
await websocket.accept()
browser_conns.add(websocket)
logger.info(f"Browser WebSocket connected: {websocket.client}")
# Track browser connection
_browser_connections += 1
try:
while True:
# Receive command messages from browser
@@ -1198,6 +1620,9 @@
except WebSocketDisconnect:
pass
finally:
# Track browser disconnection
_browser_connections = max(0, _browser_connections - 1)
browser_conns.discard(websocket)
logger.debug(f"Removed browser WebSocket from connection pool: {websocket.client}")