Stands up the shadow-ingest substrate without touching production: - schema.go: faithful replica of db_async.init_db_async (idempotent DDL), run only when an instance OWNS its DB (READ_ONLY=false). Fixes for a fresh DB: spawn_events has no sole-id PK (so it can be a hypertable), telemetry_events compression is enabled before its policy, and the portal unique index uses ROUND(..,1) to match main.py's ON CONFLICT. 35/35 statements OK. - store.go: read-only transaction enforcement is now conditional (on for production read parity, off for ingest). - main.go: READ_ONLY + SHADOW_INGEST_WS config; schema init on boot when owning the DB. - compose override: a SEPARATE TimescaleDB `dereth-go-db` (isolated volume, 127.0.0.1:5434) and a `dereth-tracker-go-shadow` instance (image reused via dereth-tracker-go:local) that owns it. Production DB never written. Verified: dereth_go has all 13 tables; telemetry_events + spawn_events are hypertables; the read-side instance still serves production read-only. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
197 lines
7.2 KiB
Go
197 lines
7.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"strings"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// initSchema creates the dereth schema on an ingest-owned database, faithfully
|
|
// replicating db_async.init_db_async (idempotent DDL). It runs ONLY for an
|
|
// instance that owns its DB (read-write shadow/ingest mode) — never against the
|
|
// production dereth DB. Like the Python init, it logs and continues per
|
|
// statement so an optional step (e.g. a timescale policy) can't abort the rest.
|
|
//
|
|
// One deliberate divergence from db_async.py: the portal unique index uses
|
|
// ROUND(..,1), matching main.py's ON CONFLICT target, so portal upserts resolve
|
|
// on a fresh DB (db_async.py creates ROUND(..,2) — the known production drift).
|
|
func initSchema(ctx context.Context, pool *pgxpool.Pool, log *slog.Logger) {
|
|
stmts := []string{
|
|
`CREATE EXTENSION IF NOT EXISTS timescaledb`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS telemetry_events (
|
|
character_name VARCHAR NOT NULL,
|
|
char_tag VARCHAR,
|
|
session_id VARCHAR NOT NULL,
|
|
timestamp TIMESTAMPTZ NOT NULL,
|
|
ew DOUBLE PRECISION NOT NULL,
|
|
ns DOUBLE PRECISION NOT NULL,
|
|
z DOUBLE PRECISION NOT NULL,
|
|
kills INTEGER NOT NULL,
|
|
kills_per_hour DOUBLE PRECISION,
|
|
onlinetime VARCHAR,
|
|
deaths INTEGER NOT NULL,
|
|
total_deaths INTEGER,
|
|
rares_found INTEGER NOT NULL,
|
|
prismatic_taper_count INTEGER NOT NULL,
|
|
vt_state VARCHAR,
|
|
mem_mb DOUBLE PRECISION,
|
|
cpu_pct DOUBLE PRECISION,
|
|
mem_handles INTEGER,
|
|
latency_ms DOUBLE PRECISION,
|
|
received_at TIMESTAMPTZ
|
|
)`,
|
|
`SELECT create_hypertable('telemetry_events','timestamp', if_not_exists => true, migrate_data => true, create_default_indexes => false)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_telemetry_events_char_ts ON telemetry_events (character_name, timestamp)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_telemetry_events_character_name ON telemetry_events (character_name)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_telemetry_events_session_id ON telemetry_events (session_id)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_telemetry_events_timestamp ON telemetry_events (timestamp)`,
|
|
`SELECT add_retention_policy('telemetry_events', INTERVAL '7 days', if_not_exists => TRUE)`,
|
|
// Compression must be enabled on the hypertable before a policy can be added.
|
|
`ALTER TABLE telemetry_events SET (timescaledb.compress, timescaledb.compress_segmentby = 'character_name')`,
|
|
`SELECT add_compression_policy('telemetry_events', INTERVAL '1 day', if_not_exists => TRUE)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS char_stats (
|
|
character_name VARCHAR PRIMARY KEY,
|
|
total_kills INTEGER NOT NULL DEFAULT 0
|
|
)`,
|
|
`CREATE TABLE IF NOT EXISTS rare_stats (
|
|
character_name VARCHAR PRIMARY KEY,
|
|
total_rares INTEGER NOT NULL DEFAULT 0
|
|
)`,
|
|
`CREATE TABLE IF NOT EXISTS rare_stats_sessions (
|
|
character_name VARCHAR NOT NULL,
|
|
session_id VARCHAR NOT NULL,
|
|
session_rares INTEGER NOT NULL DEFAULT 0,
|
|
PRIMARY KEY (character_name, session_id)
|
|
)`,
|
|
`CREATE TABLE IF NOT EXISTS combat_stats (
|
|
character_name VARCHAR PRIMARY KEY,
|
|
timestamp TIMESTAMPTZ NOT NULL,
|
|
stats_data JSONB NOT NULL
|
|
)`,
|
|
`CREATE TABLE IF NOT EXISTS combat_stats_sessions (
|
|
id SERIAL PRIMARY KEY,
|
|
character_name VARCHAR NOT NULL,
|
|
session_id VARCHAR NOT NULL,
|
|
timestamp TIMESTAMPTZ NOT NULL,
|
|
stats_data JSONB NOT NULL
|
|
)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_combat_stats_sessions_character_name ON combat_stats_sessions (character_name)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_combat_stats_sessions_session_id ON combat_stats_sessions (session_id)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_combat_stats_sessions_timestamp ON combat_stats_sessions (timestamp)`,
|
|
|
|
// No sole-id PRIMARY KEY: TimescaleDB requires the partition column
|
|
// (timestamp) in every unique index, so a bare id PK blocks hypertable
|
|
// conversion. id stays an auto-increment column for an append-only log.
|
|
`CREATE TABLE IF NOT EXISTS spawn_events (
|
|
id BIGSERIAL,
|
|
character_name VARCHAR NOT NULL,
|
|
mob VARCHAR NOT NULL,
|
|
timestamp TIMESTAMPTZ NOT NULL,
|
|
ew DOUBLE PRECISION NOT NULL,
|
|
ns DOUBLE PRECISION NOT NULL,
|
|
z DOUBLE PRECISION NOT NULL
|
|
)`,
|
|
`SELECT create_hypertable('spawn_events','timestamp', if_not_exists => TRUE, migrate_data => FALSE, chunk_time_interval => INTERVAL '1 day')`,
|
|
`CREATE INDEX IF NOT EXISTS ix_spawn_events_timestamp ON spawn_events (timestamp)`,
|
|
`SELECT add_retention_policy('spawn_events', INTERVAL '7 days', if_not_exists => TRUE)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS rare_events (
|
|
id SERIAL PRIMARY KEY,
|
|
character_name VARCHAR NOT NULL,
|
|
name VARCHAR NOT NULL,
|
|
timestamp TIMESTAMPTZ NOT NULL,
|
|
ew DOUBLE PRECISION NOT NULL,
|
|
ns DOUBLE PRECISION NOT NULL,
|
|
z DOUBLE PRECISION NOT NULL
|
|
)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_rare_events_timestamp ON rare_events (timestamp)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS character_inventories (
|
|
id SERIAL PRIMARY KEY,
|
|
character_name VARCHAR NOT NULL,
|
|
item_id BIGINT NOT NULL,
|
|
timestamp TIMESTAMPTZ NOT NULL,
|
|
name VARCHAR,
|
|
icon INTEGER,
|
|
object_class INTEGER,
|
|
value INTEGER,
|
|
burden INTEGER,
|
|
has_id_data BOOLEAN,
|
|
item_data JSONB NOT NULL,
|
|
CONSTRAINT uq_char_item UNIQUE (character_name, item_id)
|
|
)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_character_inventories_character_name ON character_inventories (character_name)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_character_inventories_object_class ON character_inventories (object_class)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_character_inventories_value ON character_inventories (value)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS portals (
|
|
id SERIAL PRIMARY KEY,
|
|
portal_name VARCHAR NOT NULL,
|
|
ns DOUBLE PRECISION NOT NULL,
|
|
ew DOUBLE PRECISION NOT NULL,
|
|
z DOUBLE PRECISION NOT NULL,
|
|
discovered_at TIMESTAMPTZ NOT NULL,
|
|
discovered_by VARCHAR NOT NULL
|
|
)`,
|
|
`CREATE INDEX IF NOT EXISTS ix_portals_discovered_at ON portals (discovered_at)`,
|
|
`CREATE UNIQUE INDEX IF NOT EXISTS unique_portal_coords ON portals (ROUND(ns::numeric, 1), ROUND(ew::numeric, 1))`,
|
|
`CREATE INDEX IF NOT EXISTS idx_portals_coords ON portals (ns, ew)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS server_status (
|
|
server_name VARCHAR PRIMARY KEY,
|
|
current_status VARCHAR(10) NOT NULL,
|
|
last_seen_up TIMESTAMPTZ,
|
|
last_restart TIMESTAMPTZ,
|
|
total_uptime_seconds BIGINT DEFAULT 0,
|
|
last_check TIMESTAMPTZ,
|
|
last_latency_ms DOUBLE PRECISION,
|
|
last_player_count INTEGER
|
|
)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS character_stats (
|
|
character_name VARCHAR(255) PRIMARY KEY,
|
|
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
level INTEGER,
|
|
total_xp BIGINT,
|
|
unassigned_xp BIGINT,
|
|
luminance_earned BIGINT,
|
|
luminance_total BIGINT,
|
|
deaths INTEGER,
|
|
stats_data JSONB NOT NULL
|
|
)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS users (
|
|
id SERIAL PRIMARY KEY,
|
|
username VARCHAR NOT NULL UNIQUE,
|
|
password_hash VARCHAR NOT NULL,
|
|
is_admin BOOLEAN NOT NULL DEFAULT false,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
)`,
|
|
}
|
|
|
|
ok, failed := 0, 0
|
|
for _, s := range stmts {
|
|
if _, err := pool.Exec(ctx, s); err != nil {
|
|
failed++
|
|
log.Warn("schema statement failed (continuing)", "stmt", firstLine(s), "err", err)
|
|
continue
|
|
}
|
|
ok++
|
|
}
|
|
log.Info("schema init complete", "ok", ok, "failed", failed)
|
|
}
|
|
|
|
func firstLine(s string) string {
|
|
s = strings.TrimSpace(s)
|
|
if i := strings.IndexByte(s, '\n'); i >= 0 {
|
|
return strings.TrimSpace(s[:i])
|
|
}
|
|
if len(s) > 80 {
|
|
return s[:80]
|
|
}
|
|
return s
|
|
}
|