new comments

This commit is contained in:
erik 2025-05-24 18:33:03 +00:00
parent b2f649a489
commit 09404da121
13 changed files with 430 additions and 70 deletions

103
main.py
View file

@ -1,3 +1,10 @@
"""
main.py - FastAPI-based telemetry server for Dereth Tracker.
This service ingests real-time position and event data from plugin clients via WebSockets,
stores telemetry and statistics in a TimescaleDB backend, and exposes HTTP and WebSocket
endpoints for browser clients to retrieve live and historical data, trails, and per-character stats.
"""
from datetime import datetime, timedelta, timezone
import json
import os
@ -27,14 +34,21 @@ import asyncio
# ------------------------------------------------------------------
app = FastAPI()
# test
# In-memory store of the last packet per character
# In-memory store mapping character_name to the most recent telemetry snapshot
live_snapshots: Dict[str, dict] = {}
# Shared secret used to authenticate plugin WebSocket connections (override for production)
SHARED_SECRET = "your_shared_secret"
# LOG_FILE = "telemetry_log.jsonl"
# ------------------------------------------------------------------
ACTIVE_WINDOW = timedelta(seconds=30) # player is “online” if seen in last 30 s
ACTIVE_WINDOW = timedelta(seconds=30) # Time window defining “online” players (last 30 seconds)
"""
Data models for plugin events:
- TelemetrySnapshot: periodic telemetry data from a player client
- SpawnEvent: information about a mob spawn event
- RareEvent: details of a rare mob event
"""
class TelemetrySnapshot(BaseModel):
@ -63,6 +77,10 @@ class TelemetrySnapshot(BaseModel):
class SpawnEvent(BaseModel):
"""
Model for a spawn event emitted by plugin clients when a mob appears.
Records character context, mob type, timestamp, and spawn location.
"""
character_name: str
mob: str
timestamp: datetime
@ -71,6 +89,10 @@ class SpawnEvent(BaseModel):
z: float = 0.0
class RareEvent(BaseModel):
"""
Model for a rare mob event when a player encounters or discovers a rare entity.
Includes character, event name, timestamp, and location coordinates.
"""
character_name: str
name: str
timestamp: datetime
@ -81,7 +103,11 @@ class RareEvent(BaseModel):
@app.on_event("startup")
async def on_startup():
# Retry connecting to database on startup to handle DB readiness delays
"""Event handler triggered when application starts up.
Attempts to connect to the database with retry logic to accommodate
potential startup delays (e.g., waiting for Postgres to be ready).
"""
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
@ -98,7 +124,10 @@ async def on_startup():
@app.on_event("shutdown")
async def on_shutdown():
# Disconnect from database
"""Event handler triggered when application is shutting down.
Ensures the database connection is closed cleanly.
"""
await database.disconnect()
@ -114,7 +143,9 @@ def debug():
async def get_live_players():
"""Return recent live telemetry per character (last 30 seconds)."""
cutoff = datetime.now(timezone.utc) - ACTIVE_WINDOW
# Include rare counts: total and session-specific
# Build SQL to select the most recent telemetry entry per character:
# - Use DISTINCT ON (character_name) to get latest row for each player
# - Join rare_stats for cumulative counts and rare_stats_sessions for session-specific counts
sql = """
SELECT sub.*,
COALESCE(rs.total_rares, 0) AS total_rares,
@ -144,18 +175,21 @@ async def get_history(
to_ts: str | None = Query(None, alias="to"),
):
"""Returns a time-ordered list of telemetry snapshots."""
# Base SQL query: fetch timestamp, character_name, kills, kills_per_hour (as kph)
sql = (
"SELECT timestamp, character_name, kills, kills_per_hour AS kph "
"FROM telemetry_events"
)
values: dict = {}
conditions: list[str] = []
# Apply filters if time bounds provided via 'from' and 'to' query parameters
if from_ts:
conditions.append("timestamp >= :from_ts")
values["from_ts"] = from_ts
if to_ts:
conditions.append("timestamp <= :to_ts")
values["to_ts"] = to_ts
# Concatenate WHERE clauses dynamically based on provided filters
if conditions:
sql += " WHERE " + " AND ".join(conditions)
sql += " ORDER BY timestamp"
@ -181,6 +215,7 @@ async def get_trails(
):
"""Return position snapshots (timestamp, character_name, ew, ns, z) for the past `seconds`."""
cutoff = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=seconds)
# Query position snapshots for all characters since the cutoff time
sql = """
SELECT timestamp, character_name, ew, ns, z
FROM telemetry_events
@ -202,12 +237,18 @@ async def get_trails(
return JSONResponse(content=jsonable_encoder({"trails": trails}))
# -------------------- WebSocket endpoints -----------------------
## WebSocket connection tracking
# Set of browser WebSocket clients subscribed to live updates
browser_conns: set[WebSocket] = set()
# Map of registered plugin clients: character_name -> WebSocket
# Mapping of plugin clients by character_name to their WebSocket for command forwarding
plugin_conns: Dict[str, WebSocket] = {}
async def _broadcast_to_browser_clients(snapshot: dict):
# Ensure all data (e.g. datetime) is JSON-serializable
"""Broadcast a telemetry or chat message to all connected browser clients.
Converts any non-serializable types (e.g., datetime) before sending.
"""
# Convert snapshot payload to JSON-friendly types
data = jsonable_encoder(snapshot)
for ws in list(browser_conns):
try:
@ -221,7 +262,17 @@ async def ws_receive_snapshots(
secret: str | None = Query(None),
x_plugin_secret: str | None = Header(None)
):
# Verify shared secret from query parameter or header
"""WebSocket endpoint for plugin clients to send telemetry and events.
Validates a shared secret for authentication, then listens for messages of
various types (register, spawn, telemetry, rare, chat) and handles each:
- register: record plugin WebSocket for command forwarding
- spawn: persist spawn event
- telemetry: store snapshot, update stats, broadcast to browsers
- rare: update total and session rare counts, persist event
- chat: broadcast chat messages to browsers
"""
# Authenticate plugin connection using shared secret
key = secret or x_plugin_secret
if key != SHARED_SECRET:
# Reject without completing the WebSocket handshake
@ -246,13 +297,13 @@ async def ws_receive_snapshots(
except json.JSONDecodeError:
continue
msg_type = data.get("type")
# Registration message: map character to this socket
# --- Registration: associate character_name with this plugin socket ---
if msg_type == "register":
name = data.get("character_name") or data.get("player_name")
if isinstance(name, str):
plugin_conns[name] = websocket
continue
# Spawn event: persist spawn for heatmaps
# --- Spawn event: persist to spawn_events table ---
if msg_type == "spawn":
payload = data.copy()
payload.pop("type", None)
@ -264,7 +315,7 @@ async def ws_receive_snapshots(
spawn_events.insert().values(**spawn.dict())
)
continue
# Telemetry message: save to DB and broadcast
# --- Telemetry message: persist snapshot and update kill stats ---
if msg_type == "telemetry":
# Parse telemetry snapshot and update in-memory state
payload = data.copy()
@ -291,10 +342,10 @@ async def ws_receive_snapshots(
)
await database.execute(stmt)
ws_receive_snapshots._last_kills[key] = snap.kills
# Broadcast to browser clients
# Broadcast updated snapshot to all browser clients
await _broadcast_to_browser_clients(snap.dict())
continue
# Rare event: increment total and session counts
# --- Rare event: update total and session counters and persist ---
if msg_type == "rare":
name = data.get("character_name")
if isinstance(name, str):
@ -330,7 +381,7 @@ async def ws_receive_snapshots(
except Exception:
pass
continue
# Chat message: broadcast to browser clients only (no DB write)
# --- Chat message: forward chat payload to browser clients ---
if msg_type == "chat":
await _broadcast_to_browser_clients(data)
continue
@ -342,12 +393,18 @@ async def ws_receive_snapshots(
del plugin_conns[n]
print(f"[WS] Cleaned up plugin connections for {websocket.client}")
# In-memory store of last kills per session for delta calculations
# In-memory cache of last seen kill counts per (session_id, character_name)
# Used to compute deltas for updating persistent kill statistics efficiently
ws_receive_snapshots._last_kills = {}
@app.websocket("/ws/live")
async def ws_live_updates(websocket: WebSocket):
# Browser clients connect here to receive telemetry and chat, and send commands
"""WebSocket endpoint for browser clients to receive live updates and send commands.
Manages a set of connected browser clients; listens for incoming command messages
and forwards them to the appropriate plugin client WebSocket.
"""
# Add new browser client to the set
await websocket.accept()
browser_conns.add(websocket)
try:
@ -385,14 +442,21 @@ async def ws_live_updates(websocket: WebSocket):
## (static mount moved to end of file, below API routes)
# list routes for convenience
print("🔍 Registered routes:")
print("🔍 Registered HTTP API routes:")
for route in app.routes:
if isinstance(route, APIRoute):
# Log the path and allowed methods for each API route
print(f"{route.path} -> {route.methods}")
# Add stats endpoint for per-character metrics
@app.get("/stats/{character_name}")
async def get_stats(character_name: str):
"""Return latest telemetry snapshot and aggregates for a specific character."""
"""
HTTP GET endpoint to retrieve per-character metrics:
- latest_snapshot: most recent telemetry entry for the character
- total_kills: accumulated kills from char_stats
- total_rares: accumulated rares from rare_stats
Returns 404 if character has no recorded telemetry.
"""
# Latest snapshot
sql_snap = (
"SELECT * FROM telemetry_events "
@ -421,4 +485,5 @@ async def get_stats(character_name: str):
# -------------------- static frontend ---------------------------
# Serve SPA files (catch-all for frontend routes)
# Mount the single-page application frontend (static assets) at root path
app.mount("/", StaticFiles(directory="static", html=True), name="static")