acdream/tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs
Erik c5e207a51f fix(app): Phase A.1 — LandblockStreamer lifecycle + threading hardening
Code review follow-up to commit 0904372. Five Important fixes plus
three Minor polish items found by the reviewer before StreamingController
depends on this class under churn.

I1: Dispose is now thread-safe via Interlocked.Exchange on an int
    guard. Two concurrent Dispose calls no longer double-dispose the
    CancellationTokenSource.

I2: EnqueueLoad/EnqueueUnload now throw ObjectDisposedException when
    called after Dispose instead of silently dropping the job. Jobs
    vanishing into a completed channel was a debugging hazard.

I3: Start throws ObjectDisposedException when called after Dispose
    instead of silently doing nothing (the old guard only checked
    whether the thread was non-null, not whether the streamer was
    still usable).

I4: New test Load_ExecutesLoaderOnBackgroundThread captures the
    loader delegate's ManagedThreadId and asserts it differs from
    the test thread's id, proving the whole reason this class
    exists (off-thread execution) is actually happening.

I5: New LandblockStreamResult.WorkerCrashed record type for the
    outer catch in WorkerLoop. Previously the crash path wrote
    Failed(0, ex.ToString()) which collided with landblock (0, 0)
    in the north ocean, making "worker crashed" indistinguishable
    from "landblock 0 failed to load".

Minor polish:
- M1: Test spin constants (SpinTimeoutMs, SpinStepMs,
  SpinMaxIterations) extracted so the 200 x 10ms pattern has one
  source of truth.
- M2: DefaultDrainBatchSize public const on LandblockStreamer so
  the batch cap has a name and a comment explaining why 4.
- M3: Safety-argument comment on the sync-over-async
  WaitToReadAsync call explaining why it cannot deadlock (dedicated
  thread, no SyncContext).
- M6: XML remarks on the class and on DrainCompletions documenting
  threading contract (Enqueue = any thread, Drain = single consumer
  thread).

112 Core + 96 Core.Net tests green.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 22:20:41 +02:00

135 lines
4.5 KiB
C#

using System.Threading.Tasks;
using AcDream.App.Streaming;
using AcDream.Core.World;
using DatReaderWriter.DBObjs;
using Xunit;
namespace AcDream.Core.Tests.Streaming;
public class LandblockStreamerTests
{
private const int SpinTimeoutMs = 2000;
private const int SpinStepMs = 10;
private const int SpinMaxIterations = SpinTimeoutMs / SpinStepMs;
[Fact]
public async Task Load_FollowedByDrain_ReturnsLoadedRecord()
{
var stubLandblock = new LoadedLandblock(
0xA9B4FFFEu,
new LandBlock(),
System.Array.Empty<WorldEntity>());
using var streamer = new LandblockStreamer(
loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null);
streamer.Start();
streamer.EnqueueLoad(0xA9B4FFFEu);
// Spin until the worker produces a completion, with a 2s timeout.
LandblockStreamResult? result = null;
for (int i = 0; i < SpinMaxIterations && result is null; i++)
{
var drained = streamer.DrainCompletions(maxBatchSize: LandblockStreamer.DefaultDrainBatchSize);
if (drained.Count > 0) result = drained[0];
else await Task.Delay(SpinStepMs);
}
Assert.NotNull(result);
var loaded = Assert.IsType<LandblockStreamResult.Loaded>(result);
Assert.Equal(0xA9B4FFFEu, loaded.LandblockId);
Assert.Same(stubLandblock, loaded.Landblock);
}
[Fact]
public async Task Load_WhenLoaderReturnsNull_ReportsFailed()
{
using var streamer = new LandblockStreamer(
loadLandblock: _ => null);
streamer.Start();
streamer.EnqueueLoad(0x12340000u);
LandblockStreamResult? result = null;
for (int i = 0; i < SpinMaxIterations && result is null; i++)
{
var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
if (drained.Count > 0) result = drained[0];
else await Task.Delay(SpinStepMs);
}
Assert.NotNull(result);
Assert.IsType<LandblockStreamResult.Failed>(result);
}
[Fact]
public async Task Load_WhenLoaderThrows_ReportsFailedWithMessage()
{
using var streamer = new LandblockStreamer(
loadLandblock: _ => throw new System.InvalidOperationException("boom"));
streamer.Start();
streamer.EnqueueLoad(0x55550000u);
LandblockStreamResult? result = null;
for (int i = 0; i < SpinMaxIterations && result is null; i++)
{
var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
if (drained.Count > 0) result = drained[0];
else await Task.Delay(SpinStepMs);
}
var failed = Assert.IsType<LandblockStreamResult.Failed>(result);
Assert.Contains("boom", failed.Error);
}
[Fact]
public async Task Unload_ProducesUnloadedResult()
{
using var streamer = new LandblockStreamer(loadLandblock: _ => null);
streamer.Start();
streamer.EnqueueUnload(0xABCD0000u);
LandblockStreamResult? result = null;
for (int i = 0; i < SpinMaxIterations && result is null; i++)
{
var drained = streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
if (drained.Count > 0) result = drained[0];
else await Task.Delay(SpinStepMs);
}
var unloaded = Assert.IsType<LandblockStreamResult.Unloaded>(result);
Assert.Equal(0xABCD0000u, unloaded.LandblockId);
}
[Fact]
public async Task Load_ExecutesLoaderOnBackgroundThread()
{
int testThreadId = System.Environment.CurrentManagedThreadId;
int? loaderThreadId = null;
var stubLandblock = new LoadedLandblock(
0x77770FFEu,
new LandBlock(),
System.Array.Empty<WorldEntity>());
using var streamer = new LandblockStreamer(loadLandblock: id =>
{
loaderThreadId = System.Environment.CurrentManagedThreadId;
return stubLandblock;
});
streamer.Start();
streamer.EnqueueLoad(0x77770FFEu);
// Drain until we see the completion.
for (int i = 0; i < SpinMaxIterations && loaderThreadId is null; i++)
{
streamer.DrainCompletions(LandblockStreamer.DefaultDrainBatchSize);
if (loaderThreadId is null) await Task.Delay(SpinStepMs);
}
Assert.NotNull(loaderThreadId);
Assert.NotEqual(testThreadId, loaderThreadId.Value);
}
}