MosswartOverlord/db.py
2025-05-24 18:33:03 +00:00

197 lines
7.8 KiB
Python

"""SQLite3 helper module for local telemetry storage.
Provides functions to initialize the local database schema and save
telemetry snapshots into history and live_state tables.
Enforces WAL mode, size limits, and auto-vacuum for efficient storage.
"""
import os
import sqlite3
from typing import Dict
from datetime import datetime, timedelta
# Local SQLite database file name (used when running without TimescaleDB)
DB_FILE = "dereth.db"
# Maximum allowed database size (in MB). Defaults to 2048 (2GB). Override via env DB_MAX_SIZE_MB.
MAX_DB_SIZE_MB = int(os.getenv("DB_MAX_SIZE_MB", "2048"))
# Retention window for telemetry history in days (currently not auto-enforced).
# Override via env DB_RETENTION_DAYS for future cleanup scripts.
MAX_RETENTION_DAYS = int(os.getenv("DB_RETENTION_DAYS", "7"))
# SQLite runtime limits customization
DB_MAX_SQL_LENGTH = int(os.getenv("DB_MAX_SQL_LENGTH", "1000000000"))
DB_MAX_SQL_VARIABLES = int(os.getenv("DB_MAX_SQL_VARIABLES", "32766"))
# Number of WAL frames to write before forcing a checkpoint (override via env DB_WAL_AUTOCHECKPOINT_PAGES)
DB_WAL_AUTOCHECKPOINT_PAGES = int(os.getenv("DB_WAL_AUTOCHECKPOINT_PAGES", "1000"))
def init_db() -> None:
    """
    Initialize the local SQLite database schema for telemetry logging.

    - Raises per-connection SQLite runtime limits (statement length and
      bind-variable count) from the ``DB_MAX_SQL_*`` settings
    - Enables full auto-vacuum (with a one-time VACUUM, which is required
      for the auto_vacuum change to take effect on an existing database)
    - Switches to WAL journaling with NORMAL synchronous mode and a bounded
      auto-checkpoint interval
    - Caps the database file size via ``PRAGMA max_page_count`` derived from
      ``MAX_DB_SIZE_MB`` (fix: this constant was documented as enforced but
      was never actually applied)
    - Creates ``telemetry_log`` (full history) and ``live_state`` (latest
      snapshot per character) tables if they do not already exist

    The connection is always closed, even if a statement raises.
    """
    # Extended timeout: schema changes and VACUUM can hold the write lock a while.
    conn = sqlite3.connect(DB_FILE, timeout=30)
    try:
        # Bump SQLite runtime limits for this connection (Connection.setlimit
        # requires Python 3.11+, which the original code already relies on).
        conn.setlimit(sqlite3.SQLITE_LIMIT_LENGTH, DB_MAX_SQL_LENGTH)
        conn.setlimit(sqlite3.SQLITE_LIMIT_SQL_LENGTH, DB_MAX_SQL_LENGTH)
        conn.setlimit(sqlite3.SQLITE_LIMIT_VARIABLE_NUMBER, DB_MAX_SQL_VARIABLES)
        c = conn.cursor()
        # Enable full auto-vacuum so deletions shrink the file on disk.
        c.execute("PRAGMA auto_vacuum=FULL;")
        conn.commit()
        # Rebuild the database so the auto_vacuum setting takes effect.
        c.execute("VACUUM;")
        conn.commit()
        # Write-ahead logging allows concurrent readers alongside one writer;
        # synchronous=NORMAL is the recommended durability/speed trade-off in WAL.
        c.execute("PRAGMA journal_mode=WAL")
        c.execute("PRAGMA synchronous=NORMAL")
        # Checkpoint after this many WAL frames to bound the WAL file size.
        c.execute(f"PRAGMA wal_autocheckpoint={DB_WAL_AUTOCHECKPOINT_PAGES}")
        # Enforce the configured maximum database size. max_page_count is a
        # per-connection setting, so write paths must also apply it; SQLite
        # clamps the value to at least the current page count, so an already
        # oversized database still opens (it just cannot grow further).
        page_size = c.execute("PRAGMA page_size").fetchone()[0]
        max_pages = (MAX_DB_SIZE_MB * 1024 * 1024) // page_size
        c.execute(f"PRAGMA max_page_count={max_pages}")
        # Full history of telemetry snapshots (append-only log).
        c.execute(
            """
            CREATE TABLE IF NOT EXISTS telemetry_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                character_name TEXT NOT NULL,
                char_tag TEXT,
                session_id TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                ew REAL,
                ns REAL,
                z REAL,
                kills INTEGER,
                kills_per_hour TEXT,
                onlinetime TEXT,
                deaths INTEGER,
                rares_found INTEGER,
                prismatic_taper_count INTEGER,
                vt_state TEXT
            )
            """
        )
        # Latest snapshot per character (upsert target keyed on character_name).
        c.execute(
            """
            CREATE TABLE IF NOT EXISTS live_state (
                character_name TEXT PRIMARY KEY,
                char_tag TEXT,
                session_id TEXT,
                timestamp TEXT,
                ew REAL,
                ns REAL,
                z REAL,
                kills INTEGER,
                kills_per_hour TEXT,
                onlinetime TEXT,
                deaths INTEGER,
                rares_found INTEGER,
                prismatic_taper_count INTEGER,
                vt_state TEXT
            )
            """
        )
        conn.commit()
    finally:
        # Close even on failure so the schema connection never leaks.
        conn.close()
def _snapshot_params(data: Dict) -> tuple:
    """Build the 14-element bind tuple shared by the history insert and the
    live-state upsert (previously duplicated verbatim at both call sites).

    Required keys: character_name, session_id, timestamp, ew, ns, kills.
    All other fields fall back to benign defaults ('' / 0 / 0.0 / 'Unknown').
    """
    return (
        data["character_name"],
        data.get("char_tag", ""),
        data["session_id"],
        data["timestamp"],
        data["ew"],
        data["ns"],
        data.get("z", 0.0),
        data["kills"],
        data.get("kills_per_hour", ""),
        data.get("onlinetime", ""),
        data.get("deaths", 0),
        data.get("rares_found", 0),
        data.get("prismatic_taper_count", 0),
        data.get("vt_state", "Unknown"),
    )


def save_snapshot(data: Dict) -> None:
    """
    Save a telemetry snapshot into the local SQLite database.

    Inserts a full record into ``telemetry_log`` (history) and upserts into
    ``live_state`` for quick lookup of the most recent data per character.
    Both statements share one transaction and one parameter tuple, and the
    connection is always closed even if an insert raises (fix: the handle
    previously leaked on error). Re-applies WAL/checkpoint settings and the
    ``MAX_DB_SIZE_MB`` cap, since ``max_page_count`` is per-connection.

    :param data: snapshot dict; see ``_snapshot_params`` for required keys.
    :raises KeyError: if a required snapshot field is missing.
    :raises sqlite3.OperationalError: e.g. when the size cap is reached.
    """
    params = _snapshot_params(data)
    # New connection per snapshot with an extended timeout for lock contention.
    conn = sqlite3.connect(DB_FILE, timeout=30)
    try:
        # Bump SQLite runtime limits on this connection.
        conn.setlimit(sqlite3.SQLITE_LIMIT_LENGTH, DB_MAX_SQL_LENGTH)
        conn.setlimit(sqlite3.SQLITE_LIMIT_SQL_LENGTH, DB_MAX_SQL_LENGTH)
        conn.setlimit(sqlite3.SQLITE_LIMIT_VARIABLE_NUMBER, DB_MAX_SQL_VARIABLES)
        c = conn.cursor()
        # Ensure WAL mode and checkpointing settings on this connection.
        c.execute("PRAGMA journal_mode=WAL")
        c.execute("PRAGMA synchronous=NORMAL")
        c.execute(f"PRAGMA wal_autocheckpoint={DB_WAL_AUTOCHECKPOINT_PAGES}")
        # Enforce the configured maximum database size on this write path
        # (per-connection setting; SQLite clamps it to the current page count
        # for an already oversized file, so reads/opens still work).
        page_size = c.execute("PRAGMA page_size").fetchone()[0]
        max_pages = (MAX_DB_SIZE_MB * 1024 * 1024) // page_size
        c.execute(f"PRAGMA max_page_count={max_pages}")
        # Append the snapshot to the history table.
        c.execute(
            """
            INSERT INTO telemetry_log (
                character_name, char_tag, session_id, timestamp,
                ew, ns, z,
                kills, kills_per_hour, onlinetime,
                deaths, rares_found, prismatic_taper_count, vt_state
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            params,
        )
        # Upsert the latest snapshot into live_state, keyed on character_name.
        c.execute(
            """
            INSERT INTO live_state (
                character_name, char_tag, session_id, timestamp,
                ew, ns, z,
                kills, kills_per_hour, onlinetime,
                deaths, rares_found, prismatic_taper_count, vt_state
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(character_name) DO UPDATE SET
                char_tag = excluded.char_tag,
                session_id = excluded.session_id,
                timestamp = excluded.timestamp,
                ew = excluded.ew,
                ns = excluded.ns,
                z = excluded.z,
                kills = excluded.kills,
                kills_per_hour = excluded.kills_per_hour,
                onlinetime = excluded.onlinetime,
                deaths = excluded.deaths,
                rares_found = excluded.rares_found,
                prismatic_taper_count = excluded.prismatic_taper_count,
                vt_state = excluded.vt_state
            """,
            params,
        )
        conn.commit()
    finally:
        # Close even on failure so the write connection never leaks.
        conn.close()