162 lines
4.3 KiB
Python
162 lines
4.3 KiB
Python
from datetime import datetime, timedelta, timezone
|
||
import json
|
||
import sqlite3
|
||
from typing import Dict
|
||
|
||
from fastapi import FastAPI, Header, HTTPException, Query
|
||
from fastapi.responses import JSONResponse
|
||
from fastapi.routing import APIRoute
|
||
from fastapi.staticfiles import StaticFiles
|
||
from pydantic import BaseModel
|
||
from typing import Optional
|
||
|
||
from db import init_db, save_snapshot, DB_FILE
|
||
|
||
# ------------------------------------------------------------------
|
||
app = FastAPI()
|
||
|
||
# In-memory store of the last packet per character
|
||
live_snapshots: Dict[str, dict] = {}
|
||
|
||
SHARED_SECRET = "your_shared_secret"
|
||
#LOG_FILE = "telemetry_log.jsonl"
|
||
# ------------------------------------------------------------------
|
||
ACTIVE_WINDOW = timedelta(seconds=30) # player is “online” if seen in last 30 s
|
||
|
||
class TelemetrySnapshot(BaseModel):
|
||
character_name: str
|
||
char_tag: str
|
||
session_id: str
|
||
timestamp: datetime
|
||
|
||
ew: float # +E / –W
|
||
ns: float # +N / –S
|
||
z: float
|
||
|
||
kills: int
|
||
kills_per_hour: Optional[str] = None # now optional
|
||
onlinetime: Optional[str] = None # now optional
|
||
deaths: int
|
||
rares_found: int
|
||
prismatic_taper_count: int
|
||
vt_state: str
|
||
|
||
|
||
@app.on_event("startup")
|
||
def on_startup():
|
||
init_db()
|
||
|
||
|
||
# ------------------------ POST ----------------------------------
|
||
@app.post("/position")
|
||
@app.post("/position/")
|
||
async def receive_snapshot(
|
||
snapshot: TelemetrySnapshot,
|
||
x_plugin_secret: str = Header(None)
|
||
):
|
||
if x_plugin_secret != SHARED_SECRET:
|
||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||
|
||
# cache for /live
|
||
live_snapshots[snapshot.character_name] = snapshot.dict()
|
||
|
||
# save in sqlite
|
||
save_snapshot(snapshot.dict())
|
||
|
||
# optional log-file append
|
||
#with open(LOG_FILE, "a") as f:
|
||
# f.write(json.dumps(snapshot.dict(), default=str) + "\n")
|
||
|
||
print(f"[{datetime.now()}] {snapshot.character_name} @ NS={snapshot.ns:+.2f}, EW={snapshot.ew:+.2f}")
|
||
|
||
return {"status": "ok"}
|
||
|
||
|
||
# ------------------------ GET -----------------------------------
|
||
@app.get("/debug")
|
||
def debug():
|
||
return {"status": "OK"}
|
||
|
||
|
||
@app.get("/live")
|
||
@app.get("/live/")
|
||
def get_live_players():
|
||
conn = sqlite3.connect(DB_FILE)
|
||
conn.row_factory = sqlite3.Row
|
||
rows = conn.execute("SELECT * FROM live_state").fetchall()
|
||
conn.close()
|
||
|
||
# aware cutoff (UTC)
|
||
cutoff = datetime.utcnow().replace(tzinfo=timezone.utc) - ACTIVE_WINDOW
|
||
|
||
players = [
|
||
dict(r) for r in rows
|
||
if datetime.fromisoformat(
|
||
r["timestamp"].replace('Z', '+00:00')
|
||
) > cutoff
|
||
]
|
||
|
||
return JSONResponse(content={"players": players})
|
||
@app.get("/history/")
|
||
@app.get("/history")
|
||
def get_history(
|
||
from_ts: str | None = Query(None, alias="from"),
|
||
to_ts: str | None = Query(None, alias="to"),
|
||
):
|
||
"""
|
||
Returns a time‐ordered list of telemetry snapshots:
|
||
- timestamp: ISO8601 string
|
||
- character_name: str
|
||
- kills: cumulative kill count (int)
|
||
- kph: kills_per_hour (float)
|
||
"""
|
||
conn = sqlite3.connect(DB_FILE)
|
||
conn.row_factory = sqlite3.Row
|
||
|
||
# Build the base query
|
||
sql = """
|
||
SELECT
|
||
timestamp,
|
||
character_name,
|
||
kills,
|
||
CAST(kills_per_hour AS REAL) AS kph
|
||
FROM telemetry_log
|
||
"""
|
||
params: list[str] = []
|
||
conditions: list[str] = []
|
||
|
||
# Add optional filters
|
||
if from_ts:
|
||
conditions.append("timestamp >= ?")
|
||
params.append(from_ts)
|
||
if to_ts:
|
||
conditions.append("timestamp <= ?")
|
||
params.append(to_ts)
|
||
if conditions:
|
||
sql += " WHERE " + " AND ".join(conditions)
|
||
|
||
sql += " ORDER BY timestamp"
|
||
|
||
rows = conn.execute(sql, params).fetchall()
|
||
conn.close()
|
||
|
||
data = [
|
||
{
|
||
"timestamp": row["timestamp"],
|
||
"character_name":row["character_name"],
|
||
"kills": row["kills"],
|
||
"kph": row["kph"],
|
||
}
|
||
for row in rows
|
||
]
|
||
return JSONResponse(content={"data": data})
|
||
|
||
|
||
# -------------------- static frontend ---------------------------
|
||
app.mount("/", StaticFiles(directory="static", html=True), name="static")
|
||
|
||
# list routes for convenience
|
||
print("🔍 Registered routes:")
|
||
for route in app.routes:
|
||
if isinstance(route, APIRoute):
|
||
print(f"{route.path} -> {route.methods}")
|