Working version with new streaming and DB

This commit is contained in:
erik 2025-05-22 15:30:45 +00:00
parent c20d54d037
commit c418221575
8 changed files with 302 additions and 37 deletions

125
main.py
View file

@ -12,9 +12,16 @@ from pydantic import BaseModel
from typing import Optional
# Async database support
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from db_async import database, telemetry_events, char_stats, init_db_async
from db_async import (
database,
telemetry_events,
char_stats,
rare_stats,
rare_stats_sessions,
spawn_events,
init_db_async
)
import asyncio
# ------------------------------------------------------------------
@ -43,7 +50,8 @@ class TelemetrySnapshot(BaseModel):
kills_per_hour: Optional[float] = None
onlinetime: Optional[str] = None
deaths: int
rares_found: int
# Removed from telemetry payload; always enforced to 0 and tracked via rare events
rares_found: int = 0
prismatic_taper_count: int
vt_state: str
# Optional telemetry metrics
@ -53,11 +61,31 @@ class TelemetrySnapshot(BaseModel):
latency_ms: Optional[float] = None
class SpawnEvent(BaseModel):
character_name: str
mob: str
timestamp: datetime
ew: float
ns: float
z: float = 0.0
@app.on_event("startup")
async def on_startup():
# Connect to database and initialize TimescaleDB hypertable
await database.connect()
await init_db_async()
# Retry connecting to database on startup to handle DB readiness delays
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
await database.connect()
await init_db_async()
print(f"DB connected on attempt {attempt}")
break
except Exception as e:
print(f"DB connection failed (attempt {attempt}/{max_attempts}): {e}")
if attempt < max_attempts:
await asyncio.sleep(5)
else:
raise RuntimeError(f"Could not connect to database after {max_attempts} attempts")
@app.on_event("shutdown")
async def on_shutdown():
@ -77,19 +105,27 @@ def debug():
async def get_live_players():
"""Return recent live telemetry per character (last 30 seconds)."""
cutoff = datetime.now(timezone.utc) - ACTIVE_WINDOW
query = text(
"""
SELECT * FROM (
# Include rare counts: total and session-specific
sql = """
SELECT sub.*,
COALESCE(rs.total_rares, 0) AS total_rares,
COALESCE(rss.session_rares, 0) AS session_rares
FROM (
SELECT DISTINCT ON (character_name) *
FROM telemetry_events
ORDER BY character_name, timestamp DESC
) sub
WHERE timestamp > :cutoff
"""
)
rows = await database.fetch_all(query, {"cutoff": cutoff})
LEFT JOIN rare_stats rs
ON sub.character_name = rs.character_name
LEFT JOIN rare_stats_sessions rss
ON sub.character_name = rss.character_name
AND sub.session_id = rss.session_id
WHERE sub.timestamp > :cutoff
"""
rows = await database.fetch_all(sql, {"cutoff": cutoff})
players = [dict(r) for r in rows]
return JSONResponse(content={"players": players})
# Ensure all types (e.g. datetime) are JSON serializable
return JSONResponse(content=jsonable_encoder({"players": players}))
@app.get("/history/")
@ -114,7 +150,7 @@ async def get_history(
if conditions:
sql += " WHERE " + " AND ".join(conditions)
sql += " ORDER BY timestamp"
rows = await database.fetch_all(text(sql), values)
rows = await database.fetch_all(sql, values)
data = [
{
"timestamp": row["timestamp"],
@ -124,7 +160,8 @@ async def get_history(
}
for row in rows
]
return JSONResponse(content={"data": data})
# Ensure all types (e.g. datetime) are JSON serializable
return JSONResponse(content=jsonable_encoder({"data": data}))
# --- GET Trails ---------------------------------
@ -135,14 +172,12 @@ async def get_trails(
):
"""Return position snapshots (timestamp, character_name, ew, ns, z) for the past `seconds`."""
cutoff = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=seconds)
sql = text(
"""
sql = """
SELECT timestamp, character_name, ew, ns, z
FROM telemetry_events
WHERE timestamp >= :cutoff
ORDER BY character_name, timestamp
"""
)
"""
rows = await database.fetch_all(sql, {"cutoff": cutoff})
trails = [
{
@ -154,7 +189,8 @@ async def get_trails(
}
for r in rows
]
return JSONResponse(content={"trails": trails})
# Ensure all types (e.g. datetime) are JSON serializable
return JSONResponse(content=jsonable_encoder({"trails": trails}))
# -------------------- WebSocket endpoints -----------------------
browser_conns: set[WebSocket] = set()
@ -207,16 +243,30 @@ async def ws_receive_snapshots(
if isinstance(name, str):
plugin_conns[name] = websocket
continue
# Spawn event: persist spawn for heatmaps
if msg_type == "spawn":
payload = data.copy()
payload.pop("type", None)
try:
spawn = SpawnEvent.parse_obj(payload)
except Exception:
continue
await database.execute(
spawn_events.insert().values(**spawn.dict())
)
continue
# Telemetry message: save to DB and broadcast
if msg_type == "telemetry":
# Parse and broadcast telemetry snapshot
# Parse telemetry snapshot and update in-memory state
payload = data.copy()
payload.pop("type", None)
snap = TelemetrySnapshot.parse_obj(payload)
live_snapshots[snap.character_name] = snap.dict()
# Persist to TimescaleDB
# Persist snapshot to TimescaleDB, force rares_found=0
db_data = snap.dict()
db_data['rares_found'] = 0
await database.execute(
telemetry_events.insert().values(**snap.dict())
telemetry_events.insert().values(**db_data)
)
# Update persistent kill stats (delta per session)
key = (snap.session_id, snap.character_name)
@ -232,8 +282,35 @@ async def ws_receive_snapshots(
)
await database.execute(stmt)
ws_receive_snapshots._last_kills[key] = snap.kills
# Broadcast to browser clients
await _broadcast_to_browser_clients(snap.dict())
continue
# Rare event: increment total and session counts
if msg_type == "rare":
name = data.get("character_name")
if isinstance(name, str):
# Total rare count per character
stmt_tot = pg_insert(rare_stats).values(
character_name=name,
total_rares=1
).on_conflict_do_update(
index_elements=["character_name"],
set_={"total_rares": rare_stats.c.total_rares + 1},
)
await database.execute(stmt_tot)
# Session-specific rare count
session_id = live_snapshots.get(name, {}).get("session_id")
if session_id:
stmt_sess = pg_insert(rare_stats_sessions).values(
character_name=name,
session_id=session_id,
session_rares=1
).on_conflict_do_update(
index_elements=["character_name", "session_id"],
set_={"session_rares": rare_stats_sessions.c.session_rares + 1},
)
await database.execute(stmt_sess)
continue
# Chat message: broadcast to browser clients only (no DB write)
if msg_type == "chat":
await _broadcast_to_browser_clients(data)