acdream/src/AcDream.App/Streaming/LandblockStreamer.cs
Erik 56860501b6 fix(G.3): collapse streaming to the single dungeon landblock indoors (#133 FPS)
Dungeon FPS sat at ~30 (frame ~33ms) because the 25x25 streaming window around
the dungeon landblock pulled in ~129 NEIGHBORING landblocks + their thousands of
torch/particle emitters, all drawn though never visible. In AC all dungeons are
packed adjacent in the unused "ocean" map grid, so those neighbors are unrelated
dungeons. The FPS timeline proved it: 247 fps at login (lb 0/0, ~10K entities) →
17 → 30 as landblocks streamed in (lb 0→129) — the cost tracked LANDBLOCK count,
not entities.

Retail-faithful: ACE LandblockManager.GetAdjacentIDs returns ZERO adjacents for a
dungeon (`if (landblock.IsDungeon) return adjacents;`, Landblock.cs:577-582) —
every dungeon is a self-contained landblock you never see out of.

Fix: when the player stands in a sealed indoor cell (CurrCell.IsEnv &&
!SeenOutside — the same predicate that kills the sun/sky), collapse streaming to
just the player's dungeon landblock and unload the neighbors. Building interiors
(cottage/inn) have SeenOutside cells, so they are NOT gated and keep their
surrounding terrain (the frozen building/cellar demo is unaffected). Unloading the
neighbors also tears down their lights (removeTerrain → UnregisterOwner), shrinking
LightManager._all from ~2227 toward retail's ≤40 — which directly helps the A7
lighting bake landing next.
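The gate predicate can be sketched in isolation. This is a minimal illustration, not the project's code: `CellFlags` and `DungeonGate` are hypothetical stand-ins, and only the two flags named above (`IsEnv`, `SeenOutside`) are modelled.

```csharp
// Hypothetical stand-in for the player's current cell; the real cell type
// carries far more state than these two flags.
public readonly record struct CellFlags(bool IsEnv, bool SeenOutside);

public static class DungeonGate
{
    // True only for a sealed indoor cell: an environment cell from which the
    // outdoors is never visible. A missing cell (null) never collapses.
    public static bool ShouldCollapse(CellFlags? cell) =>
        cell is { IsEnv: true, SeenOutside: false };
}
```

Under these assumptions a dungeon cell (`IsEnv`, not `SeenOutside`) collapses streaming, while a cottage interior (`IsEnv` and `SeenOutside`) and any outdoor cell do not.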

Mechanics (StreamingController):
- Edge IN: ClearPendingLoads() cancels the in-flight 25x25 window (new streamer
  ClearLoads control job — worker drops queued Loads, keeps Unloads), unload every
  resident neighbor, pin a radius-0 StreamingRegion, (re)load the dungeon block if
  needed.
- Stay collapsed: sweep any straggler that finished loading after the edge (a Load
  the worker had already dequeued before ClearLoads).
- Edge OUT (portal/teleport to outdoors): rebuild the full two-tier window at the
  new center, unload anything stale.
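The three cases above can be sketched as a small state machine. This is a hypothetical illustration, not the real StreamingController: `CollapseGateSketch`, `Tick`, `MarkResident` and the string log are invented for the example, Loads are treated as instantly resident, and the pinned radius-0 StreamingRegion and normal outdoor streaming are not modelled.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch, not the real StreamingController. The real controller
// learns residency from drained streamer results; here it is tracked directly.
public sealed class CollapseGateSketch
{
    private readonly SortedSet<uint> _resident = new();
    private bool _collapsed;

    // Commands that would be sent to the streamer, in order.
    public List<string> Log { get; } = new();

    // Models a landblock becoming resident, including a straggler Load that
    // the worker had already dequeued before the clear.
    public void MarkResident(uint id) => _resident.Add(id);

    public void Tick(bool inSealedCell, uint center, Func<uint, IEnumerable<uint>> fullWindow)
    {
        if (inSealedCell)
        {
            if (!_collapsed)
            {
                _collapsed = true;          // edge IN: cancel the in-flight window
                Log.Add("ClearPendingLoads");
            }
            Retain(new[] { center });       // edge IN and the straggler sweep
            return;
        }
        if (_collapsed)
        {
            _collapsed = false;             // edge OUT: rebuild the full window
            Retain(fullWindow(center));
        }
    }

    // Unload everything outside `wanted`, then load whatever is missing.
    private void Retain(IEnumerable<uint> wanted)
    {
        var keep = new HashSet<uint>(wanted);
        foreach (var id in new List<uint>(_resident))
        {
            if (keep.Contains(id)) continue;
            _resident.Remove(id);
            Log.Add($"Unload {id:X8}");
        }
        foreach (var id in keep)
        {
            if (_resident.Add(id))
                Log.Add($"Load {id:X8}");
        }
    }
}
```

Calling `Tick(true, ...)` every frame while indoors makes the sweep idempotent: a frame with no stragglers logs nothing, and `ClearPendingLoads` is issued only once per entry.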

AP-36 added to the divergence register (the gate uses the cheap SeenOutside cell
predicate as an approximation of ACE's full landblock IsDungeon classification).
GameWindow also carries a TEMP ACDREAM_LOG_FPS=1 headless FPS line (strip after
the A7 FPS+lighting verification).

Build green; 58 streaming tests green (6 new dungeon-gate tests).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 22:32:56 +02:00


using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using AcDream.Core.World;
namespace AcDream.App.Streaming;
/// <summary>
/// Services landblock load/unload requests by invoking caller-supplied
/// factory delegates (the production instance wraps
/// <see cref="LandblockLoader.Load"/> for loading and
/// <see cref="AcDream.Core.Terrain.LandblockMesh.Build"/> for the terrain
/// mesh) and posting results to an outbox the render thread drains once
/// per OnUpdate.
///
/// <para>
/// <b>Thread model (Phase A.5 T11+):</b> <see cref="Start"/> spawns a
/// dedicated background worker thread. <see cref="EnqueueLoad"/> and
/// <see cref="EnqueueUnload"/> write non-blocking to the inbox
/// <see cref="Channel{T}"/>; the worker drains it and posts
/// <see cref="LandblockStreamResult"/> records to the outbox.
/// </para>
///
/// <para>
/// <b>DatCollection thread safety</b> is provided by the caller:
/// GameWindow's <c>_datLock</c> (Phase A.5 T10) serialises all
/// <c>DatCollection.Get&lt;T&gt;</c> calls. Both factory closures passed at
/// construction acquire that lock before reading dats. The worker never
/// touches <c>DatCollection</c> directly — it only calls the factories.
/// </para>
///
/// <para>
/// Unloads pass through the outbox as <see cref="LandblockStreamResult.Unloaded"/>
/// records so the render thread can release GPU state on the next drain —
/// the streamer never touches GPU resources directly.
/// </para>
/// </summary>
/// <remarks>
/// Threading: <see cref="DrainCompletions"/> must be called from a single
/// consumer thread (the render thread in production). All other public
/// methods are thread-safe.
/// </remarks>
public sealed class LandblockStreamer : IDisposable
{
/// <summary>
/// Default drain batch size. Tuned to cap GPU upload work the render
/// thread does per frame while still draining a moderate backlog in a
/// few frames. Callers can override on a per-call basis.
/// </summary>
public const int DefaultDrainBatchSize = 4;
private readonly Func<uint, LandblockStreamJobKind, LoadedLandblock?> _loadLandblock;
private readonly Func<uint, LoadedLandblock?, AcDream.Core.Terrain.LandblockMeshData?> _buildMeshOrNull;
private readonly Channel<LandblockStreamJob> _inbox;
private readonly Channel<LandblockStreamResult> _outbox;
private readonly CancellationTokenSource _cancel = new();
private Thread? _worker;
private int _disposed;
/// <summary>
/// Primary ctor — the factory takes the job's <see cref="LandblockStreamJobKind"/>
/// so it can branch on far-tier vs near-tier and skip entity hydration on far-tier
/// loads (heightmap-only). See ISSUE #54: prior to this signature the worker always
/// called the full-load path and stripped entities at the output, wasting per-LB
/// <c>LandBlockInfo</c> + <c>SceneryGenerator</c> work.
/// </summary>
public LandblockStreamer(
Func<uint, LandblockStreamJobKind, LoadedLandblock?> loadLandblock,
Func<uint, LoadedLandblock?, AcDream.Core.Terrain.LandblockMeshData?>? buildMeshOrNull = null)
{
_loadLandblock = loadLandblock;
// Default: no mesh build (returns null → Failed result). Production
// wires in LandblockMesh.Build via the T12 construction site.
_buildMeshOrNull = buildMeshOrNull ?? ((_, _) => null);
_inbox = Channel.CreateUnbounded<LandblockStreamJob>(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
_outbox = Channel.CreateUnbounded<LandblockStreamResult>(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });
}
/// <summary>
/// Back-compat overload — wraps a kind-agnostic factory so existing test code
/// that doesn't care about the JobKind branch keeps compiling. The wrapper
/// ignores the kind and calls the factory once per LB regardless of tier.
/// New production code should use the primary ctor, whose factory also
/// receives the job kind.
/// </summary>
public LandblockStreamer(
Func<uint, LoadedLandblock?> loadLandblock,
Func<uint, LoadedLandblock?, AcDream.Core.Terrain.LandblockMeshData?>? buildMeshOrNull = null)
: this((id, _) => loadLandblock(id), buildMeshOrNull)
{
}
/// <summary>
/// Activate the dedicated background worker thread. Idempotent and
/// thread-safe: concurrent callers will only spawn one worker; subsequent
/// calls are no-ops. Atomic via <see cref="Interlocked.CompareExchange{T}(ref T, T, T)"/>.
/// </summary>
public void Start()
{
if (System.Threading.Volatile.Read(ref _disposed) != 0)
throw new ObjectDisposedException(nameof(LandblockStreamer));
// A.5 T10-T12 follow-up: atomically install the worker so concurrent
// Start() callers don't both pass the null check and spawn duplicate
// threads. Construct the candidate; CAS it into _worker; if we lost
// the race, the candidate goes unstarted and is GCed.
var candidate = new Thread(WorkerLoop)
{
IsBackground = true,
Name = "acdream.streaming.worker",
};
if (Interlocked.CompareExchange(ref _worker, candidate, null) == null)
candidate.Start();
// else: another caller won the race; their thread is running.
}
/// <summary>
/// Non-blocking enqueue. The worker drains the inbox and posts a
/// <see cref="LandblockStreamResult.Loaded"/> (or
/// <see cref="LandblockStreamResult.Failed"/>) to the outbox.
/// </summary>
public void EnqueueLoad(uint landblockId, LandblockStreamJobKind kind = LandblockStreamJobKind.LoadNear)
{
if (System.Threading.Volatile.Read(ref _disposed) != 0)
throw new ObjectDisposedException(nameof(LandblockStreamer));
_inbox.Writer.TryWrite(new LandblockStreamJob.Load(landblockId, kind));
}
/// <summary>
/// Non-blocking enqueue. The worker posts a
/// <see cref="LandblockStreamResult.Unloaded"/> to the outbox.
/// </summary>
public void EnqueueUnload(uint landblockId)
{
if (System.Threading.Volatile.Read(ref _disposed) != 0)
throw new ObjectDisposedException(nameof(LandblockStreamer));
_inbox.Writer.TryWrite(new LandblockStreamJob.Unload(landblockId));
}
/// <summary>
/// Cancel every queued-but-not-started Load. Posts a
/// <see cref="LandblockStreamJob.ClearLoads"/> control job which the worker
/// honours at read time, dropping all pending Loads from both priority
/// queues (Unloads survive). Used on the dungeon-entry edge to abort the
/// in-flight 25×25 neighbor window so the ~129 ocean-grid dungeons never
/// finish loading (#133 FPS). Loads the worker has ALREADY dequeued still
/// complete; the StreamingController's collapsed-sweep unloads those few.
/// </summary>
public void ClearPendingLoads()
{
if (System.Threading.Volatile.Read(ref _disposed) != 0)
throw new ObjectDisposedException(nameof(LandblockStreamer));
_inbox.Writer.TryWrite(new LandblockStreamJob.ClearLoads());
}
/// <summary>
/// Drain up to <paramref name="maxBatchSize"/> completed results.
/// Non-blocking. Call from the render thread once per OnUpdate.
/// </summary>
/// <remarks>
/// Must be called from a single consumer thread. The outbox channel is
/// created with SingleReader = true, which is an optimisation hint rather
/// than an enforced check: concurrent reads are unsupported and their
/// behaviour is undefined.
/// </remarks>
public IReadOnlyList<LandblockStreamResult> DrainCompletions(int maxBatchSize = DefaultDrainBatchSize)
{
var batch = new List<LandblockStreamResult>(maxBatchSize);
while (batch.Count < maxBatchSize && _outbox.Reader.TryRead(out var result))
batch.Add(result);
return batch;
}
private void WorkerLoop()
{
var highPriority = new Queue<LandblockStreamJob>();
var lowPriority = new Queue<LandblockStreamJob>();
try
{
// Safe to block: this is a dedicated worker thread with no
// SynchronizationContext, so GetAwaiter().GetResult() cannot deadlock
// against any captured continuation. Using the sync pattern
// here keeps the loop linear; an async-enumerable alternative
// would force WorkerLoop to be async Task and lose the
// simple thread-start shape.
while (!_cancel.Token.IsCancellationRequested)
{
if (highPriority.Count == 0 &&
lowPriority.Count == 0 &&
!_inbox.Reader.WaitToReadAsync(_cancel.Token).AsTask().GetAwaiter().GetResult())
{
break;
}
while (_inbox.Reader.TryRead(out var job))
{
if (job is LandblockStreamJob.ClearLoads)
{
// Dungeon-entry cancellation: drop every queued Load,
// keep Unloads. Handled at read time so it supersedes
// Loads sitting in the priority queues ahead of it.
DropLoadJobs(highPriority);
DropLoadJobs(lowPriority);
continue;
}
EnqueuePrioritized(job, highPriority, lowPriority);
}
if (highPriority.Count == 0 && lowPriority.Count == 0)
continue;
if (_cancel.Token.IsCancellationRequested) return;
var next = highPriority.Count > 0
? highPriority.Dequeue()
: lowPriority.Dequeue();
HandleJob(next);
}
}
catch (OperationCanceledException) { /* graceful shutdown */ }
catch (Exception ex)
{
// Last-ditch: surface via outbox so the caller at least sees
// something. We never retry a crashed worker.
_outbox.Writer.TryWrite(new LandblockStreamResult.WorkerCrashed(ex.ToString()));
}
finally
{
_outbox.Writer.TryComplete();
}
}
private static void EnqueuePrioritized(
LandblockStreamJob job,
Queue<LandblockStreamJob> highPriority,
Queue<LandblockStreamJob> lowPriority)
{
if (job is LandblockStreamJob.Load
{
Kind: LandblockStreamJobKind.LoadNear or LandblockStreamJobKind.PromoteToNear
} high)
{
// Near-tier jobs are visible-content critical. They supersede an
// older queued LoadFar for the same landblock: LoadNear obviously
// loads everything, and PromoteToNear now carries mesh data so the
// render thread can run the full near-tier apply side effects. If a
// LoadFar is already being processed, the single worker naturally
// finishes it before the promotion is dequeued.
RemoveLowPriorityJobsForLandblock(
lowPriority,
high.LandblockId,
removeLoadFar: true,
removeUnload: true);
highPriority.Enqueue(job);
return;
}
lowPriority.Enqueue(job);
}
/// <summary>
/// Drop every <see cref="LandblockStreamJob.Load"/> from a priority queue,
/// preserving Unloads (and any other control jobs). Rotates the queue once
/// in place. Used by the <see cref="LandblockStreamJob.ClearLoads"/> path.
/// </summary>
private static void DropLoadJobs(Queue<LandblockStreamJob> queue)
{
int count = queue.Count;
for (int i = 0; i < count; i++)
{
var job = queue.Dequeue();
if (job is not LandblockStreamJob.Load)
queue.Enqueue(job);
}
}
private static void RemoveLowPriorityJobsForLandblock(
Queue<LandblockStreamJob> queue,
uint landblockId,
bool removeLoadFar,
bool removeUnload)
{
int count = queue.Count;
for (int i = 0; i < count; i++)
{
var job = queue.Dequeue();
bool remove = job.LandblockId == landblockId && job switch
{
LandblockStreamJob.Load { Kind: LandblockStreamJobKind.LoadFar } => removeLoadFar,
LandblockStreamJob.Unload => removeUnload,
_ => false
};
if (!remove)
queue.Enqueue(job);
}
}
private void HandleJob(LandblockStreamJob job)
{
switch (job)
{
case LandblockStreamJob.Load load:
// ISSUE #54 (post-A.5): JobKind is now plumbed through to the
// factory, so far-tier loads can skip LandBlockInfo + scenery
// + interior hydration on the worker thread (heightmap-only).
// The post-load entity-strip below is retained as a Debug
// assertion + Release safety net for the case where a buggy
// factory returns far-tier with entities anyway.
try
{
var lb = _loadLandblock(load.LandblockId, load.Kind);
if (lb is null)
{
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
load.LandblockId, "LandblockLoader.Load returned null"));
break;
}
// Near loads, far loads and promotions all need the terrain mesh,
// so build it once before branching on the job kind.
var mesh = _buildMeshOrNull(load.LandblockId, lb);
if (mesh is null)
{
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
load.LandblockId, "buildMeshOrNull returned null"));
break;
}
if (load.Kind == LandblockStreamJobKind.PromoteToNear)
{
_outbox.Writer.TryWrite(new LandblockStreamResult.Promoted(
load.LandblockId, lb, mesh));
break;
}
var tier = load.Kind == LandblockStreamJobKind.LoadFar
? LandblockStreamTier.Far : LandblockStreamTier.Near;
if (tier == LandblockStreamTier.Far && lb.Entities.Count > 0)
{
// Belt-and-suspenders: factory should have skipped
// entity hydration for LoadFar. If it didn't, fail
// loud in Debug builds and strip in Release.
System.Diagnostics.Debug.Assert(false,
$"Far-tier factory should skip entity hydration; got {lb.Entities.Count} entities for LB 0x{load.LandblockId:X8}");
lb = new LoadedLandblock(
lb.LandblockId,
lb.Heightmap,
System.Array.Empty<AcDream.Core.World.WorldEntity>());
}
_outbox.Writer.TryWrite(new LandblockStreamResult.Loaded(
load.LandblockId, tier, lb, mesh));
}
catch (Exception ex)
{
_outbox.Writer.TryWrite(new LandblockStreamResult.Failed(
load.LandblockId, ex.ToString()));
}
break;
case LandblockStreamJob.Unload unload:
_outbox.Writer.TryWrite(new LandblockStreamResult.Unloaded(unload.LandblockId));
break;
}
}
public void Dispose()
{
if (System.Threading.Interlocked.Exchange(ref _disposed, 1) != 0) return;
_cancel.Cancel();
_inbox.Writer.TryComplete();
// Generous join: the owner disposes the DatCollection after this, which
// unmaps the dats' memory-mapped views — an abandoned worker mid-dat-read
// would take the process down with an AccessViolation in
// MemoryMappedBlockAllocator.ReadBlock (dat-race investigation 2026-06-09).
// Cancellation is honored between jobs, so the wait is bounded by one
// landblock load; 15s only ever elapses if the worker is genuinely hung.
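if (_worker is not null && !_worker.Join(TimeSpan.FromSeconds(15)))
{
Console.Error.WriteLine(
"[streamer] worker did not stop within 15s — dat teardown may race an in-flight load");
// Leave _cancel undisposed: a worker that is still running polls
// _cancel.Token, which throws ObjectDisposedException once disposed.
return;
}
_cancel.Dispose();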
}
}
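
The read-time handling of ClearLoads in WorkerLoop can be illustrated in isolation. The sketch below is a simplification under stated assumptions: `JobSketch` and its subtypes are hypothetical stand-ins for LandblockStreamJob, and a single queue replaces the two priority queues. It shows that the control job drops only Loads queued before it, keeps Unloads, and leaves later Loads alone.

```csharp
using System.Collections.Generic;

// Hypothetical, simplified stand-ins for LandblockStreamJob; the real records
// carry more data (for example the load kind).
public abstract record JobSketch;
public sealed record LoadSketch(uint LandblockId) : JobSketch;
public sealed record UnloadSketch(uint LandblockId) : JobSketch;
public sealed record ClearLoadsSketch : JobSketch;

public static class ClearLoadsDemo
{
    // Replays an inbox batch in arrival order and returns what is still queued.
    public static List<JobSketch> Replay(IEnumerable<JobSketch> inbox)
    {
        var pending = new Queue<JobSketch>();
        foreach (var job in inbox)
        {
            if (job is ClearLoadsSketch)
            {
                DropLoads(pending);   // affects only jobs queued before this point
                continue;             // the control job itself is never queued
            }
            pending.Enqueue(job);
        }
        return new List<JobSketch>(pending);
    }

    // Rotate the queue once: dequeue each job, re-enqueue everything but Loads.
    private static void DropLoads(Queue<JobSketch> queue)
    {
        int count = queue.Count;
        for (int i = 0; i < count; i++)
        {
            var job = queue.Dequeue();
            if (job is not LoadSketch)
                queue.Enqueue(job);
        }
    }
}
```

For an inbox of Load 1, Unload 2, Load 3, ClearLoads, Load 4, the replay leaves Unload 2 and Load 4 queued, in that order. A Load already handed to the job handler is outside the queue and would still complete, which is why the controller needs its straggler sweep.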