MosswartOverlord/db_async.py
2025-05-25 19:33:48 +00:00

133 lines
No EOL
5.5 KiB
Python

"""Asynchronous database layer for telemetry service using PostgreSQL/TimescaleDB.
Defines table schemas via SQLAlchemy Core and provides an
initialization function to set up TimescaleDB hypertable.
"""
import os
import sqlalchemy
from databases import Database
from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime, text
# Environment: Postgres/TimescaleDB connection URL
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:password@localhost:5432/dereth")
# Async database client
database = Database(DATABASE_URL)
# Metadata for SQLAlchemy Core
# SQLAlchemy metadata container for table definitions
metadata = MetaData()
# --- Table Definitions ---
# Table for storing raw telemetry snapshots at scale (converted to hypertable)
telemetry_events = Table(
# Time-series hypertable storing raw telemetry snapshots from plugins
"telemetry_events",
metadata,
Column("id", Integer, primary_key=True),
Column("character_name", String, nullable=False, index=True),
Column("char_tag", String, nullable=True),
Column("session_id", String, nullable=False, index=True),
Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
Column("ew", Float, nullable=False),
Column("ns", Float, nullable=False),
Column("z", Float, nullable=False),
Column("kills", Integer, nullable=False),
Column("kills_per_hour", Float, nullable=True),
Column("onlinetime", String, nullable=True),
Column("deaths", Integer, nullable=False),
Column("rares_found", Integer, nullable=False),
Column("prismatic_taper_count", Integer, nullable=False),
Column("vt_state", String, nullable=True),
# New telemetry metrics
Column("mem_mb", Float, nullable=True),
Column("cpu_pct", Float, nullable=True),
Column("mem_handles", Integer, nullable=True),
Column("latency_ms", Float, nullable=True),
)
# Table for persistent total kills per character
char_stats = Table(
# Stores cumulative kills per character in a single-row upsert table
"char_stats",
metadata,
Column("character_name", String, primary_key=True),
Column("total_kills", Integer, nullable=False, default=0),
)
# Table for persistent total rare counts per character
rare_stats = Table(
# Stores cumulative rare event counts per character
"rare_stats",
metadata,
Column("character_name", String, primary_key=True),
Column("total_rares", Integer, nullable=False, default=0),
)
rare_stats_sessions = Table(
# Stores per-session rare counts; composite PK (character_name, session_id)
"rare_stats_sessions",
metadata,
Column("character_name", String, primary_key=True),
Column("session_id", String, primary_key=True),
Column("session_rares", Integer, nullable=False, default=0),
)
# Table for recording spawn events (mob creates) for heatmap analysis
spawn_events = Table(
# Records individual mob spawn occurrences for heatmap and analysis
"spawn_events",
metadata,
Column("id", Integer, primary_key=True),
Column("character_name", String, nullable=False),
Column("mob", String, nullable=False),
Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
Column("ew", Float, nullable=False),
Column("ns", Float, nullable=False),
Column("z", Float, nullable=False),
)
# Table for recording individual rare spawn events for analysis
rare_events = Table(
# Records individual rare mob events for detailed analysis and heatmaps
"rare_events",
metadata,
Column("id", Integer, primary_key=True),
Column("character_name", String, nullable=False),
Column("name", String, nullable=False),
Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
Column("ew", Float, nullable=False),
Column("ns", Float, nullable=False),
Column("z", Float, nullable=False),
)
async def init_db_async():
"""Initialize PostgreSQL/TimescaleDB schema and hypertable.
Creates all defined tables and ensures the TimescaleDB extension is
installed. Converts telemetry_events table into a hypertable for efficient
time-series data storage.
"""
# Create tables in Postgres
engine = sqlalchemy.create_engine(DATABASE_URL)
# Reflects metadata definitions into actual database tables via SQLAlchemy
metadata.create_all(engine)
# Enable TimescaleDB extension and convert telemetry_events to hypertable
# Use a transactional context to ensure DDL statements are committed
with engine.begin() as conn:
# Enable or update TimescaleDB extension
# Install or confirm TimescaleDB extension to support hypertables
try:
conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb"))
except Exception as e:
print(f"Warning: failed to create extension timescaledb: {e}")
# Update TimescaleDB extension if an older version exists
try:
conn.execute(text("ALTER EXTENSION timescaledb UPDATE"))
except Exception as e:
print(f"Warning: failed to update timescaledb extension: {e}")
# Create hypertable for telemetry_events, skip default indexes to avoid collisions
# Transform telemetry_events into a hypertable partitioned by timestamp
try:
conn.execute(text(
"SELECT create_hypertable('telemetry_events', 'timestamp', \
if_not_exists => true, create_default_indexes => false)"
))
except Exception as e:
print(f"Warning: failed to create hypertable telemetry_events: {e}")