From 6a839e69bcd21617b461c83ca5c19826095ad9c5 Mon Sep 17 00:00:00 2001 From: Erik Date: Wed, 24 Jun 2026 10:18:30 +0200 Subject: [PATCH] =?UTF-8?q?feat(go-services):=20Phase=202=20foundation=20?= =?UTF-8?q?=E2=80=94=20isolated=20shadow=20DB=20+=20schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stands up the shadow-ingest substrate without touching production: - schema.go: faithful replica of db_async.init_db_async (idempotent DDL), run only when an instance OWNS its DB (READ_ONLY=false). Fixes for a fresh DB: spawn_events has no sole-id PK (so it can be a hypertable), telemetry_events compression is enabled before its policy, and the portal unique index uses ROUND(..,1) to match main.py's ON CONFLICT. 35/35 statements OK. - store.go: read-only transaction enforcement is now conditional (on for production read parity, off for ingest). - main.go: READ_ONLY + SHADOW_INGEST_WS config; schema init on boot when owning the DB. - compose override: a SEPARATE TimescaleDB `dereth-go-db` (isolated volume, 127.0.0.1:5434) and a `dereth-tracker-go-shadow` instance (image reused via dereth-tracker-go:local) that owns it. Production DB never written. Verified: dereth_go has all 13 tables; telemetry_events + spawn_events are hypertables; the read-side instance still serves production read-only. 
Co-Authored-By: Claude Opus 4.8 --- go-services/docker-compose.go.yml | 51 ++++++++ go-services/tracker-go/main.go | 19 ++- go-services/tracker-go/schema.go | 197 ++++++++++++++++++++++++++++++ go-services/tracker-go/store.go | 22 ++-- 4 files changed, 277 insertions(+), 12 deletions(-) create mode 100644 go-services/tracker-go/schema.go diff --git a/go-services/docker-compose.go.yml b/go-services/docker-compose.go.yml index 42de969c..a80008f7 100644 --- a/go-services/docker-compose.go.yml +++ b/go-services/docker-compose.go.yml @@ -22,6 +22,7 @@ services: context: ./go-services/tracker-go args: BUILD_VERSION: ${BUILD_VERSION:-dev} + image: dereth-tracker-go:local container_name: dereth-tracker-go ports: - "127.0.0.1:8770:8770" @@ -72,3 +73,53 @@ services: options: max-size: "10m" max-file: "3" + + # ---- Phase 2: shadow ingest (fully isolated; production never touched) ---- + + # A SEPARATE TimescaleDB the Go tracker owns for shadow ingest. Isolated + # volume + loopback port; the production dereth DB is never written. + dereth-go-db: + image: timescale/timescaledb:2.19.3-pg14 + container_name: dereth-go-db + ports: + - "127.0.0.1:5434:5432" + environment: + POSTGRES_DB: "dereth_go" + POSTGRES_USER: "postgres" + POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}" + volumes: + - dereth-go-data:/var/lib/postgresql/data + restart: unless-stopped + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + + # Shadow tracker instance: same image, but OWNS dereth-go-db (read-write) and + # (once ingest lands) consumes the Python /ws/live firehose into it, so its + # ingest output can be compared against production without writing to it. 
+ dereth-tracker-go-shadow: + image: dereth-tracker-go:local + container_name: dereth-tracker-go-shadow + ports: + - "127.0.0.1:8771:8771" + environment: + PORT: "8771" + DATABASE_URL: "postgresql://postgres:${POSTGRES_PASSWORD}@dereth-go-db:5432/dereth_go" + READ_ONLY: "false" # owns its DB; creates schema on boot + INVENTORY_SERVICE_URL: "http://inventory-service:8000" + SECRET_KEY: "${SECRET_KEY}" + # SHADOW_INGEST_WS: "ws://dereth-tracker:8765/ws/live" # enabled once ingest handlers land + LOG_LEVEL: "INFO" + depends_on: + - dereth-go-db + restart: unless-stopped + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + +volumes: + dereth-go-data: diff --git a/go-services/tracker-go/main.go b/go-services/tracker-go/main.go index 635c0ed3..c7c991f8 100644 --- a/go-services/tracker-go/main.go +++ b/go-services/tracker-go/main.go @@ -77,7 +77,7 @@ func main() { logger.Warn("DATABASE_URL unset — running without DB; DB-backed endpoints will be empty") } else { connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) - pool, err := newPool(connectCtx, cfg.DatabaseURL) + pool, err := newPool(connectCtx, cfg.DatabaseURL, cfg.ReadOnly) cancel() if err != nil { logger.Error("db pool init failed", "err", err) @@ -85,9 +85,18 @@ func main() { } defer pool.Close() srv.pool = pool + + // Ingest/shadow mode owns its own DB: create the schema on first run. 
+ if !cfg.ReadOnly { + schemaCtx, cancel := context.WithTimeout(ctx, 60*time.Second) + initSchema(schemaCtx, pool, logger) + cancel() + } + go srv.runCacheLoop(ctx) go srv.runTotalsLoop(ctx) - logger.Info("db connected; cache loops started", "live_interval", cacheInterval.String(), "totals_interval", totalsInterval.String()) + logger.Info("db connected; cache loops started", + "read_only", cfg.ReadOnly, "live_interval", cacheInterval.String(), "totals_interval", totalsInterval.String()) } mux := http.NewServeMux() @@ -121,19 +130,23 @@ func main() { // matching the Python service's env var names where they overlap. type config struct { Addr string // listen address, e.g. ":8770" - DatabaseURL string // dereth TimescaleDB DSN (read-only use) + DatabaseURL string // dereth TimescaleDB DSN + ReadOnly bool // true = read-side parity (force read-only txns); false = ingest/shadow (owns its DB) InventoryURL string // inventory-service base URL StaticDir string // directory for static assets / openissues.json SecretKey string // session-cookie signing key (must match the Python service) + IngestWS string // optional: a /ws/live URL to shadow-ingest from (Python tracker) } func loadConfig() config { return config{ Addr: ":" + envOr("PORT", "8770"), DatabaseURL: os.Getenv("DATABASE_URL"), + ReadOnly: envOr("READ_ONLY", "true") != "false", InventoryURL: envOr("INVENTORY_SERVICE_URL", "http://inventory-service:8000"), StaticDir: envOr("STATIC_DIR", "static"), SecretKey: os.Getenv("SECRET_KEY"), + IngestWS: os.Getenv("SHADOW_INGEST_WS"), } } diff --git a/go-services/tracker-go/schema.go b/go-services/tracker-go/schema.go new file mode 100644 index 00000000..df3326dc --- /dev/null +++ b/go-services/tracker-go/schema.go @@ -0,0 +1,197 @@ +package main + +import ( + "context" + "log/slog" + "strings" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// initSchema creates the dereth schema on an ingest-owned database, faithfully +// replicating db_async.init_db_async (idempotent DDL). 
It runs ONLY for an +// instance that owns its DB (read-write shadow/ingest mode) — never against the +// production dereth DB. Like the Python init, it logs and continues per +// statement so an optional step (e.g. a timescale policy) can't abort the rest. +// +// Deliberate divergences from db_async.py, each a fresh-DB fix: spawn_events has +// no sole-id PK, telemetry_events compression is enabled before its policy, and the +// portal unique index uses ROUND(..,1), matching main.py's ON CONFLICT target so portal upserts resolve (db_async.py creates ROUND(..,2) — the known production drift). +func initSchema(ctx context.Context, pool *pgxpool.Pool, log *slog.Logger) { + stmts := []string{ + `CREATE EXTENSION IF NOT EXISTS timescaledb`, + + `CREATE TABLE IF NOT EXISTS telemetry_events ( + character_name VARCHAR NOT NULL, + char_tag VARCHAR, + session_id VARCHAR NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + ew DOUBLE PRECISION NOT NULL, + ns DOUBLE PRECISION NOT NULL, + z DOUBLE PRECISION NOT NULL, + kills INTEGER NOT NULL, + kills_per_hour DOUBLE PRECISION, + onlinetime VARCHAR, + deaths INTEGER NOT NULL, + total_deaths INTEGER, + rares_found INTEGER NOT NULL, + prismatic_taper_count INTEGER NOT NULL, + vt_state VARCHAR, + mem_mb DOUBLE PRECISION, + cpu_pct DOUBLE PRECISION, + mem_handles INTEGER, + latency_ms DOUBLE PRECISION, + received_at TIMESTAMPTZ + )`, + `SELECT create_hypertable('telemetry_events','timestamp', if_not_exists => true, migrate_data => true, create_default_indexes => false)`, + `CREATE INDEX IF NOT EXISTS ix_telemetry_events_char_ts ON telemetry_events (character_name, timestamp)`, + `CREATE INDEX IF NOT EXISTS ix_telemetry_events_character_name ON telemetry_events (character_name)`, + `CREATE INDEX IF NOT EXISTS ix_telemetry_events_session_id ON telemetry_events (session_id)`, + `CREATE INDEX IF NOT EXISTS ix_telemetry_events_timestamp ON telemetry_events (timestamp)`, + `SELECT add_retention_policy('telemetry_events', INTERVAL '7 days', if_not_exists => TRUE)`, + // Compression must be enabled on the hypertable before a policy can be
added. + `ALTER TABLE telemetry_events SET (timescaledb.compress, timescaledb.compress_segmentby = 'character_name')`, + `SELECT add_compression_policy('telemetry_events', INTERVAL '1 day', if_not_exists => TRUE)`, + + `CREATE TABLE IF NOT EXISTS char_stats ( + character_name VARCHAR PRIMARY KEY, + total_kills INTEGER NOT NULL DEFAULT 0 + )`, + `CREATE TABLE IF NOT EXISTS rare_stats ( + character_name VARCHAR PRIMARY KEY, + total_rares INTEGER NOT NULL DEFAULT 0 + )`, + `CREATE TABLE IF NOT EXISTS rare_stats_sessions ( + character_name VARCHAR NOT NULL, + session_id VARCHAR NOT NULL, + session_rares INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (character_name, session_id) + )`, + `CREATE TABLE IF NOT EXISTS combat_stats ( + character_name VARCHAR PRIMARY KEY, + timestamp TIMESTAMPTZ NOT NULL, + stats_data JSONB NOT NULL + )`, + `CREATE TABLE IF NOT EXISTS combat_stats_sessions ( + id SERIAL PRIMARY KEY, + character_name VARCHAR NOT NULL, + session_id VARCHAR NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + stats_data JSONB NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS ix_combat_stats_sessions_character_name ON combat_stats_sessions (character_name)`, + `CREATE INDEX IF NOT EXISTS ix_combat_stats_sessions_session_id ON combat_stats_sessions (session_id)`, + `CREATE INDEX IF NOT EXISTS ix_combat_stats_sessions_timestamp ON combat_stats_sessions (timestamp)`, + + // No sole-id PRIMARY KEY: TimescaleDB requires the partition column + // (timestamp) in every unique index, so a bare id PK blocks hypertable + // conversion. id stays an auto-increment column for an append-only log. 
+ `CREATE TABLE IF NOT EXISTS spawn_events ( + id BIGSERIAL, + character_name VARCHAR NOT NULL, + mob VARCHAR NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + ew DOUBLE PRECISION NOT NULL, + ns DOUBLE PRECISION NOT NULL, + z DOUBLE PRECISION NOT NULL + )`, + `SELECT create_hypertable('spawn_events','timestamp', if_not_exists => TRUE, migrate_data => FALSE, chunk_time_interval => INTERVAL '1 day')`, + `CREATE INDEX IF NOT EXISTS ix_spawn_events_timestamp ON spawn_events (timestamp)`, + `SELECT add_retention_policy('spawn_events', INTERVAL '7 days', if_not_exists => TRUE)`, + + `CREATE TABLE IF NOT EXISTS rare_events ( + id SERIAL PRIMARY KEY, + character_name VARCHAR NOT NULL, + name VARCHAR NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + ew DOUBLE PRECISION NOT NULL, + ns DOUBLE PRECISION NOT NULL, + z DOUBLE PRECISION NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS ix_rare_events_timestamp ON rare_events (timestamp)`, + + `CREATE TABLE IF NOT EXISTS character_inventories ( + id SERIAL PRIMARY KEY, + character_name VARCHAR NOT NULL, + item_id BIGINT NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + name VARCHAR, + icon INTEGER, + object_class INTEGER, + value INTEGER, + burden INTEGER, + has_id_data BOOLEAN, + item_data JSONB NOT NULL, + CONSTRAINT uq_char_item UNIQUE (character_name, item_id) + )`, + `CREATE INDEX IF NOT EXISTS ix_character_inventories_character_name ON character_inventories (character_name)`, + `CREATE INDEX IF NOT EXISTS ix_character_inventories_object_class ON character_inventories (object_class)`, + `CREATE INDEX IF NOT EXISTS ix_character_inventories_value ON character_inventories (value)`, + + `CREATE TABLE IF NOT EXISTS portals ( + id SERIAL PRIMARY KEY, + portal_name VARCHAR NOT NULL, + ns DOUBLE PRECISION NOT NULL, + ew DOUBLE PRECISION NOT NULL, + z DOUBLE PRECISION NOT NULL, + discovered_at TIMESTAMPTZ NOT NULL, + discovered_by VARCHAR NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS ix_portals_discovered_at ON portals (discovered_at)`, + `CREATE 
UNIQUE INDEX IF NOT EXISTS unique_portal_coords ON portals (ROUND(ns::numeric, 1), ROUND(ew::numeric, 1))`, + `CREATE INDEX IF NOT EXISTS idx_portals_coords ON portals (ns, ew)`, + + `CREATE TABLE IF NOT EXISTS server_status ( + server_name VARCHAR PRIMARY KEY, + current_status VARCHAR(10) NOT NULL, + last_seen_up TIMESTAMPTZ, + last_restart TIMESTAMPTZ, + total_uptime_seconds BIGINT DEFAULT 0, + last_check TIMESTAMPTZ, + last_latency_ms DOUBLE PRECISION, + last_player_count INTEGER + )`, + + `CREATE TABLE IF NOT EXISTS character_stats ( + character_name VARCHAR(255) PRIMARY KEY, + timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(), + level INTEGER, + total_xp BIGINT, + unassigned_xp BIGINT, + luminance_earned BIGINT, + luminance_total BIGINT, + deaths INTEGER, + stats_data JSONB NOT NULL + )`, + + `CREATE TABLE IF NOT EXISTS users ( + id SERIAL PRIMARY KEY, + username VARCHAR NOT NULL UNIQUE, + password_hash VARCHAR NOT NULL, + is_admin BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, + } + + ok, failed := 0, 0 + for _, s := range stmts { + if _, err := pool.Exec(ctx, s); err != nil { + failed++ + log.Warn("schema statement failed (continuing)", "stmt", firstLine(s), "err", err) + continue + } + ok++ + } + log.Info("schema init complete", "ok", ok, "failed", failed) +} + +func firstLine(s string) string { + s = strings.TrimSpace(s) + if i := strings.IndexByte(s, '\n'); i >= 0 { + return strings.TrimSpace(s[:i]) + } + if len(s) > 80 { + return s[:80] + } + return s +} diff --git a/go-services/tracker-go/store.go b/go-services/tracker-go/store.go index 505ddf13..6979622b 100644 --- a/go-services/tracker-go/store.go +++ b/go-services/tracker-go/store.go @@ -11,23 +11,27 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) -// newPool creates a pgx pool against the dereth TimescaleDB. +// newPool creates a pgx pool against a dereth TimescaleDB. // -// Phase 1 is strictly read-only. 
As defense-in-depth we force every pooled -// connection into read-only transaction mode, so even a buggy or future write -// statement cannot mutate the live production data the Python service owns. -func newPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) { +// When readOnly is true (the default — read-side parity against the live +// production dereth DB), every pooled connection is forced into read-only +// transaction mode as defense-in-depth, so even a buggy write cannot mutate the +// data the Python service owns. When false (ingest/shadow mode against this +// instance's OWN database), writes are permitted. +func newPool(ctx context.Context, dsn string, readOnly bool) (*pgxpool.Pool, error) { cfg, err := pgxpool.ParseConfig(dsn) if err != nil { return nil, fmt.Errorf("parse DATABASE_URL: %w", err) } cfg.MaxConns = 10 cfg.MaxConnIdleTime = 5 * time.Minute - cfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { - if _, err := conn.Exec(ctx, "SET default_transaction_read_only = on"); err != nil { - return fmt.Errorf("set read-only: %w", err) + if readOnly { + cfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { + if _, err := conn.Exec(ctx, "SET default_transaction_read_only = on"); err != nil { + return fmt.Errorf("set read-only: %w", err) + } + return nil } - return nil } pool, err := pgxpool.NewWithConfig(ctx, cfg) if err != nil {