diff --git a/src/AcDream.App/Streaming/LandblockStreamJob.cs b/src/AcDream.App/Streaming/LandblockStreamJob.cs index 7542c68..aff6500 100644 --- a/src/AcDream.App/Streaming/LandblockStreamJob.cs +++ b/src/AcDream.App/Streaming/LandblockStreamJob.cs @@ -25,4 +25,13 @@ public abstract record LandblockStreamResult(uint LandblockId) public sealed record Loaded(uint LandblockId, LoadedLandblock Landblock) : LandblockStreamResult(LandblockId); public sealed record Failed(uint LandblockId, string Error) : LandblockStreamResult(LandblockId); public sealed record Unloaded(uint LandblockId) : LandblockStreamResult(LandblockId); + + /// + /// The worker loop itself crashed with an unhandled exception. Not tied + /// to a specific landblock — distinguished from + /// because consumers typically route this to a fatal-log path rather + /// than retrying a single landblock later. LandblockId is 0 by + /// convention; readers should pattern-match on the type, not the id. + /// + public sealed record WorkerCrashed(string Error) : LandblockStreamResult(0); } diff --git a/src/AcDream.App/Streaming/LandblockStreamer.cs b/src/AcDream.App/Streaming/LandblockStreamer.cs index 7b42aed..65b2007 100644 --- a/src/AcDream.App/Streaming/LandblockStreamer.cs +++ b/src/AcDream.App/Streaming/LandblockStreamer.cs @@ -19,15 +19,28 @@ namespace AcDream.App.Streaming; /// record so the render thread can release GPU state on the next drain — /// the worker never touches GPU resources directly. /// +/// +/// +/// Threading: / may +/// be called from any thread; must be called +/// from a single consumer thread (the render thread in production). +/// /// public sealed class LandblockStreamer : IDisposable { + /// + /// Default drain batch size. Tuned to cap GPU upload work the render + /// thread does per frame while still draining a moderate backlog in a + /// few frames. Callers can override on a per-call basis. + /// + public const int DefaultDrainBatchSize = 4; + private readonly Func _loadLandblock; private readonly Channel _inbox; private readonly Channel _outbox; private readonly CancellationTokenSource _cancel = new(); private Thread? _worker; - private bool _disposed; + private int _disposed; public LandblockStreamer(Func loadLandblock) { @@ -44,6 +57,8 @@ public sealed class LandblockStreamer : IDisposable /// public void Start() { + if (System.Threading.Volatile.Read(ref _disposed) != 0) + throw new ObjectDisposedException(nameof(LandblockStreamer)); if (_worker is not null) return; _worker = new Thread(WorkerLoop) { @@ -55,11 +70,15 @@ public sealed class LandblockStreamer : IDisposable public void EnqueueLoad(uint landblockId) { + if (System.Threading.Volatile.Read(ref _disposed) != 0) + throw new ObjectDisposedException(nameof(LandblockStreamer)); _inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId)); } public void EnqueueUnload(uint landblockId) { + if (System.Threading.Volatile.Read(ref _disposed) != 0) + throw new ObjectDisposedException(nameof(LandblockStreamer)); _inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId)); } @@ -67,7 +86,11 @@ public sealed class LandblockStreamer : IDisposable /// Drain up to completed results. /// Non-blocking. Call from the render thread once per OnUpdate. /// - public IReadOnlyList DrainCompletions(int maxBatchSize = 4) + /// + /// Must be called from a single consumer thread. The outbox channel is + /// configured with SingleReader = true and will throw on concurrent reads. + /// + public IReadOnlyList DrainCompletions(int maxBatchSize = DefaultDrainBatchSize) { var batch = new List(maxBatchSize); while (batch.Count < maxBatchSize && _outbox.Reader.TryRead(out var result)) @@ -84,6 +107,12 @@ public sealed class LandblockStreamer : IDisposable // is simpler and the thread is dedicated anyway. while (!_cancel.Token.IsCancellationRequested) { + // Safe to block: this is a dedicated worker thread with no + // SynchronizationContext, so .Result/.GetResult cannot deadlock + // against any captured continuation. Using the sync pattern + // here keeps the loop linear; an async-enumerable alternative + // would force WorkerLoop to be async Task and lose the + // simple thread-start shape. if (!_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult()) break; @@ -99,7 +128,7 @@ public sealed class LandblockStreamer : IDisposable { // Last-ditch: surface via outbox so the caller at least sees // something. We never retry a crashed worker. - _outbox.Writer.TryWrite(new LandblockStreamResult.Failed(0, ex.ToString())); + _outbox.Writer.TryWrite(new LandblockStreamResult.WorkerCrashed(ex.ToString())); } finally { @@ -137,8 +166,7 @@ public sealed class LandblockStreamer : IDisposable public void Dispose() { - if (_disposed) return; - _disposed = true; + if (System.Threading.Interlocked.Exchange(ref _disposed, 1) != 0) return; _cancel.Cancel(); _inbox.Writer.TryComplete(); _worker?.Join(TimeSpan.FromSeconds(2)); diff --git a/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs b/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs index 3005b01..491ca6b 100644 --- a/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs +++ b/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs @@ -8,6 +8,10 @@ namespace AcDream.Core.Tests.Streaming; public class LandblockStreamerTests { + private const int SpinTimeoutMs = 2000; + private const int SpinStepMs = 10; + private const int SpinMaxIterations = SpinTimeoutMs / SpinStepMs; + [Fact] public async Task Load_FollowedByDrain_ReturnsLoadedRecord() { @@ -24,11 +28,11 @@ public class LandblockStreamerTests // Spin until the worker produces a completion, with a 2s timeout. LandblockStreamResult? result = null; - for (int i = 0; i < 200 && result is null; i++) + for (int i = 0; i < SpinMaxIterations && result is null; i++) { - var drained = streamer.DrainCompletions(maxBatchSize: 4); + var drained = streamer.DrainCompletions(maxBatchSize: LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; - else await Task.Delay(10); + else await Task.Delay(SpinStepMs); } Assert.NotNull(result); @@ -47,11 +51,11 @@ public class LandblockStreamerTests streamer.EnqueueLoad(0x12340000u); LandblockStreamResult? result = null; - for (int i = 0; i < 200 && result is null; i++) + for (int i = 0; i < SpinMaxIterations && result is null; i++) { - var drained = streamer.DrainCompletions(4); + var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; - else await Task.Delay(10); + else await Task.Delay(SpinStepMs); } Assert.NotNull(result); @@ -68,11 +72,11 @@ public class LandblockStreamerTests streamer.EnqueueLoad(0x55550000u); LandblockStreamResult? result = null; - for (int i = 0; i < 200 && result is null; i++) + for (int i = 0; i < SpinMaxIterations && result is null; i++) { - var drained = streamer.DrainCompletions(4); + var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; - else await Task.Delay(10); + else await Task.Delay(SpinStepMs); } var failed = Assert.IsType(result); @@ -88,14 +92,44 @@ public class LandblockStreamerTests streamer.EnqueueUnload(0xABCD0000u); LandblockStreamResult? result = null; - for (int i = 0; i < 200 && result is null; i++) + for (int i = 0; i < SpinMaxIterations && result is null; i++) { - var drained = streamer.DrainCompletions(4); + var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; - else await Task.Delay(10); + else await Task.Delay(SpinStepMs); } var unloaded = Assert.IsType(result); Assert.Equal(0xABCD0000u, unloaded.LandblockId); } + + [Fact] + public async Task Load_ExecutesLoaderOnBackgroundThread() + { + int testThreadId = System.Environment.CurrentManagedThreadId; + int? loaderThreadId = null; + var stubLandblock = new LoadedLandblock( + 0x77770FFEu, + new LandBlock(), + System.Array.Empty()); + + using var streamer = new LandblockStreamer(loadLandblock: id => + { + loaderThreadId = System.Environment.CurrentManagedThreadId; + return stubLandblock; + }); + + streamer.Start(); + streamer.EnqueueLoad(0x77770FFEu); + + // Drain until we see the completion. + for (int i = 0; i < SpinMaxIterations && loaderThreadId is null; i++) + { + streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); + if (loaderThreadId is null) await Task.Delay(SpinStepMs); + } + + Assert.NotNull(loaderThreadId); + Assert.NotEqual(testThreadId, loaderThreadId.Value); + } }