using System.Threading.Tasks; using AcDream.App.Streaming; using AcDream.Core.World; using DatReaderWriter.DBObjs; using Xunit; namespace AcDream.Core.Tests.Streaming; public class LandblockStreamerTests { private const int SpinTimeoutMs = 2000; private const int SpinStepMs = 10; private const int SpinMaxIterations = SpinTimeoutMs / SpinStepMs; [Fact] public async Task Load_FollowedByDrain_ReturnsLoadedRecord() { var stubLandblock = new LoadedLandblock( 0xA9B4FFFEu, new LandBlock(), System.Array.Empty()); using var streamer = new LandblockStreamer( loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null); streamer.Start(); streamer.EnqueueLoad(0xA9B4FFFEu); // Spin until the worker produces a completion, with a 2s timeout. LandblockStreamResult? result = null; for (int i = 0; i < SpinMaxIterations && result is null; i++) { var drained = streamer.DrainCompletions(maxBatchSize: LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; else await Task.Delay(SpinStepMs); } Assert.NotNull(result); var loaded = Assert.IsType(result); Assert.Equal(0xA9B4FFFEu, loaded.LandblockId); Assert.Same(stubLandblock, loaded.Landblock); } [Fact] public async Task Load_WhenLoaderReturnsNull_ReportsFailed() { using var streamer = new LandblockStreamer( loadLandblock: _ => null); streamer.Start(); streamer.EnqueueLoad(0x12340000u); LandblockStreamResult? result = null; for (int i = 0; i < SpinMaxIterations && result is null; i++) { var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; else await Task.Delay(SpinStepMs); } Assert.NotNull(result); Assert.IsType(result); } [Fact] public async Task Load_WhenLoaderThrows_ReportsFailedWithMessage() { using var streamer = new LandblockStreamer( loadLandblock: _ => throw new System.InvalidOperationException("boom")); streamer.Start(); streamer.EnqueueLoad(0x55550000u); LandblockStreamResult? result = null; for (int i = 0; i < SpinMaxIterations && result is null; i++) { var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; else await Task.Delay(SpinStepMs); } var failed = Assert.IsType(result); Assert.Contains("boom", failed.Error); } [Fact] public async Task Unload_ProducesUnloadedResult() { using var streamer = new LandblockStreamer(loadLandblock: _ => null); streamer.Start(); streamer.EnqueueUnload(0xABCD0000u); LandblockStreamResult? result = null; for (int i = 0; i < SpinMaxIterations && result is null; i++) { var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); if (drained.Count > 0) result = drained[0]; else await Task.Delay(SpinStepMs); } var unloaded = Assert.IsType(result); Assert.Equal(0xABCD0000u, unloaded.LandblockId); } [Fact] public async Task Load_ExecutesLoaderOnBackgroundThread() { int testThreadId = System.Environment.CurrentManagedThreadId; int? loaderThreadId = null; var stubLandblock = new LoadedLandblock( 0x77770FFEu, new LandBlock(), System.Array.Empty()); using var streamer = new LandblockStreamer(loadLandblock: id => { loaderThreadId = System.Environment.CurrentManagedThreadId; return stubLandblock; }); streamer.Start(); streamer.EnqueueLoad(0x77770FFEu); // Drain until we see the completion. for (int i = 0; i < SpinMaxIterations && loaderThreadId is null; i++) { streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize); if (loaderThreadId is null) await Task.Delay(SpinStepMs); } Assert.NotNull(loaderThreadId); Assert.NotEqual(testThreadId, loaderThreadId.Value); } }