MosswartOverlord/go-services/tracker-go/store.go
Erik 6a839e69bc feat(go-services): Phase 2 foundation — isolated shadow DB + schema
Stands up the shadow-ingest substrate without touching production:
- schema.go: faithful replica of db_async.init_db_async (idempotent DDL),
  run only when an instance OWNS its DB (READ_ONLY=false). Fixes for a fresh DB:
  spawn_events has no sole-id PK (so it can be a hypertable), telemetry_events
  compression is enabled before its policy, and the portal unique index uses
  ROUND(..,1) to match main.py's ON CONFLICT. 35/35 statements OK.
- store.go: read-only transaction enforcement is now conditional (on for
  production read parity, off for ingest).
- main.go: READ_ONLY + SHADOW_INGEST_WS config; schema init on boot when owning
  the DB.
- compose override: a SEPARATE TimescaleDB `dereth-go-db` (isolated volume,
  127.0.0.1:5434) and a `dereth-tracker-go-shadow` instance (image reused via
  dereth-tracker-go:local) that owns it. Production DB never written.

Verified: dereth_go has all 13 tables; telemetry_events + spawn_events are
hypertables; the read-side instance still serves production read-only.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-24 10:18:30 +02:00

145 lines
4.1 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// newPool builds a pgx connection pool for the dereth TimescaleDB named by dsn.
//
// The pool is capped at 10 connections and idle connections are retired after
// five minutes.
//
// With readOnly set (read-side parity against the live production dereth DB),
// an AfterConnect hook issues "SET default_transaction_read_only = on" on the
// connection it is handed, as defense-in-depth: a buggy write should then fail
// instead of mutating data the Python service owns. With readOnly unset
// (ingest/shadow mode against this instance's OWN database) no hook is
// installed and writes are permitted.
//
// Errors from parsing the DSN or from constructing the pool are returned
// wrapped with a short description of the failing step.
func newPool(ctx context.Context, dsn string, readOnly bool) (*pgxpool.Pool, error) {
	const (
		maxConns    = 10
		maxIdleTime = 5 * time.Minute
	)

	poolCfg, parseErr := pgxpool.ParseConfig(dsn)
	if parseErr != nil {
		return nil, fmt.Errorf("parse DATABASE_URL: %w", parseErr)
	}
	poolCfg.MaxConns = maxConns
	poolCfg.MaxConnIdleTime = maxIdleTime

	if readOnly {
		// Session-level default: transactions on this connection start read-only.
		poolCfg.AfterConnect = func(connCtx context.Context, c *pgx.Conn) error {
			_, execErr := c.Exec(connCtx, "SET default_transaction_read_only = on")
			if execErr != nil {
				return fmt.Errorf("set read-only: %w", execErr)
			}
			return nil
		}
	}

	p, newErr := pgxpool.NewWithConfig(ctx, poolCfg)
	if newErr != nil {
		return nil, fmt.Errorf("create pool: %w", newErr)
	}
	return p, nil
}
// queryRowsAsMaps executes sql with args on pool and returns every result row
// as a column-name->value map, mirroring how the Python service builds its
// response dicts straight from rows.
//
// The returned slice is never nil on success: a nil collection is replaced by
// an empty slice so that it JSON-encodes as "[]" rather than "null". Query and
// row-collection errors are returned unchanged.
func queryRowsAsMaps(ctx context.Context, pool *pgxpool.Pool, sql string, args ...any) ([]map[string]any, error) {
	rows, queryErr := pool.Query(ctx, sql, args...)
	if queryErr != nil {
		return nil, queryErr
	}

	switch records, collectErr := pgx.CollectRows(rows, pgx.RowToMap); {
	case collectErr != nil:
		return nil, collectErr
	case records == nil:
		return []map[string]any{}, nil
	default:
		return records, nil
	}
}
// queryRowAsMap executes a query that is expected to yield at most one row and
// returns that row as a column-name->value map.
//
// When the query produces no rows the result is (nil, nil), which callers can
// translate into a 404. Any other failure is returned as-is.
// NOTE(review): rows are collected with pgx.CollectExactlyOneRow, which is
// documented to report an error when more than one row comes back — callers
// should make sure their SQL cannot return multiple rows.
func queryRowAsMap(ctx context.Context, pool *pgxpool.Pool, sql string, args ...any) (map[string]any, error) {
	rows, err := pool.Query(ctx, sql, args...)
	if err != nil {
		return nil, err
	}

	record, err := pgx.CollectExactlyOneRow(rows, pgx.RowToMap)
	switch {
	case err == nil:
		return record, nil
	case errors.Is(err, pgx.ErrNoRows):
		// "Not found" is a normal outcome, not an error.
		return nil, nil
	default:
		return nil, err
	}
}
// asJSONMap turns v into a map[string]any when v is either already such a map
// or holds the JSON text of an object as []byte or string. It is meant for
// JSONB columns, where the decoded Go type can vary.
//
// A map argument is returned as-is (not copied). nil is returned for a nil
// input, for any other Go type, and for text that does not unmarshal into a
// map (invalid JSON, or a non-object such as an array or number).
func asJSONMap(v any) map[string]any {
	var raw []byte
	switch src := v.(type) {
	case map[string]any:
		return src
	case []byte:
		raw = src
	case string:
		raw = []byte(src)
	default:
		// Covers nil as well as every unsupported type.
		return nil
	}

	var obj map[string]any
	if err := json.Unmarshal(raw, &obj); err != nil {
		return nil
	}
	return obj
}
// decodeJSONValue converts a JSON/JSONB column value into the Go value that
// encoding/json produces for it (map, slice, string, float64, bool or nil).
//
// Only []byte and string inputs are decoded. If decoding fails, or v has any
// other type, v itself is returned untouched.
func decodeJSONValue(v any) any {
	var raw []byte
	switch src := v.(type) {
	case []byte:
		raw = src
	case string:
		raw = []byte(src)
	default:
		return v
	}

	var decoded any
	if err := json.Unmarshal(raw, &decoded); err != nil {
		// Not valid JSON text: hand the caller back what it gave us.
		return v
	}
	return decoded
}
// pyISO formats a timestamp the way Python's datetime.isoformat() does for a
// UTC tz-aware value, so output matches FastAPI's jsonable_encoder:
//   - no fractional part when microseconds are zero
//   - otherwise exactly 6 fractional digits
//   - "+00:00" offset (not "Z")
//
// t is converted to UTC first, so any input zone is accepted.
//
// Postgres timestamptz has microsecond resolution, so values read from the DB
// always have a nanosecond field that is a multiple of 1000. Values from other
// sources may carry finer precision; it is truncated to whole microseconds
// BEFORE deciding whether to emit a fraction, so a time with only
// sub-microsecond nanoseconds is rendered without ".000000".
func pyISO(t time.Time) string {
	const layout = "2006-01-02T15:04:05"

	t = t.UTC()
	micros := t.Nanosecond() / 1000
	if micros == 0 {
		return t.Format(layout) + "+00:00"
	}
	return fmt.Sprintf("%s.%06d+00:00", t.Format(layout), micros)
}
// formatTimes replaces, in place, every value stored under one of keys that is
// a time.Time with its pyISO string form, for each map in rows.
//
// Keys that are absent from a row, or whose value is not a time.Time (including
// NULL columns held as nil), are left exactly as they are, so NULLs still
// encode as JSON null and no new keys are created.
func formatTimes(rows []map[string]any, keys ...string) {
	for i := range rows {
		row := rows[i]
		for j := range keys {
			col := keys[j]
			ts, isTime := row[col].(time.Time)
			if !isTime {
				continue
			}
			row[col] = pyISO(ts)
		}
	}
}