Code review on T10-T12 bundle (commits 0cf86bb/00bb030/0405947 + audit
fix 76e1a64) found 3 Important issues:
1. LandblockStreamer.Start() had an idempotency race — the XML doc
claimed thread-safety but the implementation checked _worker != null
before assigning, allowing two callers to both pass the check and
spawn duplicate worker threads. Fixed via Interlocked.CompareExchange.
2. No test verified the worker emits Failed when buildMeshOrNull returns
null. Added Load_WhenBuildMeshReturnsNull_ReportsFailed.
3. StreamingControllerTests.cs:81 used MeshData: default! when
constructing a Loaded result. If a future test flows MeshData
through the apply callback, the null reference would NRE rather
than producing a meaningful assertion failure. Replaced with a real
empty LandblockMeshData instance.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
227 lines
9.4 KiB
C#
227 lines
9.4 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Threading;
|
|
using System.Threading.Channels;
|
|
using System.Threading.Tasks;
|
|
using AcDream.Core.World;
|
|
|
|
namespace AcDream.App.Streaming;
|
|
|
|
/// <summary>
|
|
/// Services landblock load/unload requests by invoking caller-supplied
|
|
/// factory delegates (the production instance wraps
|
|
/// <see cref="LandblockLoader.Load"/> for loading and
|
|
/// <see cref="AcDream.Core.Terrain.LandblockMesh.Build"/> for the terrain
|
|
/// mesh) and posting results to an outbox the render thread drains once
|
|
/// per OnUpdate.
|
|
///
|
|
/// <para>
|
|
/// <b>Thread model (Phase A.5 T11+):</b> <see cref="Start"/> spawns a
|
|
/// dedicated background worker thread. <see cref="EnqueueLoad"/> and
|
|
/// <see cref="EnqueueUnload"/> write non-blocking to the inbox
|
|
/// <see cref="Channel{T}"/>; the worker drains it and posts
|
|
/// <see cref="LandblockStreamResult"/> records to the outbox.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// <b>DatCollection thread safety</b> is provided by the caller:
|
|
/// GameWindow's <c>_datLock</c> (Phase A.5 T10) serialises all
|
|
/// <c>DatCollection.Get<T></c> calls. Both factory closures passed at
|
|
/// construction acquire that lock before reading dats. The worker never
|
|
/// touches <c>DatCollection</c> directly — it only calls the factories.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// Unloads pass through the outbox as <see cref="LandblockStreamResult.Unloaded"/>
|
|
/// records so the render thread can release GPU state on the next drain —
|
|
/// the streamer never touches GPU resources directly.
|
|
/// </para>
|
|
///
|
|
/// <remarks>
|
|
/// Threading: <see cref="DrainCompletions"/> must be called from a single
|
|
/// consumer thread (the render thread in production). All other public
|
|
/// methods are thread-safe.
|
|
/// </remarks>
|
|
/// </summary>
|
|
public sealed class LandblockStreamer : IDisposable
|
|
{
|
|
/// <summary>
|
|
/// Default drain batch size. Tuned to cap GPU upload work the render
|
|
/// thread does per frame while still draining a moderate backlog in a
|
|
/// few frames. Callers can override on a per-call basis.
|
|
/// </summary>
|
|
public const int DefaultDrainBatchSize = 4;
|
|
|
|
private readonly Func<uint, LoadedLandblock?> _loadLandblock;
|
|
private readonly Func<uint, LoadedLandblock?, AcDream.Core.Terrain.LandblockMeshData?> _buildMeshOrNull;
|
|
private readonly Channel<LandblockStreamJob> _inbox;
|
|
private readonly Channel<LandblockStreamResult> _outbox;
|
|
private readonly CancellationTokenSource _cancel = new();
|
|
private Thread? _worker;
|
|
private int _disposed;
|
|
|
|
public LandblockStreamer(
|
|
Func<uint, LoadedLandblock?> loadLandblock,
|
|
Func<uint, LoadedLandblock?, AcDream.Core.Terrain.LandblockMeshData?>? buildMeshOrNull = null)
|
|
{
|
|
_loadLandblock = loadLandblock;
|
|
// Default: no mesh build (returns null → Failed result). Production
|
|
// wires in LandblockMesh.Build via the T12 construction site.
|
|
_buildMeshOrNull = buildMeshOrNull ?? ((_, _) => null);
|
|
_inbox = Channel.CreateUnbounded<LandblockStreamJob>(
|
|
new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
|
|
_outbox = Channel.CreateUnbounded<LandblockStreamResult>(
|
|
new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });
|
|
}
|
|
|
|
/// <summary>
|
|
/// Activate the dedicated background worker thread. Idempotent and
|
|
/// thread-safe: concurrent callers will only spawn one worker; subsequent
|
|
/// calls are no-ops. Atomic via <see cref="Interlocked.CompareExchange{T}(ref T, T, T)"/>.
|
|
/// </summary>
|
|
public void Start()
|
|
{
|
|
if (System.Threading.Volatile.Read(ref _disposed) != 0)
|
|
throw new ObjectDisposedException(nameof(LandblockStreamer));
|
|
|
|
// A.5 T10-T12 follow-up: atomically install the worker so concurrent
|
|
// Start() callers don't both pass the null check and spawn duplicate
|
|
// threads. Construct the candidate; CAS it into _worker; if we lost
|
|
// the race, the candidate goes unstarted and is GCed.
|
|
var candidate = new Thread(WorkerLoop)
|
|
{
|
|
IsBackground = true,
|
|
Name = "acdream.streaming.worker",
|
|
};
|
|
if (Interlocked.CompareExchange(ref _worker, candidate, null) == null)
|
|
candidate.Start();
|
|
// else: another caller won the race; their thread is running.
|
|
}
|
|
|
|
/// <summary>
|
|
/// Non-blocking enqueue. The worker drains the inbox and posts a
|
|
/// <see cref="LandblockStreamResult.Loaded"/> (or
|
|
/// <see cref="LandblockStreamResult.Failed"/>) to the outbox.
|
|
/// </summary>
|
|
public void EnqueueLoad(uint landblockId, LandblockStreamJobKind kind = LandblockStreamJobKind.LoadNear)
|
|
{
|
|
if (System.Threading.Volatile.Read(ref _disposed) != 0)
|
|
throw new ObjectDisposedException(nameof(LandblockStreamer));
|
|
_inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId, kind));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Non-blocking enqueue. The worker posts a
|
|
/// <see cref="LandblockStreamResult.Unloaded"/> to the outbox.
|
|
/// </summary>
|
|
public void EnqueueUnload(uint landblockId)
|
|
{
|
|
if (System.Threading.Volatile.Read(ref _disposed) != 0)
|
|
throw new ObjectDisposedException(nameof(LandblockStreamer));
|
|
_inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Drain up to <paramref name="maxBatchSize"/> completed results.
|
|
/// Non-blocking. Call from the render thread once per OnUpdate.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Must be called from a single consumer thread. The outbox channel is
|
|
/// configured with SingleReader = true and will throw on concurrent reads.
|
|
/// </remarks>
|
|
public IReadOnlyList<LandblockStreamResult> DrainCompletions(int maxBatchSize = DefaultDrainBatchSize)
|
|
{
|
|
var batch = new List<LandblockStreamResult>(maxBatchSize);
|
|
while (batch.Count < maxBatchSize && _outbox.Reader.TryRead(out var result))
|
|
batch.Add(result);
|
|
return batch;
|
|
}
|
|
|
|
private void WorkerLoop()
|
|
{
|
|
try
|
|
{
|
|
// Safe to block: this is a dedicated worker thread with no
|
|
// SynchronizationContext, so .Result/.GetResult cannot deadlock
|
|
// against any captured continuation. Using the sync pattern
|
|
// here keeps the loop linear; an async-enumerable alternative
|
|
// would force WorkerLoop to be async Task and lose the
|
|
// simple thread-start shape.
|
|
while (!_cancel.Token.IsCancellationRequested)
|
|
{
|
|
if (!_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult())
|
|
break;
|
|
|
|
while (_inbox.Reader.TryRead(out var job))
|
|
{
|
|
if (_cancel.Token.IsCancellationRequested) return;
|
|
HandleJob(job);
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException) { /* graceful shutdown */ }
|
|
catch (Exception ex)
|
|
{
|
|
// Last-ditch: surface via outbox so the caller at least sees
|
|
// something. We never retry a crashed worker.
|
|
_outbox.Writer.TryWrite(new LandblockStreamResult.WorkerCrashed(ex.ToString()));
|
|
}
|
|
finally
|
|
{
|
|
_outbox.Writer.TryComplete();
|
|
}
|
|
}
|
|
|
|
private void HandleJob(LandblockStreamJob job)
|
|
{
|
|
switch (job)
|
|
{
|
|
case LandblockStreamJob.Load load:
|
|
// TODO(A.5 T16): route by load.Kind. LoadFar will skip
|
|
// LandBlockInfo + scenery generation; PromoteToNear will skip
|
|
// mesh build (terrain already on GPU). Today every Kind takes
|
|
// the full-load path via _loadLandblock, which matches today's
|
|
// single-tier semantics.
|
|
try
|
|
{
|
|
var lb = _loadLandblock(load.LandblockId);
|
|
if (lb is null)
|
|
{
|
|
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
|
|
load.LandblockId, "LandblockLoader.Load returned null"));
|
|
break;
|
|
}
|
|
var mesh = _buildMeshOrNull(load.LandblockId, lb);
|
|
if (mesh is null)
|
|
{
|
|
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
|
|
load.LandblockId, "buildMeshOrNull returned null"));
|
|
break;
|
|
}
|
|
var tier = load.Kind == LandblockStreamJobKind.LoadFar
|
|
? LandblockStreamTier.Far : LandblockStreamTier.Near;
|
|
_outbox.Writer.TryWrite(new LandblockStreamResult.Loaded(
|
|
load.LandblockId, tier, lb, mesh));
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
|
|
load.LandblockId, ex.ToString()));
|
|
}
|
|
break;
|
|
|
|
case LandblockStreamJob.Unload unload:
|
|
_outbox.Writer.TryWrite(new LandblockStreamResult.Unloaded(unload.LandblockId));
|
|
break;
|
|
}
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
if (System.Threading.Interlocked.Exchange(ref _disposed, 1) != 0) return;
|
|
_cancel.Cancel();
|
|
_inbox.Writer.TryComplete();
|
|
_worker?.Join(TimeSpan.FromSeconds(2));
|
|
_cancel.Dispose();
|
|
}
|
|
}
|