added epic counters
This commit is contained in:
parent
10c51f6825
commit
09a6cd4946
4 changed files with 963 additions and 8 deletions
114
main.py
114
main.py
|
|
@ -56,7 +56,9 @@ INVENTORY_SERVICE_URL = os.getenv('INVENTORY_SERVICE_URL', 'http://inventory-ser
|
|||
# In-memory caches for REST endpoints
|
||||
_cached_live: dict = {"players": []}
|
||||
_cached_trails: dict = {"trails": []}
|
||||
_cached_total_rares: dict = {"all_time": 0, "today": 0, "last_updated": None}
|
||||
_cache_task: asyncio.Task | None = None
|
||||
_rares_cache_task: asyncio.Task | None = None
|
||||
|
||||
async def _refresh_cache_loop() -> None:
|
||||
"""Background task: refresh `/live` and `/trails` caches every 5 seconds."""
|
||||
|
|
@ -131,10 +133,61 @@ async def _refresh_cache_loop() -> None:
|
|||
|
||||
await asyncio.sleep(5)
|
||||
|
||||
async def _refresh_total_rares_cache() -> None:
|
||||
"""Background task: refresh total rares cache every 5 minutes."""
|
||||
consecutive_failures = 0
|
||||
max_consecutive_failures = 3
|
||||
|
||||
while True:
|
||||
try:
|
||||
async with database.connection() as conn:
|
||||
# Get all-time total rares (sum of all characters) - gracefully handle missing table
|
||||
try:
|
||||
all_time_query = "SELECT COALESCE(SUM(total_rares), 0) as total FROM rare_stats"
|
||||
all_time_result = await conn.fetch_one(all_time_query)
|
||||
all_time_total = all_time_result["total"] if all_time_result else 0
|
||||
except Exception as e:
|
||||
logger.debug(f"rare_stats table not available: {e}")
|
||||
all_time_total = 0
|
||||
|
||||
# Get today's rares from rare_events table - gracefully handle missing table
|
||||
try:
|
||||
today_query = """
|
||||
SELECT COUNT(*) as today_count
|
||||
FROM rare_events
|
||||
WHERE timestamp >= CURRENT_DATE
|
||||
"""
|
||||
today_result = await conn.fetch_one(today_query)
|
||||
today_total = today_result["today_count"] if today_result else 0
|
||||
except Exception as e:
|
||||
logger.debug(f"rare_events table not available or empty: {e}")
|
||||
today_total = 0
|
||||
|
||||
# Update cache
|
||||
_cached_total_rares["all_time"] = all_time_total
|
||||
_cached_total_rares["today"] = today_total
|
||||
_cached_total_rares["last_updated"] = datetime.now(timezone.utc)
|
||||
|
||||
consecutive_failures = 0
|
||||
logger.debug(f"Total rares cache updated: All-time: {all_time_total}, Today: {today_total}")
|
||||
|
||||
except Exception as e:
|
||||
consecutive_failures += 1
|
||||
logger.error(f"Total rares cache refresh failed ({consecutive_failures}/{max_consecutive_failures}): {e}", exc_info=True)
|
||||
|
||||
if consecutive_failures >= max_consecutive_failures:
|
||||
logger.warning("Too many consecutive total rares cache failures, waiting longer...")
|
||||
await asyncio.sleep(60) # Wait longer on repeated failures
|
||||
continue
|
||||
|
||||
# Sleep for 5 minutes (300 seconds)
|
||||
await asyncio.sleep(300)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
app = FastAPI()
|
||||
# In-memory store mapping character_name to the most recent telemetry snapshot
|
||||
live_snapshots: Dict[str, dict] = {}
|
||||
live_vitals: Dict[str, dict] = {}
|
||||
|
||||
# Shared secret used to authenticate plugin WebSocket connections (override for production)
|
||||
SHARED_SECRET = "your_shared_secret"
|
||||
|
|
@ -212,6 +265,25 @@ class FullInventoryMessage(BaseModel):
|
|||
items: List[Dict[str, Any]]
|
||||
|
||||
|
||||
class VitalsMessage(BaseModel):
|
||||
"""
|
||||
Model for the vitals WebSocket message type.
|
||||
Contains character health, stamina, mana, and vitae information.
|
||||
"""
|
||||
character_name: str
|
||||
timestamp: datetime
|
||||
health_current: int
|
||||
health_max: int
|
||||
health_percentage: float
|
||||
stamina_current: int
|
||||
stamina_max: int
|
||||
stamina_percentage: float
|
||||
mana_current: int
|
||||
mana_max: int
|
||||
mana_percentage: float
|
||||
vitae: int
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def on_startup():
|
||||
"""Event handler triggered when application starts up.
|
||||
|
|
@ -238,17 +310,18 @@ async def on_startup():
|
|||
else:
|
||||
raise RuntimeError(f"Could not connect to database after {max_attempts} attempts")
|
||||
# Start background cache refresh (live & trails)
|
||||
global _cache_task
|
||||
global _cache_task, _rares_cache_task
|
||||
_cache_task = asyncio.create_task(_refresh_cache_loop())
|
||||
logger.info("Background cache refresh task started")
|
||||
_rares_cache_task = asyncio.create_task(_refresh_total_rares_cache())
|
||||
logger.info("Background cache refresh tasks started")
|
||||
@app.on_event("shutdown")
|
||||
async def on_shutdown():
|
||||
"""Event handler triggered when application is shutting down.
|
||||
|
||||
Ensures the database connection is closed cleanly.
|
||||
"""
|
||||
# Stop cache refresh task
|
||||
global _cache_task
|
||||
# Stop cache refresh tasks
|
||||
global _cache_task, _rares_cache_task
|
||||
if _cache_task:
|
||||
logger.info("Stopping background cache refresh task")
|
||||
_cache_task.cancel()
|
||||
|
|
@ -256,6 +329,14 @@ async def on_shutdown():
|
|||
await _cache_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
if _rares_cache_task:
|
||||
logger.info("Stopping total rares cache refresh task")
|
||||
_rares_cache_task.cancel()
|
||||
try:
|
||||
await _rares_cache_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.info("Disconnecting from database")
|
||||
await database.disconnect()
|
||||
|
||||
|
|
@ -294,6 +375,17 @@ async def get_trails(
|
|||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
|
||||
@app.get("/total-rares")
|
||||
@app.get("/total-rares/")
|
||||
async def get_total_rares():
|
||||
"""Return cached total rares statistics (updated every 5 minutes)."""
|
||||
try:
|
||||
return JSONResponse(content=jsonable_encoder(_cached_total_rares))
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get total rares: {e}", exc_info=True)
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
|
||||
# --- GET Inventory Endpoints ---------------------------------
|
||||
@app.get("/inventory/{character_name}")
|
||||
async def get_character_inventory(character_name: str):
|
||||
|
|
@ -669,6 +761,8 @@ async def ws_receive_snapshots(
|
|||
rare_events.insert().values(**rare_ev.dict())
|
||||
)
|
||||
logger.info(f"Recorded rare event: {rare_ev.name} by {name}")
|
||||
# Broadcast rare event to browser clients for epic notifications
|
||||
await _broadcast_to_browser_clients(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to persist rare event: {e}")
|
||||
except Exception as e:
|
||||
|
|
@ -690,6 +784,18 @@ async def ws_receive_snapshots(
|
|||
except Exception as e:
|
||||
logger.error(f"Failed to process inventory for {data.get('character_name', 'unknown')}: {e}", exc_info=True)
|
||||
continue
|
||||
# --- Vitals message: store character health/stamina/mana and broadcast ---
|
||||
if msg_type == "vitals":
|
||||
payload = data.copy()
|
||||
payload.pop("type", None)
|
||||
try:
|
||||
vitals_msg = VitalsMessage.parse_obj(payload)
|
||||
live_vitals[vitals_msg.character_name] = vitals_msg.dict()
|
||||
await _broadcast_to_browser_clients(data)
|
||||
logger.debug(f"Updated vitals for {vitals_msg.character_name}: {vitals_msg.health_percentage}% HP, {vitals_msg.stamina_percentage}% Stam, {vitals_msg.mana_percentage}% Mana")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to process vitals for {data.get('character_name', 'unknown')}: {e}", exc_info=True)
|
||||
continue
|
||||
# Unknown message types are ignored
|
||||
if msg_type:
|
||||
logger.warning(f"Unknown message type '{msg_type}' from {websocket.client}")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue