feat(net): message fragment header + fragment + assembler (Phase 4.3)

Ports the fragment layer of the AC UDP protocol. A UDP packet's body is
zero or more message fragments back-to-back; a logical GameMessage that
doesn't fit in ~448 bytes gets split across multiple fragments sharing
the same Id with differing Index values. The assembler handles
reassembly across arbitrary arrival ordering and duplicate fragments.

Added (all reimplemented from ACE's AGPL reference, see NOTICE.md):
  - Packets/MessageFragmentHeader.cs: 16-byte fragment header struct
    with Pack/Unpack, constants for MaxFragmentSize (464) and
    MaxFragmentDataSize (448). Bit-layout doc comment documents what
    each field is for.
  - Packets/MessageFragment.cs: readonly record struct bundling a
    header with its payload bytes; TryParse(source) parses one fragment
    from the start of a buffer and returns (fragment, consumed) for
    incremental parsing of multi-fragment packets. Refuses to parse
    fragments with impossible TotalSize (too small for header, too
    large for the 464-byte max, or larger than the source buffer).
  - Packets/FragmentAssembler.cs: buffers partial messages keyed by
    fragment Id. Ingest(frag, out queue) returns the assembled byte[]
    when the last fragment arrives, null while still waiting. Key
    correctness properties, all tested:
      * Single-fragment (Count=1) shortcut releases with no buffering
      * Out-of-order arrival (e.g. 2, 0, 1) releases on last arrival
        and assembles in INDEX order, not arrival order
      * Duplicate-fragment idempotence (re-sending same index is a no-op)
      * Missing fragments stay buffered; DropAll() forcibly clears them
      * Two independent messages can be assembled in parallel without
        interfering
      * messageQueue captured from first-arriving fragment (it's a
        property of the logical message, not individual fragments)

Tests (17 new, 37 total in net project, 114 across both test projects):
  - MessageFragmentHeader (4): pack/unpack round-trip, little-endian
    wire format, constants, size-check throw
  - MessageFragment (6): complete parse, insufficient header, oversized
    TotalSize, undersized TotalSize, incomplete body, two-back-to-back
    incremental parse
  - FragmentAssembler (7): single-fragment, in-order 3-fragment,
    out-of-order 3-fragment (tests index-order assembly), duplicate
    idempotence, missing-fragment buffered, two parallel messages,
    DropAll

Phase 4.4 (GameMessage reader + opcode handlers) next.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Erik 2026-04-11 14:20:53 +02:00
parent 18e308fe85
commit 3226c4bcab
6 changed files with 527 additions and 0 deletions

View file

@ -0,0 +1,111 @@
namespace AcDream.Core.Net.Packets;
/// <summary>
/// Reassembles multi-fragment GameMessages. UDP packets can arrive in any
/// order and individual fragments within a logical message can be split
/// across packets, so we buffer partial messages keyed by fragment Id and
/// only yield a complete byte stream once every <c>Count</c> fragment for
/// that Id has arrived.
///
/// <para>
/// <b>Correctness properties:</b>
/// <list type="bullet">
/// <item>Out-of-order arrival: fragments can arrive in any index order;
/// the full message is released on the last fragment regardless of
/// its index.</item>
/// <item>Duplicate-fragment idempotence: receiving index N twice for the
/// same Id is harmless — the second copy is silently ignored.</item>
/// <item>Single-fragment messages: Count=1 releases immediately on
/// that one fragment with no buffering.</item>
/// <item>Orphaned partials: if fragments for an Id arrive but the message
/// never completes, they stay buffered until
/// <see cref="DropAll"/> is called or the assembler is disposed.
/// A future phase will add a TTL-based eviction.</item>
/// </list>
/// </para>
/// </summary>
public sealed class FragmentAssembler
{
private readonly Dictionary<uint, PartialMessage> _inFlight = new();
/// <summary>
/// Number of logical messages currently partially-assembled (waiting on
/// more fragments to arrive).
/// </summary>
public int PartialCount => _inFlight.Count;
/// <summary>
/// Ingest one fragment. If this fragment completes a message, returns
/// the fully-assembled payload as a new byte array. Otherwise returns
/// <c>null</c> and the fragment is held for later assembly.
/// </summary>
/// <param name="fragment">The decoded fragment from the wire.</param>
/// <param name="messageQueue">
/// Filled with the completed message's GameMessageGroup (queue) if the
/// call returns a non-null payload; otherwise 0.
/// </param>
public byte[]? Ingest(in MessageFragment fragment, out ushort messageQueue)
{
var h = fragment.Header;
messageQueue = 0;
// Single-fragment message: shortcut to avoid the dictionary.
if (h.Count == 1 && h.Index == 0)
{
messageQueue = h.Queue;
return fragment.Payload;
}
if (!_inFlight.TryGetValue(h.Id, out var partial))
{
partial = new PartialMessage(h.Count, h.Queue);
_inFlight[h.Id] = partial;
}
// Idempotent: receiving the same index twice is not an error.
if (partial.Fragments[h.Index] is null)
{
partial.Fragments[h.Index] = fragment.Payload;
partial.ReceivedCount++;
}
if (partial.ReceivedCount < partial.TotalFragments)
return null;
// All fragments present — concatenate and release.
int totalBytes = 0;
for (int i = 0; i < partial.TotalFragments; i++)
totalBytes += partial.Fragments[i]!.Length;
var combined = new byte[totalBytes];
int offset = 0;
for (int i = 0; i < partial.TotalFragments; i++)
{
var p = partial.Fragments[i]!;
Buffer.BlockCopy(p, 0, combined, offset, p.Length);
offset += p.Length;
}
_inFlight.Remove(h.Id);
messageQueue = partial.Queue;
return combined;
}
/// <summary>Discard all in-flight partial messages.</summary>
public void DropAll() => _inFlight.Clear();
private sealed class PartialMessage
{
public readonly byte[]?[] Fragments;
public readonly int TotalFragments;
public readonly ushort Queue;
public int ReceivedCount;
public PartialMessage(int count, ushort queue)
{
TotalFragments = count;
Fragments = new byte[count][];
Queue = queue;
}
}
}

View file

@ -0,0 +1,44 @@
namespace AcDream.Core.Net.Packets;
/// <summary>
/// A complete message fragment: header + payload bytes. On the wire, a
/// UDP packet's body is zero or more fragments back-to-back; each fragment
/// is <see cref="MessageFragmentHeader.Size"/> + <c>payload.Length</c> bytes.
/// </summary>
public readonly record struct MessageFragment(MessageFragmentHeader Header, byte[] Payload)
{
/// <summary>Total bytes of this fragment on the wire (header + payload).</summary>
public int WireSize => MessageFragmentHeader.Size + Payload.Length;
/// <summary>
/// Parse a fragment from the start of <paramref name="source"/>. Returns
/// the parsed fragment and the number of bytes consumed (so the caller
/// can advance past this fragment to read the next one from the same
/// packet body). Returns <c>null</c> if the fragment header's TotalSize
/// is impossible (negative payload length, or larger than the UDP
/// payload budget).
/// </summary>
public static (MessageFragment? fragment, int consumed) TryParse(ReadOnlySpan<byte> source)
{
if (source.Length < MessageFragmentHeader.Size)
return (null, 0);
var header = MessageFragmentHeader.Unpack(source);
// TotalSize is the fragment's own size including its header. Anything
// smaller than the header or larger than the max fragment size is
// wire corruption and we refuse to parse.
if (header.TotalSize < MessageFragmentHeader.Size
|| header.TotalSize > MessageFragmentHeader.MaxFragmentSize)
{
return (null, 0);
}
int payloadLength = header.TotalSize - MessageFragmentHeader.Size;
if (source.Length < header.TotalSize)
return (null, 0);
var payload = source.Slice(MessageFragmentHeader.Size, payloadLength).ToArray();
return (new MessageFragment(header, payload), header.TotalSize);
}
}

View file

@ -0,0 +1,77 @@
using System.Buffers.Binary;
namespace AcDream.Core.Net.Packets;
/// <summary>
/// The 16-byte header that prefixes every message fragment inside a UDP
/// packet's body. A single UDP packet can contain one or more fragments,
/// and a single logical <c>GameMessage</c> can be split across multiple
/// fragments that share the same <see cref="Id"/> and <see cref="Sequence"/>
/// but differ in <see cref="Index"/>.
///
/// <para>
/// Layout (byte offsets):
/// <code>
/// 0 Sequence uint32 Fragment-level sequence
/// 4 Id uint32 Message id — fragments with the same Id belong
/// to the same logical GameMessage. Outbound
/// messages use the high bit set (0x80000000+).
/// 8 Count uint16 Total fragments in the logical message (1 if
/// the message fits in a single fragment)
/// 10 Size uint16 Total fragment size including this 16-byte
/// header — max 464 bytes (448 bytes of payload)
/// 12 Index uint16 0-based position of this fragment in the
/// logical message
/// 14 Queue uint16 GameMessageGroup — used by the server to
/// sequence related messages
/// </code>
/// </para>
///
/// Reimplemented from ACE's AGPL reference; see <c>NOTICE.md</c>.
/// </summary>
public struct MessageFragmentHeader
{
public const int Size = 16;
/// <summary>Max total fragment size on the wire (including this header).</summary>
public const int MaxFragmentSize = 464;
/// <summary>Max payload bytes per fragment (= MaxFragmentSize - Size).</summary>
public const int MaxFragmentDataSize = MaxFragmentSize - Size; // 448
public uint Sequence;
public uint Id;
public ushort Count;
public ushort TotalSize; // total bytes of this fragment including header
public ushort Index;
public ushort Queue;
public readonly void Pack(Span<byte> destination)
{
if (destination.Length < Size)
throw new ArgumentException($"destination must be at least {Size} bytes", nameof(destination));
BinaryPrimitives.WriteUInt32LittleEndian(destination.Slice(0), Sequence);
BinaryPrimitives.WriteUInt32LittleEndian(destination.Slice(4), Id);
BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(8), Count);
BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(10), TotalSize);
BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(12), Index);
BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(14), Queue);
}
public static MessageFragmentHeader Unpack(ReadOnlySpan<byte> source)
{
if (source.Length < Size)
throw new ArgumentException($"source must be at least {Size} bytes", nameof(source));
return new MessageFragmentHeader
{
Sequence = BinaryPrimitives.ReadUInt32LittleEndian(source.Slice(0)),
Id = BinaryPrimitives.ReadUInt32LittleEndian(source.Slice(4)),
Count = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(8)),
TotalSize = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(10)),
Index = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(12)),
Queue = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(14)),
};
}
}

View file

@ -0,0 +1,123 @@
using AcDream.Core.Net.Packets;
namespace AcDream.Core.Net.Tests.Packets;
public class FragmentAssemblerTests
{
private static MessageFragment MakeFrag(uint id, ushort count, ushort index, byte[] payload, ushort queue = 7)
=> new(
new MessageFragmentHeader
{
Id = id,
Count = count,
Index = index,
TotalSize = (ushort)(MessageFragmentHeader.Size + payload.Length),
Queue = queue,
},
payload);
[Fact]
public void Ingest_SingleFragmentMessage_ReleasesImmediately()
{
var assembler = new FragmentAssembler();
var frag = MakeFrag(id: 1, count: 1, index: 0, payload: new byte[] { 1, 2, 3 }, queue: 42);
var result = assembler.Ingest(frag, out var queue);
Assert.NotNull(result);
Assert.Equal(new byte[] { 1, 2, 3 }, result);
Assert.Equal(42, queue);
Assert.Equal(0, assembler.PartialCount);
}
[Fact]
public void Ingest_ThreeFragmentsInOrder_ReleasesOnLast()
{
// Queue is a property of the logical message, not individual fragments,
// so all three fragments carry the same queue value (captured from the
// first arrival). Testing with queue=9 on all three.
var assembler = new FragmentAssembler();
Assert.Null(assembler.Ingest(MakeFrag(7, 3, 0, new byte[] { 0xAA, 0xBB }, queue: 9), out _));
Assert.Equal(1, assembler.PartialCount);
Assert.Null(assembler.Ingest(MakeFrag(7, 3, 1, new byte[] { 0xCC, 0xDD }, queue: 9), out _));
var result = assembler.Ingest(MakeFrag(7, 3, 2, new byte[] { 0xEE }, queue: 9), out var queue);
Assert.NotNull(result);
Assert.Equal(new byte[] { 0xAA, 0xBB, 0xCC, 0xDD, 0xEE }, result);
Assert.Equal(9, queue);
Assert.Equal(0, assembler.PartialCount);
}
[Fact]
public void Ingest_OutOfOrderFragments_ReleasesCorrectlyOnLastArrival()
{
// Arrive as index 2, then 0, then 1 — the last arrival (index 1) is
// neither the first nor the last index, so this tests that the
// assembler releases on "count full", not "last index".
var assembler = new FragmentAssembler();
Assert.Null(assembler.Ingest(MakeFrag(3, 3, 2, new byte[] { 0xCC }), out _));
Assert.Null(assembler.Ingest(MakeFrag(3, 3, 0, new byte[] { 0xAA }), out _));
var result = assembler.Ingest(MakeFrag(3, 3, 1, new byte[] { 0xBB }), out _);
Assert.NotNull(result);
// Result must be assembled in INDEX order, not arrival order.
Assert.Equal(new byte[] { 0xAA, 0xBB, 0xCC }, result);
}
[Fact]
public void Ingest_DuplicateFragment_IsIdempotent()
{
var assembler = new FragmentAssembler();
Assert.Null(assembler.Ingest(MakeFrag(5, 2, 0, new byte[] { 0x11 }), out _));
// Resend index 0 — should not double-count or corrupt state.
Assert.Null(assembler.Ingest(MakeFrag(5, 2, 0, new byte[] { 0x11 }), out _));
// Assembler should still be waiting for index 1.
Assert.Equal(1, assembler.PartialCount);
var result = assembler.Ingest(MakeFrag(5, 2, 1, new byte[] { 0x22 }), out _);
Assert.NotNull(result);
Assert.Equal(new byte[] { 0x11, 0x22 }, result);
}
[Fact]
public void Ingest_MissingFragment_DoesNotRelease()
{
var assembler = new FragmentAssembler();
Assert.Null(assembler.Ingest(MakeFrag(9, 3, 0, new byte[] { 1 }), out _));
Assert.Null(assembler.Ingest(MakeFrag(9, 3, 2, new byte[] { 3 }), out _));
// Only 2 of 3 arrived → still waiting
Assert.Equal(1, assembler.PartialCount);
}
[Fact]
public void Ingest_TwoIndependentMessages_BuiltInParallel()
{
var assembler = new FragmentAssembler();
Assert.Null(assembler.Ingest(MakeFrag(100, 2, 0, new byte[] { 0xA1 }), out _));
Assert.Null(assembler.Ingest(MakeFrag(200, 2, 0, new byte[] { 0xB1 }), out _));
Assert.Equal(2, assembler.PartialCount);
var resultA = assembler.Ingest(MakeFrag(100, 2, 1, new byte[] { 0xA2 }), out _);
Assert.Equal(new byte[] { 0xA1, 0xA2 }, resultA);
Assert.Equal(1, assembler.PartialCount);
var resultB = assembler.Ingest(MakeFrag(200, 2, 1, new byte[] { 0xB2 }), out _);
Assert.Equal(new byte[] { 0xB1, 0xB2 }, resultB);
Assert.Equal(0, assembler.PartialCount);
}
[Fact]
public void DropAll_ClearsInFlightPartials()
{
var assembler = new FragmentAssembler();
assembler.Ingest(MakeFrag(1, 5, 0, new byte[] { 1 }), out _);
assembler.Ingest(MakeFrag(2, 5, 0, new byte[] { 2 }), out _);
Assert.Equal(2, assembler.PartialCount);
assembler.DropAll();
Assert.Equal(0, assembler.PartialCount);
}
}

View file

@ -0,0 +1,72 @@
using AcDream.Core.Net.Packets;
namespace AcDream.Core.Net.Tests.Packets;
public class MessageFragmentHeaderTests
{
[Fact]
public void PackUnpack_RoundTrip_PreservesAllFields()
{
var original = new MessageFragmentHeader
{
Sequence = 0x11223344u,
Id = 0x80000010u,
Count = 3,
TotalSize = 64,
Index = 2,
Queue = 0x0A,
};
Span<byte> buf = stackalloc byte[MessageFragmentHeader.Size];
original.Pack(buf);
var decoded = MessageFragmentHeader.Unpack(buf);
Assert.Equal(original.Sequence, decoded.Sequence);
Assert.Equal(original.Id, decoded.Id);
Assert.Equal(original.Count, decoded.Count);
Assert.Equal(original.TotalSize, decoded.TotalSize);
Assert.Equal(original.Index, decoded.Index);
Assert.Equal(original.Queue, decoded.Queue);
}
[Fact]
public void Pack_WritesLittleEndianWireFormat()
{
var h = new MessageFragmentHeader
{
Sequence = 0x04030201u, // 01 02 03 04
Id = 0x08070605u, // 05 06 07 08
Count = 0x0A09, // 09 0A
TotalSize = 0x0C0B, // 0B 0C
Index = 0x0E0D, // 0D 0E
Queue = 0x100F, // 0F 10
};
Span<byte> buf = stackalloc byte[16];
h.Pack(buf);
byte[] expected =
{
0x01, 0x02, 0x03, 0x04,
0x05, 0x06, 0x07, 0x08,
0x09, 0x0A,
0x0B, 0x0C,
0x0D, 0x0E,
0x0F, 0x10,
};
Assert.Equal(expected, buf.ToArray());
}
[Fact]
public void Constants_MatchAcProtocolLimits()
{
Assert.Equal(16, MessageFragmentHeader.Size);
Assert.Equal(464, MessageFragmentHeader.MaxFragmentSize);
Assert.Equal(448, MessageFragmentHeader.MaxFragmentDataSize);
}
[Fact]
public void Unpack_InsufficientData_Throws()
{
Assert.Throws<ArgumentException>(() => MessageFragmentHeader.Unpack(new byte[15]));
}
}

View file

@ -0,0 +1,100 @@
using AcDream.Core.Net.Packets;
namespace AcDream.Core.Net.Tests.Packets;
public class MessageFragmentTests
{
[Fact]
public void TryParse_CompleteFragment_ReturnsFragmentAndConsumedBytes()
{
// Build a synthetic fragment: 16-byte header + 4-byte payload = totalSize 20
var header = new MessageFragmentHeader
{
Sequence = 1, Id = 0x80000001u, Count = 1,
TotalSize = 20, Index = 0, Queue = 5,
};
byte[] buf = new byte[20];
header.Pack(buf);
buf[16] = 0xAA; buf[17] = 0xBB; buf[18] = 0xCC; buf[19] = 0xDD;
var (frag, consumed) = MessageFragment.TryParse(buf);
Assert.NotNull(frag);
Assert.Equal(20, consumed);
Assert.Equal(4, frag!.Value.Payload.Length);
Assert.Equal(new byte[] { 0xAA, 0xBB, 0xCC, 0xDD }, frag.Value.Payload);
Assert.Equal(5, frag.Value.Header.Queue);
}
[Fact]
public void TryParse_InsufficientSourceForHeader_ReturnsNull()
{
var (frag, consumed) = MessageFragment.TryParse(new byte[15]);
Assert.Null(frag);
Assert.Equal(0, consumed);
}
[Fact]
public void TryParse_TotalSizeTooLarge_ReturnsNull()
{
// TotalSize claims 500 bytes (> MaxFragmentSize 464).
var header = new MessageFragmentHeader { TotalSize = 500, Count = 1 };
byte[] buf = new byte[16];
header.Pack(buf);
var (frag, consumed) = MessageFragment.TryParse(buf);
Assert.Null(frag);
Assert.Equal(0, consumed);
}
[Fact]
public void TryParse_TotalSizeSmallerThanHeader_ReturnsNull()
{
var header = new MessageFragmentHeader { TotalSize = 10 };
byte[] buf = new byte[16];
header.Pack(buf);
var (frag, consumed) = MessageFragment.TryParse(buf);
Assert.Null(frag);
Assert.Equal(0, consumed);
}
[Fact]
public void TryParse_IncompleteBody_ReturnsNull()
{
// Header says 40 total bytes but buffer only has 20.
var header = new MessageFragmentHeader { TotalSize = 40, Count = 1 };
byte[] buf = new byte[20];
header.Pack(buf);
var (frag, consumed) = MessageFragment.TryParse(buf);
Assert.Null(frag);
Assert.Equal(0, consumed);
}
[Fact]
public void TryParse_ConsumesExactlyOneFragment_LeavesRemainderForCaller()
{
// Two back-to-back fragments in one buffer. First call should consume
// exactly the first fragment, leaving the second intact.
var h1 = new MessageFragmentHeader { Id = 1, Count = 1, TotalSize = 18, Index = 0 };
var h2 = new MessageFragmentHeader { Id = 2, Count = 1, TotalSize = 17, Index = 0 };
byte[] buf = new byte[35]; // 18 + 17
h1.Pack(buf.AsSpan(0, 16));
buf[16] = 0xAA; buf[17] = 0xBB;
h2.Pack(buf.AsSpan(18, 16));
buf[34] = 0xCC;
var (frag1, consumed1) = MessageFragment.TryParse(buf);
Assert.NotNull(frag1);
Assert.Equal(18, consumed1);
var (frag2, consumed2) = MessageFragment.TryParse(buf.AsSpan(consumed1));
Assert.NotNull(frag2);
Assert.Equal(17, consumed2);
Assert.Equal(new byte[] { 0xCC }, frag2!.Value.Payload);
}
}