From b987f006a91bd9e8f90511dcd6667b5ee5cb3a96 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 26 Oct 2023 19:20:40 +0100 Subject: [PATCH] NATS buffer writer --- src/NATS.Client.Core/NatsBufferWriter.cs | 451 +++++++++++++++++++++++ 1 file changed, 451 insertions(+) create mode 100644 src/NATS.Client.Core/NatsBufferWriter.cs diff --git a/src/NATS.Client.Core/NatsBufferWriter.cs b/src/NATS.Client.Core/NatsBufferWriter.cs new file mode 100644 index 000000000..9eab602a4 --- /dev/null +++ b/src/NATS.Client.Core/NatsBufferWriter.cs @@ -0,0 +1,451 @@ +// adapted from https://github.com/CommunityToolkit/dotnet/blob/main/src/CommunityToolkit.HighPerformance/Buffers/NatsBufferWriter%7BT%7D.cs + +using System.Buffers; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using BitOperations = System.Numerics.BitOperations; + +namespace NATS.Client.Core; + +/// +/// An interface that expands with the ability to also inspect +/// the written data, and to reset the underlying buffer to write again from the start. +/// +/// The type of items in the current buffer. +public interface INatsBuffer : IBufferWriter +{ + /// + /// Gets the data written to the underlying buffer so far, as a . + /// + ReadOnlyMemory WrittenMemory { get; } + + /// + /// Gets the data written to the underlying buffer so far, as a . + /// + ReadOnlySpan WrittenSpan { get; } + + /// + /// Gets the amount of data written to the underlying buffer so far. + /// + int WrittenCount { get; } + + /// + /// Gets the total amount of space within the underlying buffer. + /// + int Capacity { get; } + + /// + /// Gets the amount of space available that can still be written into without forcing the underlying buffer to grow. + /// + int FreeCapacity { get; } + + /// + /// Clears the data written to the underlying buffer. + /// + /// + /// You must clear the instance before trying to re-use it. + /// + void Clear(); +} + +/// +/// Represents a heap-based, array-backed output sink into which data can be written. +/// +/// The type of items to write to the current instance. +public class NatsBufferWriter : INatsBuffer, IMemoryOwner +{ + /// + /// The default buffer size to use to expand empty arrays. + /// + private const int DefaultInitialBufferSize = 256; + + /// + /// The instance used to rent . + /// + private readonly ArrayPool _pool; + + /// + /// The underlying array. + /// + private T[]? _array; + +#pragma warning disable IDE0032 // Use field over auto-property (clearer and faster) + /// + /// The starting offset within . + /// + private int _index; +#pragma warning restore IDE0032 + + /// + /// Initializes a new instance of the class. + /// + public NatsBufferWriter() + : this(ArrayPool.Shared, DefaultInitialBufferSize) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The instance to use. + public NatsBufferWriter(ArrayPool pool) + : this(pool, DefaultInitialBufferSize) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The minimum capacity with which to initialize the underlying buffer. + /// Thrown when is not valid. + public NatsBufferWriter(int initialCapacity) + : this(ArrayPool.Shared, initialCapacity) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The instance to use. + /// The minimum capacity with which to initialize the underlying buffer. + /// Thrown when is not valid. + public NatsBufferWriter(ArrayPool pool, int initialCapacity) + { + // Since we're using pooled arrays, we can rent the buffer with the + // default size immediately, we don't need to use lazy initialization + // to save unnecessary memory allocations in this case. + // Additionally, we don't need to manually throw the exception if + // the requested size is not valid, as that'll be thrown automatically + // by the array pool in use when we try to rent an array with that size. + _pool = pool; + _array = pool.Rent(initialCapacity); + _index = 0; + } + + /// + Memory IMemoryOwner.Memory + { + // This property is explicitly implemented so that it's hidden + // under normal usage, as the name could be confusing when + // displayed besides WrittenMemory and GetMemory(). + // The IMemoryOwner interface is implemented primarily + // so that the AsStream() extension can be used on this type, + // allowing users to first create a NatsBufferWriter + // instance to write data to, then get a stream through the + // extension and let it take care of returning the underlying + // buffer to the shared pool when it's no longer necessary. + // Inlining is not needed here since this will always be a callvirt. + get => MemoryMarshal.AsMemory(WrittenMemory); + } + + /// + public ReadOnlyMemory WrittenMemory + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array!.AsMemory(0, _index); + } + } + + /// + public ReadOnlySpan WrittenSpan + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array!.AsSpan(0, _index); + } + } + + /// + public int WrittenCount + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => _index; + } + + /// + public int Capacity + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array!.Length; + } + } + + /// + public int FreeCapacity + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array!.Length - _index; + } + } + + /// + public void Clear() + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + array.AsSpan(0, _index).Clear(); + + _index = 0; + } + + /// + public void Advance(int count) + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + if (count < 0) + { + ThrowArgumentOutOfRangeExceptionForNegativeCount(); + } + + if (_index > array!.Length - count) + { + ThrowArgumentExceptionForAdvancedTooFar(); + } + + _index += count; + } + + /// + public Memory GetMemory(int sizeHint = 0) + { + CheckBufferAndEnsureCapacity(sizeHint); + + return _array.AsMemory(_index); + } + + /// + public Span GetSpan(int sizeHint = 0) + { + CheckBufferAndEnsureCapacity(sizeHint); + + return _array.AsSpan(_index); + } + + /// + /// Gets an instance wrapping the underlying array in use. + /// + /// An instance wrapping the underlying array in use. + /// Thrown when the buffer in use has already been disposed. + /// + /// This method is meant to be used when working with APIs that only accept an array as input, and should be used with caution. + /// In particular, the returned array is rented from an array pool, and it is responsibility of the caller to ensure that it's + /// not used after the current instance is disposed. Doing so is considered undefined + /// behavior, as the same array might be in use within another instance. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ArraySegment DangerousGetArray() + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return new(array!, 0, _index); + } + + /// + public void Dispose() + { + var array = _array; + + if (array is null) + { + return; + } + + _array = null; + + _pool.Return(array); + } + + /// + public override string ToString() + { + // See comments in MemoryOwner about this + if (typeof(T) == typeof(char) && + _array is char[] chars) + { + return new(chars, 0, _index); + } + + // Same representation used in Span + return $"CommunityToolkit.HighPerformance.Buffers.NatsBufferWriter<{typeof(T)}>[{_index}]"; + } + + /// + /// Throws an when the requested count is negative. + /// + private static void ThrowArgumentOutOfRangeExceptionForNegativeCount() + { + throw new ArgumentOutOfRangeException("count", "The count can't be a negative value."); + } + + /// + /// Throws an when the size hint is negative. + /// + private static void ThrowArgumentOutOfRangeExceptionForNegativeSizeHint() + { + throw new ArgumentOutOfRangeException("sizeHint", "The size hint can't be a negative value."); + } + + /// + /// Throws an when the requested count is negative. + /// + private static void ThrowArgumentExceptionForAdvancedTooFar() + { + throw new ArgumentException("The buffer writer has advanced too far."); + } + + /// + /// Throws an when is . + /// + private static void ThrowObjectDisposedException() + { + throw new ObjectDisposedException("The current buffer has already been disposed."); + } + + /// + /// Ensures that has enough free space to contain a given number of new items. + /// + /// The minimum number of items to ensure space for in . + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void CheckBufferAndEnsureCapacity(int sizeHint) + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + if (sizeHint < 0) + { + ThrowArgumentOutOfRangeExceptionForNegativeSizeHint(); + } + + if (sizeHint == 0) + { + sizeHint = 1; + } + + if (sizeHint > array!.Length - _index) + { + ResizeBuffer(sizeHint); + } + } + + /// + /// Resizes to ensure it can fit the specified number of new items. + /// + /// The minimum number of items to ensure space for in . + [MethodImpl(MethodImplOptions.NoInlining)] + private void ResizeBuffer(int sizeHint) + { + var minimumSize = (uint)_index + (uint)sizeHint; + + // The ArrayPool class has a maximum threshold of 1024 * 1024 for the maximum length of + // pooled arrays, and once this is exceeded it will just allocate a new array every time + // of exactly the requested size. In that case, we manually round up the requested size to + // the nearest power of two, to ensure that repeated consecutive writes when the array in + // use is bigger than that threshold don't end up causing a resize every single time. + if (minimumSize > 1024 * 1024) + { + minimumSize = BitOperations.RoundUpToPowerOf2(minimumSize); + } + + _pool.Resize(ref _array, (int)minimumSize); + } +} + +internal static class NatsBufferWriterExtensions +{ + /// + /// Changes the number of elements of a rented one-dimensional array to the specified new size. + /// + /// The type of items into the target array to resize. + /// The target instance to use to resize the array. + /// The rented array to resize, or to create a new array. + /// The size of the new array. + /// Indicates whether the contents of the array should be cleared before reuse. + /// Thrown when is less than 0. + /// When this method returns, the caller must not use any references to the old array anymore. + public static void Resize(this ArrayPool pool, [NotNull] ref T[]? array, int newSize, bool clearArray = false) + { + // If the old array is null, just create a new one with the requested size + if (array is null) + { + array = pool.Rent(newSize); + + return; + } + + // If the new size is the same as the current size, do nothing + if (array.Length == newSize) + { + return; + } + + // Rent a new array with the specified size, and copy as many items from the current array + // as possible to the new array. This mirrors the behavior of the Array.Resize API from + // the BCL: if the new size is greater than the length of the current array, copy all the + // items from the original array into the new one. Otherwise, copy as many items as possible, + // until the new array is completely filled, and ignore the remaining items in the first array. + var newArray = pool.Rent(newSize); + var itemsToCopy = Math.Min(array.Length, newSize); + + Array.Copy(array, 0, newArray, 0, itemsToCopy); + + pool.Return(array, clearArray); + + array = newArray; + } +}