MosswartOverlord/go-services/tracker-go/live.go
Erik 1af47520c0 feat(go-services): tracker-go Phase 0/1 — /live + /trails read parity
Parallel Go reimplementation of the dereth-tracker read side, deployed
loopback-only (:8770) and reading the dereth TimescaleDB read-only. The live
Python stack is untouched (added via a compose override, not by editing the
tracked docker-compose.yml).

- Phase 0 scaffold: stdlib net/http server (Go 1.22+ method+path routing),
  /health + /api-version, multi-stage distroless Docker build, and
  go-services/docker-compose.go.yml override (loopback :8770).
- Phase 1: pgx v5 pool forced into read-only transactions, a 5s /live + /trails
  cache loop using the exact SQL from main.py (line 837 for /live, line 874 for
  /trails), and Python-isoformat timestamps so output matches FastAPI's
  jsonable_encoder.
- compare/compare_live.py: parity harness vs the live Python service. Uses the
  server-stamped received_at to prove same-row full-field equality and to make
  the online-set diff boundary-aware.

Verified on live traffic (73 players): identical online set + 23-key schema,
identity/type parity for all, every same-row pair matches on every field, and
diff-row pairs differ only by the ~6s two-cache refresh skew.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-24 09:24:22 +02:00

150 lines
4.5 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
// Timing constants. Each one mirrors the corresponding value in main.py so the
// Go read side windows rows exactly like the Python service does.
const (
	// cacheInterval is the _refresh_cache_loop cadence: how long the loop
	// sleeps between rebuilds of the /live and /trails snapshots.
	cacheInterval = 5 * time.Second

	// activeWindow is ACTIVE_WINDOW, the real "online" test: it is the $2
	// cutoff compared against COALESCE(received_at, timestamp) in liveSQL.
	activeWindow = 30 * time.Second

	// chunkLookback is only a coarse bound on the client timestamp so that
	// TimescaleDB can prune chunks; it does not decide who is online.
	chunkLookback = 10 * time.Minute

	// trailsWindow is the /trails lookback (600s). It is hardcoded: the
	// `seconds` query parameter is ignored.
	trailsWindow = 600 * time.Second
)
// liveSQL mirrors main.py:837 exactly. It returns one row per online character.
// Parameters:
//
//	$1 = chunk cutoff (now - chunkLookback): coarse bound on the client
//	     `timestamp`, present only so TimescaleDB can prune chunks.
//	$2 = online cutoff (now - activeWindow): the actual online test.
//
// Online-ness is decided on COALESCE(received_at, timestamp). That is the
// server receive-time, falling back to the client timestamp when received_at
// is NULL. Game clients' clocks drift up to ~90s and would otherwise flap the
// player count.
//
// Among qualifying rows, DISTINCT ON keeps one row per character: the one with
// the newest client `timestamp`. The LEFT JOINs attach lifetime rares, rares
// for the row's session_id, and total kills. Each counter defaults to 0 when
// no stats row exists.
const liveSQL = `
SELECT sub.*,
COALESCE(rs.total_rares, 0) AS total_rares,
COALESCE(rss.session_rares, 0) AS session_rares,
COALESCE(cs.total_kills, 0) AS total_kills
FROM (
SELECT DISTINCT ON (character_name) *
FROM telemetry_events
WHERE timestamp > $1
AND COALESCE(received_at, timestamp) > $2
ORDER BY character_name, timestamp DESC
) sub
LEFT JOIN rare_stats rs ON sub.character_name = rs.character_name
LEFT JOIN rare_stats_sessions rss ON sub.character_name = rss.character_name
AND sub.session_id = rss.session_id
LEFT JOIN char_stats cs ON sub.character_name = cs.character_name`
// trailsSQL mirrors main.py:874. It selects every position point (ew, ns, z)
// at or after $1, ordered by character and then by time. Each character's
// points therefore arrive as a contiguous, chronological run for the map.
//
// $1 = now - trailsWindow. Unlike liveSQL's online test, this window is applied
// to the client `timestamp` rather than received_at.
const trailsSQL = `
SELECT timestamp, character_name, ew, ns, z
FROM telemetry_events
WHERE timestamp >= $1
ORDER BY character_name, timestamp`
// liveCache holds the pre-marshaled JSON response bodies for /live and
// /trails. The refresh loop replaces both bodies together under the write
// lock, and handlers read them under the read lock.
//
// Bodies are handed out by reference, not copied, so callers must treat the
// returned slices as read-only.
type liveCache struct {
	mu         sync.RWMutex
	liveJSON   []byte // body served by /live
	trailsJSON []byte // body served by /trails
}

// newLiveCache returns a cache seeded with empty-but-valid bodies, so both
// endpoints can answer before the first successful refresh.
func newLiveCache() *liveCache {
	c := new(liveCache)
	c.liveJSON = []byte(`{"players":[]}`)
	c.trailsJSON = []byte(`{"trails":[]}`)
	return c
}

// getLive returns the current /live body.
func (c *liveCache) getLive() []byte {
	c.mu.RLock()
	body := c.liveJSON
	c.mu.RUnlock()
	return body
}

// getTrails returns the current /trails body.
func (c *liveCache) getTrails() []byte {
	c.mu.RLock()
	body := c.trailsJSON
	c.mu.RUnlock()
	return body
}

// set replaces both bodies within a single write-locked section.
func (c *liveCache) set(live, trails []byte) {
	c.mu.Lock()
	c.liveJSON, c.trailsJSON = live, trails
	c.mu.Unlock()
}
// refreshLiveCache recomputes both cached bodies from the DB and swaps them
// into s.cache together. Both queries share one `now`, so the online window
// and the trails window are anchored to the same instant within a tick.
//
// On any error nothing is stored and the previous snapshot keeps being served.
// A zero-row result is published as an empty JSON array ([]), never as null.
// This matches the seed bodies installed by newLiveCache.
//
// NOTE(review): this assumes queryRowsAsMaps returns a slice, possibly nil
// when there are no rows. Confirm against its definition.
func (s *Server) refreshLiveCache(ctx context.Context) error {
	// One deadline bounds both queries, so a stuck DB cannot wedge the loop.
	qctx, cancel := context.WithTimeout(ctx, 15*time.Second)
	defer cancel()

	now := time.Now().UTC()

	players, err := queryRowsAsMaps(qctx, s.pool, liveSQL, now.Add(-chunkLookback), now.Add(-activeWindow))
	if err != nil {
		return fmt.Errorf("live query: %w", err)
	}
	formatTimes(players, "timestamp", "received_at")
	// json.Marshal encodes a nil slice as null; publish an explicit [] instead
	// so clients always see an array.
	liveJSON := []byte(`{"players":[]}`)
	if len(players) > 0 {
		if liveJSON, err = json.Marshal(map[string]any{"players": players}); err != nil {
			return fmt.Errorf("marshal live: %w", err)
		}
	}

	trails, err := queryRowsAsMaps(qctx, s.pool, trailsSQL, now.Add(-trailsWindow))
	if err != nil {
		return fmt.Errorf("trails query: %w", err)
	}
	formatTimes(trails, "timestamp")
	trailsJSON := []byte(`{"trails":[]}`)
	if len(trails) > 0 {
		if trailsJSON, err = json.Marshal(map[string]any{"trails": trails}); err != nil {
			return fmt.Errorf("marshal trails: %w", err)
		}
	}

	s.cache.set(liveJSON, trailsJSON)
	return nil
}
// runCacheLoop refreshes the cache every cacheInterval until ctx is cancelled.
// It refreshes immediately on entry (refresh-then-sleep), so the cache is warm
// shortly after startup. The interval runs from the end of one refresh to the
// start of the next.
//
// pgxpool handles reconnection transparently, so failures are only logged,
// with a consecutive-failure count, and the last good snapshot keeps being
// served. If a refresh fails because ctx was cancelled (shutdown), the loop
// returns without logging it as an error.
func (s *Server) runCacheLoop(ctx context.Context) {
	failures := 0
	for {
		if err := s.refreshLiveCache(ctx); err != nil {
			if ctx.Err() != nil {
				// Shutdown interrupted the in-flight refresh; that is not a DB failure.
				return
			}
			failures++
			s.log.Error("live cache refresh failed", "err", err, "consecutive", failures)
		} else if failures > 0 {
			s.log.Info("live cache refresh recovered", "after_failures", failures)
			failures = 0
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(cacheInterval):
		}
	}
}
// handleLive serves /live straight from the pre-marshaled cache snapshot.
// The request itself is not inspected.
func (s *Server) handleLive(w http.ResponseWriter, _ *http.Request) {
	body := s.cache.getLive()
	writeRawJSON(w, body)
}
// handleTrails serves /trails straight from the pre-marshaled cache snapshot.
// The `seconds` query param is accepted but ignored, matching main.py:2001.
// The lookback is always the fixed trailsWindow used by the refresh loop.
func (s *Server) handleTrails(w http.ResponseWriter, _ *http.Request) {
	body := s.cache.getTrails()
	writeRawJSON(w, body)
}
// writeRawJSON writes an already-encoded JSON body with status 200 and
// Content-Type application/json. The body is written verbatim and not
// validated.
func writeRawJSON(w http.ResponseWriter, body []byte) {
	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	// A failed write means the client went away mid-response; there is nobody
	// left to report it to, so the error is deliberately discarded.
	_, _ = w.Write(body)
}