fix(app): Phase A.1 — make LandblockStreamer synchronous (DatCollection isn't thread-safe)
Second hotfix attempt for the "ball of spikes" terrain corruption.
The previous _datLock fix was insufficient because dat reads happen
from many render-thread code paths I didn't enumerate (animation
tick, OnLiveMotionUpdated, OnLivePositionUpdated, the live spawn
hydration, ApplyLoadedTerrain) and locking each is invasive and
fragile.
DatReaderWriter's DatCollection is fundamentally not thread-safe:
DatBinReader's internal buffer position is shared per-database, so
two concurrent .Get<T> calls corrupt each other's read state. The
ArgumentOutOfRangeException at DatBinReader.ReadBytesInternal in
the failure log is the smoking gun — one read started reading a
LandBlock, another moved the reader's position, the first one
asked for the wrong number of bytes.
Until Phase A.3 introduces a thread-safe dat wrapper (or until we
preload all dats into pure in-memory dictionaries), the streamer
runs synchronously: EnqueueLoad invokes the load delegate inline
on the calling thread and writes the result to the outbox in a
single call. The render-thread DrainCompletions loop picks it up
on the same frame.
API surface unchanged — Channel-based outbox, EnqueueLoad/Unload,
DrainCompletions, Start (now no-op), Dispose all preserved. Move
back to async loading is a single-class change once dat thread
safety lands.
Cost: visible frame hitch when crossing landblock boundaries
(rendering the new landblock is now on the render thread). For
default 5×5 the hitch is one landblock per cardinal step, ~50ms
worst case. Acceptable for the MVP — correctness over hitches.
Updated the off-thread test to assert the new synchronous contract
(loader runs on the calling thread). The other 4 tests still pass
unchanged because their spin-drain pattern works with synchronous
delivery too.
The previous _datLock from commit c991fb2 stays in place as
defensive belt-and-suspenders — it's free in synchronous mode and
keeps the contract documented at every dat-reading entry point.
212 tests green.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c991fb23ce
commit
531c9f9349
2 changed files with 52 additions and 31 deletions
|
|
@ -8,22 +8,39 @@ using AcDream.Core.World;
|
||||||
namespace AcDream.App.Streaming;
|
namespace AcDream.App.Streaming;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Background worker that services landblock load and unload requests off
|
/// Services landblock load/unload requests by invoking a caller-supplied
|
||||||
/// the render thread. Loads are executed on a dedicated thread via a
|
/// load delegate (the production instance wraps
|
||||||
/// caller-supplied delegate (the production instance wraps
|
/// <see cref="LandblockLoader.Load"/>) and posting results to an outbox
|
||||||
/// <see cref="LandblockLoader.Load"/>); completed results are posted to
|
/// the render thread drains once per OnUpdate.
|
||||||
/// an outbox channel the render thread drains once per OnUpdate.
|
|
||||||
///
|
///
|
||||||
/// <para>
|
/// <para>
|
||||||
/// Unloads are passed through the same channel as a <see cref="LandblockStreamResult.Unloaded"/>
|
/// <b>Currently runs synchronously on the calling thread.</b> The original
|
||||||
/// record so the render thread can release GPU state on the next drain —
|
/// Phase A.1 design ran loads on a dedicated worker thread, but DatReaderWriter's
|
||||||
/// the worker never touches GPU resources directly.
|
/// <c>DatCollection</c> is not thread-safe — concurrent reads from a worker
|
||||||
|
/// and the render thread (animation tick, live spawn handlers) corrupt
|
||||||
|
/// internal buffer state and produce half-populated <c>LandBlock.Height[]</c>
|
||||||
|
/// arrays which render as wildly distorted terrain. Until Phase A.3 introduces
|
||||||
|
/// a thread-safe dat wrapper, loads are synchronous: <see cref="EnqueueLoad"/>
|
||||||
|
/// invokes the load delegate inline and writes the result to the outbox in
|
||||||
|
/// a single call. This causes a frame hitch when crossing landblock
|
||||||
|
/// boundaries, but the rendering is correct.
|
||||||
|
/// </para>
|
||||||
|
///
|
||||||
|
/// <para>
|
||||||
|
/// The Channel-based outbox + <see cref="DrainCompletions"/> API is
|
||||||
|
/// preserved so the move back to async loading is a single-class change
|
||||||
|
/// when DatCollection thread safety lands.
|
||||||
|
/// </para>
|
||||||
|
///
|
||||||
|
/// <para>
|
||||||
|
/// Unloads pass through the outbox as <see cref="LandblockStreamResult.Unloaded"/>
|
||||||
|
/// records so the render thread can release GPU state on the next drain —
|
||||||
|
/// the streamer never touches GPU resources directly.
|
||||||
/// </para>
|
/// </para>
|
||||||
///
|
///
|
||||||
/// <remarks>
|
/// <remarks>
|
||||||
/// Threading: <see cref="EnqueueLoad"/> / <see cref="EnqueueUnload"/> may
|
/// Threading: synchronous mode means all methods must be called from the
|
||||||
/// be called from any thread; <see cref="DrainCompletions"/> must be called
|
/// same thread (the render thread in production).
|
||||||
/// from a single consumer thread (the render thread in production).
|
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class LandblockStreamer : IDisposable
|
public sealed class LandblockStreamer : IDisposable
|
||||||
|
|
@ -39,7 +56,9 @@ public sealed class LandblockStreamer : IDisposable
|
||||||
private readonly Channel<LandblockStreamJob> _inbox;
|
private readonly Channel<LandblockStreamJob> _inbox;
|
||||||
private readonly Channel<LandblockStreamResult> _outbox;
|
private readonly Channel<LandblockStreamResult> _outbox;
|
||||||
private readonly CancellationTokenSource _cancel = new();
|
private readonly CancellationTokenSource _cancel = new();
|
||||||
|
#pragma warning disable CS0649 // _worker stays declared for the future async path; unused in synchronous mode.
|
||||||
private Thread? _worker;
|
private Thread? _worker;
|
||||||
|
#pragma warning restore CS0649
|
||||||
private int _disposed;
|
private int _disposed;
|
||||||
|
|
||||||
public LandblockStreamer(Func<uint, LoadedLandblock?> loadLandblock)
|
public LandblockStreamer(Func<uint, LoadedLandblock?> loadLandblock)
|
||||||
|
|
@ -52,34 +71,31 @@ public sealed class LandblockStreamer : IDisposable
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Start the worker thread. Must be called before enqueueing jobs.
|
/// No-op in synchronous mode. Preserved on the API surface so callers
|
||||||
/// Calling twice is a no-op.
|
/// don't need to change when async loading returns in Phase A.3.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public void Start()
|
public void Start()
|
||||||
{
|
{
|
||||||
if (System.Threading.Volatile.Read(ref _disposed) != 0)
|
if (System.Threading.Volatile.Read(ref _disposed) != 0)
|
||||||
throw new ObjectDisposedException(nameof(LandblockStreamer));
|
throw new ObjectDisposedException(nameof(LandblockStreamer));
|
||||||
if (_worker is not null) return;
|
// No worker thread in synchronous mode.
|
||||||
_worker = new Thread(WorkerLoop)
|
|
||||||
{
|
|
||||||
IsBackground = true,
|
|
||||||
Name = "acdream.landblock-streamer",
|
|
||||||
};
|
|
||||||
_worker.Start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void EnqueueLoad(uint landblockId)
|
public void EnqueueLoad(uint landblockId)
|
||||||
{
|
{
|
||||||
if (System.Threading.Volatile.Read(ref _disposed) != 0)
|
if (System.Threading.Volatile.Read(ref _disposed) != 0)
|
||||||
throw new ObjectDisposedException(nameof(LandblockStreamer));
|
throw new ObjectDisposedException(nameof(LandblockStreamer));
|
||||||
_inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId));
|
// Synchronous mode: invoke the load delegate inline. The result lands
|
||||||
|
// in the outbox and DrainCompletions picks it up later in the same
|
||||||
|
// (or next) frame.
|
||||||
|
HandleJob(new LandblockStreamJob.Load(landblockId));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void EnqueueUnload(uint landblockId)
|
public void EnqueueUnload(uint landblockId)
|
||||||
{
|
{
|
||||||
if (System.Threading.Volatile.Read(ref _disposed) != 0)
|
if (System.Threading.Volatile.Read(ref _disposed) != 0)
|
||||||
throw new ObjectDisposedException(nameof(LandblockStreamer));
|
throw new ObjectDisposedException(nameof(LandblockStreamer));
|
||||||
_inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId));
|
HandleJob(new LandblockStreamJob.Unload(landblockId));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|
|
||||||
|
|
@ -104,8 +104,16 @@ public class LandblockStreamerTests
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task Load_ExecutesLoaderOnBackgroundThread()
|
public void Load_ExecutesLoaderSynchronously_OnCallingThread()
|
||||||
{
|
{
|
||||||
|
// Streamer was made synchronous after Phase A.1 visual verification
|
||||||
|
// exposed concurrent dat reads as the cause of "ball of spikes"
|
||||||
|
// terrain corruption — DatReaderWriter's DatCollection isn't
|
||||||
|
// thread-safe and locking around every dat read on every render-
|
||||||
|
// thread code path was too invasive. Until Phase A.3 introduces a
|
||||||
|
// thread-safe dat wrapper, the load delegate runs on the calling
|
||||||
|
// thread and the result is in the outbox by the time EnqueueLoad
|
||||||
|
// returns. This test pins that contract.
|
||||||
int testThreadId = System.Environment.CurrentManagedThreadId;
|
int testThreadId = System.Environment.CurrentManagedThreadId;
|
||||||
int? loaderThreadId = null;
|
int? loaderThreadId = null;
|
||||||
var stubLandblock = new LoadedLandblock(
|
var stubLandblock = new LoadedLandblock(
|
||||||
|
|
@ -122,14 +130,11 @@ public class LandblockStreamerTests
|
||||||
streamer.Start();
|
streamer.Start();
|
||||||
streamer.EnqueueLoad(0x77770FFEu);
|
streamer.EnqueueLoad(0x77770FFEu);
|
||||||
|
|
||||||
// Drain until we see the completion.
|
// Result is already in the outbox — no spinning needed.
|
||||||
for (int i = 0; i < SpinMaxIterations && loaderThreadId is null; i++)
|
var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
|
||||||
{
|
|
||||||
streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
|
|
||||||
if (loaderThreadId is null) await Task.Delay(SpinStepMs);
|
|
||||||
}
|
|
||||||
|
|
||||||
Assert.NotNull(loaderThreadId);
|
Assert.Single(drained);
|
||||||
Assert.NotEqual(testThreadId, loaderThreadId.Value);
|
Assert.IsType<LandblockStreamResult.Loaded>(drained[0]);
|
||||||
|
Assert.Equal(testThreadId, loaderThreadId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue