feat(app): Phase A.1 — LandblockStreamer (background worker + channels)
Background thread pulls load/unload jobs from an inbox channel, invokes a caller-supplied Func<uint, LoadedLandblock?> (production wraps LandblockLoader.Load, tests inject a fake), and posts results to an outbox channel the render thread drains. Graceful shutdown via CancellationToken; failed loads reported rather than retried. 4 new tests, all green. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
9d1c2c45e5
commit
0904372af6
2 changed files with 248 additions and 0 deletions
147
src/AcDream.App/Streaming/LandblockStreamer.cs
Normal file
147
src/AcDream.App/Streaming/LandblockStreamer.cs
Normal file
|
|
@ -0,0 +1,147 @@
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Channels;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using AcDream.Core.World;
|
||||||
|
|
||||||
|
namespace AcDream.App.Streaming;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Background worker that services landblock load and unload requests off
|
||||||
|
/// the render thread. Loads are executed on a dedicated thread via a
|
||||||
|
/// caller-supplied delegate (the production instance wraps
|
||||||
|
/// <see cref="LandblockLoader.Load"/>); completed results are posted to
|
||||||
|
/// an outbox channel the render thread drains once per OnUpdate.
|
||||||
|
///
|
||||||
|
/// <para>
|
||||||
|
/// Unloads are passed through the same channel as a <see cref="LandblockStreamResult.Unloaded"/>
|
||||||
|
/// record so the render thread can release GPU state on the next drain —
|
||||||
|
/// the worker never touches GPU resources directly.
|
||||||
|
/// </para>
|
||||||
|
/// </summary>
|
||||||
|
public sealed class LandblockStreamer : IDisposable
|
||||||
|
{
|
||||||
|
private readonly Func<uint, LoadedLandblock?> _loadLandblock;
|
||||||
|
private readonly Channel<LandblockStreamJob> _inbox;
|
||||||
|
private readonly Channel<LandblockStreamResult> _outbox;
|
||||||
|
private readonly CancellationTokenSource _cancel = new();
|
||||||
|
private Thread? _worker;
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
public LandblockStreamer(Func<uint, LoadedLandblock?> loadLandblock)
|
||||||
|
{
|
||||||
|
_loadLandblock = loadLandblock;
|
||||||
|
_inbox = Channel.CreateUnbounded<LandblockStreamJob>(
|
||||||
|
new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
|
||||||
|
_outbox = Channel.CreateUnbounded<LandblockStreamResult>(
|
||||||
|
new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Start the worker thread. Must be called before enqueueing jobs.
|
||||||
|
/// Calling twice is a no-op.
|
||||||
|
/// </summary>
|
||||||
|
public void Start()
|
||||||
|
{
|
||||||
|
if (_worker is not null) return;
|
||||||
|
_worker = new Thread(WorkerLoop)
|
||||||
|
{
|
||||||
|
IsBackground = true,
|
||||||
|
Name = "acdream.landblock-streamer",
|
||||||
|
};
|
||||||
|
_worker.Start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void EnqueueLoad(uint landblockId)
|
||||||
|
{
|
||||||
|
_inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void EnqueueUnload(uint landblockId)
|
||||||
|
{
|
||||||
|
_inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Drain up to <paramref name="maxBatchSize"/> completed results.
|
||||||
|
/// Non-blocking. Call from the render thread once per OnUpdate.
|
||||||
|
/// </summary>
|
||||||
|
public IReadOnlyList<LandblockStreamResult> DrainCompletions(int maxBatchSize = 4)
|
||||||
|
{
|
||||||
|
var batch = new List<LandblockStreamResult>(maxBatchSize);
|
||||||
|
while (batch.Count < maxBatchSize && _outbox.Reader.TryRead(out var result))
|
||||||
|
batch.Add(result);
|
||||||
|
return batch;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void WorkerLoop()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Synchronous read loop via .WaitToReadAsync + ReadAllAsync
|
||||||
|
// would be idiomatic but requires async; the blocking reader
|
||||||
|
// is simpler and the thread is dedicated anyway.
|
||||||
|
while (!_cancel.Token.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
if (!_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult())
|
||||||
|
break;
|
||||||
|
|
||||||
|
while (_inbox.Reader.TryRead(out var job))
|
||||||
|
{
|
||||||
|
if (_cancel.Token.IsCancellationRequested) return;
|
||||||
|
HandleJob(job);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { /* graceful shutdown */ }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
// Last-ditch: surface via outbox so the caller at least sees
|
||||||
|
// something. We never retry a crashed worker.
|
||||||
|
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed(0, ex.ToString()));
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_outbox.Writer.TryComplete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleJob(LandblockStreamJob job)
|
||||||
|
{
|
||||||
|
switch (job)
|
||||||
|
{
|
||||||
|
case LandblockStreamJob.Load load:
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var lb = _loadLandblock(load.LandblockId);
|
||||||
|
if (lb is null)
|
||||||
|
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
|
||||||
|
load.LandblockId, "LandblockLoader.Load returned null"));
|
||||||
|
else
|
||||||
|
_outbox.Writer.TryWrite(new LandblockStreamResult.Loaded(
|
||||||
|
load.LandblockId, lb));
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
|
||||||
|
load.LandblockId, ex.ToString()));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case LandblockStreamJob.Unload unload:
|
||||||
|
_outbox.Writer.TryWrite(new LandblockStreamResult.Unloaded(unload.LandblockId));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_disposed) return;
|
||||||
|
_disposed = true;
|
||||||
|
_cancel.Cancel();
|
||||||
|
_inbox.Writer.TryComplete();
|
||||||
|
_worker?.Join(TimeSpan.FromSeconds(2));
|
||||||
|
_cancel.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
101
tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs
Normal file
101
tests/AcDream.Core.Tests/Streaming/LandblockStreamerTests.cs
Normal file
|
|
@ -0,0 +1,101 @@
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using AcDream.App.Streaming;
|
||||||
|
using AcDream.Core.World;
|
||||||
|
using DatReaderWriter.DBObjs;
|
||||||
|
using Xunit;
|
||||||
|
|
||||||
|
namespace AcDream.Core.Tests.Streaming;
|
||||||
|
|
||||||
|
public class LandblockStreamerTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Load_FollowedByDrain_ReturnsLoadedRecord()
|
||||||
|
{
|
||||||
|
var stubLandblock = new LoadedLandblock(
|
||||||
|
0xA9B4FFFEu,
|
||||||
|
new LandBlock(),
|
||||||
|
System.Array.Empty<WorldEntity>());
|
||||||
|
|
||||||
|
using var streamer = new LandblockStreamer(
|
||||||
|
loadLandblock: id => id == 0xA9B4FFFEu ? stubLandblock : null);
|
||||||
|
|
||||||
|
streamer.Start();
|
||||||
|
streamer.EnqueueLoad(0xA9B4FFFEu);
|
||||||
|
|
||||||
|
// Spin until the worker produces a completion, with a 2s timeout.
|
||||||
|
LandblockStreamResult? result = null;
|
||||||
|
for (int i = 0; i < 200 && result is null; i++)
|
||||||
|
{
|
||||||
|
var drained = streamer.DrainCompletions(maxBatchSize: 4);
|
||||||
|
if (drained.Count > 0) result = drained[0];
|
||||||
|
else await Task.Delay(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.NotNull(result);
|
||||||
|
var loaded = Assert.IsType<LandblockStreamResult.Loaded>(result);
|
||||||
|
Assert.Equal(0xA9B4FFFEu, loaded.LandblockId);
|
||||||
|
Assert.Same(stubLandblock, loaded.Landblock);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Load_WhenLoaderReturnsNull_ReportsFailed()
|
||||||
|
{
|
||||||
|
using var streamer = new LandblockStreamer(
|
||||||
|
loadLandblock: _ => null);
|
||||||
|
|
||||||
|
streamer.Start();
|
||||||
|
streamer.EnqueueLoad(0x12340000u);
|
||||||
|
|
||||||
|
LandblockStreamResult? result = null;
|
||||||
|
for (int i = 0; i < 200 && result is null; i++)
|
||||||
|
{
|
||||||
|
var drained = streamer.DrainCompletions(4);
|
||||||
|
if (drained.Count > 0) result = drained[0];
|
||||||
|
else await Task.Delay(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.NotNull(result);
|
||||||
|
Assert.IsType<LandblockStreamResult.Failed>(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Load_WhenLoaderThrows_ReportsFailedWithMessage()
|
||||||
|
{
|
||||||
|
using var streamer = new LandblockStreamer(
|
||||||
|
loadLandblock: _ => throw new System.InvalidOperationException("boom"));
|
||||||
|
|
||||||
|
streamer.Start();
|
||||||
|
streamer.EnqueueLoad(0x55550000u);
|
||||||
|
|
||||||
|
LandblockStreamResult? result = null;
|
||||||
|
for (int i = 0; i < 200 && result is null; i++)
|
||||||
|
{
|
||||||
|
var drained = streamer.DrainCompletions(4);
|
||||||
|
if (drained.Count > 0) result = drained[0];
|
||||||
|
else await Task.Delay(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
var failed = Assert.IsType<LandblockStreamResult.Failed>(result);
|
||||||
|
Assert.Contains("boom", failed.Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Unload_ProducesUnloadedResult()
|
||||||
|
{
|
||||||
|
using var streamer = new LandblockStreamer(loadLandblock: _ => null);
|
||||||
|
|
||||||
|
streamer.Start();
|
||||||
|
streamer.EnqueueUnload(0xABCD0000u);
|
||||||
|
|
||||||
|
LandblockStreamResult? result = null;
|
||||||
|
for (int i = 0; i < 200 && result is null; i++)
|
||||||
|
{
|
||||||
|
var drained = streamer.DrainCompletions(4);
|
||||||
|
if (drained.Count > 0) result = drained[0];
|
||||||
|
else await Task.Delay(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
var unloaded = Assert.IsType<LandblockStreamResult.Unloaded>(result);
|
||||||
|
Assert.Equal(0xABCD0000u, unloaded.LandblockId);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue