Server load optimization: spawn_events retention + log spam fix

Database cleanup:
- Converted spawn_events to a TimescaleDB hypertable with 7-day retention.
  Previously a regular table growing without bound — it had reached 482M rows
  (66GB) accumulated since June 2025. A manual migration copied the last
  7 days (12M rows) into a new hypertable, swapped names, and dropped the
  old table (sketched after this list).
  Result: DB shrunk from 77GB → 12GB.
- Dropped server_health_checks table entirely. It was write-only (850K rows,
  134MB) — only current state in server_status is actually read. Eliminated
  the insert from monitor_server_health().
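
The migration was roughly the following — a minimal sketch, not the exact
script (table/column names come from the diffs below; the DSN and the
spawn_events_new/spawn_events_old names are placeholders; the real steps are
in docs/plans/spawn_events_cleanup.md):

# One-off migration sketch (hypothetical helper script).
# Assumes spawn_events has no unique index that omits the timestamp
# column — create_hypertable refuses those.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://...")  # real DSN elided
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    # 1. Empty clone of the schema, converted while it holds zero rows (cheap).
    conn.execute(text(
        "CREATE TABLE spawn_events_new (LIKE spawn_events INCLUDING ALL)"
    ))
    conn.execute(text(
        "SELECT create_hypertable('spawn_events_new', 'timestamp', "
        "chunk_time_interval => INTERVAL '1 day')"
    ))
    # 2. Copy only the last 7 days: ~12M rows instead of 482M.
    conn.execute(text(
        "INSERT INTO spawn_events_new "
        "SELECT * FROM spawn_events WHERE timestamp > now() - INTERVAL '7 days'"
    ))
    # 3. Swap names and drop the 66GB original.
    conn.execute(text("ALTER TABLE spawn_events RENAME TO spawn_events_old"))
    conn.execute(text("ALTER TABLE spawn_events_new RENAME TO spawn_events"))
    conn.execute(text("DROP TABLE spawn_events_old"))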

Telemetry handler cleanup:
- Removed 4 per-message INFO log lines (TELEMETRY_RECEIVED, DB_WRITE_ATTEMPT,
  DB_WRITE_SUCCESS, PROCESSING_COMPLETE). With 60+ connected characters each
  reporting every 2s, that was hundreds of log lines/sec. Replaced with single
  SLOW_* warnings above thresholds of 500ms (DB write) / 1000ms (total
  processing).
- Removed redundant pool-size introspection (try/except + hasattr) on every
  telemetry message — useless noise in the hot path.
- Removed debug cache-miss and kill-delta logs.

Log level:
- docker-compose.yml: dereth-tracker LOG_LEVEL DEBUG → INFO (was dumping
  entire inventory_delta JSON payloads for every item update).
- inventory-service LOG_LEVEL DEBUG → INFO.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

db_async.py

@@ -165,25 +165,8 @@ portals = Table(
     Column("discovered_by", String, nullable=False),
 )
 
-# Server health monitoring tables
-server_health_checks = Table(
-    # Time-series data for server health checks
-    "server_health_checks",
-    metadata,
-    Column("id", Integer, primary_key=True),
-    Column("server_name", String, nullable=False, index=True),
-    Column("server_address", String, nullable=False),
-    Column(
-        "timestamp",
-        DateTime(timezone=True),
-        nullable=False,
-        default=sqlalchemy.func.now(),
-    ),
-    Column("status", String(10), nullable=False),  # 'up' or 'down'
-    Column("latency_ms", Float, nullable=True),
-    Column("player_count", Integer, nullable=True),
-)
-
+# Server health monitoring: only current state is kept.
+# Historical health checks were removed — nothing read from them.
 server_status = Table(
     # Current server status and uptime tracking
     "server_status",
@@ -198,13 +181,6 @@ server_status = Table(
     Column("last_player_count", Integer, nullable=True),
 )
 
-# Index for efficient server health check queries
-Index(
-    "ix_server_health_checks_name_ts",
-    server_health_checks.c.server_name,
-    server_health_checks.c.timestamp.desc(),
-)
-
 character_stats = Table(
     "character_stats",
     metadata,
@@ -299,6 +275,32 @@ async def init_db_async():
     except Exception as e:
         print(f"Warning: failed to set retention/compression policies: {e}")
 
+    # Ensure spawn_events is a hypertable with a 7-day retention policy.
+    # This is idempotent — if already a hypertable, create_hypertable is a no-op
+    # when if_not_exists=TRUE. The existing 482M-row table needed a manual
+    # migration (see docs/plans/spawn_events_cleanup.md); this block keeps the
+    # policy alive on subsequent deploys.
+    try:
+        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
+            # Try to convert spawn_events to a hypertable if it isn't already.
+            # migrate_data=FALSE is safe because the manual migration handled it;
+            # if someone creates a fresh DB, the table is empty and this converts it.
+            conn.execute(
+                text(
+                    "SELECT create_hypertable('spawn_events', 'timestamp', "
+                    "if_not_exists => TRUE, migrate_data => FALSE, "
+                    "chunk_time_interval => INTERVAL '1 day')"
+                )
+            )
+            # 7-day retention
+            conn.execute(
+                text(
+                    "SELECT add_retention_policy('spawn_events', INTERVAL '7 days', if_not_exists => TRUE)"
+                )
+            )
+    except Exception as e:
+        print(f"Warning: failed to set spawn_events hypertable/retention: {e}")
+
     # Create unique constraint on rounded portal coordinates
     try:
         with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
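
To sanity-check the policy after a deploy, the standard TimescaleDB 2.x
information views can be queried — a hedged example reusing the same
engine/text objects as above (the columns printed are illustrative):

# Assumed verification snippet: confirm the hypertable and retention job exist.
with engine.connect() as conn:
    ht = conn.execute(text(
        "SELECT hypertable_name FROM timescaledb_information.hypertables "
        "WHERE hypertable_name = 'spawn_events'"
    )).fetchall()
    jobs = conn.execute(text(
        "SELECT proc_name, schedule_interval FROM timescaledb_information.jobs "
        "WHERE hypertable_name = 'spawn_events'"
    )).fetchall()
    print("hypertable:", ht, "retention jobs:", jobs)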

docker-compose.yml

@@ -27,9 +27,9 @@ services:
       DB_WAL_AUTOCHECKPOINT_PAGES: "${DB_WAL_AUTOCHECKPOINT_PAGES}"
       SHARED_SECRET: "${SHARED_SECRET}"
       SECRET_KEY: "${SECRET_KEY}"
-      LOG_LEVEL: "DEBUG"
       INVENTORY_SERVICE_URL: "http://inventory-service:8000"
       DISCORD_ACLOG_WEBHOOK: "${DISCORD_ACLOG_WEBHOOK:-}"
+      LOG_LEVEL: "INFO"
     restart: unless-stopped
     logging:
       driver: "json-file"
@@ -72,7 +72,7 @@ services:
       - "./inventory-service:/app"
     environment:
       DATABASE_URL: "postgresql://inventory_user:${INVENTORY_DB_PASSWORD}@inventory-db:5432/inventory_db"
-      LOG_LEVEL: "DEBUG"
+      LOG_LEVEL: "INFO"
     restart: unless-stopped
     logging:
       driver: "json-file"

main.py

@@ -60,7 +60,6 @@ from db_async import (
     character_inventories,
     character_stats,
     portals,
-    server_health_checks,
     server_status,
     combat_stats,
     combat_stats_sessions,
@@ -448,17 +447,8 @@ async def monitor_server_health():
                 f"🏥 Health check result: {status}, latency: {latency_ms}, players: {current_player_count}"
             )
 
-            # Record health check
-            await database.execute(
-                server_health_checks.insert().values(
-                    server_name=server_name,
-                    server_address=f"{server_address}:{server_port}",
-                    timestamp=now,
-                    status=status,
-                    latency_ms=latency_ms,
-                    player_count=current_player_count,
-                )
-            )
+            # Note: historical health checks table removed — only current state
+            # is used (server_status upsert below). No reader ever queried it.
 
             # Get previous status
             prev_status = await database.fetch_one(
@@ -2901,11 +2891,8 @@ async def ws_receive_snapshots(
                 payload.pop("type", None)
                 character_name = payload.get("character_name", "unknown")
 
-                # Track message receipt and start timing
+                # Track message receipt and start timing (debug-level only)
                 telemetry_start_time = time.time()
-                logger.info(
-                    f"📨 TELEMETRY_RECEIVED: {character_name} from {websocket.client}"
-                )
 
                 try:
                     snap = TelemetrySnapshot.parse_obj(payload)
@@ -2925,29 +2912,11 @@
                             {"char": snap.character_name, "session": snap.session_id},
                         )
                         last = row["kills"] if row else 0
-                        logger.debug(
-                            f"Cache miss for {snap.character_name} session {snap.session_id[:8]}: loaded last_kills={last} from database"
-                        )
                     delta = snap.kills - last
 
                     # Persist snapshot and any kill delta in a single transaction
                     db_start_time = time.time()
 
-                    # Log connection pool status before database operation
-                    try:
-                        pool_status = (
-                            f"pool_size:{database._pool._queue.qsize()}"
-                            if hasattr(database, "_pool")
-                            and hasattr(database._pool, "_queue")
-                            else "pool_status:unknown"
-                        )
-                    except:
-                        pool_status = "pool_status:error"
-                    logger.info(
-                        f"💾 TELEMETRY_DB_WRITE_ATTEMPT: {snap.character_name} session:{snap.session_id[:8]} kills:{snap.kills} delta:{delta} {pool_status}"
-                    )
                     try:
                         async with database.transaction():
                             await database.execute(
@@ -2969,22 +2938,15 @@
                                 )
                             )
                             await database.execute(stmt)
-                            logger.debug(
-                                f"Updated kills for {snap.character_name}: +{delta} (total from {last} to {snap.kills})"
-                            )
 
-                        # Success: log timing and update cache
+                        # Success: update cache and performance counters
                         db_duration = (time.time() - db_start_time) * 1000
                         ws_receive_snapshots._last_kills[key] = snap.kills
 
                         # Track database performance (Phase 2)
                         global _total_queries, _total_query_time
                         _total_queries += 1
-                        _total_query_time += (
-                            db_duration / 1000.0
-                        )  # Convert ms to seconds
+                        _total_query_time += db_duration / 1000.0
 
                         # Track recent activity (Phase 3)
                         global _recent_telemetry_messages, _max_recent_messages
                         activity_entry = {
                             "timestamp": datetime.now(timezone.utc).isoformat(),
@@ -2997,47 +2959,27 @@
                         if len(_recent_telemetry_messages) > _max_recent_messages:
                             _recent_telemetry_messages.pop(0)
 
-                        # Log final pool status after successful operation
-                        try:
-                            final_pool_status = (
-                                f"pool_size:{database._pool._queue.qsize()}"
-                                if hasattr(database, "_pool")
-                                and hasattr(database._pool, "_queue")
-                                else "pool_status:unknown"
-                            )
-                        except:
-                            final_pool_status = "pool_status:error"
-                        logger.info(
-                            f"✅ TELEMETRY_DB_WRITE_SUCCESS: {snap.character_name} took {db_duration:.1f}ms {final_pool_status}"
-                        )
+                        # Only warn if writes are getting slow
+                        if db_duration > 500:
+                            logger.warning(
+                                f"⚠️ SLOW_TELEMETRY_WRITE: {snap.character_name} took {db_duration:.0f}ms"
+                            )
                     except Exception as db_error:
                         db_duration = (time.time() - db_start_time) * 1000
-                        # Log pool status during failure
-                        try:
-                            error_pool_status = (
-                                f"pool_size:{database._pool._queue.qsize()}"
-                                if hasattr(database, "_pool")
-                                and hasattr(database._pool, "_queue")
-                                else "pool_status:unknown"
-                            )
-                        except:
-                            error_pool_status = "pool_status:error"
                         logger.error(
-                            f"❌ TELEMETRY_DB_WRITE_FAILED: {snap.character_name} session:{snap.session_id[:8]} took {db_duration:.1f}ms {error_pool_status} error:{db_error}",
+                            f"❌ TELEMETRY_DB_WRITE_FAILED: {snap.character_name} session:{snap.session_id[:8]} took {db_duration:.1f}ms error:{db_error}",
                             exc_info=True,
                         )
                         continue
 
                     # Broadcast updated snapshot to all browser clients
                     await _broadcast_to_browser_clients(snap.dict())
 
-                    # Log successful processing completion with timing
+                    # Only warn on slow total processing
                     total_duration = (time.time() - telemetry_start_time) * 1000
-                    logger.info(
-                        f"⏱️ TELEMETRY_PROCESSING_COMPLETE: {snap.character_name} took {total_duration:.1f}ms total"
-                    )
+                    if total_duration > 1000:
+                        logger.warning(
+                            f"⚠️ SLOW_TELEMETRY_PROCESSING: {snap.character_name} took {total_duration:.0f}ms total"
+                        )
                 except Exception as e: