Skip to content

Commit

Permalink
remove Options.Host/Port, Add Options.Url
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Apr 7, 2022
1 parent 4f24a84 commit d9e5f21
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 43 deletions.
2 changes: 0 additions & 2 deletions sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public static async Task Main()
// LoggerFactory = loggerFactory,
LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Information),
Serializer = new MessagePackNatsSerializer(),
Host = "localhost",
Port = 4222,
ConnectOptions = ConnectOptions.Default with { Echo = true, Verbose = false }
};

Expand Down
8 changes: 4 additions & 4 deletions sandbox/ConsoleApp2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

await using var subConnection1 = new NatsConnection(NatsOptions.Default with
{
Port = 4222
Url = "localhost:4222"
});
await subConnection1.ConnectAsync();

Expand All @@ -22,7 +22,7 @@

await using var subConnection2 = new NatsConnection(NatsOptions.Default with
{
Port = 4223
Url = "localhost:4222"
});

await subConnection2.ConnectAsync();
Expand All @@ -43,14 +43,14 @@
{
await using var pubConnection1 = new NatsConnection(NatsOptions.Default with
{
Port = 4222
Url = "localhost:4222"
});

await pubConnection1.ConnectAsync();

await using var pubConnection2 = new NatsConnection(NatsOptions.Default with
{
Port = 4222
Url = "localhost:4222"
});

await pubConnection2.ConnectAsync();
Expand Down
2 changes: 0 additions & 2 deletions sandbox/NatsBenchmark/Benchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ double convertTicksToMicros(double ticks)
return ticks / TimeSpan.TicksPerMillisecond * 1000.0;
}

// TODO: look into warming up the server, this client for accuracy.
void runPubSubLatency(string testName, long testCount, long testSize)
{
object subcriberLock = new object();
Expand Down Expand Up @@ -340,7 +339,6 @@ void runPubSubLatency(string testName, long testCount, long testSize)
)
);

// TODO: fix accuracy - trim out outliers, etc.
Console.WriteLine(
"{0} (us)\t{1} msgs, {2:F2} avg, {3:F2} min, {4:F2} max, {5:F2} stddev",
testName,
Expand Down
1 change: 0 additions & 1 deletion sandbox/NatsBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ void RunPubSubAlterNatsVector3(string testName, long testCount, bool disableShow
pubConn.ConnectAsync().AsTask().Wait();
subConn.ConnectAsync().AsTask().Wait();

// TODO:Async Subscribe
var d = subConn.SubscribeAsync<Vector3>(key.Key, _ =>
{
Interlocked.Increment(ref subCount);
Expand Down
4 changes: 3 additions & 1 deletion src/AlterNats/Internal/ObjectPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void Grow<T>(int id)
{
if (poolNodes.Length <= id)
{
Array.Resize(ref poolNodes, poolNodes.Length * 2);
Array.Resize(ref poolNodes, Math.Max(poolNodes.Length * 2, id + 1));
poolNodes[id] = new ObjectPool<T>(poolLimit);
}
else if (poolNodes[id] == null)
Expand Down Expand Up @@ -101,6 +101,8 @@ public ObjectPool(int limit)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryPop([NotNullWhen(true)] out T? result)
{
// Instead of lock, use CompareExchange gate.
// In a worst case, missed cached object(create new one) but it's not a big deal.
if (Interlocked.CompareExchange(ref gate, 1, 0) == 0)
{
var v = root;
Expand Down
32 changes: 19 additions & 13 deletions src/AlterNats/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,27 @@ public async ValueTask ConnectAsync()

async ValueTask InitialConnectAsync()
{
var connectHost = Options.Host;
var connectPort = Options.Port;
try
var uris = Options.GetSeedUris();
foreach (var uri in uris)
{
// TODO:foreach and retry
logger.LogInformation("Try to connect NATS {0}:{1}", connectHost, connectPort);
var conn = new PhysicalConnection();
await conn.ConnectAsync(connectHost, connectPort, CancellationToken.None); // TODO:CancellationToken
this.socket = conn;
try
{
logger.LogInformation("Try to connect NATS {0}:{1}", uri.Host, uri.Port);
var conn = new PhysicalConnection();
await conn.ConnectAsync(uri.Host, uri.Port, CancellationToken.None); // TODO:CancellationToken
this.socket = conn;
break;
}
catch (Exception ex)
{
this.ConnectionState = NatsConnectionState.Closed;
logger.LogError(ex, "Fail to connect NATS {0}:{1}.", uri.Host, uri.Port);
}
}
catch (Exception ex)
if (this.socket == null)
{
this.ConnectionState = NatsConnectionState.Closed;
logger.LogError(ex, "Fail to connect NATS {0}:{1}.", connectHost, connectPort);
throw; // throw for can't connect.
// TODO:require to retry initial connect???
throw new Exception("can not connect uris: " + String.Join(",", uris.Select(x => x.ToString()))); // TODO:throw other Exception
}

// Connected completely but still ConnnectionState is Connecting(require after receive INFO).
Expand Down Expand Up @@ -211,7 +217,7 @@ async Task ReconnectLoopAsync()
// TODO: Append current host/port to last, use Distinct
}

var urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x)).OrderBy(_ => Guid.NewGuid()).ToArray() ?? new[] { new NatsUri("nats://" + Options.Host + ":" + Options.Port) };
var urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x)).OrderBy(_ => Guid.NewGuid()).ToArray() ?? new[] { new NatsUri(Options.Url) };
var urlEnumerator = urls.AsEnumerable().GetEnumerator();
NatsUri? url = null;
CONNECT_AGAIN:
Expand Down
19 changes: 15 additions & 4 deletions src/AlterNats/NatsOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ namespace AlterNats;

public sealed record NatsOptions
(
string Host,
int Port,
string Url,
ConnectOptions ConnectOptions,
INatsSerializer Serializer,
ILoggerFactory LoggerFactory,
Expand All @@ -26,8 +25,7 @@ int CommandPoolSize
)
{
public static NatsOptions Default = new NatsOptions(
Host: "localhost",
Port: 4222,
Url: "nats://localhost:4222",
ConnectOptions: ConnectOptions.Default,
Serializer: new JsonNatsSerializer(new JsonSerializerOptions() { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull }),
LoggerFactory: NullLoggerFactory.Instance,
Expand All @@ -43,4 +41,17 @@ int CommandPoolSize
Timeout: TimeSpan.FromSeconds(2),
CommandPoolSize: 256
);

internal NatsUri[] GetSeedUris()
{
var urls = Url.Split(',');
if (NoRandomize)
{
return urls.Select(x => new NatsUri(x)).ToArray();
}
else
{
return urls.Select(x => new NatsUri(x)).OrderBy(_ => Guid.NewGuid()).ToArray();
}
}
}
2 changes: 1 addition & 1 deletion src/AlterNats/NatsUri.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
internal sealed class NatsUri : IEquatable<NatsUri>
{
const string DefaultScheme = "nats://";
public static readonly NatsUri Default = new NatsUri("localhost:4222");
public static readonly NatsUri Default = new NatsUri("nats://localhost:4222");
public const int DefaultPort = 4222;

readonly Uri uri;
Expand Down
1 change: 0 additions & 1 deletion src/AlterNats/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public SubscriptionManager(NatsConnection connection)
this.connection = connection;
}

// TODO: for reconnecting
public (int subscriptionId, string subject, NatsKey? queueGroup)[] GetExistingSubscriptions()
{
lock (gate)
Expand Down
28 changes: 14 additions & 14 deletions tests/AlterNats.Tests/PubSubTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public async Task Basic<T>(int subPort, int pubPort, IEnumerable<T> items)

await using var subConnection = new NatsConnection(NatsOptions.Default with
{
Port = subPort
Url = "localhost:" + subPort
});

await subConnection.ConnectAsync();
Expand All @@ -40,7 +40,7 @@ public async Task Basic<T>(int subPort, int pubPort, IEnumerable<T> items)

await using var pubConnection = new NatsConnection(NatsOptions.Default with
{
Port = pubPort
Url = "localhost:" + pubPort
});

await pubConnection.ConnectAsync();
Expand Down Expand Up @@ -69,7 +69,7 @@ public async Task BasicWithMessagePackSerializer<T>(int subPort, int pubPort, IE

await using var subConnection = new NatsConnection(NatsOptions.Default with
{
Port = subPort,
Url = "localhost:" + subPort,
Serializer = new MessagePackNatsSerializer()
});

Expand All @@ -85,7 +85,7 @@ public async Task BasicWithMessagePackSerializer<T>(int subPort, int pubPort, IE

await using var pubConnection = new NatsConnection(NatsOptions.Default with
{
Port = pubPort,
Url = "localhost:" + pubPort,
Serializer = new MessagePackNatsSerializer()
});

Expand All @@ -110,7 +110,7 @@ public async Task BasicRequest<T>(int subPort, int pubPort, IEnumerable<T> items

await using var subConnection = new NatsConnection(NatsOptions.Default with
{
Port = subPort
Url = "localhost:" + subPort
});

await subConnection.ConnectAsync();
Expand All @@ -119,7 +119,7 @@ public async Task BasicRequest<T>(int subPort, int pubPort, IEnumerable<T> items

await using var pubConnection = new NatsConnection(NatsOptions.Default with
{
Port = pubPort
Url = "localhost:" + pubPort
});

await pubConnection.ConnectAsync();
Expand All @@ -138,7 +138,7 @@ public async Task BasicRequestWithMessagePackSerializer<T>(int subPort, int pubP

await using var subConnection = new NatsConnection(NatsOptions.Default with
{
Port = subPort,
Url = "localhost:" + subPort,
Serializer = new MessagePackNatsSerializer()
});

Expand All @@ -148,7 +148,7 @@ public async Task BasicRequestWithMessagePackSerializer<T>(int subPort, int pubP

await using var pubConnection = new NatsConnection(NatsOptions.Default with
{
Port = pubPort,
Url = "localhost:" + pubPort,
Serializer = new MessagePackNatsSerializer()
});

Expand Down Expand Up @@ -180,7 +180,7 @@ public async Task SubjectTest(string pubKey, int? expect1, int? expect2, int? ex

await using var subConnection = new NatsConnection(NatsOptions.Default with
{
Port = primaryPort
Url = "localhost:" + primaryPort,
});

await subConnection.ConnectAsync();
Expand Down Expand Up @@ -223,7 +223,7 @@ public async Task SubjectTest(string pubKey, int? expect1, int? expect2, int? ex

await using var pubConnection = new NatsConnection(NatsOptions.Default with
{
Port = primaryPort
Url = "localhost:" + primaryPort,
});

await pubConnection.ConnectAsync();
Expand Down Expand Up @@ -253,10 +253,10 @@ public async Task ConnectionException()
{
var connection1 = new NatsConnection(NatsOptions.Default with
{
Port = 14250
Url = "localhost:" + 14250,
});

await Assert.ThrowsAsync<SocketException>(async () => await connection1.ConnectAsync());
await Assert.ThrowsAsync<Exception>(async () => await connection1.ConnectAsync());
}

static readonly int[] seed1 = { 24, 45, 99, 41, 98, 7, 81, 8, 26, 56 };
Expand Down Expand Up @@ -327,7 +327,7 @@ public async Task ReConnectionTest()

var connection = new NatsConnection(NatsOptions.Default with
{
Port = 14224
Url = "localhost:" + 14224,
});

// Wait for start clustering.
Expand All @@ -341,7 +341,7 @@ public async Task ReConnectionTest()

connection = new NatsConnection(NatsOptions.Default with
{
Port = 14224
Url = "localhost:" + 14224,
});

await Task.Delay(500);
Expand Down

0 comments on commit d9e5f21

Please sign in to comment.