feat(A.5 T11): activate LandblockStreamer worker thread

Phase A.1 reverted to synchronous mode due to DatCollection thread-
safety; T10 documented the lock that makes concurrent reads safe. T11
activates the dedicated worker thread and switches enqueue methods
to non-blocking Channel.Writer.TryWrite.

EnqueueLoad now takes LandblockStreamJobKind (default: LoadNear from
all callers, matching previous full-load semantics). T13/T16 will
route by kind per TwoTierDiff.

Constructor gains optional buildMeshOrNull param (defaults to null-
returning stub); T12 wires the real LandblockMesh.Build factory.

GameWindow construction site updated: Action<uint> enqueueLoad
delegate now wraps a lambda (method group won't bind to Action<uint>
when the method has an optional second param).

LandblockStreamerTests updated: the synchronous-thread-pinning test
replaced by Load_ExecutesLoaderOnWorkerThread which asserts the
loader runs on a different thread; Load_FollowedByDrain now supplies
a stubMesh so the worker can produce Loaded (not Failed) results.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Erik 2026-05-10 07:32:35 +02:00
parent 0cf86bb126
commit 00bb030c9f
2 changed files with 102 additions and 69 deletions

View file

@ -8,28 +8,27 @@ using AcDream.Core.World;
namespace AcDream.App.Streaming; namespace AcDream.App.Streaming;
/// <summary> /// <summary>
/// Services landblock load/unload requests by invoking a caller-supplied /// Services landblock load/unload requests by invoking caller-supplied
/// load delegate (the production instance wraps /// factory delegates (the production instance wraps
/// <see cref="LandblockLoader.Load"/>) and posting results to an outbox /// <see cref="LandblockLoader.Load"/> for loading and
/// the render thread drains once per OnUpdate. /// <see cref="AcDream.Core.Terrain.LandblockMesh.Build"/> for the terrain
/// mesh) and posting results to an outbox the render thread drains once
/// per OnUpdate.
/// ///
/// <para> /// <para>
/// <b>Currently runs synchronously on the calling thread.</b> The original /// <b>Thread model (Phase A.5 T11+):</b> <see cref="Start"/> spawns a
/// Phase A.1 design ran loads on a dedicated worker thread, but DatReaderWriter's /// dedicated background worker thread. <see cref="EnqueueLoad"/> and
/// <c>DatCollection</c> is not thread-safe — concurrent reads from a worker /// <see cref="EnqueueUnload"/> write non-blocking to the inbox
/// and the render thread (animation tick, live spawn handlers) corrupt /// <see cref="Channel{T}"/>; the worker drains it and posts
/// internal buffer state and produce half-populated <c>LandBlock.Height[]</c> /// <see cref="LandblockStreamResult"/> records to the outbox.
/// arrays which render as wildly distorted terrain. Until Phase A.3 introduces
/// a thread-safe dat wrapper, loads are synchronous: <see cref="EnqueueLoad"/>
/// invokes the load delegate inline and writes the result to the outbox in
/// a single call. This causes a frame hitch when crossing landblock
/// boundaries, but the rendering is correct.
/// </para> /// </para>
/// ///
/// <para> /// <para>
/// The Channel-based outbox + <see cref="DrainCompletions"/> API is /// <b>DatCollection thread safety</b> is provided by the caller:
/// preserved so the move back to async loading is a single-class change /// GameWindow's <c>_datLock</c> (Phase A.5 T10) serialises all
/// when DatCollection thread safety lands. /// <c>DatCollection.Get&lt;T&gt;</c> calls. Both factory closures passed at
/// construction acquire that lock before reading dats. The worker never
/// touches <c>DatCollection</c> directly — it only calls the factories.
/// </para> /// </para>
/// ///
/// <para> /// <para>
@ -39,8 +38,9 @@ namespace AcDream.App.Streaming;
/// </para> /// </para>
/// ///
/// <remarks> /// <remarks>
/// Threading: synchronous mode means all methods must be called from the /// Threading: <see cref="DrainCompletions"/> must be called from a single
/// same thread (the render thread in production). /// consumer thread (the render thread in production). All other public
/// methods are thread-safe.
/// </remarks> /// </remarks>
/// </summary> /// </summary>
public sealed class LandblockStreamer : IDisposable public sealed class LandblockStreamer : IDisposable
@ -53,49 +53,65 @@ public sealed class LandblockStreamer : IDisposable
public const int DefaultDrainBatchSize = 4; public const int DefaultDrainBatchSize = 4;
private readonly Func<uint, LoadedLandblock?> _loadLandblock; private readonly Func<uint, LoadedLandblock?> _loadLandblock;
private readonly Func<uint, LoadedLandblock?, AcDream.Core.Terrain.LandblockMeshData?> _buildMeshOrNull;
private readonly Channel<LandblockStreamJob> _inbox; private readonly Channel<LandblockStreamJob> _inbox;
private readonly Channel<LandblockStreamResult> _outbox; private readonly Channel<LandblockStreamResult> _outbox;
private readonly CancellationTokenSource _cancel = new(); private readonly CancellationTokenSource _cancel = new();
#pragma warning disable CS0649 // _worker stays declared for the future async path; unused in synchronous mode.
private Thread? _worker; private Thread? _worker;
#pragma warning restore CS0649
private int _disposed; private int _disposed;
public LandblockStreamer(Func<uint, LoadedLandblock?> loadLandblock) public LandblockStreamer(
Func<uint, LoadedLandblock?> loadLandblock,
Func<uint, LoadedLandblock?, AcDream.Core.Terrain.LandblockMeshData?>? buildMeshOrNull = null)
{ {
_loadLandblock = loadLandblock; _loadLandblock = loadLandblock;
_inbox = Channel.CreateUnbounded<LandblockStreamJob>( // Default: no mesh build (returns null → Failed result). Production
// wires in LandblockMesh.Build via the T12 construction site.
_buildMeshOrNull = buildMeshOrNull ?? ((_, _) => null);
_inbox = Channel.CreateUnbounded<LandblockStreamJob>(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }); new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
_outbox = Channel.CreateUnbounded<LandblockStreamResult>( _outbox = Channel.CreateUnbounded<LandblockStreamResult>(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = true }); new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });
} }
/// <summary> /// <summary>
/// No-op in synchronous mode. Preserved on the API surface so callers /// Activate the dedicated background worker thread. Idempotent: calling
/// don't need to change when async loading returns in Phase A.3. /// <see cref="Start"/> more than once has no effect.
/// </summary> /// </summary>
public void Start() public void Start()
{ {
if (System.Threading.Volatile.Read(ref _disposed) != 0) if (System.Threading.Volatile.Read(ref _disposed) != 0)
throw new ObjectDisposedException(nameof(LandblockStreamer)); throw new ObjectDisposedException(nameof(LandblockStreamer));
// No worker thread in synchronous mode. if (_worker != null) return;
_worker = new Thread(WorkerLoop)
{
IsBackground = true,
Name = "acdream.streaming.worker",
};
_worker.Start();
} }
public void EnqueueLoad(uint landblockId) /// <summary>
/// Non-blocking enqueue. The worker drains the inbox and posts a
/// <see cref="LandblockStreamResult.Loaded"/> (or
/// <see cref="LandblockStreamResult.Failed"/>) to the outbox.
/// </summary>
public void EnqueueLoad(uint landblockId, LandblockStreamJobKind kind = LandblockStreamJobKind.LoadNear)
{ {
if (System.Threading.Volatile.Read(ref _disposed) != 0) if (System.Threading.Volatile.Read(ref _disposed) != 0)
throw new ObjectDisposedException(nameof(LandblockStreamer)); throw new ObjectDisposedException(nameof(LandblockStreamer));
// Synchronous mode: invoke the load delegate inline. The result lands _inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId, kind));
// in the outbox and DrainCompletions picks it up later in the same
// (or next) frame.
HandleJob(new LandblockStreamJob.Load(landblockId, LandblockStreamJobKind.LoadNear));
} }
/// <summary>
/// Non-blocking enqueue. The worker posts a
/// <see cref="LandblockStreamResult.Unloaded"/> to the outbox.
/// </summary>
public void EnqueueUnload(uint landblockId) public void EnqueueUnload(uint landblockId)
{ {
if (System.Threading.Volatile.Read(ref _disposed) != 0) if (System.Threading.Volatile.Read(ref _disposed) != 0)
throw new ObjectDisposedException(nameof(LandblockStreamer)); throw new ObjectDisposedException(nameof(LandblockStreamer));
HandleJob(new LandblockStreamJob.Unload(landblockId)); _inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId));
} }
/// <summary> /// <summary>
@ -118,17 +134,14 @@ public sealed class LandblockStreamer : IDisposable
{ {
try try
{ {
// Synchronous read loop via .WaitToReadAsync + ReadAllAsync // Safe to block: this is a dedicated worker thread with no
// would be idiomatic but requires async; the blocking reader // SynchronizationContext, so .Result/.GetResult cannot deadlock
// is simpler and the thread is dedicated anyway. // against any captured continuation. Using the sync pattern
// here keeps the loop linear; an async-enumerable alternative
// would force WorkerLoop to be async Task and lose the
// simple thread-start shape.
while (!_cancel.Token.IsCancellationRequested) while (!_cancel.Token.IsCancellationRequested)
{ {
// Safe to block: this is a dedicated worker thread with no
// SynchronizationContext, so .Result/.GetResult cannot deadlock
// against any captured continuation. Using the sync pattern
// here keeps the loop linear; an async-enumerable alternative
// would force WorkerLoop to be async Task and lose the
// simple thread-start shape.
if (!_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult()) if (!_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult())
break; break;
@ -157,7 +170,7 @@ public sealed class LandblockStreamer : IDisposable
switch (job) switch (job)
{ {
case LandblockStreamJob.Load load: case LandblockStreamJob.Load load:
// TODO(A.5 T11/T16): route by load.Kind. LoadFar will skip // TODO(A.5 T16): route by load.Kind. LoadFar will skip
// LandBlockInfo + scenery generation; PromoteToNear will skip // LandBlockInfo + scenery generation; PromoteToNear will skip
// mesh build (terrain already on GPU). Today every Kind takes // mesh build (terrain already on GPU). Today every Kind takes
// the full-load path via _loadLandblock, which matches today's // the full-load path via _loadLandblock, which matches today's
@ -166,15 +179,22 @@ public sealed class LandblockStreamer : IDisposable
{ {
var lb = _loadLandblock(load.LandblockId); var lb = _loadLandblock(load.LandblockId);
if (lb is null) if (lb is null)
{
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed( _outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
load.LandblockId, "LandblockLoader.Load returned null")); load.LandblockId, "LandblockLoader.Load returned null"));
else break;
// TEMPORARY: passes default! for MeshData — Task 13 wires the real mesh build. }
_outbox.Writer.TryWrite(new LandblockStreamResult.Loaded( var mesh = _buildMeshOrNull(load.LandblockId, lb);
load.LandblockId, if (mesh is null)
LandblockStreamTier.Near, {
lb, _outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
MeshData: default! /* TODO(A.5 T13) */)); load.LandblockId, "buildMeshOrNull returned null"));
break;
}
var tier = load.Kind == LandblockStreamJobKind.LoadFar
? LandblockStreamTier.Far : LandblockStreamTier.Near;
_outbox.Writer.TryWrite(new LandblockStreamResult.Loaded(
load.LandblockId, tier, lb, mesh));
} }
catch (Exception ex) catch (Exception ex)
{ {

View file

@ -19,9 +19,13 @@ public class LandblockStreamerTests
0xA9B4FFFEu, 0xA9B4FFFEu,
new LandBlock(), new LandBlock(),
System.Array.Empty<WorldEntity>()); System.Array.Empty<WorldEntity>());
var stubMesh = new AcDream.Core.Terrain.LandblockMeshData(
System.Array.Empty<AcDream.Core.Terrain.TerrainVertex>(),
System.Array.Empty<uint>());
using var streamer = new LandblockStreamer( using var streamer = new LandblockStreamer(
loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null); loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null,
buildMeshOrNull: (_, _) => stubMesh);
streamer.Start(); streamer.Start();
streamer.EnqueueLoad(0xA9B4FFFEu); streamer.EnqueueLoad(0xA9B4FFFEu);
@ -104,37 +108,46 @@ public class LandblockStreamerTests
} }
[Fact] [Fact]
public void Load_ExecutesLoaderSynchronously_OnCallingThread() public async Task Load_ExecutesLoaderOnWorkerThread()
{ {
// Streamer was made synchronous after Phase A.1 visual verification // Phase A.5 T11: the load delegate now runs on the dedicated worker
// exposed concurrent dat reads as the cause of "ball of spikes" // thread (not the calling/render thread). This test verifies the
// terrain corruption — DatReaderWriter's DatCollection isn't // async hand-off: EnqueueLoad returns immediately and the result
// thread-safe and locking around every dat read on every render- // appears in the outbox only after the worker processes the inbox.
// thread code path was too invasive. Until Phase A.3 introduces a
// thread-safe dat wrapper, the load delegate runs on the calling
// thread and the result is in the outbox by the time EnqueueLoad
// returns. This test pins that contract.
int testThreadId = System.Environment.CurrentManagedThreadId; int testThreadId = System.Environment.CurrentManagedThreadId;
int? loaderThreadId = null; int? loaderThreadId = null;
var stubLandblock = new LoadedLandblock( var stubLandblock = new LoadedLandblock(
0x77770FFEu, 0x77770FFEu,
new LandBlock(), new LandBlock(),
System.Array.Empty<WorldEntity>()); System.Array.Empty<WorldEntity>());
var stubMesh = new AcDream.Core.Terrain.LandblockMeshData(
System.Array.Empty<AcDream.Core.Terrain.TerrainVertex>(),
System.Array.Empty<uint>());
using var streamer = new LandblockStreamer(loadLandblock: id => using var streamer = new LandblockStreamer(
{ loadLandblock: id =>
loaderThreadId = System.Environment.CurrentManagedThreadId; {
return stubLandblock; loaderThreadId = System.Environment.CurrentManagedThreadId;
}); return stubLandblock;
},
buildMeshOrNull: (_, _) => stubMesh);
streamer.Start(); streamer.Start();
streamer.EnqueueLoad(0x77770FFEu); streamer.EnqueueLoad(0x77770FFEu);
// Result is already in the outbox — no spinning needed. // Spin until the worker produces a completion.
var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); LandblockStreamResult? result = null;
for (int i = 0; i < SpinMaxIterations && result is null; i++)
{
var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
if (drained.Count > 0) result = drained[0];
else await Task.Delay(SpinStepMs);
}
Assert.Single(drained); Assert.NotNull(result);
Assert.IsType<LandblockStreamResult.Loaded>(drained[0]); Assert.IsType<LandblockStreamResult.Loaded>(result);
Assert.Equal(testThreadId, loaderThreadId); // The loader MUST have run on a different thread than the test thread.
Assert.NotNull(loaderThreadId);
Assert.NotEqual(testThreadId, loaderThreadId.Value);
} }
} }