Second hotfix attempt for the "ball of spikes" terrain corruption.
The previous _datLock fix was insufficient because dat reads happen
from many render-thread code paths I didn't enumerate (animation
tick, OnLiveMotionUpdated, OnLivePositionUpdated, the live spawn
hydration, ApplyLoadedTerrain) and locking each is invasive and
fragile.
DatReaderWriter's DatCollection is fundamentally not thread-safe:
DatBinReader's internal buffer position is shared per-database, so
two concurrent .Get<T> calls corrupt each other's read state. The
ArgumentOutOfRangeException at DatBinReader.ReadBytesInternal in
the failure log is the smoking gun — one read started reading a
LandBlock, another moved the reader's position, the first one
asked for the wrong number of bytes.
Until Phase A.3 introduces a thread-safe dat wrapper (or until we
preload all dats into pure in-memory dictionaries), the streamer
runs synchronously: EnqueueLoad invokes the load delegate inline
on the calling thread and writes the result to the outbox in a
single call. The render-thread DrainCompletions loop picks it up
on the same frame.
API surface unchanged — Channel-based outbox, EnqueueLoad/Unload,
DrainCompletions, Start (now no-op), Dispose all preserved. Move
back to async loading is a single-class change once dat thread
safety lands.
Cost: visible frame hitch when crossing landblock boundaries
(rendering the new landblock is now on the render thread). For
default 5×5 the hitch is one landblock per cardinal step, ~50ms
worst case. Acceptable for the MVP — correctness over hitches.
Updated the off-thread test to assert the new synchronous contract
(loader runs on the calling thread). The other 4 tests still pass
unchanged because their spin-drain pattern works with synchronous
delivery too.
The previous _datLock from commit c991fb2 stays in place as
defensive belt-and-suspenders — it's free in synchronous mode and
keeps the contract documented at every dat-reading entry point.
212 tests green.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
140 lines
5 KiB
C#
140 lines
5 KiB
C#
using System.Threading.Tasks;
|
|
using AcDream.App.Streaming;
|
|
using AcDream.Core.World;
|
|
using DatReaderWriter.DBObjs;
|
|
using Xunit;
|
|
|
|
namespace AcDream.Core.Tests.Streaming;
|
|
|
|
public class LandblockStreamerTests
|
|
{
|
|
private const int SpinTimeoutMs = 2000;
|
|
private const int SpinStepMs = 10;
|
|
private const int SpinMaxIterations = SpinTimeoutMs / SpinStepMs;
|
|
|
|
[Fact]
|
|
public async Task Load_FollowedByDrain_ReturnsLoadedRecord()
|
|
{
|
|
var stubLandblock = new LoadedLandblock(
|
|
0xA9B4FFFEu,
|
|
new LandBlock(),
|
|
System.Array.Empty<WorldEntity>());
|
|
|
|
using var streamer = new LandblockStreamer(
|
|
loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null);
|
|
|
|
streamer.Start();
|
|
streamer.EnqueueLoad(0xA9B4FFFEu);
|
|
|
|
// Spin until the worker produces a completion, with a 2s timeout.
|
|
LandblockStreamResult? result = null;
|
|
for (int i = 0; i < SpinMaxIterations && result is null; i++)
|
|
{
|
|
var drained = streamer.DrainCompletions(maxBatchSize: LandblockStreamer.DefaultDrainBatchSize);
|
|
if (drained.Count > 0) result = drained[0];
|
|
else await Task.Delay(SpinStepMs);
|
|
}
|
|
|
|
Assert.NotNull(result);
|
|
var loaded = Assert.IsType<LandblockStreamResult.Loaded>(result);
|
|
Assert.Equal(0xA9B4FFFEu, loaded.LandblockId);
|
|
Assert.Same(stubLandblock, loaded.Landblock);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Load_WhenLoaderReturnsNull_ReportsFailed()
|
|
{
|
|
using var streamer = new LandblockStreamer(
|
|
loadLandblock: _ => null);
|
|
|
|
streamer.Start();
|
|
streamer.EnqueueLoad(0x12340000u);
|
|
|
|
LandblockStreamResult? result = null;
|
|
for (int i = 0; i < SpinMaxIterations && result is null; i++)
|
|
{
|
|
var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
|
|
if (drained.Count > 0) result = drained[0];
|
|
else await Task.Delay(SpinStepMs);
|
|
}
|
|
|
|
Assert.NotNull(result);
|
|
Assert.IsType<LandblockStreamResult.Failed>(result);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Load_WhenLoaderThrows_ReportsFailedWithMessage()
|
|
{
|
|
using var streamer = new LandblockStreamer(
|
|
loadLandblock: _ => throw new System.InvalidOperationException("boom"));
|
|
|
|
streamer.Start();
|
|
streamer.EnqueueLoad(0x55550000u);
|
|
|
|
LandblockStreamResult? result = null;
|
|
for (int i = 0; i < SpinMaxIterations && result is null; i++)
|
|
{
|
|
var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
|
|
if (drained.Count > 0) result = drained[0];
|
|
else await Task.Delay(SpinStepMs);
|
|
}
|
|
|
|
var failed = Assert.IsType<LandblockStreamResult.Failed>(result);
|
|
Assert.Contains("boom", failed.Error);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Unload_ProducesUnloadedResult()
|
|
{
|
|
using var streamer = new LandblockStreamer(loadLandblock: _ => null);
|
|
|
|
streamer.Start();
|
|
streamer.EnqueueUnload(0xABCD0000u);
|
|
|
|
LandblockStreamResult? result = null;
|
|
for (int i = 0; i < SpinMaxIterations && result is null; i++)
|
|
{
|
|
var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
|
|
if (drained.Count > 0) result = drained[0];
|
|
else await Task.Delay(SpinStepMs);
|
|
}
|
|
|
|
var unloaded = Assert.IsType<LandblockStreamResult.Unloaded>(result);
|
|
Assert.Equal(0xABCD0000u, unloaded.LandblockId);
|
|
}
|
|
|
|
[Fact]
|
|
public void Load_ExecutesLoaderSynchronously_OnCallingThread()
|
|
{
|
|
// Streamer was made synchronous after Phase A.1 visual verification
|
|
// exposed concurrent dat reads as the cause of "ball of spikes"
|
|
// terrain corruption — DatReaderWriter's DatCollection isn't
|
|
// thread-safe and locking around every dat read on every render-
|
|
// thread code path was too invasive. Until Phase A.3 introduces a
|
|
// thread-safe dat wrapper, the load delegate runs on the calling
|
|
// thread and the result is in the outbox by the time EnqueueLoad
|
|
// returns. This test pins that contract.
|
|
int testThreadId = System.Environment.CurrentManagedThreadId;
|
|
int? loaderThreadId = null;
|
|
var stubLandblock = new LoadedLandblock(
|
|
0x77770FFEu,
|
|
new LandBlock(),
|
|
System.Array.Empty<WorldEntity>());
|
|
|
|
using var streamer = new LandblockStreamer(loadLandblock: id =>
|
|
{
|
|
loaderThreadId = System.Environment.CurrentManagedThreadId;
|
|
return stubLandblock;
|
|
});
|
|
|
|
streamer.Start();
|
|
streamer.EnqueueLoad(0x77770FFEu);
|
|
|
|
// Result is already in the outbox — no spinning needed.
|
|
var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
|
|
|
|
Assert.Single(drained);
|
|
Assert.IsType<LandblockStreamResult.Loaded>(drained[0]);
|
|
Assert.Equal(testThreadId, loaderThreadId);
|
|
}
|
|
}
|