MosswartOverlord/db_async.py
"""Asynchronous database layer for telemetry service using PostgreSQL/TimescaleDB.
Defines table schemas via SQLAlchemy Core and provides an
initialization function to set up TimescaleDB hypertable.
"""
import os
from datetime import datetime, timedelta, timezone

import sqlalchemy
from databases import Database
from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime, text
from sqlalchemy import Index, BigInteger, JSON, Boolean, UniqueConstraint

# Environment: Postgres/TimescaleDB connection URL
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:password@localhost:5432/dereth")
# Async database client with explicit connection pool configuration
database = Database(DATABASE_URL, min_size=5, max_size=20)
# SQLAlchemy Core metadata container for table definitions
metadata = MetaData()
# --- Table Definitions ---
# Table for storing raw telemetry snapshots at scale (converted to hypertable)
telemetry_events = Table(
    # Time-series hypertable storing raw telemetry snapshots from plugins
    "telemetry_events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("character_name", String, nullable=False, index=True),
    Column("char_tag", String, nullable=True),
    Column("session_id", String, nullable=False, index=True),
    Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
    Column("ew", Float, nullable=False),
    Column("ns", Float, nullable=False),
    Column("z", Float, nullable=False),
    Column("kills", Integer, nullable=False),
    Column("kills_per_hour", Float, nullable=True),
    Column("onlinetime", String, nullable=True),
    Column("deaths", Integer, nullable=False),
    Column("total_deaths", Integer, nullable=True),
    Column("rares_found", Integer, nullable=False),
    Column("prismatic_taper_count", Integer, nullable=False),
    Column("vt_state", String, nullable=True),
    # New telemetry metrics
    Column("mem_mb", Float, nullable=True),
    Column("cpu_pct", Float, nullable=True),
    Column("mem_handles", Integer, nullable=True),
    Column("latency_ms", Float, nullable=True),
)

# Composite index to accelerate Grafana queries filtering by character_name then ordering by timestamp
Index(
    'ix_telemetry_events_char_ts',
    telemetry_events.c.character_name,
    telemetry_events.c.timestamp,
)
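
# Write-path sketch (illustrative; this helper is an assumption, not part of the
# original module): one raw telemetry snapshot per row, inserted through the async
# client using the Core table defined above.
async def record_telemetry_snapshot(snapshot: dict) -> None:
    """Insert a single telemetry row; `snapshot` keys must match the column names."""
    await database.execute(telemetry_events.insert().values(**snapshot))
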
# Table for persistent total kills per character
char_stats = Table(
    # Stores cumulative kills per character in a single-row upsert table
    "char_stats",
    metadata,
    Column("character_name", String, primary_key=True),
    Column("total_kills", Integer, nullable=False, default=0),
)
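
# Counter upsert sketch (illustrative): char_stats keeps one row per character, so
# increments use PostgreSQL's INSERT ... ON CONFLICT via the SQLAlchemy postgresql
# dialect. The same pattern applies to rare_stats and rare_stats_sessions. The
# helper name and the import below are assumptions, not part of the original module.
from sqlalchemy.dialects.postgresql import insert as pg_insert

async def add_kills(character_name: str, kills: int) -> None:
    """Add `kills` to a character's running total, creating the row if needed."""
    stmt = pg_insert(char_stats).values(character_name=character_name, total_kills=kills)
    stmt = stmt.on_conflict_do_update(
        index_elements=[char_stats.c.character_name],
        set_={"total_kills": char_stats.c.total_kills + stmt.excluded.total_kills},
    )
    await database.execute(stmt)
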
# Table for persistent total rare counts per character
rare_stats = Table(
    # Stores cumulative rare event counts per character
    "rare_stats",
    metadata,
    Column("character_name", String, primary_key=True),
    Column("total_rares", Integer, nullable=False, default=0),
)

rare_stats_sessions = Table(
    # Stores per-session rare counts; composite PK (character_name, session_id)
    "rare_stats_sessions",
    metadata,
    Column("character_name", String, primary_key=True),
    Column("session_id", String, primary_key=True),
    Column("session_rares", Integer, nullable=False, default=0),
)

# Table for recording spawn events (mob creates) for heatmap analysis
spawn_events = Table(
    # Records individual mob spawn occurrences for heatmap and analysis
    "spawn_events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("character_name", String, nullable=False),
    Column("mob", String, nullable=False),
    Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
    Column("ew", Float, nullable=False),
    Column("ns", Float, nullable=False),
    Column("z", Float, nullable=False),
)

# Table for recording individual rare spawn events for analysis
rare_events = Table(
    # Records individual rare mob events for detailed analysis and heatmaps
    "rare_events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("character_name", String, nullable=False),
    Column("name", String, nullable=False),
    Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
    Column("ew", Float, nullable=False),
    Column("ns", Float, nullable=False),
    Column("z", Float, nullable=False),
)
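
# Read-path sketch (illustrative; the helper name is an assumption): rare events from
# a recent window, e.g. to feed a heatmap panel. fetch_all returns a list of records.
async def recent_rare_events(hours: int = 24):
    """Return rare events newer than `hours` hours, most recent first."""
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    query = (
        rare_events.select()
        .where(rare_events.c.timestamp >= cutoff)
        .order_by(rare_events.c.timestamp.desc())
    )
    return await database.fetch_all(query)
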
character_inventories = Table(
    # Stores complete character inventory snapshots with searchable fields
    "character_inventories",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("character_name", String, nullable=False, index=True),
    Column("item_id", BigInteger, nullable=False),
    Column("timestamp", DateTime(timezone=True), nullable=False),
    # Extracted searchable fields
    Column("name", String),
    Column("icon", Integer),
    Column("object_class", Integer, index=True),
    Column("value", Integer, index=True),
    Column("burden", Integer),
    Column("has_id_data", Boolean),
    # Complete item data as JSONB
    Column("item_data", JSON, nullable=False),
    # Unique constraint to prevent duplicate items per character
    UniqueConstraint("character_name", "item_id", name="uq_char_item"),
)
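
# Inventory upsert sketch (illustrative; the helper name is an assumption): the
# uq_char_item constraint lets a fresh snapshot replace the stored copy of the same
# item for the same character. Reuses the pg_insert import from the sketch above.
async def upsert_inventory_item(row: dict) -> None:
    """Insert or refresh one inventory row keyed by (character_name, item_id)."""
    stmt = pg_insert(character_inventories).values(**row)
    stmt = stmt.on_conflict_do_update(
        constraint="uq_char_item",
        set_={
            "timestamp": stmt.excluded.timestamp,
            "name": stmt.excluded.name,
            "value": stmt.excluded.value,
            "burden": stmt.excluded.burden,
            "item_data": stmt.excluded.item_data,
        },
    )
    await database.execute(stmt)
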
# Portals table with coordinate-based uniqueness and 1-hour retention
portals = Table(
    # Stores unique portals by coordinates with 1-hour retention
    "portals",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("portal_name", String, nullable=False),
    Column("ns", Float, nullable=False),
    Column("ew", Float, nullable=False),
    Column("z", Float, nullable=False),
    Column("discovered_at", DateTime(timezone=True), nullable=False, index=True),
    Column("discovered_by", String, nullable=False),
)
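
# Portal write sketch (illustrative; the helper name is an assumption): relies on the
# expression-based unique index created in init_db_async(), so re-discoveries of the
# same rounded coordinates are ignored rather than duplicated.
async def record_portal(portal: dict) -> None:
    """Insert a discovered portal; duplicates (same rounded ns/ew) are skipped."""
    await database.execute(
        """
        INSERT INTO portals (portal_name, ns, ew, z, discovered_at, discovered_by)
        VALUES (:portal_name, :ns, :ew, :z, :discovered_at, :discovered_by)
        ON CONFLICT DO NOTHING
        """,
        portal,
    )
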
# Server health monitoring tables
server_health_checks = Table(
    # Time-series data for server health checks
    "server_health_checks",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("server_name", String, nullable=False, index=True),
    Column("server_address", String, nullable=False),
    Column("timestamp", DateTime(timezone=True), nullable=False, default=sqlalchemy.func.now()),
    Column("status", String(10), nullable=False),  # 'up' or 'down'
    Column("latency_ms", Float, nullable=True),
    Column("player_count", Integer, nullable=True),
)

server_status = Table(
    # Current server status and uptime tracking
    "server_status",
    metadata,
    Column("server_name", String, primary_key=True),
    Column("current_status", String(10), nullable=False),
    Column("last_seen_up", DateTime(timezone=True), nullable=True),
    Column("last_restart", DateTime(timezone=True), nullable=True),
    Column("total_uptime_seconds", BigInteger, default=0),
    Column("last_check", DateTime(timezone=True), nullable=True),
    Column("last_latency_ms", Float, nullable=True),
    Column("last_player_count", Integer, nullable=True),
)

# Index for efficient server health check queries
Index(
    'ix_server_health_checks_name_ts',
    server_health_checks.c.server_name,
    server_health_checks.c.timestamp.desc(),
)
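
# Health-check write sketch (illustrative; the helper name is an assumption): a
# monitoring task could append one row per probe here, and keep server_status current
# with the same ON CONFLICT upsert pattern shown earlier.
async def record_health_check(server_name, server_address, status,
                              latency_ms=None, player_count=None):
    """Append a single health-check sample for a server."""
    await database.execute(
        server_health_checks.insert().values(
            server_name=server_name,
            server_address=server_address,
            timestamp=datetime.now(timezone.utc),
            status=status,
            latency_ms=latency_ms,
            player_count=player_count,
        )
    )
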
async def init_db_async():
    """Initialize the PostgreSQL/TimescaleDB schema and hypertable.

    Creates all defined tables, ensures the TimescaleDB extension is
    installed, and converts the telemetry_events table into a hypertable for
    efficient time-series storage.
    """
    # Create tables in Postgres via a synchronous SQLAlchemy engine
    engine = sqlalchemy.create_engine(DATABASE_URL)
    # Materialize the metadata definitions as actual database tables
    metadata.create_all(engine)
    # Ensure the TimescaleDB extension is installed and telemetry_events is a hypertable.
    # Run DDL in autocommit mode so errors don't abort subsequent statements.
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            # Install the extension if missing
            try:
                conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb"))
            except Exception as e:
                print(f"Warning: failed to create extension timescaledb: {e}")
            # Convert to a hypertable, migrating existing data and skipping default index creation
            try:
                conn.execute(text(
                    "SELECT create_hypertable('telemetry_events', 'timestamp', "
                    "if_not_exists => true, migrate_data => true, create_default_indexes => false)"
                ))
            except Exception as e:
                print(f"Warning: failed to create hypertable telemetry_events: {e}")
    except Exception as e:
        print(f"Warning: timescale extension/hypertable setup failed: {e}")
    # Ensure the composite index exists for efficient time-series queries by character.
    # Use autocommit here as well so the CREATE INDEX is not rolled back with the connection.
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            conn.execute(text(
                "CREATE INDEX IF NOT EXISTS ix_telemetry_events_char_ts "
                "ON telemetry_events (character_name, timestamp)"
            ))
    except Exception as e:
        print(f"Warning: failed to create composite index ix_telemetry_events_char_ts: {e}")
    # Add retention and compression policies on the hypertable
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            # Retain only recent data (default 7 days, override via DB_RETENTION_DAYS)
            days = int(os.getenv('DB_RETENTION_DAYS', '7'))
            conn.execute(text(
                f"SELECT add_retention_policy('telemetry_events', INTERVAL '{days} days', "
                "if_not_exists => true)"
            ))
            # Compression must be enabled on the hypertable before a compression policy can be added
            conn.execute(text("ALTER TABLE telemetry_events SET (timescaledb.compress)"))
            # Compress chunks older than 1 day
            conn.execute(text(
                "SELECT add_compression_policy('telemetry_events', INTERVAL '1 day', "
                "if_not_exists => true)"
            ))
    except Exception as e:
        print(f"Warning: failed to set retention/compression policies: {e}")
    # Create a unique constraint on rounded portal coordinates
    try:
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            # Drop the old portal_discoveries table if it exists
            conn.execute(text("DROP TABLE IF EXISTS portal_discoveries CASCADE"))
            # Unique index on rounded coordinates for the new portals table
            conn.execute(text(
                """CREATE UNIQUE INDEX IF NOT EXISTS unique_portal_coords
                   ON portals (ROUND(ns::numeric, 2), ROUND(ew::numeric, 2))"""
            ))
            # Plain index on coordinates for efficient lookups
            conn.execute(text(
                "CREATE INDEX IF NOT EXISTS idx_portals_coords ON portals (ns, ew)"
            ))
            print("Portal table indexes and constraints created successfully")
    except Exception as e:
        print(f"Warning: failed to create portal table constraints: {e}")
async def cleanup_old_portals():
    """Clean up portals older than 1 hour."""
    try:
        cutoff_time = datetime.now(timezone.utc) - timedelta(hours=1)
        # Delete old portals
        result = await database.execute(
            "DELETE FROM portals WHERE discovered_at < :cutoff_time",
            {"cutoff_time": cutoff_time}
        )
        print(f"Cleaned up {result} portals older than 1 hour")
        return result
    except Exception as e:
        print(f"Warning: failed to cleanup old portals: {e}")
        return 0
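
# Maintenance loop sketch (illustrative; the function name is an assumption): run the
# portal cleanup periodically from a background task, e.g.
# asyncio.create_task(portal_cleanup_loop()) after startup.
import asyncio

async def portal_cleanup_loop(interval_seconds: int = 600):
    """Delete stale portals every `interval_seconds` seconds until cancelled."""
    while True:
        await cleanup_old_portals()
        await asyncio.sleep(interval_seconds)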