"""Asynchronous database layer for telemetry service using PostgreSQL/TimescaleDB. Defines table schemas via SQLAlchemy Core and provides an initialization function to set up TimescaleDB hypertable. """ import os import sqlalchemy from databases import Database from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime, text from sqlalchemy import Index # Environment: Postgres/TimescaleDB connection URL DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:password@localhost:5432/dereth") # Async database client database = Database(DATABASE_URL) # Metadata for SQLAlchemy Core # SQLAlchemy metadata container for table definitions metadata = MetaData() # --- Table Definitions --- # Table for storing raw telemetry snapshots at scale (converted to hypertable) telemetry_events = Table( # Time-series hypertable storing raw telemetry snapshots from plugins "telemetry_events", metadata, Column("id", Integer, primary_key=True), Column("character_name", String, nullable=False, index=True), Column("char_tag", String, nullable=True), Column("session_id", String, nullable=False, index=True), Column("timestamp", DateTime(timezone=True), nullable=False, index=True), Column("ew", Float, nullable=False), Column("ns", Float, nullable=False), Column("z", Float, nullable=False), Column("kills", Integer, nullable=False), Column("kills_per_hour", Float, nullable=True), Column("onlinetime", String, nullable=True), Column("deaths", Integer, nullable=False), Column("rares_found", Integer, nullable=False), Column("prismatic_taper_count", Integer, nullable=False), Column("vt_state", String, nullable=True), # New telemetry metrics Column("mem_mb", Float, nullable=True), Column("cpu_pct", Float, nullable=True), Column("mem_handles", Integer, nullable=True), Column("latency_ms", Float, nullable=True), ) # Composite index to accelerate Grafana queries filtering by character_name then ordering by timestamp Index( 'ix_telemetry_events_char_ts', telemetry_events.c.character_name, telemetry_events.c.timestamp ) # Table for persistent total kills per character char_stats = Table( # Stores cumulative kills per character in a single-row upsert table "char_stats", metadata, Column("character_name", String, primary_key=True), Column("total_kills", Integer, nullable=False, default=0), ) # Table for persistent total rare counts per character rare_stats = Table( # Stores cumulative rare event counts per character "rare_stats", metadata, Column("character_name", String, primary_key=True), Column("total_rares", Integer, nullable=False, default=0), ) rare_stats_sessions = Table( # Stores per-session rare counts; composite PK (character_name, session_id) "rare_stats_sessions", metadata, Column("character_name", String, primary_key=True), Column("session_id", String, primary_key=True), Column("session_rares", Integer, nullable=False, default=0), ) # Table for recording spawn events (mob creates) for heatmap analysis spawn_events = Table( # Records individual mob spawn occurrences for heatmap and analysis "spawn_events", metadata, Column("id", Integer, primary_key=True), Column("character_name", String, nullable=False), Column("mob", String, nullable=False), Column("timestamp", DateTime(timezone=True), nullable=False, index=True), Column("ew", Float, nullable=False), Column("ns", Float, nullable=False), Column("z", Float, nullable=False), ) # Table for recording individual rare spawn events for analysis rare_events = Table( # Records individual rare mob events for detailed analysis and heatmaps "rare_events", metadata, Column("id", Integer, primary_key=True), Column("character_name", String, nullable=False), Column("name", String, nullable=False), Column("timestamp", DateTime(timezone=True), nullable=False, index=True), Column("ew", Float, nullable=False), Column("ns", Float, nullable=False), Column("z", Float, nullable=False), ) async def init_db_async(): """Initialize PostgreSQL/TimescaleDB schema and hypertable. Creates all defined tables and ensures the TimescaleDB extension is installed. Converts telemetry_events table into a hypertable for efficient time-series data storage. """ # Create tables in Postgres engine = sqlalchemy.create_engine(DATABASE_URL) # Reflects metadata definitions into actual database tables via SQLAlchemy metadata.create_all(engine) # Ensure TimescaleDB extension is installed and telemetry_events is a hypertable # Run DDL in autocommit mode so errors don't abort subsequent statements try: with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: # Install extension if missing try: conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb")) except Exception as e: print(f"Warning: failed to create extension timescaledb: {e}") # Convert to hypertable, migrating existing data and skipping default index creation try: conn.execute(text( "SELECT create_hypertable('telemetry_events', 'timestamp', " "if_not_exists => true, migrate_data => true, create_default_indexes => false)" )) except Exception as e: print(f"Warning: failed to create hypertable telemetry_events: {e}") except Exception as e: print(f"Warning: timescale extension/hypertable setup failed: {e}") # Ensure composite index exists for efficient time-series queries by character try: with engine.connect() as conn: conn.execute(text( "CREATE INDEX IF NOT EXISTS ix_telemetry_events_char_ts " "ON telemetry_events (character_name, timestamp)" )) except Exception as e: print(f"Warning: failed to create composite index ix_telemetry_events_char_ts: {e}") # Disable parallel workers at the system level to avoid OOMs from large parallel scans try: # Apply settings outside transaction for ALTER SYSTEM conn2 = engine.connect().execution_options(isolation_level="AUTOCOMMIT") conn2.execute(text("ALTER SYSTEM SET max_parallel_workers_per_gather = 0")) conn2.execute(text("ALTER SYSTEM SET max_parallel_workers = 0")) conn2.execute(text("SELECT pg_reload_conf()")) conn2.close() except Exception as e: print(f"Warning: failed to disable parallel workers: {e}")