Completes the Go backend so it can fully replace Python in production: tracker-go website layer (serves the unchanged frontend): - static file serving + SPA fallback + /icons (website.go) - login/logout with itsdangerous cookie ISSUING (bcrypt, Python-interop) and the /me handler (auth.go issueSessionCookie + website.go) - admin user CRUD (website_admin.go) and the issue-board write side (website_issues.go) - request-scoped user context + requireAdmin (auth.go) cutover ingest (gated off during the parallel run, required for a clean cutover): - inventory forwarding: full_inventory -> /process-inventory, inventory_delta -> item POST/DELETE, per-character serialized, fire-and-forget (inventory_forward.go) - death/idle Discord alerts via DISCORD_ACLOG_WEBHOOK (aclog.go) - SKIP_SCHEMA_INIT so write mode against the prod DBs runs no DDL (tracker-go + inventory-go) two bugs found live and fixed: - coerceNum: the plugin sends kills_per_hour/deaths/total_deaths/prismatic_taper_count as STRINGS; pydantic coerced them, Go's number helpers wrote null/0 (reads.go/ingest.go) - telemetry is broadcast TYPELESS so the browser ignores it and uses the /live poll; broadcasting it typed flapped the per-player counters 0<->value (ingest.go stripType) docker-compose.cutover.yml: reversible override flipping the Go services to write mode against the production DBs and repointing the Discord bot at the Go /ws/live. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
337 lines
12 KiB
Go
337 lines
12 KiB
Go
// Command tracker-go is a Go reimplementation of the MosswartOverlord
// "dereth-tracker" backend, deployed in parallel with the live Python service
// for side-by-side comparison (strangler-fig migration).
//
// Phase 1: read-side parity. Connects READ-ONLY to the existing dereth
// TimescaleDB and reimplements the HTTP read API, starting with the /live and
// /trails caches (the 5s _refresh_cache_loop). In this default mode (READ_ONLY
// unset or anything other than "false") it never touches anything the
// Python service writes. With READ_ONLY=false the service additionally owns
// the ingest path: shadow ingest when SHADOW_INGEST_WS is set, cutover ingest
// otherwise.
//
// Routes are declared WITHOUT the nginx-stripped "/go/" prefix, mirroring the
// Python service's "no /api/ prefix" convention. nginx's `location /go/` strips
// the prefix before proxying to this service on 127.0.0.1:8770.
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"log/slog"
|
|
"net/http"
|
|
"net/http/httputil"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// buildVersion identifies the running build. It defaults to "dev" and is
// overridden at link time with -ldflags "-X main.buildVersion=...", which is
// why it has to be a package-level string variable rather than a constant.
// It is the Go counterpart of the Python service's APP_VERSION /
// "/api-version" stamp.
var buildVersion = "dev"
|
|
|
|
// Server holds the shared dependencies for HTTP handlers.
|
|
type Server struct {
|
|
pool *pgxpool.Pool
|
|
cache *liveCache
|
|
totals *totalsCache
|
|
invProxy *httputil.ReverseProxy
|
|
staticDir string
|
|
secretKey string
|
|
sharedSecret string
|
|
sharedSecretLegacy string
|
|
ingestor *Ingestor // non-nil only in ingest/shadow mode
|
|
hub *Hub // browser /ws/live fan-out
|
|
plugins *pluginRegistry
|
|
loginLimiter *loginLimiter
|
|
log *slog.Logger
|
|
}
|
|
|
|
func main() {
|
|
// `tracker-go combat-merge` reads a JSON array of cumulative session
|
|
// snapshots from stdin and prints the folded lifetime — a deterministic hook
|
|
// for cross-language parity testing against the Python combat functions.
|
|
if len(os.Args) > 1 && os.Args[1] == "combat-merge" {
|
|
runCombatMergeCLI()
|
|
return
|
|
}
|
|
// `tracker-go issue-cookie <username> <is_admin> <secret_key>` prints a
|
|
// session token — a hook to cross-check itsdangerous cookie interop with the
|
|
// Python service.
|
|
if len(os.Args) > 1 && os.Args[1] == "issue-cookie" {
|
|
runIssueCookieCLI()
|
|
return
|
|
}
|
|
|
|
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
|
|
slog.SetDefault(logger)
|
|
|
|
cfg := loadConfig()
|
|
logger.Info("starting tracker-go", "version", buildVersion, "addr", cfg.Addr)
|
|
|
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
srv := &Server{
|
|
cache: newLiveCache(),
|
|
totals: newTotalsCache(),
|
|
loginLimiter: newLoginLimiter(),
|
|
staticDir: cfg.StaticDir,
|
|
secretKey: cfg.SecretKey,
|
|
sharedSecret: cfg.SharedSecret,
|
|
sharedSecretLegacy: cfg.SharedSecretLegacy,
|
|
hub: newHub(),
|
|
plugins: newPluginRegistry(logger),
|
|
log: logger,
|
|
}
|
|
if cfg.SecretKey == "" {
|
|
// Fail closed like the Python service: with no key, no external cookie
|
|
// can verify, so only internal-trust (loopback/compose) requests pass.
|
|
logger.Warn("SECRET_KEY unset — external (nginx-proxied) requests will all be rejected")
|
|
}
|
|
|
|
// Inventory-service reverse proxy (independent of the DB).
|
|
if err := srv.initInvProxy(cfg.InventoryURL); err != nil {
|
|
logger.Error("inventory proxy init failed", "err", err, "target", cfg.InventoryURL)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Connect to the dereth DB (read-only). If DATABASE_URL is unset we still
|
|
// serve health/version (Phase-0 mode) so the container is observable.
|
|
if cfg.DatabaseURL == "" {
|
|
logger.Warn("DATABASE_URL unset — running without DB; DB-backed endpoints will be empty")
|
|
} else {
|
|
connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
pool, err := newPool(connectCtx, cfg.DatabaseURL, cfg.ReadOnly)
|
|
cancel()
|
|
if err != nil {
|
|
logger.Error("db pool init failed", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
defer pool.Close()
|
|
srv.pool = pool
|
|
|
|
// Write mode (shadow OR cutover) owns the ingest path; read-only mode
|
|
// (parallel read API) skips all of this.
|
|
if !cfg.ReadOnly {
|
|
// Schema init only when we own a fresh DB. In cutover (reusing the
|
|
// production DB) SKIP_SCHEMA_INIT keeps us from running ANY DDL.
|
|
if !cfg.SkipSchemaInit {
|
|
schemaCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
|
initSchema(schemaCtx, pool, logger)
|
|
cancel()
|
|
}
|
|
|
|
srv.ingestor = newIngestor(pool, logger, srv.hub.broadcast, srv.plugins)
|
|
|
|
if cfg.IngestWS != "" {
|
|
// Shadow: replay the Python /ws/live firehose. Inventory forwarding
|
|
// + Discord alerts stay OFF (would double production writes/alerts;
|
|
// inventory isn't in the firehose anyway).
|
|
go srv.runShadowConsumer(ctx, cfg.IngestWS)
|
|
logger.Info("shadow ingest enabled", "source", cfg.IngestWS)
|
|
} else {
|
|
// Cutover: the real plugin connects to /ws/position. Forward
|
|
// inventory to the inventory service and post death/idle alerts.
|
|
srv.ingestor.invFwd = newInvForwarder(cfg.InventoryURL, logger, srv.hub.broadcast)
|
|
if cfg.DiscordACLog != "" {
|
|
srv.ingestor.aclog = newACLogPoster(cfg.DiscordACLog, logger)
|
|
go srv.ingestor.aclog.runIdleLoop(ctx, pool)
|
|
}
|
|
logger.Info("cutover ingest enabled", "inventory_url", cfg.InventoryURL, "aclog", cfg.DiscordACLog != "")
|
|
}
|
|
} else if cfg.IngestWS != "" {
|
|
logger.Error("SHADOW_INGEST_WS set but READ_ONLY=true; refusing to ingest into the production DB")
|
|
os.Exit(1)
|
|
}
|
|
|
|
go srv.runCacheLoop(ctx)
|
|
go srv.runTotalsLoop(ctx)
|
|
logger.Info("db connected; cache loops started",
|
|
"read_only", cfg.ReadOnly, "live_interval", cacheInterval.String(), "totals_interval", totalsInterval.String())
|
|
}
|
|
|
|
mux := http.NewServeMux()
|
|
srv.registerRoutes(mux)
|
|
|
|
httpSrv := &http.Server{
|
|
Addr: cfg.Addr,
|
|
Handler: withRequestLogging(srv.authMiddleware(mux)),
|
|
ReadHeaderTimeout: 10 * time.Second,
|
|
}
|
|
|
|
go func() {
|
|
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
logger.Error("http server failed", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
}()
|
|
logger.Info("listening", "addr", cfg.Addr)
|
|
|
|
<-ctx.Done()
|
|
logger.Info("shutdown signal received, draining")
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
if err := httpSrv.Shutdown(shutdownCtx); err != nil {
|
|
logger.Error("graceful shutdown failed", "err", err)
|
|
}
|
|
logger.Info("stopped")
|
|
}
|
|
|
|
// config is the runtime configuration, populated from environment variables
// by loadConfig. Variable names match the Python service's where the two
// overlap.
type config struct {
	// Addr is the listen address, e.g. ":8770".
	Addr string
	// DatabaseURL is the dereth TimescaleDB DSN.
	DatabaseURL string
	// ReadOnly selects the mode: true = read-side parity (force read-only
	// txns); false = ingest/shadow (owns its DB).
	ReadOnly bool
	// InventoryURL is the inventory-service base URL.
	InventoryURL string
	// StaticDir is the directory for static assets / openissues.json.
	StaticDir string
	// SecretKey is the session-cookie signing key (must match the Python
	// service).
	SecretKey string
	// SharedSecret authenticates the plugin on /ws/position.
	SharedSecret string
	// SharedSecretLegacy is the plugin auth rotation fallback.
	SharedSecretLegacy string
	// IngestWS is optional: a /ws/live URL to shadow-ingest from (Python
	// tracker).
	IngestWS string
	// SkipSchemaInit is for cutover: trust the existing prod schema, run no
	// DDL.
	SkipSchemaInit bool
	// DiscordACLog is the #aclog webhook for death/idle alerts (cutover only).
	DiscordACLog string
}
|
|
|
|
func loadConfig() config {
|
|
return config{
|
|
Addr: ":" + envOr("PORT", "8770"),
|
|
DatabaseURL: os.Getenv("DATABASE_URL"),
|
|
ReadOnly: envOr("READ_ONLY", "true") != "false",
|
|
InventoryURL: envOr("INVENTORY_SERVICE_URL", "http://inventory-service:8000"),
|
|
StaticDir: envOr("STATIC_DIR", "static"),
|
|
SecretKey: os.Getenv("SECRET_KEY"),
|
|
SharedSecret: os.Getenv("SHARED_SECRET"),
|
|
SharedSecretLegacy: os.Getenv("SHARED_SECRET_LEGACY"),
|
|
IngestWS: os.Getenv("SHADOW_INGEST_WS"),
|
|
SkipSchemaInit: envOr("SKIP_SCHEMA_INIT", "false") == "true",
|
|
DiscordACLog: os.Getenv("DISCORD_ACLOG_WEBHOOK"),
|
|
}
|
|
}
|
|
|
|
// envOr returns the value of the environment variable key, or def when the
// variable is unset or set to the empty string.
func envOr(key, def string) string {
	value := os.Getenv(key)
	if value == "" {
		return def
	}
	return value
}
|
|
|
|
func (s *Server) registerRoutes(mux *http.ServeMux) {
|
|
mux.HandleFunc("GET /health", s.handleHealth)
|
|
// Mirrors Python's GET /api-version (hyphenated so nginx never strips it).
|
|
mux.HandleFunc("GET /api-version", s.handleVersion)
|
|
|
|
// Phase 1 read-side: the 5s caches.
|
|
mux.HandleFunc("GET /live", s.handleLive)
|
|
mux.HandleFunc("GET /live/", s.handleLive)
|
|
mux.HandleFunc("GET /trails", s.handleTrails)
|
|
mux.HandleFunc("GET /trails/", s.handleTrails)
|
|
|
|
// Totals (5-minute caches).
|
|
mux.HandleFunc("GET /total-rares", s.handleTotalRares)
|
|
mux.HandleFunc("GET /total-rares/", s.handleTotalRares)
|
|
mux.HandleFunc("GET /total-kills", s.handleTotalKills)
|
|
mux.HandleFunc("GET /total-kills/", s.handleTotalKills)
|
|
|
|
// Per-character & aggregate DB reads.
|
|
mux.HandleFunc("GET /stats/{character_name}", s.handleStats)
|
|
mux.HandleFunc("GET /portals", s.handlePortals)
|
|
mux.HandleFunc("GET /spawns/heatmap", s.handleSpawnHeatmap)
|
|
mux.HandleFunc("GET /server-health", s.handleServerHealth)
|
|
mux.HandleFunc("GET /character-stats/{name}", s.handleCharacterStats)
|
|
mux.HandleFunc("GET /combat-stats", s.handleCombatStatsAll)
|
|
mux.HandleFunc("GET /combat-stats/{character_name}", s.handleCombatStatsOne)
|
|
mux.HandleFunc("GET /inventories", s.handleInventories)
|
|
mux.HandleFunc("GET /inventory/{character_name}/search", s.handleInventorySearch)
|
|
|
|
// Ingest-only state (empty/default in Phase 1).
|
|
mux.HandleFunc("GET /quest-status", s.handleQuestStatus)
|
|
mux.HandleFunc("GET /vital-sharing/peers", s.handleVitalSharingPeers)
|
|
mux.HandleFunc("GET /equipment-cantrip-state/{name}", s.handleEquipmentCantrip)
|
|
mux.HandleFunc("GET /issues", s.handleIssues)
|
|
mux.HandleFunc("GET /me", s.handleMe)
|
|
|
|
// WebSocket servers (cutover-ready): browser fan-out + plugin ingest.
|
|
mux.HandleFunc("GET /ws/live", s.handleWSLive)
|
|
mux.HandleFunc("GET /ws/position", s.handleWSPosition)
|
|
|
|
// Inventory-service reverse proxies.
|
|
s.registerProxyRoutes(mux)
|
|
|
|
// Website layer: login/logout + icons + static frontend (cutover).
|
|
mux.HandleFunc("GET /login", s.handleLoginGet)
|
|
mux.HandleFunc("POST /login", s.handleLoginPost)
|
|
mux.HandleFunc("GET /logout", s.handleLogout)
|
|
mux.HandleFunc("GET /icons/{filename}", s.handleIcon)
|
|
|
|
// Admin user management.
|
|
mux.HandleFunc("GET /admin/users", s.handleAdminPage)
|
|
mux.HandleFunc("GET /api-admin/users", s.handleListUsers)
|
|
mux.HandleFunc("POST /api-admin/users", s.handleCreateUser)
|
|
mux.HandleFunc("PATCH /api-admin/users/{user_id}", s.handleUpdateUser)
|
|
mux.HandleFunc("DELETE /api-admin/users/{user_id}", s.handleDeleteUser)
|
|
|
|
// Issue board write side (GET /issues is registered above).
|
|
mux.HandleFunc("POST /issues", s.handleAddIssue)
|
|
mux.HandleFunc("PATCH /issues/{issue_id}", s.handleUpdateIssue)
|
|
mux.HandleFunc("POST /issues/{issue_id}/comments", s.handleAddComment)
|
|
mux.HandleFunc("DELETE /issues/{issue_id}", s.handleDeleteIssue)
|
|
// Catch-all: serve the static frontend (SPA). Registered last; every
|
|
// specific route above is more specific, so this only handles the rest.
|
|
mux.HandleFunc("GET /", s.handleStatic)
|
|
}
|
|
|
|
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"status": "ok",
|
|
"service": "tracker-go",
|
|
"version": buildVersion,
|
|
"db": s.pool != nil,
|
|
})
|
|
}
|
|
|
|
func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) {
|
|
writeJSON(w, http.StatusOK, map[string]any{"version": buildVersion})
|
|
}
|
|
|
|
// writeJSON sends v as a JSON response with the given HTTP status.
//
// The value is marshalled BEFORE any header is written. Encoding straight into
// the ResponseWriter would commit the status line first, so a value that
// cannot be encoded (NaN/Inf floats, channels, funcs, ...) would reach the
// client as the success status with an empty or truncated body. Here such a
// failure is logged and answered with a 500 and a small JSON error object.
//
// On success the body is the JSON encoding of v followed by a single newline,
// the same bytes json.Encoder.Encode produces with default settings.
func writeJSON(w http.ResponseWriter, status int, v any) {
	body, err := json.Marshal(v)
	if err != nil {
		slog.Error("json encode failed", "err", err)
		status = http.StatusInternalServerError
		body = []byte(`{"error":"internal server error"}`)
	}
	body = append(body, '\n')

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if _, err := w.Write(body); err != nil {
		// Typically the client went away; nothing more can be sent.
		slog.Error("json write failed", "err", err)
	}
}
|
|
|
|
// withRequestLogging is a thin access-log middleware.
|
|
func withRequestLogging(next http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
start := time.Now()
|
|
sr := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
|
|
next.ServeHTTP(sr, r)
|
|
slog.Info("http",
|
|
"method", r.Method,
|
|
"path", r.URL.Path,
|
|
"status", sr.status,
|
|
"dur_ms", time.Since(start).Milliseconds(),
|
|
)
|
|
})
|
|
}
|
|
|
|
// statusRecorder wraps an http.ResponseWriter and remembers the status code of
// the response so the access-log middleware can report it after the handler
// returns.
//
// Only the first final status is recorded. net/http ignores any WriteHeader
// call made after the header has been committed (by an earlier WriteHeader or
// by the first Write), so recording a later code would log a status the client
// never received.
type statusRecorder struct {
	http.ResponseWriter
	status      int  // status to log; callers seed it with http.StatusOK
	wroteHeader bool // true once the final (non-1xx) header has been committed
}

// WriteHeader records code if it is the first final status and always forwards
// the call to the wrapped ResponseWriter. Informational 1xx codes other than
// 101 Switching Protocols are interim responses that a final header still
// follows, so they are forwarded but not recorded.
func (s *statusRecorder) WriteHeader(code int) {
	informational := code >= 100 && code <= 199 && code != http.StatusSwitchingProtocols
	if !informational && !s.wroteHeader {
		s.status = code
		s.wroteHeader = true
	}
	s.ResponseWriter.WriteHeader(code)
}

// Write forwards p to the wrapped ResponseWriter. A Write without a preceding
// WriteHeader commits an implicit 200, so the header is marked as written and
// a later WriteHeader call can no longer change the recorded status.
func (s *statusRecorder) Write(p []byte) (int, error) {
	if !s.wroteHeader {
		s.wroteHeader = true
		if s.status == 0 {
			// Recorder was built without a seeded status.
			s.status = http.StatusOK
		}
	}
	return s.ResponseWriter.Write(p)
}

// Unwrap lets http.ResponseController (used by coder/websocket to hijack the
// connection for /ws upgrades) reach the underlying ResponseWriter through this
// logging wrapper. Without it, WebSocket handshakes fail.
func (s *statusRecorder) Unwrap() http.ResponseWriter {
	return s.ResponseWriter
}
|