From c5e207a51fe4cc1f6ab9aa2c6c64c82ad407378b Mon Sep 17 00:00:00 2001 From: Erik Date: Sat, 11 Apr 2026 22:20:41 +0200 Subject: [PATCH] =?UTF-8?q?fix(app):=20Phase=20A.1=20=E2=80=94=20Landblock?= =?UTF-8?q?Streamer=20lifecycle=20+=20threading=20hardening?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code review follow-up to commit 0904372. Five Important fixes plus three Minor polish items found by the reviewer before StreamingController depends on this class under churn. I1: Dispose is now thread-safe via Interlocked.Exchange on an int guard. Two concurrent Dispose calls no longer double-dispose the CancellationTokenSource. I2: EnqueueLoad/EnqueueUnload now throw ObjectDisposedException when called after Dispose instead of silently dropping the job. Jobs vanishing into a completed channel was a debugging hazard. I3: Start throws ObjectDisposedException when called after Dispose instead of silently doing nothing (the old guard only checked whether the thread was non-null, not whether the streamer was still usable). I4: New test Load_ExecutesLoaderOnBackgroundThread captures the loader delegate's ManagedThreadId and asserts it differs from the test thread's id, proving the whole reason this class exists (off-thread execution) is actually happening. I5: New LandblockStreamResult.WorkerCrashed record type for the outer catch in WorkerLoop. Previously the crash path wrote Failed(0, ex.ToString()) which collided with landblock (0, 0) in the north ocean, making "worker crashed" indistinguishable from "landblock 0 failed to load". Minor polish: - M1: Test spin constants (SpinTimeoutMs, SpinStepMs, SpinMaxIterations) extracted so the 200 x 10ms pattern has one source of truth. - M2: DefaultDrainBatchSize public const on LandblockStreamer so the batch cap has a name and a comment explaining why 4. - M3: Safety-argument comment on the sync-over-async WaitToReadAsync call explaining why it cannot deadlock (dedicated thread, no SyncContext). - M6: XML remarks on the class and on DrainCompletions documenting threading contract (Enqueue = any thread, Drain = single consumer thread). 112 Core + 96 Core.Net tests green. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Streaming/LandblockStreamJob.cs | 9 +++ .../Streaming/LandblockStreamer.cs | 38 ++++++++++-- .../Streaming/LandblockStreamerTests.cs | 58 +++++++++++++++---- 3 files changed, 88 insertions(+), 17 deletions(-) diff --git a/src/AcDream.App/Streaming/LandblockStreamJob.cs b/src/AcDream.App/Streaming/LandblockStreamJob.cs index 7542c68..aff6500 100644 --- a/src/AcDream.App/Streaming/LandblockStreamJob.cs +++ b/src/AcDream.App/Streaming/LandblockStreamJob.cs @@ -25,4 +25,13 @@ public abstract record LandblockStreamResult(uint LandblockId) public sealed record Loaded(uint LandblockId, LoadedLandblock Landblock) : LandblockStreamResult(LandblockId); public sealed record Failed(uint LandblockId, string Error) : LandblockStreamResult(LandblockId); public sealed record Unloaded(uint LandblockId) : LandblockStreamResult(LandblockId); + + /// + /// The worker loop itself crashed with an unhandled exception. Not tied + /// to a specific landblock — distinguished from + /// because consumers typically route this to a fatal-log path rather + /// than retrying a single landblock later. LandblockId is 0 by + /// convention; readers should pattern-match on the type, not the id. + /// + public sealed record WorkerCrashed(string Error) : LandblockStreamResult(0); } diff --git a/src/AcDream.App/Streaming/LandblockStreamer.cs b/src/AcDream.App/Streaming/LandblockStreamer.cs index 7b42aed..65b2007 100644 --- a/src/AcDream.App/Streaming/LandblockStreamer.cs +++ b/src/AcDream.App/Streaming/LandblockStreamer.cs @@ -19,15 +19,28 @@ namespace AcDream.App.Streaming; /// record so the render thread can release GPU state on the next drain — /// the worker never touches GPU resources directly. /// +/// +/// +/// Threading: / may +/// be called from any thread; must be called +/// from a single consumer thread (the render thread in production). +/// /// public sealed class LandblockStreamer : IDisposable { + /// + /// Default drain batch size. Tuned to cap GPU upload work the render + /// thread does per frame while still draining a moderate backlog in a + /// few frames. Callers can override on a per-call basis. + /// + public const int DefaultDrainBatchSize = 4; + private readonly Func _loadLandblock; private readonly Channel _inbox; private readonly Channel _outbox; private readonly CancellationTokenSource _cancel = new(); private Thread? _worker; - private bool _disposed; + private int _disposed; public LandblockStreamer(Func loadLandblock) { @@ -44,6 +57,8 @@ public sealed class LandblockStreamer : IDisposable /// public void Start() { + if (System.Threading.Volatile.Read(ref _disposed) != 0) + throw new ObjectDisposedException(nameof(LandblockStreamer)); if (_worker is not null) return; _worker = new Thread(WorkerLoop) { @@ -55,11 +70,15 @@ public sealed class LandblockStreamer : IDisposable public void EnqueueLoad(uint landblockId) { + if (System.Threading.Volatile.Read(ref _disposed) != 0) + throw new ObjectDisposedException(nameof(LandblockStreamer)); _inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId)); } public void EnqueueUnload(uint landblockId) { + if (System.Threading.Volatile.Read(ref _disposed) != 0) + throw new ObjectDisposedException(nameof(LandblockStreamer)); _inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId)); } @@ -67,7 +86,11 @@ public sealed class LandblockStreamer : IDisposable /// Drain up to completed results. /// Non-blocking. Call from the render thread once per OnUpdate. /// - public IReadOnlyList DrainCompletions(int maxBatchSize = 4) + /// + /// Must be called from a single consumer thread. The outbox channel is + /// configured with SingleReader = true and will throw on concurrent reads. + /// + public IReadOnlyList DrainCompletions(int maxBatchSize = DefaultDrainBatchSize) { var batch = new List(maxBatchSize); while (batch.Count < maxBatchSize && _outbox.Reader.TryRead(out var result)) @@ -84,6 +107,12 @@ public sealed class LandblockStreamer : IDisposable // is simpler and the thread is dedicated anyway. while (!_cancel.Token.IsCancellationRequested) { + // Safe to block: this is a dedicated worker thread with no + // SynchronizationContext, so .Result/.GetResult cannot deadlock + // against any captured continuation. Using the sync pattern + // here keeps the loop linear; an async-enumerable alternative + // would force WorkerLoop to be async Task and lose the + // simple thread-start shape. if (!_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult()) break; @@ -99,7 +128,7 @@ public sealed class LandblockStreamer : IDisposable { // Last-ditch: surface via outbox so the caller at least sees // something. We never retry a crashed worker. - _outbox.Writer.TryWrite(new LandblockStreamResult.Failed(0, ex.ToString())); + _outbox.Writer.TryWrite(new LandblockStreamResult.WorkerCrashed(ex.ToString())); } finally { @@ -137,8 +166,7 @@ public sealed class LandblockStreamer : IDisposable public void Dispose() { - if (_disposed) return; - _disposed = true; + if (System.Threading.Interlocked.Exchange(ref _disposed, 1) != 0) return; _cancel.Cancel(); _inbox.Writer.TryComplete(); _worker?.Join(TimeSpan.FromSeconds(2)); diff --git a/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs b/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs index 3005b01..491ca6b 100644 --- a/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs +++ b/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs @@ -8,6 +8,10 @@ namespace AcDream.Core.Tests.Streaming; public class LandblockStreamerTests { + private const int SpinTimeoutMs = 2000; + private const int SpinStepMs = 10; + private const int SpinMaxIterations = SpinTimeoutMs / SpinStepMs; + [Fact] public async Task Load_FollowedByDrain_ReturnsLoadedRecord() { @@ -24,11 +28,11 @@ public class LandblockStreamerTests // Spin until the worker produces a completion, with a 2s timeout. LandblockStreamResult? result = null; - for (int i = 0; i < 200 && result is null; i++) + for (int i = 0; i < SpinMaxIterations && result is null; i++) { - var drained = streamer.DrainCompletions(maxBatchSize: 4); + var drained = streamer.DrainCompletions(maxBatchSize: LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; - else await Task.Delay(10); + else await Task.Delay(SpinStepMs); } Assert.NotNull(result); @@ -47,11 +51,11 @@ public class LandblockStreamerTests streamer.EnqueueLoad(0x12340000u); LandblockStreamResult? result = null; - for (int i = 0; i < 200 && result is null; i++) + for (int i = 0; i < SpinMaxIterations && result is null; i++) { - var drained = streamer.DrainCompletions(4); + var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; - else await Task.Delay(10); + else await Task.Delay(SpinStepMs); } Assert.NotNull(result); @@ -68,11 +72,11 @@ public class LandblockStreamerTests streamer.EnqueueLoad(0x55550000u); LandblockStreamResult? result = null; - for (int i = 0; i < 200 && result is null; i++) + for (int i = 0; i < SpinMaxIterations && result is null; i++) { - var drained = streamer.DrainCompletions(4); + var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; - else await Task.Delay(10); + else await Task.Delay(SpinStepMs); } var failed = Assert.IsType(result); @@ -88,14 +92,44 @@ public class LandblockStreamerTests streamer.EnqueueUnload(0xABCD0000u); LandblockStreamResult? result = null; - for (int i = 0; i < 200 && result is null; i++) + for (int i = 0; i < SpinMaxIterations && result is null; i++) { - var drained = streamer.DrainCompletions(4); + var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; - else await Task.Delay(10); + else await Task.Delay(SpinStepMs); } var unloaded = Assert.IsType(result); Assert.Equal(0xABCD0000u, unloaded.LandblockId); } + + [Fact] + public async Task Load_ExecutesLoaderOnBackgroundThread() + { + int testThreadId = System.Environment.CurrentManagedThreadId; + int? loaderThreadId = null; + var stubLandblock = new LoadedLandblock( + 0x77770FFEu, + new LandBlock(), + System.Array.Empty()); + + using var streamer = new LandblockStreamer(loadLandblock: id => + { + loaderThreadId = System.Environment.CurrentManagedThreadId; + return stubLandblock; + }); + + streamer.Start(); + streamer.EnqueueLoad(0x77770FFEu); + + // Drain until we see the completion. + for (int i = 0; i < SpinMaxIterations && loaderThreadId is null; i++) + { + streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); + if (loaderThreadId is null) await Task.Delay(SpinStepMs); + } + + Assert.NotNull(loaderThreadId); + Assert.NotEqual(testThreadId, loaderThreadId.Value); + } }