Parallel Go reimplementation of the dereth-tracker read side, deployed loopback-only (:8770) and reading the dereth TimescaleDB read-only. The live Python stack is untouched (added via a compose override, not by editing the tracked docker-compose.yml). - Phase 0 scaffold: stdlib net/http server (Go 1.22+ method+path routing), /health + /api-version, multi-stage distroless Docker build, and go-services/docker-compose.go.yml override (loopback :8770). - Phase 1: pgx v5 pool forced into read-only transactions, a 5s /live + /trails cache loop using the exact main.py:837 SQL, and Python-isoformat timestamps so output matches FastAPI's jsonable_encoder. - compare/compare_live.py: parity harness vs the live Python service. Uses the server-stamped received_at to prove same-row full-field equality and to make the online-set diff boundary-aware. Verified on live traffic (73 players): identical online set + 23-key schema, identity/type parity for all, every same-row pair matches on every field, and diff-row pairs differ only by the ~6s two-cache refresh skew. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
150 lines
4.5 KiB
Go
150 lines
4.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Durations that drive the /live and /trails cache. Each one mirrors main.py.
const (
	// activeWindow is ACTIVE_WINDOW — the real "online" test.
	activeWindow = 30 * time.Second

	// chunkLookback is a coarse bound, present only so TimescaleDB can prune chunks.
	chunkLookback = 10 * time.Minute

	// trailsWindow is the /trails lookback of 600s (hardcoded; the `seconds`
	// param is ignored).
	trailsWindow = 10 * time.Minute

	// cacheInterval is the _refresh_cache_loop cadence.
	cacheInterval = 5 * time.Second
)
|
|
|
|
// liveSQL mirrors main.py:837 exactly. $1 = chunk_cutoff (now-10min), $2 = cutoff (now-30s).
// Online-ness is decided on COALESCE(received_at, timestamp) — server receive-time — because
// game clients' clocks drift up to ~90s and would otherwise flap the player count.
//
// The inner query keeps one telemetry_events row per character_name: among the rows
// that pass both cutoffs, the one with the greatest timestamp (DISTINCT ON combined
// with ORDER BY ... timestamp DESC). The outer query LEFT JOINs rare_stats,
// rare_stats_sessions (matched on character_name and session_id) and char_stats,
// and reports 0 for total_rares, session_rares and total_kills when the joined
// value is NULL.
//
// Do not reformat or "tidy" the statement without re-checking it against the
// Python query it is meant to match.
const liveSQL = `
SELECT sub.*,
COALESCE(rs.total_rares, 0) AS total_rares,
COALESCE(rss.session_rares, 0) AS session_rares,
COALESCE(cs.total_kills, 0) AS total_kills
FROM (
SELECT DISTINCT ON (character_name) *
FROM telemetry_events
WHERE timestamp > $1
AND COALESCE(received_at, timestamp) > $2
ORDER BY character_name, timestamp DESC
) sub
LEFT JOIN rare_stats rs ON sub.character_name = rs.character_name
LEFT JOIN rare_stats_sessions rss ON sub.character_name = rss.character_name
AND sub.session_id = rss.session_id
LEFT JOIN char_stats cs ON sub.character_name = cs.character_name`
|
|
|
|
// trailsSQL mirrors main.py:874 — last 600s of position points, ordered for the map.
//
// $1 is the inclusive lower bound on timestamp (the comparison is >=). Each row
// carries timestamp, character_name, ew, ns and z, and rows are ordered by
// character_name and then by ascending timestamp.
const trailsSQL = `
SELECT timestamp, character_name, ew, ns, z
FROM telemetry_events
WHERE timestamp >= $1
ORDER BY character_name, timestamp`
|
|
|
|
// liveCache is the in-memory snapshot behind /live and /trails. It stores the two
// response bodies already encoded as JSON; the refresh loop swaps them every
// cacheInterval, and mu guards the pair so readers never observe a half-updated
// snapshot.
type liveCache struct {
	mu sync.RWMutex

	// Pre-marshaled response bodies, always replaced together.
	liveJSON, trailsJSON []byte
}
|
|
|
|
func newLiveCache() *liveCache {
|
|
return &liveCache{
|
|
liveJSON: []byte(`{"players":[]}`),
|
|
trailsJSON: []byte(`{"trails":[]}`),
|
|
}
|
|
}
|
|
|
|
func (c *liveCache) getLive() []byte {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.liveJSON
|
|
}
|
|
|
|
func (c *liveCache) getTrails() []byte {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.trailsJSON
|
|
}
|
|
|
|
func (c *liveCache) set(live, trails []byte) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.liveJSON = live
|
|
c.trailsJSON = trails
|
|
}
|
|
|
|
// refresh recomputes both caches from the DB. Both queries use the SAME `now`
|
|
// so the online window and trails window are consistent within a tick.
|
|
func (s *Server) refreshLiveCache(ctx context.Context) error {
|
|
qctx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
defer cancel()
|
|
|
|
now := time.Now().UTC()
|
|
|
|
players, err := queryRowsAsMaps(qctx, s.pool, liveSQL, now.Add(-chunkLookback), now.Add(-activeWindow))
|
|
if err != nil {
|
|
return fmt.Errorf("live query: %w", err)
|
|
}
|
|
formatTimes(players, "timestamp", "received_at")
|
|
liveJSON, err := json.Marshal(map[string]any{"players": players})
|
|
if err != nil {
|
|
return fmt.Errorf("marshal live: %w", err)
|
|
}
|
|
|
|
trails, err := queryRowsAsMaps(qctx, s.pool, trailsSQL, now.Add(-trailsWindow))
|
|
if err != nil {
|
|
return fmt.Errorf("trails query: %w", err)
|
|
}
|
|
formatTimes(trails, "timestamp")
|
|
trailsJSON, err := json.Marshal(map[string]any{"trails": trails})
|
|
if err != nil {
|
|
return fmt.Errorf("marshal trails: %w", err)
|
|
}
|
|
|
|
s.cache.set(liveJSON, trailsJSON)
|
|
return nil
|
|
}
|
|
|
|
// runCacheLoop refreshes the cache every cacheInterval until ctx is cancelled.
|
|
// It refreshes immediately on entry (refresh-then-sleep) so the cache is warm
|
|
// shortly after startup. pgxpool handles reconnection transparently, so we just
|
|
// log failures and keep serving the last good snapshot.
|
|
func (s *Server) runCacheLoop(ctx context.Context) {
|
|
failures := 0
|
|
for {
|
|
if err := s.refreshLiveCache(ctx); err != nil {
|
|
failures++
|
|
s.log.Error("live cache refresh failed", "err", err, "consecutive", failures)
|
|
} else {
|
|
if failures > 0 {
|
|
s.log.Info("live cache refresh recovered", "after_failures", failures)
|
|
}
|
|
failures = 0
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(cacheInterval):
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) handleLive(w http.ResponseWriter, r *http.Request) {
|
|
writeRawJSON(w, s.cache.getLive())
|
|
}
|
|
|
|
func (s *Server) handleTrails(w http.ResponseWriter, r *http.Request) {
|
|
// `seconds` query param is accepted but ignored, matching main.py:2001.
|
|
writeRawJSON(w, s.cache.getTrails())
|
|
}
|
|
|
|
// writeRawJSON sends body verbatim as a 200 response with Content-Type
// application/json. The body is assumed to be valid JSON already; nothing is
// re-encoded or checked here.
func writeRawJSON(w http.ResponseWriter, body []byte) {
	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)

	// Best effort: the status line has already been sent, so a failed write
	// cannot be reported to the client and is deliberately ignored.
	_, _ = w.Write(body)
}
|