MosswartOverlord/main.py

"""
main.py - FastAPI-based telemetry server for Dereth Tracker.
This service ingests real-time position and event data from plugin clients via WebSockets,
stores telemetry and statistics in a TimescaleDB backend, and exposes HTTP and WebSocket
endpoints for browser clients to retrieve live and historical data, trails, and per-character stats.
"""
from datetime import datetime, timedelta, timezone
import json
import logging
import os
import sys
import time
from typing import Any, Dict, List, Optional
from pathlib import Path
import asyncio
import socket
import struct
from fastapi import FastAPI, Header, HTTPException, Query, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import JSONResponse, Response
from fastapi.routing import APIRoute
from fastapi.staticfiles import StaticFiles
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
import httpx
# Async database support
from sqlalchemy.dialects.postgresql import insert as pg_insert
from db_async import (
database,
telemetry_events,
char_stats,
rare_stats,
rare_stats_sessions,
spawn_events,
rare_events,
character_inventories,
portals,
server_health_checks,
server_status,
init_db_async,
cleanup_old_portals
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
]
)
logger = logging.getLogger(__name__)
# Get log level from environment (DEBUG, INFO, WARNING, ERROR)
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
logger.setLevel(getattr(logging, log_level, logging.INFO))
# Inventory service configuration
INVENTORY_SERVICE_URL = os.getenv('INVENTORY_SERVICE_URL', 'http://inventory-service:8000')
# In-memory caches for REST endpoints
_cached_live: dict = {"players": []}
_cached_trails: dict = {"trails": []}
_cached_total_rares: dict = {"all_time": 0, "today": 0, "last_updated": None}
_cache_task: asyncio.Task | None = None
_rares_cache_task: asyncio.Task | None = None
_cleanup_task: asyncio.Task | None = None
# Player tracking for debug purposes
_player_history: list = [] # List of player sets from last 10 refreshes
_player_events: list = [] # List of player enter/exit events
_max_history_size = 10 # Keep last 10 player sets
_max_events_size = 100 # Keep last 100 events
# Telemetry timing tracking for debug purposes
_player_telemetry_times: dict = {} # character_name -> list of timestamps
_max_telemetry_history = 20 # Keep last 20 telemetry timestamps per player
# Simple WebSocket connection counters (Phase 1)
_plugin_connections = 0
_browser_connections = 0
# Simple database query performance counters (Phase 2)
_total_queries = 0
_total_query_time = 0.0
# Simple recent activity tracking (Phase 3)
_recent_telemetry_messages = []
_max_recent_messages = 50
# Server health monitoring
_server_health_task = None
_server_status_cache = {
"status": "unknown",
"latency_ms": None,
"player_count": None,
"last_check": None,
"uptime_seconds": 0,
"last_restart": None
}
# Quest status cache - stores last received quest data per player
# Structure: {character_name: {quest_name: countdown_value}}
_quest_status_cache: Dict[str, Dict[str, str]] = {}
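# Illustrative contents (character and countdown values are made up):
#   {"Xerxes": {"Stipend Collection Timer": "6d 23:59:59"}}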
# AC Hash32 checksum algorithm (based on ThwargLauncher)
def calculate_hash32(data: bytes) -> int:
"""Calculate AC Hash32 checksum as used in ThwargLauncher."""
length = len(data)
checksum = (length << 16) & 0xFFFFFFFF
# Process 4-byte chunks
for i in range(0, length - 3, 4):
chunk = struct.unpack('<I', data[i:i+4])[0]
checksum = (checksum + chunk) & 0xFFFFFFFF
# Handle remaining bytes
remaining_start = (length // 4) * 4
shift = 24
for i in range(remaining_start, length):
byte_val = data[i] << shift
checksum = (checksum + byte_val) & 0xFFFFFFFF
shift -= 8
return checksum
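# Usage sketch (mirrors create_echo_request_packet below): zero the checksum
# field, hash the full header, then stamp the result back into bytes 8..11:
#   struct.pack_into('<I', packet, 8, 0)
#   struct.pack_into('<I', packet, 8, calculate_hash32(bytes(packet)))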
# Create AC EchoRequest packet for server health check (based on ThwargLauncher)
def create_echo_request_packet():
"""Create an AC EchoRequest packet for server health checking."""
# AC packet header: sequence(4) + flags(4) + checksum(4) + id(2) + time(2) + size(2) + table(2) = 20 bytes + padding
packet = bytearray(32) # 32 bytes total (0x20)
# Sequence (4 bytes) - can be 0
struct.pack_into('<I', packet, 0, 0)
# Flags (4 bytes) - EchoRequest = 0x02000000
struct.pack_into('<I', packet, 4, 0x02000000)
# Temporary checksum (4 bytes) - required for proper checksum calculation
struct.pack_into('<I', packet, 8, 0x0BADD70D)
# ID (2 bytes) - can be 0
struct.pack_into('<H', packet, 12, 0)
# Time (2 bytes) - can be 0
struct.pack_into('<H', packet, 14, 0)
# Size (2 bytes) - header size = 32 (0x20)
struct.pack_into('<H', packet, 16, 32)
# Table (2 bytes) - can be 0
struct.pack_into('<H', packet, 18, 0)
# Calculate proper AC Hash32 checksum
# First, set checksum field to 0
struct.pack_into('<I', packet, 8, 0)
# Calculate checksum using Hash32 algorithm
checksum = calculate_hash32(bytes(packet))
struct.pack_into('<I', packet, 8, checksum)
return bytes(packet)
AC_ECHO_PACKET = create_echo_request_packet()
# AC login packet for server health check (same as ThwargLauncher MakeLoginPacket)
AC_LOGIN_PACKET = bytes([
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x93, 0x00, 0xd0, 0x05,
0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x04, 0x00, 0x31, 0x38,
0x30, 0x32, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x3e, 0xb8, 0xa8, 0x58, 0x1c, 0x00, 0x61, 0x63,
0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x74, 0x72, 0x61, 0x63, 0x6b, 0x65,
0x72, 0x3a, 0x6a, 0x6a, 0x39, 0x68, 0x32, 0x36, 0x68, 0x63, 0x73, 0x67,
0x67, 0x63, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
])
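# The tail of this packet embeds the length-prefixed (0x1c = 28 bytes) ASCII login
# string "acservertracker:jj9h26hcsggc", matching ThwargLauncher's canned login probe.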
async def check_server_health(address: str, port: int, timeout: float = 3.0) -> tuple[bool, Optional[float], Optional[int]]:
"""Check AC server health via UDP packet with retry logic.
Retries 6 times with 5-second delays before declaring server down.
Returns: (is_up, latency_ms, player_count); player_count is always None here, since counts come from TreeStats.net.
"""
max_retries = 6
retry_delay = 5.0
for attempt in range(max_retries):
logger.debug(f"🔍 Health check attempt {attempt + 1}/{max_retries} for {address}:{port}")
start_time = time.time()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(False)
try:
# Send login packet (same as ThwargLauncher)
await asyncio.get_event_loop().sock_sendto(sock, AC_LOGIN_PACKET, (address, port))
# Wait for response with timeout
try:
data, addr = await asyncio.wait_for(
asyncio.get_event_loop().sock_recvfrom(sock, 1024),
timeout=timeout
)
latency_ms = (time.time() - start_time) * 1000
logger.debug(f"📥 Received response from {addr}: {len(data)} bytes, latency: {latency_ms:.1f}ms")
# Check if valid response (support both TimeSynch 0x800000 and ConnectRequest 0x40000)
if len(data) >= 24:
flags = struct.unpack('<I', data[4:8])[0]
# Accept both TimeSynch (0x800000) and ConnectRequest (0x40000) as valid responses
if (flags & 0x800000) or (flags & 0x40000):
# UDP health check is for server status and latency only
# Player count comes from TreeStats.net API (like ThwargLauncher)
logger.debug(f"✅ Valid server response: latency: {latency_ms:.1f}ms")
return True, latency_ms, None
# Any response indicates server is up, even if not the expected format
logger.info(f"✅ Server response (non-standard format): latency: {latency_ms:.1f}ms")
return True, latency_ms, None
except asyncio.TimeoutError:
logger.debug(f"⏰ TIMEOUT: No response from {address}:{port} after {timeout}s")
if attempt < max_retries - 1:
logger.debug(f"Retrying in {retry_delay} seconds...")
await asyncio.sleep(retry_delay)
continue
except Exception as e:
logger.error(f"Server health check error on attempt {attempt + 1}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
continue
finally:
sock.close()
# Only declare down after all retries fail
logger.warning(f"❌ Server {address}:{port} is DOWN after {max_retries} attempts over {max_retries * retry_delay} seconds")
return False, None, None
async def get_player_count_from_treestats(server_name: str) -> int:
"""Get player count from TreeStats.net API (same as ThwargLauncher)."""
try:
async with httpx.AsyncClient() as client:
response = await client.get("http://treestats.net/player_counts-latest.json", timeout=10)
if response.status_code == 200:
data = response.json()
for server_data in data:
if server_data.get("server") == server_name:
return server_data.get("count", 0)
return 0
except Exception as e:
logger.debug(f"Failed to get player count from TreeStats.net: {e}")
return 0
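# The TreeStats response is expected to be a JSON list of per-server entries,
# each carrying at least {"server": "<name>", "count": <players>}; only those
# two keys are read above.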
async def monitor_server_health():
"""Background task to monitor server health every 30 seconds and cleanup old portals every minute."""
server_name = "Coldeve"
server_address = "play.coldeve.ac"
server_port = 9000
check_interval = 30 # seconds
player_count_interval = 300 # 5 minutes (like ThwargLauncher's 10 minutes, but more frequent)
portal_cleanup_interval = 60 # 1 minute
last_player_count_check = 0
last_portal_cleanup = 0
current_player_count = None
# Initialize server status in database
try:
existing = await database.fetch_one(
"SELECT * FROM server_status WHERE server_name = :name",
{"name": server_name}
)
if not existing:
await database.execute(
server_status.insert().values(
server_name=server_name,
current_status="unknown",
total_uptime_seconds=0
)
)
except Exception as e:
logger.error(f"Failed to initialize server status: {e}")
while True:
try:
logger.debug(f"🏥 Running scheduled health check for {server_name} ({server_address}:{server_port})")
# Check server health via UDP (for status and latency)
is_up, latency_ms, _ = await check_server_health(server_address, server_port)
status = "up" if is_up else "down"
now = datetime.now(timezone.utc)
# Get player count from TreeStats.net API (like ThwargLauncher)
current_time = time.time()
if current_time - last_player_count_check >= player_count_interval or current_player_count is None:
new_player_count = await get_player_count_from_treestats(server_name)
if new_player_count > 0: # Only update if we got a valid count
current_player_count = new_player_count
last_player_count_check = current_time
logger.info(f"🏥 Updated player count from TreeStats.net: {current_player_count}")
logger.debug(f"🏥 Health check result: {status}, latency: {latency_ms}, players: {current_player_count}")
# Record health check
await database.execute(
server_health_checks.insert().values(
server_name=server_name,
server_address=f"{server_address}:{server_port}",
timestamp=now,
status=status,
latency_ms=latency_ms,
player_count=current_player_count
)
)
# Get previous status
prev_status = await database.fetch_one(
"SELECT * FROM server_status WHERE server_name = :name",
{"name": server_name}
)
# Calculate uptime and detect restarts
last_restart = prev_status["last_restart"] if prev_status else None
if prev_status and prev_status["current_status"] == "down" and status == "up":
# Server came back up - this is a restart
last_restart = now
logger.info(f"Server {server_name} came back online")
# Broadcast to all browser clients
await _broadcast_to_browser_clients({
"type": "server_status",
"server": server_name,
"status": "up",
"message": "Server is back online"
})
# Calculate uptime from last restart time (not accumulated)
if last_restart and status == "up":
uptime_seconds = int((now - last_restart).total_seconds())
else:
uptime_seconds = 0
# Update server status (always include current_player_count if we have it)
await database.execute(
"""
INSERT INTO server_status (server_name, current_status, last_seen_up, last_restart,
total_uptime_seconds, last_check, last_latency_ms, last_player_count)
VALUES (:name, :status, :last_seen, :restart, :uptime, :check, :latency, :players)
ON CONFLICT (server_name) DO UPDATE SET
current_status = :status,
last_seen_up = CASE WHEN :status = 'up' THEN :last_seen ELSE server_status.last_seen_up END,
last_restart = :restart,
total_uptime_seconds = :uptime,
last_check = :check,
last_latency_ms = :latency,
last_player_count = CASE WHEN :players IS NOT NULL THEN :players ELSE server_status.last_player_count END
""",
{
"name": server_name,
"status": status,
"last_seen": now if status == "up" else None,
"restart": last_restart,
"uptime": uptime_seconds,
"check": now,
"latency": latency_ms,
"players": current_player_count
}
)
# Update cache
global _server_status_cache
_server_status_cache = {
"status": status,
"latency_ms": latency_ms,
"player_count": current_player_count,
"last_check": now.isoformat(),
"uptime_seconds": uptime_seconds,
"last_restart": last_restart.isoformat() if last_restart else None
}
logger.debug(f"Server health check: {status}, latency={latency_ms}ms, players={current_player_count}")
# Portal cleanup (run every minute)
current_time = time.time()
if current_time - last_portal_cleanup >= portal_cleanup_interval:
try:
deleted_count = await cleanup_old_portals()
logger.info(f"Portal cleanup: removed {deleted_count} portals older than 1 hour")
last_portal_cleanup = current_time
except Exception as cleanup_error:
logger.error(f"Portal cleanup error: {cleanup_error}", exc_info=True)
except Exception as e:
logger.error(f"Server health monitoring error: {e}", exc_info=True)
await asyncio.sleep(check_interval)
async def cleanup_connections_loop():
"""Background task to clean up stale WebSocket connections every 5 minutes."""
cleanup_interval = 300 # 5 minutes
logger.info("🧹 Starting WebSocket connection cleanup task")
while True:
try:
await asyncio.sleep(cleanup_interval)
logger.debug("🧹 Running periodic WebSocket connection cleanup")
await cleanup_stale_connections()
except Exception as e:
logger.error(f"WebSocket cleanup task error: {e}", exc_info=True)
def _track_player_changes(new_players: list) -> None:
"""Track player changes for debugging flapping issues."""
from datetime import datetime, timezone
# Get current player names
current_players = {p["character_name"] for p in new_players}
timestamp = datetime.now(timezone.utc)
# Track telemetry timing for each player
for player_data in new_players:
player_name = player_data["character_name"]
player_timestamp = player_data.get("timestamp")
# Convert timestamp if it's a string
if isinstance(player_timestamp, str):
try:
player_timestamp = datetime.fromisoformat(player_timestamp.replace('Z', '+00:00'))
except ValueError:
player_timestamp = timestamp
elif player_timestamp is None:
player_timestamp = timestamp
# Initialize player telemetry tracking if needed
if player_name not in _player_telemetry_times:
_player_telemetry_times[player_name] = []
# Add this telemetry timestamp
_player_telemetry_times[player_name].append(player_timestamp)
# Trim to max history
if len(_player_telemetry_times[player_name]) > _max_telemetry_history:
_player_telemetry_times[player_name].pop(0)
# Get previous player names if we have history
previous_players = set()
if _player_history:
previous_players = {p["character_name"] for p in _player_history[-1]["players"]}
# Find players who entered and exited
entered_players = current_players - previous_players
exited_players = previous_players - current_players
# Log events with telemetry timing analysis
for player in entered_players:
# Check if this is due to timing gap
timing_gap = None
if player in _player_telemetry_times and len(_player_telemetry_times[player]) >= 2:
last_two = _player_telemetry_times[player][-2:]
timing_gap = (last_two[1] - last_two[0]).total_seconds()
event = {
"timestamp": timestamp,
"type": "enter",
"character_name": player,
"total_players": len(current_players),
"timing_gap": timing_gap
}
_player_events.append(event)
gap_info = f" (gap: {timing_gap:.1f}s)" if timing_gap and timing_gap > 25 else ""
logger.debug(f"Player entered: {player} (total: {len(current_players)}){gap_info}")
for player in exited_players:
# Calculate time since last telemetry
last_telemetry_age = None
if player in _player_telemetry_times and _player_telemetry_times[player]:
last_telemetry = _player_telemetry_times[player][-1]
last_telemetry_age = (timestamp - last_telemetry).total_seconds()
event = {
"timestamp": timestamp,
"type": "exit",
"character_name": player,
"total_players": len(current_players),
"last_telemetry_age": last_telemetry_age
}
_player_events.append(event)
age_info = f" (last telemetry: {last_telemetry_age:.1f}s ago)" if last_telemetry_age else ""
logger.debug(f"Player exited: {player} (total: {len(current_players)}){age_info}")
# Add current state to history
history_entry = {
"timestamp": timestamp,
"players": new_players,
"player_count": len(new_players),
"player_names": list(current_players)
}
_player_history.append(history_entry)
# Trim history to max size
if len(_player_history) > _max_history_size:
_player_history.pop(0)
# Trim events to max size
if len(_player_events) > _max_events_size:
_player_events.pop(0)
def _analyze_flapping_patterns() -> dict:
"""Analyze player events to identify flapping patterns."""
from collections import Counter, defaultdict
if not _player_events:
return {"flapping_players": [], "frequent_events": [], "analysis": "No events to analyze"}
# Count events per player
player_event_counts = Counter()
player_flap_counts = defaultdict(int)
# Track recent activity per player (last 10 events)
recent_player_activity = defaultdict(list)
for event in _player_events[-50:]: # Analyze last 50 events
player = event["character_name"]
event_type = event["type"]
player_event_counts[player] += 1
recent_player_activity[player].append(event_type)
# Identify flapping players (players with many enter/exit cycles)
flapping_players = []
for player, activity in recent_player_activity.items():
if len(activity) >= 4: # At least 4 events
# Count alternating enter/exit patterns
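# e.g. ["enter", "exit", "enter", "exit"] has three transitions -> flap_score = 3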
flap_score = 0
for i in range(1, len(activity)):
if activity[i] != activity[i-1]: # Different from previous
flap_score += 1
if flap_score >= 3: # At least 3 transitions
flapping_players.append({
"character_name": player,
"events": len(activity),
"flap_score": flap_score,
"recent_activity": activity[-10:] # Last 10 events
})
# Sort by flap score
flapping_players.sort(key=lambda x: x["flap_score"], reverse=True)
# Most active players
frequent_events = [
{"character_name": player, "event_count": count}
for player, count in player_event_counts.most_common(10)
]
# Recent activity summary
recent_enters = sum(1 for e in _player_events[-20:] if e["type"] == "enter")
recent_exits = sum(1 for e in _player_events[-20:] if e["type"] == "exit")
return {
"flapping_players": flapping_players,
"frequent_events": frequent_events,
"recent_activity": {
"enters": recent_enters,
"exits": recent_exits,
"net_change": recent_enters - recent_exits
},
"analysis": f"Found {len(flapping_players)} potentially flapping players"
}
def _analyze_telemetry_timing() -> dict:
"""Analyze telemetry timing patterns for all players."""
from datetime import datetime, timezone
timing_analysis = {}
problem_players = []
for player_name, timestamps in _player_telemetry_times.items():
if len(timestamps) < 2:
continue
# Calculate intervals between telemetry messages
intervals = []
for i in range(1, len(timestamps)):
interval = (timestamps[i] - timestamps[i-1]).total_seconds()
intervals.append(interval)
if not intervals:
continue
# Calculate timing statistics
avg_interval = sum(intervals) / len(intervals)
min_interval = min(intervals)
max_interval = max(intervals)
# Count problematic intervals (>30s)
long_gaps = [i for i in intervals if i > 30]
recent_long_gaps = [i for i in intervals[-5:] if i > 30] # Last 5 intervals
# Determine if this player has timing issues
has_timing_issues = len(long_gaps) > 0 or max_interval > 35
timing_stats = {
"character_name": player_name,
"total_messages": len(timestamps),
"avg_interval": round(avg_interval, 1),
"min_interval": round(min_interval, 1),
"max_interval": round(max_interval, 1),
"long_gaps_count": len(long_gaps),
"recent_long_gaps": len(recent_long_gaps),
"last_message_age": (datetime.now(timezone.utc) - timestamps[-1]).total_seconds() if timestamps else 0,
"has_timing_issues": has_timing_issues,
"recent_intervals": [round(i, 1) for i in intervals[-5:]] # Last 5 intervals
}
timing_analysis[player_name] = timing_stats
if has_timing_issues:
problem_players.append(timing_stats)
# Sort problem players by severity (max interval)
problem_players.sort(key=lambda x: x["max_interval"], reverse=True)
return {
"all_players": timing_analysis,
"problem_players": problem_players,
"summary": {
"total_tracked_players": len(timing_analysis),
"players_with_issues": len(problem_players),
"avg_intervals": [stats["avg_interval"] for stats in timing_analysis.values()],
}
}
async def _refresh_cache_loop() -> None:
"""Background task: refresh `/live` and `/trails` caches every 5 seconds."""
consecutive_failures = 0
max_consecutive_failures = 5
while True:
try:
# Recompute live players (last 30s)
cutoff = datetime.now(timezone.utc) - ACTIVE_WINDOW
sql_live = """
SELECT sub.*,
COALESCE(rs.total_rares, 0) AS total_rares,
COALESCE(rss.session_rares, 0) AS session_rares,
COALESCE(cs.total_kills, 0) AS total_kills
FROM (
SELECT DISTINCT ON (character_name) *
FROM telemetry_events
WHERE timestamp > :cutoff
ORDER BY character_name, timestamp DESC
) sub
LEFT JOIN rare_stats rs
ON sub.character_name = rs.character_name
LEFT JOIN rare_stats_sessions rss
ON sub.character_name = rss.character_name
AND sub.session_id = rss.session_id
LEFT JOIN char_stats cs
ON sub.character_name = cs.character_name
"""
# Use a single connection for both queries to reduce connection churn
async with database.connection() as conn:
rows = await conn.fetch_all(sql_live, {"cutoff": cutoff})
new_players = [dict(r) for r in rows]
# Track player changes for debugging
_track_player_changes(new_players)
_cached_live["players"] = new_players
# Recompute trails (last 600s)
cutoff2 = datetime.now(timezone.utc) - timedelta(seconds=600)
sql_trail = """
SELECT timestamp, character_name, ew, ns, z
FROM telemetry_events
WHERE timestamp >= :cutoff
ORDER BY character_name, timestamp
"""
rows2 = await conn.fetch_all(sql_trail, {"cutoff": cutoff2})
_cached_trails["trails"] = [
{"timestamp": r["timestamp"], "character_name": r["character_name"],
"ew": r["ew"], "ns": r["ns"], "z": r["z"]}
for r in rows2
]
# Reset failure counter on success
consecutive_failures = 0
logger.debug(f"Cache refreshed: {len(_cached_live['players'])} players, {len(_cached_trails['trails'])} trail points")
except Exception as e:
consecutive_failures += 1
logger.error(f"Cache refresh failed ({consecutive_failures}/{max_consecutive_failures}): {e}", exc_info=True)
# If too many consecutive failures, wait longer and try to reconnect
if consecutive_failures >= max_consecutive_failures:
logger.warning(f"Too many consecutive cache refresh failures. Attempting database reconnection...")
try:
await database.disconnect()
await asyncio.sleep(2)
await database.connect()
logger.info("Database reconnected successfully")
consecutive_failures = 0
except Exception as reconnect_error:
logger.error(f"Database reconnection failed: {reconnect_error}")
await asyncio.sleep(10) # Wait longer before retrying
continue
await asyncio.sleep(5)
async def _refresh_total_rares_cache() -> None:
"""Background task: refresh total rares cache every 5 minutes."""
consecutive_failures = 0
max_consecutive_failures = 3
while True:
try:
async with database.connection() as conn:
# Get all-time total rares (sum of all characters) - gracefully handle missing table
try:
all_time_query = "SELECT COALESCE(SUM(total_rares), 0) as total FROM rare_stats"
all_time_result = await conn.fetch_one(all_time_query)
all_time_total = all_time_result["total"] if all_time_result else 0
except Exception as e:
logger.debug(f"rare_stats table not available: {e}")
all_time_total = 0
# Get today's rares from rare_events table - gracefully handle missing table
try:
today_query = """
SELECT COUNT(*) as today_count
FROM rare_events
WHERE timestamp >= CURRENT_DATE
"""
today_result = await conn.fetch_one(today_query)
today_total = today_result["today_count"] if today_result else 0
except Exception as e:
logger.debug(f"rare_events table not available or empty: {e}")
today_total = 0
# Update cache
_cached_total_rares["all_time"] = all_time_total
_cached_total_rares["today"] = today_total
_cached_total_rares["last_updated"] = datetime.now(timezone.utc)
consecutive_failures = 0
logger.debug(f"Total rares cache updated: All-time: {all_time_total}, Today: {today_total}")
except Exception as e:
consecutive_failures += 1
logger.error(f"Total rares cache refresh failed ({consecutive_failures}/{max_consecutive_failures}): {e}", exc_info=True)
if consecutive_failures >= max_consecutive_failures:
logger.warning("Too many consecutive total rares cache failures, waiting longer...")
await asyncio.sleep(60) # Wait longer on repeated failures
continue
# Sleep for 5 minutes (300 seconds)
await asyncio.sleep(300)
# ------------------------------------------------------------------
app = FastAPI()
# In-memory store mapping character_name to the most recent telemetry snapshot
live_snapshots: Dict[str, dict] = {}
live_vitals: Dict[str, dict] = {}
# Shared secret used to authenticate plugin WebSocket connections (override for production)
SHARED_SECRET = "your_shared_secret"
# LOG_FILE = "telemetry_log.jsonl"
# ------------------------------------------------------------------
ACTIVE_WINDOW = timedelta(seconds=30) # Time window defining "online" players (last 30 seconds)
"""
Data models for plugin events:
- TelemetrySnapshot: periodic telemetry data from a player client
- SpawnEvent: information about a mob spawn event
- RareEvent: details of a rare mob event
"""
class TelemetrySnapshot(BaseModel):
character_name: str
char_tag: Optional[str] = None
session_id: str
timestamp: datetime
ew: float # +E / W
ns: float # +N / S
z: float
kills: int
kills_per_hour: Optional[float] = None
onlinetime: Optional[str] = None
deaths: int
total_deaths: Optional[int] = None
# Removed from telemetry payload; always enforced to 0 and tracked via rare events
rares_found: Optional[int] = 0
prismatic_taper_count: int
vt_state: str
# Optional telemetry metrics
mem_mb: Optional[float] = None
cpu_pct: Optional[float] = None
mem_handles: Optional[int] = None
latency_ms: Optional[float] = None
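# Illustrative plugin frame for this model (all values are made up):
#   {"type": "telemetry", "character_name": "Xerxes", "session_id": "abc123",
#    "timestamp": "2025-01-01T00:00:00Z", "ew": 12.3, "ns": -45.6, "z": 0.1,
#    "kills": 42, "deaths": 1, "rares_found": 0,
#    "prismatic_taper_count": 1500, "vt_state": "Combat"}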
class SpawnEvent(BaseModel):
"""
Model for a spawn event emitted by plugin clients when a mob appears.
Records character context, mob type, timestamp, and spawn location.
"""
character_name: str
mob: str
timestamp: datetime
ew: float
ns: float
z: float = 0.0
class RareEvent(BaseModel):
"""
Model for a rare mob event when a player encounters or discovers a rare entity.
Includes character, event name, timestamp, and location coordinates.
"""
character_name: str
name: str
timestamp: datetime
ew: float
ns: float
z: float = 0.0
class FullInventoryMessage(BaseModel):
"""
Model for the full_inventory WebSocket message type.
Contains complete character inventory snapshot with raw item data.
"""
character_name: str
timestamp: datetime
item_count: int
items: List[Dict[str, Any]]
class VitalsMessage(BaseModel):
"""
Model for the vitals WebSocket message type.
Contains character health, stamina, mana, and vitae information.
"""
character_name: str
timestamp: datetime
health_current: int
health_max: int
health_percentage: float
stamina_current: int
stamina_max: int
stamina_percentage: float
mana_current: int
mana_max: int
mana_percentage: float
vitae: int
@app.on_event("startup")
async def on_startup():
"""Event handler triggered when application starts up.
Attempts to connect to the database with retry logic to accommodate
potential startup delays (e.g., waiting for Postgres to be ready).
"""
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
await database.connect()
await init_db_async()
logger.info(f"Database connected successfully on attempt {attempt}")
# Log connection pool configuration
try:
logger.info(f"Database connection established with pool configuration")
except Exception as pool_error:
logger.debug(f"Could not access pool details: {pool_error}")
break
except Exception as e:
logger.warning(f"Database connection failed (attempt {attempt}/{max_attempts}): {e}")
if attempt < max_attempts:
await asyncio.sleep(5)
else:
raise RuntimeError(f"Could not connect to database after {max_attempts} attempts")
# Start background cache refresh (live & trails)
global _cache_task, _rares_cache_task, _server_health_task, _cleanup_task
_cache_task = asyncio.create_task(_refresh_cache_loop())
_rares_cache_task = asyncio.create_task(_refresh_total_rares_cache())
_server_health_task = asyncio.create_task(monitor_server_health())
_cleanup_task = asyncio.create_task(cleanup_connections_loop())
logger.info("Background cache refresh, server monitoring, and connection cleanup tasks started")
@app.on_event("shutdown")
async def on_shutdown():
"""Event handler triggered when application is shutting down.
Ensures the database connection is closed cleanly.
"""
# Stop cache refresh tasks
global _cache_task, _rares_cache_task, _server_health_task, _cleanup_task
if _cache_task:
logger.info("Stopping background cache refresh task")
_cache_task.cancel()
try:
await _cache_task
except asyncio.CancelledError:
pass
if _rares_cache_task:
logger.info("Stopping total rares cache refresh task")
_rares_cache_task.cancel()
try:
await _rares_cache_task
except asyncio.CancelledError:
pass
if _server_health_task:
logger.info("Stopping server health monitoring task")
_server_health_task.cancel()
try:
await _server_health_task
except asyncio.CancelledError:
pass
if _cleanup_task:
logger.info("Stopping WebSocket connection cleanup task")
_cleanup_task.cancel()
try:
await _cleanup_task
except asyncio.CancelledError:
pass
logger.info("Disconnecting from database")
await database.disconnect()
# ------------------------ GET -----------------------------------
@app.get("/debug")
def debug():
return {"status": "OK"}
@app.get("/debug/player-flapping")
async def get_player_flapping_debug():
"""Return player tracking data for debugging flapping issues."""
try:
# Analyze flapping patterns
flapping_analysis = _analyze_flapping_patterns()
# Analyze telemetry timing
timing_analysis = _analyze_telemetry_timing()
# Get recent events (last 50)
recent_events = _player_events[-50:] if len(_player_events) > 50 else _player_events
# Convert timestamps to ISO format for JSON serialization
formatted_events = []
for event in recent_events:
formatted_event = event.copy()
formatted_event["timestamp"] = event["timestamp"].isoformat()
formatted_events.append(formatted_event)
# Format history
formatted_history = []
for entry in _player_history:
formatted_entry = {
"timestamp": entry["timestamp"].isoformat(),
"player_count": entry["player_count"],
"player_names": entry["player_names"]
}
formatted_history.append(formatted_entry)
# Format timing data for JSON serialization
formatted_timing = {}
for player_name, timing_data in timing_analysis["all_players"].items():
formatted_timing[player_name] = timing_data.copy()
# Round last_message_age for readability
formatted_timing[player_name]["last_message_age"] = round(timing_data["last_message_age"], 1)
return {
"current_players": len(_cached_live.get("players", [])),
"history": formatted_history,
"recent_events": formatted_events,
"flapping_analysis": flapping_analysis,
"timing_analysis": {
"all_players": formatted_timing,
"problem_players": timing_analysis["problem_players"],
"summary": timing_analysis["summary"]
},
"tracking_stats": {
"history_entries": len(_player_history),
"total_events": len(_player_events),
"tracked_players": len(_player_telemetry_times),
"max_history_size": _max_history_size,
"max_events_size": _max_events_size
}
}
except Exception as e:
logger.error(f"Failed to get player flapping debug data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/debug/websocket-health")
async def get_websocket_health():
"""Return simple WebSocket connection counts."""
try:
return {
"plugin_connections": _plugin_connections,
"browser_connections": _browser_connections,
"total_connections": _plugin_connections + _browser_connections
}
except Exception as e:
logger.error(f"Failed to get WebSocket health data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/debug/database-performance")
async def get_database_performance():
"""Return simple database query performance statistics."""
try:
avg_query_time = (_total_query_time / _total_queries) if _total_queries > 0 else 0.0
return {
"total_queries": _total_queries,
"total_query_time": round(_total_query_time, 3),
"average_query_time": round(avg_query_time, 3)
}
except Exception as e:
logger.error(f"Failed to get database performance data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/debug/recent-activity")
async def get_recent_activity():
"""Return recent telemetry activity feed."""
try:
return {
"recent_messages": _recent_telemetry_messages.copy(),
"total_messages": len(_recent_telemetry_messages),
"max_messages": _max_recent_messages
}
except Exception as e:
logger.error(f"Failed to get recent activity data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/server-health")
async def get_server_health():
"""Return current server health status."""
try:
# Get latest status from database if cache is stale
if not _server_status_cache.get("last_check") or \
(datetime.now(timezone.utc) - datetime.fromisoformat(_server_status_cache["last_check"].replace('Z', '+00:00')) > timedelta(minutes=2)):
row = await database.fetch_one(
"SELECT * FROM server_status WHERE server_name = :name",
{"name": "Coldeve"}
)
if row:
_server_status_cache.update({
"status": row["current_status"],
"latency_ms": row["last_latency_ms"],
"player_count": row["last_player_count"],
"last_check": row["last_check"].isoformat() if row["last_check"] else None,
"uptime_seconds": row["total_uptime_seconds"],
"last_restart": row["last_restart"].isoformat() if row["last_restart"] else None
})
# Format uptime
uptime_seconds = _server_status_cache.get("uptime_seconds", 0)
days = uptime_seconds // 86400
hours = (uptime_seconds % 86400) // 3600
minutes = (uptime_seconds % 3600) // 60
uptime_str = f"{days}d {hours}h {minutes}m" if days > 0 else f"{hours}h {minutes}m"
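# e.g. uptime_seconds = 93784 -> days=1, hours=2, minutes=3 -> "1d 2h 3m"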
return {
"server_name": "Coldeve",
"status": _server_status_cache.get("status", "unknown"),
"latency_ms": _server_status_cache.get("latency_ms"),
"player_count": _server_status_cache.get("player_count"),
"uptime": uptime_str,
"uptime_seconds": uptime_seconds,
"last_restart": _server_status_cache.get("last_restart"),
"last_check": _server_status_cache.get("last_check")
}
except Exception as e:
logger.error(f"Failed to get server health data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/quest-status")
async def get_quest_status():
"""Return current cached quest status for all players."""
try:
# Return the quest cache with structured data
return {
"quest_data": _quest_status_cache,
"tracked_quests": [
"Stipend Collection Timer",
"Blank Augmentation Gem Pickup Timer",
"Insatiable Eater Jaw"
],
"player_count": len(_quest_status_cache)
}
except Exception as e:
logger.error(f"Failed to get quest status data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/portals")
async def get_portals():
"""Return all active portals (less than 1 hour old)."""
try:
# No need for cutoff check - cleanup job handles expiration
query = """
SELECT portal_name, ns, ew, z, discovered_at, discovered_by
FROM portals
ORDER BY discovered_at DESC
"""
rows = await database.fetch_all(query)
portal_list = []  # renamed from "portals" to avoid shadowing the imported table
for row in rows:
portal = {
"portal_name": row["portal_name"],
"coordinates": {
"ns": row["ns"],
"ew": row["ew"],
"z": row["z"]
},
"discovered_at": row["discovered_at"].isoformat(),
"discovered_by": row["discovered_by"]
}
portal_list.append(portal)
return {
"portals": portals,
"portal_count": len(portals)
}
except Exception as e:
logger.error(f"Failed to get portals data: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/live", response_model=dict)
@app.get("/live/", response_model=dict)
async def get_live_players():
"""Return cached live telemetry per character."""
try:
return JSONResponse(content=jsonable_encoder(_cached_live))
except Exception as e:
logger.error(f"Failed to get live players: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
# --- GET Trails ---------------------------------
@app.get("/trails")
@app.get("/trails/")
async def get_trails(
seconds: int = Query(600, ge=0, description="Lookback window in seconds"),
):
"""Return cached trails (updated every 5 seconds)."""
try:
return JSONResponse(content=jsonable_encoder(_cached_trails))
except Exception as e:
logger.error(f"Failed to get trails: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/total-rares")
@app.get("/total-rares/")
async def get_total_rares():
"""Return cached total rares statistics (updated every 5 minutes)."""
try:
return JSONResponse(content=jsonable_encoder(_cached_total_rares))
except Exception as e:
logger.error(f"Failed to get total rares: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
# --- GET Spawn Heat Map Endpoint ---------------------------------
@app.get("/spawns/heatmap")
async def get_spawn_heatmap_data(
hours: int = Query(24, ge=1, le=168, description="Lookback window in hours (1-168)"),
limit: int = Query(10000, ge=100, le=50000, description="Maximum number of spawn points to return")
):
"""
Aggregate spawn locations for heat-map visualization.
Returns spawn event coordinates grouped by location with intensity counts
for the specified time window.
Response format:
{
"spawn_points": [{"ew": float, "ns": float, "intensity": int}, ...],
"total_points": int,
"timestamp": "UTC-ISO"
}
"""
try:
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
# Aggregate spawn events by coordinates within time window
query = """
SELECT ew, ns, COUNT(*) AS spawn_count
FROM spawn_events
WHERE timestamp >= :cutoff
GROUP BY ew, ns
ORDER BY spawn_count DESC
LIMIT :limit
"""
rows = await database.fetch_all(query, {"cutoff": cutoff, "limit": limit})
spawn_points = [
{
"ew": float(row["ew"]),
"ns": float(row["ns"]),
"intensity": int(row["spawn_count"])
}
for row in rows
]
result = {
"spawn_points": spawn_points,
"total_points": len(spawn_points),
"timestamp": datetime.now(timezone.utc).isoformat(),
"hours_window": hours
}
logger.debug(f"Heat map data: {len(spawn_points)} unique spawn locations from last {hours} hours")
return JSONResponse(content=jsonable_encoder(result))
except Exception as e:
logger.error(f"Heat map query failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Spawn heat map query failed")
# --- GET Inventory Endpoints ---------------------------------
@app.get("/inventory/{character_name}")
async def get_character_inventory(character_name: str):
"""Get the complete inventory for a specific character - inventory service only."""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{INVENTORY_SERVICE_URL}/inventory/{character_name}"
)
if response.status_code == 200:
return JSONResponse(content=response.json())
elif response.status_code == 404:
raise HTTPException(status_code=404, detail=f"No inventory found for character '{character_name}'")
else:
logger.error(f"Inventory service returned {response.status_code} for {character_name}")
raise HTTPException(status_code=502, detail="Inventory service error")
except httpx.RequestError as e:
logger.error(f"Could not reach inventory service: {e}")
raise HTTPException(status_code=503, detail="Inventory service unavailable")
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get inventory for {character_name}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/inventory/{character_name}/search")
async def search_character_inventory(
character_name: str,
name: str = Query(None, description="Search by item name (partial match)"),
object_class: int = Query(None, description="Filter by ObjectClass"),
min_value: int = Query(None, description="Minimum item value"),
max_value: int = Query(None, description="Maximum item value"),
min_burden: int = Query(None, description="Minimum burden"),
max_burden: int = Query(None, description="Maximum burden")
):
"""Search and filter inventory items for a character with various criteria."""
try:
conditions = ["character_name = :character_name"]
params = {"character_name": character_name}
if name:
conditions.append("name ILIKE :name")
params["name"] = f"%{name}%"
if object_class is not None:
conditions.append("object_class = :object_class")
params["object_class"] = object_class
if min_value is not None:
conditions.append("value >= :min_value")
params["min_value"] = min_value
if max_value is not None:
conditions.append("value <= :max_value")
params["max_value"] = max_value
if min_burden is not None:
conditions.append("burden >= :min_burden")
params["min_burden"] = min_burden
if max_burden is not None:
conditions.append("burden <= :max_burden")
params["max_burden"] = max_burden
query = f"""
SELECT name, icon, object_class, value, burden, has_id_data, item_data, timestamp
FROM character_inventories
WHERE {' AND '.join(conditions)}
ORDER BY value DESC, name
"""
rows = await database.fetch_all(query, params)
items = []
for row in rows:
item = dict(row)
items.append(item)
return JSONResponse(content=jsonable_encoder({
"character_name": character_name,
"item_count": len(items),
"search_criteria": {
"name": name,
"object_class": object_class,
"min_value": min_value,
"max_value": max_value,
"min_burden": min_burden,
"max_burden": max_burden
},
"items": items
}))
except Exception as e:
logger.error(f"Failed to search inventory for {character_name}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/inventories")
async def list_characters_with_inventories():
"""List all characters that have stored inventories with item counts."""
try:
query = """
SELECT character_name, COUNT(*) as item_count, MAX(timestamp) as last_updated
FROM character_inventories
GROUP BY character_name
ORDER BY last_updated DESC
"""
rows = await database.fetch_all(query)
characters = []
for row in rows:
characters.append({
"character_name": row["character_name"],
"item_count": row["item_count"],
"last_updated": row["last_updated"]
})
return JSONResponse(content=jsonable_encoder({
"characters": characters,
"total_characters": len(characters)
}))
except Exception as e:
logger.error(f"Failed to list inventory characters: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
# --- Inventory Service Character List Proxy ---------------------
@app.get("/inventory-characters")
async def get_inventory_characters():
"""Get character list from inventory service - proxy to avoid routing conflicts."""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"{INVENTORY_SERVICE_URL}/characters/list")
if response.status_code == 200:
return JSONResponse(content=response.json())
else:
logger.error(f"Inventory service returned {response.status_code}: {response.text}")
raise HTTPException(status_code=response.status_code, detail="Failed to get characters from inventory service")
except Exception as e:
logger.error(f"Failed to proxy inventory characters request: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Failed to get inventory characters")
# --- Inventory Search Service Proxy Endpoints -------------------
@app.get("/search/items")
async def search_items_proxy(
text: str = Query(None, description="Search item names, descriptions, or properties"),
character: str = Query(None, description="Limit search to specific character"),
include_all_characters: bool = Query(False, description="Search across all characters"),
equipment_status: str = Query(None, description="equipped, unequipped, or all"),
equipment_slot: int = Query(None, description="Equipment slot mask"),
# Item category filtering
armor_only: bool = Query(False, description="Show only armor items"),
jewelry_only: bool = Query(False, description="Show only jewelry items"),
weapon_only: bool = Query(False, description="Show only weapon items"),
# Spell filtering
has_spell: str = Query(None, description="Must have this specific spell (by name)"),
spell_contains: str = Query(None, description="Spell name contains this text"),
legendary_cantrips: str = Query(None, description="Comma-separated list of legendary cantrip names"),
# Combat properties
min_damage: int = Query(None, description="Minimum damage"),
max_damage: int = Query(None, description="Maximum damage"),
min_armor: int = Query(None, description="Minimum armor level"),
max_armor: int = Query(None, description="Maximum armor level"),
min_attack_bonus: float = Query(None, description="Minimum attack bonus"),
min_crit_damage_rating: int = Query(None, description="Minimum critical damage rating"),
min_damage_rating: int = Query(None, description="Minimum damage rating"),
min_heal_boost_rating: int = Query(None, description="Minimum heal boost rating"),
max_level: int = Query(None, description="Maximum wield level requirement"),
min_level: int = Query(None, description="Minimum wield level requirement"),
material: str = Query(None, description="Material type (partial match)"),
min_workmanship: float = Query(None, description="Minimum workmanship"),
has_imbue: bool = Query(None, description="Has imbue effects"),
item_set: str = Query(None, description="Item set name (partial match)"),
min_tinks: int = Query(None, description="Minimum tinker count"),
bonded: bool = Query(None, description="Bonded status"),
attuned: bool = Query(None, description="Attuned status"),
unique: bool = Query(None, description="Unique item status"),
is_rare: bool = Query(None, description="Rare item status"),
min_condition: int = Query(None, description="Minimum condition percentage"),
min_value: int = Query(None, description="Minimum item value"),
max_value: int = Query(None, description="Maximum item value"),
max_burden: int = Query(None, description="Maximum burden"),
sort_by: str = Query("name", description="Sort field: name, value, damage, armor, workmanship"),
sort_dir: str = Query("asc", description="Sort direction: asc or desc"),
page: int = Query(1, ge=1, description="Page number"),
limit: int = Query(50, ge=1, le=200, description="Items per page")
):
"""Proxy to inventory service comprehensive item search."""
try:
# Build query parameters
params = {}
if text: params["text"] = text
if character: params["character"] = character
if include_all_characters: params["include_all_characters"] = include_all_characters
if equipment_status: params["equipment_status"] = equipment_status
if equipment_slot is not None: params["equipment_slot"] = equipment_slot
# Category filtering
if armor_only: params["armor_only"] = armor_only
if jewelry_only: params["jewelry_only"] = jewelry_only
if weapon_only: params["weapon_only"] = weapon_only
# Spell filtering
if has_spell: params["has_spell"] = has_spell
if spell_contains: params["spell_contains"] = spell_contains
if legendary_cantrips: params["legendary_cantrips"] = legendary_cantrips
# Combat properties
if min_damage is not None: params["min_damage"] = min_damage
if max_damage is not None: params["max_damage"] = max_damage
if min_armor is not None: params["min_armor"] = min_armor
if max_armor is not None: params["max_armor"] = max_armor
if min_attack_bonus is not None: params["min_attack_bonus"] = min_attack_bonus
if min_crit_damage_rating is not None: params["min_crit_damage_rating"] = min_crit_damage_rating
if min_damage_rating is not None: params["min_damage_rating"] = min_damage_rating
if min_heal_boost_rating is not None: params["min_heal_boost_rating"] = min_heal_boost_rating
if max_level is not None: params["max_level"] = max_level
if min_level is not None: params["min_level"] = min_level
if material: params["material"] = material
if min_workmanship is not None: params["min_workmanship"] = min_workmanship
if has_imbue is not None: params["has_imbue"] = has_imbue
if item_set: params["item_set"] = item_set
if min_tinks is not None: params["min_tinks"] = min_tinks
if bonded is not None: params["bonded"] = bonded
if attuned is not None: params["attuned"] = attuned
if unique is not None: params["unique"] = unique
if is_rare is not None: params["is_rare"] = is_rare
if min_condition is not None: params["min_condition"] = min_condition
if min_value is not None: params["min_value"] = min_value
if max_value is not None: params["max_value"] = max_value
if max_burden is not None: params["max_burden"] = max_burden
params["sort_by"] = sort_by
params["sort_dir"] = sort_dir
params["page"] = page
params["limit"] = limit
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{INVENTORY_SERVICE_URL}/search/items",
params=params
)
if response.status_code == 200:
return JSONResponse(content=response.json())
else:
logger.error(f"Inventory search service returned {response.status_code}")
raise HTTPException(status_code=response.status_code, detail="Inventory search service error")
except httpx.RequestError as e:
logger.error(f"Could not reach inventory service: {e}")
raise HTTPException(status_code=503, detail="Inventory service unavailable")
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to search items: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/search/equipped/{character_name}")
async def search_equipped_items_proxy(
character_name: str,
slot: int = Query(None, description="Specific equipment slot mask")
):
"""Proxy to inventory service equipped items search."""
try:
params = {}
if slot is not None:
params["slot"] = slot
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{INVENTORY_SERVICE_URL}/search/equipped/{character_name}",
params=params
)
if response.status_code == 200:
return JSONResponse(content=response.json())
elif response.status_code == 404:
raise HTTPException(status_code=404, detail=f"No equipped items found for character '{character_name}'")
else:
logger.error(f"Inventory service returned {response.status_code} for equipped items search")
raise HTTPException(status_code=response.status_code, detail="Inventory service error")
except httpx.RequestError as e:
logger.error(f"Could not reach inventory service: {e}")
raise HTTPException(status_code=503, detail="Inventory service unavailable")
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to search equipped items for {character_name}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/search/upgrades/{character_name}/{slot}")
async def find_equipment_upgrades_proxy(
character_name: str,
slot: int,
upgrade_type: str = Query("damage", description="What to optimize for: damage, armor, workmanship, value")
):
"""Proxy to inventory service equipment upgrades search."""
try:
params = {"upgrade_type": upgrade_type}
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{INVENTORY_SERVICE_URL}/search/upgrades/{character_name}/{slot}",
params=params
)
if response.status_code == 200:
return JSONResponse(content=response.json())
elif response.status_code == 404:
raise HTTPException(status_code=404, detail=f"No upgrade options found for character '{character_name}' slot {slot}")
else:
logger.error(f"Inventory service returned {response.status_code} for upgrades search")
raise HTTPException(status_code=response.status_code, detail="Inventory service error")
except httpx.RequestError as e:
logger.error(f"Could not reach inventory service: {e}")
raise HTTPException(status_code=503, detail="Inventory service unavailable")
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to find equipment upgrades for {character_name} slot {slot}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/sets/list")
async def list_equipment_sets_proxy():
"""Proxy to inventory service equipment sets list."""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"{INVENTORY_SERVICE_URL}/sets/list")
if response.status_code == 200:
return JSONResponse(content=response.json())
else:
logger.error(f"Inventory service returned {response.status_code} for sets list")
raise HTTPException(status_code=response.status_code, detail="Inventory service error")
except httpx.RequestError as e:
logger.error(f"Could not reach inventory service: {e}")
raise HTTPException(status_code=503, detail="Inventory service unavailable")
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to list equipment sets: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
# -------------------- WebSocket endpoints -----------------------
## WebSocket connection tracking
# Set of browser WebSocket clients subscribed to live updates
browser_conns: set[WebSocket] = set()
# Mapping of plugin clients by character_name to their WebSocket for command forwarding
plugin_conns: Dict[str, WebSocket] = {}
async def _broadcast_to_browser_clients(snapshot: dict):
"""Broadcast a telemetry or chat message to all connected browser clients.
Converts any non-serializable types (e.g., datetime) before sending.
Handles connection errors gracefully and removes stale connections.
"""
# Convert snapshot payload to JSON-friendly types
data = jsonable_encoder(snapshot)
# Use list() to avoid "set changed size during iteration" errors
disconnected_clients = []
for ws in list(browser_conns):
try:
await ws.send_json(data)
except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e:
# Collect disconnected clients for cleanup
disconnected_clients.append(ws)
logger.debug(f"Detected disconnected browser client: {e}")
except Exception as e:
# Handle any other unexpected errors
disconnected_clients.append(ws)
logger.warning(f"Unexpected error broadcasting to browser client: {e}")
# Clean up disconnected clients
for ws in disconnected_clients:
browser_conns.discard(ws)
async def _forward_to_inventory_service(inventory_msg: FullInventoryMessage):
"""Forward inventory data to the inventory microservice for processing."""
try:
# Prepare data for inventory service
inventory_data = {
"character_name": inventory_msg.character_name,
"timestamp": inventory_msg.timestamp.isoformat(),
"items": inventory_msg.items
}
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{INVENTORY_SERVICE_URL}/process-inventory",
json=inventory_data
)
if response.status_code == 200:
result = response.json()
logger.info(f"Inventory service processed {result['processed']} items for {inventory_msg.character_name}")
else:
logger.error(f"Inventory service error {response.status_code}: {response.text}")
except Exception as e:
logger.error(f"Failed to forward inventory to service: {e}")
# Don't raise - this shouldn't block the main storage
async def _store_inventory(inventory_msg: FullInventoryMessage):
"""Forward inventory data to inventory microservice for processing and storage."""
try:
# Forward to inventory microservice for enhanced processing and storage
await _forward_to_inventory_service(inventory_msg)
# Optional: Create JSON file for debugging (can be removed in production)
inventory_dir = Path("./inventory")
inventory_dir.mkdir(exist_ok=True)
file_path = inventory_dir / f"{inventory_msg.character_name}_inventory.json"
inventory_data = {
"character_name": inventory_msg.character_name,
"timestamp": inventory_msg.timestamp.isoformat(),
"item_count": inventory_msg.item_count,
"items": inventory_msg.items
}
with open(file_path, 'w') as f:
json.dump(inventory_data, f, indent=2)
except Exception as e:
logger.error(f"Failed to forward inventory for {inventory_msg.character_name}: {e}", exc_info=True)
raise
@app.websocket("/ws/position")
async def ws_receive_snapshots(
websocket: WebSocket,
secret: str | None = Query(None),
x_plugin_secret: str | None = Header(None)
):
"""WebSocket endpoint for plugin clients to send telemetry and events.
Validates a shared secret for authentication, then listens for messages of
various types (register, spawn, telemetry, rare, chat) and handles each:
- register: record plugin WebSocket for command forwarding
- spawn: persist spawn event
- telemetry: store snapshot, update stats, broadcast to browsers
- rare: update total and session rare counts, persist event
- chat: broadcast chat messages to browsers
"""
global _plugin_connections
# Authenticate plugin connection using shared secret
key = secret or x_plugin_secret
if key != SHARED_SECRET:
# Reject without completing the WebSocket handshake
logger.warning(f"Plugin WebSocket authentication failed from {websocket.client}")
await websocket.close(code=1008)
return
# Accept the WebSocket connection
await websocket.accept()
logger.info(f"🔌 PLUGIN_CONNECTED: {websocket.client}")
# Track plugin connection
_plugin_connections += 1
try:
while True:
# Read next text frame
try:
raw = await websocket.receive_text()
# Debug: log all incoming plugin WebSocket messages
logger.debug(f"Plugin WebSocket RX from {websocket.client}: {raw}")
except WebSocketDisconnect:
logger.info(f"🔌 PLUGIN_DISCONNECTED: {websocket.client}")
break
# Parse JSON payload
try:
data = json.loads(raw)
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON from plugin {websocket.client}: {e}")
continue
msg_type = data.get("type")
# --- Registration: associate character_name with this plugin socket ---
if msg_type == "register":
name = data.get("character_name") or data.get("player_name")
if isinstance(name, str):
plugin_conns[name] = websocket
logger.info(f"📋 PLUGIN_REGISTERED: {name} from {websocket.client}")
continue
# --- Spawn event: persist to spawn_events table ---
if msg_type == "spawn":
payload = data.copy()
payload.pop("type", None)
try:
spawn = SpawnEvent.parse_obj(payload)
await database.execute(
spawn_events.insert().values(**spawn.dict())
)
logger.debug(f"Recorded spawn event: {spawn.mob} by {spawn.character_name}")
except Exception as e:
logger.error(f"Failed to process spawn event: {e}")
continue
# --- Telemetry message: persist snapshot and update kill stats ---
if msg_type == "telemetry":
# Parse telemetry snapshot and update in-memory state
payload = data.copy()
payload.pop("type", None)
character_name = payload.get('character_name', 'unknown')
# Track message receipt and start timing
telemetry_start_time = time.time()
logger.info(f"📨 TELEMETRY_RECEIVED: {character_name} from {websocket.client}")
try:
snap = TelemetrySnapshot.parse_obj(payload)
live_snapshots[snap.character_name] = snap.dict()
# Prepare data and compute kill delta
db_data = snap.dict()
db_data['rares_found'] = 0
key = (snap.session_id, snap.character_name)
# Get last recorded kill count for this session
if key in ws_receive_snapshots._last_kills:
last = ws_receive_snapshots._last_kills[key]
else:
# Cache miss - check database for last kill count for this session
row = await database.fetch_one(
"SELECT kills FROM telemetry_events WHERE character_name = :char AND session_id = :session ORDER BY timestamp DESC LIMIT 1",
{"char": snap.character_name, "session": snap.session_id}
)
last = row["kills"] if row else 0
logger.debug(f"Cache miss for {snap.character_name} session {snap.session_id[:8]}: loaded last_kills={last} from database")
delta = snap.kills - last
# Persist snapshot and any kill delta in a single transaction
db_start_time = time.time()
# Log connection pool status before database operation
try:
pool_status = f"pool_size:{database._pool._queue.qsize()}" if hasattr(database, '_pool') and hasattr(database._pool, '_queue') else "pool_status:unknown"
except Exception:
pool_status = "pool_status:error"
logger.info(f"💾 TELEMETRY_DB_WRITE_ATTEMPT: {snap.character_name} session:{snap.session_id[:8]} kills:{snap.kills} delta:{delta} {pool_status}")
try:
async with database.transaction():
await database.execute(
telemetry_events.insert().values(**db_data)
)
if delta > 0:
stmt = pg_insert(char_stats).values(
character_name=snap.character_name,
total_kills=delta
).on_conflict_do_update(
index_elements=["character_name"],
set_={"total_kills": char_stats.c.total_kills + delta},
)
await database.execute(stmt)
logger.debug(f"Updated kills for {snap.character_name}: +{delta} (total from {last} to {snap.kills})")
# Success: log timing and update cache
db_duration = (time.time() - db_start_time) * 1000
ws_receive_snapshots._last_kills[key] = snap.kills
# Track database performance (Phase 2)
global _total_queries, _total_query_time
_total_queries += 1
_total_query_time += db_duration / 1000.0 # Convert ms to seconds
# Track recent activity (Phase 3)
global _recent_telemetry_messages, _max_recent_messages
activity_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"character_name": snap.character_name,
"kills": snap.kills,
"kill_delta": delta,
"query_time": round(db_duration, 1)
}
_recent_telemetry_messages.append(activity_entry)
if len(_recent_telemetry_messages) > _max_recent_messages:
_recent_telemetry_messages.pop(0)
# Log final pool status after successful operation
try:
final_pool_status = f"pool_size:{database._pool._queue.qsize()}" if hasattr(database, '_pool') and hasattr(database._pool, '_queue') else "pool_status:unknown"
except Exception:
final_pool_status = "pool_status:error"
logger.info(f"✅ TELEMETRY_DB_WRITE_SUCCESS: {snap.character_name} took {db_duration:.1f}ms {final_pool_status}")
except Exception as db_error:
db_duration = (time.time() - db_start_time) * 1000
# Log pool status during failure
try:
error_pool_status = f"pool_size:{database._pool._queue.qsize()}" if hasattr(database, '_pool') and hasattr(database._pool, '_queue') else "pool_status:unknown"
except Exception:
error_pool_status = "pool_status:error"
logger.error(f"❌ TELEMETRY_DB_WRITE_FAILED: {snap.character_name} session:{snap.session_id[:8]} took {db_duration:.1f}ms {error_pool_status} error:{db_error}", exc_info=True)
continue
# Broadcast updated snapshot to all browser clients
await _broadcast_to_browser_clients(snap.dict())
# Log successful processing completion with timing
total_duration = (time.time() - telemetry_start_time) * 1000
logger.info(f"⏱️ TELEMETRY_PROCESSING_COMPLETE: {snap.character_name} took {total_duration:.1f}ms total")
except Exception as e:
total_duration = (time.time() - telemetry_start_time) * 1000
logger.error(f"❌ TELEMETRY_PROCESSING_FAILED: {character_name} took {total_duration:.1f}ms error:{e}", exc_info=True)
continue
# --- Rare event: update total and session counters and persist ---
if msg_type == "rare":
name = data.get("character_name")
if isinstance(name, str) and name.strip():
try:
# Total rare count per character
stmt_tot = pg_insert(rare_stats).values(
character_name=name,
total_rares=1
).on_conflict_do_update(
index_elements=["character_name"],
set_={"total_rares": rare_stats.c.total_rares + 1},
)
await database.execute(stmt_tot)
# Session-specific rare count (use live cache or fallback to latest telemetry)
session_id = live_snapshots.get(name, {}).get("session_id")
if not session_id:
row = await database.fetch_one(
"SELECT session_id FROM telemetry_events"
" WHERE character_name = :name"
" ORDER BY timestamp DESC LIMIT 1",
{"name": name}
)
if row:
session_id = row["session_id"]
if session_id:
stmt_sess = pg_insert(rare_stats_sessions).values(
character_name=name,
session_id=session_id,
session_rares=1
).on_conflict_do_update(
index_elements=["character_name", "session_id"],
set_={"session_rares": rare_stats_sessions.c.session_rares + 1},
)
await database.execute(stmt_sess)
# Persist individual rare event for future analysis
payload = data.copy()
payload.pop("type", None)
try:
rare_ev = RareEvent.parse_obj(payload)
await database.execute(
rare_events.insert().values(**rare_ev.dict())
)
logger.info(f"Recorded rare event: {rare_ev.name} by {name}")
# Broadcast rare event to browser clients for epic notifications
await _broadcast_to_browser_clients(data)
except Exception as e:
logger.error(f"Failed to persist rare event: {e}")
except Exception as e:
logger.error(f"Failed to process rare event for {name}: {e}", exc_info=True)
continue
# --- Chat message: forward chat payload to browser clients ---
if msg_type == "chat":
await _broadcast_to_browser_clients(data)
logger.debug(f"Broadcasted chat message from {data.get('character_name', 'unknown')}")
continue
# --- Full inventory message: store complete inventory snapshot ---
if msg_type == "full_inventory":
payload = data.copy()
payload.pop("type", None)
try:
inventory_msg = FullInventoryMessage.parse_obj(payload)
await _store_inventory(inventory_msg)
logger.info(f"Stored inventory for {inventory_msg.character_name}: {inventory_msg.item_count} items")
except Exception as e:
logger.error(f"Failed to process inventory for {data.get('character_name', 'unknown')}: {e}", exc_info=True)
continue
# --- Vitals message: store character health/stamina/mana and broadcast ---
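# Illustrative vitals frame (values are made up; see VitalsMessage for the schema):
#   {"type": "vitals", "character_name": "Mosswart Slayer",
#    "health_percentage": 95, "stamina_percentage": 80, "mana_percentage": 60}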
if msg_type == "vitals":
payload = data.copy()
payload.pop("type", None)
try:
vitals_msg = VitalsMessage.parse_obj(payload)
live_vitals[vitals_msg.character_name] = vitals_msg.dict()
await _broadcast_to_browser_clients(data)
logger.debug(f"Updated vitals for {vitals_msg.character_name}: {vitals_msg.health_percentage}% HP, {vitals_msg.stamina_percentage}% Stam, {vitals_msg.mana_percentage}% Mana")
except Exception as e:
logger.error(f"Failed to process vitals for {data.get('character_name', 'unknown')}: {e}", exc_info=True)
continue
# --- Quest message: update cache and broadcast (no database storage) ---
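# Illustrative quest frame (values are made up); only quests in allowed_quests
# below are cached and broadcast:
#   {"type": "quest", "character_name": "Mosswart Slayer",
#    "quest_name": "Stipend Collection Timer", "countdown": "6d 23h 59m"}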
if msg_type == "quest":
character_name = data.get("character_name")
quest_name = data.get("quest_name")
countdown = data.get("countdown")
if character_name and quest_name and countdown is not None:
# Only track specific quest types
allowed_quests = {
"Stipend Collection Timer",
"Blank Augmentation Gem Pickup Timer",
"Insatiable Eater Jaw"
}
if quest_name in allowed_quests:
# Update quest cache
if character_name not in _quest_status_cache:
_quest_status_cache[character_name] = {}
_quest_status_cache[character_name][quest_name] = countdown
# Broadcast to browser clients for real-time updates
await _broadcast_to_browser_clients(data)
logger.debug(f"Updated quest status for {character_name}: {quest_name} = {countdown}")
else:
logger.debug(f"Ignoring non-tracked quest: {quest_name}")
else:
logger.warning(f"Invalid quest message format from {websocket.client}: missing required fields")
continue
# --- Portal message: store in database and broadcast ---
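# Illustrative portal frame (coordinates and timestamp are made up):
#   {"type": "portal", "character_name": "Mosswart Slayer", "portal_name": "Gateway",
#    "ns": 12.3, "ew": -45.6, "z": 0.0, "timestamp": "2024-01-01T12:00:00Z"}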
if msg_type == "portal":
character_name = data.get("character_name")
portal_name = data.get("portal_name")
ns = data.get("ns")
ew = data.get("ew")
z = data.get("z")
timestamp_str = data.get("timestamp")
# Check for None explicitly so that legitimate 0.0 coordinates are not rejected
if all(v is not None for v in (character_name, portal_name, ns, ew, z, timestamp_str)):
try:
# Parse timestamp
timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
# Convert coordinates to floats for database storage
ns = float(ns)
ew = float(ew)
z = float(z)
# Round coordinates for comparison (0.1 tolerance to match DB constraint)
ns_rounded = round(ns, 1)
ew_rounded = round(ew, 1)
# Check if portal exists at these coordinates
existing_portal = await database.fetch_one(
"""
SELECT id FROM portals
WHERE ROUND(ns::numeric, 1) = :ns_rounded
AND ROUND(ew::numeric, 1) = :ew_rounded
LIMIT 1
""",
{
"ns_rounded": ns_rounded,
"ew_rounded": ew_rounded
}
)
if not existing_portal:
# Insert the new portal; the except block below handles the duplicate-key
# race where another client records the same coordinates first
try:
await database.execute(
portals.insert().values(
portal_name=portal_name,
ns=ns,
ew=ew,
z=z,
discovered_at=timestamp,
discovered_by=character_name
)
)
logger.info(f"New portal discovered: {portal_name} at {ns_rounded}, {ew_rounded} by {character_name}")
except Exception as insert_error:
# If insert fails due to duplicate, update the existing portal
if "duplicate key" in str(insert_error).lower():
await database.execute(
"""
UPDATE portals
SET discovered_at = :timestamp, discovered_by = :character_name
WHERE ROUND(ns::numeric, 1) = :ns_rounded
AND ROUND(ew::numeric, 1) = :ew_rounded
""",
{
"timestamp": timestamp,
"character_name": character_name,
"ns_rounded": ns_rounded,
"ew_rounded": ew_rounded
}
)
logger.debug(f"Portal already exists (race condition), updated: {portal_name} at {ns_rounded}, {ew_rounded}")
else:
raise
else:
# Update timestamp for existing portal to keep it alive
await database.execute(
"""
UPDATE portals
SET discovered_at = :timestamp, discovered_by = :character_name
WHERE ROUND(ns::numeric, 1) = :ns_rounded
AND ROUND(ew::numeric, 1) = :ew_rounded
""",
{
"timestamp": timestamp,
"character_name": character_name,
"ns_rounded": ns_rounded,
"ew_rounded": ew_rounded
}
)
logger.info(f"Portal timestamp updated: {portal_name} at {ns_rounded}, {ew_rounded} by {character_name}")
# Broadcast to browser clients for map updates
await _broadcast_to_browser_clients(data)
except Exception as e:
logger.error(f"Failed to process portal discovery for {character_name}: {e}", exc_info=True)
else:
logger.warning(f"Invalid portal message format from {websocket.client}: missing required fields")
continue
# Unknown message types are logged and otherwise ignored
if msg_type:
logger.warning(f"Unknown message type '{msg_type}' from {websocket.client}")
finally:
# Track plugin disconnection
_plugin_connections = max(0, _plugin_connections - 1)
# Clean up any plugin registrations for this socket
to_remove = [n for n, ws in plugin_conns.items() if ws is websocket]
for n in to_remove:
# Use pop() instead of del to avoid KeyError if already removed
plugin_conns.pop(n, None)
# Also clean up any entries in the kill tracking cache for this session
# Remove entries that might be associated with disconnected clients
stale_keys = []
for session_id, char_name in ws_receive_snapshots._last_kills:
if char_name in to_remove:
stale_keys.append((session_id, char_name))
for key in stale_keys:
ws_receive_snapshots._last_kills.pop(key, None)
if to_remove:
logger.info(f"Cleaned up plugin connections for characters: {to_remove} from {websocket.client}")
if stale_keys:
logger.debug(f"Cleaned up {len(stale_keys)} kill tracking cache entries")
else:
logger.debug(f"No plugin registrations to clean up for {websocket.client}")
# In-memory cache of last seen kill counts per (session_id, character_name)
# Used to compute deltas for updating persistent kill statistics efficiently
ws_receive_snapshots._last_kills = {}
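# Worked example (made-up numbers): if _last_kills[(session, "Alice")] == 37 and the
# next telemetry frame for that session reports kills=40, delta is 3, so 3 is added to
# char_stats.total_kills and the cache entry is advanced to 40.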
async def cleanup_stale_connections():
"""Periodic cleanup of stale WebSocket connections.
This function can be called periodically to clean up connections
that may have become stale but weren't properly cleaned up.
"""
# Clean up plugin connections that no longer have valid WebSockets
stale_plugins = []
for char_name, ws in list(plugin_conns.items()):
try:
# Test if the WebSocket is still alive by checking its state
if ws.client_state.name != 'CONNECTED':
stale_plugins.append(char_name)
except Exception:
# If we can't check the state, consider it stale
stale_plugins.append(char_name)
for char_name in stale_plugins:
plugin_conns.pop(char_name, None)
logger.info(f"Cleaned up stale plugin connection: {char_name}")
# Clean up browser connections
stale_browsers = []
for ws in list(browser_conns):
try:
if ws.client_state.name != 'CONNECTED':
stale_browsers.append(ws)
except Exception:
stale_browsers.append(ws)
for ws in stale_browsers:
browser_conns.discard(ws)
if stale_browsers:
logger.info(f"Cleaned up {len(stale_browsers)} stale browser connections")
logger.debug(f"Connection health check: {len(plugin_conns)} plugins, {len(browser_conns)} browsers")
@app.websocket("/ws/live")
async def ws_live_updates(websocket: WebSocket):
"""WebSocket endpoint for browser clients to receive live updates and send commands.
Manages a set of connected browser clients; listens for incoming command messages
and forwards them to the appropriate plugin client WebSocket.
"""
global _browser_connections
# Add new browser client to the set
await websocket.accept()
browser_conns.add(websocket)
logger.info(f"Browser WebSocket connected: {websocket.client}")
# Track browser connection
_browser_connections += 1
try:
while True:
# Receive command messages from browser
try:
data = await websocket.receive_json()
# Debug: log all incoming browser WebSocket messages
logger.debug(f"Browser WebSocket RX from {websocket.client}: {data}")
except WebSocketDisconnect:
logger.info(f"Browser WebSocket disconnected: {websocket.client}")
break
# Determine command envelope format (new or legacy)
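# Illustrative command envelopes (values are made up):
#   new:    {"player_name": "Mosswart Slayer", "command": "/say hello"}
#   legacy: {"type": "command", "character_name": "Mosswart Slayer", "text": "/say hello"}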
if "player_name" in data and "command" in data:
# New format: { player_name, command }
target_name = data["player_name"]
payload = data
elif data.get("type") == "command" and "character_name" in data and "text" in data:
# Legacy format: { type: 'command', character_name, text }
target_name = data.get("character_name")
payload = {"player_name": target_name, "command": data.get("text")}
else:
# Not a recognized command envelope
continue
# Forward command envelope to the appropriate plugin WebSocket
target_ws = plugin_conns.get(target_name)
if target_ws:
try:
await target_ws.send_json(payload)
logger.debug(f"Forwarded command to plugin for {target_name}: {payload}")
except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e:
logger.warning(f"Failed to forward command to {target_name}: {e}")
# Remove stale connection
plugin_conns.pop(target_name, None)
except Exception as e:
logger.error(f"Unexpected error forwarding command to {target_name}: {e}")
# Remove potentially corrupted connection
plugin_conns.pop(target_name, None)
else:
logger.warning(f"No plugin connection found for target character: {target_name}")
except WebSocketDisconnect:
pass
finally:
# Track browser disconnection
_browser_connections = max(0, _browser_connections - 1)
browser_conns.discard(websocket)
logger.debug(f"Removed browser WebSocket from connection pool: {websocket.client}")
## -------------------- static frontend ---------------------------
## (static mount moved to end of file, below API routes)
# list routes for convenience
logger.info("🔍 Registered HTTP API routes:")
for route in app.routes:
if isinstance(route, APIRoute):
# Log the path and allowed methods for each API route
logger.info(f"{route.path} -> {route.methods}")
# Add stats endpoint for per-character metrics
@app.get("/stats/{character_name}")
async def get_stats(character_name: str):
"""
HTTP GET endpoint to retrieve per-character metrics:
- latest_snapshot: most recent telemetry entry for the character
- total_kills: accumulated kills from char_stats
- total_rares: accumulated rares from rare_stats
Returns 404 if character has no recorded telemetry.
"""
try:
# Single optimized query with LEFT JOINs to get all data in one round trip
sql = """
WITH latest AS (
SELECT * FROM telemetry_events
WHERE character_name = :cn
ORDER BY timestamp DESC LIMIT 1
)
SELECT
l.*,
COALESCE(cs.total_kills, 0) as total_kills,
COALESCE(rs.total_rares, 0) as total_rares
FROM latest l
LEFT JOIN char_stats cs ON l.character_name = cs.character_name
LEFT JOIN rare_stats rs ON l.character_name = rs.character_name
"""
row = await database.fetch_one(sql, {"cn": character_name})
if not row:
logger.warning(f"No telemetry data found for character: {character_name}")
raise HTTPException(status_code=404, detail="Character not found")
# Extract latest snapshot data (exclude the added total_kills/total_rares)
snap_dict = {k: v for k, v in dict(row).items()
if k not in ("total_kills", "total_rares")}
result = {
"character_name": character_name,
"latest_snapshot": snap_dict,
"total_kills": row["total_kills"],
"total_rares": row["total_rares"],
}
logger.debug(f"Retrieved stats for character: {character_name} (optimized query)")
return JSONResponse(content=jsonable_encoder(result))
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get stats for character {character_name}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
# -------------------- static frontend ---------------------------
# Custom icon handler serving PNG icons from the static/icons directory
from fastapi.responses import FileResponse
@app.get("/icons/{icon_filename}")
async def serve_icon(icon_filename: str):
"""Serve icons from static/icons directory"""
# Serve from static/icons directory
icon_path = Path("static/icons") / icon_filename
if icon_path.exists():
return FileResponse(icon_path, media_type="image/png")
# Icon not found
raise HTTPException(status_code=404, detail="Icon not found")
# -------------------- Inventory Service Proxy ---------------------------
@app.get("/inv/test")
async def test_inventory_route():
"""Test route to verify inventory proxy is working"""
return {"message": "Inventory proxy route is working"}
@app.api_route("/inv/{path:path}", methods=["GET", "POST"])
async def proxy_inventory_service(path: str, request: Request):
"""Proxy all inventory service requests"""
try:
inventory_service_url = os.getenv('INVENTORY_SERVICE_URL', 'http://inventory-service:8000')
logger.info(f"Proxying to inventory service: {inventory_service_url}/{path}")
# Forward the request to inventory service
async with httpx.AsyncClient() as client:
response = await client.request(
method=request.method,
url=f"{inventory_service_url}/{path}",
params=request.query_params,
headers=dict(request.headers),
content=await request.body()
)
return Response(
content=response.content,
status_code=response.status_code,
headers=dict(response.headers)
)
except Exception as e:
logger.error(f"Failed to proxy inventory request: {e}")
raise HTTPException(status_code=500, detail="Inventory service unavailable")
# Icons are now served from static/icons directory
# Serve SPA files (catch-all for frontend routes)
# Mount the single-page application frontend (static assets) at root path
app.mount("/", StaticFiles(directory="static", html=True), name="static")