From c4182215751dd557193409f30c6b01d66625f608 Mon Sep 17 00:00:00 2001
From: erik
Date: Thu, 22 May 2025 15:30:45 +0000
Subject: [PATCH] Working version with new streaming and DB

---
 ARCHITECTURE.md    | 142 +++++++++++++++++++++++++++++++++++++++++++++++++++
 Dockerfile         |   2 +-
 EVENT_FORMATS.json |  45 +++++++++++++++
 README.md          |  41 ++++++++++++-
 db_async.py        |  50 +++++++++++-----
 generate_data.py   |   4 +-
 main.py            | 127 ++++++++++++++++++++++++++++++-----------
 static/script.js   |   6 +--
 8 files changed, 380 insertions(+), 37 deletions(-)
 create mode 100644 ARCHITECTURE.md
 create mode 100644 EVENT_FORMATS.json

diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
new file mode 100644
index 00000000..b888260f
--- /dev/null
+++ b/ARCHITECTURE.md
@@ -0,0 +1,142 @@
+# Project Architecture and Data Model
+
+This document provides an overview of the project files, their roles, and a detailed description of the database architecture and data model.
+
+## Project Structure
+
+Root directory:
+- **Dockerfile**: Defines the Python 3.12-slim image, installs dependencies (FastAPI, Uvicorn, SQLAlchemy, databases, TimescaleDB support), and runs the app.
+- **docker-compose.yml**: Orchestrates two services:
+  - **dereth-tracker**: The FastAPI application container.
+  - **db**: A TimescaleDB (PostgreSQL 14 + TimescaleDB extension) container for persistent storage.
+- **README.md**: High-level documentation and usage instructions.
+- **EVENT_FORMATS.json**: Example JSON payloads for all event types (`telemetry`, `spawn`, `chat`, `rare`).
+- **db.py**: Legacy SQLite-based storage (telemetry_log & live_state tables, WAL mode, auto-vacuum).
+- **db_async.py**: Async database definitions for PostgreSQL/TimescaleDB:
+  - Table schemas (SQLAlchemy Core): `telemetry_events`, `char_stats`, `rare_stats`, `rare_stats_sessions`, `spawn_events`.
+  - `init_db_async()`: Creates tables, enables the TimescaleDB extension, and configures a hypertable on `telemetry_events`.
+- **main.py**: The FastAPI application:
+  - HTTP endpoints: `/debug`, `/live`, `/history`, `/trails`.
+  - WebSocket endpoints: `/ws/position` (plugin data in), `/ws/live` (browser live updates).
+  - Pydantic models: `TelemetrySnapshot`, `SpawnEvent`.
+  - In-memory state: `live_snapshots`, WebSocket connection registries.
+- **generate_data.py**: Sample WebSocket client that sends synthetic telemetry snapshots.
+- **alembic/** & **alembic.ini**: Migration tooling for evolving the database schema.
+- **static/**: Frontend assets (HTML, CSS, JavaScript, images) for the live map UI.
+- **FIXES.md**, **LESSONSLEARNED.md**, **TODO.md**: Project notes and future work.
+
+## Database Overview
+
+### Technology Stack
+- **PostgreSQL** 14 with the **TimescaleDB** extension for time-series optimization.
+- **databases** library (async) with **SQLAlchemy Core** for schema definitions and queries.
+- Environment variable: `DATABASE_URL` controls the connection string.
+
+### Tables and Hypertable
+1. **telemetry_events** (hypertable)
+   - Columns:
+     - `id`: Integer, primary key.
+     - `character_name` (String), `char_tag` (String, nullable), `session_id` (String, indexed).
+     - `timestamp` (DateTime with TZ, indexed) — partitioning column for the hypertable.
+     - `ew`, `ns`, `z`: Float coordinates.
+     - `kills`, `deaths`, `rares_found`, `prismatic_taper_count`: Integer metrics.
+     - `kills_per_hour`, `onlinetime` (String), `vt_state` (String).
+     - Optional: `mem_mb`, `cpu_pct`, `mem_handles`, `latency_ms`.
+   - Created via `SELECT create_hypertable('telemetry_events', 'timestamp', if_not_exists=>true, create_default_indexes=>false)`. A sample time-bucketed query against this hypertable follows the table list.
+
+2. **char_stats**
+   - Tracks cumulative kills per character.
+   - Columns: `character_name` (PK), `total_kills` (Integer).
+
+3. **rare_stats**
+   - Tracks total rare spawns per character.
+   - Columns: `character_name` (PK), `total_rares` (Integer).
+
+4. **rare_stats_sessions**
+   - Tracks rare spawn counts per character per session.
+   - Columns: composite PK `(character_name, session_id)`, `session_rares` (Integer).
+
+5. **spawn_events**
+   - Records individual mob spawn events for heatmapping.
+   - Columns: `id` (PK), `character_name` (String), `mob` (String), `timestamp` (DateTime), `ew`, `ns`, `z` (Float).
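+
+The hypertable pays off for time-bucketed rollups. The query below is a sketch (not repository code); it assumes the shared `database` object from `db_async.py` and TimescaleDB's built-in `time_bucket` function:
+
+```python
+from db_async import database
+
+async def kills_per_bucket():
+    # Roll kills up into 5-minute buckets per character; TimescaleDB
+    # prunes chunks via the `timestamp` partitioning column.
+    rows = await database.fetch_all(
+        """
+        SELECT time_bucket('5 minutes', timestamp) AS bucket,
+               character_name,
+               max(kills) AS kills
+        FROM telemetry_events
+        GROUP BY bucket, character_name
+        ORDER BY bucket
+        """
+    )
+    return [dict(r) for r in rows]
+```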
+
+### Initialization and Migrations
+- On startup (`main.py`), `init_db_async()` is called:
+  1. Creates all tables via SQLAlchemy’s `metadata.create_all()`.
+  2. Enables the TimescaleDB extension.
+  3. Converts `telemetry_events` to a hypertable, skipping default index creation to avoid PK/index collisions.
+- Alembic is configured for schema migrations (`alembic/` directory).
+
+## Data Ingestion Flow
+1. **Plugin** connects to `/ws/position` with a shared secret.
+2. Sends JSON frames of the following types (see the client sketch after this list):
+   - `telemetry`: parsed into `TelemetrySnapshot`, upserted into `live_snapshots`, persisted to `telemetry_events`, and broadcast to browser clients.
+   - `spawn`: parsed into `SpawnEvent`, inserted into `spawn_events`.
+   - `rare`: increments `rare_stats` and `rare_stats_sessions` via upsert operations.
+   - `chat`: broadcast to browser clients without DB writes.
+3. **Browser** connects to `/ws/live` to receive live updates and can send commands to plugins.
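+
+A minimal plugin-side sender might look like the sketch below. It is illustrative only: it assumes a local server on port 8000 and the `websockets` package used by `generate_data.py`, and it omits the shared-secret handshake:
+
+```python
+import asyncio, json
+import websockets
+
+async def send_events():
+    async with websockets.connect("ws://localhost:8000/ws/position") as ws:
+        # One JSON object per frame: first a telemetry snapshot...
+        await ws.send(json.dumps({
+            "type": "telemetry", "character_name": "MyCharacter",
+            "session_id": "abc123", "timestamp": "2025-04-22T13:45:00Z",
+            "ew": 123.4, "ns": 567.8, "z": 10.2, "kills": 42, "deaths": 1,
+            "prismatic_taper_count": 17, "vt_state": "Combat",
+        }))
+        # ...then a standalone rare event.
+        await ws.send(json.dumps({
+            "type": "rare", "timestamp": "2025-04-22T13:48:00Z",
+            "character_name": "MyCharacter", "mob": "Golden Gryphon",
+            "ew": 150.5, "ns": 350.7, "z": 5.0,
+        }))
+
+asyncio.run(send_events())
+```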
+
+## HTTP Query Endpoints
+- **GET /live**: returns recent snapshots (last 30s) plus rare counts per character.
+- **GET /history**: returns ordered telemetry history with optional time filters.
+- **GET /trails**: returns positional trails for a lookback window (usage sketch below).
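+
+For instance, the trails endpoint can be exercised with any HTTP client (a sketch assuming the `requests` package and a server on `localhost:8000`):
+
+```python
+import requests
+
+# Fetch the last 10 minutes of positional trails.
+resp = requests.get("http://localhost:8000/trails", params={"seconds": 600})
+resp.raise_for_status()
+for point in resp.json()["trails"]:
+    print(point["character_name"], point["ew"], point["ns"])
+```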
+
+This architecture enables real-time telemetry ingestion, historical time-series analysis, and an interactive front-end map for tracking players and spawn events.
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 14b89196..e9f3354a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -6,7 +6,7 @@ WORKDIR /app
 
 # Upgrade pip and install Python dependencies
 RUN python -m pip install --upgrade pip && \
-    pip install --no-cache-dir fastapi uvicorn pydantic websockets databases[postgresql] sqlalchemy alembic
+    pip install --no-cache-dir fastapi uvicorn pydantic websockets databases[postgresql] sqlalchemy alembic psycopg2-binary
 
 # Copy application code
 COPY static/ /app/static/
diff --git a/EVENT_FORMATS.json b/EVENT_FORMATS.json
new file mode 100644
index 00000000..8bbb18b3
--- /dev/null
+++ b/EVENT_FORMATS.json
@@ -0,0 +1,45 @@
+{
+  "_comment": "These are individual example payloads keyed by event type. Send each payload separately over the WS connection.",
+  "telemetry": {
+    "type": "telemetry",
+    "character_name": "string",
+    "session_id": "string",
+    "timestamp": "2025-04-22T13:45:00Z",
+    "ew": 123.4,
+    "ns": 567.8,
+    "z": 10.2,
+    "kills": 42,
+    "kills_per_hour": 7.0,
+    "onlinetime": "00:05:00",
+    "deaths": 1,
+    "prismatic_taper_count": 17,
+    "vt_state": "Combat",
+    "mem_mb": 256.5,
+    "cpu_pct": 12.3,
+    "mem_handles": 1024
+  },
+  "spawn": {
+    "type": "spawn",
+    "timestamp": "2025-04-22T13:46:00Z",
+    "character_name": "MyCharacter",
+    "mob": "Forest Troll",
+    "ew": 100.1,
+    "ns": 200.2
+  },
+  "chat": {
+    "type": "chat",
+    "timestamp": "2025-04-22T13:47:00Z",
+    "character_name": "MyCharacter",
+    "text": "Hello world!",
+    "color": "#88FF00"
+  },
+  "rare": {
+    "type": "rare",
+    "timestamp": "2025-04-22T13:48:00Z",
+    "character_name": "MyCharacter",
+    "mob": "Golden Gryphon",
+    "ew": 150.5,
+    "ns": 350.7,
+    "z": 5.0
+  }
+}
\ No newline at end of file
diff --git a/README.md b/README.md
index cd92b6fd..73a1036a 100644
--- a/README.md
+++ b/README.md
@@ -144,13 +144,27 @@ After connecting, send JSON messages matching the `TelemetrySnapshot` schema. Fo
     "z": 10.2,
     "kills": 42,
     "deaths": 1,
-    "rares_found": 2,
     "prismatic_taper_count": 17,
     "vt_state": "Combat",
     "kills_per_hour": "N/A",
     "onlinetime": "00:05:00"
-}
+  }
 ```
+
+  Each message above is sent as its own JSON object over the WebSocket (one frame per event). When you want to report a rare spawn, send a standalone `rare` event instead of embedding rare counts in telemetry. For example:
+
+  ```json
+  {
+    "type": "rare",
+    "timestamp": "2025-04-22T13:48:00Z",
+    "character_name": "MyCharacter",
+    "mob": "Golden Gryphon",
+    "ew": 150.5,
+    "ns": 350.7,
+    "z": 5.0,
+    "additional_info": "first sighting of the day"
+  }
+  ```
 
 ### Chat messages
 You can also send chat envelopes over the same WebSocket to display messages in the browser. Fields:
@@ -177,6 +191,29 @@ For a complete reference of JSON payloads accepted by the backend (over `/ws/pos
 - **Chat events** (`type`: "chat")
 - **Rare events** (`type`: "rare")
 
+Notes on payload changes:
+  - Spawn events no longer require the `z` coordinate; if omitted, the server defaults it to 0.0 (see the sketch below).
+  - Telemetry events no longer include the `latency_ms` field; please omit it from your payloads.
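+
+  For example, the default can be checked with a model mirroring the server-side `SpawnEvent` from `main.py` (an illustrative sketch, not repository code):
+
+  ```python
+  from datetime import datetime, timezone
+  from pydantic import BaseModel
+
+  class SpawnEvent(BaseModel):  # mirrors the server model
+      character_name: str
+      mob: str
+      timestamp: datetime
+      ew: float
+      ns: float
+      z: float = 0.0  # an omitted "z" falls back to 0.0
+
+  evt = SpawnEvent(character_name="MyCharacter", mob="Forest Troll",
+                   timestamp=datetime.now(timezone.utc), ew=100.1, ns=200.2)
+  print(evt.z)  # prints 0.0
+  ```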
+
 Each entry shows all required and optional fields, their types, and example values.
 
 ### GET /live
diff --git a/db_async.py b/db_async.py
index 68e403ee..e319065b 100644
--- a/db_async.py
+++ b/db_async.py
@@ -1,7 +1,7 @@
 import os
 import sqlalchemy
 from databases import Database
-from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime
+from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime, text
 
 # Environment: Postgres/TimescaleDB connection URL
 DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:password@localhost:5432/dereth")
@@ -44,14 +44,52 @@ char_stats = Table(
     "char_stats",
     metadata,
     Column("character_name", String, primary_key=True),
     Column("total_kills", Integer, nullable=False, default=0),
 )
 
+# Rare event tracking: total and per-session counts
+rare_stats = Table(
+    "rare_stats",
+    metadata,
+    Column("character_name", String, primary_key=True),
+    Column("total_rares", Integer, nullable=False, default=0),
+)
+
+rare_stats_sessions = Table(
+    "rare_stats_sessions",
+    metadata,
+    Column("character_name", String, primary_key=True),
+    Column("session_id", String, primary_key=True),
+    Column("session_rares", Integer, nullable=False, default=0),
+)
+# Spawn events: record mob spawns for heatmapping
+spawn_events = Table(
+    "spawn_events",
+    metadata,
+    Column("id", Integer, primary_key=True),
+    Column("character_name", String, nullable=False),
+    Column("mob", String, nullable=False),
+    Column("timestamp", DateTime(timezone=True), nullable=False, index=True),
+    Column("ew", Float, nullable=False),
+    Column("ns", Float, nullable=False),
+    Column("z", Float, nullable=False),
+)
+
 async def init_db_async():
     """Create tables and enable TimescaleDB hypertable for telemetry_events."""
     # Create tables in Postgres
     engine = sqlalchemy.create_engine(DATABASE_URL)
     metadata.create_all(engine)
     # Enable TimescaleDB extension and convert telemetry_events to hypertable
-    with engine.connect() as conn:
-        conn.execute("CREATE EXTENSION IF NOT EXISTS timescaledb;")
-        conn.execute(
-            "SELECT create_hypertable('telemetry_events', 'timestamp', if_not_exists => true);"
-        )
\ No newline at end of file
+    # Use a transactional context to ensure DDL statements are committed
+    with engine.begin() as conn:
+        # Enable TimescaleDB extension (may already exist)
+        try:
+            conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb"))
+        except Exception as e:
+            print(f"Warning: failed to create extension timescaledb: {e}")
+        # Create hypertable for telemetry_events, skip default indexes to avoid collisions
+        try:
+            conn.execute(text(
+                "SELECT create_hypertable('telemetry_events', 'timestamp', "
+                "if_not_exists => true, create_default_indexes => false)"
+            ))
+        except Exception as e:
+            print(f"Warning: failed to create hypertable telemetry_events: {e}")
\ No newline at end of file
diff --git a/generate_data.py b/generate_data.py
index d5eb674f..0d0b0d26 100644
--- a/generate_data.py
+++ b/generate_data.py
@@ -26,12 +26,14 @@ async def main() -> None:
                 kills_per_hour="kph_str",
                 onlinetime=str(timedelta(seconds=online_time)),
                 deaths=0,
-                rares_found=0,
+                # rares_found removed from telemetry payload; tracked via rare events
                 prismatic_taper_count=0,
                 vt_state="test state",
             )
             # wrap in envelope with message type
+            # Serialize the snapshot, dropping the deprecated 'rares_found' field
             payload = snapshot.model_dump()
+            payload.pop("rares_found", None)
             payload["type"] = "telemetry"
             await websocket.send(json.dumps(payload, default=str))
             print(f"Sent snapshot: EW={ew:.2f}, NS={ns:.2f}")
diff --git a/main.py b/main.py
index 03f26a53..55d2399c 100644
--- a/main.py
+++ b/main.py
@@ -12,9 +12,17 @@ from pydantic import BaseModel
 from typing import Optional
 
 # Async database support
-from sqlalchemy import text
 from sqlalchemy.dialects.postgresql import insert as pg_insert
-from db_async import database, telemetry_events, char_stats, init_db_async
+from db_async import (
+    database,
+    telemetry_events,
+    char_stats,
+    rare_stats,
+    rare_stats_sessions,
+    spawn_events,
+    init_db_async
+)
+from fastapi.encoders import jsonable_encoder  # used by the /live, /history and /trails responses
 import asyncio
 
 # ------------------------------------------------------------------
@@ -43,7 +51,8 @@ class TelemetrySnapshot(BaseModel):
     kills_per_hour: Optional[float] = None
     onlinetime: Optional[str] = None
     deaths: int
-    rares_found: int
+    # Removed from the telemetry payload; forced to 0 at write time and tracked via rare events
+    rares_found: int = 0
     prismatic_taper_count: int
     vt_state: str
     # Optional telemetry metrics
@@ -53,11 +62,31 @@ class TelemetrySnapshot(BaseModel):
     latency_ms: Optional[float] = None
 
 
+class SpawnEvent(BaseModel):
+    character_name: str
+    mob: str
+    timestamp: datetime
+    ew: float
+    ns: float
+    z: float = 0.0
+
+
 @app.on_event("startup")
 async def on_startup():
-    # Connect to database and initialize TimescaleDB hypertable
-    await database.connect()
-    await init_db_async()
+    # Retry connecting to the database on startup to handle DB readiness delays
+    max_attempts = 5
+    for attempt in range(1, max_attempts + 1):
+        try:
+            await database.connect()
+            await init_db_async()
+            print(f"DB connected on attempt {attempt}")
+            break
+        except Exception as e:
+            print(f"DB connection failed (attempt {attempt}/{max_attempts}): {e}")
+            if attempt < max_attempts:
+                await asyncio.sleep(5)
+            else:
+                raise RuntimeError(f"Could not connect to database after {max_attempts} attempts")
 
 @app.on_event("shutdown")
 async def on_shutdown():
@@ -77,19 +106,27 @@ def debug():
 async def get_live_players():
     """Return recent live telemetry per character (last 30 seconds)."""
     cutoff = datetime.now(timezone.utc) - ACTIVE_WINDOW
-    query = text(
-        """
-        SELECT * FROM (
+    # Include rare counts: total and session-specific
+    sql = """
+        SELECT sub.*,
+               COALESCE(rs.total_rares, 0) AS total_rares,
+               COALESCE(rss.session_rares, 0) AS session_rares
+        FROM (
             SELECT DISTINCT ON (character_name) *
             FROM telemetry_events
            ORDER BY character_name, timestamp DESC
         ) sub
-        WHERE timestamp > :cutoff
-        """
-    )
-    rows = await database.fetch_all(query, {"cutoff": cutoff})
+        LEFT JOIN rare_stats rs
+          ON sub.character_name = rs.character_name
+        LEFT JOIN rare_stats_sessions rss
+          ON sub.character_name = rss.character_name
+         AND sub.session_id = rss.session_id
+        WHERE sub.timestamp > :cutoff
+    """
+    rows = await database.fetch_all(sql, {"cutoff": cutoff})
     players = [dict(r) for r in rows]
-    return JSONResponse(content={"players": players})
+    # Ensure all types (e.g. datetime) are JSON serializable
+    return JSONResponse(content=jsonable_encoder({"players": players}))
 
 
 @app.get("/history/")
@@ -114,7 +151,7 @@ async def get_history(
     if conditions:
         sql += " WHERE " + " AND ".join(conditions)
     sql += " ORDER BY timestamp"
-    rows = await database.fetch_all(text(sql), values)
+    rows = await database.fetch_all(sql, values)
     data = [
         {
             "timestamp": row["timestamp"],
@@ -124,7 +161,8 @@ async def get_history(
         }
         for row in rows
     ]
-    return JSONResponse(content={"data": data})
+    # Ensure all types (e.g. datetime) are JSON serializable
+    return JSONResponse(content=jsonable_encoder({"data": data}))
 
 
 # --- GET Trails ---------------------------------
@@ -135,14 +173,12 @@ async def get_trails(
 ):
     """Return position snapshots (timestamp, character_name, ew, ns, z) for the past `seconds`."""
     cutoff = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=seconds)
-    sql = text(
-        """
+    sql = """
         SELECT timestamp, character_name, ew, ns, z
         FROM telemetry_events
         WHERE timestamp >= :cutoff
         ORDER BY character_name, timestamp
-        """
-    )
+    """
     rows = await database.fetch_all(sql, {"cutoff": cutoff})
     trails = [
         {
@@ -154,7 +190,8 @@ async def get_trails(
         }
         for r in rows
     ]
-    return JSONResponse(content={"trails": trails})
+    # Ensure all types (e.g. datetime) are JSON serializable
+    return JSONResponse(content=jsonable_encoder({"trails": trails}))
 
 # -------------------- WebSocket endpoints -----------------------
 browser_conns: set[WebSocket] = set()
@@ -207,16 +244,31 @@ async def ws_receive_snapshots(
             if isinstance(name, str):
                 plugin_conns[name] = websocket
             continue
+        # Spawn event: persist spawn for heatmaps
+        if msg_type == "spawn":
+            payload = data.copy()
+            payload.pop("type", None)
+            try:
+                spawn = SpawnEvent.parse_obj(payload)
+            except Exception:
+                # Ignore malformed spawn payloads rather than closing the socket
+                continue
+            await database.execute(
+                spawn_events.insert().values(**spawn.dict())
+            )
+            continue
         # Telemetry message: save to DB and broadcast
         if msg_type == "telemetry":
-            # Parse and broadcast telemetry snapshot
+            # Parse telemetry snapshot and update in-memory state
            payload = data.copy()
             payload.pop("type", None)
             snap = TelemetrySnapshot.parse_obj(payload)
             live_snapshots[snap.character_name] = snap.dict()
-            # Persist to TimescaleDB
+            # Persist snapshot to TimescaleDB, force rares_found=0
+            db_data = snap.dict()
+            db_data['rares_found'] = 0
             await database.execute(
-                telemetry_events.insert().values(**snap.dict())
+                telemetry_events.insert().values(**db_data)
             )
             # Update persistent kill stats (delta per session)
             key = (snap.session_id, snap.character_name)
@@ -232,8 +284,35 @@ async def ws_receive_snapshots(
             )
             await database.execute(stmt)
             ws_receive_snapshots._last_kills[key] = snap.kills
+            # Broadcast to browser clients
             await _broadcast_to_browser_clients(snap.dict())
             continue
+        # Rare event: increment total and session counts
+        if msg_type == "rare":
+            name = data.get("character_name")
+            if isinstance(name, str):
+                # Total rare count per character
+                stmt_tot = pg_insert(rare_stats).values(
+                    character_name=name,
+                    total_rares=1
+                ).on_conflict_do_update(
+                    index_elements=["character_name"],
+                    set_={"total_rares": rare_stats.c.total_rares + 1},
+                )
+                await database.execute(stmt_tot)
+                # Session-specific rare count
+                session_id = live_snapshots.get(name, {}).get("session_id")
+                if session_id:
+                    stmt_sess = pg_insert(rare_stats_sessions).values(
+                        character_name=name,
+                        session_id=session_id,
+                        session_rares=1
+                    ).on_conflict_do_update(
+                        index_elements=["character_name", "session_id"],
+                        set_={"session_rares": rare_stats_sessions.c.session_rares + 1},
+                    )
+                    await database.execute(stmt_sess)
+            continue
         # Chat message: broadcast to browser clients only (no DB write)
         if msg_type == "chat":
             await _broadcast_to_browser_clients(data)
diff --git a/static/script.js b/static/script.js
index 499c3855..e158f4bf 100644
--- a/static/script.js
+++ b/static/script.js
@@ -81,8 +81,8 @@ const sortOptions = [
   },
   {
     value: "rares",
-    label: "Rares ↓",
-    comparator: (a, b) => (b.rares_found || 0) - (a.rares_found || 0)
+    label: "Session Rares ↓",
+    comparator: (a, b) => (b.session_rares || 0) - (a.session_rares || 0)
   }
 ];
@@ -251,7 +251,7 @@ function render(players) {
       ${loc(p.ns, p.ew)}
       ${p.kills}
       ${p.kills_per_hour}
-      ${p.rares_found}
+      ${p.session_rares}/${p.total_rares}
       ${p.vt_state}
       ${p.onlinetime}
      ${p.deaths}