""" main.py - FastAPI-based telemetry server for Dereth Tracker. This service ingests real-time position and event data from plugin clients via WebSockets, stores telemetry and statistics in a TimescaleDB backend, and exposes HTTP and WebSocket endpoints for browser clients to retrieve live and historical data, trails, and per-character stats. """ from datetime import datetime, timedelta, timezone import json import logging import os import sys from typing import Dict, List, Any from pathlib import Path from fastapi import FastAPI, Header, HTTPException, Query, WebSocket, WebSocketDisconnect, Request from fastapi.responses import JSONResponse, Response from fastapi.routing import APIRoute from fastapi.staticfiles import StaticFiles from fastapi.encoders import jsonable_encoder from pydantic import BaseModel from typing import Optional import httpx # Async database support from sqlalchemy.dialects.postgresql import insert as pg_insert from db_async import ( database, telemetry_events, char_stats, rare_stats, rare_stats_sessions, spawn_events, rare_events, character_inventories, init_db_async ) import asyncio # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), ] ) logger = logging.getLogger(__name__) # Get log level from environment (DEBUG, INFO, WARNING, ERROR) log_level = os.getenv('LOG_LEVEL', 'INFO').upper() logger.setLevel(getattr(logging, log_level, logging.INFO)) # Inventory service configuration INVENTORY_SERVICE_URL = os.getenv('INVENTORY_SERVICE_URL', 'http://inventory-service:8000') # In-memory caches for REST endpoints _cached_live: dict = {"players": []} _cached_trails: dict = {"trails": []} _cached_total_rares: dict = {"all_time": 0, "today": 0, "last_updated": None} _cache_task: asyncio.Task | None = None _rares_cache_task: asyncio.Task | None = None async def _refresh_cache_loop() -> None: """Background task: refresh `/live` and `/trails` caches every 5 seconds.""" consecutive_failures = 0 max_consecutive_failures = 5 while True: try: # Recompute live players (last 30s) cutoff = datetime.now(timezone.utc) - ACTIVE_WINDOW sql_live = """ SELECT sub.*, COALESCE(rs.total_rares, 0) AS total_rares, COALESCE(rss.session_rares, 0) AS session_rares, COALESCE(cs.total_kills, 0) AS total_kills FROM ( SELECT DISTINCT ON (character_name) * FROM telemetry_events WHERE timestamp > :cutoff ORDER BY character_name, timestamp DESC ) sub LEFT JOIN rare_stats rs ON sub.character_name = rs.character_name LEFT JOIN rare_stats_sessions rss ON sub.character_name = rss.character_name AND sub.session_id = rss.session_id LEFT JOIN char_stats cs ON sub.character_name = cs.character_name """ # Use a single connection for both queries to reduce connection churn async with database.connection() as conn: rows = await conn.fetch_all(sql_live, {"cutoff": cutoff}) _cached_live["players"] = [dict(r) for r in rows] # Recompute trails (last 600s) cutoff2 = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=600) sql_trail = """ SELECT timestamp, character_name, ew, ns, z FROM telemetry_events WHERE timestamp >= :cutoff ORDER BY character_name, timestamp """ rows2 = await conn.fetch_all(sql_trail, {"cutoff": cutoff2}) _cached_trails["trails"] = [ {"timestamp": r["timestamp"], "character_name": r["character_name"], "ew": r["ew"], "ns": r["ns"], "z": r["z"]} for r in rows2 ] # Reset failure counter on success consecutive_failures = 0 logger.debug(f"Cache refreshed: {len(_cached_live['players'])} players, {len(_cached_trails['trails'])} trail points") except Exception as e: consecutive_failures += 1 logger.error(f"Cache refresh failed ({consecutive_failures}/{max_consecutive_failures}): {e}", exc_info=True) # If too many consecutive failures, wait longer and try to reconnect if consecutive_failures >= max_consecutive_failures: logger.warning(f"Too many consecutive cache refresh failures. Attempting database reconnection...") try: await database.disconnect() await asyncio.sleep(2) await database.connect() logger.info("Database reconnected successfully") consecutive_failures = 0 except Exception as reconnect_error: logger.error(f"Database reconnection failed: {reconnect_error}") await asyncio.sleep(10) # Wait longer before retrying continue await asyncio.sleep(5) async def _refresh_total_rares_cache() -> None: """Background task: refresh total rares cache every 5 minutes.""" consecutive_failures = 0 max_consecutive_failures = 3 while True: try: async with database.connection() as conn: # Get all-time total rares (sum of all characters) - gracefully handle missing table try: all_time_query = "SELECT COALESCE(SUM(total_rares), 0) as total FROM rare_stats" all_time_result = await conn.fetch_one(all_time_query) all_time_total = all_time_result["total"] if all_time_result else 0 except Exception as e: logger.debug(f"rare_stats table not available: {e}") all_time_total = 0 # Get today's rares from rare_events table - gracefully handle missing table try: today_query = """ SELECT COUNT(*) as today_count FROM rare_events WHERE timestamp >= CURRENT_DATE """ today_result = await conn.fetch_one(today_query) today_total = today_result["today_count"] if today_result else 0 except Exception as e: logger.debug(f"rare_events table not available or empty: {e}") today_total = 0 # Update cache _cached_total_rares["all_time"] = all_time_total _cached_total_rares["today"] = today_total _cached_total_rares["last_updated"] = datetime.now(timezone.utc) consecutive_failures = 0 logger.debug(f"Total rares cache updated: All-time: {all_time_total}, Today: {today_total}") except Exception as e: consecutive_failures += 1 logger.error(f"Total rares cache refresh failed ({consecutive_failures}/{max_consecutive_failures}): {e}", exc_info=True) if consecutive_failures >= max_consecutive_failures: logger.warning("Too many consecutive total rares cache failures, waiting longer...") await asyncio.sleep(60) # Wait longer on repeated failures continue # Sleep for 5 minutes (300 seconds) await asyncio.sleep(300) # ------------------------------------------------------------------ app = FastAPI() # In-memory store mapping character_name to the most recent telemetry snapshot live_snapshots: Dict[str, dict] = {} live_vitals: Dict[str, dict] = {} # Shared secret used to authenticate plugin WebSocket connections (override for production) SHARED_SECRET = "your_shared_secret" # LOG_FILE = "telemetry_log.jsonl" # ------------------------------------------------------------------ ACTIVE_WINDOW = timedelta(seconds=30) # Time window defining โ€œonlineโ€ players (last 30 seconds) """ Data models for plugin events: - TelemetrySnapshot: periodic telemetry data from a player client - SpawnEvent: information about a mob spawn event - RareEvent: details of a rare mob event """ class TelemetrySnapshot(BaseModel): character_name: str char_tag: Optional[str] = None session_id: str timestamp: datetime ew: float # +E / โ€“W ns: float # +N / โ€“S z: float kills: int kills_per_hour: Optional[float] = None onlinetime: Optional[str] = None deaths: int total_deaths: Optional[int] = None # Removed from telemetry payload; always enforced to 0 and tracked via rare events rares_found: int = 0 prismatic_taper_count: int vt_state: str # Optional telemetry metrics mem_mb: Optional[float] = None cpu_pct: Optional[float] = None mem_handles: Optional[int] = None latency_ms: Optional[float] = None class SpawnEvent(BaseModel): """ Model for a spawn event emitted by plugin clients when a mob appears. Records character context, mob type, timestamp, and spawn location. """ character_name: str mob: str timestamp: datetime ew: float ns: float z: float = 0.0 class RareEvent(BaseModel): """ Model for a rare mob event when a player encounters or discovers a rare entity. Includes character, event name, timestamp, and location coordinates. """ character_name: str name: str timestamp: datetime ew: float ns: float z: float = 0.0 class FullInventoryMessage(BaseModel): """ Model for the full_inventory WebSocket message type. Contains complete character inventory snapshot with raw item data. """ character_name: str timestamp: datetime item_count: int items: List[Dict[str, Any]] class VitalsMessage(BaseModel): """ Model for the vitals WebSocket message type. Contains character health, stamina, mana, and vitae information. """ character_name: str timestamp: datetime health_current: int health_max: int health_percentage: float stamina_current: int stamina_max: int stamina_percentage: float mana_current: int mana_max: int mana_percentage: float vitae: int @app.on_event("startup") async def on_startup(): """Event handler triggered when application starts up. Attempts to connect to the database with retry logic to accommodate potential startup delays (e.g., waiting for Postgres to be ready). """ max_attempts = 5 for attempt in range(1, max_attempts + 1): try: await database.connect() await init_db_async() logger.info(f"Database connected successfully on attempt {attempt}") # Log connection pool configuration try: logger.info(f"Database connection established with pool configuration") except Exception as pool_error: logger.debug(f"Could not access pool details: {pool_error}") break except Exception as e: logger.warning(f"Database connection failed (attempt {attempt}/{max_attempts}): {e}") if attempt < max_attempts: await asyncio.sleep(5) else: raise RuntimeError(f"Could not connect to database after {max_attempts} attempts") # Start background cache refresh (live & trails) global _cache_task, _rares_cache_task _cache_task = asyncio.create_task(_refresh_cache_loop()) _rares_cache_task = asyncio.create_task(_refresh_total_rares_cache()) logger.info("Background cache refresh tasks started") @app.on_event("shutdown") async def on_shutdown(): """Event handler triggered when application is shutting down. Ensures the database connection is closed cleanly. """ # Stop cache refresh tasks global _cache_task, _rares_cache_task if _cache_task: logger.info("Stopping background cache refresh task") _cache_task.cancel() try: await _cache_task except asyncio.CancelledError: pass if _rares_cache_task: logger.info("Stopping total rares cache refresh task") _rares_cache_task.cancel() try: await _rares_cache_task except asyncio.CancelledError: pass logger.info("Disconnecting from database") await database.disconnect() # ------------------------ GET ----------------------------------- @app.get("/debug") def debug(): return {"status": "OK"} @app.get("/live", response_model=dict) @app.get("/live/", response_model=dict) async def get_live_players(): """Return cached live telemetry per character.""" try: return JSONResponse(content=jsonable_encoder(_cached_live)) except Exception as e: logger.error(f"Failed to get live players: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") # --- GET Trails --------------------------------- @app.get("/trails") @app.get("/trails/") async def get_trails( seconds: int = Query(600, ge=0, description="Lookback window in seconds"), ): """Return cached trails (updated every 5 seconds).""" try: return JSONResponse(content=jsonable_encoder(_cached_trails)) except Exception as e: logger.error(f"Failed to get trails: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") @app.get("/total-rares") @app.get("/total-rares/") async def get_total_rares(): """Return cached total rares statistics (updated every 5 minutes).""" try: return JSONResponse(content=jsonable_encoder(_cached_total_rares)) except Exception as e: logger.error(f"Failed to get total rares: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") # --- GET Spawn Heat Map Endpoint --------------------------------- @app.get("/spawns/heatmap") async def get_spawn_heatmap_data( hours: int = Query(24, ge=1, le=168, description="Lookback window in hours (1-168)"), limit: int = Query(10000, ge=100, le=50000, description="Maximum number of spawn points to return") ): """ Aggregate spawn locations for heat-map visualization. Returns spawn event coordinates grouped by location with intensity counts for the specified time window. Response format: { "spawn_points": [{"ew": float, "ns": float, "intensity": int}, ...], "total_points": int, "timestamp": "UTC-ISO" } """ try: cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) # Aggregate spawn events by coordinates within time window query = """ SELECT ew, ns, COUNT(*) AS spawn_count FROM spawn_events WHERE timestamp >= :cutoff GROUP BY ew, ns ORDER BY spawn_count DESC LIMIT :limit """ rows = await database.fetch_all(query, {"cutoff": cutoff, "limit": limit}) spawn_points = [ { "ew": float(row["ew"]), "ns": float(row["ns"]), "intensity": int(row["spawn_count"]) } for row in rows ] result = { "spawn_points": spawn_points, "total_points": len(spawn_points), "timestamp": datetime.now(timezone.utc).isoformat(), "hours_window": hours } logger.debug(f"Heat map data: {len(spawn_points)} unique spawn locations from last {hours} hours") return JSONResponse(content=jsonable_encoder(result)) except Exception as e: logger.error(f"Heat map query failed: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Spawn heat map query failed") # --- GET Inventory Endpoints --------------------------------- @app.get("/inventory/{character_name}") async def get_character_inventory(character_name: str): """Get the complete inventory for a specific character - inventory service only.""" try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get( f"{INVENTORY_SERVICE_URL}/inventory/{character_name}" ) if response.status_code == 200: return JSONResponse(content=response.json()) elif response.status_code == 404: raise HTTPException(status_code=404, detail=f"No inventory found for character '{character_name}'") else: logger.error(f"Inventory service returned {response.status_code} for {character_name}") raise HTTPException(status_code=502, detail="Inventory service error") except httpx.RequestError as e: logger.error(f"Could not reach inventory service: {e}") raise HTTPException(status_code=503, detail="Inventory service unavailable") except HTTPException: raise except Exception as e: logger.error(f"Failed to get inventory for {character_name}: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") @app.get("/inventory/{character_name}/search") async def search_character_inventory( character_name: str, name: str = Query(None, description="Search by item name (partial match)"), object_class: int = Query(None, description="Filter by ObjectClass"), min_value: int = Query(None, description="Minimum item value"), max_value: int = Query(None, description="Maximum item value"), min_burden: int = Query(None, description="Minimum burden"), max_burden: int = Query(None, description="Maximum burden") ): """Search and filter inventory items for a character with various criteria.""" try: conditions = ["character_name = :character_name"] params = {"character_name": character_name} if name: conditions.append("name ILIKE :name") params["name"] = f"%{name}%" if object_class is not None: conditions.append("object_class = :object_class") params["object_class"] = object_class if min_value is not None: conditions.append("value >= :min_value") params["min_value"] = min_value if max_value is not None: conditions.append("value <= :max_value") params["max_value"] = max_value if min_burden is not None: conditions.append("burden >= :min_burden") params["min_burden"] = min_burden if max_burden is not None: conditions.append("burden <= :max_burden") params["max_burden"] = max_burden query = f""" SELECT name, icon, object_class, value, burden, has_id_data, item_data, timestamp FROM character_inventories WHERE {' AND '.join(conditions)} ORDER BY value DESC, name """ rows = await database.fetch_all(query, params) items = [] for row in rows: item = dict(row) items.append(item) return JSONResponse(content=jsonable_encoder({ "character_name": character_name, "item_count": len(items), "search_criteria": { "name": name, "object_class": object_class, "min_value": min_value, "max_value": max_value, "min_burden": min_burden, "max_burden": max_burden }, "items": items })) except Exception as e: logger.error(f"Failed to search inventory for {character_name}: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") @app.get("/inventories") async def list_characters_with_inventories(): """List all characters that have stored inventories with item counts.""" try: query = """ SELECT character_name, COUNT(*) as item_count, MAX(timestamp) as last_updated FROM character_inventories GROUP BY character_name ORDER BY last_updated DESC """ rows = await database.fetch_all(query) characters = [] for row in rows: characters.append({ "character_name": row["character_name"], "item_count": row["item_count"], "last_updated": row["last_updated"] }) return JSONResponse(content=jsonable_encoder({ "characters": characters, "total_characters": len(characters) })) except Exception as e: logger.error(f"Failed to list inventory characters: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") # --- Inventory Service Character List Proxy --------------------- @app.get("/inventory-characters") async def get_inventory_characters(): """Get character list from inventory service - proxy to avoid routing conflicts.""" try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(f"{INVENTORY_SERVICE_URL}/characters/list") if response.status_code == 200: return JSONResponse(content=response.json()) else: logger.error(f"Inventory service returned {response.status_code}: {response.text}") raise HTTPException(status_code=response.status_code, detail="Failed to get characters from inventory service") except Exception as e: logger.error(f"Failed to proxy inventory characters request: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Failed to get inventory characters") # --- Inventory Search Service Proxy Endpoints ------------------- @app.get("/search/items") async def search_items_proxy( text: str = Query(None, description="Search item names, descriptions, or properties"), character: str = Query(None, description="Limit search to specific character"), include_all_characters: bool = Query(False, description="Search across all characters"), equipment_status: str = Query(None, description="equipped, unequipped, or all"), equipment_slot: int = Query(None, description="Equipment slot mask"), # Item category filtering armor_only: bool = Query(False, description="Show only armor items"), jewelry_only: bool = Query(False, description="Show only jewelry items"), weapon_only: bool = Query(False, description="Show only weapon items"), # Spell filtering has_spell: str = Query(None, description="Must have this specific spell (by name)"), spell_contains: str = Query(None, description="Spell name contains this text"), legendary_cantrips: str = Query(None, description="Comma-separated list of legendary cantrip names"), # Combat properties min_damage: int = Query(None, description="Minimum damage"), max_damage: int = Query(None, description="Maximum damage"), min_armor: int = Query(None, description="Minimum armor level"), max_armor: int = Query(None, description="Maximum armor level"), min_attack_bonus: float = Query(None, description="Minimum attack bonus"), min_crit_damage_rating: int = Query(None, description="Minimum critical damage rating"), min_damage_rating: int = Query(None, description="Minimum damage rating"), min_heal_boost_rating: int = Query(None, description="Minimum heal boost rating"), max_level: int = Query(None, description="Maximum wield level requirement"), min_level: int = Query(None, description="Minimum wield level requirement"), material: str = Query(None, description="Material type (partial match)"), min_workmanship: float = Query(None, description="Minimum workmanship"), has_imbue: bool = Query(None, description="Has imbue effects"), item_set: str = Query(None, description="Item set name (partial match)"), min_tinks: int = Query(None, description="Minimum tinker count"), bonded: bool = Query(None, description="Bonded status"), attuned: bool = Query(None, description="Attuned status"), unique: bool = Query(None, description="Unique item status"), is_rare: bool = Query(None, description="Rare item status"), min_condition: int = Query(None, description="Minimum condition percentage"), min_value: int = Query(None, description="Minimum item value"), max_value: int = Query(None, description="Maximum item value"), max_burden: int = Query(None, description="Maximum burden"), sort_by: str = Query("name", description="Sort field: name, value, damage, armor, workmanship"), sort_dir: str = Query("asc", description="Sort direction: asc or desc"), page: int = Query(1, ge=1, description="Page number"), limit: int = Query(50, ge=1, le=200, description="Items per page") ): """Proxy to inventory service comprehensive item search.""" try: # Build query parameters params = {} if text: params["text"] = text if character: params["character"] = character if include_all_characters: params["include_all_characters"] = include_all_characters if equipment_status: params["equipment_status"] = equipment_status if equipment_slot is not None: params["equipment_slot"] = equipment_slot # Category filtering if armor_only: params["armor_only"] = armor_only if jewelry_only: params["jewelry_only"] = jewelry_only if weapon_only: params["weapon_only"] = weapon_only # Spell filtering if has_spell: params["has_spell"] = has_spell if spell_contains: params["spell_contains"] = spell_contains if legendary_cantrips: params["legendary_cantrips"] = legendary_cantrips # Combat properties if min_damage is not None: params["min_damage"] = min_damage if max_damage is not None: params["max_damage"] = max_damage if min_armor is not None: params["min_armor"] = min_armor if max_armor is not None: params["max_armor"] = max_armor if min_attack_bonus is not None: params["min_attack_bonus"] = min_attack_bonus if min_crit_damage_rating is not None: params["min_crit_damage_rating"] = min_crit_damage_rating if min_damage_rating is not None: params["min_damage_rating"] = min_damage_rating if min_heal_boost_rating is not None: params["min_heal_boost_rating"] = min_heal_boost_rating if max_level is not None: params["max_level"] = max_level if min_level is not None: params["min_level"] = min_level if material: params["material"] = material if min_workmanship is not None: params["min_workmanship"] = min_workmanship if has_imbue is not None: params["has_imbue"] = has_imbue if item_set: params["item_set"] = item_set if min_tinks is not None: params["min_tinks"] = min_tinks if bonded is not None: params["bonded"] = bonded if attuned is not None: params["attuned"] = attuned if unique is not None: params["unique"] = unique if is_rare is not None: params["is_rare"] = is_rare if min_condition is not None: params["min_condition"] = min_condition if min_value is not None: params["min_value"] = min_value if max_value is not None: params["max_value"] = max_value if max_burden is not None: params["max_burden"] = max_burden params["sort_by"] = sort_by params["sort_dir"] = sort_dir params["page"] = page params["limit"] = limit async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"{INVENTORY_SERVICE_URL}/search/items", params=params ) if response.status_code == 200: return JSONResponse(content=response.json()) else: logger.error(f"Inventory search service returned {response.status_code}") raise HTTPException(status_code=response.status_code, detail="Inventory search service error") except httpx.RequestError as e: logger.error(f"Could not reach inventory service: {e}") raise HTTPException(status_code=503, detail="Inventory service unavailable") except HTTPException: raise except Exception as e: logger.error(f"Failed to search items: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") @app.get("/search/equipped/{character_name}") async def search_equipped_items_proxy( character_name: str, slot: int = Query(None, description="Specific equipment slot mask") ): """Proxy to inventory service equipped items search.""" try: params = {} if slot is not None: params["slot"] = slot async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get( f"{INVENTORY_SERVICE_URL}/search/equipped/{character_name}", params=params ) if response.status_code == 200: return JSONResponse(content=response.json()) elif response.status_code == 404: raise HTTPException(status_code=404, detail=f"No equipped items found for character '{character_name}'") else: logger.error(f"Inventory service returned {response.status_code} for equipped items search") raise HTTPException(status_code=response.status_code, detail="Inventory service error") except httpx.RequestError as e: logger.error(f"Could not reach inventory service: {e}") raise HTTPException(status_code=503, detail="Inventory service unavailable") except HTTPException: raise except Exception as e: logger.error(f"Failed to search equipped items for {character_name}: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") @app.get("/search/upgrades/{character_name}/{slot}") async def find_equipment_upgrades_proxy( character_name: str, slot: int, upgrade_type: str = Query("damage", description="What to optimize for: damage, armor, workmanship, value") ): """Proxy to inventory service equipment upgrades search.""" try: params = {"upgrade_type": upgrade_type} async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get( f"{INVENTORY_SERVICE_URL}/search/upgrades/{character_name}/{slot}", params=params ) if response.status_code == 200: return JSONResponse(content=response.json()) elif response.status_code == 404: raise HTTPException(status_code=404, detail=f"No upgrade options found for character '{character_name}' slot {slot}") else: logger.error(f"Inventory service returned {response.status_code} for upgrades search") raise HTTPException(status_code=response.status_code, detail="Inventory service error") except httpx.RequestError as e: logger.error(f"Could not reach inventory service: {e}") raise HTTPException(status_code=503, detail="Inventory service unavailable") except HTTPException: raise except Exception as e: logger.error(f"Failed to find equipment upgrades for {character_name} slot {slot}: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") @app.get("/sets/list") async def list_equipment_sets_proxy(): """Proxy to inventory service equipment sets list.""" try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(f"{INVENTORY_SERVICE_URL}/sets/list") if response.status_code == 200: return JSONResponse(content=response.json()) else: logger.error(f"Inventory service returned {response.status_code} for sets list") raise HTTPException(status_code=response.status_code, detail="Inventory service error") except httpx.RequestError as e: logger.error(f"Could not reach inventory service: {e}") raise HTTPException(status_code=503, detail="Inventory service unavailable") except HTTPException: raise except Exception as e: logger.error(f"Failed to list equipment sets: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") # -------------------- WebSocket endpoints ----------------------- ## WebSocket connection tracking # Set of browser WebSocket clients subscribed to live updates browser_conns: set[WebSocket] = set() # Mapping of plugin clients by character_name to their WebSocket for command forwarding plugin_conns: Dict[str, WebSocket] = {} async def _broadcast_to_browser_clients(snapshot: dict): """Broadcast a telemetry or chat message to all connected browser clients. Converts any non-serializable types (e.g., datetime) before sending. Handles connection errors gracefully and removes stale connections. """ # Convert snapshot payload to JSON-friendly types data = jsonable_encoder(snapshot) # Use list() to avoid "set changed size during iteration" errors disconnected_clients = [] for ws in list(browser_conns): try: await ws.send_json(data) except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e: # Collect disconnected clients for cleanup disconnected_clients.append(ws) logger.debug(f"Detected disconnected browser client: {e}") except Exception as e: # Handle any other unexpected errors disconnected_clients.append(ws) logger.warning(f"Unexpected error broadcasting to browser client: {e}") # Clean up disconnected clients for ws in disconnected_clients: browser_conns.discard(ws) async def _forward_to_inventory_service(inventory_msg: FullInventoryMessage): """Forward inventory data to the inventory microservice for processing.""" try: # Prepare data for inventory service inventory_data = { "character_name": inventory_msg.character_name, "timestamp": inventory_msg.timestamp.isoformat(), "items": inventory_msg.items } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( f"{INVENTORY_SERVICE_URL}/process-inventory", json=inventory_data ) if response.status_code == 200: result = response.json() logger.info(f"Inventory service processed {result['processed']} items for {inventory_msg.character_name}") else: logger.error(f"Inventory service error {response.status_code}: {response.text}") except Exception as e: logger.error(f"Failed to forward inventory to service: {e}") # Don't raise - this shouldn't block the main storage async def _store_inventory(inventory_msg: FullInventoryMessage): """Forward inventory data to inventory microservice for processing and storage.""" try: # Forward to inventory microservice for enhanced processing and storage await _forward_to_inventory_service(inventory_msg) # Optional: Create JSON file for debugging (can be removed in production) inventory_dir = Path("./inventory") inventory_dir.mkdir(exist_ok=True) file_path = inventory_dir / f"{inventory_msg.character_name}_inventory.json" inventory_data = { "character_name": inventory_msg.character_name, "timestamp": inventory_msg.timestamp.isoformat(), "item_count": inventory_msg.item_count, "items": inventory_msg.items } with open(file_path, 'w') as f: json.dump(inventory_data, f, indent=2) except Exception as e: logger.error(f"Failed to forward inventory for {inventory_msg.character_name}: {e}", exc_info=True) raise @app.websocket("/ws/position") async def ws_receive_snapshots( websocket: WebSocket, secret: str | None = Query(None), x_plugin_secret: str | None = Header(None) ): """WebSocket endpoint for plugin clients to send telemetry and events. Validates a shared secret for authentication, then listens for messages of various types (register, spawn, telemetry, rare, chat) and handles each: - register: record plugin WebSocket for command forwarding - spawn: persist spawn event - telemetry: store snapshot, update stats, broadcast to browsers - rare: update total and session rare counts, persist event - chat: broadcast chat messages to browsers """ # Authenticate plugin connection using shared secret key = secret or x_plugin_secret if key != SHARED_SECRET: # Reject without completing the WebSocket handshake logger.warning(f"Plugin WebSocket authentication failed from {websocket.client}") await websocket.close(code=1008) return # Accept the WebSocket connection await websocket.accept() logger.info(f"Plugin WebSocket connected: {websocket.client}") try: while True: # Read next text frame try: raw = await websocket.receive_text() # Debug: log all incoming plugin WebSocket messages logger.debug(f"Plugin WebSocket RX from {websocket.client}: {raw}") except WebSocketDisconnect: logger.info(f"Plugin WebSocket disconnected: {websocket.client}") break # Parse JSON payload try: data = json.loads(raw) except json.JSONDecodeError as e: logger.warning(f"Invalid JSON from plugin {websocket.client}: {e}") continue msg_type = data.get("type") # --- Registration: associate character_name with this plugin socket --- if msg_type == "register": name = data.get("character_name") or data.get("player_name") if isinstance(name, str): plugin_conns[name] = websocket logger.info(f"Registered plugin connection for character: {name}") continue # --- Spawn event: persist to spawn_events table --- if msg_type == "spawn": payload = data.copy() payload.pop("type", None) try: spawn = SpawnEvent.parse_obj(payload) await database.execute( spawn_events.insert().values(**spawn.dict()) ) logger.debug(f"Recorded spawn event: {spawn.mob} by {spawn.character_name}") except Exception as e: logger.error(f"Failed to process spawn event: {e}") continue continue # --- Telemetry message: persist snapshot and update kill stats --- if msg_type == "telemetry": # Parse telemetry snapshot and update in-memory state payload = data.copy() payload.pop("type", None) try: snap = TelemetrySnapshot.parse_obj(payload) live_snapshots[snap.character_name] = snap.dict() # Prepare data and compute kill delta db_data = snap.dict() db_data['rares_found'] = 0 key = (snap.session_id, snap.character_name) # Get last recorded kill count for this session if key in ws_receive_snapshots._last_kills: last = ws_receive_snapshots._last_kills[key] else: # Cache miss - check database for last kill count for this session row = await database.fetch_one( "SELECT kills FROM telemetry_events WHERE character_name = :char AND session_id = :session ORDER BY timestamp DESC LIMIT 1", {"char": snap.character_name, "session": snap.session_id} ) last = row["kills"] if row else 0 logger.debug(f"Cache miss for {snap.character_name} session {snap.session_id[:8]}: loaded last_kills={last} from database") delta = snap.kills - last # Persist snapshot and any kill delta in a single transaction try: async with database.transaction(): await database.execute( telemetry_events.insert().values(**db_data) ) if delta > 0: stmt = pg_insert(char_stats).values( character_name=snap.character_name, total_kills=delta ).on_conflict_do_update( index_elements=["character_name"], set_={"total_kills": char_stats.c.total_kills + delta}, ) await database.execute(stmt) logger.debug(f"Updated kills for {snap.character_name}: +{delta} (total from {last} to {snap.kills})") ws_receive_snapshots._last_kills[key] = snap.kills except Exception as db_error: logger.error(f"๐Ÿ’พ Database transaction failed for {snap.character_name} (session: {snap.session_id[:8]}): {db_error}", exc_info=True) continue # Broadcast updated snapshot to all browser clients await _broadcast_to_browser_clients(snap.dict()) logger.debug(f"โœ… Processed telemetry from {snap.character_name} (session: {snap.session_id[:8]}, kills: {snap.kills})") except Exception as e: logger.error(f"โŒ Failed to process telemetry event from {data.get('character_name', 'unknown')}: {e}", exc_info=True) continue # --- Rare event: update total and session counters and persist --- if msg_type == "rare": name = data.get("character_name") if isinstance(name, str) and name.strip(): try: # Total rare count per character stmt_tot = pg_insert(rare_stats).values( character_name=name, total_rares=1 ).on_conflict_do_update( index_elements=["character_name"], set_={"total_rares": rare_stats.c.total_rares + 1}, ) await database.execute(stmt_tot) # Session-specific rare count (use live cache or fallback to latest telemetry) session_id = live_snapshots.get(name, {}).get("session_id") if not session_id: row = await database.fetch_one( "SELECT session_id FROM telemetry_events" " WHERE character_name = :name" " ORDER BY timestamp DESC LIMIT 1", {"name": name} ) if row: session_id = row["session_id"] if session_id: stmt_sess = pg_insert(rare_stats_sessions).values( character_name=name, session_id=session_id, session_rares=1 ).on_conflict_do_update( index_elements=["character_name", "session_id"], set_={"session_rares": rare_stats_sessions.c.session_rares + 1}, ) await database.execute(stmt_sess) # Persist individual rare event for future analysis payload = data.copy() payload.pop("type", None) try: rare_ev = RareEvent.parse_obj(payload) await database.execute( rare_events.insert().values(**rare_ev.dict()) ) logger.info(f"Recorded rare event: {rare_ev.name} by {name}") # Broadcast rare event to browser clients for epic notifications await _broadcast_to_browser_clients(data) except Exception as e: logger.error(f"Failed to persist rare event: {e}") except Exception as e: logger.error(f"Failed to process rare event for {name}: {e}", exc_info=True) continue # --- Chat message: forward chat payload to browser clients --- if msg_type == "chat": await _broadcast_to_browser_clients(data) logger.debug(f"Broadcasted chat message from {data.get('character_name', 'unknown')}") continue # --- Full inventory message: store complete inventory snapshot --- if msg_type == "full_inventory": payload = data.copy() payload.pop("type", None) try: inventory_msg = FullInventoryMessage.parse_obj(payload) await _store_inventory(inventory_msg) logger.info(f"Stored inventory for {inventory_msg.character_name}: {inventory_msg.item_count} items") except Exception as e: logger.error(f"Failed to process inventory for {data.get('character_name', 'unknown')}: {e}", exc_info=True) continue # --- Vitals message: store character health/stamina/mana and broadcast --- if msg_type == "vitals": payload = data.copy() payload.pop("type", None) try: vitals_msg = VitalsMessage.parse_obj(payload) live_vitals[vitals_msg.character_name] = vitals_msg.dict() await _broadcast_to_browser_clients(data) logger.debug(f"Updated vitals for {vitals_msg.character_name}: {vitals_msg.health_percentage}% HP, {vitals_msg.stamina_percentage}% Stam, {vitals_msg.mana_percentage}% Mana") except Exception as e: logger.error(f"Failed to process vitals for {data.get('character_name', 'unknown')}: {e}", exc_info=True) continue # Unknown message types are ignored if msg_type: logger.warning(f"Unknown message type '{msg_type}' from {websocket.client}") finally: # Clean up any plugin registrations for this socket to_remove = [n for n, ws in plugin_conns.items() if ws is websocket] for n in to_remove: # Use pop() instead of del to avoid KeyError if already removed plugin_conns.pop(n, None) # Also clean up any entries in the kill tracking cache for this session # Remove entries that might be associated with disconnected clients stale_keys = [] for (session_id, char_name), _ in ws_receive_snapshots._last_kills.items(): if char_name in to_remove: stale_keys.append((session_id, char_name)) for key in stale_keys: ws_receive_snapshots._last_kills.pop(key, None) if to_remove: logger.info(f"Cleaned up plugin connections for characters: {to_remove} from {websocket.client}") if stale_keys: logger.debug(f"Cleaned up {len(stale_keys)} kill tracking cache entries") else: logger.debug(f"No plugin registrations to clean up for {websocket.client}") # In-memory cache of last seen kill counts per (session_id, character_name) # Used to compute deltas for updating persistent kill statistics efficiently ws_receive_snapshots._last_kills = {} async def cleanup_stale_connections(): """Periodic cleanup of stale WebSocket connections. This function can be called periodically to clean up connections that may have become stale but weren't properly cleaned up. """ # Clean up plugin connections that no longer have valid WebSockets stale_plugins = [] for char_name, ws in list(plugin_conns.items()): try: # Test if the WebSocket is still alive by checking its state if ws.client_state.name != 'CONNECTED': stale_plugins.append(char_name) except Exception: # If we can't check the state, consider it stale stale_plugins.append(char_name) for char_name in stale_plugins: plugin_conns.pop(char_name, None) logger.info(f"Cleaned up stale plugin connection: {char_name}") # Clean up browser connections stale_browsers = [] for ws in list(browser_conns): try: if ws.client_state.name != 'CONNECTED': stale_browsers.append(ws) except Exception: stale_browsers.append(ws) for ws in stale_browsers: browser_conns.discard(ws) if stale_browsers: logger.info(f"Cleaned up {len(stale_browsers)} stale browser connections") logger.debug(f"Connection health check: {len(plugin_conns)} plugins, {len(browser_conns)} browsers") @app.websocket("/ws/live") async def ws_live_updates(websocket: WebSocket): """WebSocket endpoint for browser clients to receive live updates and send commands. Manages a set of connected browser clients; listens for incoming command messages and forwards them to the appropriate plugin client WebSocket. """ # Add new browser client to the set await websocket.accept() browser_conns.add(websocket) logger.info(f"Browser WebSocket connected: {websocket.client}") try: while True: # Receive command messages from browser try: data = await websocket.receive_json() # Debug: log all incoming browser WebSocket messages logger.debug(f"Browser WebSocket RX from {websocket.client}: {data}") except WebSocketDisconnect: logger.info(f"Browser WebSocket disconnected: {websocket.client}") break # Determine command envelope format (new or legacy) if "player_name" in data and "command" in data: # New format: { player_name, command } target_name = data["player_name"] payload = data elif data.get("type") == "command" and "character_name" in data and "text" in data: # Legacy format: { type: 'command', character_name, text } target_name = data.get("character_name") payload = {"player_name": target_name, "command": data.get("text")} else: # Not a recognized command envelope continue # Forward command envelope to the appropriate plugin WebSocket target_ws = plugin_conns.get(target_name) if target_ws: try: await target_ws.send_json(payload) logger.debug(f"Forwarded command to plugin for {target_name}: {payload}") except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e: logger.warning(f"Failed to forward command to {target_name}: {e}") # Remove stale connection plugin_conns.pop(target_name, None) except Exception as e: logger.error(f"Unexpected error forwarding command to {target_name}: {e}") # Remove potentially corrupted connection plugin_conns.pop(target_name, None) else: logger.warning(f"No plugin connection found for target character: {target_name}") except WebSocketDisconnect: pass finally: browser_conns.discard(websocket) logger.debug(f"Removed browser WebSocket from connection pool: {websocket.client}") ## -------------------- static frontend --------------------------- ## (static mount moved to end of file, below API routes) # list routes for convenience logger.info("๐Ÿ” Registered HTTP API routes:") for route in app.routes: if isinstance(route, APIRoute): # Log the path and allowed methods for each API route logger.info(f"{route.path} -> {route.methods}") # Add stats endpoint for per-character metrics @app.get("/stats/{character_name}") async def get_stats(character_name: str): """ HTTP GET endpoint to retrieve per-character metrics: - latest_snapshot: most recent telemetry entry for the character - total_kills: accumulated kills from char_stats - total_rares: accumulated rares from rare_stats Returns 404 if character has no recorded telemetry. """ try: # Single optimized query with LEFT JOINs to get all data in one round trip sql = """ WITH latest AS ( SELECT * FROM telemetry_events WHERE character_name = :cn ORDER BY timestamp DESC LIMIT 1 ) SELECT l.*, COALESCE(cs.total_kills, 0) as total_kills, COALESCE(rs.total_rares, 0) as total_rares FROM latest l LEFT JOIN char_stats cs ON l.character_name = cs.character_name LEFT JOIN rare_stats rs ON l.character_name = rs.character_name """ row = await database.fetch_one(sql, {"cn": character_name}) if not row: logger.warning(f"No telemetry data found for character: {character_name}") raise HTTPException(status_code=404, detail="Character not found") # Extract latest snapshot data (exclude the added total_kills/total_rares) snap_dict = {k: v for k, v in dict(row).items() if k not in ("total_kills", "total_rares")} result = { "character_name": character_name, "latest_snapshot": snap_dict, "total_kills": row["total_kills"], "total_rares": row["total_rares"], } logger.debug(f"Retrieved stats for character: {character_name} (optimized query)") return JSONResponse(content=jsonable_encoder(result)) except HTTPException: raise except Exception as e: logger.error(f"Failed to get stats for character {character_name}: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") # -------------------- static frontend --------------------------- # Custom icon handler that prioritizes clean icons over originals from fastapi.responses import FileResponse @app.get("/icons/{icon_filename}") async def serve_icon(icon_filename: str): """Serve icons from static/icons directory""" # Serve from static/icons directory icon_path = Path("static/icons") / icon_filename if icon_path.exists(): return FileResponse(icon_path, media_type="image/png") # Icon not found raise HTTPException(status_code=404, detail="Icon not found") # -------------------- Inventory Service Proxy --------------------------- @app.get("/inv/test") async def test_inventory_route(): """Test route to verify inventory proxy is working""" return {"message": "Inventory proxy route is working"} @app.api_route("/inv/{path:path}", methods=["GET", "POST"]) async def proxy_inventory_service(path: str, request: Request): """Proxy all inventory service requests""" try: inventory_service_url = os.getenv('INVENTORY_SERVICE_URL', 'http://inventory-service:8000') logger.info(f"Proxying to inventory service: {inventory_service_url}/{path}") # Forward the request to inventory service async with httpx.AsyncClient() as client: response = await client.request( method=request.method, url=f"{inventory_service_url}/{path}", params=request.query_params, headers=dict(request.headers), content=await request.body() ) return Response( content=response.content, status_code=response.status_code, headers=dict(response.headers) ) except Exception as e: logger.error(f"Failed to proxy inventory request: {e}") raise HTTPException(status_code=500, detail="Inventory service unavailable") # Icons are now served from static/icons directory # Serve SPA files (catch-all for frontend routes) # Mount the single-page application frontend (static assets) at root path app.mount("/", StaticFiles(directory="static", html=True), name="static")