diff --git a/Dockerfile b/Dockerfile index be2c14f1..14b89196 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,18 +6,22 @@ WORKDIR /app # Upgrade pip and install Python dependencies RUN python -m pip install --upgrade pip && \ - pip install --no-cache-dir fastapi uvicorn pydantic pandas matplotlib websockets + pip install --no-cache-dir fastapi uvicorn pydantic websockets databases[postgresql] sqlalchemy alembic # Copy application code COPY static/ /app/static/ COPY main.py /app/main.py COPY db.py /app/db.py +COPY db_async.py /app/db_async.py +COPY alembic.ini /app/alembic.ini +COPY alembic/ /app/alembic/ COPY Dockerfile /Dockerfile # Expose the application port EXPOSE 8765 # Default environment variables (override as needed) -ENV DB_MAX_SIZE_MB=2048 \ +ENV DATABASE_URL=postgresql://postgres:password@db:5432/dereth \ + DB_MAX_SIZE_MB=2048 \ DB_RETENTION_DAYS=7 \ DB_MAX_SQL_LENGTH=1000000000 \ DB_MAX_SQL_VARIABLES=32766 \ diff --git a/README.md b/README.md index 50a7849d..cd92b6fd 100644 --- a/README.md +++ b/README.md @@ -42,8 +42,6 @@ Python packages: - fastapi - uvicorn - pydantic -- pandas -- matplotlib - websockets # required for sample data generator ## Installation @@ -60,7 +58,7 @@ Python packages: ``` 3. Install dependencies: ```bash - pip install fastapi uvicorn pydantic pandas matplotlib websockets + pip install fastapi uvicorn pydantic websockets ``` ## Configuration @@ -171,6 +169,16 @@ Example chat payload: } ``` +## Event Payload Formats + +For a complete reference of JSON payloads accepted by the backend (over `/ws/position`), see the file `EVENT_FORMATS.json` in the project root. It contains example schemas for: + - **Telemetry events** (`type`: "telemetry") + - **Spawn events** (`type`: "spawn") + - **Chat events** (`type`: "chat") + - **Rare events** (`type`: "rare") + +Each entry shows all required and optional fields, their types, and example values. + ### GET /live Returns active players seen within the last 30 seconds: @@ -207,3 +215,26 @@ Response: ## Contributing Contributions are welcome! Feel free to open issues or submit pull requests. + +## Roadmap & TODO +For detailed tasks, migration steps, and future enhancements, see [TODO.md](TODO.md). + +### Local Development Database +This project will migrate from SQLite to PostgreSQL/TimescaleDB. You can configure local development using Docker Compose or connect to an external instance: + +1. PostgreSQL/TimescaleDB via Docker Compose (recommended): + - Pros: + - Reproducible, isolated environment out-of-the-box + - No need to install Postgres locally + - Aligns development with production setups + - Cons: + - Additional resource usage (memory, CPU) + - Slightly more complex Docker configuration + +2. External PostgreSQL instance: + - Pros: + - Leverages existing infrastructure + - No Docker overhead + - Cons: + - Requires manual setup and Timescale extension + - Less portable for new contributors diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 00000000..ec1f0d53 --- /dev/null +++ b/alembic.ini @@ -0,0 +1,38 @@ +[alembic] +# path to migration scripts +script_location = alembic +# default database URL; overridden by DATABASE_URL env var in env.py +sqlalchemy.url = postgresql://postgres:password@localhost:5432/dereth + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +level = NOTSET +args = (sys.stderr,) +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s \ No newline at end of file diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 00000000..fce9fb84 --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,60 @@ +import os +from logging.config import fileConfig + +from sqlalchemy import engine_from_config, pool +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the DATABASE_URL env var, if set; else fall back to ini file +database_url = os.getenv('DATABASE_URL', config.get_main_option('sqlalchemy.url')) +config.set_main_option('sqlalchemy.url', database_url) + +# Interpret log config +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +from db_async import metadata # noqa +target_metadata = metadata + + +def run_migrations_offline(): + '''Run migrations in 'offline' mode.''' # noqa + url = config.get_main_option('sqlalchemy.url') + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + '''Run migrations in 'online' mode.''' # noqa + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix='sqlalchemy.', + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=target_metadata, + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() \ No newline at end of file diff --git a/alembic/script.py.mako b/alembic/script.py.mako new file mode 100644 index 00000000..b672e5b4 --- /dev/null +++ b/alembic/script.py.mako @@ -0,0 +1,21 @@ +""" +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = '${up_revision}' +down_revision = ${repr(down_revision) if down_revision else None} +branch_labels = ${repr(branch_labels) if branch_labels else None} +depends_on = ${repr(depends_on) if depends_on else None} + +def upgrade(): + """Upgrade migrations go here.""" + pass + +def downgrade(): + """Downgrade migrations go here.""" + pass \ No newline at end of file diff --git a/alembic/versions/__init__.py b/alembic/versions/__init__.py new file mode 100644 index 00000000..8776ed6b --- /dev/null +++ b/alembic/versions/__init__.py @@ -0,0 +1,5 @@ +""" +This directory will hold Alembic migration scripts. +Each migration filename should follow the naming convention: + _.py +""" \ No newline at end of file diff --git a/db_async.py b/db_async.py new file mode 100644 index 00000000..68e403ee --- /dev/null +++ b/db_async.py @@ -0,0 +1,57 @@ +import os +import sqlalchemy +from databases import Database +from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime + +# Environment: Postgres/TimescaleDB connection URL +DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:password@localhost:5432/dereth") +# Async database client +database = Database(DATABASE_URL) +# Metadata for SQLAlchemy Core +metadata = MetaData() + +# Telemetry events hypertable schema +telemetry_events = Table( + "telemetry_events", + metadata, + Column("id", Integer, primary_key=True), + Column("character_name", String, nullable=False), + Column("char_tag", String, nullable=True), + Column("session_id", String, nullable=False, index=True), + Column("timestamp", DateTime(timezone=True), nullable=False, index=True), + Column("ew", Float, nullable=False), + Column("ns", Float, nullable=False), + Column("z", Float, nullable=False), + Column("kills", Integer, nullable=False), + Column("kills_per_hour", Float, nullable=True), + Column("onlinetime", String, nullable=True), + Column("deaths", Integer, nullable=False), + Column("rares_found", Integer, nullable=False), + Column("prismatic_taper_count", Integer, nullable=False), + Column("vt_state", String, nullable=True), + # New telemetry metrics + Column("mem_mb", Float, nullable=True), + Column("cpu_pct", Float, nullable=True), + Column("mem_handles", Integer, nullable=True), + Column("latency_ms", Float, nullable=True), +) + +# Persistent kill statistics per character +char_stats = Table( + "char_stats", + metadata, + Column("character_name", String, primary_key=True), + Column("total_kills", Integer, nullable=False, default=0), +) + +async def init_db_async(): + """Create tables and enable TimescaleDB hypertable for telemetry_events.""" + # Create tables in Postgres + engine = sqlalchemy.create_engine(DATABASE_URL) + metadata.create_all(engine) + # Enable TimescaleDB extension and convert telemetry_events to hypertable + with engine.connect() as conn: + conn.execute("CREATE EXTENSION IF NOT EXISTS timescaledb;") + conn.execute( + "SELECT create_hypertable('telemetry_events', 'timestamp', if_not_exists => true);" + ) \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 013a91f6..01f7ff93 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,18 +5,37 @@ services: build: . ports: - "127.0.0.1:8765:8765" + depends_on: + - db volumes: - # Mount local database file for persistence - - "./dereth.db:/app/dereth.db" - "./main.py:/app/main.py" - - "./db.py:/app/db.py" + - "./db_async.py:/app/db_async.py" - "./static:/app/static" + - "./alembic:/app/alembic" + - "./alembic.ini:/app/alembic.ini" environment: - # Override database and application settings as needed + # Database connection URL for TimescaleDB + DATABASE_URL: "postgresql://postgres:password@db:5432/dereth" + # Override application settings as needed DB_MAX_SIZE_MB: "2048" DB_RETENTION_DAYS: "7" DB_MAX_SQL_LENGTH: "1000000000" DB_MAX_SQL_VARIABLES: "32766" DB_WAL_AUTOCHECKPOINT_PAGES: "1000" SHARED_SECRET: "your_shared_secret" - restart: unless-stopped \ No newline at end of file + restart: unless-stopped + + db: + image: timescale/timescaledb:latest-pg14 + container_name: dereth-db + environment: + POSTGRES_DB: dereth + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + volumes: + - timescale-data:/var/lib/postgresql/data + ports: + - "5432:5432" + +volumes: + timescale-data: \ No newline at end of file diff --git a/main.py b/main.py index f5942551..03f26a53 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,6 @@ from datetime import datetime, timedelta, timezone import json -import sqlite3 +import os from typing import Dict from fastapi import FastAPI, Header, HTTPException, Query, WebSocket, WebSocketDisconnect @@ -11,9 +11,11 @@ from fastapi.encoders import jsonable_encoder from pydantic import BaseModel from typing import Optional -from db import init_db, save_snapshot, DB_FILE +# Async database support +from sqlalchemy import text +from sqlalchemy.dialects.postgresql import insert as pg_insert +from db_async import database, telemetry_events, char_stats, init_db_async import asyncio -from starlette.concurrency import run_in_threadpool # ------------------------------------------------------------------ app = FastAPI() @@ -29,7 +31,7 @@ ACTIVE_WINDOW = timedelta(seconds=30) # player is “online” if seen in last class TelemetrySnapshot(BaseModel): character_name: str - char_tag: str + char_tag: Optional[str] = None session_id: str timestamp: datetime @@ -38,17 +40,29 @@ class TelemetrySnapshot(BaseModel): z: float kills: int - kills_per_hour: Optional[str] = None # now optional - onlinetime: Optional[str] = None # now optional + kills_per_hour: Optional[float] = None + onlinetime: Optional[str] = None deaths: int rares_found: int prismatic_taper_count: int vt_state: str + # Optional telemetry metrics + mem_mb: Optional[float] = None + cpu_pct: Optional[float] = None + mem_handles: Optional[int] = None + latency_ms: Optional[float] = None @app.on_event("startup") -def on_startup(): - init_db() +async def on_startup(): + # Connect to database and initialize TimescaleDB hypertable + await database.connect() + await init_db_async() + +@app.on_event("shutdown") +async def on_shutdown(): + # Disconnect from database + await database.disconnect() @@ -60,78 +74,47 @@ def debug(): @app.get("/live", response_model=dict) @app.get("/live/", response_model=dict) -def get_live_players(): - # compute cutoff once - now_utc = datetime.now(timezone.utc) - cutoff = now_utc - ACTIVE_WINDOW - - cutoff_sql = cutoff.strftime("%Y-%m-%d %H:%M:%S") - - try: - with sqlite3.connect(DB_FILE) as conn: - conn.row_factory = sqlite3.Row - query = """ - SELECT * - FROM live_state - WHERE datetime(timestamp) > datetime(?, 'utc') - """ - rows = conn.execute(query, (cutoff_sql,)).fetchall() - - except sqlite3.Error as e: - # log e if you have logging set up - raise HTTPException(status_code=500, detail="Database error") - - # build list of dicts - players = [] - for r in rows: - players.append(dict(r)) - +async def get_live_players(): + """Return recent live telemetry per character (last 30 seconds).""" + cutoff = datetime.now(timezone.utc) - ACTIVE_WINDOW + query = text( + """ + SELECT * FROM ( + SELECT DISTINCT ON (character_name) * + FROM telemetry_events + ORDER BY character_name, timestamp DESC + ) sub + WHERE timestamp > :cutoff + """ + ) + rows = await database.fetch_all(query, {"cutoff": cutoff}) + players = [dict(r) for r in rows] return JSONResponse(content={"players": players}) @app.get("/history/") @app.get("/history") -def get_history( +async def get_history( from_ts: str | None = Query(None, alias="from"), to_ts: str | None = Query(None, alias="to"), ): - """ - Returns a time‐ordered list of telemetry snapshots: - - timestamp: ISO8601 string - - character_name: str - - kills: cumulative kill count (int) - - kph: kills_per_hour (float) - """ - conn = sqlite3.connect(DB_FILE) - conn.row_factory = sqlite3.Row - - # Build the base query - sql = """ - SELECT - timestamp, - character_name, - kills, - CAST(kills_per_hour AS REAL) AS kph - FROM telemetry_log - """ - params: list[str] = [] + """Returns a time-ordered list of telemetry snapshots.""" + sql = ( + "SELECT timestamp, character_name, kills, kills_per_hour AS kph " + "FROM telemetry_events" + ) + values: dict = {} conditions: list[str] = [] - - # Add optional filters if from_ts: - conditions.append("timestamp >= ?") - params.append(from_ts) + conditions.append("timestamp >= :from_ts") + values["from_ts"] = from_ts if to_ts: - conditions.append("timestamp <= ?") - params.append(to_ts) + conditions.append("timestamp <= :to_ts") + values["to_ts"] = to_ts if conditions: sql += " WHERE " + " AND ".join(conditions) - sql += " ORDER BY timestamp" - - rows = conn.execute(sql, params).fetchall() - conn.close() - + rows = await database.fetch_all(text(sql), values) data = [ { "timestamp": row["timestamp"], @@ -144,33 +127,23 @@ def get_history( return JSONResponse(content={"data": data}) -# ------------------------ GET Trails --------------------------------- +# --- GET Trails --------------------------------- @app.get("/trails") @app.get("/trails/") -def get_trails( - seconds: int = Query(600, ge=0, description="Lookback window in seconds") +async def get_trails( + seconds: int = Query(600, ge=0, description="Lookback window in seconds"), ): - """ - Return position snapshots (timestamp, character_name, ew, ns, z) - for the past `seconds` seconds. - """ - # match the same string format as stored timestamps (via str(datetime)) - cutoff_dt = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta( - seconds=seconds - ) - cutoff = str(cutoff_dt) - conn = sqlite3.connect(DB_FILE) - conn.row_factory = sqlite3.Row - rows = conn.execute( + """Return position snapshots (timestamp, character_name, ew, ns, z) for the past `seconds`.""" + cutoff = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=seconds) + sql = text( """ SELECT timestamp, character_name, ew, ns, z - FROM telemetry_log - WHERE timestamp >= ? - ORDER BY character_name, timestamp - """, - (cutoff,), - ).fetchall() - conn.close() + FROM telemetry_events + WHERE timestamp >= :cutoff + ORDER BY character_name, timestamp + """ + ) + rows = await database.fetch_all(sql, {"cutoff": cutoff}) trails = [ { "timestamp": r["timestamp"], @@ -236,11 +209,29 @@ async def ws_receive_snapshots( continue # Telemetry message: save to DB and broadcast if msg_type == "telemetry": + # Parse and broadcast telemetry snapshot payload = data.copy() payload.pop("type", None) snap = TelemetrySnapshot.parse_obj(payload) live_snapshots[snap.character_name] = snap.dict() - await run_in_threadpool(save_snapshot, snap.dict()) + # Persist to TimescaleDB + await database.execute( + telemetry_events.insert().values(**snap.dict()) + ) + # Update persistent kill stats (delta per session) + key = (snap.session_id, snap.character_name) + last = ws_receive_snapshots._last_kills.get(key, 0) + delta = snap.kills - last + if delta > 0: + stmt = pg_insert(char_stats).values( + character_name=snap.character_name, + total_kills=delta + ).on_conflict_do_update( + index_elements=["character_name"], + set_={"total_kills": char_stats.c.total_kills + delta}, + ) + await database.execute(stmt) + ws_receive_snapshots._last_kills[key] = snap.kills await _broadcast_to_browser_clients(snap.dict()) continue # Chat message: broadcast to browser clients only (no DB write) @@ -255,6 +246,9 @@ async def ws_receive_snapshots( del plugin_conns[n] print(f"[WS] Cleaned up plugin connections for {websocket.client}") +# In-memory store of last kills per session for delta calculations +ws_receive_snapshots._last_kills = {} + @app.websocket("/ws/live") async def ws_live_updates(websocket: WebSocket): # Browser clients connect here to receive telemetry and chat, and send commands