diff --git a/db_async.py b/db_async.py index f2c28ab7..c7adf1dc 100644 --- a/db_async.py +++ b/db_async.py @@ -49,6 +49,11 @@ telemetry_events = Table( Column("cpu_pct", Float, nullable=True), Column("mem_handles", Integer, nullable=True), Column("latency_ms", Float, nullable=True), + # Server-side receive time. The `timestamp` column above is the CLIENT's + # self-reported wall clock and drifts up to ~90s across machines, so the + # "online" window must use this server-stamped value instead (see /live + # cache query). Nullable so pre-migration rows fall back to `timestamp`. + Column("received_at", DateTime(timezone=True), nullable=True), ) # Composite index to accelerate Grafana queries filtering by character_name then ordering by timestamp Index( @@ -256,6 +261,18 @@ async def init_db_async(): print( f"Warning: failed to create composite index ix_telemetry_events_char_ts: {e}" ) + # Add the server-receive-time column to existing deployments (idempotent). + # Used as the clock-skew-proof basis for the "online" window in /live. + try: + with engine.connect() as conn: + conn.execute( + text( + "ALTER TABLE telemetry_events " + "ADD COLUMN IF NOT EXISTS received_at TIMESTAMPTZ" + ) + ) + except Exception as e: + print(f"Warning: failed to add telemetry_events.received_at column: {e}") # Add retention and compression policies on the hypertable try: with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: diff --git a/main.py b/main.py index a3dd906e..2eaa2433 100644 --- a/main.py +++ b/main.py @@ -824,8 +824,16 @@ async def _refresh_cache_loop() -> None: while True: try: - # Recompute live players (last 30s) + # Recompute live players (last 30s) using the SERVER receive-time, + # not the client's self-reported timestamp. Game machines' clocks + # drift up to ~90s apart, which put characters on the 30s boundary + # and flapped the count. COALESCE falls back to the client timestamp + # for pre-migration rows (and the brief window after deploy before + # new rows carry received_at). The coarse `timestamp` bound is kept + # ONLY so TimescaleDB can prune chunks; it's wide enough (10 min) to + # cover any plausible clock skew. cutoff = datetime.now(timezone.utc) - ACTIVE_WINDOW + chunk_cutoff = datetime.now(timezone.utc) - timedelta(minutes=10) sql_live = """ SELECT sub.*, COALESCE(rs.total_rares, 0) AS total_rares, @@ -834,7 +842,8 @@ async def _refresh_cache_loop() -> None: FROM ( SELECT DISTINCT ON (character_name) * FROM telemetry_events - WHERE timestamp > :cutoff + WHERE timestamp > :chunk_cutoff + AND COALESCE(received_at, timestamp) > :cutoff ORDER BY character_name, timestamp DESC ) sub LEFT JOIN rare_stats rs @@ -848,7 +857,9 @@ async def _refresh_cache_loop() -> None: # Use a single connection for both queries to reduce connection churn async with database.connection() as conn: - rows = await conn.fetch_all(sql_live, {"cutoff": cutoff}) + rows = await conn.fetch_all( + sql_live, {"cutoff": cutoff, "chunk_cutoff": chunk_cutoff} + ) new_players = [dict(r) for r in rows] # Track player changes for debugging @@ -3125,6 +3136,11 @@ async def ws_receive_snapshots( # Prepare data and compute kill delta db_data = snap.dict() db_data["rares_found"] = 0 + # Stamp the SERVER receive time. snap.timestamp is the + # client's wall clock (drifts across machines); the /live + # "online" window uses received_at so clock skew can't flap + # the player count. + db_data["received_at"] = datetime.now(timezone.utc) key = (snap.session_id, snap.character_name) # Get last recorded kill count for this session