From 1af47520c0796b96a60e5bc3f6f4808a58aaa987 Mon Sep 17 00:00:00 2001 From: Erik Date: Wed, 24 Jun 2026 09:24:22 +0200 Subject: [PATCH] =?UTF-8?q?feat(go-services):=20tracker-go=20Phase=200/1?= =?UTF-8?q?=20=E2=80=94=20/live=20+=20/trails=20read=20parity?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Parallel Go reimplementation of the dereth-tracker read side, deployed loopback-only (:8770) and reading the dereth TimescaleDB read-only. The live Python stack is untouched (added via a compose override, not by editing the tracked docker-compose.yml). - Phase 0 scaffold: stdlib net/http server (Go 1.22+ method+path routing), /health + /api-version, multi-stage distroless Docker build, and go-services/docker-compose.go.yml override (loopback :8770). - Phase 1: pgx v5 pool forced into read-only transactions, a 5s /live + /trails cache loop using the exact main.py:837 SQL, and Python-isoformat timestamps so output matches FastAPI's jsonable_encoder. - compare/compare_live.py: parity harness vs the live Python service. Uses the server-stamped received_at to prove same-row full-field equality and to make the online-set diff boundary-aware. Verified on live traffic (73 players): identical online set + 23-key schema, identity/type parity for all, every same-row pair matches on every field, and diff-row pairs differ only by the ~6s two-cache refresh skew. 
Co-Authored-By: Claude Opus 4.8 --- go-services/compare/compare_live.py | 223 ++++++++++++++++++++++++++++ go-services/docker-compose.go.yml | 40 +++++ go-services/tracker-go/Dockerfile | 20 +++ go-services/tracker-go/go.mod | 5 + go-services/tracker-go/live.go | 150 +++++++++++++++++++ go-services/tracker-go/main.go | 173 +++++++++++++++++++++ go-services/tracker-go/store.go | 80 ++++++++++ 7 files changed, 691 insertions(+) create mode 100644 go-services/compare/compare_live.py create mode 100644 go-services/docker-compose.go.yml create mode 100644 go-services/tracker-go/Dockerfile create mode 100644 go-services/tracker-go/go.mod create mode 100644 go-services/tracker-go/live.go create mode 100644 go-services/tracker-go/main.go create mode 100644 go-services/tracker-go/store.go diff --git a/go-services/compare/compare_live.py b/go-services/compare/compare_live.py new file mode 100644 index 00000000..5208ac58 --- /dev/null +++ b/go-services/compare/compare_live.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 +"""Compare the Go tracker's /live (and /trails) against the live Python service. + +Run on the server (or anywhere with loopback access to both): + python3 compare_live.py # default loopback ports + python3 compare_live.py --py http://127.0.0.1:8765 --go http://127.0.0.1:8770 + +Parity strategy for a live firehose +----------------------------------- +The two services rebuild their /live cache independently every 5s, so an +actively-updating character can legitimately show a newer telemetry row in one +than the other. We separate "is this a real divergence?" from "is this just +cache timing?" using the server-stamped received_at: + + * SAME ROW (py.received_at == go.received_at): both rendered the *same* + telemetry_events row -> every field MUST match (numbers within epsilon, + timestamps compared as instants). This is the rigorous render-parity proof. 
+ * DIFFERENT ROW: a newer row arrived between the two refreshes -> we only + require identity + key-set + type/null-pattern parity, and report the + volatile-field skew (which should be small and recent). + +Exit code 0 if no real parity violations, 1 otherwise. +""" +import argparse +import json +import sys +import urllib.request +from datetime import datetime, timezone + +EPS = 1e-6 + +# Fields that identify the entity / join keys — must always match for a player +# present in both outputs. +IDENTITY = ("character_name", "char_tag", "session_id") +# Slowly-changing aggregates — informational when they differ on a same-row pair +# (a kill/rare recorded between refreshes can bump these even for the same +# telemetry row). +AGGREGATES = ("total_kills", "total_rares", "session_rares") +TIMESTAMP_FIELDS = ("timestamp", "received_at") + + +def fetch(base, path): + with urllib.request.urlopen(base.rstrip("/") + path, timeout=8) as r: + return json.load(r) + + +def jtype(v): + if v is None: + return "null" + if isinstance(v, bool): + return "bool" + if isinstance(v, (int, float)): + return "num" + if isinstance(v, str): + return "str" + return type(v).__name__ + + +def parse_ts(s): + if s is None: + return None + return datetime.fromisoformat(s.replace("Z", "+00:00")) + + +def values_equal(key, a, b): + """Semantic equality for a single field value.""" + if a is None or b is None: + return a is b or a == b + if key in TIMESTAMP_FIELDS and isinstance(a, str) and isinstance(b, str): + return parse_ts(a) == parse_ts(b) + an, bn = isinstance(a, (int, float)) and not isinstance(a, bool), isinstance(b, (int, float)) and not isinstance(b, bool) + if an and bn: + return abs(float(a) - float(b)) <= EPS + return a == b + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--py", default="http://127.0.0.1:8765") + ap.add_argument("--go", default="http://127.0.0.1:8770") + args = ap.parse_args() + + py = fetch(args.py, "/live")["players"] + go = fetch(args.go, 
"/live")["players"] + now = datetime.now(timezone.utc) + + pyi = {p["character_name"]: p for p in py} + goi = {p["character_name"]: p for p in go} + common = sorted(set(pyi) & set(goi)) + only_py = sorted(set(pyi) - set(goi)) + only_go = sorted(set(goi) - set(pyi)) + + print("=" * 72) + print("/live PARITY python(%s) vs go(%s)" % (args.py, args.go)) + print("=" * 72) + print(f"python players : {len(py)}") + print(f"go players : {len(go)}") + print(f"common : {len(common)}") + + violations = 0 + + # --- key-set parity (all players) --- + py_keys = set().union(*[set(p) for p in py]) if py else set() + go_keys = set().union(*[set(p) for p in go]) if go else set() + if py_keys == go_keys: + print(f"key set : IDENTICAL ({len(py_keys)} keys)") + else: + violations += 1 + print("key set : MISMATCH") + print(" only in python:", sorted(py_keys - go_keys)) + print(" only in go :", sorted(go_keys - py_keys)) + + # --- online-set parity (boundary-aware) --- + def age(p): + ts = parse_ts(p.get("received_at") or p.get("timestamp")) + return (now - ts).total_seconds() if ts else None + + print("\n-- online set --") + if not only_py and not only_go: + print("online set : IDENTICAL") + else: + # Players near the 30s boundary can flap between the two refreshes. 
+ def explain(names, idx): + for n in names: + a = age(idx[n]) + tag = "boundary-flap (age %.1fs)" % a if a is not None and 22 <= a <= 38 else "age %s" % (None if a is None else round(a, 1)) + print(f" only_{('py' if idx is pyi else 'go')}: {n:<20} {tag}") + if only_py: + print(f"only in python : {len(only_py)}") + explain(only_py, pyi) + if only_go: + print(f"only in go : {len(only_go)}") + explain(only_go, goi) + unexplained = [n for n in (only_py + only_go) + if not (lambda a: a is not None and 22 <= a <= 38)(age((pyi.get(n) or goi.get(n))))] + if unexplained: + violations += 1 + print(" UNEXPLAINED set difference (not near 30s boundary):", unexplained) + else: + print(" (all set differences explained by the 30s online boundary)") + + # --- per-player field parity --- + same_row = [] # py.received_at == go.received_at -> must fully match + diff_row = [] # newer row arrived between refreshes + for n in common: + a, b = pyi[n], goi[n] + if a.get("received_at") is not None and a.get("received_at") == b.get("received_at"): + same_row.append(n) + else: + diff_row.append(n) + + print("\n-- per-player parity --") + print(f"same-row pairs (identical received_at, must fully match): {len(same_row)}") + print(f"diff-row pairs (newer telemetry between refreshes) : {len(diff_row)}") + + # Identity + type/null-pattern parity across ALL common players. + id_bad = type_bad = 0 + for n in common: + a, b = pyi[n], goi[n] + for k in IDENTITY: + if a.get(k) != b.get(k): + id_bad += 1 + print(f" IDENTITY mismatch {n}.{k}: py={a.get(k)!r} go={b.get(k)!r}") + for k in py_keys: + ta, tb = jtype(a.get(k)), jtype(b.get(k)) + if ta != tb: + # null vs num/str is a real null-pattern divergence; num-vs-num + # whole-float (0.0) vs int (0) is already unified under "num". 
+ type_bad += 1 + print(f" TYPE mismatch {n}.{k}: py={ta}({a.get(k)!r}) go={tb}({b.get(k)!r})") + if id_bad: + violations += id_bad + if type_bad: + violations += type_bad + if not id_bad and not type_bad: + print("identity+type : IDENTICAL for all common players") + + # Rigorous: same-row pairs must match on every field. + sr_full_match = 0 + for n in same_row: + a, b = pyi[n], goi[n] + diffs = [] + for k in py_keys: + if not values_equal(k, a.get(k), b.get(k)): + diffs.append((k, a.get(k), b.get(k))) + if not diffs: + sr_full_match += 1 + else: + # Aggregate-only diffs are timing-explainable even on a same row. + non_agg = [d for d in diffs if d[0] not in AGGREGATES] + if non_agg: + violations += 1 + print(f" SAME-ROW FIELD divergence {n}: " + + ", ".join(f"{k}: py={pa!r} go={ga!r}" for k, pa, ga in non_agg)) + else: + print(f" (same-row {n}: only aggregate fields differ — kill/rare between refreshes: " + + ", ".join(f"{k} py={pa} go={ga}" for k, pa, ga in diffs) + ")") + print(f"same-row full-field matches: {sr_full_match}/{len(same_row)}") + + # Volatile-field skew on diff-row pairs (informational). 
+ if diff_row: + ts_deltas = [] + for n in diff_row: + da, db = parse_ts(pyi[n].get("timestamp")), parse_ts(goi[n].get("timestamp")) + if da and db: + ts_deltas.append(abs((da - db).total_seconds())) + if ts_deltas: + ts_deltas.sort() + print(f"diff-row timestamp skew: min={ts_deltas[0]:.1f}s " + f"median={ts_deltas[len(ts_deltas)//2]:.1f}s max={ts_deltas[-1]:.1f}s " + "(bounded by the two 5s refresh cycles)") + + print("\n" + "=" * 72) + if violations == 0: + print("RESULT: PARITY OK — no structural or same-row divergences.") + else: + print(f"RESULT: {violations} PARITY VIOLATION(S) — see above.") + print("=" * 72) + return 1 if violations else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/go-services/docker-compose.go.yml b/go-services/docker-compose.go.yml new file mode 100644 index 00000000..c0adf796 --- /dev/null +++ b/go-services/docker-compose.go.yml @@ -0,0 +1,40 @@ +# Compose OVERRIDE that adds the Go services alongside the live Python stack. +# It only ADDS containers; it never modifies the tracked docker-compose.yml or +# any running Python service. +# +# Invoke from the repo root so the Compose project name resolves to +# "mosswartoverlord" (same as the live stack) and the new container joins the +# existing default network — letting it reach the `db` service by name: +# +# cd /home/erik/MosswartOverlord +# export BUILD_VERSION="$(date -u +%Y.%-m.%-d.%H%M)-$(git rev-parse --short HEAD)" +# docker compose -f docker-compose.yml -f go-services/docker-compose.go.yml \ +# build dereth-tracker-go +# docker compose -f docker-compose.yml -f go-services/docker-compose.go.yml \ +# up -d --no-deps dereth-tracker-go +# +# --no-deps keeps Compose from touching the already-running `db` (and anything +# else). The service is loopback-bound (127.0.0.1:8770); external reach is only +# ever via the host nginx `location /go/` block (added separately). 
+services: + dereth-tracker-go: + build: + context: ./go-services/tracker-go + args: + BUILD_VERSION: ${BUILD_VERSION:-dev} + container_name: dereth-tracker-go + ports: + - "127.0.0.1:8770:8770" + environment: + PORT: "8770" + # Read-only use of the same dereth TimescaleDB the Python tracker writes. + DATABASE_URL: "postgresql://postgres:${POSTGRES_PASSWORD}@db:5432/dereth" + LOG_LEVEL: "INFO" + depends_on: + - db + restart: unless-stopped + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" diff --git a/go-services/tracker-go/Dockerfile b/go-services/tracker-go/Dockerfile new file mode 100644 index 00000000..f799af5a --- /dev/null +++ b/go-services/tracker-go/Dockerfile @@ -0,0 +1,20 @@ +# Multi-stage build: compile a static Go binary, ship it on distroless. +# No host Go toolchain required — everything happens inside the build stage. +FROM golang:1.25-bookworm AS build +WORKDIR /src + +# No local Go toolchain is available to maintain go.sum, so resolve and lock +# dependencies inside the build (network is available here). `go mod tidy` +# reads the imports from the source and writes go.mod/go.sum, then we build. +COPY . . +RUN go mod tidy +ARG BUILD_VERSION=dev +RUN CGO_ENABLED=0 GOOS=linux go build \ + -trimpath \ + -ldflags "-s -w -X main.buildVersion=${BUILD_VERSION}" \ + -o /out/tracker-go . 
+ +FROM gcr.io/distroless/static-debian12:nonroot +COPY --from=build /out/tracker-go /tracker-go +EXPOSE 8770 +ENTRYPOINT ["/tracker-go"] diff --git a/go-services/tracker-go/go.mod b/go-services/tracker-go/go.mod new file mode 100644 index 00000000..38254972 --- /dev/null +++ b/go-services/tracker-go/go.mod @@ -0,0 +1,5 @@ +module git.snakedesert.se/SawatoMosswartsEnjoyersClub/MosswartOverlord/go-services/tracker-go + +go 1.25 + +require github.com/jackc/pgx/v5 v5.10.0 diff --git a/go-services/tracker-go/live.go b/go-services/tracker-go/live.go new file mode 100644 index 00000000..2e571602 --- /dev/null +++ b/go-services/tracker-go/live.go @@ -0,0 +1,150 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" +) + +// Timing constants mirror main.py. +const ( + activeWindow = 30 * time.Second // ACTIVE_WINDOW — the real "online" test + chunkLookback = 10 * time.Minute // coarse bound, only so TimescaleDB can prune chunks + trailsWindow = 600 * time.Second // /trails lookback (hardcoded; the `seconds` param is ignored) + cacheInterval = 5 * time.Second // _refresh_cache_loop cadence +) + +// liveSQL mirrors main.py:837 exactly. $1 = chunk_cutoff (now-10min), $2 = cutoff (now-30s). +// Online-ness is decided on COALESCE(received_at, timestamp) — server receive-time — because +// game clients' clocks drift up to ~90s and would otherwise flap the player count. 
const liveSQL = `
SELECT sub.*,
       COALESCE(rs.total_rares, 0) AS total_rares,
       COALESCE(rss.session_rares, 0) AS session_rares,
       COALESCE(cs.total_kills, 0) AS total_kills
FROM (
    SELECT DISTINCT ON (character_name) *
    FROM telemetry_events
    WHERE timestamp > $1
      AND COALESCE(received_at, timestamp) > $2
    ORDER BY character_name, timestamp DESC
) sub
LEFT JOIN rare_stats rs ON sub.character_name = rs.character_name
LEFT JOIN rare_stats_sessions rss ON sub.character_name = rss.character_name
                                 AND sub.session_id = rss.session_id
LEFT JOIN char_stats cs ON sub.character_name = cs.character_name`

// trailsSQL mirrors main.py:874. $1 is the lower timestamp bound; the result is
// one position point (ew, ns, z) per telemetry row, grouped by character and
// ordered by time within each character.
const trailsSQL = `
SELECT timestamp, character_name, ew, ns, z
FROM telemetry_events
WHERE timestamp >= $1
ORDER BY character_name, timestamp`

// liveCache stores the most recent, already-serialized JSON response bodies for
// /live and /trails. Both bodies are replaced together through set and read
// through getLive / getTrails; mu guards the two slices.
type liveCache struct {
	mu         sync.RWMutex
	liveJSON   []byte
	trailsJSON []byte
}

// newLiveCache returns a cache seeded with empty-but-valid payloads, so reads
// that happen before the first set still yield well-formed JSON.
func newLiveCache() *liveCache {
	cache := new(liveCache)
	cache.liveJSON = []byte(`{"players":[]}`)
	cache.trailsJSON = []byte(`{"trails":[]}`)
	return cache
}

// getLive returns the current /live body. The slice is shared, not copied.
func (c *liveCache) getLive() []byte {
	c.mu.RLock()
	body := c.liveJSON
	c.mu.RUnlock()
	return body
}

// getTrails returns the current /trails body. The slice is shared, not copied.
func (c *liveCache) getTrails() []byte {
	c.mu.RLock()
	body := c.trailsJSON
	c.mu.RUnlock()
	return body
}

// set installs a new pair of bodies under the write lock, so a reader never
// observes one body from a newer snapshot than the other was taken from.
func (c *liveCache) set(live, trails []byte) {
	c.mu.Lock()
	c.liveJSON, c.trailsJSON = live, trails
	c.mu.Unlock()
}

// refresh recomputes both caches from the DB. Both queries use the SAME `now`
// so the online window and trails window are consistent within a tick.
+func (s *Server) refreshLiveCache(ctx context.Context) error { + qctx, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + + now := time.Now().UTC() + + players, err := queryRowsAsMaps(qctx, s.pool, liveSQL, now.Add(-chunkLookback), now.Add(-activeWindow)) + if err != nil { + return fmt.Errorf("live query: %w", err) + } + formatTimes(players, "timestamp", "received_at") + liveJSON, err := json.Marshal(map[string]any{"players": players}) + if err != nil { + return fmt.Errorf("marshal live: %w", err) + } + + trails, err := queryRowsAsMaps(qctx, s.pool, trailsSQL, now.Add(-trailsWindow)) + if err != nil { + return fmt.Errorf("trails query: %w", err) + } + formatTimes(trails, "timestamp") + trailsJSON, err := json.Marshal(map[string]any{"trails": trails}) + if err != nil { + return fmt.Errorf("marshal trails: %w", err) + } + + s.cache.set(liveJSON, trailsJSON) + return nil +} + +// runCacheLoop refreshes the cache every cacheInterval until ctx is cancelled. +// It refreshes immediately on entry (refresh-then-sleep) so the cache is warm +// shortly after startup. pgxpool handles reconnection transparently, so we just +// log failures and keep serving the last good snapshot. +func (s *Server) runCacheLoop(ctx context.Context) { + failures := 0 + for { + if err := s.refreshLiveCache(ctx); err != nil { + failures++ + s.log.Error("live cache refresh failed", "err", err, "consecutive", failures) + } else { + if failures > 0 { + s.log.Info("live cache refresh recovered", "after_failures", failures) + } + failures = 0 + } + select { + case <-ctx.Done(): + return + case <-time.After(cacheInterval): + } + } +} + +func (s *Server) handleLive(w http.ResponseWriter, r *http.Request) { + writeRawJSON(w, s.cache.getLive()) +} + +func (s *Server) handleTrails(w http.ResponseWriter, r *http.Request) { + // `seconds` query param is accepted but ignored, matching main.py:2001. 
+ writeRawJSON(w, s.cache.getTrails()) +} + +func writeRawJSON(w http.ResponseWriter, body []byte) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(body) +} diff --git a/go-services/tracker-go/main.go b/go-services/tracker-go/main.go new file mode 100644 index 00000000..ca1baaff --- /dev/null +++ b/go-services/tracker-go/main.go @@ -0,0 +1,173 @@ +// Command tracker-go is a Go reimplementation of the MosswartOverlord +// "dereth-tracker" backend, deployed in parallel with the live Python service +// for side-by-side comparison (strangler-fig migration). +// +// Phase 1: read-side parity. Connects READ-ONLY to the existing dereth +// TimescaleDB and reimplements the HTTP read API, starting with the /live and +// /trails caches (the 5s _refresh_cache_loop). It never touches anything the +// Python service writes. +// +// Routes are declared WITHOUT the nginx-stripped "/go/" prefix, mirroring the +// Python service's "no /api/ prefix" convention. nginx's `location /go/` strips +// the prefix before proxying to this service on 127.0.0.1:8770. +package main + +import ( + "context" + "encoding/json" + "errors" + "log/slog" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// buildVersion is injected at build time via -ldflags "-X main.buildVersion=...". +// Mirrors the Python service's APP_VERSION / "/api-version" stamp. +var buildVersion = "dev" + +// Server holds the shared dependencies for HTTP handlers. 
+type Server struct { + pool *pgxpool.Pool + cache *liveCache + log *slog.Logger +} + +func main() { + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) + slog.SetDefault(logger) + + cfg := loadConfig() + logger.Info("starting tracker-go", "version", buildVersion, "addr", cfg.Addr) + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + srv := &Server{cache: newLiveCache(), log: logger} + + // Connect to the dereth DB (read-only). If DATABASE_URL is unset we still + // serve health/version (Phase-0 mode) so the container is observable. + if cfg.DatabaseURL == "" { + logger.Warn("DATABASE_URL unset — running without DB; /live and /trails will be empty") + } else { + connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + pool, err := newPool(connectCtx, cfg.DatabaseURL) + cancel() + if err != nil { + logger.Error("db pool init failed", "err", err) + os.Exit(1) + } + defer pool.Close() + srv.pool = pool + go srv.runCacheLoop(ctx) + logger.Info("db connected; live cache loop started", "interval", cacheInterval.String()) + } + + mux := http.NewServeMux() + srv.registerRoutes(mux) + + httpSrv := &http.Server{ + Addr: cfg.Addr, + Handler: withRequestLogging(mux), + ReadHeaderTimeout: 10 * time.Second, + } + + go func() { + if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Error("http server failed", "err", err) + os.Exit(1) + } + }() + logger.Info("listening", "addr", cfg.Addr) + + <-ctx.Done() + logger.Info("shutdown signal received, draining") + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := httpSrv.Shutdown(shutdownCtx); err != nil { + logger.Error("graceful shutdown failed", "err", err) + } + logger.Info("stopped") +} + +// config holds runtime configuration sourced from environment variables, +// matching the Python service's env var names where 
they overlap. +type config struct { + Addr string // listen address, e.g. ":8770" + DatabaseURL string // dereth TimescaleDB DSN (read-only use) +} + +func loadConfig() config { + return config{ + Addr: ":" + envOr("PORT", "8770"), + DatabaseURL: os.Getenv("DATABASE_URL"), + } +} + +func envOr(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func (s *Server) registerRoutes(mux *http.ServeMux) { + mux.HandleFunc("GET /health", s.handleHealth) + // Mirrors Python's GET /api-version (hyphenated so nginx never strips it). + mux.HandleFunc("GET /api-version", s.handleVersion) + // Phase 1 read-side: the 5s caches. + mux.HandleFunc("GET /live", s.handleLive) + mux.HandleFunc("GET /live/", s.handleLive) + mux.HandleFunc("GET /trails", s.handleTrails) + mux.HandleFunc("GET /trails/", s.handleTrails) +} + +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "status": "ok", + "service": "tracker-go", + "version": buildVersion, + "db": s.pool != nil, + }) +} + +func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{"version": buildVersion}) +} + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if err := json.NewEncoder(w).Encode(v); err != nil { + slog.Error("json encode failed", "err", err) + } +} + +// withRequestLogging is a thin access-log middleware. 
+func withRequestLogging(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + sr := &statusRecorder{ResponseWriter: w, status: http.StatusOK} + next.ServeHTTP(sr, r) + slog.Info("http", + "method", r.Method, + "path", r.URL.Path, + "status", sr.status, + "dur_ms", time.Since(start).Milliseconds(), + ) + }) +} + +type statusRecorder struct { + http.ResponseWriter + status int +} + +func (s *statusRecorder) WriteHeader(code int) { + s.status = code + s.ResponseWriter.WriteHeader(code) +} diff --git a/go-services/tracker-go/store.go b/go-services/tracker-go/store.go new file mode 100644 index 00000000..f75fe75b --- /dev/null +++ b/go-services/tracker-go/store.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// newPool creates a pgx pool against the dereth TimescaleDB. +// +// Phase 1 is strictly read-only. As defense-in-depth we force every pooled +// connection into read-only transaction mode, so even a buggy or future write +// statement cannot mutate the live production data the Python service owns. +func newPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) { + cfg, err := pgxpool.ParseConfig(dsn) + if err != nil { + return nil, fmt.Errorf("parse DATABASE_URL: %w", err) + } + cfg.MaxConns = 10 + cfg.MaxConnIdleTime = 5 * time.Minute + cfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { + if _, err := conn.Exec(ctx, "SET default_transaction_read_only = on"); err != nil { + return fmt.Errorf("set read-only: %w", err) + } + return nil + } + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("create pool: %w", err) + } + return pool, nil +} + +// queryRowsAsMaps runs a query and returns each row as a column-name->value map, +// mirroring how the Python service builds response dicts directly from rows. 
+// A nil result is coerced to an empty (non-nil) slice so JSON encodes "[]". +func queryRowsAsMaps(ctx context.Context, pool *pgxpool.Pool, sql string, args ...any) ([]map[string]any, error) { + rows, err := pool.Query(ctx, sql, args...) + if err != nil { + return nil, err + } + out, err := pgx.CollectRows(rows, pgx.RowToMap) + if err != nil { + return nil, err + } + if out == nil { + out = []map[string]any{} + } + return out, nil +} + +// pyISO formats a timestamp the way Python's datetime.isoformat() does for a +// UTC tz-aware value, so output matches FastAPI's jsonable_encoder: +// - no fractional part when microseconds are zero +// - otherwise exactly 6 fractional digits +// - "+00:00" offset (not "Z") +// Postgres timestamptz has microsecond resolution, so ns is always a multiple +// of 1000. +func pyISO(t time.Time) string { + t = t.UTC() + if t.Nanosecond() == 0 { + return t.Format("2006-01-02T15:04:05+00:00") + } + return t.Format("2006-01-02T15:04:05") + fmt.Sprintf(".%06d+00:00", t.Nanosecond()/1000) +} + +// formatTimes rewrites the named time.Time columns in-place to pyISO strings. +// Missing or NULL (nil) values are left untouched, so they encode as JSON null. +func formatTimes(rows []map[string]any, keys ...string) { + for _, m := range rows { + for _, k := range keys { + if t, ok := m[k].(time.Time); ok { + m[k] = pyISO(t) + } + } + } +}