"""SQLite3 helper module for local telemetry storage. Provides functions to initialize the local database schema and save telemetry snapshots into history and live_state tables. Enforces WAL mode, size limits, and auto-vacuum for efficient storage. """ import os import sqlite3 from typing import Dict from datetime import datetime, timedelta # Local SQLite database file name (used when running without TimescaleDB) DB_FILE = "dereth.db" # Maximum allowed database size (in MB). Defaults to 2048 (2GB). Override via env DB_MAX_SIZE_MB. MAX_DB_SIZE_MB = int(os.getenv("DB_MAX_SIZE_MB", "2048")) # Retention window for telemetry history in days (currently not auto-enforced). # Override via env DB_RETENTION_DAYS for future cleanup scripts. MAX_RETENTION_DAYS = int(os.getenv("DB_RETENTION_DAYS", "7")) # SQLite runtime limits customization DB_MAX_SQL_LENGTH = int(os.getenv("DB_MAX_SQL_LENGTH", "1000000000")) DB_MAX_SQL_VARIABLES = int(os.getenv("DB_MAX_SQL_VARIABLES", "32766")) # Number of WAL frames to write before forcing a checkpoint (override via env DB_WAL_AUTOCHECKPOINT_PAGES) DB_WAL_AUTOCHECKPOINT_PAGES = int(os.getenv("DB_WAL_AUTOCHECKPOINT_PAGES", "1000")) def init_db() -> None: """ Initialize local SQLite database schema for telemetry logging. - Applies SQLite PRAGMA settings for performance and file size management - Ensures WAL journaling and auto-vacuum for concurrency and compaction - Creates telemetry_log for full history and live_state for latest snapshot per character """ # Open connection with a longer timeout # Open connection with extended timeout for schema operations conn = sqlite3.connect(DB_FILE, timeout=30) # Bump SQLite runtime limits conn.setlimit(sqlite3.SQLITE_LIMIT_LENGTH, DB_MAX_SQL_LENGTH) conn.setlimit(sqlite3.SQLITE_LIMIT_SQL_LENGTH, DB_MAX_SQL_LENGTH) conn.setlimit(sqlite3.SQLITE_LIMIT_VARIABLE_NUMBER, DB_MAX_SQL_VARIABLES) c = conn.cursor() # Enable auto_vacuum FULL and rebuild DB so that deletions shrink the file # Enable full auto-vacuum to shrink database file on deletes c.execute("PRAGMA auto_vacuum=FULL;") conn.commit() # Rebuild database to apply auto_vacuum changes c.execute("VACUUM;") conn.commit() # Switch to WAL mode for concurrency, adjust checkpointing, and enforce max size # Configure write-ahead logging for concurrency and performance c.execute("PRAGMA journal_mode=WAL") c.execute("PRAGMA synchronous=NORMAL") # Auto-checkpoint after specified WAL frames to limit WAL file size c.execute(f"PRAGMA wal_autocheckpoint={DB_WAL_AUTOCHECKPOINT_PAGES}") # Create history log table for all telemetry snapshots c.execute( """ CREATE TABLE IF NOT EXISTS telemetry_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, character_name TEXT NOT NULL, char_tag TEXT, session_id TEXT NOT NULL, timestamp TEXT NOT NULL, ew REAL, ns REAL, z REAL, kills INTEGER, kills_per_hour TEXT, onlinetime TEXT, deaths INTEGER, rares_found INTEGER, prismatic_taper_count INTEGER, vt_state TEXT ) """ ) # Create live_state table for upserts of the most recent snapshot per character c.execute( """ CREATE TABLE IF NOT EXISTS live_state ( character_name TEXT PRIMARY KEY, char_tag TEXT, session_id TEXT, timestamp TEXT, ew REAL, ns REAL, z REAL, kills INTEGER, kills_per_hour TEXT, onlinetime TEXT, deaths INTEGER, rares_found INTEGER, prismatic_taper_count INTEGER, vt_state TEXT ) """ ) conn.commit() conn.close() def save_snapshot(data: Dict) -> None: """ Save a telemetry snapshot into the local SQLite database. Inserts a full record into telemetry_log (history) and upserts into live_state for quick lookup of the most recent data per character. Respects WAL mode and checkpoint settings on each connection. """ # Open new connection with extended timeout for inserting data conn = sqlite3.connect(DB_FILE, timeout=30) # Bump SQLite runtime limits on this connection conn.setlimit(sqlite3.SQLITE_LIMIT_LENGTH, DB_MAX_SQL_LENGTH) conn.setlimit(sqlite3.SQLITE_LIMIT_SQL_LENGTH, DB_MAX_SQL_LENGTH) conn.setlimit(sqlite3.SQLITE_LIMIT_VARIABLE_NUMBER, DB_MAX_SQL_VARIABLES) c = conn.cursor() # Ensure WAL mode and checkpointing settings on this connection c.execute("PRAGMA journal_mode=WAL") c.execute("PRAGMA synchronous=NORMAL") c.execute(f"PRAGMA wal_autocheckpoint={DB_WAL_AUTOCHECKPOINT_PAGES}") # Insert the snapshot into the telemetry_log (history) table c.execute( """ INSERT INTO telemetry_log ( character_name, char_tag, session_id, timestamp, ew, ns, z, kills, kills_per_hour, onlinetime, deaths, rares_found, prismatic_taper_count, vt_state ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( data["character_name"], data.get("char_tag", ""), data["session_id"], data["timestamp"], data["ew"], data["ns"], data.get("z", 0.0), data["kills"], data.get("kills_per_hour", ""), data.get("onlinetime", ""), data.get("deaths", 0), data.get("rares_found", 0), data.get("prismatic_taper_count", 0), data.get("vt_state", "Unknown"), ), ) # Upsert (insert or update) the latest snapshot into live_state table c.execute( """ INSERT INTO live_state ( character_name, char_tag, session_id, timestamp, ew, ns, z, kills, kills_per_hour, onlinetime, deaths, rares_found, prismatic_taper_count, vt_state ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(character_name) DO UPDATE SET char_tag = excluded.char_tag, session_id = excluded.session_id, timestamp = excluded.timestamp, ew = excluded.ew, ns = excluded.ns, z = excluded.z, kills = excluded.kills, kills_per_hour = excluded.kills_per_hour, onlinetime = excluded.onlinetime, deaths = excluded.deaths, rares_found = excluded.rares_found, prismatic_taper_count = excluded.prismatic_taper_count, vt_state = excluded.vt_state """, ( data["character_name"], data.get("char_tag", ""), data["session_id"], data["timestamp"], data["ew"], data["ns"], data.get("z", 0.0), data["kills"], data.get("kills_per_hour", ""), data.get("onlinetime", ""), data.get("deaths", 0), data.get("rares_found", 0), data.get("prismatic_taper_count", 0), data.get("vt_state", "Unknown"), ), ) conn.commit() conn.close()