Major overhaul of db -> hypertable conversion, updated GUI, added inventory

This commit is contained in:
erik 2025-06-08 20:51:06 +00:00
parent fdf9f04bc6
commit f218350959
8 changed files with 1565 additions and 210 deletions

main.py (439 changed lines)

@@ -10,7 +10,8 @@ import json
import logging
import os
import sys
from typing import Dict
from typing import Dict, List, Any
from pathlib import Path
from fastapi import FastAPI, Header, HTTPException, Query, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
@@ -30,6 +31,7 @@ from db_async import (
rare_stats_sessions,
spawn_events,
rare_events,
character_inventories,
init_db_async
)
import asyncio
@@ -54,6 +56,9 @@ _cache_task: asyncio.Task | None = None
async def _refresh_cache_loop() -> None:
"""Background task: refresh `/live` and `/trails` caches every 5 seconds."""
consecutive_failures = 0
max_consecutive_failures = 5
while True:
try:
# Recompute live players (last 30s)
@@ -61,7 +66,8 @@ async def _refresh_cache_loop() -> None:
sql_live = """
SELECT sub.*,
COALESCE(rs.total_rares, 0) AS total_rares,
COALESCE(rss.session_rares, 0) AS session_rares
COALESCE(rss.session_rares, 0) AS session_rares,
COALESCE(cs.total_kills, 0) AS total_kills
FROM (
SELECT DISTINCT ON (character_name) *
FROM telemetry_events
@@ -73,25 +79,52 @@ async def _refresh_cache_loop() -> None:
LEFT JOIN rare_stats_sessions rss
ON sub.character_name = rss.character_name
AND sub.session_id = rss.session_id
LEFT JOIN char_stats cs
ON sub.character_name = cs.character_name
"""
rows = await database.fetch_all(sql_live, {"cutoff": cutoff})
_cached_live["players"] = [dict(r) for r in rows]
# Recompute trails (last 600s)
cutoff2 = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=600)
sql_trail = """
SELECT timestamp, character_name, ew, ns, z
FROM telemetry_events
WHERE timestamp >= :cutoff
ORDER BY character_name, timestamp
"""
rows2 = await database.fetch_all(sql_trail, {"cutoff": cutoff2})
_cached_trails["trails"] = [
{"timestamp": r["timestamp"], "character_name": r["character_name"],
"ew": r["ew"], "ns": r["ns"], "z": r["z"]}
for r in rows2
]
# Use a single connection for both queries to reduce connection churn
async with database.connection() as conn:
rows = await conn.fetch_all(sql_live, {"cutoff": cutoff})
_cached_live["players"] = [dict(r) for r in rows]
# Recompute trails (last 600s)
cutoff2 = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=600)
sql_trail = """
SELECT timestamp, character_name, ew, ns, z
FROM telemetry_events
WHERE timestamp >= :cutoff
ORDER BY character_name, timestamp
"""
rows2 = await conn.fetch_all(sql_trail, {"cutoff": cutoff2})
_cached_trails["trails"] = [
{"timestamp": r["timestamp"], "character_name": r["character_name"],
"ew": r["ew"], "ns": r["ns"], "z": r["z"]}
for r in rows2
]
# Reset failure counter on success
consecutive_failures = 0
logger.debug(f"Cache refreshed: {len(_cached_live['players'])} players, {len(_cached_trails['trails'])} trail points")
except Exception as e:
logger.error(f"Cache refresh failed: {e}", exc_info=True)
consecutive_failures += 1
logger.error(f"Cache refresh failed ({consecutive_failures}/{max_consecutive_failures}): {e}", exc_info=True)
# If too many consecutive failures, wait longer and try to reconnect
if consecutive_failures >= max_consecutive_failures:
logger.warning(f"Too many consecutive cache refresh failures. Attempting database reconnection...")
try:
await database.disconnect()
await asyncio.sleep(2)
await database.connect()
logger.info("Database reconnected successfully")
consecutive_failures = 0
except Exception as reconnect_error:
logger.error(f"Database reconnection failed: {reconnect_error}")
await asyncio.sleep(10) # Wait longer before retrying
continue
await asyncio.sleep(5)
# ------------------------------------------------------------------
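Stripped of the query details, the failure handling added to the refresh loop reduces to a counter-and-reconnect pattern, roughly the sketch below (function names are illustrative placeholders, not from the codebase):

import asyncio
import logging

logger = logging.getLogger(__name__)

async def resilient_loop(refresh, reconnect, max_failures: int = 5) -> None:
    # Illustrative reduction of _refresh_cache_loop's error handling.
    failures = 0
    while True:
        try:
            await refresh()                  # the real work (both cache queries)
            failures = 0                     # any success resets the counter
        except Exception as exc:
            failures += 1
            logger.error("refresh failed (%d/%d): %s", failures, max_failures, exc)
            if failures >= max_failures:
                try:
                    await reconnect()        # e.g. database.disconnect() + connect()
                    failures = 0
                except Exception:
                    await asyncio.sleep(10)  # back off harder before retrying
                continue                     # retry without the normal 5s sleep
        await asyncio.sleep(5)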
@@ -127,6 +160,7 @@ class TelemetrySnapshot(BaseModel):
kills_per_hour: Optional[float] = None
onlinetime: Optional[str] = None
deaths: int
total_deaths: Optional[int] = None
# Removed from the telemetry payload; always forced to 0 and tracked via rare events instead
rares_found: int = 0
prismatic_taper_count: int
@@ -163,6 +197,17 @@ class RareEvent(BaseModel):
z: float = 0.0
class FullInventoryMessage(BaseModel):
"""
Model for the full_inventory WebSocket message type.
Contains a complete character inventory snapshot with raw item data.
"""
character_name: str
timestamp: datetime
item_count: int
items: List[Dict[str, Any]]
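As a concrete example, a payload along these lines (character and item values invented) validates against the model; the capitalized item keys are exactly the ones _store_inventory extracts further down, with each full dict preserved as item_data:

FullInventoryMessage.parse_obj({
    "character_name": "Zorbo",            # hypothetical character
    "timestamp": "2025-06-08T20:45:00Z",  # pydantic parses ISO 8601 strings
    "item_count": 1,
    "items": [{"Id": 12345, "Name": "Prismatic Taper", "Icon": 9876,
               "ObjectClass": 5, "Value": 10, "Burden": 5,
               "HasIdData": False}],
})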
@app.on_event("startup")
async def on_startup():
"""Event handler triggered when application starts up.
@@ -176,6 +221,11 @@ async def on_startup():
await database.connect()
await init_db_async()
logger.info(f"Database connected successfully on attempt {attempt}")
# Log connection pool configuration
try:
logger.info(f"Database connection established with pool configuration")
except Exception as pool_error:
logger.debug(f"Could not access pool details: {pool_error}")
break
except Exception as e:
logger.warning(f"Database connection failed (attempt {attempt}/{max_attempts}): {e}")
@@ -239,6 +289,140 @@ async def get_trails(
logger.error(f"Failed to get trails: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
# --- GET Inventory Endpoints ---------------------------------
@app.get("/inventory/{character_name}")
async def get_character_inventory(character_name: str):
"""Get the complete inventory for a specific character from the database."""
try:
query = """
SELECT name, icon, object_class, value, burden, has_id_data, item_data, timestamp
FROM character_inventories
WHERE character_name = :character_name
ORDER BY name
"""
rows = await database.fetch_all(query, {"character_name": character_name})
if not rows:
raise HTTPException(status_code=404, detail=f"No inventory found for character '{character_name}'")
items = [dict(row) for row in rows]
return JSONResponse(content=jsonable_encoder({
"character_name": character_name,
"item_count": len(items),
"items": items
}))
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get inventory for {character_name}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
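Fetching an inventory from a script is then a one-liner; host, port, and character name below are assumptions:

import requests

resp = requests.get("http://localhost:8000/inventory/Zorbo")
resp.raise_for_status()  # raises on the 404 for an unknown character
body = resp.json()
print(f"{body['character_name']}: {body['item_count']} items")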
@app.get("/inventory/{character_name}/search")
async def search_character_inventory(
character_name: str,
name: str = Query(None, description="Search by item name (partial match)"),
object_class: int = Query(None, description="Filter by ObjectClass"),
min_value: int = Query(None, description="Minimum item value"),
max_value: int = Query(None, description="Maximum item value"),
min_burden: int = Query(None, description="Minimum burden"),
max_burden: int = Query(None, description="Maximum burden")
):
"""Search and filter inventory items for a character with various criteria."""
try:
conditions = ["character_name = :character_name"]
params = {"character_name": character_name}
if name:
conditions.append("name ILIKE :name")
params["name"] = f"%{name}%"
if object_class is not None:
conditions.append("object_class = :object_class")
params["object_class"] = object_class
if min_value is not None:
conditions.append("value >= :min_value")
params["min_value"] = min_value
if max_value is not None:
conditions.append("value <= :max_value")
params["max_value"] = max_value
if min_burden is not None:
conditions.append("burden >= :min_burden")
params["min_burden"] = min_burden
if max_burden is not None:
conditions.append("burden <= :max_burden")
params["max_burden"] = max_burden
query = f"""
SELECT name, icon, object_class, value, burden, has_id_data, item_data, timestamp
FROM character_inventories
WHERE {' AND '.join(conditions)}
ORDER BY value DESC, name
"""
rows = await database.fetch_all(query, params)
items = [dict(row) for row in rows]
return JSONResponse(content=jsonable_encoder({
"character_name": character_name,
"item_count": len(items),
"search_criteria": {
"name": name,
"object_class": object_class,
"min_value": min_value,
"max_value": max_value,
"min_burden": min_burden,
"max_burden": max_burden
},
"items": items
}))
except Exception as e:
logger.error(f"Failed to search inventory for {character_name}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
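The query parameters combine as AND conditions, so a search for valuable matching items might look like this (again with an assumed host and character):

import requests

resp = requests.get(
    "http://localhost:8000/inventory/Zorbo/search",
    params={"name": "sword", "min_value": 5000},  # ILIKE '%sword%' AND value >= 5000
)
for item in resp.json()["items"]:  # sorted by value DESC, then name
    print(item["name"], item["value"], item["burden"])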
@app.get("/inventories")
async def list_characters_with_inventories():
"""List all characters that have stored inventories with item counts."""
try:
query = """
SELECT character_name, COUNT(*) as item_count, MAX(timestamp) as last_updated
FROM character_inventories
GROUP BY character_name
ORDER BY last_updated DESC
"""
rows = await database.fetch_all(query)
characters = [
    {"character_name": row["character_name"],
     "item_count": row["item_count"],
     "last_updated": row["last_updated"]}
    for row in rows
]
return JSONResponse(content=jsonable_encoder({
"characters": characters,
"total_characters": len(characters)
}))
except Exception as e:
logger.error(f"Failed to list inventory characters: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")
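The response is one entry per character, most recently updated first; illustratively:

{
    "characters": [
        {"character_name": "Zorbo", "item_count": 102,
         "last_updated": "2025-06-08T20:45:00Z"},
        {"character_name": "Alyss", "item_count": 87,
         "last_updated": "2025-06-08T19:12:33Z"},
    ],
    "total_characters": 2,
}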
# -------------------- WebSocket endpoints -----------------------
## WebSocket connection tracking
# Set of browser WebSocket clients subscribed to live updates
@@ -250,15 +434,99 @@ async def _broadcast_to_browser_clients(snapshot: dict):
"""Broadcast a telemetry or chat message to all connected browser clients.
Converts any non-serializable types (e.g., datetime) before sending.
Handles connection errors gracefully and removes stale connections.
"""
# Convert snapshot payload to JSON-friendly types
data = jsonable_encoder(snapshot)
# Use list() to avoid "set changed size during iteration" errors
disconnected_clients = []
for ws in list(browser_conns):
try:
await ws.send_json(data)
except (WebSocketDisconnect, RuntimeError) as e:
browser_conns.discard(ws)
logger.debug(f"Removed disconnected browser client from broadcast list: {e}")
except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e:
# Collect disconnected clients for cleanup
disconnected_clients.append(ws)
logger.debug(f"Detected disconnected browser client: {e}")
except Exception as e:
# Handle any other unexpected errors
disconnected_clients.append(ws)
logger.warning(f"Unexpected error broadcasting to browser client: {e}")
# Clean up disconnected clients
for ws in disconnected_clients:
browser_conns.discard(ws)
async def _store_inventory(inventory_msg: FullInventoryMessage):
"""Store complete character inventory to database with extracted searchable fields.
Processes each item to extract key fields for indexing while preserving
complete item data in JSONB format. Uses UPSERT to handle item updates.
"""
try:
# Create inventory directory if it doesn't exist
inventory_dir = Path("./inventory")
inventory_dir.mkdir(exist_ok=True)
# Store as JSON file for backwards compatibility / debugging
file_path = inventory_dir / f"{inventory_msg.character_name}_inventory.json"
inventory_data = {
"character_name": inventory_msg.character_name,
"timestamp": inventory_msg.timestamp.isoformat(),
"item_count": inventory_msg.item_count,
"items": inventory_msg.items
}
with open(file_path, 'w') as f:
json.dump(inventory_data, f, indent=2)
# Store items in database
for item in inventory_msg.items:
# Extract searchable fields
item_id = item.get("Id")
if item_id is None:
continue # Skip items without ID
name = item.get("Name", "")
icon = item.get("Icon", 0)
object_class = item.get("ObjectClass", 0)
value = item.get("Value", 0)
burden = item.get("Burden", 0)
has_id_data = item.get("HasIdData", False)
# UPSERT item into database
stmt = pg_insert(character_inventories).values(
character_name=inventory_msg.character_name,
item_id=item_id,
timestamp=inventory_msg.timestamp,
name=name,
icon=icon,
object_class=object_class,
value=value,
burden=burden,
has_id_data=has_id_data,
item_data=item
).on_conflict_do_update(
constraint="uq_char_item",
set_={
"timestamp": inventory_msg.timestamp,
"name": name,
"icon": icon,
"object_class": object_class,
"value": value,
"burden": burden,
"has_id_data": has_id_data,
"item_data": item
}
)
await database.execute(stmt)
except Exception as e:
logger.error(f"Failed to store inventory for {inventory_msg.character_name}: {e}", exc_info=True)
raise
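Assuming uq_char_item is a unique constraint over (character_name, item_id) (the name suggests as much, but the table itself is defined in db_async.py, which this diff doesn't show), each pg_insert(...).on_conflict_do_update(...) call above is roughly equivalent to the following SQL; SQLAlchemy binds the new values as parameters rather than referencing EXCLUDED, but the effect is the same:

INSERT INTO character_inventories
    (character_name, item_id, timestamp, name, icon, object_class,
     value, burden, has_id_data, item_data)
VALUES (:character_name, :item_id, :timestamp, :name, :icon, :object_class,
        :value, :burden, :has_id_data, :item_data)
ON CONFLICT ON CONSTRAINT uq_char_item DO UPDATE
SET timestamp = :timestamp, name = :name, icon = :icon,
    object_class = :object_class, value = :value,
    burden = :burden, has_id_data = :has_id_data, item_data = :item_data

Two consequences worth noting: re-sending an inventory updates rows in place rather than duplicating them, but nothing here deletes rows for items that have since left the inventory; and the per-item await issues one round trip per item, so batching (for example with execute_many) would be the natural optimization for large inventories.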
@app.websocket("/ws/position")
async def ws_receive_snapshots(
@@ -336,7 +604,19 @@ async def ws_receive_snapshots(
db_data = snap.dict()
db_data['rares_found'] = 0
key = (snap.session_id, snap.character_name)
last = ws_receive_snapshots._last_kills.get(key, 0)
# Get last recorded kill count for this session
if key in ws_receive_snapshots._last_kills:
last = ws_receive_snapshots._last_kills[key]
else:
# Cache miss - check database for last kill count for this session
row = await database.fetch_one(
"SELECT kills FROM telemetry_events WHERE character_name = :char AND session_id = :session ORDER BY timestamp DESC LIMIT 1",
{"char": snap.character_name, "session": snap.session_id}
)
last = row["kills"] if row else 0
logger.debug(f"Cache miss for {snap.character_name} session {snap.session_id[:8]}: loaded last_kills={last} from database")
delta = snap.kills - last
# Persist snapshot and any kill delta in a single transaction
try:
@@ -418,6 +698,17 @@ async def ws_receive_snapshots(
await _broadcast_to_browser_clients(data)
logger.debug(f"Broadcasted chat message from {data.get('character_name', 'unknown')}")
continue
# --- Full inventory message: store complete inventory snapshot ---
if msg_type == "full_inventory":
payload = data.copy()
payload.pop("type", None)
try:
inventory_msg = FullInventoryMessage.parse_obj(payload)
await _store_inventory(inventory_msg)
logger.info(f"Stored inventory for {inventory_msg.character_name}: {inventory_msg.item_count} items")
except Exception as e:
logger.error(f"Failed to process inventory for {data.get('character_name', 'unknown')}: {e}", exc_info=True)
continue
# Unknown message types are ignored
if msg_type:
logger.warning(f"Unknown message type '{msg_type}' from {websocket.client}")
@@ -425,9 +716,22 @@ async def ws_receive_snapshots(
# Clean up any plugin registrations for this socket
to_remove = [n for n, ws in plugin_conns.items() if ws is websocket]
for n in to_remove:
del plugin_conns[n]
# Use pop() instead of del to avoid KeyError if already removed
plugin_conns.pop(n, None)
# Also clean up any entries in the kill tracking cache for this session
# Remove entries that might be associated with disconnected clients
stale_keys = []
for (session_id, char_name) in list(ws_receive_snapshots._last_kills):
if char_name in to_remove:
stale_keys.append((session_id, char_name))
for key in stale_keys:
ws_receive_snapshots._last_kills.pop(key, None)
if to_remove:
logger.info(f"Cleaned up plugin connections for characters: {to_remove} from {websocket.client}")
if stale_keys:
logger.debug(f"Cleaned up {len(stale_keys)} kill tracking cache entries")
else:
logger.debug(f"No plugin registrations to clean up for {websocket.client}")
@@ -435,6 +739,44 @@ async def ws_receive_snapshots(
# Used to compute deltas for updating persistent kill statistics efficiently
ws_receive_snapshots._last_kills = {}
async def cleanup_stale_connections():
"""Periodic cleanup of stale WebSocket connections.
This function can be called periodically to clean up connections
that may have become stale but weren't properly cleaned up.
"""
# Clean up plugin connections that no longer have valid WebSockets
stale_plugins = []
for char_name, ws in list(plugin_conns.items()):
try:
# Test if the WebSocket is still alive by checking its state
if ws.client_state.name != 'CONNECTED':
stale_plugins.append(char_name)
except Exception:
# If we can't check the state, consider it stale
stale_plugins.append(char_name)
for char_name in stale_plugins:
plugin_conns.pop(char_name, None)
logger.info(f"Cleaned up stale plugin connection: {char_name}")
# Clean up browser connections
stale_browsers = []
for ws in list(browser_conns):
try:
if ws.client_state.name != 'CONNECTED':
stale_browsers.append(ws)
except Exception:
stale_browsers.append(ws)
for ws in stale_browsers:
browser_conns.discard(ws)
if stale_browsers:
logger.info(f"Cleaned up {len(stale_browsers)} stale browser connections")
logger.debug(f"Connection health check: {len(plugin_conns)} plugins, {len(browser_conns)} browsers")
@app.websocket("/ws/live")
async def ws_live_updates(websocket: WebSocket):
"""WebSocket endpoint for browser clients to receive live updates and send commands.
@@ -478,6 +820,10 @@ async def ws_live_updates(websocket: WebSocket):
logger.warning(f"Failed to forward command to {target_name}: {e}")
# Remove stale connection
plugin_conns.pop(target_name, None)
except Exception as e:
logger.error(f"Unexpected error forwarding command to {target_name}: {e}")
# Remove potentially corrupted connection
plugin_conns.pop(target_name, None)
else:
logger.warning(f"No plugin connection found for target character: {target_name}")
except WebSocketDisconnect:
@@ -507,32 +853,37 @@ async def get_stats(character_name: str):
Returns 404 if character has no recorded telemetry.
"""
try:
# Latest snapshot
sql_snap = (
"SELECT * FROM telemetry_events "
"WHERE character_name = :cn "
"ORDER BY timestamp DESC LIMIT 1"
)
snap = await database.fetch_one(sql_snap, {"cn": character_name})
if not snap:
# Single optimized query with LEFT JOINs to get all data in one round trip
sql = """
WITH latest AS (
SELECT * FROM telemetry_events
WHERE character_name = :cn
ORDER BY timestamp DESC LIMIT 1
)
SELECT
l.*,
COALESCE(cs.total_kills, 0) as total_kills,
COALESCE(rs.total_rares, 0) as total_rares
FROM latest l
LEFT JOIN char_stats cs ON l.character_name = cs.character_name
LEFT JOIN rare_stats rs ON l.character_name = rs.character_name
"""
row = await database.fetch_one(sql, {"cn": character_name})
if not row:
logger.warning(f"No telemetry data found for character: {character_name}")
raise HTTPException(status_code=404, detail="Character not found")
snap_dict = dict(snap)
# Total kills
sql_kills = "SELECT total_kills FROM char_stats WHERE character_name = :cn"
row_kills = await database.fetch_one(sql_kills, {"cn": character_name})
total_kills = row_kills["total_kills"] if row_kills else 0
# Total rares
sql_rares = "SELECT total_rares FROM rare_stats WHERE character_name = :cn"
row_rares = await database.fetch_one(sql_rares, {"cn": character_name})
total_rares = row_rares["total_rares"] if row_rares else 0
# Extract latest snapshot data (exclude the added total_kills/total_rares)
snap_dict = {k: v for k, v in dict(row).items()
if k not in ("total_kills", "total_rares")}
result = {
"character_name": character_name,
"latest_snapshot": snap_dict,
"total_kills": total_kills,
"total_rares": total_rares,
"total_kills": row["total_kills"],
"total_rares": row["total_rares"],
}
logger.debug(f"Retrieved stats for character: {character_name}")
logger.debug(f"Retrieved stats for character: {character_name} (optimized query)")
return JSONResponse(content=jsonable_encoder(result))
except HTTPException:
raise
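From a client's perspective the endpoint is unchanged, just cheaper: one round trip instead of three. Assuming the route is /stats/{character_name} (the decorator sits above the hunk shown here) and an assumed host and character:

import requests

stats = requests.get("http://localhost:8000/stats/Zorbo").json()
print(stats["total_kills"], stats["total_rares"])
print(stats["latest_snapshot"]["timestamp"])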