diff --git a/db_async.py b/db_async.py index d401ec09..fecee594 100644 --- a/db_async.py +++ b/db_async.py @@ -143,13 +143,17 @@ async def init_db_async(): )) except Exception as e: print(f"Warning: failed to create composite index ix_telemetry_events_char_ts: {e}") - # Disable parallel workers at the system level to avoid OOMs from large parallel scans + # Add retention and compression policies on the hypertable try: - # Apply settings outside transaction for ALTER SYSTEM - conn2 = engine.connect().execution_options(isolation_level="AUTOCOMMIT") - conn2.execute(text("ALTER SYSTEM SET max_parallel_workers_per_gather = 0")) - conn2.execute(text("ALTER SYSTEM SET max_parallel_workers = 0")) - conn2.execute(text("SELECT pg_reload_conf()")) - conn2.close() + with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + # Retain only recent data (default 7 days or override via DB_RETENTION_DAYS) + days = int(os.getenv('DB_RETENTION_DAYS', '7')) + conn.execute(text( + f"SELECT add_retention_policy('telemetry_events', INTERVAL '{days} days')" + )) + # Compress chunks older than 1 day + conn.execute(text( + "SELECT add_compression_policy('telemetry_events', INTERVAL '1 day')" + )) except Exception as e: - print(f"Warning: failed to disable parallel workers: {e}") \ No newline at end of file + print(f"Warning: failed to set retention/compression policies: {e}") \ No newline at end of file diff --git a/main.py b/main.py index d2ccd29b..49925606 100644 --- a/main.py +++ b/main.py @@ -153,6 +153,7 @@ async def get_live_players(): FROM ( SELECT DISTINCT ON (character_name) * FROM telemetry_events + WHERE timestamp > :cutoff ORDER BY character_name, timestamp DESC ) sub LEFT JOIN rare_stats rs @@ -160,7 +161,6 @@ async def get_live_players(): LEFT JOIN rare_stats_sessions rss ON sub.character_name = rss.character_name AND sub.session_id = rss.session_id - WHERE sub.timestamp > :cutoff """ rows = await database.fetch_all(sql, {"cutoff": cutoff}) players = [dict(r) for r in rows] @@ -168,43 +168,6 @@ async def get_live_players(): return JSONResponse(content=jsonable_encoder({"players": players})) -@app.get("/history/") -@app.get("/history") -async def get_history( - from_ts: str | None = Query(None, alias="from"), - to_ts: str | None = Query(None, alias="to"), -): - """Returns a time-ordered list of telemetry snapshots.""" - # Base SQL query: fetch timestamp, character_name, kills, kills_per_hour (as kph) - sql = ( - "SELECT timestamp, character_name, kills, kills_per_hour AS kph " - "FROM telemetry_events" - ) - values: dict = {} - conditions: list[str] = [] - # Apply filters if time bounds provided via 'from' and 'to' query parameters - if from_ts: - conditions.append("timestamp >= :from_ts") - values["from_ts"] = from_ts - if to_ts: - conditions.append("timestamp <= :to_ts") - values["to_ts"] = to_ts - # Concatenate WHERE clauses dynamically based on provided filters - if conditions: - sql += " WHERE " + " AND ".join(conditions) - sql += " ORDER BY timestamp" - rows = await database.fetch_all(sql, values) - data = [ - { - "timestamp": row["timestamp"], - "character_name": row["character_name"], - "kills": row["kills"], - "kph": row["kph"], - } - for row in rows - ] - # Ensure all types (e.g. datetime) are JSON serializable - return JSONResponse(content=jsonable_encoder({"data": data})) # --- GET Trails --------------------------------- @@ -322,25 +285,26 @@ async def ws_receive_snapshots( payload.pop("type", None) snap = TelemetrySnapshot.parse_obj(payload) live_snapshots[snap.character_name] = snap.dict() - # Persist snapshot to TimescaleDB, force rares_found=0 + # Prepare data and compute kill delta db_data = snap.dict() db_data['rares_found'] = 0 - await database.execute( - telemetry_events.insert().values(**db_data) - ) - # Update persistent kill stats (delta per session) key = (snap.session_id, snap.character_name) last = ws_receive_snapshots._last_kills.get(key, 0) delta = snap.kills - last - if delta > 0: - stmt = pg_insert(char_stats).values( - character_name=snap.character_name, - total_kills=delta - ).on_conflict_do_update( - index_elements=["character_name"], - set_={"total_kills": char_stats.c.total_kills + delta}, + # Persist snapshot and any kill delta in a single transaction + async with database.transaction(): + await database.execute( + telemetry_events.insert().values(**db_data) ) - await database.execute(stmt) + if delta > 0: + stmt = pg_insert(char_stats).values( + character_name=snap.character_name, + total_kills=delta + ).on_conflict_do_update( + index_elements=["character_name"], + set_={"total_kills": char_stats.c.total_kills + delta}, + ) + await database.execute(stmt) ws_receive_snapshots._last_kills[key] = snap.kills # Broadcast updated snapshot to all browser clients await _broadcast_to_browser_clients(snap.dict())