diff --git a/Dockerfile b/Dockerfile
index 118f0807..3be29c8f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -38,4 +38,4 @@ ENV DATABASE_URL=postgresql://postgres:password@db:5432/dereth \
     SHARED_SECRET=your_shared_secret
 
 ## Launch the FastAPI app using Uvicorn
-CMD ["uvicorn","main:app","--host","0.0.0.0","--port","8765","--reload","--workers","1"]
+CMD ["uvicorn","main:app","--host","0.0.0.0","--port","8765","--reload","--workers","1","--no-access-log","--log-level","warning"]
diff --git a/docker-compose.yml b/docker-compose.yml
index 534ebf44..ef4c840d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -25,6 +25,7 @@ services:
       DB_MAX_SQL_VARIABLES: "${DB_MAX_SQL_VARIABLES}"
       DB_WAL_AUTOCHECKPOINT_PAGES: "${DB_WAL_AUTOCHECKPOINT_PAGES}"
       SHARED_SECRET: "${SHARED_SECRET}"
+      LOG_LEVEL: "${LOG_LEVEL:-INFO}"
     restart: unless-stopped
     logging:
       driver: "json-file"
diff --git a/grafana/dashboards/dereth_tracker_dashboard.json b/grafana/dashboards/dereth_tracker_dashboard.json
new file mode 100644
index 00000000..4ec57674
--- /dev/null
+++ b/grafana/dashboards/dereth_tracker_dashboard.json
@@ -0,0 +1,85 @@
+{
+  "id": null,
+  "uid": "dereth-tracker",
+  "title": "Dereth Tracker Dashboard",
+  "schemaVersion": 30,
+  "version": 1,
+  "refresh": "10s",
+  "panels": [
+    {
+      "type": "timeseries",
+      "title": "Kills per Hour",
+      "gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 },
+      "id": 1,
+      "fieldConfig": { "defaults": {}, "overrides": [] },
+      "targets": [
+        {
+          "refId": "A",
+          "format": "time_series",
+          "datasource": { "uid": "dereth-db" },
+          "rawSql": "SELECT $__time(timestamp), kills_per_hour AS value FROM telemetry_events WHERE character_name = '$character' ORDER BY timestamp"
+        }
+      ]
+    },
+    {
+      "type": "timeseries",
+      "title": "Memory (MB)",
+      "gridPos": { "h": 8, "w": 6, "x": 0, "y": 8 },
+      "id": 2,
+      "fieldConfig": { "defaults": {}, "overrides": [] },
+      "targets": [
+        {
+          "refId": "A",
+          "format": "time_series",
+          "datasource": { "uid": "dereth-db" },
+          "rawSql": "SELECT $__time(timestamp), mem_mb AS value FROM telemetry_events WHERE character_name = '$character' ORDER BY timestamp"
+        }
+      ]
+    },
+    {
+      "type": "timeseries",
+      "title": "CPU (%)",
+      "gridPos": { "h": 8, "w": 6, "x": 6, "y": 8 },
+      "id": 3,
+      "fieldConfig": { "defaults": {}, "overrides": [] },
+      "targets": [
+        {
+          "refId": "A",
+          "format": "time_series",
+          "datasource": { "uid": "dereth-db" },
+          "rawSql": "SELECT $__time(timestamp), cpu_pct AS value FROM telemetry_events WHERE character_name = '$character' ORDER BY timestamp"
+        }
+      ]
+    },
+    {
+      "type": "timeseries",
+      "title": "Mem Handles",
+      "gridPos": { "h": 8, "w": 6, "x": 0, "y": 16 },
+      "id": 4,
+      "fieldConfig": { "defaults": {}, "overrides": [] },
+      "targets": [
+        {
+          "refId": "A",
+          "format": "time_series",
+          "datasource": { "uid": "dereth-db" },
+          "rawSql": "SELECT $__time(timestamp), mem_handles AS value FROM telemetry_events WHERE character_name = '$character' ORDER BY timestamp"
+        }
+      ]
+    }
+  ],
+  "templating": {
+    "list": [
+      {
+        "type": "query",
+        "name": "character",
+        "label": "Character",
+        "datasource": { "uid": "dereth-db" },
+        "query": "SELECT DISTINCT character_name FROM telemetry_events ORDER BY character_name",
+        "refresh": 2,
+        "sort": 1,
+        "multi": false,
+        "includeAll": false
+      }
+    ]
+  }
+}
\ No newline at end of file
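Note: the four panel queries and the $character variable above assume a telemetry_events table with specific columns. A minimal SQLAlchemy sketch of that assumed shape (not part of this diff; the column names are taken from the rawSql strings, the types are guesses):

    import sqlalchemy as sa

    metadata = sa.MetaData()
    telemetry_events = sa.Table(
        "telemetry_events", metadata,
        sa.Column("timestamp", sa.DateTime(timezone=True)),  # consumed by $__time(timestamp)
        sa.Column("character_name", sa.Text),                # filtered by the $character variable
        sa.Column("kills_per_hour", sa.Float),               # "Kills per Hour" panel
        sa.Column("mem_mb", sa.Float),                       # "Memory (MB)" panel
        sa.Column("cpu_pct", sa.Float),                      # "CPU (%)" panel
        sa.Column("mem_handles", sa.Integer),                # "Mem Handles" panel
    )
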
diff --git a/grafana/provisioning/dashboards/dashboards.yaml b/grafana/provisioning/dashboards/dashboards.yaml
new file mode 100644
index 00000000..b6c4c55b
--- /dev/null
+++ b/grafana/provisioning/dashboards/dashboards.yaml
@@ -0,0 +1,11 @@
+apiVersion: 1
+
+providers:
+  - name: 'dereth_dashboards'
+    orgId: 1
+    folder: ''
+    type: file
+    disableDeletion: false
+    updateIntervalSeconds: 30
+    options:
+      path: /var/lib/grafana/dashboards
\ No newline at end of file
diff --git a/grafana/provisioning/datasources/datasource.yaml b/grafana/provisioning/datasources/datasource.yaml
new file mode 100644
index 00000000..7137fb4a
--- /dev/null
+++ b/grafana/provisioning/datasources/datasource.yaml
@@ -0,0 +1,16 @@
+apiVersion: 1
+
+datasources:
+  - name: DerethDB
+    uid: dereth-db
+    type: postgres
+    access: proxy
+    url: db:5432
+    database: dereth
+    user: postgres
+    # Securely provision the password
+    secureJsonData:
+      password: ${POSTGRES_PASSWORD}
+    jsonData:
+      sslmode: disable
+      postgresVersion: 1400
\ No newline at end of file
diff --git a/main.py b/main.py
index 5600afa9..eb868027 100644
--- a/main.py
+++ b/main.py
@@ -7,7 +7,9 @@ endpoints for browser clients to retrieve live and historical data, trails, and
 """
 from datetime import datetime, timedelta, timezone
 import json
+import logging
 import os
+import sys
 from typing import Dict
 
 from fastapi import FastAPI, Header, HTTPException, Query, WebSocket, WebSocketDisconnect
@@ -31,6 +33,20 @@ from db_async import (
     init_db_async
 )
 import asyncio
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout),
+    ]
+)
+logger = logging.getLogger(__name__)
+
+# Get log level from environment (DEBUG, INFO, WARNING, ERROR)
+log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
+logger.setLevel(getattr(logging, log_level, logging.INFO))
 
 # In-memory caches for REST endpoints
 _cached_live: dict = {"players": []}
 _cached_trails: dict = {"trails": []}
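Note: basicConfig() pins the root logger at INFO while the per-module logger level comes from LOG_LEVEL; because the stdout handler is left at NOTSET, lowering just the module logger is enough to surface its DEBUG records. A standalone sketch (not part of this diff) to sanity-check that behaviour:

    import logging, os, sys

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    logger = logging.getLogger("main")
    logger.setLevel(getattr(logging, os.getenv('LOG_LEVEL', 'INFO').upper(), logging.INFO))

    logger.debug("printed only when LOG_LEVEL=DEBUG")  # filtered by the module logger's level
    logger.info("printed at the default INFO level")
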
@@ -75,7 +91,7 @@ async def _refresh_cache_loop() -> None:
                 for r in rows2
             ]
         except Exception as e:
-            print(f"[CACHE] refresh error: {e}")
+            logger.error(f"Cache refresh failed: {e}", exc_info=True)
         await asyncio.sleep(5)
 
 # ------------------------------------------------------------------
@@ -159,10 +175,10 @@ async def on_startup():
         try:
             await database.connect()
             await init_db_async()
-            print(f"DB connected on attempt {attempt}")
+            logger.info(f"Database connected successfully on attempt {attempt}")
             break
         except Exception as e:
-            print(f"DB connection failed (attempt {attempt}/{max_attempts}): {e}")
+            logger.warning(f"Database connection failed (attempt {attempt}/{max_attempts}): {e}")
             if attempt < max_attempts:
                 await asyncio.sleep(5)
             else:
@@ -170,6 +186,7 @@ async def on_startup():
     # Start background cache refresh (live & trails)
     global _cache_task
     _cache_task = asyncio.create_task(_refresh_cache_loop())
+    logger.info("Background cache refresh task started")
 
 @app.on_event("shutdown")
 async def on_shutdown():
     """Event handler triggered when application is shutting down.
@@ -179,11 +196,13 @@ async def on_shutdown():
     # Stop cache refresh task
     global _cache_task
     if _cache_task:
+        logger.info("Stopping background cache refresh task")
         _cache_task.cancel()
         try:
             await _cache_task
         except asyncio.CancelledError:
             pass
+    logger.info("Disconnecting from database")
     await database.disconnect()
 
@@ -198,7 +217,11 @@ def debug():
 @app.get("/live/", response_model=dict)
 async def get_live_players():
     """Return cached live telemetry per character."""
-    return JSONResponse(content=jsonable_encoder(_cached_live))
+    try:
+        return JSONResponse(content=jsonable_encoder(_cached_live))
+    except Exception as e:
+        logger.error(f"Failed to get live players: {e}", exc_info=True)
+        raise HTTPException(status_code=500, detail="Internal server error")
 
@@ -210,7 +233,11 @@ async def get_trails(
     seconds: int = Query(600, ge=0, description="Lookback window in seconds"),
 ):
     """Return cached trails (updated every 5 seconds)."""
-    return JSONResponse(content=jsonable_encoder(_cached_trails))
+    try:
+        return JSONResponse(content=jsonable_encoder(_cached_trails))
+    except Exception as e:
+        logger.error(f"Failed to get trails: {e}", exc_info=True)
+        raise HTTPException(status_code=500, detail="Internal server error")
 
 # -------------------- WebSocket endpoints -----------------------
 ## WebSocket connection tracking
@@ -229,8 +256,9 @@ async def _broadcast_to_browser_clients(snapshot: dict):
     for ws in list(browser_conns):
         try:
             await ws.send_json(data)
-        except WebSocketDisconnect:
-            browser_conns.remove(ws)
+        except (WebSocketDisconnect, RuntimeError) as e:
+            browser_conns.discard(ws)
+            logger.debug(f"Removed disconnected browser client from broadcast list: {e}")
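Note: the switch from remove() to discard() avoids a KeyError when the same dead socket is dropped twice, e.g. by two overlapping broadcasts racing the same client. A tiny illustration (not part of this diff):

    conns = {"ws-1"}
    conns.discard("ws-1")
    conns.discard("ws-1")  # no-op: discard() tolerates a missing element
    # conns.remove("ws-1") would raise KeyError here
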
 
 @app.websocket("/ws/position")
 async def ws_receive_snapshots(
@@ -252,25 +280,27 @@ async def ws_receive_snapshots(
     key = secret or x_plugin_secret
     if key != SHARED_SECRET:
         # Reject without completing the WebSocket handshake
+        logger.warning(f"Plugin WebSocket authentication failed from {websocket.client}")
         await websocket.close(code=1008)
         return
     # Accept the WebSocket connection
     await websocket.accept()
-    print(f"[WS] Plugin connected: {websocket.client}")
+    logger.info(f"Plugin WebSocket connected: {websocket.client}")
     try:
         while True:
             # Read next text frame
             try:
                 raw = await websocket.receive_text()
-                # Debug: log all incoming plugin WebSocket messages
-                print(f"[WS-PLUGIN RX] {websocket.client}: {raw}")
+                # Debug: log all incoming plugin WebSocket messages
+                logger.debug(f"Plugin WebSocket RX from {websocket.client}: {raw}")
             except WebSocketDisconnect:
-                print(f"[WS] Plugin disconnected: {websocket.client}")
+                logger.info(f"Plugin WebSocket disconnected: {websocket.client}")
                 break
             # Parse JSON payload
             try:
                 data = json.loads(raw)
-            except json.JSONDecodeError:
+            except json.JSONDecodeError as e:
+                logger.warning(f"Invalid JSON from plugin {websocket.client}: {e}")
                 continue
             msg_type = data.get("type")
             # --- Registration: associate character_name with this plugin socket ---
@@ -278,6 +308,7 @@ async def ws_receive_snapshots(
                 name = data.get("character_name") or data.get("player_name")
                 if isinstance(name, str):
                     plugin_conns[name] = websocket
+                    logger.info(f"Registered plugin connection for character: {name}")
                 continue
             # --- Spawn event: persist to spawn_events table ---
             if msg_type == "spawn":
@@ -285,90 +316,120 @@ async def ws_receive_snapshots(
                 payload = data.copy()
                 payload.pop("type", None)
                 try:
                     spawn = SpawnEvent.parse_obj(payload)
-                except Exception:
+                    await database.execute(
+                        spawn_events.insert().values(**spawn.dict())
+                    )
+                    logger.debug(f"Recorded spawn event: {spawn.mob} by {spawn.character_name}")
+                except Exception as e:
+                    logger.error(f"Failed to process spawn event: {e}")
                     continue
-                await database.execute(
-                    spawn_events.insert().values(**spawn.dict())
-                )
                 continue
             # --- Telemetry message: persist snapshot and update kill stats ---
             if msg_type == "telemetry":
                 # Parse telemetry snapshot and update in-memory state
                 payload = data.copy()
                 payload.pop("type", None)
-                snap = TelemetrySnapshot.parse_obj(payload)
-                live_snapshots[snap.character_name] = snap.dict()
-                # Prepare data and compute kill delta
-                db_data = snap.dict()
-                db_data['rares_found'] = 0
-                key = (snap.session_id, snap.character_name)
-                last = ws_receive_snapshots._last_kills.get(key, 0)
-                delta = snap.kills - last
-                # Persist snapshot and any kill delta in a single transaction
-                async with database.transaction():
-                    await database.execute(
-                        telemetry_events.insert().values(**db_data)
-                    )
-                    if delta > 0:
-                        stmt = pg_insert(char_stats).values(
-                            character_name=snap.character_name,
-                            total_kills=delta
-                        ).on_conflict_do_update(
-                            index_elements=["character_name"],
-                            set_={"total_kills": char_stats.c.total_kills + delta},
-                        )
-                        await database.execute(stmt)
-                ws_receive_snapshots._last_kills[key] = snap.kills
-                # Broadcast updated snapshot to all browser clients
-                await _broadcast_to_browser_clients(snap.dict())
+                try:
+                    snap = TelemetrySnapshot.parse_obj(payload)
+                    live_snapshots[snap.character_name] = snap.dict()
+                    # Prepare data and compute kill delta
+                    db_data = snap.dict()
+                    db_data['rares_found'] = 0
+                    key = (snap.session_id, snap.character_name)
+                    last = ws_receive_snapshots._last_kills.get(key, 0)
+                    delta = snap.kills - last
+                    # Persist snapshot and any kill delta in a single transaction
+                    try:
+                        async with database.transaction():
+                            await database.execute(
+                                telemetry_events.insert().values(**db_data)
+                            )
+                            if delta > 0:
+                                stmt = pg_insert(char_stats).values(
+                                    character_name=snap.character_name,
+                                    total_kills=delta
+                                ).on_conflict_do_update(
+                                    index_elements=["character_name"],
+                                    set_={"total_kills": char_stats.c.total_kills + delta},
+                                )
+                                await database.execute(stmt)
+                                logger.debug(f"Updated kills for {snap.character_name}: +{delta} (total from {last} to {snap.kills})")
+                        ws_receive_snapshots._last_kills[key] = snap.kills
+                    except Exception as db_error:
+                        logger.error(f"Database transaction failed for {snap.character_name}: {db_error}", exc_info=True)
+                        continue
+                    # Broadcast updated snapshot to all browser clients
+                    await _broadcast_to_browser_clients(snap.dict())
+                    logger.debug(f"Processed telemetry from {snap.character_name}")
+                except Exception as e:
+                    logger.error(f"Failed to process telemetry event: {e}", exc_info=True)
                 continue
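Note: the kill counter above leans on PostgreSQL upsert semantics via SQLAlchemy's on_conflict_do_update(). A self-contained sketch showing the SQL it compiles to (the table shape mirrors the handler; this only builds and prints the statement, no database connection is needed):

    import sqlalchemy as sa
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    metadata = sa.MetaData()
    char_stats = sa.Table(
        "char_stats", metadata,
        sa.Column("character_name", sa.Text, primary_key=True),
        sa.Column("total_kills", sa.Integer, nullable=False),
    )
    stmt = pg_insert(char_stats).values(
        character_name="Example", total_kills=5
    ).on_conflict_do_update(
        index_elements=["character_name"],
        set_={"total_kills": char_stats.c.total_kills + 5},
    )
    print(stmt.compile(dialect=postgresql.dialect()))
    # Roughly (bind-parameter names vary by SQLAlchemy version):
    # INSERT INTO char_stats (character_name, total_kills) VALUES (...)
    # ON CONFLICT (character_name) DO UPDATE SET total_kills = char_stats.total_kills + ...
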
             # --- Rare event: update total and session counters and persist ---
             if msg_type == "rare":
                 name = data.get("character_name")
-                if isinstance(name, str):
-                    # Total rare count per character
-                    stmt_tot = pg_insert(rare_stats).values(
-                        character_name=name,
-                        total_rares=1
-                    ).on_conflict_do_update(
-                        index_elements=["character_name"],
-                        set_={"total_rares": rare_stats.c.total_rares + 1},
-                    )
-                    await database.execute(stmt_tot)
-                    # Session-specific rare count
-                    session_id = live_snapshots.get(name, {}).get("session_id")
-                    if session_id:
-                        stmt_sess = pg_insert(rare_stats_sessions).values(
-                            character_name=name,
-                            session_id=session_id,
-                            session_rares=1
-                        ).on_conflict_do_update(
-                            index_elements=["character_name", "session_id"],
-                            set_={"session_rares": rare_stats_sessions.c.session_rares + 1},
-                        )
-                        await database.execute(stmt_sess)
-
-                    # Persist individual rare event for future analysis
-                    payload = data.copy()
-                    payload.pop("type", None)
+                if isinstance(name, str) and name.strip():
                     try:
-                        rare_ev = RareEvent.parse_obj(payload)
-                        await database.execute(
-                            rare_events.insert().values(**rare_ev.dict()
+                        # Total rare count per character
+                        stmt_tot = pg_insert(rare_stats).values(
+                            character_name=name,
+                            total_rares=1
+                        ).on_conflict_do_update(
+                            index_elements=["character_name"],
+                            set_={"total_rares": rare_stats.c.total_rares + 1},
                         )
-                    except Exception:
-                        pass
+                        await database.execute(stmt_tot)
+                        # Session-specific rare count (use live cache or fallback to latest telemetry)
+                        session_id = live_snapshots.get(name, {}).get("session_id")
+                        if not session_id:
+                            row = await database.fetch_one(
+                                "SELECT session_id FROM telemetry_events"
+                                " WHERE character_name = :name"
+                                " ORDER BY timestamp DESC LIMIT 1",
+                                {"name": name}
+                            )
+                            if row:
+                                session_id = row["session_id"]
+                        if session_id:
+                            stmt_sess = pg_insert(rare_stats_sessions).values(
+                                character_name=name,
+                                session_id=session_id,
+                                session_rares=1
+                            ).on_conflict_do_update(
+                                index_elements=["character_name", "session_id"],
+                                set_={"session_rares": rare_stats_sessions.c.session_rares + 1},
+                            )
+                            await database.execute(stmt_sess)
+                        # Persist individual rare event for future analysis
+                        payload = data.copy()
+                        payload.pop("type", None)
+                        try:
+                            rare_ev = RareEvent.parse_obj(payload)
+                            await database.execute(
+                                rare_events.insert().values(**rare_ev.dict())
+                            )
+                            logger.info(f"Recorded rare event: {rare_ev.name} by {name}")
+                        except Exception as e:
+                            logger.error(f"Failed to persist rare event: {e}")
+                    except Exception as e:
+                        logger.error(f"Failed to process rare event for {name}: {e}", exc_info=True)
                 continue
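Note: the new session-id fallback uses the databases library's named bind parameters (:name bound from a dict). A standalone sketch of the same lookup, not part of this diff; the DSN matches the DATABASE_URL baked into the Dockerfile above:

    import asyncio
    from databases import Database

    async def latest_session_id(db: Database, name: str):
        row = await db.fetch_one(
            "SELECT session_id FROM telemetry_events"
            " WHERE character_name = :name"
            " ORDER BY timestamp DESC LIMIT 1",
            {"name": name},
        )
        return row["session_id"] if row else None

    async def main():
        db = Database("postgresql://postgres:password@db:5432/dereth")
        await db.connect()
        print(await latest_session_id(db, "Example"))
        await db.disconnect()

    asyncio.run(main())
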
             # --- Chat message: forward chat payload to browser clients ---
             if msg_type == "chat":
                 await _broadcast_to_browser_clients(data)
+                logger.debug(f"Broadcasted chat message from {data.get('character_name', 'unknown')}")
                 continue
             # Unknown message types are ignored
+            if msg_type:
+                logger.warning(f"Unknown message type '{msg_type}' from {websocket.client}")
     finally:
         # Clean up any plugin registrations for this socket
         to_remove = [n for n, ws in plugin_conns.items() if ws is websocket]
         for n in to_remove:
             del plugin_conns[n]
-        print(f"[WS] Cleaned up plugin connections for {websocket.client}")
+        if to_remove:
+            logger.info(f"Cleaned up plugin connections for characters: {to_remove} from {websocket.client}")
+        else:
+            logger.debug(f"No plugin registrations to clean up for {websocket.client}")
 
 # In-memory cache of last seen kill counts per (session_id, character_name)
 # Used to compute deltas for updating persistent kill statistics efficiently
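Note: a minimal plugin-side client for exercising /ws/position by hand. This is a sketch, not part of this diff: it assumes the shared secret is accepted as a ?secret= query parameter (consistent with the key = secret or x_plugin_secret line above), uses the third-party websockets package, sends only a chat message (whose handling is fully visible in the hunk above), and takes port 8765 from the Dockerfile CMD:

    import asyncio, json, os
    import websockets

    async def main():
        uri = f"ws://localhost:8765/ws/position?secret={os.environ['SHARED_SECRET']}"
        async with websockets.connect(uri) as ws:
            await ws.send(json.dumps({
                "type": "chat",
                "character_name": "Example",
                "text": "hello from a test client",
            }))

    asyncio.run(main())
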
@@ -384,14 +445,16 @@ async def ws_live_updates(websocket: WebSocket):
     # Add new browser client to the set
     await websocket.accept()
     browser_conns.add(websocket)
+    logger.info(f"Browser WebSocket connected: {websocket.client}")
     try:
         while True:
             # Receive command messages from browser
             try:
                 data = await websocket.receive_json()
                 # Debug: log all incoming browser WebSocket messages
-                print(f"[WS-LIVE RX] {websocket.client}: {data}")
+                logger.debug(f"Browser WebSocket RX from {websocket.client}: {data}")
             except WebSocketDisconnect:
+                logger.info(f"Browser WebSocket disconnected: {websocket.client}")
                 break
             # Determine command envelope format (new or legacy)
             if "player_name" in data and "command" in data:
@@ -408,22 +471,31 @@ async def ws_live_updates(websocket: WebSocket):
             # Forward command envelope to the appropriate plugin WebSocket
             target_ws = plugin_conns.get(target_name)
             if target_ws:
-                await target_ws.send_json(payload)
+                try:
+                    await target_ws.send_json(payload)
+                    logger.debug(f"Forwarded command to plugin for {target_name}: {payload}")
+                except (WebSocketDisconnect, RuntimeError, ConnectionAbortedError) as e:
+                    logger.warning(f"Failed to forward command to {target_name}: {e}")
+                    # Remove stale connection
+                    plugin_conns.pop(target_name, None)
+            else:
+                logger.warning(f"No plugin connection found for target character: {target_name}")
     except WebSocketDisconnect:
         pass
     finally:
-        browser_conns.remove(websocket)
+        browser_conns.discard(websocket)
+        logger.debug(f"Removed browser WebSocket from connection pool: {websocket.client}")
 
 ## -------------------- static frontend ---------------------------
 ## (static mount moved to end of file, below API routes)
 
 # list routes for convenience
-print("🔍 Registered HTTP API routes:")
+logger.info("🔍 Registered HTTP API routes:")
 for route in app.routes:
     if isinstance(route, APIRoute):
         # Log the path and allowed methods for each API route
-        print(f"{route.path} -> {route.methods}")
+        logger.info(f"{route.path} -> {route.methods}")
 
 # Add stats endpoint for per-character metrics
 @app.get("/stats/{character_name}")
 async def get_stats(character_name: str):
@@ -434,31 +506,39 @@ async def get_stats(character_name: str):
     - total_rares: accumulated rares from rare_stats
     Returns 404 if character has no recorded telemetry.
     """
-    # Latest snapshot
-    sql_snap = (
-        "SELECT * FROM telemetry_events "
-        "WHERE character_name = :cn "
-        "ORDER BY timestamp DESC LIMIT 1"
-    )
-    snap = await database.fetch_one(sql_snap, {"cn": character_name})
-    if not snap:
-        raise HTTPException(status_code=404, detail="Character not found")
-    snap_dict = dict(snap)
-    # Total kills
-    sql_kills = "SELECT total_kills FROM char_stats WHERE character_name = :cn"
-    row_kills = await database.fetch_one(sql_kills, {"cn": character_name})
-    total_kills = row_kills["total_kills"] if row_kills else 0
-    # Total rares
-    sql_rares = "SELECT total_rares FROM rare_stats WHERE character_name = :cn"
-    row_rares = await database.fetch_one(sql_rares, {"cn": character_name})
-    total_rares = row_rares["total_rares"] if row_rares else 0
-    result = {
-        "character_name": character_name,
-        "latest_snapshot": snap_dict,
-        "total_kills": total_kills,
-        "total_rares": total_rares,
-    }
-    return JSONResponse(content=jsonable_encoder(result))
+    try:
+        # Latest snapshot
+        sql_snap = (
+            "SELECT * FROM telemetry_events "
+            "WHERE character_name = :cn "
+            "ORDER BY timestamp DESC LIMIT 1"
+        )
+        snap = await database.fetch_one(sql_snap, {"cn": character_name})
+        if not snap:
+            logger.warning(f"No telemetry data found for character: {character_name}")
+            raise HTTPException(status_code=404, detail="Character not found")
+        snap_dict = dict(snap)
+        # Total kills
+        sql_kills = "SELECT total_kills FROM char_stats WHERE character_name = :cn"
+        row_kills = await database.fetch_one(sql_kills, {"cn": character_name})
+        total_kills = row_kills["total_kills"] if row_kills else 0
+        # Total rares
+        sql_rares = "SELECT total_rares FROM rare_stats WHERE character_name = :cn"
+        row_rares = await database.fetch_one(sql_rares, {"cn": character_name})
+        total_rares = row_rares["total_rares"] if row_rares else 0
+        result = {
+            "character_name": character_name,
+            "latest_snapshot": snap_dict,
+            "total_kills": total_kills,
+            "total_rares": total_rares,
+        }
+        logger.debug(f"Retrieved stats for character: {character_name}")
+        return JSONResponse(content=jsonable_encoder(result))
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Failed to get stats for character {character_name}: {e}", exc_info=True)
+        raise HTTPException(status_code=500, detail="Internal server error")
 
 # -------------------- static frontend ---------------------------
 # Serve SPA files (catch-all for frontend routes)
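Note: a quick way to exercise the /stats endpoint once the stack is up (a sketch, not part of this diff; assumes the httpx package and the container port 8765 from the Dockerfile published on localhost):

    import httpx

    resp = httpx.get("http://localhost:8765/stats/Example")
    if resp.status_code == 200:
        body = resp.json()
        print(body["total_kills"], body["total_rares"])
    else:
        print(resp.status_code, resp.text)  # 404 if the character has no telemetry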