diff --git a/src/AcDream.App/Streaming/LandblockStreamer.cs b/src/AcDream.App/Streaming/LandblockStreamer.cs
index 4f41486..6b08095 100644
--- a/src/AcDream.App/Streaming/LandblockStreamer.cs
+++ b/src/AcDream.App/Streaming/LandblockStreamer.cs
@@ -8,28 +8,27 @@ using AcDream.Core.World;
namespace AcDream.App.Streaming;
///
-/// Services landblock load/unload requests by invoking a caller-supplied
-/// load delegate (the production instance wraps
-/// ) and posting results to an outbox
-/// the render thread drains once per OnUpdate.
+/// Services landblock load/unload requests by invoking caller-supplied
+/// factory delegates (the production instance wraps
+/// for loading and
+/// for the terrain
+/// mesh) and posting results to an outbox the render thread drains once
+/// per OnUpdate.
///
///
-/// Currently runs synchronously on the calling thread. The original
-/// Phase A.1 design ran loads on a dedicated worker thread, but DatReaderWriter's
-/// DatCollection is not thread-safe — concurrent reads from a worker
-/// and the render thread (animation tick, live spawn handlers) corrupt
-/// internal buffer state and produce half-populated LandBlock.Height[]
-/// arrays which render as wildly distorted terrain. Until Phase A.3 introduces
-/// a thread-safe dat wrapper, loads are synchronous:
-/// invokes the load delegate inline and writes the result to the outbox in
-/// a single call. This causes a frame hitch when crossing landblock
-/// boundaries, but the rendering is correct.
+/// Thread model (Phase A.5 T11+): spawns a
+/// dedicated background worker thread. and
+/// write non-blocking to the inbox
+/// ; the worker drains it and posts
+/// records to the outbox.
///
///
///
-/// The Channel-based outbox + API is
-/// preserved so the move back to async loading is a single-class change
-/// when DatCollection thread safety lands.
+/// DatCollection thread safety is provided by the caller:
+/// GameWindow's _datLock (Phase A.5 T10) serialises all
+/// DatCollection.Get<T> calls. Both factory closures passed at
+/// construction acquire that lock before reading dats. The worker never
+/// touches DatCollection directly — it only calls the factories.
///
///
///
@@ -39,8 +38,9 @@ namespace AcDream.App.Streaming;
///
///
///
-/// Threading: synchronous mode means all methods must be called from the
-/// same thread (the render thread in production).
+/// Threading: must be called from a single
+/// consumer thread (the render thread in production). All other public
+/// methods are thread-safe.
///
///
public sealed class LandblockStreamer : IDisposable
@@ -53,49 +53,65 @@ public sealed class LandblockStreamer : IDisposable
public const int DefaultDrainBatchSize = 4;
private readonly Func _loadLandblock;
+ private readonly Func _buildMeshOrNull;
private readonly Channel _inbox;
private readonly Channel _outbox;
private readonly CancellationTokenSource _cancel = new();
-#pragma warning disable CS0649 // _worker stays declared for the future async path; unused in synchronous mode.
private Thread? _worker;
-#pragma warning restore CS0649
private int _disposed;
- public LandblockStreamer(Func loadLandblock)
+ public LandblockStreamer(
+ Func loadLandblock,
+ Func? buildMeshOrNull = null)
{
_loadLandblock = loadLandblock;
- _inbox = Channel.CreateUnbounded(
+ // Default: no mesh build (returns null → Failed result). Production
+ // wires in LandblockMesh.Build via the T12 construction site.
+ _buildMeshOrNull = buildMeshOrNull ?? ((_, _) => null);
+ _inbox = Channel.CreateUnbounded(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
_outbox = Channel.CreateUnbounded(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });
}
///
- /// No-op in synchronous mode. Preserved on the API surface so callers
- /// don't need to change when async loading returns in Phase A.3.
+ /// Activate the dedicated background worker thread. Idempotent: calling
+ /// more than once has no effect.
///
public void Start()
{
if (System.Threading.Volatile.Read(ref _disposed) != 0)
throw new ObjectDisposedException(nameof(LandblockStreamer));
- // No worker thread in synchronous mode.
+ if (_worker != null) return;
+ _worker = new Thread(WorkerLoop)
+ {
+ IsBackground = true,
+ Name = "acdream.streaming.worker",
+ };
+ _worker.Start();
}
- public void EnqueueLoad(uint landblockId)
+ ///
+ /// Non-blocking enqueue. The worker drains the inbox and posts a
+ /// (or
+ /// ) to the outbox.
+ ///
+ public void EnqueueLoad(uint landblockId, LandblockStreamJobKind kind = LandblockStreamJobKind.LoadNear)
{
if (System.Threading.Volatile.Read(ref _disposed) != 0)
throw new ObjectDisposedException(nameof(LandblockStreamer));
- // Synchronous mode: invoke the load delegate inline. The result lands
- // in the outbox and DrainCompletions picks it up later in the same
- // (or next) frame.
- HandleJob(new LandblockStreamJob.Load(landblockId, LandblockStreamJobKind.LoadNear));
+ _inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId, kind));
}
+ ///
+ /// Non-blocking enqueue. The worker posts a
+ /// to the outbox.
+ ///
public void EnqueueUnload(uint landblockId)
{
if (System.Threading.Volatile.Read(ref _disposed) != 0)
throw new ObjectDisposedException(nameof(LandblockStreamer));
- HandleJob(new LandblockStreamJob.Unload(landblockId));
+ _inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId));
}
///
@@ -118,17 +134,14 @@ public sealed class LandblockStreamer : IDisposable
{
try
{
- // Synchronous read loop via .WaitToReadAsync + ReadAllAsync
- // would be idiomatic but requires async; the blocking reader
- // is simpler and the thread is dedicated anyway.
+ // Safe to block: this is a dedicated worker thread with no
+ // SynchronizationContext, so .Result/.GetResult cannot deadlock
+ // against any captured continuation. Using the sync pattern
+ // here keeps the loop linear; an async-enumerable alternative
+ // would force WorkerLoop to be async Task and lose the
+ // simple thread-start shape.
while (!_cancel.Token.IsCancellationRequested)
{
- // Safe to block: this is a dedicated worker thread with no
- // SynchronizationContext, so .Result/.GetResult cannot deadlock
- // against any captured continuation. Using the sync pattern
- // here keeps the loop linear; an async-enumerable alternative
- // would force WorkerLoop to be async Task and lose the
- // simple thread-start shape.
if (!_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult())
break;
@@ -157,7 +170,7 @@ public sealed class LandblockStreamer : IDisposable
switch (job)
{
case LandblockStreamJob.Load load:
- // TODO(A.5 T11/T16): route by load.Kind. LoadFar will skip
+ // TODO(A.5 T16): route by load.Kind. LoadFar will skip
// LandBlockInfo + scenery generation; PromoteToNear will skip
// mesh build (terrain already on GPU). Today every Kind takes
// the full-load path via _loadLandblock, which matches today's
@@ -166,15 +179,22 @@ public sealed class LandblockStreamer : IDisposable
{
var lb = _loadLandblock(load.LandblockId);
if (lb is null)
+ {
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
load.LandblockId, "LandblockLoader.Load returned null"));
- else
- // TEMPORARY: passes default! for MeshData — Task 13 wires the real mesh build.
- _outbox.Writer.TryWrite(new LandblockStreamResult.Loaded(
- load.LandblockId,
- LandblockStreamTier.Near,
- lb,
- MeshData: default! /* TODO(A.5 T13) */));
+ break;
+ }
+ var mesh = _buildMeshOrNull(load.LandblockId, lb);
+ if (mesh is null)
+ {
+ _outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
+ load.LandblockId, "buildMeshOrNull returned null"));
+ break;
+ }
+ var tier = load.Kind == LandblockStreamJobKind.LoadFar
+ ? LandblockStreamTier.Far : LandblockStreamTier.Near;
+ _outbox.Writer.TryWrite(new LandblockStreamResult.Loaded(
+ load.LandblockId, tier, lb, mesh));
}
catch (Exception ex)
{
diff --git a/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs b/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs
index e058f81..2e11804 100644
--- a/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs
+++ b/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs
@@ -19,9 +19,13 @@ public class LandblockStreamerTests
0xA9B4FFFEu,
new LandBlock(),
System.Array.Empty());
+ var stubMesh = new AcDream.Core.Terrain.LandblockMeshData(
+ System.Array.Empty(),
+ System.Array.Empty());
using var streamer = new LandblockStreamer(
- loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null);
+ loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null,
+ buildMeshOrNull: (_, _) => stubMesh);
streamer.Start();
streamer.EnqueueLoad(0xA9B4FFFEu);
@@ -104,37 +108,46 @@ public class LandblockStreamerTests
}
[Fact]
- public void Load_ExecutesLoaderSynchronously_OnCallingThread()
+ public async Task Load_ExecutesLoaderOnWorkerThread()
{
- // Streamer was made synchronous after Phase A.1 visual verification
- // exposed concurrent dat reads as the cause of "ball of spikes"
- // terrain corruption — DatReaderWriter's DatCollection isn't
- // thread-safe and locking around every dat read on every render-
- // thread code path was too invasive. Until Phase A.3 introduces a
- // thread-safe dat wrapper, the load delegate runs on the calling
- // thread and the result is in the outbox by the time EnqueueLoad
- // returns. This test pins that contract.
+ // Phase A.5 T11: the load delegate now runs on the dedicated worker
+ // thread (not the calling/render thread). This test verifies the
+ // async hand-off: EnqueueLoad returns immediately and the result
+ // appears in the outbox only after the worker processes the inbox.
int testThreadId = System.Environment.CurrentManagedThreadId;
int? loaderThreadId = null;
var stubLandblock = new LoadedLandblock(
0x77770FFEu,
new LandBlock(),
System.Array.Empty());
+ var stubMesh = new AcDream.Core.Terrain.LandblockMeshData(
+ System.Array.Empty(),
+ System.Array.Empty());
- using var streamer = new LandblockStreamer(loadLandblock: id =>
- {
- loaderThreadId = System.Environment.CurrentManagedThreadId;
- return stubLandblock;
- });
+ using var streamer = new LandblockStreamer(
+ loadLandblock: id =>
+ {
+ loaderThreadId = System.Environment.CurrentManagedThreadId;
+ return stubLandblock;
+ },
+ buildMeshOrNull: (_, _) => stubMesh);
streamer.Start();
streamer.EnqueueLoad(0x77770FFEu);
- // Result is already in the outbox — no spinning needed.
- var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
+ // Spin until the worker produces a completion.
+ LandblockStreamResult? result = null;
+ for (int i = 0; i < SpinMaxIterations && result is null; i++)
+ {
+ var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
+ if (drained.Count > 0) result = drained[0];
+ else await Task.Delay(SpinStepMs);
+ }
- Assert.Single(drained);
- Assert.IsType(drained[0]);
- Assert.Equal(testThreadId, loaderThreadId);
+ Assert.NotNull(result);
+ Assert.IsType(result);
+ // The loader MUST have run on a different thread than the test thread.
+ Assert.NotNull(loaderThreadId);
+ Assert.NotEqual(testThreadId, loaderThreadId.Value);
}
}