2251 lines
97 KiB
Python
2251 lines
97 KiB
Python
"""
|
||
main.py - FastAPI-based telemetry server for Dereth Tracker.
|
||
|
||
This service ingests real-time position and event data from plugin clients via WebSockets,
|
||
stores telemetry and statistics in a TimescaleDB backend, and exposes HTTP and WebSocket
|
||
endpoints for browser clients to retrieve live and historical data, trails, and per-character stats.
|
||
"""
|
||
from datetime import datetime, timedelta, timezone
|
||
import json
|
||
import logging
|
||
import os
|
||
import sys
|
||
import time
|
||
from typing import Dict, List, Any
|
||
from pathlib import Path
|
||
import asyncio
|
||
import socket
|
||
import struct
|
||
|
||
from fastapi import FastAPI, Header, HTTPException, Query, WebSocket, WebSocketDisconnect, Request
|
||
from fastapi.responses import JSONResponse, Response
|
||
from fastapi.routing import APIRoute
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.encoders import jsonable_encoder
|
||
from pydantic import BaseModel
|
||
from typing import Optional
|
||
import httpx
|
||
|
||
# Async database support
|
||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||
from db_async import (
|
||
database,
|
||
telemetry_events,
|
||
char_stats,
|
||
rare_stats,
|
||
rare_stats_sessions,
|
||
spawn_events,
|
||
rare_events,
|
||
character_inventories,
|
||
portal_discoveries,
|
||
server_health_checks,
|
||
server_status,
|
||
init_db_async,
|
||
cleanup_old_portals
|
||
)
|
||
import asyncio
|
||
|
||
# Configure logging
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||
handlers=[
|
||
logging.StreamHandler(sys.stdout),
|
||
]
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Get log level from environment (DEBUG, INFO, WARNING, ERROR)
|
||
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
|
||
logger.setLevel(getattr(logging, log_level, logging.INFO))
|
||
|
||
# Inventory service configuration
|
||
INVENTORY_SERVICE_URL = os.getenv('INVENTORY_SERVICE_URL', 'http://inventory-service:8000')
|
||
# In-memory caches for REST endpoints
|
||
_cached_live: dict = {"players": []}
|
||
_cached_trails: dict = {"trails": []}
|
||
_cached_total_rares: dict = {"all_time": 0, "today": 0, "last_updated": None}
|
||
_cache_task: asyncio.Task | None = None
|
||
_rares_cache_task: asyncio.Task | None = None
|
||
|
||
# Player tracking for debug purposes
|
||
_player_history: list = [] # List of player sets from last 10 refreshes
|
||
_player_events: list = [] # List of player enter/exit events
|
||
_max_history_size = 10 # Keep last 10 player sets
|
||
_max_events_size = 100 # Keep last 100 events
|
||
|
||
# Telemetry timing tracking for debug purposes
|
||
_player_telemetry_times: dict = {} # character_name -> list of timestamps
|
||
_max_telemetry_history = 20 # Keep last 20 telemetry timestamps per player
|
||
|
||
# Simple WebSocket connection counters (Phase 1)
|
||
_plugin_connections = 0
|
||
_browser_connections = 0
|
||
|
||
# Simple database query performance counters (Phase 2)
|
||
_total_queries = 0
|
||
_total_query_time = 0.0
|
||
|
||
# Simple recent activity tracking (Phase 3)
|
||
_recent_telemetry_messages = []
|
||
_max_recent_messages = 50
|
||
|
||
# Server health monitoring
|
||
_server_health_task = None
|
||
_server_status_cache = {
|
||
"status": "unknown",
|
||
"latency_ms": None,
|
||
"player_count": None,
|
||
"last_check": None,
|
||
"uptime_seconds": 0,
|
||
"last_restart": None
|
||
}
|
||
|
||
# Quest status cache - stores last received quest data per player
|
||
# Structure: {character_name: {quest_name: countdown_value}}
|
||
_quest_status_cache: Dict[str, Dict[str, str]] = {}
|
||
|
||
# AC Hash32 checksum algorithm (based on ThwargLauncher)
|
||
def calculate_hash32(data: bytes) -> int:
|
||
"""Calculate AC Hash32 checksum as used in ThwargLauncher."""
|
||
length = len(data)
|
||
checksum = (length << 16) & 0xFFFFFFFF
|
||
|
||
# Process 4-byte chunks
|
||
for i in range(0, length - 3, 4):
|
||
chunk = struct.unpack('<I', data[i:i+4])[0]
|
||
checksum = (checksum + chunk) & 0xFFFFFFFF
|
||
|
||
# Handle remaining bytes
|
||
remaining_start = (length // 4) * 4
|
||
shift = 24
|
||
for i in range(remaining_start, length):
|
||
byte_val = data[i] << shift
|
||
checksum = (checksum + byte_val) & 0xFFFFFFFF
|
||
shift -= 8
|
||
|
||
return checksum
|
||
|
||
# Create AC EchoRequest packet for server health check (based on ThwargLauncher)
|
||
def create_echo_request_packet():
|
||
"""Create an AC EchoRequest packet for server health checking."""
|
||
# AC packet header: sequence(4) + flags(4) + checksum(4) + id(2) + time(2) + size(2) + table(2) = 20 bytes + padding
|
||
packet = bytearray(32) # 32 bytes total (0x20)
|
||
|
||
# Sequence (4 bytes) - can be 0
|
||
struct.pack_into('<I', packet, 0, 0)
|
||
|
||
# Flags (4 bytes) - EchoRequest = 0x02000000
|
||
struct.pack_into('<I', packet, 4, 0x02000000)
|
||
|
||
# Temporary checksum (4 bytes) - required for proper checksum calculation
|
||
struct.pack_into('<I', packet, 8, 0x0BADD70D)
|
||
|
||
# ID (2 bytes) - can be 0
|
||
struct.pack_into('<H', packet, 12, 0)
|
||
|
||
# Time (2 bytes) - can be 0
|
||
struct.pack_into('<H', packet, 14, 0)
|
||
|
||
# Size (2 bytes) - header size = 32 (0x20)
|
||
struct.pack_into('<H', packet, 16, 32)
|
||
|
||
# Table (2 bytes) - can be 0
|
||
struct.pack_into('<H', packet, 18, 0)
|
||
|
||
# Calculate proper AC Hash32 checksum
|
||
# First, set checksum field to 0
|
||
struct.pack_into('<I', packet, 8, 0)
|
||
|
||
# Calculate checksum using Hash32 algorithm
|
||
checksum = calculate_hash32(bytes(packet))
|
||
struct.pack_into('<I', packet, 8, checksum)
|
||
|
||
return bytes(packet)
|
||
|
||
AC_ECHO_PACKET = create_echo_request_packet()
|
||
|
||
# AC login packet for server health check (same as ThwargLauncher MakeLoginPacket)
|
||
AC_LOGIN_PACKET = bytes([
|
||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x93, 0x00, 0xd0, 0x05,
|
||
0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x04, 0x00, 0x31, 0x38,
|
||
0x30, 0x32, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
|
||
0x00, 0x00, 0x00, 0x00, 0x3e, 0xb8, 0xa8, 0x58, 0x1c, 0x00, 0x61, 0x63,
|
||
0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x65,
|
||
0x72, 0x3a, 0x6a, 0x6a, 0x39, 0x68, 0x32, 0x36, 0x68, 0x63, 0x73, 0x67,
|
||
0x67, 0x63, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
|
||
])
|
||
|
||
async def check_server_health(address: str, port: int, timeout: float = 3.0) -> tuple[bool, float, int]:
|
||
"""Check AC server health via UDP packet with retry logic.
|
||
|
||
Retries 6 times with 5-second delays before declaring server down.
|
||
Returns: (is_up, latency_ms, player_count)
|
||
"""
|
||
max_retries = 6
|
||
retry_delay = 5.0
|
||
|
||
for attempt in range(max_retries):
|
||
logger.debug(f"🔍 Health check attempt {attempt + 1}/{max_retries} for {address}:{port}")
|
||
start_time = time.time()
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
sock.setblocking(False)
|
||
|
||
try:
|
||
# Send login packet (same as ThwargLauncher)
|
||
await asyncio.get_event_loop().sock_sendto(sock, AC_LOGIN_PACKET, (address, port))
|
||
|
||
# Wait for response with timeout
|
||
try:
|
||
data, addr = await asyncio.wait_for(
|
||
asyncio.get_event_loop().sock_recvfrom(sock, 1024),
|
||
timeout=timeout
|
||
)
|
||
|
||
latency_ms = (time.time() - start_time) * 1000
|
||
logger.debug(f"📥 Received response from {addr}: {len(data)} bytes, latency: {latency_ms:.1f}ms")
|
||
|
||
# Check if valid response (support both TimeSynch 0x800000 and ConnectRequest 0x40000)
|
||
if len(data) >= 24:
|
||
flags = struct.unpack('<I', data[4:8])[0]
|
||
|
||
# Accept both TimeSynch (0x800000) and ConnectRequest (0x40000) as valid responses
|
||
if (flags & 0x800000) or (flags & 0x40000):
|
||
# UDP health check is for server status and latency only
|
||
# Player count comes from TreeStats.net API (like ThwargLauncher)
|
||
logger.debug(f"✅ Valid server response: latency: {latency_ms:.1f}ms")
|
||
return True, latency_ms, None
|
||
|
||
# Any response indicates server is up, even if not the expected format
|
||
logger.info(f"✅ Server response (non-standard format): latency: {latency_ms:.1f}ms")
|
||
return True, latency_ms, None
|
||
|
||
except asyncio.TimeoutError:
|
||
logger.debug(f"⏰ TIMEOUT: No response from {address}:{port} after {timeout}s")
|
||
if attempt < max_retries - 1:
|
||
logger.debug(f"Retrying in {retry_delay} seconds...")
|
||
await asyncio.sleep(retry_delay)
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.error(f"Server health check error on attempt {attempt + 1}: {e}")
|
||
if attempt < max_retries - 1:
|
||
await asyncio.sleep(retry_delay)
|
||
continue
|
||
finally:
|
||
sock.close()
|
||
|
||
# Only declare down after all retries fail
|
||
logger.warning(f"❌ Server {address}:{port} is DOWN after {max_retries} attempts over {max_retries * retry_delay} seconds")
|
||
return False, None, None
|
||
|
||
async def get_player_count_from_treestats(server_name: str) -> int:
|
||
"""Get player count from TreeStats.net API (same as ThwargLauncher)."""
|
||
try:
|
||
async with httpx.AsyncClient() as client:
|
||
response = await client.get("http://treestats.net/player_counts-latest.json", timeout=10)
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
for server_data in data:
|
||
if server_data.get("server") == server_name:
|
||
return server_data.get("count", 0)
|
||
return 0
|
||
except Exception as e:
|
||
logger.debug(f"Failed to get player count from TreeStats.net: {e}")
|
||
return 0
|
||
|
||
async def monitor_server_health():
|
||
"""Background task to monitor server health every 30 seconds and cleanup old portals hourly."""
|
||
server_name = "Coldeve"
|
||
server_address = "play.coldeve.ac"
|
||
server_port = 9000
|
||
check_interval = 30 # seconds
|
||
player_count_interval = 300 # 5 minutes (like ThwargLauncher's 10 minutes, but more frequent)
|
||
portal_cleanup_interval = 3600 # 1 hour
|
||
last_player_count_check = 0
|
||
last_portal_cleanup = 0
|
||
current_player_count = None
|
||
|
||
# Initialize server status in database
|
||
try:
|
||
existing = await database.fetch_one(
|
||
"SELECT * FROM server_status WHERE server_name = :name",
|
||
{"name": server_name}
|
||
)
|
||
if not existing:
|
||
await database.execute(
|
||
server_status.insert().values(
|
||
server_name=server_name,
|
||
current_status="unknown",
|
||
total_uptime_seconds=0
|
||
)
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Failed to initialize server status: {e}")
|
||
|
||
while True:
|
||
try:
|
||
logger.debug(f"🏥 Running scheduled health check for {server_name} ({server_address}:{server_port})")
|
||
# Check server health via UDP (for status and latency)
|
||
is_up, latency_ms, _ = await check_server_health(server_address, server_port)
|
||
status = "up" if is_up else "down"
|
||
now = datetime.now(timezone.utc)
|
||
|
||
# Get player count from TreeStats.net API (like ThwargLauncher)
|
||
current_time = time.time()
|
||
if current_time - last_player_count_check >= player_count_interval or current_player_count is None:
|
||
new_player_count = await get_player_count_from_treestats(server_name)
|
||
if new_player_count > 0: # Only update if we got a valid count
|
||
current_player_count = new_player_count
|
||
last_player_count_check = current_time
|
||
logger.info(f"🏥 Updated player count from TreeStats.net: {current_player_count}")
|
||
|
||
logger.debug(f"🏥 Health check result: {status}, latency: {latency_ms}, players: {current_player_count}")
|
||
|
||
# Record health check
|
||
await database.execute(
|
||
server_health_checks.insert().values(
|
||
server_name=server_name,
|
||
server_address=f"{server_address}:{server_port}",
|
||
timestamp=now,
|
||
status=status,
|
||
latency_ms=latency_ms,
|
||
player_count=current_player_count
|
||
)
|
||
)
|
||
|
||
# Get previous status
|
||
prev_status = await database.fetch_one(
|
||
"SELECT * FROM server_status WHERE server_name = :name",
|
||
{"name": server_name}
|
||
)
|
||
|
||
# Calculate uptime and detect restarts
|
||
last_restart = prev_status["last_restart"] if prev_status else None
|
||
|
||
if prev_status and prev_status["current_status"] == "down" and status == "up":
|
||
# Server came back up - this is a restart
|
||
last_restart = now
|
||
logger.info(f"Server {server_name} came back online")
|
||
# Broadcast to all browser clients
|
||
await _broadcast_to_browser_clients({
|
||
"type": "server_status",
|
||
"server": server_name,
|
||
"status": "up",
|
||
"message": "Server is back online"
|
||
})
|
||
|
||
# Calculate uptime from last restart time (not accumulated)
|
||
if last_restart and status == "up":
|
||
uptime_seconds = int((now - last_restart).total_seconds())
|
||
else:
|
||
uptime_seconds = 0
|
||
|
||
# Update server status (always include current_player_count if we have it)
|
||
await database.execute(
|
||
"""
|
||
INSERT INTO server_status (server_name, current_status, last_seen_up, last_restart,
|
||
total_uptime_seconds, last_check, last_latency_ms, last_player_count)
|
||
VALUES (:name, :status, :last_seen, :restart, :uptime, :check, :latency, :players)
|
||
ON CONFLICT (server_name) DO UPDATE SET
|
||
current_status = :status,
|
||
last_seen_up = CASE WHEN :status = 'up' THEN :last_seen ELSE server_status.last_seen_up END,
|
||
last_restart = :restart,
|
||
total_uptime_seconds = :uptime,
|
||
last_check = :check,
|
||
last_latency_ms = :latency,
|
||
last_player_count = CASE WHEN :players IS NOT NULL THEN :players ELSE server_status.last_player_count END
|
||
""",
|
||
{
|
||
"name": server_name,
|
||
"status": status,
|
||
"last_seen": now if status == "up" else None,
|
||
"restart": last_restart,
|
||
"uptime": uptime_seconds,
|
||
"check": now,
|
||
"latency": latency_ms,
|
||
"players": current_player_count
|
||
}
|
||
)
|
||
|
||
# Update cache
|
||
global _server_status_cache
|
||
_server_status_cache = {
|
||
"status": status,
|
||
"latency_ms": latency_ms,
|
||
"player_count": current_player_count,
|
||
"last_check": now.isoformat(),
|
||
"uptime_seconds": uptime_seconds,
|
||
"last_restart": last_restart.isoformat() if last_restart else None
|
||
}
|
||
|
||
logger.debug(f"Server health check: {status}, latency={latency_ms}ms, players={current_player_count}")
|
||
|
||
# Portal cleanup (run every hour)
|
||
current_time = time.time()
|
||
if current_time - last_portal_cleanup >= portal_cleanup_interval:
|
||
try:
|
||
deleted_count = await cleanup_old_portals()
|
||
logger.info(f"Portal cleanup: removed {deleted_count} old portal discoveries")
|
||
last_portal_cleanup = current_time
|
||
except Exception as cleanup_error:
|
||
logger.error(f"Portal cleanup error: {cleanup_error}", exc_info=True)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Server health monitoring error: {e}", exc_info=True)
|
||
|
||
await asyncio.sleep(check_interval)
|
||
|
||
def _track_player_changes(new_players: list) -> None:
|
||
"""Track player changes for debugging flapping issues."""
|
||
from datetime import datetime, timezone
|
||
|
||
# Get current player names
|
||
current_players = {p["character_name"] for p in new_players}
|
||
timestamp = datetime.now(timezone.utc)
|
||
|
||
# Track telemetry timing for each player
|
||
for player_data in new_players:
|
||
player_name = player_data["character_name"]
|
||
player_timestamp = player_data.get("timestamp")
|
||
|
||
# Convert timestamp if it's a string
|
||
if isinstance(player_timestamp, str):
|
||
try:
|
||
player_timestamp = datetime.fromisoformat(player_timestamp.replace('Z', '+00:00'))
|
||
except:
|
||
player_timestamp = timestamp
|
||
elif player_timestamp is None:
|
||
player_timestamp = timestamp
|
||
|
||
# Initialize player telemetry tracking if needed
|
||
if player_name not in _player_telemetry_times:
|
||
_player_telemetry_times[player_name] = []
|
||
|
||
# Add this telemetry timestamp
|
||
_player_telemetry_times[player_name].append(player_timestamp)
|
||
|
||
# Trim to max history
|
||
if len(_player_telemetry_times[player_name]) > _max_telemetry_history:
|
||
_player_telemetry_times[player_name].pop(0)
|
||
|
||
# Get previous player names if we have history
|
||
previous_players = set()
|
||
if _player_history:
|
||
previous_players = {p["character_name"] for p in _player_history[-1]["players"]}
|
||
|
||
# Find players who entered and exited
|
||
entered_players = current_players - previous_players
|
||
exited_players = previous_players - current_players
|
||
|
||
# Log events with telemetry timing analysis
|
||
for player in entered_players:
|
||
# Check if this is due to timing gap
|
||
timing_gap = None
|
||
if player in _player_telemetry_times and len(_player_telemetry_times[player]) >= 2:
|
||
last_two = _player_telemetry_times[player][-2:]
|
||
timing_gap = (last_two[1] - last_two[0]).total_seconds()
|
||
|
||
event = {
|
||
"timestamp": timestamp,
|
||
"type": "enter",
|
||
"character_name": player,
|
||
"total_players": len(current_players),
|
||
"timing_gap": timing_gap
|
||
}
|
||
_player_events.append(event)
|
||
gap_info = f" (gap: {timing_gap:.1f}s)" if timing_gap and timing_gap > 25 else ""
|
||
logger.debug(f"Player entered: {player} (total: {len(current_players)}){gap_info}")
|
||
|
||
for player in exited_players:
|
||
# Calculate time since last telemetry
|
||
last_telemetry_age = None
|
||
if player in _player_telemetry_times and _player_telemetry_times[player]:
|
||
last_telemetry = _player_telemetry_times[player][-1]
|
||
last_telemetry_age = (timestamp - last_telemetry).total_seconds()
|
||
|
||
event = {
|
||
"timestamp": timestamp,
|
||
"type": "exit",
|
||
"character_name": player,
|
||
"total_players": len(current_players),
|
||
"last_telemetry_age": last_telemetry_age
|
||
}
|
||
_player_events.append(event)
|
||
age_info = f" (last telemetry: {last_telemetry_age:.1f}s ago)" if last_telemetry_age else ""
|
||
logger.debug(f"Player exited: {player} (total: {len(current_players)}){age_info}")
|
||
|
||
# Add current state to history
|
||
history_entry = {
|
||
"timestamp": timestamp,
|
||
"players": new_players,
|
||
"player_count": len(new_players),
|
||
"player_names": list(current_players)
|
||
}
|
||
_player_history.append(history_entry)
|
||
|
||
# Trim history to max size
|
||
if len(_player_history) > _max_history_size:
|
||
_player_history.pop(0)
|
||
|
||
# Trim events to max size
|
||
if len(_player_events) > _max_events_size:
|
||
_player_events.pop(0)
|
||
|
||
def _analyze_flapping_patterns() -> dict:
|
||
"""Analyze player events to identify flapping patterns."""
|
||
from collections import Counter, defaultdict
|
||
|
||
if not _player_events:
|
||
return {"flapping_players": [], "frequent_events": [], "analysis": "No events to analyze"}
|
||
|
||
# Count events per player
|
||
player_event_counts = Counter()
|
||
player_flap_counts = defaultdict(int)
|
||
|
||
# Track recent activity per player (last 10 events)
|
||
recent_player_activity = defaultdict(list)
|
||
|
||
for event in _player_events[-50:]: # Analyze last 50 events
|
||
player = event["character_name"]
|
||
event_type = event["type"]
|
||
player_event_counts[player] += 1
|
||
recent_player_activity[player].append(event_type)
|
||
|
||
# Identify flapping players (players with many enter/exit cycles)
|
||
flapping_players = []
|
||
for player, activity in recent_player_activity.items():
|
||
if len(activity) >= 4: # At least 4 events
|
||
# Count alternating enter/exit patterns
|
||
flap_score = 0
|
||
for i in range(1, len(activity)):
|
||
if activity[i] != activity[i-1]: # Different from previous
|
||
flap_score += 1
|
||
|
||
if flap_score >= 3: # At least 3 transitions
|
||
flapping_players.append({
|
||
"character_name": player,
|
||
"events": len(activity),
|
||
"flap_score": flap_score,
|
||
"recent_activity": activity[-10:] # Last 10 events
|
||
})
|
||
|
||
# Sort by flap score
|
||
flapping_players.sort(key=lambda x: x["flap_score"], reverse=True)
|
||
|
||
# Most active players
|
||
frequent_events = [
|
||
{"character_name": player, "event_count": count}
|
||
for player, count in player_event_counts.most_common(10)
|
||
]
|
||
|
||
# Recent activity summary
|
||
recent_enters = sum(1 for e in _player_events[-20:] if e["type"] == "enter")
|
||
recent_exits = sum(1 for e in _player_events[-20:] if e["type"] == "exit")
|
||
|
||
return {
|
||
"flapping_players": flapping_players,
|
||
"frequent_events": frequent_events,
|
||
"recent_activity": {
|
||
"enters": recent_enters,
|
||
"exits": recent_exits,
|
||
"net_change": recent_enters - recent_exits
|
||
},
|
||
"analysis": f"Found {len(flapping_players)} potentially flapping players"
|
||
}
|
||
|
||
def _analyze_telemetry_timing() -> dict:
|
||
"""Analyze telemetry timing patterns for all players."""
|
||
from datetime import datetime, timezone
|
||
|
||
timing_analysis = {}
|
||
problem_players = []
|
||
|
||
for player_name, timestamps in _player_telemetry_times.items():
|
||
if len(timestamps) < 2:
|
||
continue
|
||
|
||
# Calculate intervals between telemetry messages
|
||
intervals = []
|
||
for i in range(1, len(timestamps)):
|
||
interval = (timestamps[i] - timestamps[i-1]).total_seconds()
|
||
intervals.append(interval)
|
||
|
||
if not intervals:
|
||
continue
|
||
|
||
# Calculate timing statistics
|
||
avg_interval = sum(intervals) / len(intervals)
|
||
min_interval = min(intervals)
|
||
max_interval = max(intervals)
|
||
|
||
# Count problematic intervals (>30s)
|
||
long_gaps = [i for i in intervals if i > 30]
|
||
recent_long_gaps = [i for i in intervals[-5:] if i > 30] # Last 5 intervals
|
||
|
||
# Determine if this player has timing issues
|
||
has_timing_issues = len(long_gaps) > 0 or max_interval > 35
|
||
|
||
timing_stats = {
|
||
"character_name": player_name,
|
||
"total_messages": len(timestamps),
|
||
"avg_interval": round(avg_interval, 1),
|
||
"min_interval": round(min_interval, 1),
|
||
"max_interval": round(max_interval, 1),
|
||
"long_gaps_count": len(long_gaps),
|
||
"recent_long_gaps": len(recent_long_gaps),
|
||
"last_message_age": (datetime.now(timezone.utc) - timestamps[-1]).total_seconds() if timestamps else 0,
|
||
"has_timing_issues": has_timing_issues,
|
||
"recent_intervals": [round(i, 1) for i in intervals[-5:]] # Last 5 intervals
|
||
}
|
||
|
||
timing_analysis[player_name] = timing_stats
|
||
|
||
if has_timing_issues:
|
||
problem_players.append(timing_stats)
|
||
|
||
# Sort problem players by severity (max interval)
|
||
problem_players.sort(key=lambda x: x["max_interval"], reverse=True)
|
||
|
||
return {
|
||
"all_players": timing_analysis,
|
||
"problem_players": problem_players,
|
||
"summary": {
|
||
"total_tracked_players": len(timing_analysis),
|
||
"players_with_issues": len(problem_players),
|
||
"avg_intervals": [stats["avg_interval"] for stats in timing_analysis.values()],
|
||
}
|
||
}
|
||
|
||
async def _refresh_cache_loop() -> None:
|
||
"""Background task: refresh `/live` and `/trails` caches every 5 seconds."""
|
||
consecutive_failures = 0
|
||
max_consecutive_failures = 5
|
||
|
||
while True:
|
||
try:
|
||
# Recompute live players (last 30s)
|
||
cutoff = datetime.now(timezone.utc) - ACTIVE_WINDOW
|
||
sql_live = """
|
||
SELECT sub.*,
|
||
COALESCE(rs.total_rares, 0) AS total_rares,
|
||
COALESCE(rss.session_rares, 0) AS session_rares,
|
||
COALESCE(cs.total_kills, 0) AS total_kills
|
||
FROM (
|
||
SELECT DISTINCT ON (character_name) *
|
||
FROM telemetry_events
|
||
WHERE timestamp > :cutoff
|
||
ORDER BY character_name, timestamp DESC
|
||
) sub
|
||
LEFT JOIN rare_stats rs
|
||
ON sub.character_name = rs.character_name
|
||
LEFT JOIN rare_stats_sessions rss
|
||
ON sub.character_name = rss.character_name
|
||
AND sub.session_id = rss.session_id
|
||
LEFT JOIN char_stats cs
|
||
ON sub.character_name = cs.character_name
|
||
"""
|
||
|
||
# Use a single connection for both queries to reduce connection churn
|
||
async with database.connection() as conn:
|
||
rows = await conn.fetch_all(sql_live, {"cutoff": cutoff})
|
||
new_players = [dict(r) for r in rows]
|
||
|
||
# Track player changes for debugging
|
||
_track_player_changes(new_players)
|
||
|
||
_cached_live["players"] = new_players
|
||
|
||
# Recompute trails (last 600s)
|
||
cutoff2 = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=600)
|
||
sql_trail = """
|
||
SELECT timestamp, character_name, ew, ns, z
|
||
FROM telemetry_events
|
||
WHERE timestamp >= :cutoff
|
||
ORDER BY character_name, timestamp
|
||
"""
|
||
rows2 = await conn.fetch_all(sql_trail, {"cutoff": cutoff2})
|
||
_cached_trails["trails"] = [
|
||
{"timestamp": r["timestamp"], "character_name": r["character_name"],
|
||
"ew": r["ew"], "ns": r["ns"], "z": r["z"]}
|
||
for r in rows2
|
||
]
|
||
|
||
# Reset failure counter on success
|
||
consecutive_failures = 0
|
||
logger.debug(f"Cache refreshed: {len(_cached_live['players'])} players, {len(_cached_trails['trails'])} trail points")
|
||
|
||
except Exception as e:
|
||
consecutive_failures += 1
|
||
logger.error(f"Cache refresh failed ({consecutive_failures}/{max_consecutive_failures}): {e}", exc_info=True)
|
||
|
||
# If too many consecutive failures, wait longer and try to reconnect
|
||
if consecutive_failures >= max_consecutive_failures:
|
||
logger.warning(f"Too many consecutive cache refresh failures. Attempting database reconnection...")
|
||
try:
|
||
await database.disconnect()
|
||
await asyncio.sleep(2)
|
||
await database.connect()
|
||
logger.info("Database reconnected successfully")
|
||
consecutive_failures = 0
|
||
except Exception as reconnect_error:
|
||
logger.error(f"Database reconnection failed: {reconnect_error}")
|
||
await asyncio.sleep(10) # Wait longer before retrying
|
||
continue
|
||
|
||
await asyncio.sleep(5)
|
||
|
||
async def _refresh_total_rares_cache() -> None:
|
||
"""Background task: refresh total rares cache every 5 minutes."""
|
||
consecutive_failures = 0
|
||
max_consecutive_failures = 3
|
||
|
||
while True:
|
||
try:
|
||
async with database.connection() as conn:
|
||
# Get all-time total rares (sum of all characters) - gracefully handle missing table
|
||
try:
|
||
all_time_query = "SELECT COALESCE(SUM(total_rares), 0) as total FROM rare_stats"
|
||
all_time_result = await conn.fetch_one(all_time_query)
|
||
all_time_total = all_time_result["total"] if all_time_result else 0
|
||
except Exception as e:
|
||
logger.debug(f"rare_stats table not available: {e}")
|
||
all_time_total = 0
|
||
|
||
# Get today's rares from rare_events table - gracefully handle missing table
|
||
try:
|
||
today_query = """
|
||
SELECT COUNT(*) as today_count
|
||
FROM rare_events
|
||
WHERE timestamp >= CURRENT_DATE
|
||
"""
|
||
today_result = await conn.fetch_one(today_query)
|
||
today_total = today_result["today_count"] if today_result else 0
|
||
except Exception as e:
|
||
logger.debug(f"rare_events table not available or empty: {e}")
|
||
today_total = 0
|
||
|
||
# Update cache
|
||
_cached_total_rares["all_time"] = all_time_total
|
||
_cached_total_rares["today"] = today_total
|
||
_cached_total_rares["last_updated"] = datetime.now(timezone.utc)
|
||
|
||
consecutive_failures = 0
|
||
logger.debug(f"Total rares cache updated: All-time: {all_time_total}, Today: {today_total}")
|
||
|
||
except Exception as e:
|
||
consecutive_failures += 1
|
||
logger.error(f"Total rares cache refresh failed ({consecutive_failures}/{max_consecutive_failures}): {e}", exc_info=True)
|
||
|
||
if consecutive_failures >= max_consecutive_failures:
|
||
logger.warning("Too many consecutive total rares cache failures, waiting longer...")
|
||
await asyncio.sleep(60) # Wait longer on repeated failures
|
||
continue
|
||
|
||
# Sleep for 5 minutes (300 seconds)
|
||
await asyncio.sleep(300)
|
||
|
||
# ------------------------------------------------------------------
|
||
app = FastAPI()
|
||
# In-memory store mapping character_name to the most recent telemetry snapshot
|
||
live_snapshots: Dict[str, dict] = {}
|
||
live_vitals: Dict[str, dict] = {}
|
||
|
||
# Shared secret used to authenticate plugin WebSocket connections (override for production)
|
||
SHARED_SECRET = "your_shared_secret"
|
||
# LOG_FILE = "telemetry_log.jsonl"
|
||
# ------------------------------------------------------------------
|
||
ACTIVE_WINDOW = timedelta(seconds=30) # Time window defining “online” players (last 30 seconds)
|
||
|
||
"""
|
||
Data models for plugin events:
|
||
- TelemetrySnapshot: periodic telemetry data from a player client
|
||
- SpawnEvent: information about a mob spawn event
|
||
- RareEvent: details of a rare mob event
|
||
"""
|
||
|
||
|
||
class TelemetrySnapshot(BaseModel):
|
||
character_name: str
|
||
char_tag: Optional[str] = None
|
||
session_id: str
|
||
timestamp: datetime
|
||
|
||
ew: float # +E / –W
|
||
ns: float # +N / –S
|
||
z: float
|
||
|
||
kills: int
|
||
kills_per_hour: Optional[float] = None
|
||
onlinetime: Optional[str] = None
|
||
deaths: int
|
||
total_deaths: Optional[int] = None
|
||
# Removed from telemetry payload; always enforced to 0 and tracked via rare events
|
||
rares_found: int = 0
|
||
prismatic_taper_count: int
|
||
vt_state: str
|
||
# Optional telemetry metrics
|
||
mem_mb: Optional[float] = None
|
||
cpu_pct: Optional[float] = None
|
||
mem_handles: Optional[int] = None
|
||
latency_ms: Optional[float] = None
|
||
|
||
|
||
class SpawnEvent(BaseModel):
|
||
"""
|
||
Model for a spawn event emitted by plugin clients when a mob appears.
|
||
Records character context, mob type, timestamp, and spawn location.
|
||
"""
|
||
character_name: str
|
||
mob: str
|
||
timestamp: datetime
|
||
ew: float
|
||
ns: float
|
||
z: float = 0.0
|
||
|
||
class RareEvent(BaseModel):
|
||
"""
|
||
Model for a rare mob event when a player encounters or discovers a rare entity.
|
||
Includes character, event name, timestamp, and location coordinates.
|
||
"""
|
||
character_name: str
|
||
name: str
|
||
timestamp: datetime
|
||
ew: float
|
||
ns: float
|
||
z: float = 0.0
|
||
|
||
|
||
class FullInventoryMessage(BaseModel):
|
||
"""
|
||
Model for the full_inventory WebSocket message type.
|
||
Contains complete character inventory snapshot with raw item data.
|
||
"""
|
||
character_name: str
|
||
timestamp: datetime
|
||
item_count: int
|
||
items: List[Dict[str, Any]]
|
||
|
||
|
||
class VitalsMessage(BaseModel):
|
||
"""
|
||
Model for the vitals WebSocket message type.
|
||
Contains character health, stamina, mana, and vitae information.
|
||
"""
|
||
character_name: str
|
||
timestamp: datetime
|
||
health_current: int
|
||
health_max: int
|
||
health_percentage: float
|
||
stamina_current: int
|
||
stamina_max: int
|
||
stamina_percentage: float
|
||
mana_current: int
|
||
mana_max: int
|
||
mana_percentage: float
|
||
vitae: int
|
||
|
||
|
||
@app.on_event("startup")
|
||
async def on_startup():
|
||
"""Event handler triggered when application starts up.
|
||
|
||
Attempts to connect to the database with retry logic to accommodate
|
||
potential startup delays (e.g., waiting for Postgres to be ready).
|
||
"""
|
||
max_attempts = 5
|
||
for attempt in range(1, max_attempts + 1):
|
||
try:
|
||
await database.connect()
|
||
await init_db_async()
|
||
logger.info(f"Database connected successfully on attempt {attempt}")
|
||
# Log connection pool configuration
|
||
try:
|
||
logger.info(f"Database connection established with pool configuration")
|
||
except Exception as pool_error:
|
||
logger.debug(f"Could not access pool details: {pool_error}")
|
||
break
|
||
except Exception as e:
|
||
logger.warning(f"Database connection failed (attempt {attempt}/{max_attempts}): {e}")
|
||
if attempt < max_attempts:
|
||
await asyncio.sleep(5)
|
||
else:
|
||
raise RuntimeError(f"Could not connect to database after {max_attempts} attempts")
|
||
# Start background cache refresh (live & trails)
|
||
global _cache_task, _rares_cache_task, _server_health_task
|
||
_cache_task = asyncio.create_task(_refresh_cache_loop())
|
||
_rares_cache_task = asyncio.create_task(_refresh_total_rares_cache())
|
||
_server_health_task = asyncio.create_task(monitor_server_health())
|
||
logger.info("Background cache refresh and server monitoring tasks started")
|
||
@app.on_event("shutdown")
|
||
async def on_shutdown():
|
||
"""Event handler triggered when application is shutting down.
|
||
|
||
Ensures the database connection is closed cleanly.
|
||
"""
|
||
# Stop cache refresh tasks
|
||
global _cache_task, _rares_cache_task, _server_health_task
|
||
if _cache_task:
|
||
logger.info("Stopping background cache refresh task")
|
||
_cache_task.cancel()
|
||
try:
|
||
await _cache_task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
|
||
if _rares_cache_task:
|
||
logger.info("Stopping total rares cache refresh task")
|
||
_rares_cache_task.cancel()
|
||
try:
|
||
await _rares_cache_task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
|
||
if _server_health_task:
|
||
logger.info("Stopping server health monitoring task")
|
||
_server_health_task.cancel()
|
||
try:
|
||
await _server_health_task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
logger.info("Disconnecting from database")
|
||
await database.disconnect()
|
||
|
||
|
||
|
||
# ------------------------ GET -----------------------------------
|
||
@app.get("/debug")
|
||
def debug():
|
||
return {"status": "OK"}
|
||
|
||
@app.get("/debug/player-flapping")
|
||
async def get_player_flapping_debug():
|
||
"""Return player tracking data for debugging flapping issues."""
|
||
try:
|
||
# Analyze flapping patterns
|
||
flapping_analysis = _analyze_flapping_patterns()
|
||
|
||
# Analyze telemetry timing
|
||
timing_analysis = _analyze_telemetry_timing()
|
||
|
||
# Get recent events (last 50)
|
||
recent_events = _player_events[-50:] if len(_player_events) > 50 else _player_events
|
||
|
||
# Convert timestamps to ISO format for JSON serialization
|
||
formatted_events = []
|
||
for event in recent_events:
|
||
formatted_event = event.copy()
|
||
formatted_event["timestamp"] = event["timestamp"].isoformat()
|
||
formatted_events.append(formatted_event)
|
||
|
||
# Format history
|
||
formatted_history = []
|
||
for entry in _player_history:
|
||
formatted_entry = {
|
||
"timestamp": entry["timestamp"].isoformat(),
|
||
"player_count": entry["player_count"],
|
||
"player_names": entry["player_names"]
|
||
}
|
||
formatted_history.append(formatted_entry)
|
||
|
||
# Format timing data for JSON serialization
|
||
formatted_timing = {}
|
||
for player_name, timing_data in timing_analysis["all_players"].items():
|
||
formatted_timing[player_name] = timing_data.copy()
|
||
# Round last_message_age for readability
|
||
formatted_timing[player_name]["last_message_age"] = round(timing_data["last_message_age"], 1)
|
||
|
||
return {
|
||
"current_players": len(_cached_live.get("players", [])),
|
||
"history": formatted_history,
|
||
"recent_events": formatted_events,
|
||
"flapping_analysis": flapping_analysis,
|
||
"timing_analysis": {
|
||
"all_players": formatted_timing,
|
||
"problem_players": timing_analysis["problem_players"],
|
||
"summary": timing_analysis["summary"]
|
||
},
|
||
"tracking_stats": {
|
||
"history_entries": len(_player_history),
|
||
"total_events": len(_player_events),
|
||
"tracked_players": len(_player_telemetry_times),
|
||
"max_history_size": _max_history_size,
|
||
"max_events_size": _max_events_size
|
||
}
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"Failed to get player flapping debug data: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
@app.get("/debug/websocket-health")
|
||
async def get_websocket_health():
|
||
"""Return simple WebSocket connection counts."""
|
||
try:
|
||
return {
|
||
"plugin_connections": _plugin_connections,
|
||
"browser_connections": _browser_connections,
|
||
"total_connections": _plugin_connections + _browser_connections
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"Failed to get WebSocket health data: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
@app.get("/debug/database-performance")
|
||
async def get_database_performance():
|
||
"""Return simple database query performance statistics."""
|
||
try:
|
||
avg_query_time = (_total_query_time / _total_queries) if _total_queries > 0 else 0.0
|
||
return {
|
||
"total_queries": _total_queries,
|
||
"total_query_time": round(_total_query_time, 3),
|
||
"average_query_time": round(avg_query_time, 3)
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"Failed to get database performance data: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
@app.get("/debug/recent-activity")
|
||
async def get_recent_activity():
|
||
"""Return recent telemetry activity feed."""
|
||
try:
|
||
return {
|
||
"recent_messages": _recent_telemetry_messages.copy(),
|
||
"total_messages": len(_recent_telemetry_messages),
|
||
"max_messages": _max_recent_messages
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"Failed to get recent activity data: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
@app.get("/server-health")
|
||
async def get_server_health():
|
||
"""Return current server health status."""
|
||
try:
|
||
# Get latest status from database if cache is stale
|
||
if not _server_status_cache.get("last_check") or \
|
||
(datetime.now(timezone.utc) - datetime.fromisoformat(_server_status_cache["last_check"].replace('Z', '+00:00')) > timedelta(minutes=2)):
|
||
|
||
row = await database.fetch_one(
|
||
"SELECT * FROM server_status WHERE server_name = :name",
|
||
{"name": "Coldeve"}
|
||
)
|
||
|
||
if row:
|
||
_server_status_cache.update({
|
||
"status": row["current_status"],
|
||
"latency_ms": row["last_latency_ms"],
|
||
"player_count": row["last_player_count"],
|
||
"last_check": row["last_check"].isoformat() if row["last_check"] else None,
|
||
"uptime_seconds": row["total_uptime_seconds"],
|
||
"last_restart": row["last_restart"].isoformat() if row["last_restart"] else None
|
||
})
|
||
|
||
# Format uptime
|
||
uptime_seconds = _server_status_cache.get("uptime_seconds", 0)
|
||
days = uptime_seconds // 86400
|
||
hours = (uptime_seconds % 86400) // 3600
|
||
minutes = (uptime_seconds % 3600) // 60
|
||
|
||
uptime_str = f"{days}d {hours}h {minutes}m" if days > 0 else f"{hours}h {minutes}m"
|
||
|
||
return {
|
||
"server_name": "Coldeve",
|
||
"status": _server_status_cache.get("status", "unknown"),
|
||
"latency_ms": _server_status_cache.get("latency_ms"),
|
||
"player_count": _server_status_cache.get("player_count"),
|
||
"uptime": uptime_str,
|
||
"uptime_seconds": uptime_seconds,
|
||
"last_restart": _server_status_cache.get("last_restart"),
|
||
"last_check": _server_status_cache.get("last_check")
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to get server health data: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
@app.get("/quest-status")
|
||
async def get_quest_status():
|
||
"""Return current cached quest status for all players."""
|
||
try:
|
||
# Return the quest cache with structured data
|
||
return {
|
||
"quest_data": _quest_status_cache,
|
||
"tracked_quests": [
|
||
"Stipend Collection Timer",
|
||
"Blank Augmentation Gem Pickup Timer",
|
||
"Insatiable Eater Jaw"
|
||
],
|
||
"player_count": len(_quest_status_cache)
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"Failed to get quest status data: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
@app.get("/portals")
|
||
async def get_portals():
|
||
"""Return unique portal discoveries from the last 24 hours."""
|
||
try:
|
||
# Query unique portals from last 24 hours, keeping the most recent discovery of each
|
||
cutoff_time = datetime.now(timezone.utc) - timedelta(hours=24)
|
||
|
||
query = """
|
||
SELECT DISTINCT ON (portal_name)
|
||
character_name, portal_name, timestamp, ns, ew, z
|
||
FROM portal_discoveries
|
||
WHERE timestamp >= :cutoff_time
|
||
ORDER BY portal_name, timestamp DESC
|
||
"""
|
||
|
||
rows = await database.fetch_all(query, {"cutoff_time": cutoff_time})
|
||
|
||
portals = []
|
||
for row in rows:
|
||
portal = {
|
||
"character_name": row["character_name"],
|
||
"portal_name": row["portal_name"],
|
||
"timestamp": row["timestamp"].isoformat(),
|
||
"ns": row["ns"],
|
||
"ew": row["ew"],
|
||
"z": row["z"]
|
||
}
|
||
portals.append(portal)
|
||
|
||
return {
|
||
"portals": portals,
|
||
"portal_count": len(portals),
|
||
"cutoff_time": cutoff_time.isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to get portals data: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
@app.get("/live", response_model=dict)
|
||
@app.get("/live/", response_model=dict)
|
||
async def get_live_players():
|
||
"""Return cached live telemetry per character."""
|
||
try:
|
||
return JSONResponse(content=jsonable_encoder(_cached_live))
|
||
except Exception as e:
|
||
logger.error(f"Failed to get live players: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
|
||
|
||
# --- GET Trails ---------------------------------
|
||
@app.get("/trails")
|
||
@app.get("/trails/")
|
||
async def get_trails(
|
||
seconds: int = Query(600, ge=0, description="Lookback window in seconds"),
|
||
):
|
||
"""Return cached trails (updated every 5 seconds)."""
|
||
try:
|
||
return JSONResponse(content=jsonable_encoder(_cached_trails))
|
||
except Exception as e:
|
||
logger.error(f"Failed to get trails: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
@app.get("/total-rares")
|
||
@app.get("/total-rares/")
|
||
async def get_total_rares():
|
||
"""Return cached total rares statistics (updated every 5 minutes)."""
|
||
try:
|
||
return JSONResponse(content=jsonable_encoder(_cached_total_rares))
|
||
except Exception as e:
|
||
logger.error(f"Failed to get total rares: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
# --- GET Spawn Heat Map Endpoint ---------------------------------
|
||
@app.get("/spawns/heatmap")
|
||
async def get_spawn_heatmap_data(
|
||
hours: int = Query(24, ge=1, le=168, description="Lookback window in hours (1-168)"),
|
||
limit: int = Query(10000, ge=100, le=50000, description="Maximum number of spawn points to return")
|
||
):
|
||
"""
|
||
Aggregate spawn locations for heat-map visualization.
|
||
|
||
Returns spawn event coordinates grouped by location with intensity counts
|
||
for the specified time window.
|
||
|
||
Response format:
|
||
{
|
||
"spawn_points": [{"ew": float, "ns": float, "intensity": int}, ...],
|
||
"total_points": int,
|
||
"timestamp": "UTC-ISO"
|
||
}
|
||
"""
|
||
try:
|
||
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
|
||
|
||
# Aggregate spawn events by coordinates within time window
|
||
query = """
|
||
SELECT ew, ns, COUNT(*) AS spawn_count
|
||
FROM spawn_events
|
||
WHERE timestamp >= :cutoff
|
||
GROUP BY ew, ns
|
||
ORDER BY spawn_count DESC
|
||
LIMIT :limit
|
||
"""
|
||
|
||
rows = await database.fetch_all(query, {"cutoff": cutoff, "limit": limit})
|
||
|
||
spawn_points = [
|
||
{
|
||
"ew": float(row["ew"]),
|
||
"ns": float(row["ns"]),
|
||
"intensity": int(row["spawn_count"])
|
||
}
|
||
for row in rows
|
||
]
|
||
|
||
result = {
|
||
"spawn_points": spawn_points,
|
||
"total_points": len(spawn_points),
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
"hours_window": hours
|
||
}
|
||
|
||
logger.debug(f"Heat map data: {len(spawn_points)} unique spawn locations from last {hours} hours")
|
||
return JSONResponse(content=jsonable_encoder(result))
|
||
|
||
except Exception as e:
|
||
logger.error(f"Heat map query failed: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Spawn heat map query failed")
|
||
|
||
|
||
# --- GET Inventory Endpoints ---------------------------------
|
||
@app.get("/inventory/{character_name}")
|
||
async def get_character_inventory(character_name: str):
|
||
"""Get the complete inventory for a specific character - inventory service only."""
|
||
try:
|
||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||
response = await client.get(
|
||
f"{INVENTORY_SERVICE_URL}/inventory/{character_name}"
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
return JSONResponse(content=response.json())
|
||
elif response.status_code == 404:
|
||
raise HTTPException(status_code=404, detail=f"No inventory found for character '{character_name}'")
|
||
else:
|
||
logger.error(f"Inventory service returned {response.status_code} for {character_name}")
|
||
raise HTTPException(status_code=502, detail="Inventory service error")
|
||
|
||
except httpx.RequestError as e:
|
||
logger.error(f"Could not reach inventory service: {e}")
|
||
raise HTTPException(status_code=503, detail="Inventory service unavailable")
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Failed to get inventory for {character_name}: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
@app.get("/inventory/{character_name}/search")
|
||
async def search_character_inventory(
|
||
character_name: str,
|
||
name: str = Query(None, description="Search by item name (partial match)"),
|
||
object_class: int = Query(None, description="Filter by ObjectClass"),
|
||
min_value: int = Query(None, description="Minimum item value"),
|
||
max_value: int = Query(None, description="Maximum item value"),
|
||
min_burden: int = Query(None, description="Minimum burden"),
|
||
max_burden: int = Query(None, description="Maximum burden")
|
||
):
|
||
"""Search and filter inventory items for a character with various criteria."""
|
||
try:
|
||
conditions = ["character_name = :character_name"]
|
||
params = {"character_name": character_name}
|
||
|
||
if name:
|
||
conditions.append("name ILIKE :name")
|
||
params["name"] = f"%{name}%"
|
||
|
||
if object_class is not None:
|
||
conditions.append("object_class = :object_class")
|
||
params["object_class"] = object_class
|
||
|
||
if min_value is not None:
|
||
conditions.append("value >= :min_value")
|
||
params["min_value"] = min_value
|
||
|
||
if max_value is not None:
|
||
conditions.append("value <= :max_value")
|
||
params["max_value"] = max_value
|
||
|
||
if min_burden is not None:
|
||
conditions.append("burden >= :min_burden")
|
||
params["min_burden"] = min_burden
|
||
|
||
if max_burden is not None:
|
||
conditions.append("burden <= :max_burden")
|
||
params["max_burden"] = max_burden
|
||
|
||
query = f"""
|
||
SELECT name, icon, object_class, value, burden, has_id_data, item_data, timestamp
|
||
FROM character_inventories
|
||
WHERE {' AND '.join(conditions)}
|
||
ORDER BY value DESC, name
|
||
"""
|
||
|
||
rows = await database.fetch_all(query, params)
|
||
|
||
items = []
|
||
for row in rows:
|
||
item = dict(row)
|
||
items.append(item)
|
||
|
||
return JSONResponse(content=jsonable_encoder({
|
||
"character_name": character_name,
|
||
"item_count": len(items),
|
||
"search_criteria": {
|
||
"name": name,
|
||
"object_class": object_class,
|
||
"min_value": min_value,
|
||
"max_value": max_value,
|
||
"min_burden": min_burden,
|
||
"max_burden": max_burden
|
||
},
|
||
"items": items
|
||
}))
|
||
except Exception as e:
|
||
logger.error(f"Failed to search inventory for {character_name}: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
@app.get("/inventories")
|
||
async def list_characters_with_inventories():
|
||
"""List all characters that have stored inventories with item counts."""
|
||
try:
|
||
query = """
|
||
SELECT character_name, COUNT(*) as item_count, MAX(timestamp) as last_updated
|
||
FROM character_inventories
|
||
GROUP BY character_name
|
||
ORDER BY last_updated DESC
|
||
"""
|
||
rows = await database.fetch_all(query)
|
||
|
||
characters = []
|
||
for row in rows:
|
||
characters.append({
|
||
"character_name": row["character_name"],
|
||
"item_count": row["item_count"],
|
||
"last_updated": row["last_updated"]
|
||
})
|
||
|
||
return JSONResponse(content=jsonable_encoder({
|
||
"characters": characters,
|
||
"total_characters": len(characters)
|
||
}))
|
||
except Exception as e:
|
||
logger.error(f"Failed to list inventory characters: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
# --- Inventory Service Character List Proxy ---------------------
|
||
@app.get("/inventory-characters")
|
||
async def get_inventory_characters():
|
||
"""Get character list from inventory service - proxy to avoid routing conflicts."""
|
||
try:
|
||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||
response = await client.get(f"{INVENTORY_SERVICE_URL}/characters/list")
|
||
|
||
if response.status_code == 200:
|
||
return JSONResponse(content=response.json())
|
||
else:
|
||
logger.error(f"Inventory service returned {response.status_code}: {response.text}")
|
||
raise HTTPException(status_code=response.status_code, detail="Failed to get characters from inventory service")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to proxy inventory characters request: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Failed to get inventory characters")
|
||
|
||
|
||
# --- Inventory Search Service Proxy Endpoints -------------------
|
||
@app.get("/search/items")
|
||
async def search_items_proxy(
|
||
text: str = Query(None, description="Search item names, descriptions, or properties"),
|
||
character: str = Query(None, description="Limit search to specific character"),
|
||
include_all_characters: bool = Query(False, description="Search across all characters"),
|
||
equipment_status: str = Query(None, description="equipped, unequipped, or all"),
|
||
equipment_slot: int = Query(None, description="Equipment slot mask"),
|
||
# Item category filtering
|
||
armor_only: bool = Query(False, description="Show only armor items"),
|
||
jewelry_only: bool = Query(False, description="Show only jewelry items"),
|
||
weapon_only: bool = Query(False, description="Show only weapon items"),
|
||
# Spell filtering
|
||
has_spell: str = Query(None, description="Must have this specific spell (by name)"),
|
||
spell_contains: str = Query(None, description="Spell name contains this text"),
|
||
legendary_cantrips: str = Query(None, description="Comma-separated list of legendary cantrip names"),
|
||
# Combat properties
|
||
min_damage: int = Query(None, description="Minimum damage"),
|
||
max_damage: int = Query(None, description="Maximum damage"),
|
||
min_armor: int = Query(None, description="Minimum armor level"),
|
||
max_armor: int = Query(None, description="Maximum armor level"),
|
||
min_attack_bonus: float = Query(None, description="Minimum attack bonus"),
|
||
min_crit_damage_rating: int = Query(None, description="Minimum critical damage rating"),
|
||
min_damage_rating: int = Query(None, description="Minimum damage rating"),
|
||
min_heal_boost_rating: int = Query(None, description="Minimum heal boost rating"),
|
||
max_level: int = Query(None, description="Maximum wield level requirement"),
|
||
min_level: int = Query(None, description="Minimum wield level requirement"),
|
||
material: str = Query(None, description="Material type (partial match)"),
|
||
min_workmanship: float = Query(None, description="Minimum workmanship"),
|
||
has_imbue: bool = Query(None, description="Has imbue effects"),
|
||
item_set: str = Query(None, description="Item set name (partial match)"),
|
||
min_tinks: int = Query(None, description="Minimum tinker count"),
|
||
bonded: bool = Query(None, description="Bonded status"),
|
||
attuned: bool = Query(None, description="Attuned status"),
|
||
unique: bool = Query(None, description="Unique item status"),
|
||
is_rare: bool = Query(None, description="Rare item status"),
|
||
min_condition: int = Query(None, description="Minimum condition percentage"),
|
||
min_value: int = Query(None, description="Minimum item value"),
|
||
max_value: int = Query(None, description="Maximum item value"),
|
||
max_burden: int = Query(None, description="Maximum burden"),
|
||
sort_by: str = Query("name", description="Sort field: name, value, damage, armor, workmanship"),
|
||
sort_dir: str = Query("asc", description="Sort direction: asc or desc"),
|
||
page: int = Query(1, ge=1, description="Page number"),
|
||
limit: int = Query(50, ge=1, le=200, description="Items per page")
|
||
):
|
||
"""Proxy to inventory service comprehensive item search."""
|
||
try:
|
||
# Build query parameters
|
||
params = {}
|
||
if text: params["text"] = text
|
||
if character: params["character"] = character
|
||
if include_all_characters: params["include_all_characters"] = include_all_characters
|
||
if equipment_status: params["equipment_status"] = equipment_status
|
||
if equipment_slot is not None: params["equipment_slot"] = equipment_slot
|
||
# Category filtering
|
||
if armor_only: params["armor_only"] = armor_only
|
||
if jewelry_only: params["jewelry_only"] = jewelry_only
|
||
if weapon_only: params["weapon_only"] = weapon_only
|
||
# Spell filtering
|
||
if has_spell: params["has_spell"] = has_spell
|
||
if spell_contains: params["spell_contains"] = spell_contains
|
||
if legendary_cantrips: params["legendary_cantrips"] = legendary_cantrips
|
||
# Combat properties
|
||
if min_damage is not None: params["min_damage"] = min_damage
|
||
if max_damage is not None: params["max_damage"] = max_damage
|
||
if min_armor is not None: params["min_armor"] = min_armor
|
||
if max_armor is not None: params["max_armor"] = max_armor
|
||
if min_attack_bonus is not None: params["min_attack_bonus"] = min_attack_bonus
|
||
if min_crit_damage_rating is not None: params["min_crit_damage_rating"] = min_crit_damage_rating
|
||
if min_damage_rating is not None: params["min_damage_rating"] = min_damage_rating
|
||
if min_heal_boost_rating is not None: params["min_heal_boost_rating"] = min_heal_boost_rating
|
||
if max_level is not None: params["max_level"] = max_level
|
||
if min_level is not None: params["min_level"] = min_level
|
||
if material: params["material"] = material
|
||
if min_workmanship is not None: params["min_workmanship"] = min_workmanship
|
||
if has_imbue is not None: params["has_imbue"] = has_imbue
|
||
if item_set: params["item_set"] = item_set
|
||
if min_tinks is not None: params["min_tinks"] = min_tinks
|
||
if bonded is not None: params["bonded"] = bonded
|
||
if attuned is not None: params["attuned"] = attuned
|
||
if unique is not None: params["unique"] = unique
|
||
if is_rare is not None: params["is_rare"] = is_rare
|
||
if min_condition is not None: params["min_condition"] = min_condition
|
||
if min_value is not None: params["min_value"] = min_value
|
||
if max_value is not None: params["max_value"] = max_value
|
||
if max_burden is not None: params["max_burden"] = max_burden
|
||
params["sort_by"] = sort_by
|
||
params["sort_dir"] = sort_dir
|
||
params["page"] = page
|
||
params["limit"] = limit
|
||
|
||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||
response = await client.get(
|
||
f"{INVENTORY_SERVICE_URL}/search/items",
|
||
params=params
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
return JSONResponse(content=response.json())
|
||
else:
|
||
logger.error(f"Inventory search service returned {response.status_code}")
|
||
raise HTTPException(status_code=response.status_code, detail="Inventory search service error")
|
||
|
||
except httpx.RequestError as e:
|
||
logger.error(f"Could not reach inventory service: {e}")
|
||
raise HTTPException(status_code=503, detail="Inventory service unavailable")
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Failed to search items: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
@app.get("/search/equipped/{character_name}")
|
||
async def search_equipped_items_proxy(
|
||
character_name: str,
|
||
slot: int = Query(None, description="Specific equipment slot mask")
|
||
):
|
||
"""Proxy to inventory service equipped items search."""
|
||
try:
|
||
params = {}
|
||
if slot is not None:
|
||
params["slot"] = slot
|
||
|
||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||
response = await client.get(
|
||
f"{INVENTORY_SERVICE_URL}/search/equipped/{character_name}",
|
||
params=params
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
return JSONResponse(content=response.json())
|
||
elif response.status_code == 404:
|
||
raise HTTPException(status_code=404, detail=f"No equipped items found for character '{character_name}'")
|
||
else:
|
||
logger.error(f"Inventory service returned {response.status_code} for equipped items search")
|
||
raise HTTPException(status_code=response.status_code, detail="Inventory service error")
|
||
|
||
except httpx.RequestError as e:
|
||
logger.error(f"Could not reach inventory service: {e}")
|
||
raise HTTPException(status_code=503, detail="Inventory service unavailable")
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Failed to search equipped items for {character_name}: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
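# Illustrative example (character name and slot value are assumptions, not from the source):
#   GET /search/equipped/Some%20Character?slot=2
# returns the character's equipped items from the inventory service, optionally
# filtered by the equipment slot mask.
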
@app.get("/search/upgrades/{character_name}/{slot}")
|
||
async def find_equipment_upgrades_proxy(
|
||
character_name: str,
|
||
slot: int,
|
||
upgrade_type: str = Query("damage", description="What to optimize for: damage, armor, workmanship, value")
|
||
):
|
||
"""Proxy to inventory service equipment upgrades search."""
|
||
try:
|
||
params = {"upgrade_type": upgrade_type}
|
||
|
||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||
response = await client.get(
|
||
f"{INVENTORY_SERVICE_URL}/search/upgrades/{character_name}/{slot}",
|
||
params=params
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
return JSONResponse(content=response.json())
|
||
elif response.status_code == 404:
|
||
raise HTTPException(status_code=404, detail=f"No upgrade options found for character '{character_name}' slot {slot}")
|
||
else:
|
||
logger.error(f"Inventory service returned {response.status_code} for upgrades search")
|
||
raise HTTPException(status_code=response.status_code, detail="Inventory service error")
|
||
|
||
except httpx.RequestError as e:
|
||
logger.error(f"Could not reach inventory service: {e}")
|
||
raise HTTPException(status_code=503, detail="Inventory service unavailable")
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Failed to find equipment upgrades for {character_name} slot {slot}: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
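# Illustrative example (assumed values, not from the original source):
#   GET /search/upgrades/Some%20Character/2?upgrade_type=armor
# asks the inventory service for items that would improve the given equipment slot,
# ranked by the chosen upgrade_type (damage, armor, workmanship, or value).
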
@app.get("/sets/list")
|
||
async def list_equipment_sets_proxy():
|
||
"""Proxy to inventory service equipment sets list."""
|
||
try:
|
||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||
response = await client.get(f"{INVENTORY_SERVICE_URL}/sets/list")
|
||
|
||
if response.status_code == 200:
|
||
return JSONResponse(content=response.json())
|
||
else:
|
||
logger.error(f"Inventory service returned {response.status_code} for sets list")
|
||
raise HTTPException(status_code=response.status_code, detail="Inventory service error")
|
||
|
||
except httpx.RequestError as e:
|
||
logger.error(f"Could not reach inventory service: {e}")
|
||
raise HTTPException(status_code=503, detail="Inventory service unavailable")
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Failed to list equipment sets: {e}", exc_info=True)
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
# -------------------- WebSocket endpoints -----------------------
## WebSocket connection tracking
# Set of browser WebSocket clients subscribed to live updates
browser_conns: set[WebSocket] = set()
# Mapping of plugin clients by character_name to their WebSocket for command forwarding
plugin_conns: Dict[str, WebSocket] = {}


async def _broadcast_to_browser_clients(snapshot: dict):
    """Broadcast a telemetry or chat message to all connected browser clients.

    Converts any non-serializable types (e.g., datetime) before sending.
    Handles connection errors gracefully and removes stale connections.
    """
    # Convert snapshot payload to JSON-friendly types
    data = jsonable_encoder(snapshot)
    # Use list() to avoid "set changed size during iteration" errors
    disconnected_clients = []

    for ws in list(browser_conns):
        try:
            await ws.send_json(data)
        except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e:
            # Collect disconnected clients for cleanup
            disconnected_clients.append(ws)
            logger.debug(f"Detected disconnected browser client: {e}")
        except Exception as e:
            # Handle any other unexpected errors
            disconnected_clients.append(ws)
            logger.warning(f"Unexpected error broadcasting to browser client: {e}")

    # Clean up disconnected clients
    for ws in disconnected_clients:
        browser_conns.discard(ws)


async def _forward_to_inventory_service(inventory_msg: FullInventoryMessage):
    """Forward inventory data to the inventory microservice for processing."""
    try:
        # Prepare data for inventory service
        inventory_data = {
            "character_name": inventory_msg.character_name,
            "timestamp": inventory_msg.timestamp.isoformat(),
            "items": inventory_msg.items
        }

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{INVENTORY_SERVICE_URL}/process-inventory",
                json=inventory_data
            )

            if response.status_code == 200:
                result = response.json()
                logger.info(f"Inventory service processed {result['processed']} items for {inventory_msg.character_name}")
            else:
                logger.error(f"Inventory service error {response.status_code}: {response.text}")

    except Exception as e:
        logger.error(f"Failed to forward inventory to service: {e}")
        # Don't raise - this shouldn't block the main storage


async def _store_inventory(inventory_msg: FullInventoryMessage):
    """Forward inventory data to inventory microservice for processing and storage."""
    try:
        # Forward to inventory microservice for enhanced processing and storage
        await _forward_to_inventory_service(inventory_msg)

        # Optional: Create JSON file for debugging (can be removed in production)
        inventory_dir = Path("./inventory")
        inventory_dir.mkdir(exist_ok=True)

        file_path = inventory_dir / f"{inventory_msg.character_name}_inventory.json"
        inventory_data = {
            "character_name": inventory_msg.character_name,
            "timestamp": inventory_msg.timestamp.isoformat(),
            "item_count": inventory_msg.item_count,
            "items": inventory_msg.items
        }

        with open(file_path, 'w') as f:
            json.dump(inventory_data, f, indent=2)

    except Exception as e:
        logger.error(f"Failed to forward inventory for {inventory_msg.character_name}: {e}", exc_info=True)
        raise


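# Illustrative sketch of the plugin wire protocol handled by /ws/position below.
# Field lists are inferred from the handlers in this file; the character name, IDs,
# and values are made-up examples, and the real Pydantic models may require more fields:
#   {"type": "register",  "character_name": "Xerxes"}
#   {"type": "telemetry", "character_name": "Xerxes", "session_id": "...", "kills": 42, ...}
#   {"type": "rare",      "character_name": "Xerxes", "name": "...", ...}
#   {"type": "vitals",    "character_name": "Xerxes", "health_percentage": 95,
#                         "stamina_percentage": 80, "mana_percentage": 60, ...}
#   {"type": "quest",     "character_name": "Xerxes", "quest_name": "Stipend Collection Timer",
#                         "countdown": "..."}
#   {"type": "portal",    "character_name": "Xerxes", "portal_name": "...",
#                         "ns": 12.3, "ew": -45.6, "z": 0.0, "timestamp": "2024-01-01T00:00:00Z"}
#   {"type": "full_inventory", "character_name": "Xerxes", "timestamp": "...",
#                              "item_count": 100, "items": [...]}
#   {"type": "spawn",     "character_name": "Xerxes", "mob": "...", ...}
#   {"type": "chat",      "character_name": "Xerxes", ...}
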
@app.websocket("/ws/position")
async def ws_receive_snapshots(
    websocket: WebSocket,
    secret: str | None = Query(None),
    x_plugin_secret: str | None = Header(None)
):
    """WebSocket endpoint for plugin clients to send telemetry and events.

    Validates a shared secret for authentication, then listens for messages of
    various types and handles each:
    - register: record plugin WebSocket for command forwarding
    - spawn: persist spawn event
    - telemetry: store snapshot, update stats, broadcast to browsers
    - rare: update total and session rare counts, persist event
    - chat: broadcast chat messages to browsers
    - full_inventory: forward a complete inventory snapshot to the inventory service
    - vitals: cache health/stamina/mana and broadcast to browsers
    - quest: cache tracked quest timers and broadcast to browsers
    - portal: persist portal discoveries and broadcast to browsers
    """
    global _plugin_connections

    # Authenticate plugin connection using shared secret
    key = secret or x_plugin_secret
    if key != SHARED_SECRET:
        # Reject without completing the WebSocket handshake
        logger.warning(f"Plugin WebSocket authentication failed from {websocket.client}")
        await websocket.close(code=1008)
        return
    # Accept the WebSocket connection
    await websocket.accept()
    logger.info(f"🔌 PLUGIN_CONNECTED: {websocket.client}")

    # Track plugin connection
    _plugin_connections += 1

    try:
        while True:
            # Read next text frame
            try:
                raw = await websocket.receive_text()
                # Debug: log all incoming plugin WebSocket messages
                logger.debug(f"Plugin WebSocket RX from {websocket.client}: {raw}")
            except WebSocketDisconnect:
                logger.info(f"🔌 PLUGIN_DISCONNECTED: {websocket.client}")
                break
            # Parse JSON payload
            try:
                data = json.loads(raw)
            except json.JSONDecodeError as e:
                logger.warning(f"Invalid JSON from plugin {websocket.client}: {e}")
                continue
            msg_type = data.get("type")
            # --- Registration: associate character_name with this plugin socket ---
            if msg_type == "register":
                name = data.get("character_name") or data.get("player_name")
                if isinstance(name, str):
                    plugin_conns[name] = websocket
                    logger.info(f"📋 PLUGIN_REGISTERED: {name} from {websocket.client}")
                continue
            # --- Spawn event: persist to spawn_events table ---
            if msg_type == "spawn":
                payload = data.copy()
                payload.pop("type", None)
                try:
                    spawn = SpawnEvent.parse_obj(payload)
                    await database.execute(
                        spawn_events.insert().values(**spawn.dict())
                    )
                    logger.debug(f"Recorded spawn event: {spawn.mob} by {spawn.character_name}")
                except Exception as e:
                    logger.error(f"Failed to process spawn event: {e}")
                    continue
                continue
            # --- Telemetry message: persist snapshot and update kill stats ---
            if msg_type == "telemetry":
                # Parse telemetry snapshot and update in-memory state
                payload = data.copy()
                payload.pop("type", None)
                character_name = payload.get('character_name', 'unknown')

                # Track message receipt and start timing
                telemetry_start_time = time.time()
                logger.info(f"📨 TELEMETRY_RECEIVED: {character_name} from {websocket.client}")

                try:
                    snap = TelemetrySnapshot.parse_obj(payload)
                    live_snapshots[snap.character_name] = snap.dict()
                    # Prepare data and compute kill delta
                    db_data = snap.dict()
                    db_data['rares_found'] = 0
                    key = (snap.session_id, snap.character_name)

                    # Get last recorded kill count for this session
                    if key in ws_receive_snapshots._last_kills:
                        last = ws_receive_snapshots._last_kills[key]
                    else:
                        # Cache miss - check database for last kill count for this session
                        row = await database.fetch_one(
                            "SELECT kills FROM telemetry_events WHERE character_name = :char AND session_id = :session ORDER BY timestamp DESC LIMIT 1",
                            {"char": snap.character_name, "session": snap.session_id}
                        )
                        last = row["kills"] if row else 0
                        logger.debug(f"Cache miss for {snap.character_name} session {snap.session_id[:8]}: loaded last_kills={last} from database")

                    delta = snap.kills - last
                    # Persist snapshot and any kill delta in a single transaction
                    db_start_time = time.time()

                    # Log connection pool status before database operation
                    try:
                        pool_status = f"pool_size:{database._pool._queue.qsize()}" if hasattr(database, '_pool') and hasattr(database._pool, '_queue') else "pool_status:unknown"
                    except Exception:
                        pool_status = "pool_status:error"

                    logger.info(f"💾 TELEMETRY_DB_WRITE_ATTEMPT: {snap.character_name} session:{snap.session_id[:8]} kills:{snap.kills} delta:{delta} {pool_status}")

                    try:
                        async with database.transaction():
                            await database.execute(
                                telemetry_events.insert().values(**db_data)
                            )
                            if delta > 0:
                                stmt = pg_insert(char_stats).values(
                                    character_name=snap.character_name,
                                    total_kills=delta
                                ).on_conflict_do_update(
                                    index_elements=["character_name"],
                                    set_={"total_kills": char_stats.c.total_kills + delta},
                                )
                                await database.execute(stmt)
                                logger.debug(f"Updated kills for {snap.character_name}: +{delta} (total from {last} to {snap.kills})")

                        # Success: log timing and update cache
                        db_duration = (time.time() - db_start_time) * 1000
                        ws_receive_snapshots._last_kills[key] = snap.kills

                        # Track database performance (Phase 2)
                        global _total_queries, _total_query_time
                        _total_queries += 1
                        _total_query_time += db_duration / 1000.0  # Convert ms to seconds

                        # Track recent activity (Phase 3)
                        global _recent_telemetry_messages, _max_recent_messages
                        activity_entry = {
                            "timestamp": datetime.now(timezone.utc).isoformat(),
                            "character_name": snap.character_name,
                            "kills": snap.kills,
                            "kill_delta": delta,
                            "query_time": round(db_duration, 1)
                        }
                        _recent_telemetry_messages.append(activity_entry)
                        if len(_recent_telemetry_messages) > _max_recent_messages:
                            _recent_telemetry_messages.pop(0)

                        # Log final pool status after successful operation
                        try:
                            final_pool_status = f"pool_size:{database._pool._queue.qsize()}" if hasattr(database, '_pool') and hasattr(database._pool, '_queue') else "pool_status:unknown"
                        except Exception:
                            final_pool_status = "pool_status:error"

                        logger.info(f"✅ TELEMETRY_DB_WRITE_SUCCESS: {snap.character_name} took {db_duration:.1f}ms {final_pool_status}")

                    except Exception as db_error:
                        db_duration = (time.time() - db_start_time) * 1000

                        # Log pool status during failure
                        try:
                            error_pool_status = f"pool_size:{database._pool._queue.qsize()}" if hasattr(database, '_pool') and hasattr(database._pool, '_queue') else "pool_status:unknown"
                        except Exception:
                            error_pool_status = "pool_status:error"

                        logger.error(f"❌ TELEMETRY_DB_WRITE_FAILED: {snap.character_name} session:{snap.session_id[:8]} took {db_duration:.1f}ms {error_pool_status} error:{db_error}", exc_info=True)
                        continue
                    # Broadcast updated snapshot to all browser clients
                    await _broadcast_to_browser_clients(snap.dict())

                    # Log successful processing completion with timing
                    total_duration = (time.time() - telemetry_start_time) * 1000
                    logger.info(f"⏱️ TELEMETRY_PROCESSING_COMPLETE: {snap.character_name} took {total_duration:.1f}ms total")

                except Exception as e:
                    total_duration = (time.time() - telemetry_start_time) * 1000
                    logger.error(f"❌ TELEMETRY_PROCESSING_FAILED: {character_name} took {total_duration:.1f}ms error:{e}", exc_info=True)
                continue
            # --- Rare event: update total and session counters and persist ---
            if msg_type == "rare":
                name = data.get("character_name")
                if isinstance(name, str) and name.strip():
                    try:
                        # Total rare count per character
                        stmt_tot = pg_insert(rare_stats).values(
                            character_name=name,
                            total_rares=1
                        ).on_conflict_do_update(
                            index_elements=["character_name"],
                            set_={"total_rares": rare_stats.c.total_rares + 1},
                        )
                        await database.execute(stmt_tot)
                        # Session-specific rare count (use live cache or fallback to latest telemetry)
                        session_id = live_snapshots.get(name, {}).get("session_id")
                        if not session_id:
                            row = await database.fetch_one(
                                "SELECT session_id FROM telemetry_events"
                                " WHERE character_name = :name"
                                " ORDER BY timestamp DESC LIMIT 1",
                                {"name": name}
                            )
                            if row:
                                session_id = row["session_id"]
                        if session_id:
                            stmt_sess = pg_insert(rare_stats_sessions).values(
                                character_name=name,
                                session_id=session_id,
                                session_rares=1
                            ).on_conflict_do_update(
                                index_elements=["character_name", "session_id"],
                                set_={"session_rares": rare_stats_sessions.c.session_rares + 1},
                            )
                            await database.execute(stmt_sess)
                        # Persist individual rare event for future analysis
                        payload = data.copy()
                        payload.pop("type", None)
                        try:
                            rare_ev = RareEvent.parse_obj(payload)
                            await database.execute(
                                rare_events.insert().values(**rare_ev.dict())
                            )
                            logger.info(f"Recorded rare event: {rare_ev.name} by {name}")
                            # Broadcast rare event to browser clients for epic notifications
                            await _broadcast_to_browser_clients(data)
                        except Exception as e:
                            logger.error(f"Failed to persist rare event: {e}")
                    except Exception as e:
                        logger.error(f"Failed to process rare event for {name}: {e}", exc_info=True)
                continue
            # --- Chat message: forward chat payload to browser clients ---
            if msg_type == "chat":
                await _broadcast_to_browser_clients(data)
                logger.debug(f"Broadcasted chat message from {data.get('character_name', 'unknown')}")
                continue
            # --- Full inventory message: store complete inventory snapshot ---
            if msg_type == "full_inventory":
                payload = data.copy()
                payload.pop("type", None)
                try:
                    inventory_msg = FullInventoryMessage.parse_obj(payload)
                    await _store_inventory(inventory_msg)
                    logger.info(f"Stored inventory for {inventory_msg.character_name}: {inventory_msg.item_count} items")
                except Exception as e:
                    logger.error(f"Failed to process inventory for {data.get('character_name', 'unknown')}: {e}", exc_info=True)
                continue
            # --- Vitals message: store character health/stamina/mana and broadcast ---
            if msg_type == "vitals":
                payload = data.copy()
                payload.pop("type", None)
                try:
                    vitals_msg = VitalsMessage.parse_obj(payload)
                    live_vitals[vitals_msg.character_name] = vitals_msg.dict()
                    await _broadcast_to_browser_clients(data)
                    logger.debug(f"Updated vitals for {vitals_msg.character_name}: {vitals_msg.health_percentage}% HP, {vitals_msg.stamina_percentage}% Stam, {vitals_msg.mana_percentage}% Mana")
                except Exception as e:
                    logger.error(f"Failed to process vitals for {data.get('character_name', 'unknown')}: {e}", exc_info=True)
                continue
            # --- Quest message: update cache and broadcast (no database storage) ---
            if msg_type == "quest":
                character_name = data.get("character_name")
                quest_name = data.get("quest_name")
                countdown = data.get("countdown")

                if character_name and quest_name and countdown is not None:
                    # Only track specific quest types
                    allowed_quests = {
                        "Stipend Collection Timer",
                        "Blank Augmentation Gem Pickup Timer",
                        "Insatiable Eater Jaw"
                    }

                    if quest_name in allowed_quests:
                        # Update quest cache
                        if character_name not in _quest_status_cache:
                            _quest_status_cache[character_name] = {}
                        _quest_status_cache[character_name][quest_name] = countdown

                        # Broadcast to browser clients for real-time updates
                        await _broadcast_to_browser_clients(data)
                        logger.debug(f"Updated quest status for {character_name}: {quest_name} = {countdown}")
                    else:
                        logger.debug(f"Ignoring non-tracked quest: {quest_name}")
                else:
                    logger.warning(f"Invalid quest message format from {websocket.client}: missing required fields")
                continue
            # --- Portal message: store in database and broadcast ---
            if msg_type == "portal":
                character_name = data.get("character_name")
                portal_name = data.get("portal_name")
                ns = data.get("ns")
                ew = data.get("ew")
                z = data.get("z")
                timestamp_str = data.get("timestamp")

                # Require all fields; compare coordinates against None so that 0.0 is still accepted
                if character_name and portal_name and timestamp_str and None not in (ns, ew, z):
                    try:
                        # Parse timestamp
                        timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))

                        # Convert coordinates to floats for database storage
                        ns = float(ns)
                        ew = float(ew)
                        z = float(z)

                        # Check if this portal was recently discovered (within last hour) to avoid duplicates
                        recent_check = await database.fetch_one(
                            """
                            SELECT id FROM portal_discoveries
                            WHERE character_name = :character_name
                            AND portal_name = :portal_name
                            AND timestamp > :cutoff_time
                            LIMIT 1
                            """,
                            {
                                "character_name": character_name,
                                "portal_name": portal_name,
                                "cutoff_time": timestamp - timedelta(hours=1)
                            }
                        )

                        if not recent_check:
                            # Store portal discovery in database
                            await database.execute(
                                portal_discoveries.insert().values(
                                    character_name=character_name,
                                    portal_name=portal_name,
                                    timestamp=timestamp,
                                    ns=ns,
                                    ew=ew,
                                    z=z
                                )
                            )
                            logger.info(f"Recorded portal discovery: {portal_name} by {character_name} at {ns}, {ew}")
                        else:
                            logger.debug(f"Skipping duplicate portal discovery: {portal_name} by {character_name} (already discovered recently)")

                        # Broadcast to browser clients for map updates
                        await _broadcast_to_browser_clients(data)

                    except Exception as e:
                        logger.error(f"Failed to process portal discovery for {character_name}: {e}", exc_info=True)
                else:
                    logger.warning(f"Invalid portal message format from {websocket.client}: missing required fields")
                continue
            # Unknown message types are ignored
            if msg_type:
                logger.warning(f"Unknown message type '{msg_type}' from {websocket.client}")
    finally:
        # Track plugin disconnection
        _plugin_connections = max(0, _plugin_connections - 1)

        # Clean up any plugin registrations for this socket
        to_remove = [n for n, ws in plugin_conns.items() if ws is websocket]
        for n in to_remove:
            # Use pop() instead of del to avoid KeyError if already removed
            plugin_conns.pop(n, None)

        # Also clean up any entries in the kill tracking cache for this session
        # Remove entries that might be associated with disconnected clients
        stale_keys = []
        for (session_id, char_name), _ in ws_receive_snapshots._last_kills.items():
            if char_name in to_remove:
                stale_keys.append((session_id, char_name))
        for key in stale_keys:
            ws_receive_snapshots._last_kills.pop(key, None)

        if to_remove:
            logger.info(f"Cleaned up plugin connections for characters: {to_remove} from {websocket.client}")
            if stale_keys:
                logger.debug(f"Cleaned up {len(stale_keys)} kill tracking cache entries")
        else:
            logger.debug(f"No plugin registrations to clean up for {websocket.client}")


# In-memory cache of last seen kill counts per (session_id, character_name)
# Used to compute deltas for updating persistent kill statistics efficiently
ws_receive_snapshots._last_kills = {}

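# Illustrative sketch (not from the original source) of what this cache holds after a
# few telemetry messages; the session IDs, names, and counts are made-up examples:
#   {("6f2c1a9e-...", "Xerxes"): 152, ("b41d7c02-...", "Another Char"): 37}
# i.e. (session_id, character_name) -> last kill count seen, used to compute kill deltas.
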
async def cleanup_stale_connections():
    """Periodic cleanup of stale WebSocket connections.

    This function can be called periodically to clean up connections
    that may have become stale but weren't properly cleaned up.
    """
    # Clean up plugin connections that no longer have valid WebSockets
    stale_plugins = []
    for char_name, ws in list(plugin_conns.items()):
        try:
            # Test if the WebSocket is still alive by checking its state
            if ws.client_state.name != 'CONNECTED':
                stale_plugins.append(char_name)
        except Exception:
            # If we can't check the state, consider it stale
            stale_plugins.append(char_name)

    for char_name in stale_plugins:
        plugin_conns.pop(char_name, None)
        logger.info(f"Cleaned up stale plugin connection: {char_name}")

    # Clean up browser connections
    stale_browsers = []
    for ws in list(browser_conns):
        try:
            if ws.client_state.name != 'CONNECTED':
                stale_browsers.append(ws)
        except Exception:
            stale_browsers.append(ws)

    for ws in stale_browsers:
        browser_conns.discard(ws)

    if stale_browsers:
        logger.info(f"Cleaned up {len(stale_browsers)} stale browser connections")

    logger.debug(f"Connection health check: {len(plugin_conns)} plugins, {len(browser_conns)} browsers")


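# Illustrative sketch (not from the original source) of the command envelopes browser
# clients may send over /ws/live; both forms are handled below and forwarded to the
# matching plugin socket in plugin_conns. The character name and command text are
# made-up examples:
#   {"player_name": "Xerxes", "command": "/say hello"}                        # new format
#   {"type": "command", "character_name": "Xerxes", "text": "/say hello"}     # legacy format
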
@app.websocket("/ws/live")
|
||
async def ws_live_updates(websocket: WebSocket):
|
||
"""WebSocket endpoint for browser clients to receive live updates and send commands.
|
||
|
||
Manages a set of connected browser clients; listens for incoming command messages
|
||
and forwards them to the appropriate plugin client WebSocket.
|
||
"""
|
||
global _browser_connections
|
||
# Add new browser client to the set
|
||
await websocket.accept()
|
||
browser_conns.add(websocket)
|
||
logger.info(f"Browser WebSocket connected: {websocket.client}")
|
||
|
||
# Track browser connection
|
||
_browser_connections += 1
|
||
|
||
try:
|
||
while True:
|
||
# Receive command messages from browser
|
||
try:
|
||
data = await websocket.receive_json()
|
||
# Debug: log all incoming browser WebSocket messages
|
||
logger.debug(f"Browser WebSocket RX from {websocket.client}: {data}")
|
||
except WebSocketDisconnect:
|
||
logger.info(f"Browser WebSocket disconnected: {websocket.client}")
|
||
break
|
||
# Determine command envelope format (new or legacy)
|
||
if "player_name" in data and "command" in data:
|
||
# New format: { player_name, command }
|
||
target_name = data["player_name"]
|
||
payload = data
|
||
elif data.get("type") == "command" and "character_name" in data and "text" in data:
|
||
# Legacy format: { type: 'command', character_name, text }
|
||
target_name = data.get("character_name")
|
||
payload = {"player_name": target_name, "command": data.get("text")}
|
||
else:
|
||
# Not a recognized command envelope
|
||
continue
|
||
# Forward command envelope to the appropriate plugin WebSocket
|
||
target_ws = plugin_conns.get(target_name)
|
||
if target_ws:
|
||
try:
|
||
await target_ws.send_json(payload)
|
||
logger.debug(f"Forwarded command to plugin for {target_name}: {payload}")
|
||
except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e:
|
||
logger.warning(f"Failed to forward command to {target_name}: {e}")
|
||
# Remove stale connection
|
||
plugin_conns.pop(target_name, None)
|
||
except Exception as e:
|
||
logger.error(f"Unexpected error forwarding command to {target_name}: {e}")
|
||
# Remove potentially corrupted connection
|
||
plugin_conns.pop(target_name, None)
|
||
else:
|
||
logger.warning(f"No plugin connection found for target character: {target_name}")
|
||
except WebSocketDisconnect:
|
||
pass
|
||
finally:
|
||
# Track browser disconnection
|
||
_browser_connections = max(0, _browser_connections - 1)
|
||
|
||
browser_conns.discard(websocket)
|
||
logger.debug(f"Removed browser WebSocket from connection pool: {websocket.client}")
|
||
|
||
|
||
## -------------------- static frontend ---------------------------
## (static mount moved to end of file, below API routes)

# list routes for convenience
logger.info("🔍 Registered HTTP API routes:")
for route in app.routes:
    if isinstance(route, APIRoute):
        # Log the path and allowed methods for each API route
        logger.info(f"{route.path} -> {route.methods}")

# Add stats endpoint for per-character metrics
@app.get("/stats/{character_name}")
async def get_stats(character_name: str):
    """
    HTTP GET endpoint to retrieve per-character metrics:
    - latest_snapshot: most recent telemetry entry for the character
    - total_kills: accumulated kills from char_stats
    - total_rares: accumulated rares from rare_stats
    Returns 404 if character has no recorded telemetry.
    """
    try:
        # Single optimized query with LEFT JOINs to get all data in one round trip
        sql = """
        WITH latest AS (
            SELECT * FROM telemetry_events
            WHERE character_name = :cn
            ORDER BY timestamp DESC LIMIT 1
        )
        SELECT
            l.*,
            COALESCE(cs.total_kills, 0) as total_kills,
            COALESCE(rs.total_rares, 0) as total_rares
        FROM latest l
        LEFT JOIN char_stats cs ON l.character_name = cs.character_name
        LEFT JOIN rare_stats rs ON l.character_name = rs.character_name
        """
        row = await database.fetch_one(sql, {"cn": character_name})
        if not row:
            logger.warning(f"No telemetry data found for character: {character_name}")
            raise HTTPException(status_code=404, detail="Character not found")

        # Extract latest snapshot data (exclude the added total_kills/total_rares)
        snap_dict = {k: v for k, v in dict(row).items()
                     if k not in ("total_kills", "total_rares")}

        result = {
            "character_name": character_name,
            "latest_snapshot": snap_dict,
            "total_kills": row["total_kills"],
            "total_rares": row["total_rares"],
        }
        logger.debug(f"Retrieved stats for character: {character_name} (optimized query)")
        return JSONResponse(content=jsonable_encoder(result))
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get stats for character {character_name}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")

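# Illustrative sketch (not from the original source) of the JSON shape /stats returns;
# the latest_snapshot keys depend on the telemetry_events columns and are assumptions here:
#   {
#     "character_name": "Xerxes",
#     "latest_snapshot": {"character_name": "Xerxes", "session_id": "...", "kills": 152, ...},
#     "total_kills": 152,
#     "total_rares": 3
#   }
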
# -------------------- static frontend ---------------------------
# Custom icon handler that prioritizes clean icons over originals
from fastapi.responses import FileResponse


@app.get("/icons/{icon_filename}")
async def serve_icon(icon_filename: str):
    """Serve icons from static/icons directory"""

    # Serve from static/icons directory
    icon_path = Path("static/icons") / icon_filename
    if icon_path.exists():
        return FileResponse(icon_path, media_type="image/png")

    # Icon not found
    raise HTTPException(status_code=404, detail="Icon not found")

# -------------------- Inventory Service Proxy ---------------------------

@app.get("/inv/test")
async def test_inventory_route():
    """Test route to verify inventory proxy is working"""
    return {"message": "Inventory proxy route is working"}


@app.api_route("/inv/{path:path}", methods=["GET", "POST"])
async def proxy_inventory_service(path: str, request: Request):
    """Proxy all inventory service requests"""
    try:
        inventory_service_url = os.getenv('INVENTORY_SERVICE_URL', 'http://inventory-service:8000')
        logger.info(f"Proxying to inventory service: {inventory_service_url}/{path}")

        # Forward the request to inventory service
        async with httpx.AsyncClient() as client:
            response = await client.request(
                method=request.method,
                url=f"{inventory_service_url}/{path}",
                params=request.query_params,
                headers=dict(request.headers),
                content=await request.body()
            )
            return Response(
                content=response.content,
                status_code=response.status_code,
                headers=dict(response.headers)
            )
    except Exception as e:
        logger.error(f"Failed to proxy inventory request: {e}")
        raise HTTPException(status_code=500, detail="Inventory service unavailable")

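# Illustrative example (hypothetical path, not from the original source): a browser
# request to /inv/reports/summary?days=7 is re-issued by the catch-all proxy above as
# {INVENTORY_SERVICE_URL}/reports/summary?days=7, preserving method, headers, and body.
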
# Icons are now served from static/icons directory
# Serve SPA files (catch-all for frontend routes)
# Mount the single-page application frontend (static assets) at root path
app.mount("/", StaticFiles(directory="static", html=True), name="static")