Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect backoff #211

Merged
merged 3 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public partial class NatsConnection : IAsyncDisposable, INatsConnection
private TlsCerts? _tlsCerts;
private ClientOpts _clientOpts;
private UserCredentials? _userCredentials;
private int _connectRetry;
private TimeSpan _backoff = TimeSpan.Zero;

public NatsConnection()
: this(NatsOpts.Default)
Expand Down Expand Up @@ -432,7 +434,7 @@ private async void ReconnectLoop()
{
ConnectionState = NatsConnectionState.Reconnecting;
_waitForOpenConnection.TrySetCanceled();
_waitForOpenConnection = new TaskCompletionSource();
_waitForOpenConnection = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_pingTimerCancellationTokenSource?.Cancel();
}

Expand Down Expand Up @@ -522,6 +524,8 @@ private async void ReconnectLoop()

lock (_gate)
{
_connectRetry = 0;
_backoff = TimeSpan.Zero;
_logger.LogInformation("Connect succeed {0}, NATS {1}", _name, url);
ConnectionState = NatsConnectionState.Open;
_pingTimerCancellationTokenSource = new CancellationTokenSource();
Expand All @@ -535,6 +539,7 @@ private async void ReconnectLoop()
{
if (ex is OperationCanceledException)
return;
_waitForOpenConnection.TrySetException(ex);
_logger.LogError(ex, "Unknown error, loop stopped and connection is invalid state.");
}
}
Expand All @@ -561,8 +566,44 @@ private NatsUri FixTlsHost(NatsUri uri)

private async Task WaitWithJitterAsync()
{
int retry;
TimeSpan backoff;
lock (_gate)
{
retry = _connectRetry++;

if (Opts.ReconnectWaitMin >= Opts.ReconnectWaitMax)
{
_backoff = Opts.ReconnectWaitMin;
}
else if (_backoff == TimeSpan.Zero)
{
_backoff = Opts.ReconnectWaitMin;
}
else if (_backoff == Opts.ReconnectWaitMax)
{
}
else
{
_backoff *= 2;
mtmk marked this conversation as resolved.
Show resolved Hide resolved
if (_backoff > Opts.ReconnectWaitMax)
{
_backoff = Opts.ReconnectWaitMax;
}
else if (_backoff <= TimeSpan.Zero)
{
_backoff = TimeSpan.FromSeconds(1);
}
}

backoff = _backoff;
}

if (Opts.MaxReconnectRetry > 0 && retry > Opts.MaxReconnectRetry)
throw new NatsException("Max connect retry exceeded.");

var jitter = Random.Shared.NextDouble() * Opts.ReconnectJitter.TotalMilliseconds;
var waitTime = Opts.ReconnectWait + TimeSpan.FromMilliseconds(jitter);
var waitTime = TimeSpan.FromMilliseconds(jitter) + backoff;
if (waitTime != TimeSpan.Zero)
{
_logger.LogTrace("Wait {0}ms to reconnect.", waitTime.TotalMilliseconds);
Expand Down
157 changes: 77 additions & 80 deletions src/NATS.Client.Core/NatsOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,88 +8,85 @@ namespace NATS.Client.Core;
/// <summary>
/// Immutable options for NatsConnection, you can configure via `with` operator.
/// </summary>
/// <param name="Url"></param>
/// <param name="Name"></param>
/// <param name="Echo"></param>
/// <param name="Verbose"></param>
/// <param name="Headers"></param>
/// <param name="AuthOpts"></param>
/// <param name="TlsOpts"></param>
/// <param name="SerializerRegistry"></param>
/// <param name="LoggerFactory"></param>
/// <param name="WriterBufferSize"></param>
/// <param name="ReaderBufferSize"></param>
/// <param name="UseThreadPoolCallback"></param>
/// <param name="InboxPrefix"></param>
/// <param name="NoRandomize"></param>
/// <param name="PingInterval"></param>
/// <param name="MaxPingOut"></param>
/// <param name="ReconnectWait"></param>
/// <param name="ReconnectJitter"></param>
/// <param name="ConnectTimeout"></param>
/// <param name="ObjectPoolSize"></param>
/// <param name="RequestTimeout"></param>
/// <param name="CommandTimeout"></param>
/// <param name="SubscriptionCleanUpInterval"></param>
/// <param name="WriterCommandBufferLimit"></param>
/// <param name="HeaderEncoding"></param>
/// <param name="WaitUntilSent"></param>
public sealed record NatsOpts
(
string Url,
string Name,
bool Echo,
bool Verbose,
bool Headers,
NatsAuthOpts AuthOpts,
NatsTlsOpts TlsOpts,
INatsSerializerRegistry SerializerRegistry,
ILoggerFactory LoggerFactory,
int WriterBufferSize,
int ReaderBufferSize,
bool UseThreadPoolCallback,
string InboxPrefix,
bool NoRandomize,
TimeSpan PingInterval,
int MaxPingOut,
TimeSpan ReconnectWait,
TimeSpan ReconnectJitter,
TimeSpan ConnectTimeout,
int ObjectPoolSize,
TimeSpan RequestTimeout,
TimeSpan CommandTimeout,
TimeSpan SubscriptionCleanUpInterval,
int? WriterCommandBufferLimit,
Encoding HeaderEncoding,
bool WaitUntilSent)
{
public static readonly NatsOpts Default = new(
Url: "nats://localhost:4222",
Name: "NATS .Net Client",
Echo: true,
Verbose: false,
Headers: true,
AuthOpts: NatsAuthOpts.Default,
TlsOpts: NatsTlsOpts.Default,
SerializerRegistry: NatsDefaultSerializerRegistry.Default,
LoggerFactory: NullLoggerFactory.Instance,
WriterBufferSize: 65534, // 32767
ReaderBufferSize: 1048576,
UseThreadPoolCallback: false,
InboxPrefix: "_INBOX",
NoRandomize: false,
PingInterval: TimeSpan.FromMinutes(2),
MaxPingOut: 2,
ReconnectWait: TimeSpan.FromSeconds(2),
ReconnectJitter: TimeSpan.FromMilliseconds(100),
ConnectTimeout: TimeSpan.FromSeconds(2),
ObjectPoolSize: 256,
RequestTimeout: TimeSpan.FromSeconds(5),
CommandTimeout: TimeSpan.FromMinutes(1),
SubscriptionCleanUpInterval: TimeSpan.FromMinutes(5),
WriterCommandBufferLimit: 1_000,
HeaderEncoding: Encoding.ASCII,
WaitUntilSent: false);
public static readonly NatsOpts Default = new();

public string Url { get; init; } = "nats://localhost:4222";

public string Name { get; init; } = "NATS .Net Client";

public bool Echo { get; init; } = true;

public bool Verbose { get; init; } = false;

public bool Headers { get; init; } = true;

public NatsAuthOpts AuthOpts { get; init; } = NatsAuthOpts.Default;

public NatsTlsOpts TlsOpts { get; init; } = NatsTlsOpts.Default;

public INatsSerializerRegistry SerializerRegistry { get; init; } = NatsDefaultSerializerRegistry.Default;

public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;

public int WriterBufferSize { get; init; } = 65534;

public int ReaderBufferSize { get; init; } = 1048576;

public bool UseThreadPoolCallback { get; init; } = false;

public string InboxPrefix { get; init; } = "_INBOX";

public bool NoRandomize { get; init; } = false;

public TimeSpan PingInterval { get; init; } = TimeSpan.FromMinutes(2);

public int MaxPingOut { get; init; } = 2;

/// <summary>
/// Minimum amount of time to wait between reconnect attempts. (default: 2s)
/// </summary>
public TimeSpan ReconnectWaitMin { get; init; } = TimeSpan.FromSeconds(2);

/// <summary>
/// Random amount of time to wait between reconnect attempts. (default: 100ms)
/// </summary>
public TimeSpan ReconnectJitter { get; init; } = TimeSpan.FromMilliseconds(100);

public TimeSpan ConnectTimeout { get; init; } = TimeSpan.FromSeconds(2);

public int ObjectPoolSize { get; init; } = 256;

public TimeSpan RequestTimeout { get; init; } = TimeSpan.FromSeconds(5);

public TimeSpan CommandTimeout { get; init; } = TimeSpan.FromMinutes(1);

public TimeSpan SubscriptionCleanUpInterval { get; init; } = TimeSpan.FromMinutes(5);

public int? WriterCommandBufferLimit { get; init; } = 1_000;

public Encoding HeaderEncoding { get; init; } = Encoding.ASCII;

public bool WaitUntilSent { get; init; } = false;

/// <summary>
/// Maximum number of reconnect attempts. (default: -1, unlimited)
/// </summary>
/// <remarks>
/// Set to -1 for unlimited retries.
/// </remarks>
public int MaxReconnectRetry { get; init; } = -1;

/// <summary>
/// Backoff delay limit for reconnect attempts. (default: 5 seconds)
/// </summary>
/// <remarks>
/// When the connection is lost, the client will wait for <see cref="ReconnectWaitMin"/> before attempting to reconnect.
/// Every failed attempt will increase the wait time by 2x, up to <see cref="ReconnectWaitMax"/>.
/// If <see cref="ReconnectWaitMax"/> is equal to or less than <see cref="ReconnectWaitMin"/>, the delay will be constant.
/// </remarks>
public TimeSpan ReconnectWaitMax { get; init; } = TimeSpan.FromSeconds(5);

internal NatsUri[] GetSeedUris()
{
Expand Down
59 changes: 59 additions & 0 deletions tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace NATS.Client.Core.Tests;

public class ConnectionRetryTest
{
private readonly ITestOutputHelper _output;

public ConnectionRetryTest(ITestOutputHelper output) => _output = output;

[Fact]
public async Task Max_retry_reached_after_disconnect()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection(new NatsOpts
{
MaxReconnectRetry = 2,
ReconnectWaitMax = TimeSpan.Zero,
ReconnectWaitMin = TimeSpan.FromSeconds(.1),
});

var signal = new WaitSignal();
nats.ReconnectFailed += (_, e) => signal.Pulse();

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await server.StopAsync();

await signal;
var exception = await Assert.ThrowsAsync<NatsException>(async () => await nats.PingAsync(cts.Token));
Assert.Equal("Max connect retry exceeded.", exception.Message);
}

[Fact]
public async Task Retry_and_connect_after_disconnected()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection(new NatsOpts
{
MaxReconnectRetry = 10,
ReconnectWaitMax = TimeSpan.Zero,
ReconnectWaitMin = TimeSpan.FromSeconds(2),
});

var signal = new WaitSignal();
nats.ReconnectFailed += (_, e) => signal.Pulse();

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await server.StopAsync();

await signal;

await Task.Delay(TimeSpan.FromSeconds(5), cts.Token);

server.StartServerProcess();

var rtt = await nats.PingAsync(cts.Token);
Assert.True(rtt > TimeSpan.Zero);
}
}