fixed CPU logging from db
This commit is contained in:
parent add24e5c9d
commit 4de85b8db4

2 changed files with 27 additions and 59 deletions
db_async.py (20 lines changed)
@@ -143,13 +143,17 @@ async def init_db_async():
         ))
     except Exception as e:
         print(f"Warning: failed to create composite index ix_telemetry_events_char_ts: {e}")
-    # Disable parallel workers at the system level to avoid OOMs from large parallel scans
+    # Add retention and compression policies on the hypertable
     try:
-        # Apply settings outside transaction for ALTER SYSTEM
-        conn2 = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
-        conn2.execute(text("ALTER SYSTEM SET max_parallel_workers_per_gather = 0"))
-        conn2.execute(text("ALTER SYSTEM SET max_parallel_workers = 0"))
-        conn2.execute(text("SELECT pg_reload_conf()"))
-        conn2.close()
+        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
+            # Retain only recent data (default 7 days or override via DB_RETENTION_DAYS)
+            days = int(os.getenv('DB_RETENTION_DAYS', '7'))
+            conn.execute(text(
+                f"SELECT add_retention_policy('telemetry_events', INTERVAL '{days} days')"
+            ))
+            # Compress chunks older than 1 day
+            conn.execute(text(
+                "SELECT add_compression_policy('telemetry_events', INTERVAL '1 day')"
+            ))
     except Exception as e:
-        print(f"Warning: failed to disable parallel workers: {e}")
+        print(f"Warning: failed to set retention/compression policies: {e}")
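Note: TimescaleDB's add_compression_policy() only succeeds once compression has been enabled on the hypertable; otherwise the call raises and lands in the except branch above. A minimal sketch of the enable-then-schedule order, assuming the same engine and table name as this commit (the DSN and the segmentby/orderby choices are illustrative, not taken from the repo):

    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql://user:pass@localhost/telemetry")  # placeholder DSN
    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        # Enable compression on the hypertable first (assumed setup step)
        conn.execute(text(
            "ALTER TABLE telemetry_events SET ("
            "timescaledb.compress, "
            "timescaledb.compress_segmentby = 'character_name', "
            "timescaledb.compress_orderby = 'timestamp DESC')"
        ))
        # Then schedule the background compression job
        conn.execute(text(
            "SELECT add_compression_policy('telemetry_events', INTERVAL '1 day')"
        ))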
main.py (66 lines changed)
@@ -153,6 +153,7 @@ async def get_live_players():
         FROM (
             SELECT DISTINCT ON (character_name) *
             FROM telemetry_events
+            WHERE timestamp > :cutoff
             ORDER BY character_name, timestamp DESC
         ) sub
         LEFT JOIN rare_stats rs
@@ -160,7 +161,6 @@ async def get_live_players():
         LEFT JOIN rare_stats_sessions rss
         ON sub.character_name = rss.character_name
         AND sub.session_id = rss.session_id
-        WHERE sub.timestamp > :cutoff
     """
     rows = await database.fetch_all(sql, {"cutoff": cutoff})
     players = [dict(r) for r in rows]
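Moving the cutoff filter from the outer query into the DISTINCT ON subquery prunes old rows before deduplication instead of after, which shrinks the working set and gives the planner a chance to satisfy the subquery with the ix_telemetry_events_char_ts composite index created in db_async.py. A quick way to check the plan, assuming the same `databases` instance and schema (the EXPLAIN wrapper and the 60-second window are illustrative, not part of this commit):

    # Sketch: inspect the plan for the rewritten subquery
    plan = await database.fetch_all(
        "EXPLAIN SELECT DISTINCT ON (character_name) * "
        "FROM telemetry_events "
        "WHERE timestamp > now() - INTERVAL '60 seconds' "
        "ORDER BY character_name, timestamp DESC"
    )
    for row in plan:
        print(row["QUERY PLAN"])  # expect an index scan, not a parallel seq scan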
@@ -168,43 +168,6 @@ async def get_live_players():
     return JSONResponse(content=jsonable_encoder({"players": players}))


-@app.get("/history/")
-@app.get("/history")
-async def get_history(
-    from_ts: str | None = Query(None, alias="from"),
-    to_ts: str | None = Query(None, alias="to"),
-):
-    """Returns a time-ordered list of telemetry snapshots."""
-    # Base SQL query: fetch timestamp, character_name, kills, kills_per_hour (as kph)
-    sql = (
-        "SELECT timestamp, character_name, kills, kills_per_hour AS kph "
-        "FROM telemetry_events"
-    )
-    values: dict = {}
-    conditions: list[str] = []
-    # Apply filters if time bounds provided via 'from' and 'to' query parameters
-    if from_ts:
-        conditions.append("timestamp >= :from_ts")
-        values["from_ts"] = from_ts
-    if to_ts:
-        conditions.append("timestamp <= :to_ts")
-        values["to_ts"] = to_ts
-    # Concatenate WHERE clauses dynamically based on provided filters
-    if conditions:
-        sql += " WHERE " + " AND ".join(conditions)
-    sql += " ORDER BY timestamp"
-    rows = await database.fetch_all(sql, values)
-    data = [
-        {
-            "timestamp": row["timestamp"],
-            "character_name": row["character_name"],
-            "kills": row["kills"],
-            "kph": row["kph"],
-        }
-        for row in rows
-    ]
-    # Ensure all types (e.g. datetime) are JSON serializable
-    return JSONResponse(content=jsonable_encoder({"data": data}))


 # --- GET Trails ---------------------------------
@@ -322,25 +285,26 @@ async def ws_receive_snapshots(
             payload.pop("type", None)
             snap = TelemetrySnapshot.parse_obj(payload)
             live_snapshots[snap.character_name] = snap.dict()
-            # Persist snapshot to TimescaleDB, force rares_found=0
+            # Prepare data and compute kill delta
             db_data = snap.dict()
             db_data['rares_found'] = 0
-            await database.execute(
-                telemetry_events.insert().values(**db_data)
-            )
-            # Update persistent kill stats (delta per session)
             key = (snap.session_id, snap.character_name)
             last = ws_receive_snapshots._last_kills.get(key, 0)
             delta = snap.kills - last
-            if delta > 0:
-                stmt = pg_insert(char_stats).values(
-                    character_name=snap.character_name,
-                    total_kills=delta
-                ).on_conflict_do_update(
-                    index_elements=["character_name"],
-                    set_={"total_kills": char_stats.c.total_kills + delta},
+            # Persist snapshot and any kill delta in a single transaction
+            async with database.transaction():
+                await database.execute(
+                    telemetry_events.insert().values(**db_data)
                 )
-                await database.execute(stmt)
+                if delta > 0:
+                    stmt = pg_insert(char_stats).values(
+                        character_name=snap.character_name,
+                        total_kills=delta
+                    ).on_conflict_do_update(
+                        index_elements=["character_name"],
+                        set_={"total_kills": char_stats.c.total_kills + delta},
+                    )
+                    await database.execute(stmt)
             ws_receive_snapshots._last_kills[key] = snap.kills
             # Broadcast updated snapshot to all browser clients
             await _broadcast_to_browser_clients(snap.dict())
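With both writes wrapped in database.transaction(), the snapshot row and the char_stats kill delta commit or roll back together, so a failure between the two statements can no longer record a snapshot without its delta (or vice versa). The delta upsert compiles to a plain INSERT ... ON CONFLICT DO UPDATE; a standalone sketch for inspecting the generated SQL, where the table definition mirrors the columns used in the diff but is an assumption, as is the sample data:

    from sqlalchemy import Column, Integer, MetaData, String, Table
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    metadata = MetaData()
    char_stats = Table(  # assumed shape of the real table
        "char_stats", metadata,
        Column("character_name", String, primary_key=True),
        Column("total_kills", Integer, nullable=False),
    )

    delta = 5  # illustrative kill delta
    stmt = pg_insert(char_stats).values(
        character_name="SomeCharacter",  # illustrative name
        total_kills=delta,
    ).on_conflict_do_update(
        index_elements=["character_name"],
        set_={"total_kills": char_stats.c.total_kills + delta},
    )
    print(stmt.compile(dialect=postgresql.dialect()))
    # INSERT INTO char_stats (character_name, total_kills) VALUES (...)
    # ON CONFLICT (character_name) DO UPDATE SET total_kills = char_stats.total_kills + ...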