acdream/src/AcDream.Core.Net/WorldSession.cs
Erik ae43531866 feat(net): Phase A.3 — background net receive thread
Moves the UDP receive onto a dedicated daemon thread that
continuously pulls raw datagrams from the kernel buffer and posts
them to a Channel<byte[]>. Tick() on the render thread drains the
channel instead of calling _net.TryReceive() directly. All decode,
fragment assembly, ISAAC crypto, event dispatch, and ack-sending
remain on the render thread — this is the minimal change that
prevents packet drops during render-thread stalls without the
complexity of moving decode/dispatch off-thread.

The net thread starts at the end of EnterWorld() after the handshake
is complete — during Connect() and EnterWorld(), PumpOnce() still
reads directly from the socket (the net thread isn't running yet).
Dispose() cancels the thread via CancellationToken, joins with a
2-second timeout, then disposes the socket.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 09:20:56 +02:00

539 lines
22 KiB
C#

using System.Buffers.Binary;
using System.Net;
using System.Threading.Channels;
using AcDream.Core.Net.Cryptography;
using AcDream.Core.Net.Messages;
using AcDream.Core.Net.Packets;
namespace AcDream.Core.Net;
/// <summary>
/// High-level AC client session: owns a <see cref="NetClient"/>, drives
/// the full handshake + character-enter-world flow, and converts the
/// inbound GameMessage stream into C# events that a game loop can bind.
///
/// <para>
/// Intended use from <c>GameWindow</c>:
/// </para>
/// <code>
/// var session = new WorldSession(new IPEndPoint(IPAddress.Loopback, 9000));
/// session.EntitySpawned += snap =&gt; { /* add to IGameState */ };
/// session.Connect("testaccount", "testpassword"); // blocks until CharacterList
/// session.EnterWorld(characterIndex: 0); // blocks until first CreateObject
/// // ... then every frame:
/// session.Tick(); // non-blocking, drains any pending packets, fires events
/// </code>
///
/// <para>
/// <b>Not yet provided</b> (deferred): ACK pump, retransmit handling,
/// delete-object processing, position updates, chat, disconnect detection.
/// The current client is one-shot — connect, enter the world, stream
/// events for a few seconds, let the test harness tear it down.
/// </para>
/// </summary>
public sealed class WorldSession : IDisposable
{
public enum State
{
Disconnected,
Handshaking,
InCharacterSelect,
EnteringWorld,
InWorld,
Failed,
}
public readonly record struct EntitySpawn(
uint Guid,
CreateObject.ServerPosition? Position,
uint? SetupTableId,
IReadOnlyList<CreateObject.AnimPartChange> AnimPartChanges,
IReadOnlyList<CreateObject.TextureChange> TextureChanges,
IReadOnlyList<CreateObject.SubPaletteSwap> SubPalettes,
uint? BasePaletteId,
float? ObjScale,
string? Name,
CreateObject.ServerMotionState? MotionState,
uint? MotionTableId);
/// <summary>Fires when the session finishes parsing a CreateObject.</summary>
public event Action<EntitySpawn>? EntitySpawned;
/// <summary>
/// Payload for <see cref="MotionUpdated"/>: the server guid of the entity
/// whose motion changed and its new server-side stance + forward command.
/// The renderer uses these to drive per-entity cycle switching.
/// </summary>
public readonly record struct EntityMotionUpdate(
uint Guid,
CreateObject.ServerMotionState MotionState);
/// <summary>
/// Fires when the session parses a 0xF74C UpdateMotion game message.
/// Subscribers can look up the entity by guid and transition its
/// animation cycle to the new (stance, forward-command) pair.
/// </summary>
public event Action<EntityMotionUpdate>? MotionUpdated;
/// <summary>
/// Payload for <see cref="PositionUpdated"/>: the server guid plus a
/// full <see cref="CreateObject.ServerPosition"/> describing the
/// entity's new world position and rotation. Subscribers translate
/// the landblock-local position into acdream world space and reseat
/// the corresponding <c>WorldEntity</c>.
/// </summary>
public readonly record struct EntityPositionUpdate(
uint Guid,
CreateObject.ServerPosition Position,
System.Numerics.Vector3? Velocity);
/// <summary>
/// Fires when the session parses a 0xF748 UpdatePosition game message.
/// </summary>
public event Action<EntityPositionUpdate>? PositionUpdated;
/// <summary>Raised every time the state machine transitions.</summary>
public event Action<State>? StateChanged;
public State CurrentState { get; private set; } = State.Disconnected;
public CharacterList.Parsed? Characters { get; private set; }
private readonly NetClient _net;
private readonly IPEndPoint _loginEndpoint;
private readonly IPEndPoint _connectEndpoint;
private readonly FragmentAssembler _assembler = new();
private IsaacRandom? _inboundIsaac;
private IsaacRandom? _outboundIsaac;
private ushort _sessionClientId;
private uint _clientPacketSequence;
private uint _fragmentSequence = 1;
// Phase A.3: background receive thread buffers raw UDP datagrams into
// a channel so the render thread never blocks on socket I/O.
private readonly Channel<byte[]> _inboundQueue =
Channel.CreateUnbounded<byte[]>(
new UnboundedChannelOptions
{ SingleReader = true, SingleWriter = true });
private Thread? _netThread;
private readonly CancellationTokenSource _netCancel = new();
/// <summary>
/// Phase 4.10 latch — true after we've sent the LoginComplete game
/// action in response to PlayerCreate. Prevents re-sending if the
/// server emits multiple PlayerCreate messages (rare but possible
/// across recall / portal teleports).
/// </summary>
private bool _loginCompleteSent;
public WorldSession(IPEndPoint serverLogin)
{
_loginEndpoint = serverLogin;
_connectEndpoint = new IPEndPoint(serverLogin.Address, serverLogin.Port + 1);
_net = new NetClient(serverLogin);
}
/// <summary>
/// Do the 3-leg handshake (LoginRequest → ConnectRequest → ConnectResponse),
/// then drain packets until CharacterList is assembled. Blocks for up to
/// <paramref name="timeout"/> total.
/// </summary>
public void Connect(string account, string password, TimeSpan? timeout = null)
{
var deadline = DateTime.UtcNow + (timeout ?? TimeSpan.FromSeconds(10));
Transition(State.Handshaking);
// Step 1: LoginRequest
uint timestamp = (uint)DateTimeOffset.UtcNow.ToUnixTimeSeconds();
byte[] loginPayload = LoginRequest.Build(account, password, timestamp);
var loginHeader = new PacketHeader { Flags = PacketHeaderFlags.LoginRequest };
_net.Send(PacketCodec.Encode(loginHeader, loginPayload, null));
// Step 2: wait for ConnectRequest
Packet? cr = null;
while (DateTime.UtcNow < deadline && cr is null)
{
var bytes = _net.Receive(deadline - DateTime.UtcNow, out _);
if (bytes is null) break;
var dec = PacketCodec.TryDecode(bytes, null);
if (dec.IsOk && dec.Packet!.Header.HasFlag(PacketHeaderFlags.ConnectRequest))
cr = dec.Packet;
}
if (cr is null) { Transition(State.Failed); throw new TimeoutException("ConnectRequest not received"); }
// Step 3: seed ISAAC, send ConnectResponse to port+1, with 200ms race delay
var opt = cr.Optional;
byte[] serverSeedBytes = new byte[4];
BinaryPrimitives.WriteUInt32LittleEndian(serverSeedBytes, opt.ConnectRequestServerSeed);
byte[] clientSeedBytes = new byte[4];
BinaryPrimitives.WriteUInt32LittleEndian(clientSeedBytes, opt.ConnectRequestClientSeed);
_inboundIsaac = new IsaacRandom(serverSeedBytes);
_outboundIsaac = new IsaacRandom(clientSeedBytes);
_sessionClientId = (ushort)opt.ConnectRequestClientId;
_clientPacketSequence = 2;
byte[] crBody = new byte[8];
BinaryPrimitives.WriteUInt64LittleEndian(crBody, opt.ConnectRequestCookie);
var crHeader = new PacketHeader { Sequence = 1, Flags = PacketHeaderFlags.ConnectResponse, Id = 0 };
Thread.Sleep(200);
_net.Send(_connectEndpoint, PacketCodec.Encode(crHeader, crBody, null));
Transition(State.InCharacterSelect);
// Step 4: drain until CharacterList arrives
while (DateTime.UtcNow < deadline && Characters is null)
{
PumpOnce();
}
if (Characters is null) { Transition(State.Failed); throw new TimeoutException("CharacterList not received"); }
}
/// <summary>
/// Send CharacterEnterWorldRequest and CharacterEnterWorld for
/// <see cref="Characters"/>[<paramref name="characterIndex"/>].
/// Returns once the server starts sending CreateObjects (at which point
/// callers should poll <see cref="Tick"/> to stream events).
/// </summary>
public void EnterWorld(string account, int characterIndex = 0, TimeSpan? timeout = null)
{
if (Characters is null || Characters.Characters.Count == 0)
throw new InvalidOperationException("Connect() must complete with a non-empty CharacterList");
if (characterIndex < 0 || characterIndex >= Characters.Characters.Count)
throw new ArgumentOutOfRangeException(nameof(characterIndex));
var deadline = DateTime.UtcNow + (timeout ?? TimeSpan.FromSeconds(10));
var chosen = Characters.Characters[characterIndex];
Transition(State.EnteringWorld);
SendGameMessage(CharacterEnterWorld.BuildEnterWorldRequestBody());
// Wait for CharacterEnterWorldServerReady (0xF7DF)
bool serverReady = false;
while (DateTime.UtcNow < deadline && !serverReady)
{
var drained = PumpOnce(out var opcodes);
if (!drained) continue;
foreach (var op in opcodes)
if (op == 0xF7DFu) { serverReady = true; break; }
}
if (!serverReady) { Transition(State.Failed); throw new TimeoutException("ServerReady not received"); }
SendGameMessage(CharacterEnterWorld.BuildEnterWorldBody(chosen.Id, account));
// NOTE: LoginComplete used to be sent here unconditionally. That was
// wrong — per holtburger's flow (see references/holtburger/.../client/
// messages.rs lines 391-422), LoginComplete is sent in response to the
// server's PlayerCreate (0xF746) game message, NOT immediately after
// EnterWorld. Sending it too early means the player object isn't
// ready and the server ignores it. The actual trigger lives in
// ProcessDatagram.
Transition(State.InWorld);
// Phase A.3: start the background receive thread now that the
// handshake is complete and the session is fully established.
// During Connect() and EnterWorld(), PumpOnce() read directly
// from the socket (blocking). From here on, Tick() drains the
// channel instead.
_netThread = new Thread(NetReceiveLoop)
{
IsBackground = true,
Name = "acdream.net-recv",
};
_netThread.Start();
}
/// <summary>
/// Non-blocking pump. Drains any datagrams buffered by the background
/// net thread (Phase A.3), decodes them, and fires events. Call once
/// per game-loop frame. Returns the number of datagrams processed.
/// </summary>
public int Tick()
{
int processed = 0;
while (_inboundQueue.Reader.TryRead(out var bytes))
{
ProcessDatagram(bytes);
processed++;
}
return processed;
}
/// <summary>
/// Phase A.3: background receive loop. Runs on a dedicated daemon
/// thread started at the end of <see cref="EnterWorld"/>. Continuously
/// pulls raw UDP datagrams from the kernel buffer via
/// <see cref="NetClient.Receive"/> and writes them into
/// <see cref="_inboundQueue"/> for the render thread to drain in
/// <see cref="Tick"/>. Does NOT decode, reassemble, or dispatch —
/// all of that stays on the render thread to avoid ISAAC/assembler
/// thread-safety issues.
///
/// <para>
/// The 250ms receive timeout is the heartbeat: if no packet arrives
/// within 250ms, the loop re-checks the cancellation token and
/// tries again. On shutdown, <see cref="Dispose"/> cancels the token
/// and joins the thread.
/// </para>
/// </summary>
private void NetReceiveLoop()
{
try
{
while (!_netCancel.Token.IsCancellationRequested)
{
var bytes = _net.Receive(TimeSpan.FromMilliseconds(250), out _);
if (bytes is not null)
_inboundQueue.Writer.TryWrite(bytes);
}
}
catch (OperationCanceledException) { /* graceful shutdown */ }
catch (System.Net.Sockets.SocketException) { /* socket closed during shutdown */ }
catch (ObjectDisposedException) { /* NetClient disposed before thread noticed */ }
finally
{
_inboundQueue.Writer.TryComplete();
}
}
/// <summary>
/// Blocking single-datagram pump used during Connect/EnterWorld.
/// Returns true if a datagram was processed.
/// </summary>
private bool PumpOnce()
{
return PumpOnce(out _);
}
private bool PumpOnce(out List<uint> opcodesThisCall)
{
opcodesThisCall = new List<uint>();
var bytes = _net.Receive(TimeSpan.FromMilliseconds(250), out _);
if (bytes is null) return false;
ProcessDatagram(bytes, opcodesThisCall);
return true;
}
private void ProcessDatagram(byte[] bytes, List<uint>? opcodesOut = null)
{
var dec = PacketCodec.TryDecode(bytes, _inboundIsaac);
if (!dec.IsOk) return;
// Phase 4.9: send an ACK_SEQUENCE control packet for every received
// server packet with sequence > 0 and no ACK flag of its own. This
// is the proper holtburger pattern (every received packet gets an
// ack queued back; not periodic). Without it, ACE drops the session
// with "Network Timeout" because it sees no acks coming back —
// which surfaces in other clients' views as the player rendering
// as a stationary purple haze (loading state).
var serverHeader = dec.Packet!.Header;
if (serverHeader.Sequence > 0
&& (serverHeader.Flags & PacketHeaderFlags.AckSequence) == 0)
{
SendAck(serverHeader.Sequence);
}
foreach (var frag in dec.Packet!.Fragments)
{
var body = _assembler.Ingest(frag, out _);
if (body is null || body.Length < 4) continue;
uint op = BinaryPrimitives.ReadUInt32LittleEndian(body);
opcodesOut?.Add(op);
if (op == CharacterList.Opcode && Characters is null)
{
try { Characters = CharacterList.Parse(body); }
catch { /* malformed — ignore and keep draining */ }
}
else if (op == 0xF7E5u) // DddInterrogation — server asks "what dat list versions do you have?"
{
// Phase 4.10: reply with an empty DddInterrogationResponse
// (language=1 English, count=0 lists). The server is happy
// with an empty acknowledgement; without ANY reply it keeps
// the client in a transitional state and renders us as the
// purple loading haze to other clients. Pattern from
// references/holtburger/.../client/messages.rs::DddInterrogation
SendGameMessage(DddInterrogationResponse.Build());
}
else if (op == 0xF746u && !_loginCompleteSent) // PlayerCreate — server creates our player object
{
// Phase 4.10: PlayerCreate for our character is the cue to
// send LoginComplete. Sending it earlier (right after the
// outbound CharacterEnterWorld) was wrong because the server
// hadn't finished spawning the player yet. Holtburger's
// client/messages.rs (PlayerCreate handler) confirms this is
// the correct trigger. Send once per session.
_loginCompleteSent = true;
SendGameMessage(GameActionLoginComplete.Build());
}
else if (op == CreateObject.Opcode)
{
var parsed = CreateObject.TryParse(body);
if (parsed is not null)
{
EntitySpawned?.Invoke(new EntitySpawn(
parsed.Value.Guid,
parsed.Value.Position,
parsed.Value.SetupTableId,
parsed.Value.AnimPartChanges,
parsed.Value.TextureChanges,
parsed.Value.SubPalettes,
parsed.Value.BasePaletteId,
parsed.Value.ObjScale,
parsed.Value.Name,
parsed.Value.MotionState,
parsed.Value.MotionTableId));
}
}
else if (op == UpdateMotion.Opcode)
{
// Phase 6.6: the server sends UpdateMotion (0xF74C) whenever an
// already-spawned entity changes its motion state — NPCs
// starting a walk cycle, creatures entering combat, doors
// opening, etc. We dispatch a lightweight event with the
// new (stance, forward-command) pair so the animation
// system can swap the entity's cycle.
var motion = UpdateMotion.TryParse(body);
if (motion is not null)
{
MotionUpdated?.Invoke(new EntityMotionUpdate(
motion.Value.Guid,
motion.Value.MotionState));
}
}
else if (op == UpdatePosition.Opcode)
{
// Phase 6.7: the server sends UpdatePosition (0xF748) every
// time an entity moves through the world — NPC patrols,
// creatures hunting, other players walking past, projectiles
// tracking. Without this, everything stays at its
// CreateObject spawn point forever.
var posUpdate = UpdatePosition.TryParse(body);
if (posUpdate is not null)
{
PositionUpdated?.Invoke(new EntityPositionUpdate(
posUpdate.Value.Guid,
posUpdate.Value.Position,
posUpdate.Value.Velocity));
}
}
}
}
private void SendGameMessage(byte[] gameMessageBody)
{
var fragment = GameMessageFragment.BuildSingleFragment(
_fragmentSequence++, GameMessageGroup.UIQueue, gameMessageBody);
byte[] packetBody = GameMessageFragment.Serialize(fragment);
var header = new PacketHeader
{
Sequence = _clientPacketSequence++,
Flags = PacketHeaderFlags.BlobFragments | PacketHeaderFlags.EncryptedChecksum,
Id = _sessionClientId,
};
byte[] datagram = PacketCodec.Encode(header, packetBody, _outboundIsaac);
_net.Send(datagram);
}
/// <summary>
/// Phase 4.9: send a bare ACK_SEQUENCE control packet acknowledging
/// <paramref name="serverPacketSequence"/>. This is a cleartext control
/// packet (no EncryptedChecksum) — the body is just the 4-byte server
/// sequence number being acknowledged. The header re-uses the most
/// recently sent client sequence (no increment) because acks aren't
/// themselves part of the reliable stream the server tracks.
///
/// <para>
/// Without sending these, ACE drops the session with
/// <c>Network Timeout</c> after ~60s — and during that 60s the
/// character appears to other clients as a stationary purple haze
/// (loading state) because the server hasn't seen the client confirm
/// any post-EnterWorld traffic.
/// </para>
///
/// <para>
/// Pattern ported from
/// <c>references/holtburger/crates/holtburger-session/src/session/send.rs::send_ack</c>
/// and the receive-side trigger at
/// <c>.../session/receive.rs::finalize_ordered_server_packet</c>.
/// </para>
/// </summary>
private void SendAck(uint serverPacketSequence)
{
// 4-byte body: little-endian u32 of the server sequence we're acking.
Span<byte> body = stackalloc byte[4];
BinaryPrimitives.WriteUInt32LittleEndian(body, serverPacketSequence);
// Holtburger uses current_client_sequence (= packet_sequence - 1) for
// ack headers. We mirror that — acks borrow the most recently issued
// client sequence rather than consuming a new one.
uint ackHeaderSequence = _clientPacketSequence > 0
? _clientPacketSequence - 1
: 0u;
var header = new PacketHeader
{
Sequence = ackHeaderSequence,
Flags = PacketHeaderFlags.AckSequence,
Id = _sessionClientId,
};
byte[] datagram = PacketCodec.Encode(header, body, outboundIsaac: null);
_net.Send(datagram);
}
private void Transition(State next)
{
if (CurrentState == next) return;
CurrentState = next;
StateChanged?.Invoke(next);
}
/// <summary>
/// Graceful shutdown: tell the server we're leaving so it releases the
/// character lock immediately instead of waiting 60s for the session to
/// time out. Pattern from
/// <c>references/holtburger/crates/holtburger-core/src/client/commands.rs</c>
/// lines 879-892: send <c>CharacterLogOff</c> game message (opcode
/// 0xF653, no payload) then send a bare <c>DISCONNECT</c> control
/// packet (header flag 0x8000, no payload).
/// </summary>
public void Dispose()
{
if (CurrentState == State.InWorld)
{
try
{
// Tell ACE "player is leaving the world" so it cleans up
// the character immediately.
var logoff = new Packets.PacketWriter(8);
logoff.WriteUInt32(0xF653u); // CharacterLogOff opcode
SendGameMessage(logoff.ToArray());
// Tell the transport layer "close this session."
var disconnectHeader = new PacketHeader
{
Sequence = _clientPacketSequence++,
Flags = PacketHeaderFlags.Disconnect,
Id = _sessionClientId,
};
byte[] disconnectPacket = PacketCodec.Encode(
disconnectHeader, ReadOnlySpan<byte>.Empty, outboundIsaac: null);
_net.Send(disconnectPacket);
}
catch
{
// Best-effort — if the socket is already dead, eat the
// exception and let Dispose finish cleaning up.
}
}
// Phase A.3: shut down the background receive thread. Cancel the
// token → the 250ms receive timeout fires → loop exits → join.
_netCancel.Cancel();
_inboundQueue.Writer.TryComplete();
_netThread?.Join(TimeSpan.FromSeconds(2));
_netCancel.Dispose();
_net.Dispose();
}
}