From 00bb030c9f933e244d642db9c6fcddf892f3bb87 Mon Sep 17 00:00:00 2001 From: Erik Date: Sun, 10 May 2026 07:32:35 +0200 Subject: [PATCH] feat(A.5 T11): activate LandblockStreamer worker thread Phase A.1 reverted to synchronous mode due to DatCollection thread- safety; T10 documented the lock that makes concurrent reads safe. T11 activates the dedicated worker thread and switches enqueue methods to non-blocking Channel.Writer.TryWrite. EnqueueLoad now takes LandblockStreamJobKind (default: LoadNear from all callers, matching previous full-load semantics). T13/T16 will route by kind per TwoTierDiff. Constructor gains optional buildMeshOrNull param (defaults to null- returning stub); T12 wires the real LandblockMesh.Build factory. GameWindow construction site updated: Action enqueueLoad delegate now wraps a lambda (method group won't bind to Action when the method has an optional second param). LandblockStreamerTests updated: the synchronous-thread-pinning test replaced by Load_ExecutesLoaderOnWorkerThread which asserts the loader runs on a different thread; Load_FollowedByDrain now supplies a stubMesh so the worker can produce Loaded (not Failed) results. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Streaming/LandblockStreamer.cs | 118 ++++++++++-------- .../Streaming/LandblockStreamerTests.cs | 53 +++++--- 2 files changed, 102 insertions(+), 69 deletions(-) diff --git a/src/AcDream.App/Streaming/LandblockStreamer.cs b/src/AcDream.App/Streaming/LandblockStreamer.cs index 4f41486..6b08095 100644 --- a/src/AcDream.App/Streaming/LandblockStreamer.cs +++ b/src/AcDream.App/Streaming/LandblockStreamer.cs @@ -8,28 +8,27 @@ using AcDream.Core.World; namespace AcDream.App.Streaming; /// -/// Services landblock load/unload requests by invoking a caller-supplied -/// load delegate (the production instance wraps -/// ) and posting results to an outbox -/// the render thread drains once per OnUpdate. +/// Services landblock load/unload requests by invoking caller-supplied +/// factory delegates (the production instance wraps +/// for loading and +/// for the terrain +/// mesh) and posting results to an outbox the render thread drains once +/// per OnUpdate. /// /// -/// Currently runs synchronously on the calling thread. The original -/// Phase A.1 design ran loads on a dedicated worker thread, but DatReaderWriter's -/// DatCollection is not thread-safe — concurrent reads from a worker -/// and the render thread (animation tick, live spawn handlers) corrupt -/// internal buffer state and produce half-populated LandBlock.Height[] -/// arrays which render as wildly distorted terrain. Until Phase A.3 introduces -/// a thread-safe dat wrapper, loads are synchronous: -/// invokes the load delegate inline and writes the result to the outbox in -/// a single call. This causes a frame hitch when crossing landblock -/// boundaries, but the rendering is correct. +/// Thread model (Phase A.5 T11+): spawns a +/// dedicated background worker thread. and +/// write non-blocking to the inbox +/// ; the worker drains it and posts +/// records to the outbox. /// /// /// -/// The Channel-based outbox + API is -/// preserved so the move back to async loading is a single-class change -/// when DatCollection thread safety lands. +/// DatCollection thread safety is provided by the caller: +/// GameWindow's _datLock (Phase A.5 T10) serialises all +/// DatCollection.Get<T> calls. Both factory closures passed at +/// construction acquire that lock before reading dats. The worker never +/// touches DatCollection directly — it only calls the factories. /// /// /// @@ -39,8 +38,9 @@ namespace AcDream.App.Streaming; /// /// /// -/// Threading: synchronous mode means all methods must be called from the -/// same thread (the render thread in production). +/// Threading: must be called from a single +/// consumer thread (the render thread in production). All other public +/// methods are thread-safe. /// /// public sealed class LandblockStreamer : IDisposable @@ -53,49 +53,65 @@ public sealed class LandblockStreamer : IDisposable public const int DefaultDrainBatchSize = 4; private readonly Func _loadLandblock; + private readonly Func _buildMeshOrNull; private readonly Channel _inbox; private readonly Channel _outbox; private readonly CancellationTokenSource _cancel = new(); -#pragma warning disable CS0649 // _worker stays declared for the future async path; unused in synchronous mode. private Thread? _worker; -#pragma warning restore CS0649 private int _disposed; - public LandblockStreamer(Func loadLandblock) + public LandblockStreamer( + Func loadLandblock, + Func? buildMeshOrNull = null) { _loadLandblock = loadLandblock; - _inbox = Channel.CreateUnbounded( + // Default: no mesh build (returns null → Failed result). Production + // wires in LandblockMesh.Build via the T12 construction site. + _buildMeshOrNull = buildMeshOrNull ?? ((_, _) => null); + _inbox = Channel.CreateUnbounded( new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }); _outbox = Channel.CreateUnbounded( new UnboundedChannelOptions { SingleReader = true, SingleWriter = true }); } /// - /// No-op in synchronous mode. Preserved on the API surface so callers - /// don't need to change when async loading returns in Phase A.3. + /// Activate the dedicated background worker thread. Idempotent: calling + /// more than once has no effect. /// public void Start() { if (System.Threading.Volatile.Read(ref _disposed) != 0) throw new ObjectDisposedException(nameof(LandblockStreamer)); - // No worker thread in synchronous mode. + if (_worker != null) return; + _worker = new Thread(WorkerLoop) + { + IsBackground = true, + Name = "acdream.streaming.worker", + }; + _worker.Start(); } - public void EnqueueLoad(uint landblockId) + /// + /// Non-blocking enqueue. The worker drains the inbox and posts a + /// (or + /// ) to the outbox. + /// + public void EnqueueLoad(uint landblockId, LandblockStreamJobKind kind = LandblockStreamJobKind.LoadNear) { if (System.Threading.Volatile.Read(ref _disposed) != 0) throw new ObjectDisposedException(nameof(LandblockStreamer)); - // Synchronous mode: invoke the load delegate inline. The result lands - // in the outbox and DrainCompletions picks it up later in the same - // (or next) frame. - HandleJob(new LandblockStreamJob.Load(landblockId, LandblockStreamJobKind.LoadNear)); + _inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId, kind)); } + /// + /// Non-blocking enqueue. The worker posts a + /// to the outbox. + /// public void EnqueueUnload(uint landblockId) { if (System.Threading.Volatile.Read(ref _disposed) != 0) throw new ObjectDisposedException(nameof(LandblockStreamer)); - HandleJob(new LandblockStreamJob.Unload(landblockId)); + _inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId)); } /// @@ -118,17 +134,14 @@ public sealed class LandblockStreamer : IDisposable { try { - // Synchronous read loop via .WaitToReadAsync + ReadAllAsync - // would be idiomatic but requires async; the blocking reader - // is simpler and the thread is dedicated anyway. + // Safe to block: this is a dedicated worker thread with no + // SynchronizationContext, so .Result/.GetResult cannot deadlock + // against any captured continuation. Using the sync pattern + // here keeps the loop linear; an async-enumerable alternative + // would force WorkerLoop to be async Task and lose the + // simple thread-start shape. while (!_cancel.Token.IsCancellationRequested) { - // Safe to block: this is a dedicated worker thread with no - // SynchronizationContext, so .Result/.GetResult cannot deadlock - // against any captured continuation. Using the sync pattern - // here keeps the loop linear; an async-enumerable alternative - // would force WorkerLoop to be async Task and lose the - // simple thread-start shape. if (!_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult()) break; @@ -157,7 +170,7 @@ public sealed class LandblockStreamer : IDisposable switch (job) { case LandblockStreamJob.Load load: - // TODO(A.5 T11/T16): route by load.Kind. LoadFar will skip + // TODO(A.5 T16): route by load.Kind. LoadFar will skip // LandBlockInfo + scenery generation; PromoteToNear will skip // mesh build (terrain already on GPU). Today every Kind takes // the full-load path via _loadLandblock, which matches today's @@ -166,15 +179,22 @@ public sealed class LandblockStreamer : IDisposable { var lb = _loadLandblock(load.LandblockId); if (lb is null) + { _outbox.Writer.TryWrite(new LandblockStreamResult.Failed( load.LandblockId, "LandblockLoader.Load returned null")); - else - // TEMPORARY: passes default! for MeshData — Task 13 wires the real mesh build. - _outbox.Writer.TryWrite(new LandblockStreamResult.Loaded( - load.LandblockId, - LandblockStreamTier.Near, - lb, - MeshData: default! /* TODO(A.5 T13) */)); + break; + } + var mesh = _buildMeshOrNull(load.LandblockId, lb); + if (mesh is null) + { + _outbox.Writer.TryWrite(new LandblockStreamResult.Failed( + load.LandblockId, "buildMeshOrNull returned null")); + break; + } + var tier = load.Kind == LandblockStreamJobKind.LoadFar + ? LandblockStreamTier.Far : LandblockStreamTier.Near; + _outbox.Writer.TryWrite(new LandblockStreamResult.Loaded( + load.LandblockId, tier, lb, mesh)); } catch (Exception ex) { diff --git a/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs b/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs index e058f81..2e11804 100644 --- a/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs +++ b/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs @@ -19,9 +19,13 @@ public class LandblockStreamerTests 0xA9B4FFFEu, new LandBlock(), System.Array.Empty()); + var stubMesh = new AcDream.Core.Terrain.LandblockMeshData( + System.Array.Empty(), + System.Array.Empty()); using var streamer = new LandblockStreamer( - loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null); + loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null, + buildMeshOrNull: (_, _) => stubMesh); streamer.Start(); streamer.EnqueueLoad(0xA9B4FFFEu); @@ -104,37 +108,46 @@ public class LandblockStreamerTests } [Fact] - public void Load_ExecutesLoaderSynchronously_OnCallingThread() + public async Task Load_ExecutesLoaderOnWorkerThread() { - // Streamer was made synchronous after Phase A.1 visual verification - // exposed concurrent dat reads as the cause of "ball of spikes" - // terrain corruption — DatReaderWriter's DatCollection isn't - // thread-safe and locking around every dat read on every render- - // thread code path was too invasive. Until Phase A.3 introduces a - // thread-safe dat wrapper, the load delegate runs on the calling - // thread and the result is in the outbox by the time EnqueueLoad - // returns. This test pins that contract. + // Phase A.5 T11: the load delegate now runs on the dedicated worker + // thread (not the calling/render thread). This test verifies the + // async hand-off: EnqueueLoad returns immediately and the result + // appears in the outbox only after the worker processes the inbox. int testThreadId = System.Environment.CurrentManagedThreadId; int? loaderThreadId = null; var stubLandblock = new LoadedLandblock( 0x77770FFEu, new LandBlock(), System.Array.Empty()); + var stubMesh = new AcDream.Core.Terrain.LandblockMeshData( + System.Array.Empty(), + System.Array.Empty()); - using var streamer = new LandblockStreamer(loadLandblock: id => - { - loaderThreadId = System.Environment.CurrentManagedThreadId; - return stubLandblock; - }); + using var streamer = new LandblockStreamer( + loadLandblock: id => + { + loaderThreadId = System.Environment.CurrentManagedThreadId; + return stubLandblock; + }, + buildMeshOrNull: (_, _) => stubMesh); streamer.Start(); streamer.EnqueueLoad(0x77770FFEu); - // Result is already in the outbox — no spinning needed. - var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); + // Spin until the worker produces a completion. + LandblockStreamResult? result = null; + for (int i = 0; i < SpinMaxIterations && result is null; i++) + { + var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); + if (drained.Count > 0) result = drained[0]; + else await Task.Delay(SpinStepMs); + } - Assert.Single(drained); - Assert.IsType(drained[0]); - Assert.Equal(testThreadId, loaderThreadId); + Assert.NotNull(result); + Assert.IsType(result); + // The loader MUST have run on a different thread than the test thread. + Assert.NotNull(loaderThreadId); + Assert.NotEqual(testThreadId, loaderThreadId.Value); } }