feat(go-services): Phase 2 foundation — isolated shadow DB + schema

Stands up the shadow-ingest substrate without touching production:
- schema.go: faithful replica of db_async.init_db_async (idempotent DDL),
  run only when an instance OWNS its DB (READ_ONLY=false). Fixes for a fresh DB:
  spawn_events has no sole-id PK (so it can be a hypertable), telemetry_events
  compression is enabled before its policy, and the portal unique index uses
  ROUND(..,1) to match main.py's ON CONFLICT. 35/35 statements OK.
- store.go: read-only transaction enforcement is now conditional (on for
  production read parity, off for ingest).
- main.go: READ_ONLY + SHADOW_INGEST_WS config; schema init on boot when owning
  the DB.
- compose override: a SEPARATE TimescaleDB `dereth-go-db` (isolated volume,
  127.0.0.1:5434) and a `dereth-tracker-go-shadow` instance (image reused via
  dereth-tracker-go:local) that owns it. Production DB never written.

Verified: dereth_go has all 13 tables; telemetry_events + spawn_events are
hypertables; the read-side instance still serves production read-only.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Erik 2026-06-24 10:18:30 +02:00
parent b6d2871cf0
commit 6a839e69bc
4 changed files with 277 additions and 12 deletions

View file

@ -22,6 +22,7 @@ services:
context: ./go-services/tracker-go
args:
BUILD_VERSION: ${BUILD_VERSION:-dev}
image: dereth-tracker-go:local
container_name: dereth-tracker-go
ports:
- "127.0.0.1:8770:8770"
@ -72,3 +73,53 @@ services:
options:
max-size: "10m"
max-file: "3"
# ---- Phase 2: shadow ingest (fully isolated; production never touched) ----
# A SEPARATE TimescaleDB the Go tracker owns for shadow ingest. Isolated
# volume + loopback port; the production dereth DB is never written.
dereth-go-db:
image: timescale/timescaledb:2.19.3-pg14
container_name: dereth-go-db
ports:
- "127.0.0.1:5434:5432"
environment:
POSTGRES_DB: "dereth_go"
POSTGRES_USER: "postgres"
POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
volumes:
- dereth-go-data:/var/lib/postgresql/data
restart: unless-stopped
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
# Shadow tracker instance: same image, but OWNS dereth-go-db (read-write) and
# (once ingest lands) consumes the Python /ws/live firehose into it, so its
# ingest output can be compared against production without writing to it.
dereth-tracker-go-shadow:
image: dereth-tracker-go:local
container_name: dereth-tracker-go-shadow
ports:
- "127.0.0.1:8771:8771"
environment:
PORT: "8771"
DATABASE_URL: "postgresql://postgres:${POSTGRES_PASSWORD}@dereth-go-db:5432/dereth_go"
READ_ONLY: "false" # owns its DB; creates schema on boot
INVENTORY_SERVICE_URL: "http://inventory-service:8000"
SECRET_KEY: "${SECRET_KEY}"
# SHADOW_INGEST_WS: "ws://dereth-tracker:8765/ws/live" # enabled once ingest handlers land
LOG_LEVEL: "INFO"
depends_on:
- dereth-go-db
restart: unless-stopped
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
volumes:
dereth-go-data:

View file

@ -77,7 +77,7 @@ func main() {
logger.Warn("DATABASE_URL unset — running without DB; DB-backed endpoints will be empty")
} else {
connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
pool, err := newPool(connectCtx, cfg.DatabaseURL)
pool, err := newPool(connectCtx, cfg.DatabaseURL, cfg.ReadOnly)
cancel()
if err != nil {
logger.Error("db pool init failed", "err", err)
@ -85,9 +85,18 @@ func main() {
}
defer pool.Close()
srv.pool = pool
// Ingest/shadow mode owns its own DB: create the schema on first run.
if !cfg.ReadOnly {
schemaCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
initSchema(schemaCtx, pool, logger)
cancel()
}
go srv.runCacheLoop(ctx)
go srv.runTotalsLoop(ctx)
logger.Info("db connected; cache loops started", "live_interval", cacheInterval.String(), "totals_interval", totalsInterval.String())
logger.Info("db connected; cache loops started",
"read_only", cfg.ReadOnly, "live_interval", cacheInterval.String(), "totals_interval", totalsInterval.String())
}
mux := http.NewServeMux()
@ -121,19 +130,23 @@ func main() {
// matching the Python service's env var names where they overlap.
type config struct {
Addr string // listen address, e.g. ":8770"
DatabaseURL string // dereth TimescaleDB DSN (read-only use)
DatabaseURL string // dereth TimescaleDB DSN
ReadOnly bool // true = read-side parity (force read-only txns); false = ingest/shadow (owns its DB)
InventoryURL string // inventory-service base URL
StaticDir string // directory for static assets / openissues.json
SecretKey string // session-cookie signing key (must match the Python service)
IngestWS string // optional: a /ws/live URL to shadow-ingest from (Python tracker)
}
func loadConfig() config {
return config{
Addr: ":" + envOr("PORT", "8770"),
DatabaseURL: os.Getenv("DATABASE_URL"),
ReadOnly: envOr("READ_ONLY", "true") != "false",
InventoryURL: envOr("INVENTORY_SERVICE_URL", "http://inventory-service:8000"),
StaticDir: envOr("STATIC_DIR", "static"),
SecretKey: os.Getenv("SECRET_KEY"),
IngestWS: os.Getenv("SHADOW_INGEST_WS"),
}
}

View file

@ -0,0 +1,197 @@
package main
import (
"context"
"log/slog"
"strings"
"github.com/jackc/pgx/v5/pgxpool"
)
// initSchema creates the dereth schema on an ingest-owned database, faithfully
// replicating db_async.init_db_async (idempotent DDL). It runs ONLY for an
// instance that owns its DB (read-write shadow/ingest mode) — never against the
// production dereth DB. Like the Python init, it logs and continues per
// statement so an optional step (e.g. a timescale policy) can't abort the rest.
//
// One deliberate divergence from db_async.py: the portal unique index uses
// ROUND(..,1), matching main.py's ON CONFLICT target, so portal upserts resolve
// on a fresh DB (db_async.py creates ROUND(..,2) — the known production drift).
func initSchema(ctx context.Context, pool *pgxpool.Pool, log *slog.Logger) {
stmts := []string{
`CREATE EXTENSION IF NOT EXISTS timescaledb`,
`CREATE TABLE IF NOT EXISTS telemetry_events (
character_name VARCHAR NOT NULL,
char_tag VARCHAR,
session_id VARCHAR NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
ew DOUBLE PRECISION NOT NULL,
ns DOUBLE PRECISION NOT NULL,
z DOUBLE PRECISION NOT NULL,
kills INTEGER NOT NULL,
kills_per_hour DOUBLE PRECISION,
onlinetime VARCHAR,
deaths INTEGER NOT NULL,
total_deaths INTEGER,
rares_found INTEGER NOT NULL,
prismatic_taper_count INTEGER NOT NULL,
vt_state VARCHAR,
mem_mb DOUBLE PRECISION,
cpu_pct DOUBLE PRECISION,
mem_handles INTEGER,
latency_ms DOUBLE PRECISION,
received_at TIMESTAMPTZ
)`,
`SELECT create_hypertable('telemetry_events','timestamp', if_not_exists => true, migrate_data => true, create_default_indexes => false)`,
`CREATE INDEX IF NOT EXISTS ix_telemetry_events_char_ts ON telemetry_events (character_name, timestamp)`,
`CREATE INDEX IF NOT EXISTS ix_telemetry_events_character_name ON telemetry_events (character_name)`,
`CREATE INDEX IF NOT EXISTS ix_telemetry_events_session_id ON telemetry_events (session_id)`,
`CREATE INDEX IF NOT EXISTS ix_telemetry_events_timestamp ON telemetry_events (timestamp)`,
`SELECT add_retention_policy('telemetry_events', INTERVAL '7 days', if_not_exists => TRUE)`,
// Compression must be enabled on the hypertable before a policy can be added.
`ALTER TABLE telemetry_events SET (timescaledb.compress, timescaledb.compress_segmentby = 'character_name')`,
`SELECT add_compression_policy('telemetry_events', INTERVAL '1 day', if_not_exists => TRUE)`,
`CREATE TABLE IF NOT EXISTS char_stats (
character_name VARCHAR PRIMARY KEY,
total_kills INTEGER NOT NULL DEFAULT 0
)`,
`CREATE TABLE IF NOT EXISTS rare_stats (
character_name VARCHAR PRIMARY KEY,
total_rares INTEGER NOT NULL DEFAULT 0
)`,
`CREATE TABLE IF NOT EXISTS rare_stats_sessions (
character_name VARCHAR NOT NULL,
session_id VARCHAR NOT NULL,
session_rares INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (character_name, session_id)
)`,
`CREATE TABLE IF NOT EXISTS combat_stats (
character_name VARCHAR PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
stats_data JSONB NOT NULL
)`,
`CREATE TABLE IF NOT EXISTS combat_stats_sessions (
id SERIAL PRIMARY KEY,
character_name VARCHAR NOT NULL,
session_id VARCHAR NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
stats_data JSONB NOT NULL
)`,
`CREATE INDEX IF NOT EXISTS ix_combat_stats_sessions_character_name ON combat_stats_sessions (character_name)`,
`CREATE INDEX IF NOT EXISTS ix_combat_stats_sessions_session_id ON combat_stats_sessions (session_id)`,
`CREATE INDEX IF NOT EXISTS ix_combat_stats_sessions_timestamp ON combat_stats_sessions (timestamp)`,
// No sole-id PRIMARY KEY: TimescaleDB requires the partition column
// (timestamp) in every unique index, so a bare id PK blocks hypertable
// conversion. id stays an auto-increment column for an append-only log.
`CREATE TABLE IF NOT EXISTS spawn_events (
id BIGSERIAL,
character_name VARCHAR NOT NULL,
mob VARCHAR NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
ew DOUBLE PRECISION NOT NULL,
ns DOUBLE PRECISION NOT NULL,
z DOUBLE PRECISION NOT NULL
)`,
`SELECT create_hypertable('spawn_events','timestamp', if_not_exists => TRUE, migrate_data => FALSE, chunk_time_interval => INTERVAL '1 day')`,
`CREATE INDEX IF NOT EXISTS ix_spawn_events_timestamp ON spawn_events (timestamp)`,
`SELECT add_retention_policy('spawn_events', INTERVAL '7 days', if_not_exists => TRUE)`,
`CREATE TABLE IF NOT EXISTS rare_events (
id SERIAL PRIMARY KEY,
character_name VARCHAR NOT NULL,
name VARCHAR NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
ew DOUBLE PRECISION NOT NULL,
ns DOUBLE PRECISION NOT NULL,
z DOUBLE PRECISION NOT NULL
)`,
`CREATE INDEX IF NOT EXISTS ix_rare_events_timestamp ON rare_events (timestamp)`,
`CREATE TABLE IF NOT EXISTS character_inventories (
id SERIAL PRIMARY KEY,
character_name VARCHAR NOT NULL,
item_id BIGINT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
name VARCHAR,
icon INTEGER,
object_class INTEGER,
value INTEGER,
burden INTEGER,
has_id_data BOOLEAN,
item_data JSONB NOT NULL,
CONSTRAINT uq_char_item UNIQUE (character_name, item_id)
)`,
`CREATE INDEX IF NOT EXISTS ix_character_inventories_character_name ON character_inventories (character_name)`,
`CREATE INDEX IF NOT EXISTS ix_character_inventories_object_class ON character_inventories (object_class)`,
`CREATE INDEX IF NOT EXISTS ix_character_inventories_value ON character_inventories (value)`,
`CREATE TABLE IF NOT EXISTS portals (
id SERIAL PRIMARY KEY,
portal_name VARCHAR NOT NULL,
ns DOUBLE PRECISION NOT NULL,
ew DOUBLE PRECISION NOT NULL,
z DOUBLE PRECISION NOT NULL,
discovered_at TIMESTAMPTZ NOT NULL,
discovered_by VARCHAR NOT NULL
)`,
`CREATE INDEX IF NOT EXISTS ix_portals_discovered_at ON portals (discovered_at)`,
`CREATE UNIQUE INDEX IF NOT EXISTS unique_portal_coords ON portals (ROUND(ns::numeric, 1), ROUND(ew::numeric, 1))`,
`CREATE INDEX IF NOT EXISTS idx_portals_coords ON portals (ns, ew)`,
`CREATE TABLE IF NOT EXISTS server_status (
server_name VARCHAR PRIMARY KEY,
current_status VARCHAR(10) NOT NULL,
last_seen_up TIMESTAMPTZ,
last_restart TIMESTAMPTZ,
total_uptime_seconds BIGINT DEFAULT 0,
last_check TIMESTAMPTZ,
last_latency_ms DOUBLE PRECISION,
last_player_count INTEGER
)`,
`CREATE TABLE IF NOT EXISTS character_stats (
character_name VARCHAR(255) PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
level INTEGER,
total_xp BIGINT,
unassigned_xp BIGINT,
luminance_earned BIGINT,
luminance_total BIGINT,
deaths INTEGER,
stats_data JSONB NOT NULL
)`,
`CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR NOT NULL UNIQUE,
password_hash VARCHAR NOT NULL,
is_admin BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)`,
}
ok, failed := 0, 0
for _, s := range stmts {
if _, err := pool.Exec(ctx, s); err != nil {
failed++
log.Warn("schema statement failed (continuing)", "stmt", firstLine(s), "err", err)
continue
}
ok++
}
log.Info("schema init complete", "ok", ok, "failed", failed)
}
func firstLine(s string) string {
s = strings.TrimSpace(s)
if i := strings.IndexByte(s, '\n'); i >= 0 {
return strings.TrimSpace(s[:i])
}
if len(s) > 80 {
return s[:80]
}
return s
}

View file

@ -11,23 +11,27 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)
// newPool creates a pgx pool against the dereth TimescaleDB.
// newPool creates a pgx pool against a dereth TimescaleDB.
//
// Phase 1 is strictly read-only. As defense-in-depth we force every pooled
// connection into read-only transaction mode, so even a buggy or future write
// statement cannot mutate the live production data the Python service owns.
func newPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
// When readOnly is true (the default — read-side parity against the live
// production dereth DB), every pooled connection is forced into read-only
// transaction mode as defense-in-depth, so even a buggy write cannot mutate the
// data the Python service owns. When false (ingest/shadow mode against this
// instance's OWN database), writes are permitted.
func newPool(ctx context.Context, dsn string, readOnly bool) (*pgxpool.Pool, error) {
cfg, err := pgxpool.ParseConfig(dsn)
if err != nil {
return nil, fmt.Errorf("parse DATABASE_URL: %w", err)
}
cfg.MaxConns = 10
cfg.MaxConnIdleTime = 5 * time.Minute
cfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, "SET default_transaction_read_only = on"); err != nil {
return fmt.Errorf("set read-only: %w", err)
if readOnly {
cfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, "SET default_transaction_read_only = on"); err != nil {
return fmt.Errorf("set read-only: %w", err)
}
return nil
}
return nil
}
pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err != nil {