diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 000000000..5d5c1225a --- /dev/null +++ b/.editorconfig @@ -0,0 +1,17 @@ +root = true + +[*] +insert_final_newline = true +indent_style = space +indent_size = 4 +trim_trailing_whitespace = true + +[*{_AssemblyInfo.cs,.notsupported.cs}] +generated_code = true + +# C# files +[*.cs] +charset = utf-8-bom + +csharp_style_namespace_declarations = file_scoped +dotnet_style_require_accessibility_modifiers = never \ No newline at end of file diff --git a/sandbox/ConsoleApp/ConsoleApp.csproj b/sandbox/ConsoleApp/ConsoleApp.csproj index c4cdfd8a3..088d61348 100644 --- a/sandbox/ConsoleApp/ConsoleApp.csproj +++ b/sandbox/ConsoleApp/ConsoleApp.csproj @@ -9,6 +9,8 @@ + + diff --git a/sandbox/ConsoleApp/Program.cs b/sandbox/ConsoleApp/Program.cs index 9783a3ee6..3a89ccc88 100644 --- a/sandbox/ConsoleApp/Program.cs +++ b/sandbox/ConsoleApp/Program.cs @@ -1,39 +1,34 @@  using AlterNats; using MessagePack; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System.Buffers; +using System.IO.Pipelines; using System.Reflection; using System.Text; +using System.Threading.Tasks.Sources; +using ZLogger; +var provider = new ServiceCollection() + .AddLogging(x => + { + x.ClearProviders(); + x.SetMinimumLevel(LogLevel.Trace); + x.AddZLoggerConsole(); + }) + .BuildServiceProvider(); +var loggerFactory = provider.GetRequiredService(); +var conn = new NatsConnection(NatsOptions.Default with { LoggerFactory = loggerFactory }); +for (int i = 0; i < 10000; i++) +{ + conn.Ping(); +} -await using var conn = new NatsConnection(); - -//Console.WriteLine("GO PING"); -var t = Foo(); -//var b = conn.PingAsync(); -//var c = conn.PingAsync(); -//var d = conn.PingAsync(); -//var e = conn.PingAsync(); -//// await a; -//await b; -//await c; -//await d; -//await e; -//Console.WriteLine("END PING"); - - -Console.ReadLine(); - -await t; +Console.WriteLine("READ STOP"); Console.ReadLine(); - - -async Task Foo() -{ - await conn.PingAsync(); - throw new Exception("YEAH?"); -} \ No newline at end of file diff --git a/src/AlterNats/AlterNats.csproj b/src/AlterNats/AlterNats.csproj index 647ea7518..77e92fbaf 100644 --- a/src/AlterNats/AlterNats.csproj +++ b/src/AlterNats/AlterNats.csproj @@ -1,14 +1,17 @@ - - net6.0 - enable - enable - + + net6.0 + enable + enable + - - - - + + + + + + + diff --git a/src/AlterNats/Commands/CommandBase.cs b/src/AlterNats/Commands/CommandBase.cs index 8729b5d5c..56536b2cc 100644 --- a/src/AlterNats/Commands/CommandBase.cs +++ b/src/AlterNats/Commands/CommandBase.cs @@ -12,10 +12,12 @@ internal abstract class CommandBase : ICommand, IObjectPoolNode TSelf? nextNode; public ref TSelf? NextNode => ref nextNode; + public abstract string WriteTraceMessage { get; } + public virtual void Return() { pool.TryPush(Unsafe.As(this)); } - public abstract void Write(ILogger logger, ProtocolWriter writer); + public abstract void Write(ProtocolWriter writer); } diff --git a/src/AlterNats/Commands/FixedArrayBufferWriter.cs b/src/AlterNats/Commands/FixedArrayBufferWriter.cs new file mode 100644 index 000000000..bba40d6f2 --- /dev/null +++ b/src/AlterNats/Commands/FixedArrayBufferWriter.cs @@ -0,0 +1,55 @@ +using System.Buffers; +using System.Runtime.CompilerServices; + +namespace AlterNats.Commands; + +internal sealed class FixedArrayBufferWriter : IBufferWriter +{ + byte[] buffer; + int written; + + public ReadOnlyMemory WrittenMemory => buffer.AsMemory(0, written); + + public FixedArrayBufferWriter() + { + buffer = new byte[65535]; + written = 0; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Reset() + { + written = 0; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Advance(int count) + { + written += count; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public Memory GetMemory(int sizeHint = 0) + { + if (buffer.Length - written < sizeHint) + { + Resize(sizeHint); + } + return buffer.AsMemory(written); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public Span GetSpan(int sizeHint = 0) + { + if (buffer.Length - written < sizeHint) + { + Resize(sizeHint); + } + return buffer.AsSpan(written); + } + + void Resize(int sizeHint) + { + Array.Resize(ref buffer, Math.Max(sizeHint, buffer.Length * 2)); + } +} diff --git a/src/AlterNats/Commands/ICommand.cs b/src/AlterNats/Commands/ICommand.cs index 9a11e9a77..a1cd715e4 100644 --- a/src/AlterNats/Commands/ICommand.cs +++ b/src/AlterNats/Commands/ICommand.cs @@ -7,5 +7,6 @@ namespace AlterNats.Commands; internal interface ICommand { void Return(); - void Write(ILogger logger, ProtocolWriter writer); + void Write(ProtocolWriter writer); + string WriteTraceMessage { get; } } diff --git a/src/AlterNats/Commands/PingCommand.cs b/src/AlterNats/Commands/PingCommand.cs index 9a7429f55..67f46d9fb 100644 --- a/src/AlterNats/Commands/PingCommand.cs +++ b/src/AlterNats/Commands/PingCommand.cs @@ -29,10 +29,10 @@ public static PingCommand Create(List queue) return result; } - public override void Write(ILogger logger, ProtocolWriter writer) - { - logger.LogTrace("Write PONG Command to buffer."); + public override string WriteTraceMessage => "Write PING Command to buffer."; + public override void Write(ProtocolWriter writer) + { if (queue != null) { lock (queue) @@ -114,3 +114,27 @@ void IThreadPoolWorkItem.Execute() core.SetResult(null!); } } + + +internal sealed class LightPingCommand : CommandBase +{ + LightPingCommand() + { + } + + public static LightPingCommand Create() + { + if (!pool.TryPop(out var result)) + { + result = new LightPingCommand(); + } + return result; + } + + public override string WriteTraceMessage => "Write PING Command to buffer."; + + public override void Write(ProtocolWriter writer) + { + writer.WritePing(); + } +} \ No newline at end of file diff --git a/src/AlterNats/Commands/PongCommand.cs b/src/AlterNats/Commands/PongCommand.cs index 2183788f4..7862007c8 100644 --- a/src/AlterNats/Commands/PongCommand.cs +++ b/src/AlterNats/Commands/PongCommand.cs @@ -17,9 +17,10 @@ public static PongCommand Create() return result; } - public override void Write(ILogger logger, ProtocolWriter writer) + public override string WriteTraceMessage => "Write PONG Command to buffer."; + + public override void Write(ProtocolWriter writer) { - logger.LogTrace("Write PONG Command to buffer."); writer.WritePong(); } } \ No newline at end of file diff --git a/src/AlterNats/Commands/ProtocolWriter.cs b/src/AlterNats/Commands/ProtocolWriter.cs index ccce78acf..35f28af94 100644 --- a/src/AlterNats/Commands/ProtocolWriter.cs +++ b/src/AlterNats/Commands/ProtocolWriter.cs @@ -1,6 +1,4 @@ -using System.Buffers.Binary; -using System.Buffers.Text; -using System.IO.Pipelines; +using System.Buffers.Text; using System.Text; using System.Text.Json; @@ -11,9 +9,9 @@ internal sealed class ProtocolWriter const int MaxIntStringLength = 10; // int.MaxValue.ToString().Length const int NewLineLength = 2; // \r\n - readonly PipeWriter writer; + readonly FixedArrayBufferWriter writer; // where T : IBufferWriter - public ProtocolWriter(PipeWriter writer) + public ProtocolWriter(FixedArrayBufferWriter writer) { this.writer = writer; } diff --git a/src/AlterNats/Commands/SubscribeCommand.cs b/src/AlterNats/Commands/SubscribeCommand.cs index d76665a81..a435eedb4 100644 --- a/src/AlterNats/Commands/SubscribeCommand.cs +++ b/src/AlterNats/Commands/SubscribeCommand.cs @@ -24,9 +24,11 @@ public static SubscribeCommand Create(int subscriptionId, NatsKey subject) return result; } - public override void Write(ILogger logger, ProtocolWriter writer) + public override string WriteTraceMessage => throw new NotImplementedException(); + + public override void Write(ProtocolWriter writer) { - logger.LogTrace("Write SUB Command to buffer."); + // logger.LogTrace("Write SUB Command to buffer."); writer.WriteSubscribe(subscriptionId, subject!); } diff --git a/src/AlterNats/Internal/AppendableSequence.cs b/src/AlterNats/Internal/AppendableSequence.cs new file mode 100644 index 000000000..b9af605c4 --- /dev/null +++ b/src/AlterNats/Internal/AppendableSequence.cs @@ -0,0 +1,63 @@ +using System.Buffers; + +namespace AlterNats.Internal; + +// TODO: for reuse, add reset. +internal sealed class AppendableSequence +{ + Segment? first; + Segment? last; + int length; + + public ReadOnlySequence AsReadOnlySequence() => new ReadOnlySequence(first!, 0, last!, last!.Memory.Length); + + public int Count => length; + + public AppendableSequence() + { + } + + public void Slice(long start) + { + // TODO:impl slice + throw new NotImplementedException(); + } + + public void Append(ReadOnlyMemory buffer) + { + if (length == 0) + { + var first = new Segment(buffer); + this.first = first; + last = first; + length = buffer.Length; + } + else + { + var newLast = new Segment(buffer); + last!.SetNextSegment(newLast); + newLast.SetRunningIndex(length); + last = newLast; + length += buffer.Length; + } + } + + // TODO:Segment should uses ObjectPool + internal class Segment : ReadOnlySequenceSegment + { + public Segment(ReadOnlyMemory buffer) + { + Memory = buffer; + } + + internal void SetRunningIndex(long runningIndex) + { + this.RunningIndex = runningIndex; + } + + internal void SetNextSegment(Segment? nextSegment) + { + this.Next = nextSegment; + } + } +} \ No newline at end of file diff --git a/src/AlterNats/Internal/SocketReader.cs b/src/AlterNats/Internal/SocketReader.cs new file mode 100644 index 000000000..413a9d002 --- /dev/null +++ b/src/AlterNats/Internal/SocketReader.cs @@ -0,0 +1,53 @@ +using System.Buffers; +using System.Net.Sockets; +using System.Runtime.InteropServices; + +namespace AlterNats.Internal; + +internal sealed class SocketReader +{ + readonly Socket socket; + readonly int bufferSize; + + byte[] buffer; + + public SocketReader(Socket socket, int bufferSize) + { + this.buffer = ArrayPool.Shared.Rent(bufferSize); + this.bufferSize = bufferSize; + this.socket = socket; + } + + public ValueTask ReadAsync() + { + // return Socket's internal IValueTaskSource directly + return socket.ReceiveAsync(buffer.AsMemory(), SocketFlags.None); + } + + public ReadOnlyMemory RentReadBuffer(int size) + { + var buf = buffer; + buffer = ArrayPool.Shared.Rent(bufferSize); + return new ReadOnlyMemory(buf, 0, size); + } + + public void ReturnBuffer(ReadOnlyMemory buffer) + { + if (MemoryMarshal.TryGetArray(buffer, out var segment) && segment.Array != null) + { + ArrayPool.Shared.Return(segment.Array); + } + } + + //public ReadOnlySequence CombineBuffer(ReadOnlySequence segment, ReadOnlyMemory buffer) + //{ + // // var lastSegment = new Segment(buffer); + + + + + // //new ReadOnlySequence(segment.Start, 0, + //} + + +} \ No newline at end of file diff --git a/src/AlterNats/Internal/ThreadPoolWorkItem.cs b/src/AlterNats/Internal/ThreadPoolWorkItem.cs index a31c6f404..f365117b9 100644 --- a/src/AlterNats/Internal/ThreadPoolWorkItem.cs +++ b/src/AlterNats/Internal/ThreadPoolWorkItem.cs @@ -44,7 +44,7 @@ public void Execute() { call.Invoke(v); } - catch (Exception ex) + catch { // TODO: do nanika(logging?) } diff --git a/src/AlterNats/NatsConnection.cs b/src/AlterNats/NatsConnection.cs index 52f68d3a4..b6b53474d 100644 --- a/src/AlterNats/NatsConnection.cs +++ b/src/AlterNats/NatsConnection.cs @@ -6,43 +6,60 @@ namespace AlterNats; public class NatsConnection : IAsyncDisposable { - readonly TcpClient tcpClient; - readonly Stream stream; - readonly NatsStreamReader streamReader; - readonly NatsPipeliningStreamWriter streamWriter; + readonly Socket socket; + readonly NatsReadProtocolProcessor socketReader; + readonly NatsPipeliningSocketWriter socketWriter; readonly SubscriptionManager subscriptionManager; // use List for Queue is not performant readonly List pingQueue = new List(); public NatsOptions Options { get; } + public ServerInfo? ServerInfo { get; internal set; } // server info is set when received INFO public NatsConnection() + : this(NatsOptions.Default) { - this.Options = NatsOptions.Default; // TODO: + } + + public NatsConnection(NatsOptions options) + { + this.Options = options; + this.socket = new Socket(Socket.OSSupportsIPv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + if (Socket.OSSupportsIPv6) + { + socket.DualMode = true; + } + + socket.NoDelay = true; + socket.SendBufferSize = 0; + socket.ReceiveBufferSize = 0; - // TODO: use Raw Socket? - this.tcpClient = new TcpClient(); - tcpClient.Connect("localhost", 4222); // when? here? + socket.Connect("localhost", 4222); // TODO: connect here? - this.stream = tcpClient.GetStream(); - this.streamWriter = new NatsPipeliningStreamWriter(stream, Options); - this.streamReader = new NatsStreamReader(stream, this); + this.socketWriter = new NatsPipeliningSocketWriter(socket, Options); + this.socketReader = new NatsReadProtocolProcessor(socket, this); this.subscriptionManager = new SubscriptionManager(this); } + public ValueTask PingAsync() { var command = PingCommand.Create(pingQueue); - streamWriter.Post(command); + socketWriter.Post(command); return command.AsValueTask(); } /// Send PONG message to Server. - internal void Pong() + public void Pong() { - streamWriter.Post(PongCommand.Create()); + socketWriter.Post(PongCommand.Create()); + } + + public void Ping() + { + socketWriter.Post(LightPingCommand.Create()); } // SubscribeAsync??? @@ -65,6 +82,7 @@ internal void SignalPingCommand() { if (pingQueue.Count != 0) { + // TODO:???? var p = pingQueue[0]; pingQueue.RemoveAt(0); } @@ -74,10 +92,11 @@ internal void SignalPingCommand() public async ValueTask DisposeAsync() { - await streamWriter.DisposeAsync().ConfigureAwait(false); - await streamReader.DisposeAsync().ConfigureAwait(false); + await socketWriter.DisposeAsync().ConfigureAwait(false); + await socketReader.DisposeAsync().ConfigureAwait(false); subscriptionManager.Dispose(); - await stream.DisposeAsync().ConfigureAwait(false); - tcpClient.Dispose(); + + await socket.DisconnectAsync(false); + socket.Dispose(); } } \ No newline at end of file diff --git a/src/AlterNats/NatsOptions.cs b/src/AlterNats/NatsOptions.cs index 855eecb8e..25fa5637a 100644 --- a/src/AlterNats/NatsOptions.cs +++ b/src/AlterNats/NatsOptions.cs @@ -14,13 +14,19 @@ public sealed record NatsOptions ( INatsSerializer Serializer, ILoggerFactory LoggerFactory, - int MaxBatchCount + int MaxBatchCount, + int ReaderBufferSize ) { const int DefaultMaxBatchCount = 100; + const int DefaultReaderBufferSize = 1048576; // 1MB // TODO:not null, default serializer - public static NatsOptions Default = new NatsOptions(new JsonNatsSerializer(new JsonSerializerOptions()), NullLoggerFactory.Instance, DefaultMaxBatchCount); + public static NatsOptions Default = new NatsOptions( + Serializer: new JsonNatsSerializer(new JsonSerializerOptions()), + LoggerFactory: NullLoggerFactory.Instance, + MaxBatchCount: DefaultMaxBatchCount, + ReaderBufferSize: DefaultReaderBufferSize); } diff --git a/src/AlterNats/NatsPipeliningStreamWriter.cs b/src/AlterNats/NatsPipeliningSocketWriter.cs similarity index 59% rename from src/AlterNats/NatsPipeliningStreamWriter.cs rename to src/AlterNats/NatsPipeliningSocketWriter.cs index e97fca130..5a0df0593 100644 --- a/src/AlterNats/NatsPipeliningStreamWriter.cs +++ b/src/AlterNats/NatsPipeliningSocketWriter.cs @@ -1,35 +1,31 @@ using AlterNats.Commands; using Microsoft.Extensions.Logging; -using System.Diagnostics; -using System.IO.Pipelines; +using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Threading.Channels; namespace AlterNats; -internal sealed class NatsPipeliningStreamWriter : IAsyncDisposable +internal sealed class NatsPipeliningSocketWriter : IAsyncDisposable { - readonly Stream stream; - readonly PipeWriter pipeWriter; + readonly Socket socket; + readonly FixedArrayBufferWriter bufferWriter; readonly Channel channel; readonly Task writeLoop; - // TODO:no need if not using Task.Delay etc.. - readonly CancellationTokenSource cancellationTokenSource; readonly NatsOptions options; - public NatsPipeliningStreamWriter(Stream stream, NatsOptions options) + public NatsPipeliningSocketWriter(Socket socket, NatsOptions options) { - this.stream = stream; + this.socket = socket; this.options = options; - this.pipeWriter = PipeWriter.Create(stream); - this.cancellationTokenSource = new CancellationTokenSource(); + this.bufferWriter = new FixedArrayBufferWriter(); this.channel = Channel.CreateUnbounded(new UnboundedChannelOptions { AllowSynchronousContinuations = false, // always should be in async loop. SingleWriter = false, SingleReader = true, }); - this.writeLoop = Task.Run(WriteLoop); + this.writeLoop = Task.Run(WriteLoopAsync); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -38,12 +34,15 @@ public void Post(ICommand command) channel.Writer.TryWrite(command); } - async Task WriteLoop() + async Task WriteLoopAsync() { var reader = channel.Reader; - var protocolWriter = new ProtocolWriter(pipeWriter); - var logger = options.LoggerFactory.CreateLogger(); + var protocolWriter = new ProtocolWriter(bufferWriter); + var logger = options.LoggerFactory.CreateLogger(); var promiseList = new List(options.MaxBatchCount); + + var isEnabledTraceLogging = logger.IsEnabled(LogLevel.Trace); + try { while (await reader.WaitToReadAsync().ConfigureAwait(false)) @@ -53,7 +52,12 @@ async Task WriteLoop() // TODO:buffer while (reader.TryRead(out var command)) { - command.Write(logger, protocolWriter); + if (isEnabledTraceLogging) + { + logger.LogTrace(command.WriteTraceMessage); + } + + command.Write(protocolWriter); if (command is IPromise p) { @@ -67,9 +71,19 @@ async Task WriteLoop() try { - logger.LogTrace("Write loop start Flush."); - var flushResult = await pipeWriter.FlushAsync().ConfigureAwait(false); - logger.LogTrace("Write loop complete Flush."); + if (isEnabledTraceLogging) + { + logger.LogTrace("Write loop start Flush."); + } + + // SendAsync(ReadOnlyMemory) is very efficient, internally using AwaitableAsyncSocketEventArgs + // should use cancellation token?, currently no, wait for flush complete. + await socket.SendAsync(bufferWriter.WrittenMemory, SocketFlags.None).ConfigureAwait(false); + + if (isEnabledTraceLogging) + { + logger.LogTrace("Write loop complete Flush."); + } } catch (Exception ex) { @@ -98,7 +112,10 @@ async Task WriteLoop() { try { - await pipeWriter.FlushAsync().ConfigureAwait(false); + if (bufferWriter.WrittenMemory.Length != 0) + { + await socket.SendAsync(bufferWriter.WrittenMemory, SocketFlags.None).ConfigureAwait(false); + } } catch { } } @@ -107,8 +124,6 @@ async Task WriteLoop() public async ValueTask DisposeAsync() { channel.Writer.Complete(); - cancellationTokenSource.Cancel(); - await this.pipeWriter.CompleteAsync().ConfigureAwait(false); await writeLoop.ConfigureAwait(false); } -} +} \ No newline at end of file diff --git a/src/AlterNats/NatsReadProtocolProcessor.cs b/src/AlterNats/NatsReadProtocolProcessor.cs new file mode 100644 index 000000000..886749116 --- /dev/null +++ b/src/AlterNats/NatsReadProtocolProcessor.cs @@ -0,0 +1,342 @@ +using AlterNats.Internal; +using Microsoft.Extensions.Logging; +using System.Buffers; +using System.Buffers.Text; +using System.Diagnostics; +using System.IO.Pipelines; +using System.Net.Sockets; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Text; +using System.Text.Json; + +namespace AlterNats; + +internal sealed class NatsReadProtocolProcessor : IAsyncDisposable +{ + readonly Socket socket; + readonly NatsConnection connection; + readonly PipeWriter writer; + readonly PipeReader reader; + readonly Task receiveLoop; + readonly Task consumeLoop; + readonly ILogger logger; + + public NatsReadProtocolProcessor(Socket socket, NatsConnection connection) + { + this.socket = socket; + this.connection = connection; + this.logger = connection.Options.LoggerFactory.CreateLogger(); + + // TODO: set threadhold + var pipe = new Pipe(new PipeOptions(useSynchronizationContext: false)); + this.reader = pipe.Reader; + this.writer = pipe.Writer; + + this.receiveLoop = Task.Run(ReciveLoopAsync); + this.consumeLoop = Task.Run(ConsumeLoopAsync); + } + + // receive data from socket and write to Pipe. + async Task ReciveLoopAsync() + { + int read = 0; + while (true) + { + var buffer = writer.GetMemory(connection.Options.ReaderBufferSize); + try + { + logger.LogTrace("Start Socket Read"); // TODO: if-trace + read = await socket.ReceiveAsync(buffer, SocketFlags.None); + if (read == 0) + { + break; // complete. + } + + logger.LogTrace("Receive: {0} B", read); // TODO: if-trace + writer.Advance(read); + } + catch + { + // TODO: LogError??? + break; + } + + var result = await writer.FlushAsync(); + + if (result.IsCompleted) + { + break; + } + } + + await writer.CompleteAsync(); + } + + // read data from pipe and consume message. + async Task ConsumeLoopAsync() + { + while (true) + { + try + { + var readResult = await reader.ReadAtLeastAsync(4); + var buffer = readResult.Buffer; + + logger.LogTrace(buffer.Length.ToString()); + while (buffer.Length > 0) + { + var first = buffer.First; + + var code = (first.Length >= 4) + ? GetCode(first.Span) + : GetCode(buffer); + + if (code == ServerOpCodes.Msg) + { + // Optimized for Msg, most frequently called type, Inline await in loop. + throw new NotImplementedException(); + } + else + { + // Others. + buffer = await DispatchCommandAsync(code, buffer); + } + } + + reader.AdvanceTo(buffer.End); + if (readResult.IsCompleted) + { + break; + } + } + catch (Exception ex) + { + logger.LogError(ex, "Error occured during read loop."); + } + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static int GetCode(ReadOnlySpan span) + { + return Unsafe.ReadUnaligned(ref MemoryMarshal.GetReference(span)); + } + + static int GetCode(in ReadOnlySequence sequence) + { + Span buf = stackalloc byte[4]; + sequence.Slice(0, 4).CopyTo(buf); + return GetCode(buf); + } + + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + async ValueTask<(ReadOnlySequence, SequencePosition)> ReadUntilReceiveNewLineAsync() + { + // TODO:how read readResult handle? + while (true) + { + var result = await reader.ReadAsync(); + var position = result.Buffer.PositionOf((byte)'\n'); + if (position != null) + { + return (result.Buffer, position.Value); + } + + reader.AdvanceTo(result.Buffer.Start, result.Buffer.End); + // TODO:check result completed? + } + } + + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + async ValueTask> DispatchCommandAsync(int code, in ReadOnlySequence buffer) + { + var length = (int)buffer.Length; + + if (code == ServerOpCodes.Ping) + { + const int PingSize = 6; // PING\r\n + + logger.LogTrace("Receive Ping"); + + connection.Pong(); // return pong + + if (length < PingSize) + { + // TODO:how read readResult handle? + reader.AdvanceTo(buffer.Start, buffer.End); + var readResult = await reader.ReadAtLeastAsync(PingSize - length); + return readResult.Buffer.Slice(PingSize); + } + + return buffer.Slice(PingSize); + } + else if (code == ServerOpCodes.Pong) + { + const int PongSize = 6; // PONG\r\n + + // TODO: PONG TRACE + // logger.LogTrace("Receive Pong"); + + if (length < PongSize) + { + // TODO:how read readResult handle? + reader.AdvanceTo(buffer.Start, buffer.End); + var readResult = await reader.ReadAtLeastAsync(PongSize - length); + return readResult.Buffer.Slice(PongSize); + } + + return buffer.Slice(PongSize); + } + else if (code == ServerOpCodes.Error) + { + logger.LogTrace("Receive Error"); + + // TODO: Parse Error + throw new NotImplementedException(); + } + else if (code == ServerOpCodes.Ok) + { + logger.LogTrace("Receive OK"); + + const int OkSize = 5; // +OK\r\n + + if (length < OkSize) + { + // TODO:how read readResult handle? + reader.AdvanceTo(buffer.Start, buffer.End); + var readResult = await reader.ReadAtLeastAsync(OkSize - length); + return readResult.Buffer.Slice(OkSize); + } + + return buffer.Slice(OkSize); + } + else if (code == ServerOpCodes.Info) + { + logger.LogTrace("Receive Info"); + + // TODO:log info? + //var info = ParseInfo(buffer, out var readSize); + //return readSize + buffer.Span.Slice(readSize).IndexOf((byte)'\n') + 1; + + + // try to get \n. + var position = buffer.PositionOf((byte)'\n'); + + if (position == null) + { + reader.AdvanceTo(buffer.Start, buffer.End); + var (newBuffer, newPosition) = await ReadUntilReceiveNewLineAsync(); + // TODO:log serverInfo. + var serverInfo = ParseInfo(newBuffer); + connection.ServerInfo = serverInfo; + logger.LogInformation("Received ServerInfo: {0}", serverInfo); + return newBuffer.Slice(buffer.GetPosition(1, newPosition)); + } + else + { + var serverInfo = ParseInfo(buffer); + connection.ServerInfo = serverInfo; + logger.LogInformation("Received ServerInfo: {0}", serverInfo); + return buffer.Slice(buffer.GetPosition(1, position.Value)); + } + } + else + { + // TODO:reaches invalid line. + //var s = Encoding.UTF8.GetString(buffer.Span.Slice(0, 4)); + //Console.WriteLine("NANIDEMO NAI!?:" + s); + + // reaches invalid line. + // Try to skip next '\n'; + throw new NotImplementedException(); + } + } + + // https://docs.nats.io/reference/reference-protocols/nats-protocol#info + // INFO {["option_name":option_value],...} + internal static ServerInfo ParseInfo(ReadOnlySequence buffer) + { + // skip `INFO` + var jsonReader = new Utf8JsonReader(buffer.Slice(5)); + + var serverInfo = JsonSerializer.Deserialize(ref jsonReader); + if (serverInfo == null) throw new InvalidOperationException(""); // TODO:NatsException + return serverInfo; + } + + // https://docs.nats.io/reference/reference-protocols/nats-protocol#msg + // MSG [reply-to] <#bytes>\r\n[payload] + internal static void ParseMessage(ReadOnlySequence data, out SequencePosition readPosition, Action> callback) + { + if (data.IsSingleSegment) + { + var offset = 0; + var buffer = data.FirstSpan.Slice(4); + + offset += Split(buffer, ' ', out var subject, out buffer); + offset += Split(buffer, ' ', out var sid, out buffer); + + // reply-to or #bytes, check \n first. + var i = buffer.IndexOf((byte)'\n'); + var i2 = buffer.Slice(0, i).IndexOf((byte)' '); + if (i2 == -1) + { + // not exists reply-to, only #bytes + if (!Utf8Parser.TryParse(buffer.Slice(0, i - 1), out int payloadLength, out _)) + { + throw new Exception(); // todo + } + + offset += i + 1; + var payload = data.Slice(offset, payloadLength); // create slice of ReadOnlySequence + callback(payload); + + var last = buffer.Slice(i + 1 + payloadLength).IndexOf((byte)'\n'); + readPosition = data.GetPosition(last + 1); + } + else + { + throw new NotImplementedException(); // TODO:reply-to! + } + } + else + { + throw new NotImplementedException(); // TODO:how??? + } + } + + static int Split(ReadOnlySpan span, char delimiter, out ReadOnlySpan left, out ReadOnlySpan right) + { + var i = span.IndexOf((byte)delimiter); + if (i == -1) + { + // TODO: + throw new InvalidOperationException(); + } + + left = span.Slice(0, i); + right = span.Slice(i + 1); + return i + 1; + } + + public async ValueTask DisposeAsync() + { + await writer.CompleteAsync(); + await reader.CompleteAsync(); // TODO:check stop behaviour + await receiveLoop; + await consumeLoop; + } + + internal static class ServerOpCodes + { + // All sent by server commands as int(first 4 characters(includes space, newline)). + + public const int Info = 1330007625; // Encoding.ASCII.GetBytes("INFO") |> MemoryMarshal.Read + public const int Msg = 541545293; // Encoding.ASCII.GetBytes("MSG ") |> MemoryMarshal.Read + public const int Ping = 1196312912; // Encoding.ASCII.GetBytes("PING") |> MemoryMarshal.Read + public const int Pong = 1196314448; // Encoding.ASCII.GetBytes("PONG") |> MemoryMarshal.Read + public const int Ok = 223039275; // Encoding.ASCII.GetBytes("+OK\r") |> MemoryMarshal.Read + public const int Error = 1381123373; // Encoding.ASCII.GetBytes("-ERR") |> MemoryMarshal.Read + } +} \ No newline at end of file diff --git a/src/AlterNats/NatsStreamReader.cs b/src/AlterNats/NatsStreamReader.cs deleted file mode 100644 index d325adf1f..000000000 --- a/src/AlterNats/NatsStreamReader.cs +++ /dev/null @@ -1,216 +0,0 @@ -using Microsoft.Extensions.Logging; -using System; -using System.Buffers; -using System.Buffers.Text; -using System.Collections.Generic; -using System.IO.Pipelines; -using System.Linq; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; -using System.Text; -using System.Text.Json; -using System.Threading.Tasks; - -namespace AlterNats; - -internal sealed class NatsStreamReader : IAsyncDisposable -{ - readonly Stream stream; - readonly NatsConnection connection; - readonly PipeReader pipeReader; - readonly Task readLoop; - readonly CancellationTokenSource cancellationTokenSource; // TODO:no need it? - readonly ILogger logger; - - public NatsStreamReader(Stream stream, NatsConnection connection) - { - this.stream = stream; - this.connection = connection; - this.logger = connection.Options.LoggerFactory.CreateLogger(); - this.cancellationTokenSource = new CancellationTokenSource(); - this.pipeReader = PipeReader.Create(stream); - this.readLoop = Task.Run(ReadLoop); - } - - async Task ReadLoop() - { - while (true) - { - // TODO:read more if buffer is not filled??? - var readResult = await pipeReader.ReadAsync(); - if (readResult.IsCompleted) - { - return; - } - - var buffer = readResult.Buffer; - var code = (buffer.FirstSpan.Length >= 4) - ? GetCode(buffer.FirstSpan) - : GetCode(buffer); - - var position = DispatchCommand(code, buffer); - pipeReader.AdvanceTo(position); - } - } - - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - static int GetCode(ReadOnlySpan span) - { - return Unsafe.ReadUnaligned(ref MemoryMarshal.GetReference(span)); - } - - static int GetCode(ReadOnlySequence sequence) - { - Span buf = stackalloc byte[4]; - sequence.Slice(0, 4).CopyTo(buf); - return GetCode(buf); - } - - SequencePosition DispatchCommand(int code, ReadOnlySequence buffer) - { - if (code == ServerOpCodes.Msg) - { - logger.LogTrace("Receive Msg"); - - // TODO:no. - } - else if (code == ServerOpCodes.Ping) - { - logger.LogTrace("Receive Ping"); - - connection.Pong(); // return pong - return buffer.GetPosition(6); // PING\r\n - } - else if (code == ServerOpCodes.Pong) - { - logger.LogTrace("Receive Pong"); - - return buffer.GetPosition(6); // PONG\r\n - } - else if (code == ServerOpCodes.Error) - { - logger.LogTrace("Receive Error"); - - // TODO: Parse Error - throw new NotImplementedException(); - } - else if (code == ServerOpCodes.Ok) - { - logger.LogTrace("Receive OK"); - return buffer.GetPosition(5); // +OK\r\n - } - else if (code == ServerOpCodes.Info) - { - logger.LogTrace("Receive Info"); - - var info = ParseInfo(buffer, out var position); - - - buffer = buffer.Slice(position); - } - else - { - // TODO:reaches invalid line. - - var s = Encoding.UTF8.GetString(buffer.Slice(0, 4).FirstSpan); - Console.WriteLine("NANIDEMO NAI!?:" + s); - } - - var finalPosition = buffer.PositionOf((byte)'\n'); - if (finalPosition == null) - { - throw new Exception(); // TODO - } - - return buffer.GetPosition(1, finalPosition.Value); - } - - // https://docs.nats.io/reference/reference-protocols/nats-protocol#info - // INFO {["option_name":option_value],...} - internal static ServerInfo ParseInfo(ReadOnlySequence data, out SequencePosition readPosition) - { - // skip `INFO` - var jsonReader = new Utf8JsonReader(data.Slice(5)); - - var serverInfo = JsonSerializer.Deserialize(ref jsonReader); - if (serverInfo == null) throw new InvalidOperationException(""); // TODO:NatsException - - readPosition = data.GetPosition(4 + jsonReader.BytesConsumed); - return serverInfo; - } - - // https://docs.nats.io/reference/reference-protocols/nats-protocol#msg - // MSG [reply-to] <#bytes>\r\n[payload] - internal static void ParseMessage(ReadOnlySequence data, out SequencePosition readPosition, Action> callback) - { - if (data.IsSingleSegment) - { - var offset = 0; - var buffer = data.FirstSpan.Slice(4); - - offset += Split(buffer, ' ', out var subject, out buffer); - offset += Split(buffer, ' ', out var sid, out buffer); - - // reply-to or #bytes, check \n first. - var i = buffer.IndexOf((byte)'\n'); - var i2 = buffer.Slice(0, i).IndexOf((byte)' '); - if (i2 == -1) - { - // not exists reply-to, only #bytes - if (!Utf8Parser.TryParse(buffer.Slice(0, i - 1), out int payloadLength, out _)) - { - throw new Exception(); // todo - } - - offset += i + 1; - var payload = data.Slice(offset, payloadLength); // create slice of ReadOnlySequence - callback(payload); - - var last = buffer.Slice(i + 1 + payloadLength).IndexOf((byte)'\n'); - readPosition = data.GetPosition(last + 1); - } - else - { - throw new NotImplementedException(); // TODO:reply-to! - } - } - else - { - throw new NotImplementedException(); // TODO:how??? - } - } - - static int Split(ReadOnlySpan span, char delimiter, out ReadOnlySpan left, out ReadOnlySpan right) - { - var i = span.IndexOf((byte)delimiter); - if (i == -1) - { - // TODO: - throw new InvalidOperationException(); - } - - left = span.Slice(0, i); - right = span.Slice(i + 1); - return i + 1; - } - - public async ValueTask DisposeAsync() - { - cancellationTokenSource.Cancel(); - await this.pipeReader.CompleteAsync().ConfigureAwait(false); - await readLoop.ConfigureAwait(false); - } - - internal static class ServerOpCodes - { - // All sent by server commands as int(first 4 characters(includes space, newline)). - - public const int Info = 1330007625; // Encoding.ASCII.GetBytes("INFO") |> MemoryMarshal.Read - public const int Msg = 541545293; // Encoding.ASCII.GetBytes("MSG ") |> MemoryMarshal.Read - public const int Ping = 1196312912; // Encoding.ASCII.GetBytes("PING") |> MemoryMarshal.Read - public const int Pong = 1196314448; // Encoding.ASCII.GetBytes("PONG") |> MemoryMarshal.Read - public const int Ok = 223039275; // Encoding.ASCII.GetBytes("+OK\r") |> MemoryMarshal.Read - public const int Error = 1381123373; // Encoding.ASCII.GetBytes("-ERR") |> MemoryMarshal.Read - } -} \ No newline at end of file diff --git a/tests/AlterNats.Tests/AlterNats.Tests.csproj b/tests/AlterNats.Tests/AlterNats.Tests.csproj index da2b676bf..27fcffbf6 100644 --- a/tests/AlterNats.Tests/AlterNats.Tests.csproj +++ b/tests/AlterNats.Tests/AlterNats.Tests.csproj @@ -1,24 +1,28 @@ - + - - net6.0 - enable + + net6.0 + enable + false + - false - + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + - - - - - - runtime; build; native; contentfiles; analyzers; buildtransitive - all - - - - - - + + + + + + + + diff --git a/tests/AlterNats.Tests/AppendableSequenceTest.cs b/tests/AlterNats.Tests/AppendableSequenceTest.cs new file mode 100644 index 000000000..921a799d0 --- /dev/null +++ b/tests/AlterNats.Tests/AppendableSequenceTest.cs @@ -0,0 +1,94 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace AlterNats.Tests; + +public class AppendableSequenceTest +{ + [Fact] + public void Test() + { + var buffer1 = new byte[] { 1, 10, 20 }; + + var a = new AppendableSequence(); + a.Append(buffer1); + + var seq = a.AsReadOnlySequence(); + seq.IsSingleSegment.ShouldBeTrue(); + seq.Length.Should().Be(3); + seq.First.ShouldEqual(new byte[] { 1, 10, 20 }); + + a.Append(new byte[] { 100, 150, 200, 240 }); + + seq = a.AsReadOnlySequence(); + seq.IsSingleSegment.ShouldBeFalse(); + seq.Length.Should().Be(7); + + var i = 0; + foreach (var item in seq) + { + if (i == 0) + { + item.ShouldEqual(new byte[] { 1, 10, 20 }); + } + else if (i == 1) + { + item.ShouldEqual(new byte[] { 100, 150, 200, 240 }); + } + else + { + throw new Exception(); + } + + i++; + } + + a.Append(new byte[] { 99, 98, 97, 96, 95 }); + seq = a.AsReadOnlySequence(); + seq.IsSingleSegment.ShouldBeFalse(); + seq.Length.Should().Be(12); + + i = 0; + foreach (var item in seq) + { + if (i == 0) + { + item.ShouldEqual(new byte[] { 1, 10, 20 }); + } + else if (i == 1) + { + item.ShouldEqual(new byte[] { 100, 150, 200, 240 }); + } + else if (i == 2) + { + item.ShouldEqual(new byte[] { 99, 98, 97, 96, 95 }); + } + else + { + throw new Exception(); + } + + i++; + } + + // finally slice check. + EnumerateAll(seq.Slice(2)).ShouldEqual(new byte[] { 20, 100, 150, 200, 240, 99, 98, 97, 96, 95 }); + EnumerateAll(seq.Slice(5)).ShouldEqual(new byte[] { 200, 240, 99, 98, 97, 96, 95 }); + EnumerateAll(seq.Slice(8)).ShouldEqual(new byte[] { 98, 97, 96, 95 }); + } + + IEnumerable EnumerateAll(ReadOnlySequence seq) + { + foreach (var item in seq) + { + foreach (var item2 in item.ToArray()) + { + yield return item2; + } + } + } +} \ No newline at end of file diff --git a/tests/AlterNats.Tests/FixedArrayBufferWriterTest.cs b/tests/AlterNats.Tests/FixedArrayBufferWriterTest.cs new file mode 100644 index 000000000..fe0faaeb4 --- /dev/null +++ b/tests/AlterNats.Tests/FixedArrayBufferWriterTest.cs @@ -0,0 +1,38 @@ +namespace AlterNats.Tests; + +public class FixedArrayBufferWriterTest +{ + [Fact] + public void Standard() + { + var writer = new FixedArrayBufferWriter(); + + var buffer = writer.GetSpan(); + buffer[0] = 100; + buffer[1] = 200; + buffer[2] = 220; + + writer.Advance(3); + + var buffer2 = writer.GetSpan(); + buffer2[0].Should().Be(0); + (buffer.Length - buffer2.Length).Should().Be(3); + + buffer2[0] = 244; + writer.Advance(1); + + writer.WrittenMemory.ToArray().Should().Equal(100, 200, 220, 244); + } + + [Fact] + public void Ensure() + { + var writer = new FixedArrayBufferWriter(); + + writer.Advance(20000); + + var newSpan = writer.GetSpan(50000); + + newSpan.Length.Should().Be(ushort.MaxValue * 2 - 20000); + } +} diff --git a/tests/AlterNats.Tests/UnitTest1.cs b/tests/AlterNats.Tests/UnitTest1.cs deleted file mode 100644 index 4b56c20db..000000000 --- a/tests/AlterNats.Tests/UnitTest1.cs +++ /dev/null @@ -1,13 +0,0 @@ -using Xunit; - -namespace AlterNats.Tests -{ - public class UnitTest1 - { - [Fact] - public void Test1() - { - - } - } -} \ No newline at end of file diff --git a/tests/AlterNats.Tests/_FluentAssertionsExtension.cs b/tests/AlterNats.Tests/_FluentAssertionsExtension.cs new file mode 100644 index 000000000..2b9ea8c03 --- /dev/null +++ b/tests/AlterNats.Tests/_FluentAssertionsExtension.cs @@ -0,0 +1,52 @@ +using FluentAssertions.Collections; +using System; +using System.Collections.Generic; + +namespace AlterNats.Tests; + +public static class FluentAssertionsExtension +{ + public static void ShouldBe(this object actual, object expected) + { + actual.Should().Be(expected); + } + public static void ShouldBeTrue(this bool v) + { + v.Should().BeTrue(); + } + + public static void ShouldBeFalse(this bool v) + { + v.Should().BeFalse(); + } + + public static void ShouldEqual(this IEnumerable source, params T[] elements) + { + source.Should().Equal(elements); + } + + public static void ShouldEqual(this ReadOnlyMemory source, params T[] elements) + { + source.ToArray().Should().Equal(elements); + } + + public static GenericCollectionAssertions Should(this Memory source) + { + return source.ToArray().Should(); + } + + public static GenericCollectionAssertions Should(this ReadOnlyMemory source) + { + return source.ToArray().Should(); + } + + public static GenericCollectionAssertions Should(this Span source) + { + return source.ToArray().Should(); + } + + public static GenericCollectionAssertions Should(this ReadOnlySpan source) + { + return source.ToArray().Should(); + } +} \ No newline at end of file diff --git a/tools/nats-server.exe b/tools/nats-server.exe new file mode 100644 index 000000000..59339b342 Binary files /dev/null and b/tools/nats-server.exe differ diff --git a/tools/start-nats-server.bat b/tools/start-nats-server.bat new file mode 100644 index 000000000..2172852bc --- /dev/null +++ b/tools/start-nats-server.bat @@ -0,0 +1,4 @@ +@REM https://docs.nats.io/running-a-nats-service/introduction/flags +@REM -DV is Debug and Protocol Trace +@REM -DVV is Debug and Verbose +nats-server.exe -DV \ No newline at end of file