Database cleanup:

- Converted spawn_events to a TimescaleDB hypertable with 7-day retention.
  Previously a regular table growing unbounded; it had reached 482M rows / 66GB
  accumulated since June 2025. A manual migration copied the last 7 days
  (12M rows) to a new hypertable, swapped names, and dropped the old table
  (a sketch of the procedure follows below). Result: the DB shrank from
  77GB to 12GB.
- Dropped the server_health_checks table entirely. It was write-only
  (850K rows, 134MB); only the current state in server_status is actually
  read. Eliminated the insert from monitor_server_health().

Telemetry handler cleanup:

- Removed 4 per-message INFO log lines (TELEMETRY_RECEIVED, DB_WRITE_ATTEMPT,
  DB_WRITE_SUCCESS, PROCESSING_COMPLETE). At 60+ characters reporting every 2s,
  that's hundreds of log lines/sec. Replaced with single SLOW_* warnings above
  the 500ms/1000ms thresholds.
- Removed the redundant pool-size introspection (try/except + hasattr) that ran
  on every telemetry message; useless noise in the hot path.
- Removed the debug cache-miss and kill-delta logs.

Log level:

- docker-compose.yml: dereth-tracker LOG_LEVEL DEBUG → INFO (it was dumping the
  entire inventory_delta JSON payload for every item update).
- inventory-service: LOG_LEVEL DEBUG → INFO.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
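
A minimal sketch of the manual spawn_events migration described above, written against this repo's SQLAlchemy setup. Table and column names match the module below; the exact statements (including the sequence re-own step) are assumptions about the procedure, not a transcript of what was run:

    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql://postgres:password@localhost:5432/dereth")
    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        # New empty table with the same columns/defaults, then convert it to a hypertable
        conn.execute(text("CREATE TABLE spawn_events_new (LIKE spawn_events INCLUDING DEFAULTS)"))
        conn.execute(text(
            "SELECT create_hypertable('spawn_events_new', 'timestamp', "
            "chunk_time_interval => INTERVAL '1 day')"
        ))
        # Copy only the last 7 days, then swap names
        conn.execute(text(
            "INSERT INTO spawn_events_new "
            "SELECT * FROM spawn_events WHERE timestamp > now() - INTERVAL '7 days'"
        ))
        conn.execute(text("ALTER TABLE spawn_events RENAME TO spawn_events_old"))
        conn.execute(text("ALTER TABLE spawn_events_new RENAME TO spawn_events"))
        # Re-own the id sequence so dropping the old table doesn't take it along
        conn.execute(text("ALTER SEQUENCE spawn_events_id_seq OWNED BY spawn_events.id"))
        conn.execute(text("DROP TABLE spawn_events_old"))
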
"""Asynchronous database layer for telemetry service using PostgreSQL/TimescaleDB.
|
|
|
|
Defines table schemas via SQLAlchemy Core and provides an
|
|
initialization function to set up TimescaleDB hypertable.
|
|
"""

import os

import sqlalchemy
from datetime import datetime, timedelta, timezone
from databases import Database
from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime, text
from sqlalchemy import Index, BigInteger, JSON, Boolean, UniqueConstraint
from sqlalchemy.sql import func

import bcrypt as _bcrypt

# Environment: Postgres/TimescaleDB connection URL
DATABASE_URL = os.getenv(
    "DATABASE_URL", "postgresql://postgres:password@localhost:5432/dereth"
)

# Async database client with explicit connection pool configuration and query timeout
database = Database(DATABASE_URL, min_size=5, max_size=100, command_timeout=120)

# SQLAlchemy Core metadata container for table definitions
metadata = MetaData()


# --- Table Definitions ---

# Time-series table storing raw telemetry snapshots from plugins at scale
# (converted to a TimescaleDB hypertable in init_db_async)
telemetry_events = Table(
    "telemetry_events",
    metadata,
    Column("character_name", String, nullable=False, index=True),
    Column("char_tag", String, nullable=True),
    Column("session_id", String, nullable=False, index=True),
    Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
    Column("ew", Float, nullable=False),
    Column("ns", Float, nullable=False),
    Column("z", Float, nullable=False),
    Column("kills", Integer, nullable=False),
    Column("kills_per_hour", Float, nullable=True),
    Column("onlinetime", String, nullable=True),
    Column("deaths", Integer, nullable=False),
    Column("total_deaths", Integer, nullable=True),
    Column("rares_found", Integer, nullable=False),
    Column("prismatic_taper_count", Integer, nullable=False),
    Column("vt_state", String, nullable=True),
    # Process-level telemetry metrics
    Column("mem_mb", Float, nullable=True),
    Column("cpu_pct", Float, nullable=True),
    Column("mem_handles", Integer, nullable=True),
    Column("latency_ms", Float, nullable=True),
)

# Composite index to accelerate Grafana queries that filter by character_name
# and order by timestamp
Index(
    "ix_telemetry_events_char_ts",
    telemetry_events.c.character_name,
    telemetry_events.c.timestamp,
)
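

# Illustrative sketch (not part of the service): the query shape the composite
# index above accelerates, filtering by character and ordering by time. The
# helper name and default limit are assumptions.
async def fetch_recent_telemetry(character_name: str, limit: int = 100):
    return await database.fetch_all(
        "SELECT * FROM telemetry_events "
        "WHERE character_name = :name "
        "ORDER BY timestamp DESC "
        "LIMIT :limit",
        {"name": character_name, "limit": limit},
    )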


# Stores cumulative lifetime kills per character (one row per character,
# maintained by upsert)
char_stats = Table(
    "char_stats",
    metadata,
    Column("character_name", String, primary_key=True),
    Column("total_kills", Integer, nullable=False, default=0),
)
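

# Illustrative sketch (not part of the service): the single-row-per-character
# upsert pattern this table is designed for. The helper name is hypothetical.
async def add_kills(character_name: str, new_kills: int):
    await database.execute(
        "INSERT INTO char_stats (character_name, total_kills) "
        "VALUES (:name, :kills) "
        "ON CONFLICT (character_name) "
        "DO UPDATE SET total_kills = char_stats.total_kills + :kills",
        {"name": character_name, "kills": new_kills},
    )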


# Stores cumulative rare event counts per character
rare_stats = Table(
    "rare_stats",
    metadata,
    Column("character_name", String, primary_key=True),
    Column("total_rares", Integer, nullable=False, default=0),
)

# Stores per-session rare counts; composite PK (character_name, session_id)
rare_stats_sessions = Table(
    "rare_stats_sessions",
    metadata,
    Column("character_name", String, primary_key=True),
    Column("session_id", String, primary_key=True),
    Column("session_rares", Integer, nullable=False, default=0),
)

# Per-character persistent combat stats (lifetime accumulation, Mag-Tools style)
combat_stats = Table(
    "combat_stats",
    metadata,
    Column("character_name", String, primary_key=True),
    Column("timestamp", DateTime(timezone=True), nullable=False),
    Column("stats_data", JSON, nullable=False),
)

# Per-session combat stats snapshots (session history)
combat_stats_sessions = Table(
    "combat_stats_sessions",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("character_name", String, nullable=False, index=True),
    Column("session_id", String, nullable=False, index=True),
    Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
    Column("stats_data", JSON, nullable=False),
)

# Records individual mob spawn occurrences for heatmap analysis. Converted to
# a TimescaleDB hypertable with 7-day retention in init_db_async().
spawn_events = Table(
    "spawn_events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("character_name", String, nullable=False),
    Column("mob", String, nullable=False),
    Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
    Column("ew", Float, nullable=False),
    Column("ns", Float, nullable=False),
    Column("z", Float, nullable=False),
)

# Records individual rare mob events for detailed analysis and heatmaps
rare_events = Table(
    "rare_events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("character_name", String, nullable=False),
    Column("name", String, nullable=False),
    Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
    Column("ew", Float, nullable=False),
    Column("ns", Float, nullable=False),
    Column("z", Float, nullable=False),
)
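

# Illustrative sketch (not part of the service): the kind of heatmap
# aggregation spawn_events exists to support, using TimescaleDB's
# time_bucket(). The bucket size and coordinate rounding are assumptions.
async def spawn_heatmap(mob: str):
    return await database.fetch_all(
        "SELECT time_bucket('1 hour', timestamp) AS bucket, "
        "ROUND(ns::numeric, 1) AS ns_bin, ROUND(ew::numeric, 1) AS ew_bin, "
        "COUNT(*) AS spawns "
        "FROM spawn_events WHERE mob = :mob "
        "GROUP BY 1, 2, 3 "
        "ORDER BY bucket",
        {"mob": mob},
    )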


# Stores complete character inventory snapshots with searchable fields
character_inventories = Table(
    "character_inventories",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("character_name", String, nullable=False, index=True),
    Column("item_id", BigInteger, nullable=False),
    Column("timestamp", DateTime(timezone=True), nullable=False),
    # Extracted searchable fields
    Column("name", String),
    Column("icon", Integer),
    Column("object_class", Integer, index=True),
    Column("value", Integer, index=True),
    Column("burden", Integer),
    Column("has_id_data", Boolean),
    # Complete item data as JSONB
    Column("item_data", JSON, nullable=False),
    # Unique constraint to prevent duplicate items per character
    UniqueConstraint("character_name", "item_id", name="uq_char_item"),
)
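

# Illustrative sketch (not part of the service): the extracted columns above
# (name, value, object_class) exist so searches can avoid scanning the JSONB
# payload. The helper name is hypothetical.
async def search_inventory(character_name: str, name_fragment: str):
    return await database.fetch_all(
        "SELECT item_id, name, value, burden FROM character_inventories "
        "WHERE character_name = :char AND name ILIKE :frag "
        "ORDER BY value DESC",
        {"char": character_name, "frag": f"%{name_fragment}%"},
    )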


# Stores unique portals keyed by rounded coordinates, with 1-hour retention
# (see cleanup_old_portals() and the unique_portal_coords index in init_db_async())
portals = Table(
    "portals",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("portal_name", String, nullable=False),
    Column("ns", Float, nullable=False),
    Column("ew", Float, nullable=False),
    Column("z", Float, nullable=False),
    Column("discovered_at", DateTime(timezone=True), nullable=False, index=True),
    Column("discovered_by", String, nullable=False),
)


# Server health monitoring: only the current state is kept. The historical
# server_health_checks table was write-only and has been dropped; nothing
# ever read from it.
server_status = Table(
    "server_status",
    metadata,
    Column("server_name", String, primary_key=True),
    Column("current_status", String(10), nullable=False),
    Column("last_seen_up", DateTime(timezone=True), nullable=True),
    Column("last_restart", DateTime(timezone=True), nullable=True),
    Column("total_uptime_seconds", BigInteger, default=0),
    Column("last_check", DateTime(timezone=True), nullable=True),
    Column("last_latency_ms", Float, nullable=True),
    Column("last_player_count", Integer, nullable=True),
)
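

# Illustrative sketch (not part of the service): the current-state upsert
# pattern server_status is built around now that historical health checks are
# gone. The helper name is hypothetical; monitor_server_health() lives elsewhere.
async def record_server_check(server_name: str, status: str, latency_ms: float):
    await database.execute(
        "INSERT INTO server_status (server_name, current_status, last_check, last_latency_ms) "
        "VALUES (:name, :status, NOW(), :latency) "
        "ON CONFLICT (server_name) DO UPDATE SET "
        "current_status = EXCLUDED.current_status, "
        "last_check = EXCLUDED.last_check, "
        "last_latency_ms = EXCLUDED.last_latency_ms",
        {"name": server_name, "status": status, "latency": latency_ms},
    )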


# Per-character progression snapshot (level, XP, luminance, deaths) plus the
# full stats payload as JSONB; created with explicit DDL in init_db_async()
character_stats = Table(
    "character_stats",
    metadata,
    Column("character_name", String, primary_key=True, nullable=False),
    Column(
        "timestamp", DateTime(timezone=True), nullable=False, server_default=func.now()
    ),
    Column("level", Integer, nullable=True),
    Column("total_xp", BigInteger, nullable=True),
    Column("unassigned_xp", BigInteger, nullable=True),
    Column("luminance_earned", BigInteger, nullable=True),
    Column("luminance_total", BigInteger, nullable=True),
    Column("deaths", Integer, nullable=True),
    Column("stats_data", JSON, nullable=False),
)


# User accounts for app-level authentication
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("username", String, nullable=False, unique=True),
    Column("password_hash", String, nullable=False),
    Column("is_admin", Boolean, nullable=False, default=False),
    Column(
        "created_at", DateTime(timezone=True), nullable=False, server_default=func.now()
    ),
)


async def init_db_async():
    """Initialize the PostgreSQL/TimescaleDB schema and hypertables.

    Creates all defined tables, ensures the TimescaleDB extension is
    installed, and converts the telemetry_events and spawn_events tables
    into hypertables for efficient time-series storage.
    """
    # Create tables in Postgres from the SQLAlchemy metadata definitions
    engine = sqlalchemy.create_engine(DATABASE_URL)
    metadata.create_all(engine)

    # Ensure the TimescaleDB extension is installed and telemetry_events is a
    # hypertable. Run DDL in autocommit mode so errors don't abort subsequent
    # statements.
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            # Install the extension if missing
            try:
                conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb"))
            except Exception as e:
                print(f"Warning: failed to create extension timescaledb: {e}")
            # Convert to a hypertable, migrating existing data and skipping
            # default index creation
            try:
                conn.execute(
                    text(
                        "SELECT create_hypertable('telemetry_events', 'timestamp', "
                        "if_not_exists => true, migrate_data => true, create_default_indexes => false)"
                    )
                )
            except Exception as e:
                print(f"Warning: failed to create hypertable telemetry_events: {e}")
    except Exception as e:
        print(f"Warning: timescale extension/hypertable setup failed: {e}")

    # Ensure the composite index exists for efficient time-series queries by
    # character. Autocommit is used here as well so the DDL persists under
    # SQLAlchemy 2.x, which no longer implicitly commits on connection close.
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            conn.execute(
                text(
                    "CREATE INDEX IF NOT EXISTS ix_telemetry_events_char_ts "
                    "ON telemetry_events (character_name, timestamp)"
                )
            )
    except Exception as e:
        print(
            f"Warning: failed to create composite index ix_telemetry_events_char_ts: {e}"
        )

    # Add retention and compression policies on the hypertable
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            # Retain only recent data (default 7 days; override via DB_RETENTION_DAYS)
            days = int(os.getenv("DB_RETENTION_DAYS", "7"))
            conn.execute(
                text(
                    f"SELECT add_retention_policy('telemetry_events', "
                    f"INTERVAL '{days} days', if_not_exists => true)"
                )
            )
            # Compression must be enabled on the hypertable before a
            # compression policy can be added
            conn.execute(
                text("ALTER TABLE telemetry_events SET (timescaledb.compress)")
            )
            # Compress chunks older than 1 day
            conn.execute(
                text(
                    "SELECT add_compression_policy('telemetry_events', "
                    "INTERVAL '1 day', if_not_exists => true)"
                )
            )
    except Exception as e:
        print(f"Warning: failed to set retention/compression policies: {e}")

    # Ensure spawn_events is a hypertable with a 7-day retention policy.
    # This is idempotent: if it is already a hypertable, create_hypertable is a
    # no-op when if_not_exists => TRUE. The original 482M-row table needed a
    # manual migration (see docs/plans/spawn_events_cleanup.md); this block
    # keeps the policy alive on subsequent deploys.
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            # Convert spawn_events to a hypertable if it isn't one already.
            # migrate_data => FALSE is safe because the manual migration handled
            # existing rows; on a fresh DB the table is empty and this converts
            # it directly. Note: if a fresh table still carries a unique index
            # that omits the timestamp column (e.g. the serial id PK),
            # TimescaleDB will refuse the conversion and the warning below fires.
            conn.execute(
                text(
                    "SELECT create_hypertable('spawn_events', 'timestamp', "
                    "if_not_exists => TRUE, migrate_data => FALSE, "
                    "chunk_time_interval => INTERVAL '1 day')"
                )
            )
            # 7-day retention
            conn.execute(
                text(
                    "SELECT add_retention_policy('spawn_events', INTERVAL '7 days', if_not_exists => TRUE)"
                )
            )
    except Exception as e:
        print(f"Warning: failed to set spawn_events hypertable/retention: {e}")

    # Create a unique constraint on rounded portal coordinates
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            # Drop the old portal_discoveries table if it exists
            conn.execute(text("DROP TABLE IF EXISTS portal_discoveries CASCADE"))

            # Unique index on rounded coordinates for the new portals table
            conn.execute(
                text(
                    """CREATE UNIQUE INDEX IF NOT EXISTS unique_portal_coords
                       ON portals (ROUND(ns::numeric, 2), ROUND(ew::numeric, 2))"""
                )
            )

            # Index on coordinates for efficient lookups
            conn.execute(
                text(
                    "CREATE INDEX IF NOT EXISTS idx_portals_coords ON portals (ns, ew)"
                )
            )

            print("Portal table indexes and constraints created successfully")
    except Exception as e:
        print(f"Warning: failed to create portal table constraints: {e}")

    # Ensure the character_stats table exists with a JSONB column. The generic
    # SQLAlchemy JSON type maps to Postgres json rather than jsonb, hence the
    # explicit DDL here.
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            conn.execute(
                text("""
                    CREATE TABLE IF NOT EXISTS character_stats (
                        character_name VARCHAR(255) PRIMARY KEY,
                        timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                        level INTEGER,
                        total_xp BIGINT,
                        unassigned_xp BIGINT,
                        luminance_earned BIGINT,
                        luminance_total BIGINT,
                        deaths INTEGER,
                        stats_data JSONB NOT NULL
                    )
                """)
            )
            print("character_stats table created/verified successfully")
    except Exception as e:
        print(f"Warning: failed to create character_stats table: {e}")


async def cleanup_old_portals():
    """Clean up portals older than 1 hour."""
    try:
        cutoff_time = datetime.now(timezone.utc) - timedelta(hours=1)

        # Delete old portals
        result = await database.execute(
            "DELETE FROM portals WHERE discovered_at < :cutoff_time",
            {"cutoff_time": cutoff_time},
        )

        print(f"Cleaned up {result} portals older than 1 hour")
        return result

    except Exception as e:
        print(f"Warning: failed to cleanup old portals: {e}")
        return 0
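

# Illustrative sketch (not part of the service): running the cleanup on a
# timer. The 5-minute interval is an assumption.
async def portal_cleanup_loop(interval_seconds: int = 300):
    import asyncio  # local import keeps this illustrative helper self-contained
    while True:
        await cleanup_old_portals()
        await asyncio.sleep(interval_seconds)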


async def seed_users():
    """Seed default users if the users table is empty."""
    try:
        count = await database.fetch_val("SELECT COUNT(*) FROM users")
        if count > 0:
            print(f"Users table already has {count} users, skipping seed")
            return

        default_users = [
            {"username": "erik", "password": "erik123", "is_admin": True},
            {"username": "alex", "password": "AlexGillar100Killar", "is_admin": False},
            {
                "username": "lundberg",
                "password": "JohanGillar100Kvinnor",
                "is_admin": False,
            },
        ]
        for u in default_users:
            # bcrypt embeds a per-user salt in the hash itself
            pw_hash = _bcrypt.hashpw(u["password"].encode(), _bcrypt.gensalt()).decode()
            await database.execute(
                "INSERT INTO users (username, password_hash, is_admin) "
                "VALUES (:username, :password_hash, :is_admin)",
                {
                    "username": u["username"],
                    "password_hash": pw_hash,
                    "is_admin": u["is_admin"],
                },
            )
            role = "admin" if u["is_admin"] else "user"
            print(f"Seeded {role} user: {u['username']}")
    except Exception as e:
        print(f"Warning: failed to seed users: {e}")
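

# Illustrative sketch (not part of the service): verifying a login against
# users.password_hash. The helper name is hypothetical; only bcrypt.checkpw
# and the users schema above are assumed.
async def verify_login(username: str, password: str) -> bool:
    row = await database.fetch_one(
        "SELECT password_hash FROM users WHERE username = :username",
        {"username": username},
    )
    if row is None:
        return False
    return _bcrypt.checkpw(password.encode(), row["password_hash"].encode())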