using System; using System.Collections.Generic; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using AcDream.Core.World; namespace AcDream.App.Streaming; /// /// Services landblock load/unload requests by invoking caller-supplied /// factory delegates (the production instance wraps /// for loading and /// for the terrain /// mesh) and posting results to an outbox the render thread drains once /// per OnUpdate. /// /// /// Thread model (Phase A.5 T11+): spawns a /// dedicated background worker thread. and /// write non-blocking to the inbox /// ; the worker drains it and posts /// records to the outbox. /// /// /// /// DatCollection thread safety is provided by the caller: /// GameWindow's _datLock (Phase A.5 T10) serialises all /// DatCollection.Get<T> calls. Both factory closures passed at /// construction acquire that lock before reading dats. The worker never /// touches DatCollection directly — it only calls the factories. /// /// /// /// Unloads pass through the outbox as /// records so the render thread can release GPU state on the next drain — /// the streamer never touches GPU resources directly. /// /// /// /// Threading: must be called from a single /// consumer thread (the render thread in production). All other public /// methods are thread-safe. /// /// public sealed class LandblockStreamer : IDisposable { /// /// Default drain batch size. Tuned to cap GPU upload work the render /// thread does per frame while still draining a moderate backlog in a /// few frames. Callers can override on a per-call basis. /// public const int DefaultDrainBatchSize = 4; private readonly Func _loadLandblock; private readonly Func _buildMeshOrNull; private readonly Channel _inbox; private readonly Channel _outbox; private readonly CancellationTokenSource _cancel = new(); private Thread? _worker; private int _disposed; /// /// Primary ctor — the factory takes the job's /// so it can branch on far-tier vs near-tier and skip entity hydration on far-tier /// loads (heightmap-only). See ISSUE #54: prior to this signature the worker always /// called the full-load path and stripped entities at the output, wasting per-LB /// LandBlockInfo + SceneryGenerator work. /// public LandblockStreamer( Func loadLandblock, Func? buildMeshOrNull = null) { _loadLandblock = loadLandblock; // Default: no mesh build (returns null → Failed result). Production // wires in LandblockMesh.Build via the T12 construction site. _buildMeshOrNull = buildMeshOrNull ?? ((_, _) => null); _inbox = Channel.CreateUnbounded( new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }); _outbox = Channel.CreateUnbounded( new UnboundedChannelOptions { SingleReader = true, SingleWriter = true }); } /// /// Back-compat overload — wraps a kind-agnostic factory so existing test code /// that doesn't care about the JobKind branch keeps compiling. The wrapper /// ignores the kind and calls the factory once per LB regardless of tier. /// New production code should use the primary 2-arg ctor. /// public LandblockStreamer( Func loadLandblock, Func? buildMeshOrNull = null) : this((id, _) => loadLandblock(id), buildMeshOrNull) { } /// /// Activate the dedicated background worker thread. Idempotent and /// thread-safe: concurrent callers will only spawn one worker; subsequent /// calls are no-ops. Atomic via . /// public void Start() { if (System.Threading.Volatile.Read(ref _disposed) != 0) throw new ObjectDisposedException(nameof(LandblockStreamer)); // A.5 T10-T12 follow-up: atomically install the worker so concurrent // Start() callers don't both pass the null check and spawn duplicate // threads. Construct the candidate; CAS it into _worker; if we lost // the race, the candidate goes unstarted and is GCed. var candidate = new Thread(WorkerLoop) { IsBackground = true, Name = "acdream.streaming.worker", }; if (Interlocked.CompareExchange(ref _worker, candidate, null) == null) candidate.Start(); // else: another caller won the race; their thread is running. } /// /// Non-blocking enqueue. The worker drains the inbox and posts a /// (or /// ) to the outbox. /// public void EnqueueLoad(uint landblockId, LandblockStreamJobKind kind = LandblockStreamJobKind.LoadNear) { if (System.Threading.Volatile.Read(ref _disposed) != 0) throw new ObjectDisposedException(nameof(LandblockStreamer)); _inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId, kind)); } /// /// Non-blocking enqueue. The worker posts a /// to the outbox. /// public void EnqueueUnload(uint landblockId) { if (System.Threading.Volatile.Read(ref _disposed) != 0) throw new ObjectDisposedException(nameof(LandblockStreamer)); _inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId)); } /// /// Drain up to completed results. /// Non-blocking. Call from the render thread once per OnUpdate. /// /// /// Must be called from a single consumer thread. The outbox channel is /// configured with SingleReader = true and will throw on concurrent reads. /// public IReadOnlyList DrainCompletions(int maxBatchSize = DefaultDrainBatchSize) { var batch = new List(maxBatchSize); while (batch.Count < maxBatchSize && _outbox.Reader.TryRead(out var result)) batch.Add(result); return batch; } private void WorkerLoop() { var highPriority = new Queue(); var lowPriority = new Queue(); try { // Safe to block: this is a dedicated worker thread with no // SynchronizationContext, so .Result/.GetResult cannot deadlock // against any captured continuation. Using the sync pattern // here keeps the loop linear; an async-enumerable alternative // would force WorkerLoop to be async Task and lose the // simple thread-start shape. while (!_cancel.Token.IsCancellationRequested) { if (highPriority.Count == 0 && lowPriority.Count == 0 && !_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult()) { break; } while (_inbox.Reader.TryRead(out var job)) EnqueuePrioritized(job, highPriority, lowPriority); if (highPriority.Count == 0 && lowPriority.Count == 0) continue; if (_cancel.Token.IsCancellationRequested) return; var next = highPriority.Count > 0 ? highPriority.Dequeue() : lowPriority.Dequeue(); HandleJob(next); } } catch (OperationCanceledException) { /* graceful shutdown */ } catch (Exception ex) { // Last-ditch: surface via outbox so the caller at least sees // something. We never retry a crashed worker. _outbox.Writer.TryWrite(new LandblockStreamResult.WorkerCrashed(ex.ToString())); } finally { _outbox.Writer.TryComplete(); } } private static void EnqueuePrioritized( LandblockStreamJob job, Queue highPriority, Queue lowPriority) { if (job is LandblockStreamJob.Load { Kind: LandblockStreamJobKind.LoadNear or LandblockStreamJobKind.PromoteToNear } high) { // Near-tier jobs are visible-content critical. They supersede an // older queued LoadFar for the same landblock: LoadNear obviously // loads everything, and PromoteToNear now carries mesh data so the // render thread can run the full near-tier apply side effects. If a // LoadFar is already being processed, the single worker naturally // finishes it before the promotion is dequeued. RemoveLowPriorityJobsForLandblock( lowPriority, high.LandblockId, removeLoadFar: true, removeUnload: true); highPriority.Enqueue(job); return; } lowPriority.Enqueue(job); } private static void RemoveLowPriorityJobsForLandblock( Queue queue, uint landblockId, bool removeLoadFar, bool removeUnload) { int count = queue.Count; for (int i = 0; i < count; i++) { var job = queue.Dequeue(); bool remove = job.LandblockId == landblockId && job switch { LandblockStreamJob.Load { Kind: LandblockStreamJobKind.LoadFar } => removeLoadFar, LandblockStreamJob.Unload => removeUnload, _ => false }; if (!remove) queue.Enqueue(job); } } private void HandleJob(LandblockStreamJob job) { switch (job) { case LandblockStreamJob.Load load: // ISSUE #54 (post-A.5): JobKind is now plumbed through to the // factory, so far-tier loads can skip LandBlockInfo + scenery // + interior hydration on the worker thread (heightmap-only). // The post-load entity-strip below is retained as a Debug // assertion + Release safety net for the case where a buggy // factory returns far-tier with entities anyway. try { var lb = _loadLandblock(load.LandblockId, load.Kind); if (lb is null) { _outbox.Writer.TryWrite(new LandblockStreamResult.Failed( load.LandblockId, "LandblockLoader.Load returned null")); break; } if (load.Kind == LandblockStreamJobKind.PromoteToNear) { var promotedMesh = _buildMeshOrNull(load.LandblockId, lb); if (promotedMesh is null) { _outbox.Writer.TryWrite(new LandblockStreamResult.Failed( load.LandblockId, "buildMeshOrNull returned null")); break; } _outbox.Writer.TryWrite(new LandblockStreamResult.Promoted( load.LandblockId, lb, promotedMesh)); break; } var mesh = _buildMeshOrNull(load.LandblockId, lb); if (mesh is null) { _outbox.Writer.TryWrite(new LandblockStreamResult.Failed( load.LandblockId, "buildMeshOrNull returned null")); break; } var tier = load.Kind == LandblockStreamJobKind.LoadFar ? LandblockStreamTier.Far : LandblockStreamTier.Near; if (tier == LandblockStreamTier.Far && lb.Entities.Count > 0) { // Belt-and-suspenders: factory should have skipped // entity hydration for LoadFar. If it didn't, fail // loud in Debug builds and strip in Release. System.Diagnostics.Debug.Assert(false, $"Far-tier factory should skip entity hydration; got {lb.Entities.Count} entities for LB 0x{load.LandblockId:X8}"); lb = new LoadedLandblock( lb.LandblockId, lb.Heightmap, System.Array.Empty()); } _outbox.Writer.TryWrite(new LandblockStreamResult.Loaded( load.LandblockId, tier, lb, mesh)); } catch (Exception ex) { _outbox.Writer.TryWrite(new LandblockStreamResult.Failed( load.LandblockId, ex.ToString())); } break; case LandblockStreamJob.Unload unload: _outbox.Writer.TryWrite(new LandblockStreamResult.Unloaded(unload.LandblockId)); break; } } public void Dispose() { if (System.Threading.Interlocked.Exchange(ref _disposed, 1) != 0) return; _cancel.Cancel(); _inbox.Writer.TryComplete(); _worker?.Join(TimeSpan.FromSeconds(2)); _cancel.Dispose(); } }