diff --git a/sandbox/ConsoleApp/Program.cs b/sandbox/ConsoleApp/Program.cs index 624ca4f61..1c951cfed 100644 --- a/sandbox/ConsoleApp/Program.cs +++ b/sandbox/ConsoleApp/Program.cs @@ -39,7 +39,8 @@ public static async Task Main() //LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Information), Serializer = new MessagePackNatsSerializer(), ConnectTimeout = TimeSpan.FromSeconds(1), - ConnectOptions = ConnectOptions.Default with { Echo = true, Verbose = false } + ConnectOptions = ConnectOptions.Default with { Echo = true, Verbose = false }, + PingInterval = TimeSpan.FromSeconds(5) }; @@ -47,38 +48,11 @@ public static async Task Main() var connection = new NatsConnection(options); - await Parallel.ForEachAsync(Enumerable.Range(0, 100), new ParallelOptions { MaxDegreeOfParallelism = 100 }, async (i, ct) => - { - var ttl = await connection.PingAsync(); - Console.WriteLine(ttl); - }); - Console.ReadLine(); - - await connection.PublishAsync("foo", 100); - - await connection.SubscribeAsync("foo", x => - { - Console.WriteLine(x); - }); - - // Server - await connection.SubscribeRequestAsync("hogemoge.key", req => - { - Console.WriteLine("YEAH?"); - return new FooResponse(); - }); - - // Client - var response = await connection.RequestAsync("hogemoge.key", new FooRequest()); - + await connection.ConnectAsync(); - var ttl = await connection.PingAsync(); - Console.WriteLine("RTT:" + ttl.TotalMilliseconds); Console.ReadLine(); - - } //static void CalcCommandPushPop(NatsKey key, INatsSerializer serializer) diff --git a/src/AlterNats/NatsConnection.cs b/src/AlterNats/NatsConnection.cs index ecbfec5d8..7467c4867 100644 --- a/src/AlterNats/NatsConnection.cs +++ b/src/AlterNats/NatsConnection.cs @@ -49,11 +49,12 @@ public class NatsConnection : IAsyncDisposable readonly ObjectPool pool; internal readonly ReadOnlyMemory indBoxPrefix; - Task? reconnectLoop; + int pongCount; bool isDisposed; // when reconnect, make new instance. TcpConnection? socket; + CancellationTokenSource? pingTimerCancellationTokenSource; NatsUri? currentConnectUri; NatsReadProtocolProcessor? socketReader; NatsPipeliningWriteProtocolProcessor? socketWriter; @@ -194,143 +195,154 @@ async ValueTask InitialConnectAsync() var url = currentConnectUri; logger.LogInformation("Connect succeed, NATS {0}:{1}", url?.Host, url?.Port); this.ConnectionState = NatsConnectionState.Open; + this.pingTimerCancellationTokenSource = new CancellationTokenSource(); + StartPingTimerAsync(pingTimerCancellationTokenSource.Token); this.waitForOpenConnection.TrySetResult(); - reconnectLoop = Task.Run(ReconnectLoopAsync); + Task.Run(ReconnectLoopAsync); } } - async Task ReconnectLoopAsync() + async void ReconnectLoopAsync() { - // If dispose this client, WaitForClosed throws OpeationCanceledException so stop reconnect-loop correctly. - await socket!.WaitForClosed.ConfigureAwait(false); + try + { + // If dispose this client, WaitForClosed throws OpeationCanceledException so stop reconnect-loop correctly. + await socket!.WaitForClosed.ConfigureAwait(false); - logger.LogTrace("Detect connection closed, start to cleanup current connection and start to reconnect."); + logger.LogTrace("Detect connection closed, start to cleanup current connection and start to reconnect."); - lock (gate) - { - this.ConnectionState = NatsConnectionState.Reconnecting; - this.waitForOpenConnection.TrySetCanceled(); - this.waitForOpenConnection = new TaskCompletionSource(); - } + lock (gate) + { + this.ConnectionState = NatsConnectionState.Reconnecting; + this.waitForOpenConnection.TrySetCanceled(); + this.waitForOpenConnection = new TaskCompletionSource(); + this.pingTimerCancellationTokenSource?.Cancel(); + } - // Cleanup current reader/writer - { - // reader is not share state, can dispose asynchronously. - var reader = socketReader!; - _ = Task.Run(async () => + // Cleanup current reader/writer { - try + // reader is not share state, can dispose asynchronously. + var reader = socketReader!; + _ = Task.Run(async () => { - await reader.DisposeAsync().ConfigureAwait(false); - } - catch (Exception ex) - { - logger.LogError(ex, "Error occured when disposing socket reader loop."); - } - }); + try + { + await reader.DisposeAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + logger.LogError(ex, "Error occured when disposing socket reader loop."); + } + }); - // writer's internal buffer/channel is not thread-safe, must wait complete. - await socketWriter!.DisposeAsync(); - } + // writer's internal buffer/channel is not thread-safe, must wait complete. + await socketWriter!.DisposeAsync(); + } - // Dispose current and create new - await socket.DisposeAsync(); + // Dispose current and create new + await socket.DisposeAsync(); - NatsUri[] urls = Array.Empty(); - if (Options.NoRandomize) - { - urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x)).Distinct().ToArray() ?? Array.Empty(); - if (urls.Length == 0) + NatsUri[] urls = Array.Empty(); + if (Options.NoRandomize) { - urls = Options.GetSeedUris(); + urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x)).Distinct().ToArray() ?? Array.Empty(); + if (urls.Length == 0) + { + urls = Options.GetSeedUris(); + } } - } - else - { - urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x)).OrderBy(_ => Guid.NewGuid()).Distinct().ToArray() ?? Array.Empty(); - if (urls.Length == 0) + else { - urls = Options.GetSeedUris(); + urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x)).OrderBy(_ => Guid.NewGuid()).Distinct().ToArray() ?? Array.Empty(); + if (urls.Length == 0) + { + urls = Options.GetSeedUris(); + } } - } - - if (this.currentConnectUri != null) - { - // add last. - urls = urls.Where(x => x != currentConnectUri).Append(currentConnectUri).ToArray(); - } - currentConnectUri = null; - var urlEnumerator = urls.AsEnumerable().GetEnumerator(); - NatsUri? url = null; - CONNECT_AGAIN: - try - { - if (urlEnumerator.MoveNext()) + if (this.currentConnectUri != null) { - url = urlEnumerator.Current; - logger.LogInformation("Try to connect NATS {0}:{1}", url.Host, url.Port); - var conn = new TcpConnection(); - await conn.ConnectAsync(url.Host, url.Port, Options.ConnectTimeout).ConfigureAwait(false); - this.socket = conn; - this.currentConnectUri = url; + // add last. + urls = urls.Where(x => x != currentConnectUri).Append(currentConnectUri).ToArray(); } - else + + currentConnectUri = null; + var urlEnumerator = urls.AsEnumerable().GetEnumerator(); + NatsUri? url = null; + CONNECT_AGAIN: + try { - urlEnumerator = urls.AsEnumerable().GetEnumerator(); - goto CONNECT_AGAIN; - } + if (urlEnumerator.MoveNext()) + { + url = urlEnumerator.Current; + logger.LogInformation("Try to connect NATS {0}:{1}", url.Host, url.Port); + var conn = new TcpConnection(); + await conn.ConnectAsync(url.Host, url.Port, Options.ConnectTimeout).ConfigureAwait(false); + this.socket = conn; + this.currentConnectUri = url; + } + else + { + urlEnumerator = urls.AsEnumerable().GetEnumerator(); + goto CONNECT_AGAIN; + } - // add CONNECT command to priority lane - var connectCommand = AsyncConnectCommand.Create(pool, Options.ConnectOptions); - writerState.PriorityCommands.Add(connectCommand); + // add CONNECT command to priority lane + var connectCommand = AsyncConnectCommand.Create(pool, Options.ConnectOptions); + writerState.PriorityCommands.Add(connectCommand); - // Add SUBSCRIBE command to priority lane - var subscribeCommand = AsyncSubscribeBatchCommand.Create(pool, subscriptionManager.GetExistingSubscriptions()); - writerState.PriorityCommands.Add(subscribeCommand); + // Add SUBSCRIBE command to priority lane + var subscribeCommand = AsyncSubscribeBatchCommand.Create(pool, subscriptionManager.GetExistingSubscriptions()); + writerState.PriorityCommands.Add(subscribeCommand); - // Run Reader/Writer LOOP start - var waitForInfoSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - this.socketWriter = new NatsPipeliningWriteProtocolProcessor(socket, writerState, pool); - this.socketReader = new NatsReadProtocolProcessor(socket, this, waitForInfoSignal); + // Run Reader/Writer LOOP start + var waitForInfoSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + this.socketWriter = new NatsPipeliningWriteProtocolProcessor(socket, writerState, pool); + this.socketReader = new NatsReadProtocolProcessor(socket, this, waitForInfoSignal); - await connectCommand.AsValueTask().ConfigureAwait(false); - await subscribeCommand.AsValueTask().ConfigureAwait(false); - await waitForInfoSignal.Task.ConfigureAwait(false); - } - catch (Exception ex) - { - if (url != null) - { - logger.LogError(ex, "Fail to connect NATS {0}:{1}.", url.Host, url.Port); + await connectCommand.AsValueTask().ConfigureAwait(false); + await subscribeCommand.AsValueTask().ConfigureAwait(false); + await waitForInfoSignal.Task.ConfigureAwait(false); } - - if (socketWriter != null) - { - await socketWriter.DisposeAsync().ConfigureAwait(false); - } - if (socketReader != null) + catch (Exception ex) { - await socketReader.DisposeAsync().ConfigureAwait(false); + if (url != null) + { + logger.LogError(ex, "Fail to connect NATS {0}:{1}.", url.Host, url.Port); + } + + if (socketWriter != null) + { + await socketWriter.DisposeAsync().ConfigureAwait(false); + } + if (socketReader != null) + { + await socketReader.DisposeAsync().ConfigureAwait(false); + } + if (socket != null) + { + await socket.DisposeAsync().ConfigureAwait(false); + } + socket = null; + socketWriter = null; + socketReader = null; + + await WaitWithJitterAsync().ConfigureAwait(false); + goto CONNECT_AGAIN; } - if (socket != null) + + lock (gate) { - await socket.DisposeAsync().ConfigureAwait(false); + logger.LogInformation("Connect succeed, NATS {0}:{1}", url.Host, url.Port); + this.ConnectionState = NatsConnectionState.Open; + this.pingTimerCancellationTokenSource = new CancellationTokenSource(); + StartPingTimerAsync(pingTimerCancellationTokenSource.Token); + this.waitForOpenConnection.TrySetResult(); + Task.Run(ReconnectLoopAsync); } - socket = null; - socketWriter = null; - socketReader = null; - - await WaitWithJitterAsync().ConfigureAwait(false); - goto CONNECT_AGAIN; } - - lock (gate) + catch { - logger.LogInformation("Connect succeed, NATS {0}:{1}", url.Host, url.Port); - this.ConnectionState = NatsConnectionState.Open; - this.waitForOpenConnection.TrySetResult(); - reconnectLoop = Task.Run(ReconnectLoopAsync); } } @@ -342,6 +354,33 @@ async Task WaitWithJitterAsync() await Task.Delay(waitTime).ConfigureAwait(false); } + async void StartPingTimerAsync(CancellationToken cancellationToken) + { + if (Options.PingInterval == TimeSpan.Zero) return; + + var periodicTimer = new PeriodicTimer(Options.PingInterval); + ResetPongCount(); + try + { + while (!cancellationToken.IsCancellationRequested) + { + if (Interlocked.Increment(ref pongCount) > Options.MaxPingOut) + { + logger.LogInformation("Detect MaxPingOut, try to connection abort."); + if (socket != null) + { + await socket.AbortConnectionAsync(cancellationToken).ConfigureAwait(false); + return; + } + } + + PostPing(); + await periodicTimer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false); + } + } + catch { } + } + internal void EnqueuePing(AsyncPingCommand pingCommand) { // Enqueue Ping Command to current working reader. @@ -889,6 +928,11 @@ internal void PublishToResponseHandler(int requestId, in ReadOnlySequence requestResponseManager.PublishToResponseHandler(requestId, buffer); } + internal void ResetPongCount() + { + Interlocked.Exchange(ref pongCount, 0); + } + public async ValueTask DisposeAsync() { if (!isDisposed) @@ -909,6 +953,10 @@ public async ValueTask DisposeAsync() { await socketReader.DisposeAsync().ConfigureAwait(false); } + if (pingTimerCancellationTokenSource != null) + { + pingTimerCancellationTokenSource.Cancel(); + } subscriptionManager.Dispose(); requestResponseManager.Dispose(); waitForOpenConnection.TrySetCanceled(); @@ -925,9 +973,12 @@ async void WithConnect(Action core) try { await ConnectAsync().ConfigureAwait(false); + } + catch + { + // log will shown on ConnectAsync failed return; } - catch { } // log will shown on ConnectAsync failed core(this); } @@ -936,9 +987,12 @@ async void WithConnect(T1 item1, Action core) try { await ConnectAsync().ConfigureAwait(false); + } + catch + { + // log will shown on ConnectAsync failed return; } - catch { } // log will shown on ConnectAsync failed core(this, item1); } @@ -947,9 +1001,12 @@ async void WithConnect(T1 item1, T2 item2, Action> DispatchCommandAsync(int code, in ReadOn { const int PongSize = 6; // PONG\r\n + connection.ResetPongCount(); // reset count for PingTimer + if (pingCommands.TryDequeue(out var pingCommand)) { var start = pingCommand.WriteTime; diff --git a/src/AlterNats/TcpConnection.cs b/src/AlterNats/TcpConnection.cs index 9b9ff223e..ae1d4ff07 100644 --- a/src/AlterNats/TcpConnection.cs +++ b/src/AlterNats/TcpConnection.cs @@ -85,6 +85,11 @@ public ValueTask ReceiveAsync(Memory buffer, SocketFlags socketFlags) return socket.ReceiveAsync(buffer, socketFlags, CancellationToken.None); } + public ValueTask AbortConnectionAsync(CancellationToken cancellationToken) + { + return socket.DisconnectAsync(false, cancellationToken); + } + public ValueTask DisposeAsync() { if (Interlocked.Increment(ref disposed) == 1)