From 3226c4bcabcf98d7e17fdd47ca3fd3f3df56fad1 Mon Sep 17 00:00:00 2001 From: Erik Date: Sat, 11 Apr 2026 14:20:53 +0200 Subject: [PATCH] feat(net): message fragment header + fragment + assembler (Phase 4.3) Ports the fragment layer of the AC UDP protocol. A UDP packet's body is zero or more message fragments back-to-back; a logical GameMessage that doesn't fit in ~448 bytes gets split across multiple fragments sharing the same Id with differing Index values. The assembler handles reassembly across arbitrary arrival ordering and duplicate fragments. Added (all reimplemented from ACE's AGPL reference, see NOTICE.md): - Packets/MessageFragmentHeader.cs: 16-byte fragment header struct with Pack/Unpack, constants for MaxFragmentSize (464) and MaxFragmentDataSize (448). Bit-layout doc comment documents what each field is for. - Packets/MessageFragment.cs: readonly record struct bundling a header with its payload bytes; TryParse(source) parses one fragment from the start of a buffer and returns (fragment, consumed) for incremental parsing of multi-fragment packets. Refuses to parse fragments with impossible TotalSize (too small for header, too large for the 464-byte max, or larger than the source buffer). - Packets/FragmentAssembler.cs: buffers partial messages keyed by fragment Id. Ingest(frag, out queue) returns the assembled byte[] when the last fragment arrives, null while still waiting. Key correctness properties, all tested: * Single-fragment (Count=1) shortcut releases with no buffering * Out-of-order arrival (e.g. 2, 0, 1) releases on last arrival and assembles in INDEX order, not arrival order * Duplicate-fragment idempotence (re-sending same index is a no-op) * Missing fragments stay buffered; DropAll() forcibly clears them * Two independent messages can be assembled in parallel without interfering * messageQueue captured from first-arriving fragment (it's a property of the logical message, not individual fragments) Tests (17 new, 37 total in net project, 114 across both test projects): - MessageFragmentHeader (4): pack/unpack round-trip, little-endian wire format, constants, size-check throw - MessageFragment (6): complete parse, insufficient header, oversized TotalSize, undersized TotalSize, incomplete body, two-back-to-back incremental parse - FragmentAssembler (7): single-fragment, in-order 3-fragment, out-of-order 3-fragment (tests index-order assembly), duplicate idempotence, missing-fragment buffered, two parallel messages, DropAll Phase 4.4 (GameMessage reader + opcode handlers) next. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Packets/FragmentAssembler.cs | 111 ++++++++++++++++ .../Packets/MessageFragment.cs | 44 +++++++ .../Packets/MessageFragmentHeader.cs | 77 +++++++++++ .../Packets/FragmentAssemblerTests.cs | 123 ++++++++++++++++++ .../Packets/MessageFragmentHeaderTests.cs | 72 ++++++++++ .../Packets/MessageFragmentTests.cs | 100 ++++++++++++++ 6 files changed, 527 insertions(+) create mode 100644 src/AcDream.Core.Net/Packets/FragmentAssembler.cs create mode 100644 src/AcDream.Core.Net/Packets/MessageFragment.cs create mode 100644 src/AcDream.Core.Net/Packets/MessageFragmentHeader.cs create mode 100644 tests/AcDream.Core.Net.Tests/Packets/FragmentAssemblerTests.cs create mode 100644 tests/AcDream.Core.Net.Tests/Packets/MessageFragmentHeaderTests.cs create mode 100644 tests/AcDream.Core.Net.Tests/Packets/MessageFragmentTests.cs diff --git a/src/AcDream.Core.Net/Packets/FragmentAssembler.cs b/src/AcDream.Core.Net/Packets/FragmentAssembler.cs new file mode 100644 index 0000000..6f8b554 --- /dev/null +++ b/src/AcDream.Core.Net/Packets/FragmentAssembler.cs @@ -0,0 +1,111 @@ +namespace AcDream.Core.Net.Packets; + +/// +/// Reassembles multi-fragment GameMessages. UDP packets can arrive in any +/// order and individual fragments within a logical message can be split +/// across packets, so we buffer partial messages keyed by fragment Id and +/// only yield a complete byte stream once every Count fragment for +/// that Id has arrived. +/// +/// +/// Correctness properties: +/// +/// Out-of-order arrival: fragments can arrive in any index order; +/// the full message is released on the last fragment regardless of +/// its index. +/// Duplicate-fragment idempotence: receiving index N twice for the +/// same Id is harmless — the second copy is silently ignored. +/// Single-fragment messages: Count=1 releases immediately on +/// that one fragment with no buffering. +/// Orphaned partials: if fragments for an Id arrive but the message +/// never completes, they stay buffered until +/// is called or the assembler is disposed. +/// A future phase will add a TTL-based eviction. +/// +/// +/// +public sealed class FragmentAssembler +{ + private readonly Dictionary _inFlight = new(); + + /// + /// Number of logical messages currently partially-assembled (waiting on + /// more fragments to arrive). + /// + public int PartialCount => _inFlight.Count; + + /// + /// Ingest one fragment. If this fragment completes a message, returns + /// the fully-assembled payload as a new byte array. Otherwise returns + /// null and the fragment is held for later assembly. + /// + /// The decoded fragment from the wire. + /// + /// Filled with the completed message's GameMessageGroup (queue) if the + /// call returns a non-null payload; otherwise 0. + /// + public byte[]? Ingest(in MessageFragment fragment, out ushort messageQueue) + { + var h = fragment.Header; + messageQueue = 0; + + // Single-fragment message: shortcut to avoid the dictionary. + if (h.Count == 1 && h.Index == 0) + { + messageQueue = h.Queue; + return fragment.Payload; + } + + if (!_inFlight.TryGetValue(h.Id, out var partial)) + { + partial = new PartialMessage(h.Count, h.Queue); + _inFlight[h.Id] = partial; + } + + // Idempotent: receiving the same index twice is not an error. + if (partial.Fragments[h.Index] is null) + { + partial.Fragments[h.Index] = fragment.Payload; + partial.ReceivedCount++; + } + + if (partial.ReceivedCount < partial.TotalFragments) + return null; + + // All fragments present — concatenate and release. + int totalBytes = 0; + for (int i = 0; i < partial.TotalFragments; i++) + totalBytes += partial.Fragments[i]!.Length; + + var combined = new byte[totalBytes]; + int offset = 0; + for (int i = 0; i < partial.TotalFragments; i++) + { + var p = partial.Fragments[i]!; + Buffer.BlockCopy(p, 0, combined, offset, p.Length); + offset += p.Length; + } + + _inFlight.Remove(h.Id); + messageQueue = partial.Queue; + return combined; + } + + /// Discard all in-flight partial messages. + public void DropAll() => _inFlight.Clear(); + + private sealed class PartialMessage + { + public readonly byte[]?[] Fragments; + public readonly int TotalFragments; + public readonly ushort Queue; + public int ReceivedCount; + + public PartialMessage(int count, ushort queue) + { + TotalFragments = count; + Fragments = new byte[count][]; + Queue = queue; + } + } +} diff --git a/src/AcDream.Core.Net/Packets/MessageFragment.cs b/src/AcDream.Core.Net/Packets/MessageFragment.cs new file mode 100644 index 0000000..d86b157 --- /dev/null +++ b/src/AcDream.Core.Net/Packets/MessageFragment.cs @@ -0,0 +1,44 @@ +namespace AcDream.Core.Net.Packets; + +/// +/// A complete message fragment: header + payload bytes. On the wire, a +/// UDP packet's body is zero or more fragments back-to-back; each fragment +/// is + payload.Length bytes. +/// +public readonly record struct MessageFragment(MessageFragmentHeader Header, byte[] Payload) +{ + /// Total bytes of this fragment on the wire (header + payload). + public int WireSize => MessageFragmentHeader.Size + Payload.Length; + + /// + /// Parse a fragment from the start of . Returns + /// the parsed fragment and the number of bytes consumed (so the caller + /// can advance past this fragment to read the next one from the same + /// packet body). Returns null if the fragment header's TotalSize + /// is impossible (negative payload length, or larger than the UDP + /// payload budget). + /// + public static (MessageFragment? fragment, int consumed) TryParse(ReadOnlySpan source) + { + if (source.Length < MessageFragmentHeader.Size) + return (null, 0); + + var header = MessageFragmentHeader.Unpack(source); + + // TotalSize is the fragment's own size including its header. Anything + // smaller than the header or larger than the max fragment size is + // wire corruption and we refuse to parse. + if (header.TotalSize < MessageFragmentHeader.Size + || header.TotalSize > MessageFragmentHeader.MaxFragmentSize) + { + return (null, 0); + } + + int payloadLength = header.TotalSize - MessageFragmentHeader.Size; + if (source.Length < header.TotalSize) + return (null, 0); + + var payload = source.Slice(MessageFragmentHeader.Size, payloadLength).ToArray(); + return (new MessageFragment(header, payload), header.TotalSize); + } +} diff --git a/src/AcDream.Core.Net/Packets/MessageFragmentHeader.cs b/src/AcDream.Core.Net/Packets/MessageFragmentHeader.cs new file mode 100644 index 0000000..bf88c49 --- /dev/null +++ b/src/AcDream.Core.Net/Packets/MessageFragmentHeader.cs @@ -0,0 +1,77 @@ +using System.Buffers.Binary; + +namespace AcDream.Core.Net.Packets; + +/// +/// The 16-byte header that prefixes every message fragment inside a UDP +/// packet's body. A single UDP packet can contain one or more fragments, +/// and a single logical GameMessage can be split across multiple +/// fragments that share the same and +/// but differ in . +/// +/// +/// Layout (byte offsets): +/// +/// 0 Sequence uint32 Fragment-level sequence +/// 4 Id uint32 Message id — fragments with the same Id belong +/// to the same logical GameMessage. Outbound +/// messages use the high bit set (0x80000000+). +/// 8 Count uint16 Total fragments in the logical message (1 if +/// the message fits in a single fragment) +/// 10 Size uint16 Total fragment size including this 16-byte +/// header — max 464 bytes (448 bytes of payload) +/// 12 Index uint16 0-based position of this fragment in the +/// logical message +/// 14 Queue uint16 GameMessageGroup — used by the server to +/// sequence related messages +/// +/// +/// +/// Reimplemented from ACE's AGPL reference; see NOTICE.md. +/// +public struct MessageFragmentHeader +{ + public const int Size = 16; + + /// Max total fragment size on the wire (including this header). + public const int MaxFragmentSize = 464; + + /// Max payload bytes per fragment (= MaxFragmentSize - Size). + public const int MaxFragmentDataSize = MaxFragmentSize - Size; // 448 + + public uint Sequence; + public uint Id; + public ushort Count; + public ushort TotalSize; // total bytes of this fragment including header + public ushort Index; + public ushort Queue; + + public readonly void Pack(Span destination) + { + if (destination.Length < Size) + throw new ArgumentException($"destination must be at least {Size} bytes", nameof(destination)); + + BinaryPrimitives.WriteUInt32LittleEndian(destination.Slice(0), Sequence); + BinaryPrimitives.WriteUInt32LittleEndian(destination.Slice(4), Id); + BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(8), Count); + BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(10), TotalSize); + BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(12), Index); + BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(14), Queue); + } + + public static MessageFragmentHeader Unpack(ReadOnlySpan source) + { + if (source.Length < Size) + throw new ArgumentException($"source must be at least {Size} bytes", nameof(source)); + + return new MessageFragmentHeader + { + Sequence = BinaryPrimitives.ReadUInt32LittleEndian(source.Slice(0)), + Id = BinaryPrimitives.ReadUInt32LittleEndian(source.Slice(4)), + Count = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(8)), + TotalSize = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(10)), + Index = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(12)), + Queue = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(14)), + }; + } +} diff --git a/tests/AcDream.Core.Net.Tests/Packets/FragmentAssemblerTests.cs b/tests/AcDream.Core.Net.Tests/Packets/FragmentAssemblerTests.cs new file mode 100644 index 0000000..58b1c64 --- /dev/null +++ b/tests/AcDream.Core.Net.Tests/Packets/FragmentAssemblerTests.cs @@ -0,0 +1,123 @@ +using AcDream.Core.Net.Packets; + +namespace AcDream.Core.Net.Tests.Packets; + +public class FragmentAssemblerTests +{ + private static MessageFragment MakeFrag(uint id, ushort count, ushort index, byte[] payload, ushort queue = 7) + => new( + new MessageFragmentHeader + { + Id = id, + Count = count, + Index = index, + TotalSize = (ushort)(MessageFragmentHeader.Size + payload.Length), + Queue = queue, + }, + payload); + + [Fact] + public void Ingest_SingleFragmentMessage_ReleasesImmediately() + { + var assembler = new FragmentAssembler(); + var frag = MakeFrag(id: 1, count: 1, index: 0, payload: new byte[] { 1, 2, 3 }, queue: 42); + + var result = assembler.Ingest(frag, out var queue); + + Assert.NotNull(result); + Assert.Equal(new byte[] { 1, 2, 3 }, result); + Assert.Equal(42, queue); + Assert.Equal(0, assembler.PartialCount); + } + + [Fact] + public void Ingest_ThreeFragmentsInOrder_ReleasesOnLast() + { + // Queue is a property of the logical message, not individual fragments, + // so all three fragments carry the same queue value (captured from the + // first arrival). Testing with queue=9 on all three. + var assembler = new FragmentAssembler(); + + Assert.Null(assembler.Ingest(MakeFrag(7, 3, 0, new byte[] { 0xAA, 0xBB }, queue: 9), out _)); + Assert.Equal(1, assembler.PartialCount); + Assert.Null(assembler.Ingest(MakeFrag(7, 3, 1, new byte[] { 0xCC, 0xDD }, queue: 9), out _)); + var result = assembler.Ingest(MakeFrag(7, 3, 2, new byte[] { 0xEE }, queue: 9), out var queue); + + Assert.NotNull(result); + Assert.Equal(new byte[] { 0xAA, 0xBB, 0xCC, 0xDD, 0xEE }, result); + Assert.Equal(9, queue); + Assert.Equal(0, assembler.PartialCount); + } + + [Fact] + public void Ingest_OutOfOrderFragments_ReleasesCorrectlyOnLastArrival() + { + // Arrive as index 2, then 0, then 1 — the last arrival (index 1) is + // neither the first nor the last index, so this tests that the + // assembler releases on "count full", not "last index". + var assembler = new FragmentAssembler(); + + Assert.Null(assembler.Ingest(MakeFrag(3, 3, 2, new byte[] { 0xCC }), out _)); + Assert.Null(assembler.Ingest(MakeFrag(3, 3, 0, new byte[] { 0xAA }), out _)); + var result = assembler.Ingest(MakeFrag(3, 3, 1, new byte[] { 0xBB }), out _); + + Assert.NotNull(result); + // Result must be assembled in INDEX order, not arrival order. + Assert.Equal(new byte[] { 0xAA, 0xBB, 0xCC }, result); + } + + [Fact] + public void Ingest_DuplicateFragment_IsIdempotent() + { + var assembler = new FragmentAssembler(); + Assert.Null(assembler.Ingest(MakeFrag(5, 2, 0, new byte[] { 0x11 }), out _)); + // Resend index 0 — should not double-count or corrupt state. + Assert.Null(assembler.Ingest(MakeFrag(5, 2, 0, new byte[] { 0x11 }), out _)); + // Assembler should still be waiting for index 1. + Assert.Equal(1, assembler.PartialCount); + + var result = assembler.Ingest(MakeFrag(5, 2, 1, new byte[] { 0x22 }), out _); + Assert.NotNull(result); + Assert.Equal(new byte[] { 0x11, 0x22 }, result); + } + + [Fact] + public void Ingest_MissingFragment_DoesNotRelease() + { + var assembler = new FragmentAssembler(); + Assert.Null(assembler.Ingest(MakeFrag(9, 3, 0, new byte[] { 1 }), out _)); + Assert.Null(assembler.Ingest(MakeFrag(9, 3, 2, new byte[] { 3 }), out _)); + // Only 2 of 3 arrived → still waiting + Assert.Equal(1, assembler.PartialCount); + } + + [Fact] + public void Ingest_TwoIndependentMessages_BuiltInParallel() + { + var assembler = new FragmentAssembler(); + + Assert.Null(assembler.Ingest(MakeFrag(100, 2, 0, new byte[] { 0xA1 }), out _)); + Assert.Null(assembler.Ingest(MakeFrag(200, 2, 0, new byte[] { 0xB1 }), out _)); + Assert.Equal(2, assembler.PartialCount); + + var resultA = assembler.Ingest(MakeFrag(100, 2, 1, new byte[] { 0xA2 }), out _); + Assert.Equal(new byte[] { 0xA1, 0xA2 }, resultA); + Assert.Equal(1, assembler.PartialCount); + + var resultB = assembler.Ingest(MakeFrag(200, 2, 1, new byte[] { 0xB2 }), out _); + Assert.Equal(new byte[] { 0xB1, 0xB2 }, resultB); + Assert.Equal(0, assembler.PartialCount); + } + + [Fact] + public void DropAll_ClearsInFlightPartials() + { + var assembler = new FragmentAssembler(); + assembler.Ingest(MakeFrag(1, 5, 0, new byte[] { 1 }), out _); + assembler.Ingest(MakeFrag(2, 5, 0, new byte[] { 2 }), out _); + Assert.Equal(2, assembler.PartialCount); + + assembler.DropAll(); + Assert.Equal(0, assembler.PartialCount); + } +} diff --git a/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentHeaderTests.cs b/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentHeaderTests.cs new file mode 100644 index 0000000..b7bbced --- /dev/null +++ b/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentHeaderTests.cs @@ -0,0 +1,72 @@ +using AcDream.Core.Net.Packets; + +namespace AcDream.Core.Net.Tests.Packets; + +public class MessageFragmentHeaderTests +{ + [Fact] + public void PackUnpack_RoundTrip_PreservesAllFields() + { + var original = new MessageFragmentHeader + { + Sequence = 0x11223344u, + Id = 0x80000010u, + Count = 3, + TotalSize = 64, + Index = 2, + Queue = 0x0A, + }; + + Span buf = stackalloc byte[MessageFragmentHeader.Size]; + original.Pack(buf); + var decoded = MessageFragmentHeader.Unpack(buf); + + Assert.Equal(original.Sequence, decoded.Sequence); + Assert.Equal(original.Id, decoded.Id); + Assert.Equal(original.Count, decoded.Count); + Assert.Equal(original.TotalSize, decoded.TotalSize); + Assert.Equal(original.Index, decoded.Index); + Assert.Equal(original.Queue, decoded.Queue); + } + + [Fact] + public void Pack_WritesLittleEndianWireFormat() + { + var h = new MessageFragmentHeader + { + Sequence = 0x04030201u, // 01 02 03 04 + Id = 0x08070605u, // 05 06 07 08 + Count = 0x0A09, // 09 0A + TotalSize = 0x0C0B, // 0B 0C + Index = 0x0E0D, // 0D 0E + Queue = 0x100F, // 0F 10 + }; + Span buf = stackalloc byte[16]; + h.Pack(buf); + + byte[] expected = + { + 0x01, 0x02, 0x03, 0x04, + 0x05, 0x06, 0x07, 0x08, + 0x09, 0x0A, + 0x0B, 0x0C, + 0x0D, 0x0E, + 0x0F, 0x10, + }; + Assert.Equal(expected, buf.ToArray()); + } + + [Fact] + public void Constants_MatchAcProtocolLimits() + { + Assert.Equal(16, MessageFragmentHeader.Size); + Assert.Equal(464, MessageFragmentHeader.MaxFragmentSize); + Assert.Equal(448, MessageFragmentHeader.MaxFragmentDataSize); + } + + [Fact] + public void Unpack_InsufficientData_Throws() + { + Assert.Throws(() => MessageFragmentHeader.Unpack(new byte[15])); + } +} diff --git a/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentTests.cs b/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentTests.cs new file mode 100644 index 0000000..8eae892 --- /dev/null +++ b/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentTests.cs @@ -0,0 +1,100 @@ +using AcDream.Core.Net.Packets; + +namespace AcDream.Core.Net.Tests.Packets; + +public class MessageFragmentTests +{ + [Fact] + public void TryParse_CompleteFragment_ReturnsFragmentAndConsumedBytes() + { + // Build a synthetic fragment: 16-byte header + 4-byte payload = totalSize 20 + var header = new MessageFragmentHeader + { + Sequence = 1, Id = 0x80000001u, Count = 1, + TotalSize = 20, Index = 0, Queue = 5, + }; + byte[] buf = new byte[20]; + header.Pack(buf); + buf[16] = 0xAA; buf[17] = 0xBB; buf[18] = 0xCC; buf[19] = 0xDD; + + var (frag, consumed) = MessageFragment.TryParse(buf); + + Assert.NotNull(frag); + Assert.Equal(20, consumed); + Assert.Equal(4, frag!.Value.Payload.Length); + Assert.Equal(new byte[] { 0xAA, 0xBB, 0xCC, 0xDD }, frag.Value.Payload); + Assert.Equal(5, frag.Value.Header.Queue); + } + + [Fact] + public void TryParse_InsufficientSourceForHeader_ReturnsNull() + { + var (frag, consumed) = MessageFragment.TryParse(new byte[15]); + Assert.Null(frag); + Assert.Equal(0, consumed); + } + + [Fact] + public void TryParse_TotalSizeTooLarge_ReturnsNull() + { + // TotalSize claims 500 bytes (> MaxFragmentSize 464). + var header = new MessageFragmentHeader { TotalSize = 500, Count = 1 }; + byte[] buf = new byte[16]; + header.Pack(buf); + + var (frag, consumed) = MessageFragment.TryParse(buf); + + Assert.Null(frag); + Assert.Equal(0, consumed); + } + + [Fact] + public void TryParse_TotalSizeSmallerThanHeader_ReturnsNull() + { + var header = new MessageFragmentHeader { TotalSize = 10 }; + byte[] buf = new byte[16]; + header.Pack(buf); + + var (frag, consumed) = MessageFragment.TryParse(buf); + + Assert.Null(frag); + Assert.Equal(0, consumed); + } + + [Fact] + public void TryParse_IncompleteBody_ReturnsNull() + { + // Header says 40 total bytes but buffer only has 20. + var header = new MessageFragmentHeader { TotalSize = 40, Count = 1 }; + byte[] buf = new byte[20]; + header.Pack(buf); + + var (frag, consumed) = MessageFragment.TryParse(buf); + + Assert.Null(frag); + Assert.Equal(0, consumed); + } + + [Fact] + public void TryParse_ConsumesExactlyOneFragment_LeavesRemainderForCaller() + { + // Two back-to-back fragments in one buffer. First call should consume + // exactly the first fragment, leaving the second intact. + var h1 = new MessageFragmentHeader { Id = 1, Count = 1, TotalSize = 18, Index = 0 }; + var h2 = new MessageFragmentHeader { Id = 2, Count = 1, TotalSize = 17, Index = 0 }; + byte[] buf = new byte[35]; // 18 + 17 + h1.Pack(buf.AsSpan(0, 16)); + buf[16] = 0xAA; buf[17] = 0xBB; + h2.Pack(buf.AsSpan(18, 16)); + buf[34] = 0xCC; + + var (frag1, consumed1) = MessageFragment.TryParse(buf); + Assert.NotNull(frag1); + Assert.Equal(18, consumed1); + + var (frag2, consumed2) = MessageFragment.TryParse(buf.AsSpan(consumed1)); + Assert.NotNull(frag2); + Assert.Equal(17, consumed2); + Assert.Equal(new byte[] { 0xCC }, frag2!.Value.Payload); + } +}