feat(go-services): tracker-go Phase 0/1 — /live + /trails read parity
Parallel Go reimplementation of the dereth-tracker read side, deployed loopback-only (:8770) and reading the dereth TimescaleDB read-only. The live Python stack is untouched (added via a compose override, not by editing the tracked docker-compose.yml). - Phase 0 scaffold: stdlib net/http server (Go 1.22+ method+path routing), /health + /api-version, multi-stage distroless Docker build, and go-services/docker-compose.go.yml override (loopback :8770). - Phase 1: pgx v5 pool forced into read-only transactions, a 5s /live + /trails cache loop using the exact main.py:837 SQL, and Python-isoformat timestamps so output matches FastAPI's jsonable_encoder. - compare/compare_live.py: parity harness vs the live Python service. Uses the server-stamped received_at to prove same-row full-field equality and to make the online-set diff boundary-aware. Verified on live traffic (73 players): identical online set + 23-key schema, identity/type parity for all, every same-row pair matches on every field, and diff-row pairs differ only by the ~6s two-cache refresh skew. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
b8fd449d62
commit
1af47520c0
7 changed files with 691 additions and 0 deletions
223
go-services/compare/compare_live.py
Normal file
223
go-services/compare/compare_live.py
Normal file
|
|
@ -0,0 +1,223 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Compare the Go tracker's /live (and /trails) against the live Python service.
|
||||||
|
|
||||||
|
Run on the server (or anywhere with loopback access to both):
|
||||||
|
python3 compare_live.py # default loopback ports
|
||||||
|
python3 compare_live.py --py http://127.0.0.1:8765 --go http://127.0.0.1:8770
|
||||||
|
|
||||||
|
Parity strategy for a live firehose
|
||||||
|
-----------------------------------
|
||||||
|
The two services rebuild their /live cache independently every 5s, so an
|
||||||
|
actively-updating character can legitimately show a newer telemetry row in one
|
||||||
|
than the other. We separate "is this a real divergence?" from "is this just
|
||||||
|
cache timing?" using the server-stamped received_at:
|
||||||
|
|
||||||
|
* SAME ROW (py.received_at == go.received_at): both rendered the *same*
|
||||||
|
telemetry_events row -> every field MUST match (numbers within epsilon,
|
||||||
|
timestamps compared as instants). This is the rigorous render-parity proof.
|
||||||
|
* DIFFERENT ROW: a newer row arrived between the two refreshes -> we only
|
||||||
|
require identity + key-set + type/null-pattern parity, and report the
|
||||||
|
volatile-field skew (which should be small and recent).
|
||||||
|
|
||||||
|
Exit code 0 if no real parity violations, 1 otherwise.
|
||||||
|
"""
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import urllib.request
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
EPS = 1e-6
|
||||||
|
|
||||||
|
# Fields that identify the entity / join keys — must always match for a player
|
||||||
|
# present in both outputs.
|
||||||
|
IDENTITY = ("character_name", "char_tag", "session_id")
|
||||||
|
# Slowly-changing aggregates — informational when they differ on a same-row pair
|
||||||
|
# (a kill/rare recorded between refreshes can bump these even for the same
|
||||||
|
# telemetry row).
|
||||||
|
AGGREGATES = ("total_kills", "total_rares", "session_rares")
|
||||||
|
TIMESTAMP_FIELDS = ("timestamp", "received_at")
|
||||||
|
|
||||||
|
|
||||||
|
def fetch(base, path):
|
||||||
|
with urllib.request.urlopen(base.rstrip("/") + path, timeout=8) as r:
|
||||||
|
return json.load(r)
|
||||||
|
|
||||||
|
|
||||||
|
def jtype(v):
|
||||||
|
if v is None:
|
||||||
|
return "null"
|
||||||
|
if isinstance(v, bool):
|
||||||
|
return "bool"
|
||||||
|
if isinstance(v, (int, float)):
|
||||||
|
return "num"
|
||||||
|
if isinstance(v, str):
|
||||||
|
return "str"
|
||||||
|
return type(v).__name__
|
||||||
|
|
||||||
|
|
||||||
|
def parse_ts(s):
|
||||||
|
if s is None:
|
||||||
|
return None
|
||||||
|
return datetime.fromisoformat(s.replace("Z", "+00:00"))
|
||||||
|
|
||||||
|
|
||||||
|
def values_equal(key, a, b):
|
||||||
|
"""Semantic equality for a single field value."""
|
||||||
|
if a is None or b is None:
|
||||||
|
return a is b or a == b
|
||||||
|
if key in TIMESTAMP_FIELDS and isinstance(a, str) and isinstance(b, str):
|
||||||
|
return parse_ts(a) == parse_ts(b)
|
||||||
|
an, bn = isinstance(a, (int, float)) and not isinstance(a, bool), isinstance(b, (int, float)) and not isinstance(b, bool)
|
||||||
|
if an and bn:
|
||||||
|
return abs(float(a) - float(b)) <= EPS
|
||||||
|
return a == b
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
ap = argparse.ArgumentParser()
|
||||||
|
ap.add_argument("--py", default="http://127.0.0.1:8765")
|
||||||
|
ap.add_argument("--go", default="http://127.0.0.1:8770")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
py = fetch(args.py, "/live")["players"]
|
||||||
|
go = fetch(args.go, "/live")["players"]
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
pyi = {p["character_name"]: p for p in py}
|
||||||
|
goi = {p["character_name"]: p for p in go}
|
||||||
|
common = sorted(set(pyi) & set(goi))
|
||||||
|
only_py = sorted(set(pyi) - set(goi))
|
||||||
|
only_go = sorted(set(goi) - set(pyi))
|
||||||
|
|
||||||
|
print("=" * 72)
|
||||||
|
print("/live PARITY python(%s) vs go(%s)" % (args.py, args.go))
|
||||||
|
print("=" * 72)
|
||||||
|
print(f"python players : {len(py)}")
|
||||||
|
print(f"go players : {len(go)}")
|
||||||
|
print(f"common : {len(common)}")
|
||||||
|
|
||||||
|
violations = 0
|
||||||
|
|
||||||
|
# --- key-set parity (all players) ---
|
||||||
|
py_keys = set().union(*[set(p) for p in py]) if py else set()
|
||||||
|
go_keys = set().union(*[set(p) for p in go]) if go else set()
|
||||||
|
if py_keys == go_keys:
|
||||||
|
print(f"key set : IDENTICAL ({len(py_keys)} keys)")
|
||||||
|
else:
|
||||||
|
violations += 1
|
||||||
|
print("key set : MISMATCH")
|
||||||
|
print(" only in python:", sorted(py_keys - go_keys))
|
||||||
|
print(" only in go :", sorted(go_keys - py_keys))
|
||||||
|
|
||||||
|
# --- online-set parity (boundary-aware) ---
|
||||||
|
def age(p):
|
||||||
|
ts = parse_ts(p.get("received_at") or p.get("timestamp"))
|
||||||
|
return (now - ts).total_seconds() if ts else None
|
||||||
|
|
||||||
|
print("\n-- online set --")
|
||||||
|
if not only_py and not only_go:
|
||||||
|
print("online set : IDENTICAL")
|
||||||
|
else:
|
||||||
|
# Players near the 30s boundary can flap between the two refreshes.
|
||||||
|
def explain(names, idx):
|
||||||
|
for n in names:
|
||||||
|
a = age(idx[n])
|
||||||
|
tag = "boundary-flap (age %.1fs)" % a if a is not None and 22 <= a <= 38 else "age %s" % (None if a is None else round(a, 1))
|
||||||
|
print(f" only_{('py' if idx is pyi else 'go')}: {n:<20} {tag}")
|
||||||
|
if only_py:
|
||||||
|
print(f"only in python : {len(only_py)}")
|
||||||
|
explain(only_py, pyi)
|
||||||
|
if only_go:
|
||||||
|
print(f"only in go : {len(only_go)}")
|
||||||
|
explain(only_go, goi)
|
||||||
|
unexplained = [n for n in (only_py + only_go)
|
||||||
|
if not (lambda a: a is not None and 22 <= a <= 38)(age((pyi.get(n) or goi.get(n))))]
|
||||||
|
if unexplained:
|
||||||
|
violations += 1
|
||||||
|
print(" UNEXPLAINED set difference (not near 30s boundary):", unexplained)
|
||||||
|
else:
|
||||||
|
print(" (all set differences explained by the 30s online boundary)")
|
||||||
|
|
||||||
|
# --- per-player field parity ---
|
||||||
|
same_row = [] # py.received_at == go.received_at -> must fully match
|
||||||
|
diff_row = [] # newer row arrived between refreshes
|
||||||
|
for n in common:
|
||||||
|
a, b = pyi[n], goi[n]
|
||||||
|
if a.get("received_at") is not None and a.get("received_at") == b.get("received_at"):
|
||||||
|
same_row.append(n)
|
||||||
|
else:
|
||||||
|
diff_row.append(n)
|
||||||
|
|
||||||
|
print("\n-- per-player parity --")
|
||||||
|
print(f"same-row pairs (identical received_at, must fully match): {len(same_row)}")
|
||||||
|
print(f"diff-row pairs (newer telemetry between refreshes) : {len(diff_row)}")
|
||||||
|
|
||||||
|
# Identity + type/null-pattern parity across ALL common players.
|
||||||
|
id_bad = type_bad = 0
|
||||||
|
for n in common:
|
||||||
|
a, b = pyi[n], goi[n]
|
||||||
|
for k in IDENTITY:
|
||||||
|
if a.get(k) != b.get(k):
|
||||||
|
id_bad += 1
|
||||||
|
print(f" IDENTITY mismatch {n}.{k}: py={a.get(k)!r} go={b.get(k)!r}")
|
||||||
|
for k in py_keys:
|
||||||
|
ta, tb = jtype(a.get(k)), jtype(b.get(k))
|
||||||
|
if ta != tb:
|
||||||
|
# null vs num/str is a real null-pattern divergence; num-vs-num
|
||||||
|
# whole-float (0.0) vs int (0) is already unified under "num".
|
||||||
|
type_bad += 1
|
||||||
|
print(f" TYPE mismatch {n}.{k}: py={ta}({a.get(k)!r}) go={tb}({b.get(k)!r})")
|
||||||
|
if id_bad:
|
||||||
|
violations += id_bad
|
||||||
|
if type_bad:
|
||||||
|
violations += type_bad
|
||||||
|
if not id_bad and not type_bad:
|
||||||
|
print("identity+type : IDENTICAL for all common players")
|
||||||
|
|
||||||
|
# Rigorous: same-row pairs must match on every field.
|
||||||
|
sr_full_match = 0
|
||||||
|
for n in same_row:
|
||||||
|
a, b = pyi[n], goi[n]
|
||||||
|
diffs = []
|
||||||
|
for k in py_keys:
|
||||||
|
if not values_equal(k, a.get(k), b.get(k)):
|
||||||
|
diffs.append((k, a.get(k), b.get(k)))
|
||||||
|
if not diffs:
|
||||||
|
sr_full_match += 1
|
||||||
|
else:
|
||||||
|
# Aggregate-only diffs are timing-explainable even on a same row.
|
||||||
|
non_agg = [d for d in diffs if d[0] not in AGGREGATES]
|
||||||
|
if non_agg:
|
||||||
|
violations += 1
|
||||||
|
print(f" SAME-ROW FIELD divergence {n}: " +
|
||||||
|
", ".join(f"{k}: py={pa!r} go={ga!r}" for k, pa, ga in non_agg))
|
||||||
|
else:
|
||||||
|
print(f" (same-row {n}: only aggregate fields differ — kill/rare between refreshes: "
|
||||||
|
+ ", ".join(f"{k} py={pa} go={ga}" for k, pa, ga in diffs) + ")")
|
||||||
|
print(f"same-row full-field matches: {sr_full_match}/{len(same_row)}")
|
||||||
|
|
||||||
|
# Volatile-field skew on diff-row pairs (informational).
|
||||||
|
if diff_row:
|
||||||
|
ts_deltas = []
|
||||||
|
for n in diff_row:
|
||||||
|
da, db = parse_ts(pyi[n].get("timestamp")), parse_ts(goi[n].get("timestamp"))
|
||||||
|
if da and db:
|
||||||
|
ts_deltas.append(abs((da - db).total_seconds()))
|
||||||
|
if ts_deltas:
|
||||||
|
ts_deltas.sort()
|
||||||
|
print(f"diff-row timestamp skew: min={ts_deltas[0]:.1f}s "
|
||||||
|
f"median={ts_deltas[len(ts_deltas)//2]:.1f}s max={ts_deltas[-1]:.1f}s "
|
||||||
|
"(bounded by the two 5s refresh cycles)")
|
||||||
|
|
||||||
|
print("\n" + "=" * 72)
|
||||||
|
if violations == 0:
|
||||||
|
print("RESULT: PARITY OK — no structural or same-row divergences.")
|
||||||
|
else:
|
||||||
|
print(f"RESULT: {violations} PARITY VIOLATION(S) — see above.")
|
||||||
|
print("=" * 72)
|
||||||
|
return 1 if violations else 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
40
go-services/docker-compose.go.yml
Normal file
40
go-services/docker-compose.go.yml
Normal file
|
|
@ -0,0 +1,40 @@
|
||||||
|
# Compose OVERRIDE that adds the Go services alongside the live Python stack.
|
||||||
|
# It only ADDS containers; it never modifies the tracked docker-compose.yml or
|
||||||
|
# any running Python service.
|
||||||
|
#
|
||||||
|
# Invoke from the repo root so the Compose project name resolves to
|
||||||
|
# "mosswartoverlord" (same as the live stack) and the new container joins the
|
||||||
|
# existing default network — letting it reach the `db` service by name:
|
||||||
|
#
|
||||||
|
# cd /home/erik/MosswartOverlord
|
||||||
|
# export BUILD_VERSION="$(date -u +%Y.%-m.%-d.%H%M)-$(git rev-parse --short HEAD)"
|
||||||
|
# docker compose -f docker-compose.yml -f go-services/docker-compose.go.yml \
|
||||||
|
# build dereth-tracker-go
|
||||||
|
# docker compose -f docker-compose.yml -f go-services/docker-compose.go.yml \
|
||||||
|
# up -d --no-deps dereth-tracker-go
|
||||||
|
#
|
||||||
|
# --no-deps keeps Compose from touching the already-running `db` (and anything
|
||||||
|
# else). The service is loopback-bound (127.0.0.1:8770); external reach is only
|
||||||
|
# ever via the host nginx `location /go/` block (added separately).
|
||||||
|
services:
|
||||||
|
dereth-tracker-go:
|
||||||
|
build:
|
||||||
|
context: ./go-services/tracker-go
|
||||||
|
args:
|
||||||
|
BUILD_VERSION: ${BUILD_VERSION:-dev}
|
||||||
|
container_name: dereth-tracker-go
|
||||||
|
ports:
|
||||||
|
- "127.0.0.1:8770:8770"
|
||||||
|
environment:
|
||||||
|
PORT: "8770"
|
||||||
|
# Read-only use of the same dereth TimescaleDB the Python tracker writes.
|
||||||
|
DATABASE_URL: "postgresql://postgres:${POSTGRES_PASSWORD}@db:5432/dereth"
|
||||||
|
LOG_LEVEL: "INFO"
|
||||||
|
depends_on:
|
||||||
|
- db
|
||||||
|
restart: unless-stopped
|
||||||
|
logging:
|
||||||
|
driver: "json-file"
|
||||||
|
options:
|
||||||
|
max-size: "10m"
|
||||||
|
max-file: "3"
|
||||||
20
go-services/tracker-go/Dockerfile
Normal file
20
go-services/tracker-go/Dockerfile
Normal file
|
|
@ -0,0 +1,20 @@
|
||||||
|
# Multi-stage build: compile a static Go binary, ship it on distroless.
|
||||||
|
# No host Go toolchain required — everything happens inside the build stage.
|
||||||
|
FROM golang:1.25-bookworm AS build
|
||||||
|
WORKDIR /src
|
||||||
|
|
||||||
|
# No local Go toolchain is available to maintain go.sum, so resolve and lock
|
||||||
|
# dependencies inside the build (network is available here). `go mod tidy`
|
||||||
|
# reads the imports from the source and writes go.mod/go.sum, then we build.
|
||||||
|
COPY . .
|
||||||
|
RUN go mod tidy
|
||||||
|
ARG BUILD_VERSION=dev
|
||||||
|
RUN CGO_ENABLED=0 GOOS=linux go build \
|
||||||
|
-trimpath \
|
||||||
|
-ldflags "-s -w -X main.buildVersion=${BUILD_VERSION}" \
|
||||||
|
-o /out/tracker-go .
|
||||||
|
|
||||||
|
FROM gcr.io/distroless/static-debian12:nonroot
|
||||||
|
COPY --from=build /out/tracker-go /tracker-go
|
||||||
|
EXPOSE 8770
|
||||||
|
ENTRYPOINT ["/tracker-go"]
|
||||||
5
go-services/tracker-go/go.mod
Normal file
5
go-services/tracker-go/go.mod
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
module git.snakedesert.se/SawatoMosswartsEnjoyersClub/MosswartOverlord/go-services/tracker-go
|
||||||
|
|
||||||
|
go 1.25
|
||||||
|
|
||||||
|
require github.com/jackc/pgx/v5 v5.10.0
|
||||||
150
go-services/tracker-go/live.go
Normal file
150
go-services/tracker-go/live.go
Normal file
|
|
@ -0,0 +1,150 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Timing constants mirror main.py.
|
||||||
|
const (
|
||||||
|
activeWindow = 30 * time.Second // ACTIVE_WINDOW — the real "online" test
|
||||||
|
chunkLookback = 10 * time.Minute // coarse bound, only so TimescaleDB can prune chunks
|
||||||
|
trailsWindow = 600 * time.Second // /trails lookback (hardcoded; the `seconds` param is ignored)
|
||||||
|
cacheInterval = 5 * time.Second // _refresh_cache_loop cadence
|
||||||
|
)
|
||||||
|
|
||||||
|
// liveSQL mirrors main.py:837 exactly. $1 = chunk_cutoff (now-10min), $2 = cutoff (now-30s).
|
||||||
|
// Online-ness is decided on COALESCE(received_at, timestamp) — server receive-time — because
|
||||||
|
// game clients' clocks drift up to ~90s and would otherwise flap the player count.
|
||||||
|
const liveSQL = `
|
||||||
|
SELECT sub.*,
|
||||||
|
COALESCE(rs.total_rares, 0) AS total_rares,
|
||||||
|
COALESCE(rss.session_rares, 0) AS session_rares,
|
||||||
|
COALESCE(cs.total_kills, 0) AS total_kills
|
||||||
|
FROM (
|
||||||
|
SELECT DISTINCT ON (character_name) *
|
||||||
|
FROM telemetry_events
|
||||||
|
WHERE timestamp > $1
|
||||||
|
AND COALESCE(received_at, timestamp) > $2
|
||||||
|
ORDER BY character_name, timestamp DESC
|
||||||
|
) sub
|
||||||
|
LEFT JOIN rare_stats rs ON sub.character_name = rs.character_name
|
||||||
|
LEFT JOIN rare_stats_sessions rss ON sub.character_name = rss.character_name
|
||||||
|
AND sub.session_id = rss.session_id
|
||||||
|
LEFT JOIN char_stats cs ON sub.character_name = cs.character_name`
|
||||||
|
|
||||||
|
// trailsSQL mirrors main.py:874 — last 600s of position points, ordered for the map.
|
||||||
|
const trailsSQL = `
|
||||||
|
SELECT timestamp, character_name, ew, ns, z
|
||||||
|
FROM telemetry_events
|
||||||
|
WHERE timestamp >= $1
|
||||||
|
ORDER BY character_name, timestamp`
|
||||||
|
|
||||||
|
// liveCache holds the pre-marshaled JSON bodies for /live and /trails, swapped
|
||||||
|
// atomically every cacheInterval by the refresh loop.
|
||||||
|
type liveCache struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
liveJSON []byte
|
||||||
|
trailsJSON []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLiveCache() *liveCache {
|
||||||
|
return &liveCache{
|
||||||
|
liveJSON: []byte(`{"players":[]}`),
|
||||||
|
trailsJSON: []byte(`{"trails":[]}`),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *liveCache) getLive() []byte {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
return c.liveJSON
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *liveCache) getTrails() []byte {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
return c.trailsJSON
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *liveCache) set(live, trails []byte) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
c.liveJSON = live
|
||||||
|
c.trailsJSON = trails
|
||||||
|
}
|
||||||
|
|
||||||
|
// refresh recomputes both caches from the DB. Both queries use the SAME `now`
|
||||||
|
// so the online window and trails window are consistent within a tick.
|
||||||
|
func (s *Server) refreshLiveCache(ctx context.Context) error {
|
||||||
|
qctx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
now := time.Now().UTC()
|
||||||
|
|
||||||
|
players, err := queryRowsAsMaps(qctx, s.pool, liveSQL, now.Add(-chunkLookback), now.Add(-activeWindow))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("live query: %w", err)
|
||||||
|
}
|
||||||
|
formatTimes(players, "timestamp", "received_at")
|
||||||
|
liveJSON, err := json.Marshal(map[string]any{"players": players})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal live: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
trails, err := queryRowsAsMaps(qctx, s.pool, trailsSQL, now.Add(-trailsWindow))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("trails query: %w", err)
|
||||||
|
}
|
||||||
|
formatTimes(trails, "timestamp")
|
||||||
|
trailsJSON, err := json.Marshal(map[string]any{"trails": trails})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal trails: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.cache.set(liveJSON, trailsJSON)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// runCacheLoop refreshes the cache every cacheInterval until ctx is cancelled.
|
||||||
|
// It refreshes immediately on entry (refresh-then-sleep) so the cache is warm
|
||||||
|
// shortly after startup. pgxpool handles reconnection transparently, so we just
|
||||||
|
// log failures and keep serving the last good snapshot.
|
||||||
|
func (s *Server) runCacheLoop(ctx context.Context) {
|
||||||
|
failures := 0
|
||||||
|
for {
|
||||||
|
if err := s.refreshLiveCache(ctx); err != nil {
|
||||||
|
failures++
|
||||||
|
s.log.Error("live cache refresh failed", "err", err, "consecutive", failures)
|
||||||
|
} else {
|
||||||
|
if failures > 0 {
|
||||||
|
s.log.Info("live cache refresh recovered", "after_failures", failures)
|
||||||
|
}
|
||||||
|
failures = 0
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(cacheInterval):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleLive(w http.ResponseWriter, r *http.Request) {
|
||||||
|
writeRawJSON(w, s.cache.getLive())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleTrails(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// `seconds` query param is accepted but ignored, matching main.py:2001.
|
||||||
|
writeRawJSON(w, s.cache.getTrails())
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeRawJSON(w http.ResponseWriter, body []byte) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, _ = w.Write(body)
|
||||||
|
}
|
||||||
173
go-services/tracker-go/main.go
Normal file
173
go-services/tracker-go/main.go
Normal file
|
|
@ -0,0 +1,173 @@
|
||||||
|
// Command tracker-go is a Go reimplementation of the MosswartOverlord
|
||||||
|
// "dereth-tracker" backend, deployed in parallel with the live Python service
|
||||||
|
// for side-by-side comparison (strangler-fig migration).
|
||||||
|
//
|
||||||
|
// Phase 1: read-side parity. Connects READ-ONLY to the existing dereth
|
||||||
|
// TimescaleDB and reimplements the HTTP read API, starting with the /live and
|
||||||
|
// /trails caches (the 5s _refresh_cache_loop). It never touches anything the
|
||||||
|
// Python service writes.
|
||||||
|
//
|
||||||
|
// Routes are declared WITHOUT the nginx-stripped "/go/" prefix, mirroring the
|
||||||
|
// Python service's "no /api/ prefix" convention. nginx's `location /go/` strips
|
||||||
|
// the prefix before proxying to this service on 127.0.0.1:8770.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// buildVersion is injected at build time via -ldflags "-X main.buildVersion=...".
|
||||||
|
// Mirrors the Python service's APP_VERSION / "/api-version" stamp.
|
||||||
|
var buildVersion = "dev"
|
||||||
|
|
||||||
|
// Server holds the shared dependencies for HTTP handlers.
|
||||||
|
type Server struct {
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
cache *liveCache
|
||||||
|
log *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
|
||||||
|
slog.SetDefault(logger)
|
||||||
|
|
||||||
|
cfg := loadConfig()
|
||||||
|
logger.Info("starting tracker-go", "version", buildVersion, "addr", cfg.Addr)
|
||||||
|
|
||||||
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
srv := &Server{cache: newLiveCache(), log: logger}
|
||||||
|
|
||||||
|
// Connect to the dereth DB (read-only). If DATABASE_URL is unset we still
|
||||||
|
// serve health/version (Phase-0 mode) so the container is observable.
|
||||||
|
if cfg.DatabaseURL == "" {
|
||||||
|
logger.Warn("DATABASE_URL unset — running without DB; /live and /trails will be empty")
|
||||||
|
} else {
|
||||||
|
connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||||
|
pool, err := newPool(connectCtx, cfg.DatabaseURL)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("db pool init failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer pool.Close()
|
||||||
|
srv.pool = pool
|
||||||
|
go srv.runCacheLoop(ctx)
|
||||||
|
logger.Info("db connected; live cache loop started", "interval", cacheInterval.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
srv.registerRoutes(mux)
|
||||||
|
|
||||||
|
httpSrv := &http.Server{
|
||||||
|
Addr: cfg.Addr,
|
||||||
|
Handler: withRequestLogging(mux),
|
||||||
|
ReadHeaderTimeout: 10 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
|
logger.Error("http server failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
logger.Info("listening", "addr", cfg.Addr)
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
logger.Info("shutdown signal received, draining")
|
||||||
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := httpSrv.Shutdown(shutdownCtx); err != nil {
|
||||||
|
logger.Error("graceful shutdown failed", "err", err)
|
||||||
|
}
|
||||||
|
logger.Info("stopped")
|
||||||
|
}
|
||||||
|
|
||||||
|
// config holds runtime configuration sourced from environment variables,
|
||||||
|
// matching the Python service's env var names where they overlap.
|
||||||
|
type config struct {
|
||||||
|
Addr string // listen address, e.g. ":8770"
|
||||||
|
DatabaseURL string // dereth TimescaleDB DSN (read-only use)
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadConfig() config {
|
||||||
|
return config{
|
||||||
|
Addr: ":" + envOr("PORT", "8770"),
|
||||||
|
DatabaseURL: os.Getenv("DATABASE_URL"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func envOr(key, def string) string {
|
||||||
|
if v := os.Getenv(key); v != "" {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) registerRoutes(mux *http.ServeMux) {
|
||||||
|
mux.HandleFunc("GET /health", s.handleHealth)
|
||||||
|
// Mirrors Python's GET /api-version (hyphenated so nginx never strips it).
|
||||||
|
mux.HandleFunc("GET /api-version", s.handleVersion)
|
||||||
|
// Phase 1 read-side: the 5s caches.
|
||||||
|
mux.HandleFunc("GET /live", s.handleLive)
|
||||||
|
mux.HandleFunc("GET /live/", s.handleLive)
|
||||||
|
mux.HandleFunc("GET /trails", s.handleTrails)
|
||||||
|
mux.HandleFunc("GET /trails/", s.handleTrails)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"status": "ok",
|
||||||
|
"service": "tracker-go",
|
||||||
|
"version": buildVersion,
|
||||||
|
"db": s.pool != nil,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) {
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"version": buildVersion})
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeJSON(w http.ResponseWriter, status int, v any) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(status)
|
||||||
|
if err := json.NewEncoder(w).Encode(v); err != nil {
|
||||||
|
slog.Error("json encode failed", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// withRequestLogging is a thin access-log middleware.
|
||||||
|
func withRequestLogging(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
start := time.Now()
|
||||||
|
sr := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
|
||||||
|
next.ServeHTTP(sr, r)
|
||||||
|
slog.Info("http",
|
||||||
|
"method", r.Method,
|
||||||
|
"path", r.URL.Path,
|
||||||
|
"status", sr.status,
|
||||||
|
"dur_ms", time.Since(start).Milliseconds(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type statusRecorder struct {
|
||||||
|
http.ResponseWriter
|
||||||
|
status int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *statusRecorder) WriteHeader(code int) {
|
||||||
|
s.status = code
|
||||||
|
s.ResponseWriter.WriteHeader(code)
|
||||||
|
}
|
||||||
80
go-services/tracker-go/store.go
Normal file
80
go-services/tracker-go/store.go
Normal file
|
|
@ -0,0 +1,80 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// newPool creates a pgx pool against the dereth TimescaleDB.
|
||||||
|
//
|
||||||
|
// Phase 1 is strictly read-only. As defense-in-depth we force every pooled
|
||||||
|
// connection into read-only transaction mode, so even a buggy or future write
|
||||||
|
// statement cannot mutate the live production data the Python service owns.
|
||||||
|
func newPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
|
||||||
|
cfg, err := pgxpool.ParseConfig(dsn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse DATABASE_URL: %w", err)
|
||||||
|
}
|
||||||
|
cfg.MaxConns = 10
|
||||||
|
cfg.MaxConnIdleTime = 5 * time.Minute
|
||||||
|
cfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
|
||||||
|
if _, err := conn.Exec(ctx, "SET default_transaction_read_only = on"); err != nil {
|
||||||
|
return fmt.Errorf("set read-only: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
pool, err := pgxpool.NewWithConfig(ctx, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create pool: %w", err)
|
||||||
|
}
|
||||||
|
return pool, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// queryRowsAsMaps runs a query and returns each row as a column-name->value map,
|
||||||
|
// mirroring how the Python service builds response dicts directly from rows.
|
||||||
|
// A nil result is coerced to an empty (non-nil) slice so JSON encodes "[]".
|
||||||
|
func queryRowsAsMaps(ctx context.Context, pool *pgxpool.Pool, sql string, args ...any) ([]map[string]any, error) {
|
||||||
|
rows, err := pool.Query(ctx, sql, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out, err := pgx.CollectRows(rows, pgx.RowToMap)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if out == nil {
|
||||||
|
out = []map[string]any{}
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// pyISO formats a timestamp the way Python's datetime.isoformat() does for a
|
||||||
|
// UTC tz-aware value, so output matches FastAPI's jsonable_encoder:
|
||||||
|
// - no fractional part when microseconds are zero
|
||||||
|
// - otherwise exactly 6 fractional digits
|
||||||
|
// - "+00:00" offset (not "Z")
|
||||||
|
// Postgres timestamptz has microsecond resolution, so ns is always a multiple
|
||||||
|
// of 1000.
|
||||||
|
func pyISO(t time.Time) string {
|
||||||
|
t = t.UTC()
|
||||||
|
if t.Nanosecond() == 0 {
|
||||||
|
return t.Format("2006-01-02T15:04:05+00:00")
|
||||||
|
}
|
||||||
|
return t.Format("2006-01-02T15:04:05") + fmt.Sprintf(".%06d+00:00", t.Nanosecond()/1000)
|
||||||
|
}
|
||||||
|
|
||||||
|
// formatTimes rewrites the named time.Time columns in-place to pyISO strings.
|
||||||
|
// Missing or NULL (nil) values are left untouched, so they encode as JSON null.
|
||||||
|
func formatTimes(rows []map[string]any, keys ...string) {
|
||||||
|
for _, m := range rows {
|
||||||
|
for _, k := range keys {
|
||||||
|
if t, ok := m[k].(time.Time); ok {
|
||||||
|
m[k] = pyISO(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue