diff --git a/src/AcDream.App/Streaming/LandblockStreamer.cs b/src/AcDream.App/Streaming/LandblockStreamer.cs new file mode 100644 index 0000000..7b42aed --- /dev/null +++ b/src/AcDream.App/Streaming/LandblockStreamer.cs @@ -0,0 +1,147 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using AcDream.Core.World; + +namespace AcDream.App.Streaming; + +/// +/// Background worker that services landblock load and unload requests off +/// the render thread. Loads are executed on a dedicated thread via a +/// caller-supplied delegate (the production instance wraps +/// ); completed results are posted to +/// an outbox channel the render thread drains once per OnUpdate. +/// +/// +/// Unloads are passed through the same channel as a +/// record so the render thread can release GPU state on the next drain — +/// the worker never touches GPU resources directly. +/// +/// +public sealed class LandblockStreamer : IDisposable +{ + private readonly Func _loadLandblock; + private readonly Channel _inbox; + private readonly Channel _outbox; + private readonly CancellationTokenSource _cancel = new(); + private Thread? _worker; + private bool _disposed; + + public LandblockStreamer(Func loadLandblock) + { + _loadLandblock = loadLandblock; + _inbox = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }); + _outbox = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = true, SingleWriter = true }); + } + + /// + /// Start the worker thread. Must be called before enqueueing jobs. + /// Calling twice is a no-op. + /// + public void Start() + { + if (_worker is not null) return; + _worker = new Thread(WorkerLoop) + { + IsBackground = true, + Name = "acdream.landblock-streamer", + }; + _worker.Start(); + } + + public void EnqueueLoad(uint landblockId) + { + _inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId)); + } + + public void EnqueueUnload(uint landblockId) + { + _inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId)); + } + + /// + /// Drain up to completed results. + /// Non-blocking. Call from the render thread once per OnUpdate. + /// + public IReadOnlyList DrainCompletions(int maxBatchSize = 4) + { + var batch = new List(maxBatchSize); + while (batch.Count < maxBatchSize && _outbox.Reader.TryRead(out var result)) + batch.Add(result); + return batch; + } + + private void WorkerLoop() + { + try + { + // Synchronous read loop via .WaitToReadAsync + ReadAllAsync + // would be idiomatic but requires async; the blocking reader + // is simpler and the thread is dedicated anyway. + while (!_cancel.Token.IsCancellationRequested) + { + if (!_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult()) + break; + + while (_inbox.Reader.TryRead(out var job)) + { + if (_cancel.Token.IsCancellationRequested) return; + HandleJob(job); + } + } + } + catch (OperationCanceledException) { /* graceful shutdown */ } + catch (Exception ex) + { + // Last-ditch: surface via outbox so the caller at least sees + // something. We never retry a crashed worker. + _outbox.Writer.TryWrite(new LandblockStreamResult.Failed(0, ex.ToString())); + } + finally + { + _outbox.Writer.TryComplete(); + } + } + + private void HandleJob(LandblockStreamJob job) + { + switch (job) + { + case LandblockStreamJob.Load load: + try + { + var lb = _loadLandblock(load.LandblockId); + if (lb is null) + _outbox.Writer.TryWrite(new LandblockStreamResult.Failed( + load.LandblockId, "LandblockLoader.Load returned null")); + else + _outbox.Writer.TryWrite(new LandblockStreamResult.Loaded( + load.LandblockId, lb)); + } + catch (Exception ex) + { + _outbox.Writer.TryWrite(new LandblockStreamResult.Failed( + load.LandblockId, ex.ToString())); + } + break; + + case LandblockStreamJob.Unload unload: + _outbox.Writer.TryWrite(new LandblockStreamResult.Unloaded(unload.LandblockId)); + break; + } + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + _cancel.Cancel(); + _inbox.Writer.TryComplete(); + _worker?.Join(TimeSpan.FromSeconds(2)); + _cancel.Dispose(); + } +} diff --git a/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs b/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs new file mode 100644 index 0000000..3005b01 --- /dev/null +++ b/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs @@ -0,0 +1,101 @@ +using System.Threading.Tasks; +using AcDream.App.Streaming; +using AcDream.Core.World; +using DatReaderWriter.DBObjs; +using Xunit; + +namespace AcDream.Core.Tests.Streaming; + +public class LandblockStreamerTests +{ + [Fact] + public async Task Load_FollowedByDrain_ReturnsLoadedRecord() + { + var stubLandblock = new LoadedLandblock( + 0xA9B4FFFEu, + new LandBlock(), + System.Array.Empty()); + + using var streamer = new LandblockStreamer( + loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null); + + streamer.Start(); + streamer.EnqueueLoad(0xA9B4FFFEu); + + // Spin until the worker produces a completion, with a 2s timeout. + LandblockStreamResult? result = null; + for (int i = 0; i < 200 && result is null; i++) + { + var drained = streamer.DrainCompletions(maxBatchSize: 4); + if (drained.Count > 0) result = drained[0]; + else await Task.Delay(10); + } + + Assert.NotNull(result); + var loaded = Assert.IsType(result); + Assert.Equal(0xA9B4FFFEu, loaded.LandblockId); + Assert.Same(stubLandblock, loaded.Landblock); + } + + [Fact] + public async Task Load_WhenLoaderReturnsNull_ReportsFailed() + { + using var streamer = new LandblockStreamer( + loadLandblock: _ => null); + + streamer.Start(); + streamer.EnqueueLoad(0x12340000u); + + LandblockStreamResult? result = null; + for (int i = 0; i < 200 && result is null; i++) + { + var drained = streamer.DrainCompletions(4); + if (drained.Count > 0) result = drained[0]; + else await Task.Delay(10); + } + + Assert.NotNull(result); + Assert.IsType(result); + } + + [Fact] + public async Task Load_WhenLoaderThrows_ReportsFailedWithMessage() + { + using var streamer = new LandblockStreamer( + loadLandblock: _ => throw new System.InvalidOperationException("boom")); + + streamer.Start(); + streamer.EnqueueLoad(0x55550000u); + + LandblockStreamResult? result = null; + for (int i = 0; i < 200 && result is null; i++) + { + var drained = streamer.DrainCompletions(4); + if (drained.Count > 0) result = drained[0]; + else await Task.Delay(10); + } + + var failed = Assert.IsType(result); + Assert.Contains("boom", failed.Error); + } + + [Fact] + public async Task Unload_ProducesUnloadedResult() + { + using var streamer = new LandblockStreamer(loadLandblock: _ => null); + + streamer.Start(); + streamer.EnqueueUnload(0xABCD0000u); + + LandblockStreamResult? result = null; + for (int i = 0; i < 200 && result is null; i++) + { + var drained = streamer.DrainCompletions(4); + if (drained.Count > 0) result = drained[0]; + else await Task.Delay(10); + } + + var unloaded = Assert.IsType(result); + Assert.Equal(0xABCD0000u, unloaded.LandblockId); + } +}