from datetime import datetime, timedelta, timezone
import asyncio
import json
import sqlite3
from typing import Dict, Optional

from fastapi import (
    FastAPI,
    Header,
    HTTPException,
    Query,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.responses import JSONResponse
from fastapi.routing import APIRoute
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from starlette.concurrency import run_in_threadpool

from db import init_db, save_snapshot, DB_FILE

# ------------------------------------------------------------------
app = FastAPI()

# In-memory store of the last packet per character
live_snapshots: Dict[str, dict] = {}

SHARED_SECRET = "your_shared_secret"
# LOG_FILE = "telemetry_log.jsonl"

# ------------------------------------------------------------------
ACTIVE_WINDOW = timedelta(seconds=30)  # player is "online" if seen in last 30 s


class TelemetrySnapshot(BaseModel):
    character_name: str
    char_tag: str
    session_id: str
    timestamp: datetime
    ew: float  # +E / -W
    ns: float  # +N / -S
    z: float
    kills: int
    kills_per_hour: Optional[str] = None  # now optional
    onlinetime: Optional[str] = None  # now optional
    deaths: int
    rares_found: int
    prismatic_taper_count: int
    vt_state: str


@app.on_event("startup")
def on_startup():
    init_db()


# ------------------------ POST ----------------------------------
@app.post("/position")
@app.post("/position/")
async def receive_snapshot(
    snapshot: TelemetrySnapshot,
    x_plugin_secret: str = Header(None),
):
    if x_plugin_secret != SHARED_SECRET:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # cache for /live
    live_snapshots[snapshot.character_name] = snapshot.dict()

    # save in sqlite
    save_snapshot(snapshot.dict())

    # optional log-file append
    # with open(LOG_FILE, "a") as f:
    #     f.write(json.dumps(snapshot.dict(), default=str) + "\n")

    print(
        f"[{datetime.now()}] {snapshot.character_name} @ "
        f"NS={snapshot.ns:+.2f}, EW={snapshot.ew:+.2f}"
    )
    return {"status": "ok"}


# ------------------------ GET -----------------------------------
@app.get("/debug")
def debug():
    return {"status": "OK"}


@app.get("/live")
@app.get("/live/")
def get_live_players():
    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row
    rows = conn.execute("SELECT * FROM live_state").fetchall()
    conn.close()

    # aware cutoff (UTC)
    cutoff = datetime.now(timezone.utc) - ACTIVE_WINDOW

    players = [
        dict(r)
        for r in rows
        if datetime.fromisoformat(r["timestamp"].replace("Z", "+00:00")) > cutoff
    ]
    return JSONResponse(content={"players": players})


@app.get("/history/")
@app.get("/history")
def get_history(
    from_ts: str | None = Query(None, alias="from"),
    to_ts: str | None = Query(None, alias="to"),
):
    """
    Returns a time-ordered list of telemetry snapshots:
      - timestamp: ISO 8601 string
      - character_name: str
      - kills: cumulative kill count (int)
      - kph: kills_per_hour (float)
    """
    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row

    # Build the base query
    sql = """
        SELECT
            timestamp,
            character_name,
            kills,
            CAST(kills_per_hour AS REAL) AS kph
        FROM telemetry_log
    """
    params: list[str] = []
    conditions: list[str] = []

    # Add optional filters
    if from_ts:
        conditions.append("timestamp >= ?")
        params.append(from_ts)
    if to_ts:
        conditions.append("timestamp <= ?")
        params.append(to_ts)

    if conditions:
        sql += " WHERE " + " AND ".join(conditions)

    sql += " ORDER BY timestamp"

    rows = conn.execute(sql, params).fetchall()
    conn.close()

    data = [
        {
            "timestamp": row["timestamp"],
            "character_name": row["character_name"],
            "kills": row["kills"],
            "kph": row["kph"],
        }
        for row in rows
    ]
    return JSONResponse(content={"data": data})


# ------------------------ GET Trails ---------------------------------
@app.get("/trails")
@app.get("/trails/")
def get_trails(
    seconds: int = Query(600, ge=0, description="Lookback window in seconds")
):
    """
    Return position snapshots (timestamp, character_name, ew, ns, z)
    for the past `seconds` seconds.
    """
    # match the same string format as stored timestamps (via str(datetime))
    cutoff_dt = datetime.now(timezone.utc) - timedelta(seconds=seconds)
    cutoff = str(cutoff_dt)

    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        """
        SELECT timestamp, character_name, ew, ns, z
        FROM telemetry_log
        WHERE timestamp >= ?
        ORDER BY character_name, timestamp
        """,
        (cutoff,),
    ).fetchall()
    conn.close()

    trails = [
        {
            "timestamp": r["timestamp"],
            "character_name": r["character_name"],
            "ew": r["ew"],
            "ns": r["ns"],
            "z": r["z"],
        }
        for r in rows
    ]
    return JSONResponse(content={"trails": trails})


# -------------------- WebSocket endpoints -----------------------
browser_conns: set[WebSocket] = set()


async def _broadcast_to_browser_clients(snapshot: dict):
    # Push the snapshot to every connected browser client. Sending to a
    # dead socket raises (not necessarily WebSocketDisconnect), so treat
    # any send failure as a disconnect and drop that connection.
    for ws in list(browser_conns):
        try:
            await ws.send_json(snapshot)
        except Exception:
            browser_conns.discard(ws)


@app.websocket("/ws/position")
async def ws_receive_snapshots(websocket: WebSocket, secret: str = Query(...)):
    await websocket.accept()
    if secret != SHARED_SECRET:
        await websocket.close(code=1008)
        return
    try:
        while True:
            data = await websocket.receive_json()
            snap = TelemetrySnapshot.parse_obj(data)
            live_snapshots[snap.character_name] = snap.dict()
            # sqlite writes block, so keep them off the event loop
            await run_in_threadpool(save_snapshot, snap.dict())
            await _broadcast_to_browser_clients(snap.dict())
    except WebSocketDisconnect:
        pass


@app.websocket("/ws/live")
async def ws_live_updates(websocket: WebSocket):
    await websocket.accept()
    browser_conns.add(websocket)
    try:
        while True:
            await asyncio.sleep(3600)
    except WebSocketDisconnect:
        pass
    finally:
        # discard() is safe even if the broadcaster already removed us
        browser_conns.discard(websocket)


# -------------------- static frontend ---------------------------
app.mount("/", StaticFiles(directory="static", html=True), name="static")

# list routes for convenience
print("🔍 Registered routes:")
for route in app.routes:
    if isinstance(route, APIRoute):
        print(f"{route.path} -> {route.methods}")
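
# ------------------------ example client -------------------------
# A minimal sketch of how a plugin could push one snapshot to
# POST /position. Illustrative only: the localhost:8000 address, the
# field values, and the use of the `requests` package are assumptions,
# not part of the server above. Copy it into a separate script rather
# than running it from this module.
#
#   import requests
#   from datetime import datetime, timezone
#
#   snapshot = {
#       "character_name": "Testchar",       # hypothetical character
#       "char_tag": "test",
#       "session_id": "abc123",
#       "timestamp": datetime.now(timezone.utc).isoformat(),
#       "ew": 12.34,
#       "ns": -56.78,
#       "z": 0.0,
#       "kills": 0,
#       "deaths": 0,
#       "rares_found": 0,
#       "prismatic_taper_count": 0,
#       "vt_state": "Idle",
#   }
#   resp = requests.post(
#       "http://localhost:8000/position",
#       json=snapshot,
#       headers={"X-Plugin-Secret": "your_shared_secret"},
#       timeout=5,
#   )
#   print(resp.status_code, resp.json())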