Refactor to async TimescaleDB backend & add Alembic migrations

erik 2025-05-18 19:07:23 +00:00
parent d396942deb
commit c20d54d037
9 changed files with 328 additions and 99 deletions


@@ -6,18 +6,22 @@ WORKDIR /app
 # Upgrade pip and install Python dependencies
 RUN python -m pip install --upgrade pip && \
-    pip install --no-cache-dir fastapi uvicorn pydantic pandas matplotlib websockets
+    pip install --no-cache-dir fastapi uvicorn pydantic websockets databases[postgresql] sqlalchemy alembic
 
 # Copy application code
 COPY static/ /app/static/
 COPY main.py /app/main.py
 COPY db.py /app/db.py
+COPY db_async.py /app/db_async.py
+COPY alembic.ini /app/alembic.ini
+COPY alembic/ /app/alembic/
 COPY Dockerfile /Dockerfile
 
 # Expose the application port
 EXPOSE 8765
 
 # Default environment variables (override as needed)
-ENV DB_MAX_SIZE_MB=2048 \
+ENV DATABASE_URL=postgresql://postgres:password@db:5432/dereth \
+    DB_MAX_SIZE_MB=2048 \
     DB_RETENTION_DAYS=7 \
     DB_MAX_SQL_LENGTH=1000000000 \
     DB_MAX_SQL_VARIABLES=32766 \


@@ -42,8 +42,6 @@ Python packages:
 - fastapi
 - uvicorn
 - pydantic
-- pandas
-- matplotlib
 - websockets  # required for sample data generator
 
 ## Installation
@@ -60,7 +58,7 @@ Python packages:
    ```
 3. Install dependencies:
    ```bash
-   pip install fastapi uvicorn pydantic pandas matplotlib websockets
+   pip install fastapi uvicorn pydantic websockets
    ```
 
 ## Configuration
@@ -171,6 +169,16 @@ Example chat payload:
 }
 ```
 
+## Event Payload Formats
+
+For a complete reference of JSON payloads accepted by the backend (over `/ws/position`), see the file `EVENT_FORMATS.json` in the project root. It contains example schemas for:
+
+- **Telemetry events** (`type`: "telemetry")
+- **Spawn events** (`type`: "spawn")
+- **Chat events** (`type`: "chat")
+- **Rare events** (`type`: "rare")
+
+Each entry shows all required and optional fields, their types, and example values.
 
 ### GET /live
 
 Returns active players seen within the last 30 seconds:
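An illustrative aside, not part of this commit: `EVENT_FORMATS.json` itself is not shown in the diff, so the telemetry event below is assembled purely from the fields of the `TelemetrySnapshot` model added in `main.py`; the name, session id, and all values are made up.

```python
# Hypothetical telemetry event -- illustrative only; EVENT_FORMATS.json remains
# the authoritative reference for the accepted schema.
telemetry_event = {
    "type": "telemetry",
    "character_name": "Example Char",      # made-up name
    "char_tag": None,                       # optional
    "session_id": "session-0001",           # made-up id
    "timestamp": "2025-05-18T19:00:00+00:00",
    "ew": 12.3,
    "ns": -45.6,
    "z": 0.0,
    "kills": 42,
    "kills_per_hour": 180.5,                # optional; a float as of this commit
    "onlinetime": "01:23:45",               # optional
    "deaths": 1,
    "rares_found": 0,
    "prismatic_taper_count": 250,
    "vt_state": "Combat",
    # optional metrics added in this commit
    "mem_mb": 512.0,
    "cpu_pct": 3.5,
    "mem_handles": 900,
    "latency_ms": 45.0,
}
```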
@@ -207,3 +215,26 @@ Response:
 ## Contributing
 
 Contributions are welcome! Feel free to open issues or submit pull requests.
+
+## Roadmap & TODO
+
+For detailed tasks, migration steps, and future enhancements, see [TODO.md](TODO.md).
+
+### Local Development Database
+
+This project will migrate from SQLite to PostgreSQL/TimescaleDB. You can configure local development either with Docker Compose or by connecting to an external instance:
+
+1. PostgreSQL/TimescaleDB via Docker Compose (recommended):
+   - Pros:
+     - Reproducible, isolated environment out of the box
+     - No need to install Postgres locally
+     - Aligns development with production setups
+   - Cons:
+     - Additional resource usage (memory, CPU)
+     - Slightly more complex Docker configuration
+2. External PostgreSQL instance:
+   - Pros:
+     - Leverages existing infrastructure
+     - No Docker overhead
+   - Cons:
+     - Requires manual setup and the TimescaleDB extension
+     - Less portable for new contributors
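For option 2, a rough pre-flight check (not part of the commit) can confirm the external server is reachable and has TimescaleDB installed; the DSN below is a placeholder, override it via `DATABASE_URL`.

```python
# Sketch: verify the Postgres instance in DATABASE_URL has the TimescaleDB extension.
import os

from sqlalchemy import create_engine, text

# Placeholder DSN -- point DATABASE_URL at your own instance.
dsn = os.getenv("DATABASE_URL", "postgresql://postgres:password@localhost:5432/dereth")

engine = create_engine(dsn)
with engine.connect() as conn:
    row = conn.execute(
        text("SELECT extversion FROM pg_extension WHERE extname = 'timescaledb'")
    ).fetchone()
    print("timescaledb extension:", row[0] if row else "NOT INSTALLED")
```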

alembic.ini Normal file

@@ -0,0 +1,38 @@
[alembic]
# path to migration scripts
script_location = alembic
# default database URL; overridden by DATABASE_URL env var in env.py
sqlalchemy.url = postgresql://postgres:password@localhost:5432/dereth
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
level = NOTSET
args = (sys.stderr,)
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s

alembic/env.py Normal file

@@ -0,0 +1,60 @@
import os
from logging.config import fileConfig

from sqlalchemy import engine_from_config, pool

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the DATABASE_URL env var, if set; else fall back to ini file
database_url = os.getenv('DATABASE_URL', config.get_main_option('sqlalchemy.url'))
config.set_main_option('sqlalchemy.url', database_url)

# Interpret log config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
from db_async import metadata  # noqa
target_metadata = metadata


def run_migrations_offline():
    '''Run migrations in 'offline' mode.'''  # noqa
    url = config.get_main_option('sqlalchemy.url')
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    '''Run migrations in 'online' mode.'''  # noqa
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )
        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
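With `env.py` reading `DATABASE_URL` and pulling `metadata` from `db_async`, revisions can be generated and applied with the Alembic CLI (`alembic revision --autogenerate -m "..."`, `alembic upgrade head`) or programmatically. A minimal sketch, not part of the commit, assuming the process can reach the database named in `DATABASE_URL`:

```python
# Sketch: generate and apply migrations from Python using the config added above.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # DATABASE_URL, if set, overrides sqlalchemy.url via env.py

# Autogenerate a revision from db_async.metadata, then apply everything up to head.
command.revision(cfg, message="initial telemetry schema", autogenerate=True)
command.upgrade(cfg, "head")
```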

alembic/script.py.mako Normal file

@@ -0,0 +1,21 @@
"""
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '${up_revision}'
down_revision = ${repr(down_revision) if down_revision else None}
branch_labels = ${repr(branch_labels) if branch_labels else None}
depends_on = ${repr(depends_on) if depends_on else None}


def upgrade():
    """Upgrade migrations go here."""
    pass


def downgrade():
    """Downgrade migrations go here."""
    pass


@@ -0,0 +1,5 @@
"""
This directory will hold Alembic migration scripts.
Each migration filename should follow the naming convention:
<revision_id>_<slug>.py
"""

db_async.py Normal file

@@ -0,0 +1,57 @@
import os

import sqlalchemy
from databases import Database
from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime

# Environment: Postgres/TimescaleDB connection URL
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:password@localhost:5432/dereth")

# Async database client
database = Database(DATABASE_URL)

# Metadata for SQLAlchemy Core
metadata = MetaData()

# Telemetry events hypertable schema
telemetry_events = Table(
    "telemetry_events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("character_name", String, nullable=False),
    Column("char_tag", String, nullable=True),
    Column("session_id", String, nullable=False, index=True),
    Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
    Column("ew", Float, nullable=False),
    Column("ns", Float, nullable=False),
    Column("z", Float, nullable=False),
    Column("kills", Integer, nullable=False),
    Column("kills_per_hour", Float, nullable=True),
    Column("onlinetime", String, nullable=True),
    Column("deaths", Integer, nullable=False),
    Column("rares_found", Integer, nullable=False),
    Column("prismatic_taper_count", Integer, nullable=False),
    Column("vt_state", String, nullable=True),
    # New telemetry metrics
    Column("mem_mb", Float, nullable=True),
    Column("cpu_pct", Float, nullable=True),
    Column("mem_handles", Integer, nullable=True),
    Column("latency_ms", Float, nullable=True),
)

# Persistent kill statistics per character
char_stats = Table(
    "char_stats",
    metadata,
    Column("character_name", String, primary_key=True),
    Column("total_kills", Integer, nullable=False, default=0),
)


async def init_db_async():
    """Create tables and enable TimescaleDB hypertable for telemetry_events."""
    # Create tables in Postgres
    engine = sqlalchemy.create_engine(DATABASE_URL)
    metadata.create_all(engine)
    # Enable TimescaleDB extension and convert telemetry_events to hypertable
    with engine.connect() as conn:
        conn.execute("CREATE EXTENSION IF NOT EXISTS timescaledb;")
        conn.execute(
            "SELECT create_hypertable('telemetry_events', 'timestamp', if_not_exists => true);"
        )
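A note outside the commit: `init_db_async()` passes raw SQL strings to `Connection.execute()`, which SQLAlchemy 1.x accepts but SQLAlchemy 2.x rejects, and on 2.x DDL issued inside a plain `engine.connect()` block is not committed automatically. A sketch of the same initialization written for SQLAlchemy 2.x semantics (an assumption about the installed version, nothing else changed):

```python
# Sketch only: equivalent initialization under SQLAlchemy 2.x semantics.
import sqlalchemy
from sqlalchemy import text

from db_async import DATABASE_URL, metadata


def init_db_sync_sa2() -> None:
    engine = sqlalchemy.create_engine(DATABASE_URL)
    metadata.create_all(engine)
    with engine.begin() as conn:  # begin() commits the block on success
        conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb;"))
        # NOTE: create_hypertable requires any unique index / primary key to include
        # the partition column; with the integer "id" primary key above, TimescaleDB
        # may reject this call unless the key is widened to (id, timestamp).
        conn.execute(text(
            "SELECT create_hypertable('telemetry_events', 'timestamp', if_not_exists => true);"
        ))
```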


@@ -5,14 +5,18 @@ services:
     build: .
     ports:
       - "127.0.0.1:8765:8765"
+    depends_on:
+      - db
     volumes:
-      # Mount local database file for persistence
-      - "./dereth.db:/app/dereth.db"
       - "./main.py:/app/main.py"
-      - "./db.py:/app/db.py"
+      - "./db_async.py:/app/db_async.py"
       - "./static:/app/static"
+      - "./alembic:/app/alembic"
+      - "./alembic.ini:/app/alembic.ini"
     environment:
-      # Override database and application settings as needed
+      # Database connection URL for TimescaleDB
+      DATABASE_URL: "postgresql://postgres:password@db:5432/dereth"
+      # Override application settings as needed
       DB_MAX_SIZE_MB: "2048"
       DB_RETENTION_DAYS: "7"
       DB_MAX_SQL_LENGTH: "1000000000"
@@ -20,3 +24,18 @@ services:
       DB_WAL_AUTOCHECKPOINT_PAGES: "1000"
       SHARED_SECRET: "your_shared_secret"
     restart: unless-stopped
+
+  db:
+    image: timescale/timescaledb:latest-pg14
+    container_name: dereth-db
+    environment:
+      POSTGRES_DB: dereth
+      POSTGRES_USER: postgres
+      POSTGRES_PASSWORD: password
+    volumes:
+      - timescale-data:/var/lib/postgresql/data
+    ports:
+      - "5432:5432"
+
+volumes:
+  timescale-data:

main.py

@@ -1,6 +1,6 @@
 from datetime import datetime, timedelta, timezone
 import json
-import sqlite3
+import os
 from typing import Dict
 
 from fastapi import FastAPI, Header, HTTPException, Query, WebSocket, WebSocketDisconnect
@@ -11,9 +11,11 @@ from fastapi.encoders import jsonable_encoder
 from pydantic import BaseModel
 from typing import Optional
 
-from db import init_db, save_snapshot, DB_FILE
+# Async database support
+from sqlalchemy import text
+from sqlalchemy.dialects.postgresql import insert as pg_insert
+from db_async import database, telemetry_events, char_stats, init_db_async
 import asyncio
-from starlette.concurrency import run_in_threadpool
 
 # ------------------------------------------------------------------
 app = FastAPI()
@@ -29,7 +31,7 @@ ACTIVE_WINDOW = timedelta(seconds=30)  # player is “online” if seen in last
 
 class TelemetrySnapshot(BaseModel):
     character_name: str
-    char_tag: str
+    char_tag: Optional[str] = None
     session_id: str
     timestamp: datetime
@@ -38,17 +40,29 @@ class TelemetrySnapshot(BaseModel):
     z: float
     kills: int
-    kills_per_hour: Optional[str] = None  # now optional
-    onlinetime: Optional[str] = None  # now optional
+    kills_per_hour: Optional[float] = None
+    onlinetime: Optional[str] = None
     deaths: int
     rares_found: int
     prismatic_taper_count: int
     vt_state: str
+    # Optional telemetry metrics
+    mem_mb: Optional[float] = None
+    cpu_pct: Optional[float] = None
+    mem_handles: Optional[int] = None
+    latency_ms: Optional[float] = None
 
 
 @app.on_event("startup")
-def on_startup():
-    init_db()
+async def on_startup():
+    # Connect to database and initialize TimescaleDB hypertable
+    await database.connect()
+    await init_db_async()
+
+
+@app.on_event("shutdown")
+async def on_shutdown():
+    # Disconnect from database
+    await database.disconnect()
@@ -60,78 +74,47 @@ def debug():
 @app.get("/live", response_model=dict)
 @app.get("/live/", response_model=dict)
-def get_live_players():
-    # compute cutoff once
-    now_utc = datetime.now(timezone.utc)
-    cutoff = now_utc - ACTIVE_WINDOW
-    cutoff_sql = cutoff.strftime("%Y-%m-%d %H:%M:%S")
-    try:
-        with sqlite3.connect(DB_FILE) as conn:
-            conn.row_factory = sqlite3.Row
-            query = """
-                SELECT *
-                FROM live_state
-                WHERE datetime(timestamp) > datetime(?, 'utc')
-            """
-            rows = conn.execute(query, (cutoff_sql,)).fetchall()
-    except sqlite3.Error as e:
-        # log e if you have logging set up
-        raise HTTPException(status_code=500, detail="Database error")
-
-    # build list of dicts
-    players = []
-    for r in rows:
-        players.append(dict(r))
+async def get_live_players():
+    """Return recent live telemetry per character (last 30 seconds)."""
+    cutoff = datetime.now(timezone.utc) - ACTIVE_WINDOW
+    query = text(
+        """
+        SELECT * FROM (
+            SELECT DISTINCT ON (character_name) *
+            FROM telemetry_events
+            ORDER BY character_name, timestamp DESC
+        ) sub
+        WHERE timestamp > :cutoff
+        """
+    )
+    rows = await database.fetch_all(query, {"cutoff": cutoff})
+    players = [dict(r) for r in rows]
     return JSONResponse(content={"players": players})
 
 
 @app.get("/history/")
 @app.get("/history")
-def get_history(
+async def get_history(
     from_ts: str | None = Query(None, alias="from"),
     to_ts: str | None = Query(None, alias="to"),
 ):
-    """
-    Returns a time-ordered list of telemetry snapshots:
-      - timestamp: ISO8601 string
-      - character_name: str
-      - kills: cumulative kill count (int)
-      - kph: kills_per_hour (float)
-    """
-    conn = sqlite3.connect(DB_FILE)
-    conn.row_factory = sqlite3.Row
-    # Build the base query
-    sql = """
-        SELECT
-            timestamp,
-            character_name,
-            kills,
-            CAST(kills_per_hour AS REAL) AS kph
-        FROM telemetry_log
-    """
-    params: list[str] = []
+    """Returns a time-ordered list of telemetry snapshots."""
+    sql = (
+        "SELECT timestamp, character_name, kills, kills_per_hour AS kph "
+        "FROM telemetry_events"
+    )
+    values: dict = {}
     conditions: list[str] = []
-    # Add optional filters
     if from_ts:
-        conditions.append("timestamp >= ?")
-        params.append(from_ts)
+        conditions.append("timestamp >= :from_ts")
+        values["from_ts"] = from_ts
     if to_ts:
-        conditions.append("timestamp <= ?")
-        params.append(to_ts)
+        conditions.append("timestamp <= :to_ts")
+        values["to_ts"] = to_ts
     if conditions:
         sql += " WHERE " + " AND ".join(conditions)
     sql += " ORDER BY timestamp"
-    rows = conn.execute(sql, params).fetchall()
-    conn.close()
+    rows = await database.fetch_all(text(sql), values)
     data = [
         {
             "timestamp": row["timestamp"],
@@ -144,33 +127,23 @@ def get_history(
     return JSONResponse(content={"data": data})
 
 
-# ------------------------ GET Trails ---------------------------------
+# --- GET Trails ---------------------------------
 @app.get("/trails")
 @app.get("/trails/")
-def get_trails(
-    seconds: int = Query(600, ge=0, description="Lookback window in seconds")
+async def get_trails(
+    seconds: int = Query(600, ge=0, description="Lookback window in seconds"),
 ):
-    """
-    Return position snapshots (timestamp, character_name, ew, ns, z)
-    for the past `seconds` seconds.
-    """
-    # match the same string format as stored timestamps (via str(datetime))
-    cutoff_dt = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(
-        seconds=seconds
-    )
-    cutoff = str(cutoff_dt)
-    conn = sqlite3.connect(DB_FILE)
-    conn.row_factory = sqlite3.Row
-    rows = conn.execute(
+    """Return position snapshots (timestamp, character_name, ew, ns, z) for the past `seconds`."""
+    cutoff = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=seconds)
+    sql = text(
         """
         SELECT timestamp, character_name, ew, ns, z
-        FROM telemetry_log
-        WHERE timestamp >= ?
+        FROM telemetry_events
+        WHERE timestamp >= :cutoff
         ORDER BY character_name, timestamp
-        """,
-        (cutoff,),
-    ).fetchall()
-    conn.close()
+        """
+    )
+    rows = await database.fetch_all(sql, {"cutoff": cutoff})
     trails = [
         {
             "timestamp": r["timestamp"],
@@ -236,11 +209,29 @@ async def ws_receive_snapshots(
                 continue
             # Telemetry message: save to DB and broadcast
             if msg_type == "telemetry":
-                # Parse and broadcast telemetry snapshot
                 payload = data.copy()
                 payload.pop("type", None)
                 snap = TelemetrySnapshot.parse_obj(payload)
                 live_snapshots[snap.character_name] = snap.dict()
-                await run_in_threadpool(save_snapshot, snap.dict())
+                # Persist to TimescaleDB
+                await database.execute(
+                    telemetry_events.insert().values(**snap.dict())
+                )
+                # Update persistent kill stats (delta per session)
+                key = (snap.session_id, snap.character_name)
+                last = ws_receive_snapshots._last_kills.get(key, 0)
+                delta = snap.kills - last
+                if delta > 0:
+                    stmt = pg_insert(char_stats).values(
+                        character_name=snap.character_name,
+                        total_kills=delta
+                    ).on_conflict_do_update(
+                        index_elements=["character_name"],
+                        set_={"total_kills": char_stats.c.total_kills + delta},
+                    )
+                    await database.execute(stmt)
+                ws_receive_snapshots._last_kills[key] = snap.kills
                 await _broadcast_to_browser_clients(snap.dict())
                 continue
             # Chat message: broadcast to browser clients only (no DB write)
@@ -255,6 +246,9 @@ async def ws_receive_snapshots(
             del plugin_conns[n]
         print(f"[WS] Cleaned up plugin connections for {websocket.client}")
 
+
+# In-memory store of last kills per session for delta calculations
+ws_receive_snapshots._last_kills = {}
 
 @app.websocket("/ws/live")
 async def ws_live_updates(websocket: WebSocket):
     # Browser clients connect here to receive telemetry and chat, and send commands