diff --git a/sandbox/ConsoleApp/Program.cs b/sandbox/ConsoleApp/Program.cs index 1c951cfed..5fd1be3ab 100644 --- a/sandbox/ConsoleApp/Program.cs +++ b/sandbox/ConsoleApp/Program.cs @@ -26,7 +26,7 @@ public static async Task Main() .AddLogging(x => { x.ClearProviders(); - x.SetMinimumLevel(LogLevel.Information); + x.SetMinimumLevel(LogLevel.Trace); x.AddZLoggerConsole(); }) .BuildServiceProvider(); @@ -40,7 +40,7 @@ public static async Task Main() Serializer = new MessagePackNatsSerializer(), ConnectTimeout = TimeSpan.FromSeconds(1), ConnectOptions = ConnectOptions.Default with { Echo = true, Verbose = false }, - PingInterval = TimeSpan.FromSeconds(5) + PingInterval = TimeSpan.Zero }; diff --git a/src/AlterNats/NatsConnection.cs b/src/AlterNats/NatsConnection.cs index 7467c4867..8fc16a473 100644 --- a/src/AlterNats/NatsConnection.cs +++ b/src/AlterNats/NatsConnection.cs @@ -153,19 +153,25 @@ async ValueTask InitialConnectAsync() // Connected completely but still ConnnectionState is Connecting(require after receive INFO). - // add CONNECT command to priority lane + // add CONNECT and PING command to priority lane var connectCommand = AsyncConnectCommand.Create(pool, Options.ConnectOptions); writerState.PriorityCommands.Add(connectCommand); + writerState.PriorityCommands.Add(PingCommand.Create(pool)); // Run Reader/Writer LOOP start var waitForInfoSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var waitForPongOrErrorSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); this.socketWriter = new NatsPipeliningWriteProtocolProcessor(socket, writerState, pool); - this.socketReader = new NatsReadProtocolProcessor(socket, this, waitForInfoSignal); + this.socketReader = new NatsReadProtocolProcessor(socket, this, waitForInfoSignal, waitForPongOrErrorSignal); try { - await connectCommand.AsValueTask().ConfigureAwait(false); + // before send connect, wait INFO. await waitForInfoSignal.Task.ConfigureAwait(false); + // send COMMAND and PING + await connectCommand.AsValueTask().ConfigureAwait(false); + // receive COMMAND response(PONG or ERROR) + await waitForPongOrErrorSignal.Task.ConfigureAwait(false); } catch (Exception ex) { @@ -287,9 +293,10 @@ async void ReconnectLoopAsync() goto CONNECT_AGAIN; } - // add CONNECT command to priority lane + // add CONNECT and PING command to priority lane var connectCommand = AsyncConnectCommand.Create(pool, Options.ConnectOptions); writerState.PriorityCommands.Add(connectCommand); + writerState.PriorityCommands.Add(PingCommand.Create(pool)); // Add SUBSCRIBE command to priority lane var subscribeCommand = AsyncSubscribeBatchCommand.Create(pool, subscriptionManager.GetExistingSubscriptions()); @@ -297,12 +304,14 @@ async void ReconnectLoopAsync() // Run Reader/Writer LOOP start var waitForInfoSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var waitForPongOrErrorSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); this.socketWriter = new NatsPipeliningWriteProtocolProcessor(socket, writerState, pool); - this.socketReader = new NatsReadProtocolProcessor(socket, this, waitForInfoSignal); + this.socketReader = new NatsReadProtocolProcessor(socket, this, waitForInfoSignal, waitForPongOrErrorSignal); + await waitForInfoSignal.Task.ConfigureAwait(false); await connectCommand.AsValueTask().ConfigureAwait(false); + await waitForPongOrErrorSignal.Task.ConfigureAwait(false); await subscribeCommand.AsValueTask().ConfigureAwait(false); - await waitForInfoSignal.Task.ConfigureAwait(false); } catch (Exception ex) { diff --git a/src/AlterNats/NatsReadProtocolProcessor.cs b/src/AlterNats/NatsReadProtocolProcessor.cs index d4ed5bd32..96318571f 100644 --- a/src/AlterNats/NatsReadProtocolProcessor.cs +++ b/src/AlterNats/NatsReadProtocolProcessor.cs @@ -19,18 +19,20 @@ internal sealed class NatsReadProtocolProcessor : IAsyncDisposable readonly SocketReader socketReader; readonly Task readLoop; readonly TaskCompletionSource waitForInfoSignal; + readonly TaskCompletionSource waitForPongOrErrorSignal; // wait for initial connection readonly ConcurrentQueue pingCommands; // wait for pong readonly ILogger logger; readonly bool isEnabledTraceLogging; int disposed; - public NatsReadProtocolProcessor(TcpConnection socket, NatsConnection connection, TaskCompletionSource waitForInfoSignal) + public NatsReadProtocolProcessor(TcpConnection socket, NatsConnection connection, TaskCompletionSource waitForInfoSignal, TaskCompletionSource waitForPongOrErrorSignal) { this.socket = socket; this.connection = connection; this.logger = connection.Options.LoggerFactory.CreateLogger(); this.isEnabledTraceLogging = logger.IsEnabled(LogLevel.Trace); this.waitForInfoSignal = waitForInfoSignal; + this.waitForPongOrErrorSignal = waitForPongOrErrorSignal; this.pingCommands = new ConcurrentQueue(); this.socketReader = new SocketReader(socket, connection.Options.ReaderBufferSize, connection.Options.LoggerFactory); this.readLoop = Task.Run(ReadLoopAsync); @@ -242,6 +244,7 @@ async ValueTask> DispatchCommandAsync(int code, in ReadOn const int PongSize = 6; // PONG\r\n connection.ResetPongCount(); // reset count for PingTimer + waitForPongOrErrorSignal.TrySetResult(); // set for initial connect if (pingCommands.TryDequeue(out var pingCommand)) { @@ -270,12 +273,14 @@ async ValueTask> DispatchCommandAsync(int code, in ReadOn var newPosition = newBuffer.PositionOf((byte)'\n'); var error = ParseError(newBuffer.Slice(0, buffer.GetOffset(newPosition!.Value) - 1)); logger.LogError(error); + waitForPongOrErrorSignal.TrySetException(new NatsException(error)); return newBuffer.Slice(newBuffer.GetPosition(1, newPosition!.Value)); } else { var error = ParseError(buffer.Slice(0, buffer.GetOffset(position.Value) - 1)); logger.LogError(error); + waitForPongOrErrorSignal.TrySetException(new NatsException(error)); return buffer.Slice(buffer.GetPosition(1, position.Value)); } } @@ -424,6 +429,7 @@ public async ValueTask DisposeAsync() item.SetCanceled(CancellationToken.None); } waitForInfoSignal.TrySetCanceled(); + waitForPongOrErrorSignal.TrySetCanceled(); } } diff --git a/tools/start-nats-server-withtoken.bat b/tools/start-nats-server-withtoken.bat new file mode 100644 index 000000000..255d2931b --- /dev/null +++ b/tools/start-nats-server-withtoken.bat @@ -0,0 +1,4 @@ +@REM https://docs.nats.io/running-a-nats-service/introduction/flags +@REM -DV is Debug and Protocol Trace +@REM -DVV is Debug and Verbose +nats-server.exe -DV --auth s3cr3t \ No newline at end of file