diff --git a/src/AcDream.Core.Net/Packets/FragmentAssembler.cs b/src/AcDream.Core.Net/Packets/FragmentAssembler.cs new file mode 100644 index 0000000..6f8b554 --- /dev/null +++ b/src/AcDream.Core.Net/Packets/FragmentAssembler.cs @@ -0,0 +1,111 @@ +namespace AcDream.Core.Net.Packets; + +/// +/// Reassembles multi-fragment GameMessages. UDP packets can arrive in any +/// order and individual fragments within a logical message can be split +/// across packets, so we buffer partial messages keyed by fragment Id and +/// only yield a complete byte stream once every Count fragment for +/// that Id has arrived. +/// +/// +/// Correctness properties: +/// +/// Out-of-order arrival: fragments can arrive in any index order; +/// the full message is released on the last fragment regardless of +/// its index. +/// Duplicate-fragment idempotence: receiving index N twice for the +/// same Id is harmless — the second copy is silently ignored. +/// Single-fragment messages: Count=1 releases immediately on +/// that one fragment with no buffering. +/// Orphaned partials: if fragments for an Id arrive but the message +/// never completes, they stay buffered until +/// is called or the assembler is disposed. +/// A future phase will add a TTL-based eviction. +/// +/// +/// +public sealed class FragmentAssembler +{ + private readonly Dictionary _inFlight = new(); + + /// + /// Number of logical messages currently partially-assembled (waiting on + /// more fragments to arrive). + /// + public int PartialCount => _inFlight.Count; + + /// + /// Ingest one fragment. If this fragment completes a message, returns + /// the fully-assembled payload as a new byte array. Otherwise returns + /// null and the fragment is held for later assembly. + /// + /// The decoded fragment from the wire. + /// + /// Filled with the completed message's GameMessageGroup (queue) if the + /// call returns a non-null payload; otherwise 0. + /// + public byte[]? Ingest(in MessageFragment fragment, out ushort messageQueue) + { + var h = fragment.Header; + messageQueue = 0; + + // Single-fragment message: shortcut to avoid the dictionary. + if (h.Count == 1 && h.Index == 0) + { + messageQueue = h.Queue; + return fragment.Payload; + } + + if (!_inFlight.TryGetValue(h.Id, out var partial)) + { + partial = new PartialMessage(h.Count, h.Queue); + _inFlight[h.Id] = partial; + } + + // Idempotent: receiving the same index twice is not an error. + if (partial.Fragments[h.Index] is null) + { + partial.Fragments[h.Index] = fragment.Payload; + partial.ReceivedCount++; + } + + if (partial.ReceivedCount < partial.TotalFragments) + return null; + + // All fragments present — concatenate and release. + int totalBytes = 0; + for (int i = 0; i < partial.TotalFragments; i++) + totalBytes += partial.Fragments[i]!.Length; + + var combined = new byte[totalBytes]; + int offset = 0; + for (int i = 0; i < partial.TotalFragments; i++) + { + var p = partial.Fragments[i]!; + Buffer.BlockCopy(p, 0, combined, offset, p.Length); + offset += p.Length; + } + + _inFlight.Remove(h.Id); + messageQueue = partial.Queue; + return combined; + } + + /// Discard all in-flight partial messages. + public void DropAll() => _inFlight.Clear(); + + private sealed class PartialMessage + { + public readonly byte[]?[] Fragments; + public readonly int TotalFragments; + public readonly ushort Queue; + public int ReceivedCount; + + public PartialMessage(int count, ushort queue) + { + TotalFragments = count; + Fragments = new byte[count][]; + Queue = queue; + } + } +} diff --git a/src/AcDream.Core.Net/Packets/MessageFragment.cs b/src/AcDream.Core.Net/Packets/MessageFragment.cs new file mode 100644 index 0000000..d86b157 --- /dev/null +++ b/src/AcDream.Core.Net/Packets/MessageFragment.cs @@ -0,0 +1,44 @@ +namespace AcDream.Core.Net.Packets; + +/// +/// A complete message fragment: header + payload bytes. On the wire, a +/// UDP packet's body is zero or more fragments back-to-back; each fragment +/// is + payload.Length bytes. +/// +public readonly record struct MessageFragment(MessageFragmentHeader Header, byte[] Payload) +{ + /// Total bytes of this fragment on the wire (header + payload). + public int WireSize => MessageFragmentHeader.Size + Payload.Length; + + /// + /// Parse a fragment from the start of . Returns + /// the parsed fragment and the number of bytes consumed (so the caller + /// can advance past this fragment to read the next one from the same + /// packet body). Returns null if the fragment header's TotalSize + /// is impossible (negative payload length, or larger than the UDP + /// payload budget). + /// + public static (MessageFragment? fragment, int consumed) TryParse(ReadOnlySpan source) + { + if (source.Length < MessageFragmentHeader.Size) + return (null, 0); + + var header = MessageFragmentHeader.Unpack(source); + + // TotalSize is the fragment's own size including its header. Anything + // smaller than the header or larger than the max fragment size is + // wire corruption and we refuse to parse. + if (header.TotalSize < MessageFragmentHeader.Size + || header.TotalSize > MessageFragmentHeader.MaxFragmentSize) + { + return (null, 0); + } + + int payloadLength = header.TotalSize - MessageFragmentHeader.Size; + if (source.Length < header.TotalSize) + return (null, 0); + + var payload = source.Slice(MessageFragmentHeader.Size, payloadLength).ToArray(); + return (new MessageFragment(header, payload), header.TotalSize); + } +} diff --git a/src/AcDream.Core.Net/Packets/MessageFragmentHeader.cs b/src/AcDream.Core.Net/Packets/MessageFragmentHeader.cs new file mode 100644 index 0000000..bf88c49 --- /dev/null +++ b/src/AcDream.Core.Net/Packets/MessageFragmentHeader.cs @@ -0,0 +1,77 @@ +using System.Buffers.Binary; + +namespace AcDream.Core.Net.Packets; + +/// +/// The 16-byte header that prefixes every message fragment inside a UDP +/// packet's body. A single UDP packet can contain one or more fragments, +/// and a single logical GameMessage can be split across multiple +/// fragments that share the same and +/// but differ in . +/// +/// +/// Layout (byte offsets): +/// +/// 0 Sequence uint32 Fragment-level sequence +/// 4 Id uint32 Message id — fragments with the same Id belong +/// to the same logical GameMessage. Outbound +/// messages use the high bit set (0x80000000+). +/// 8 Count uint16 Total fragments in the logical message (1 if +/// the message fits in a single fragment) +/// 10 Size uint16 Total fragment size including this 16-byte +/// header — max 464 bytes (448 bytes of payload) +/// 12 Index uint16 0-based position of this fragment in the +/// logical message +/// 14 Queue uint16 GameMessageGroup — used by the server to +/// sequence related messages +/// +/// +/// +/// Reimplemented from ACE's AGPL reference; see NOTICE.md. +/// +public struct MessageFragmentHeader +{ + public const int Size = 16; + + /// Max total fragment size on the wire (including this header). + public const int MaxFragmentSize = 464; + + /// Max payload bytes per fragment (= MaxFragmentSize - Size). + public const int MaxFragmentDataSize = MaxFragmentSize - Size; // 448 + + public uint Sequence; + public uint Id; + public ushort Count; + public ushort TotalSize; // total bytes of this fragment including header + public ushort Index; + public ushort Queue; + + public readonly void Pack(Span destination) + { + if (destination.Length < Size) + throw new ArgumentException($"destination must be at least {Size} bytes", nameof(destination)); + + BinaryPrimitives.WriteUInt32LittleEndian(destination.Slice(0), Sequence); + BinaryPrimitives.WriteUInt32LittleEndian(destination.Slice(4), Id); + BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(8), Count); + BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(10), TotalSize); + BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(12), Index); + BinaryPrimitives.WriteUInt16LittleEndian(destination.Slice(14), Queue); + } + + public static MessageFragmentHeader Unpack(ReadOnlySpan source) + { + if (source.Length < Size) + throw new ArgumentException($"source must be at least {Size} bytes", nameof(source)); + + return new MessageFragmentHeader + { + Sequence = BinaryPrimitives.ReadUInt32LittleEndian(source.Slice(0)), + Id = BinaryPrimitives.ReadUInt32LittleEndian(source.Slice(4)), + Count = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(8)), + TotalSize = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(10)), + Index = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(12)), + Queue = BinaryPrimitives.ReadUInt16LittleEndian(source.Slice(14)), + }; + } +} diff --git a/tests/AcDream.Core.Net.Tests/Packets/FragmentAssemblerTests.cs b/tests/AcDream.Core.Net.Tests/Packets/FragmentAssemblerTests.cs new file mode 100644 index 0000000..58b1c64 --- /dev/null +++ b/tests/AcDream.Core.Net.Tests/Packets/FragmentAssemblerTests.cs @@ -0,0 +1,123 @@ +using AcDream.Core.Net.Packets; + +namespace AcDream.Core.Net.Tests.Packets; + +public class FragmentAssemblerTests +{ + private static MessageFragment MakeFrag(uint id, ushort count, ushort index, byte[] payload, ushort queue = 7) + => new( + new MessageFragmentHeader + { + Id = id, + Count = count, + Index = index, + TotalSize = (ushort)(MessageFragmentHeader.Size + payload.Length), + Queue = queue, + }, + payload); + + [Fact] + public void Ingest_SingleFragmentMessage_ReleasesImmediately() + { + var assembler = new FragmentAssembler(); + var frag = MakeFrag(id: 1, count: 1, index: 0, payload: new byte[] { 1, 2, 3 }, queue: 42); + + var result = assembler.Ingest(frag, out var queue); + + Assert.NotNull(result); + Assert.Equal(new byte[] { 1, 2, 3 }, result); + Assert.Equal(42, queue); + Assert.Equal(0, assembler.PartialCount); + } + + [Fact] + public void Ingest_ThreeFragmentsInOrder_ReleasesOnLast() + { + // Queue is a property of the logical message, not individual fragments, + // so all three fragments carry the same queue value (captured from the + // first arrival). Testing with queue=9 on all three. + var assembler = new FragmentAssembler(); + + Assert.Null(assembler.Ingest(MakeFrag(7, 3, 0, new byte[] { 0xAA, 0xBB }, queue: 9), out _)); + Assert.Equal(1, assembler.PartialCount); + Assert.Null(assembler.Ingest(MakeFrag(7, 3, 1, new byte[] { 0xCC, 0xDD }, queue: 9), out _)); + var result = assembler.Ingest(MakeFrag(7, 3, 2, new byte[] { 0xEE }, queue: 9), out var queue); + + Assert.NotNull(result); + Assert.Equal(new byte[] { 0xAA, 0xBB, 0xCC, 0xDD, 0xEE }, result); + Assert.Equal(9, queue); + Assert.Equal(0, assembler.PartialCount); + } + + [Fact] + public void Ingest_OutOfOrderFragments_ReleasesCorrectlyOnLastArrival() + { + // Arrive as index 2, then 0, then 1 — the last arrival (index 1) is + // neither the first nor the last index, so this tests that the + // assembler releases on "count full", not "last index". + var assembler = new FragmentAssembler(); + + Assert.Null(assembler.Ingest(MakeFrag(3, 3, 2, new byte[] { 0xCC }), out _)); + Assert.Null(assembler.Ingest(MakeFrag(3, 3, 0, new byte[] { 0xAA }), out _)); + var result = assembler.Ingest(MakeFrag(3, 3, 1, new byte[] { 0xBB }), out _); + + Assert.NotNull(result); + // Result must be assembled in INDEX order, not arrival order. + Assert.Equal(new byte[] { 0xAA, 0xBB, 0xCC }, result); + } + + [Fact] + public void Ingest_DuplicateFragment_IsIdempotent() + { + var assembler = new FragmentAssembler(); + Assert.Null(assembler.Ingest(MakeFrag(5, 2, 0, new byte[] { 0x11 }), out _)); + // Resend index 0 — should not double-count or corrupt state. + Assert.Null(assembler.Ingest(MakeFrag(5, 2, 0, new byte[] { 0x11 }), out _)); + // Assembler should still be waiting for index 1. + Assert.Equal(1, assembler.PartialCount); + + var result = assembler.Ingest(MakeFrag(5, 2, 1, new byte[] { 0x22 }), out _); + Assert.NotNull(result); + Assert.Equal(new byte[] { 0x11, 0x22 }, result); + } + + [Fact] + public void Ingest_MissingFragment_DoesNotRelease() + { + var assembler = new FragmentAssembler(); + Assert.Null(assembler.Ingest(MakeFrag(9, 3, 0, new byte[] { 1 }), out _)); + Assert.Null(assembler.Ingest(MakeFrag(9, 3, 2, new byte[] { 3 }), out _)); + // Only 2 of 3 arrived → still waiting + Assert.Equal(1, assembler.PartialCount); + } + + [Fact] + public void Ingest_TwoIndependentMessages_BuiltInParallel() + { + var assembler = new FragmentAssembler(); + + Assert.Null(assembler.Ingest(MakeFrag(100, 2, 0, new byte[] { 0xA1 }), out _)); + Assert.Null(assembler.Ingest(MakeFrag(200, 2, 0, new byte[] { 0xB1 }), out _)); + Assert.Equal(2, assembler.PartialCount); + + var resultA = assembler.Ingest(MakeFrag(100, 2, 1, new byte[] { 0xA2 }), out _); + Assert.Equal(new byte[] { 0xA1, 0xA2 }, resultA); + Assert.Equal(1, assembler.PartialCount); + + var resultB = assembler.Ingest(MakeFrag(200, 2, 1, new byte[] { 0xB2 }), out _); + Assert.Equal(new byte[] { 0xB1, 0xB2 }, resultB); + Assert.Equal(0, assembler.PartialCount); + } + + [Fact] + public void DropAll_ClearsInFlightPartials() + { + var assembler = new FragmentAssembler(); + assembler.Ingest(MakeFrag(1, 5, 0, new byte[] { 1 }), out _); + assembler.Ingest(MakeFrag(2, 5, 0, new byte[] { 2 }), out _); + Assert.Equal(2, assembler.PartialCount); + + assembler.DropAll(); + Assert.Equal(0, assembler.PartialCount); + } +} diff --git a/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentHeaderTests.cs b/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentHeaderTests.cs new file mode 100644 index 0000000..b7bbced --- /dev/null +++ b/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentHeaderTests.cs @@ -0,0 +1,72 @@ +using AcDream.Core.Net.Packets; + +namespace AcDream.Core.Net.Tests.Packets; + +public class MessageFragmentHeaderTests +{ + [Fact] + public void PackUnpack_RoundTrip_PreservesAllFields() + { + var original = new MessageFragmentHeader + { + Sequence = 0x11223344u, + Id = 0x80000010u, + Count = 3, + TotalSize = 64, + Index = 2, + Queue = 0x0A, + }; + + Span buf = stackalloc byte[MessageFragmentHeader.Size]; + original.Pack(buf); + var decoded = MessageFragmentHeader.Unpack(buf); + + Assert.Equal(original.Sequence, decoded.Sequence); + Assert.Equal(original.Id, decoded.Id); + Assert.Equal(original.Count, decoded.Count); + Assert.Equal(original.TotalSize, decoded.TotalSize); + Assert.Equal(original.Index, decoded.Index); + Assert.Equal(original.Queue, decoded.Queue); + } + + [Fact] + public void Pack_WritesLittleEndianWireFormat() + { + var h = new MessageFragmentHeader + { + Sequence = 0x04030201u, // 01 02 03 04 + Id = 0x08070605u, // 05 06 07 08 + Count = 0x0A09, // 09 0A + TotalSize = 0x0C0B, // 0B 0C + Index = 0x0E0D, // 0D 0E + Queue = 0x100F, // 0F 10 + }; + Span buf = stackalloc byte[16]; + h.Pack(buf); + + byte[] expected = + { + 0x01, 0x02, 0x03, 0x04, + 0x05, 0x06, 0x07, 0x08, + 0x09, 0x0A, + 0x0B, 0x0C, + 0x0D, 0x0E, + 0x0F, 0x10, + }; + Assert.Equal(expected, buf.ToArray()); + } + + [Fact] + public void Constants_MatchAcProtocolLimits() + { + Assert.Equal(16, MessageFragmentHeader.Size); + Assert.Equal(464, MessageFragmentHeader.MaxFragmentSize); + Assert.Equal(448, MessageFragmentHeader.MaxFragmentDataSize); + } + + [Fact] + public void Unpack_InsufficientData_Throws() + { + Assert.Throws(() => MessageFragmentHeader.Unpack(new byte[15])); + } +} diff --git a/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentTests.cs b/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentTests.cs new file mode 100644 index 0000000..8eae892 --- /dev/null +++ b/tests/AcDream.Core.Net.Tests/Packets/MessageFragmentTests.cs @@ -0,0 +1,100 @@ +using AcDream.Core.Net.Packets; + +namespace AcDream.Core.Net.Tests.Packets; + +public class MessageFragmentTests +{ + [Fact] + public void TryParse_CompleteFragment_ReturnsFragmentAndConsumedBytes() + { + // Build a synthetic fragment: 16-byte header + 4-byte payload = totalSize 20 + var header = new MessageFragmentHeader + { + Sequence = 1, Id = 0x80000001u, Count = 1, + TotalSize = 20, Index = 0, Queue = 5, + }; + byte[] buf = new byte[20]; + header.Pack(buf); + buf[16] = 0xAA; buf[17] = 0xBB; buf[18] = 0xCC; buf[19] = 0xDD; + + var (frag, consumed) = MessageFragment.TryParse(buf); + + Assert.NotNull(frag); + Assert.Equal(20, consumed); + Assert.Equal(4, frag!.Value.Payload.Length); + Assert.Equal(new byte[] { 0xAA, 0xBB, 0xCC, 0xDD }, frag.Value.Payload); + Assert.Equal(5, frag.Value.Header.Queue); + } + + [Fact] + public void TryParse_InsufficientSourceForHeader_ReturnsNull() + { + var (frag, consumed) = MessageFragment.TryParse(new byte[15]); + Assert.Null(frag); + Assert.Equal(0, consumed); + } + + [Fact] + public void TryParse_TotalSizeTooLarge_ReturnsNull() + { + // TotalSize claims 500 bytes (> MaxFragmentSize 464). + var header = new MessageFragmentHeader { TotalSize = 500, Count = 1 }; + byte[] buf = new byte[16]; + header.Pack(buf); + + var (frag, consumed) = MessageFragment.TryParse(buf); + + Assert.Null(frag); + Assert.Equal(0, consumed); + } + + [Fact] + public void TryParse_TotalSizeSmallerThanHeader_ReturnsNull() + { + var header = new MessageFragmentHeader { TotalSize = 10 }; + byte[] buf = new byte[16]; + header.Pack(buf); + + var (frag, consumed) = MessageFragment.TryParse(buf); + + Assert.Null(frag); + Assert.Equal(0, consumed); + } + + [Fact] + public void TryParse_IncompleteBody_ReturnsNull() + { + // Header says 40 total bytes but buffer only has 20. + var header = new MessageFragmentHeader { TotalSize = 40, Count = 1 }; + byte[] buf = new byte[20]; + header.Pack(buf); + + var (frag, consumed) = MessageFragment.TryParse(buf); + + Assert.Null(frag); + Assert.Equal(0, consumed); + } + + [Fact] + public void TryParse_ConsumesExactlyOneFragment_LeavesRemainderForCaller() + { + // Two back-to-back fragments in one buffer. First call should consume + // exactly the first fragment, leaving the second intact. + var h1 = new MessageFragmentHeader { Id = 1, Count = 1, TotalSize = 18, Index = 0 }; + var h2 = new MessageFragmentHeader { Id = 2, Count = 1, TotalSize = 17, Index = 0 }; + byte[] buf = new byte[35]; // 18 + 17 + h1.Pack(buf.AsSpan(0, 16)); + buf[16] = 0xAA; buf[17] = 0xBB; + h2.Pack(buf.AsSpan(18, 16)); + buf[34] = 0xCC; + + var (frag1, consumed1) = MessageFragment.TryParse(buf); + Assert.NotNull(frag1); + Assert.Equal(18, consumed1); + + var (frag2, consumed2) = MessageFragment.TryParse(buf.AsSpan(consumed1)); + Assert.NotNull(frag2); + Assert.Equal(17, consumed2); + Assert.Equal(new byte[] { 0xCC }, frag2!.Value.Payload); + } +}