Skip to content

Commit

Permalink
auto connect part2
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Apr 12, 2022
1 parent 61c30ee commit a700f61
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 52 deletions.
13 changes: 5 additions & 8 deletions sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,12 @@ public static async Task Main()


var connection = new NatsConnection(options);
try
{
await connection.ConnectAsync();
}
catch
{
}


await Parallel.ForEachAsync(Enumerable.Range(0, 100), new ParallelOptions { MaxDegreeOfParallelism = 100 }, async (i, ct) =>
{
var ttl = await connection.PingAsync();
Console.WriteLine(ttl);
});
Console.ReadLine();

await connection.PublishAsync("foo", 100);
Expand Down
190 changes: 146 additions & 44 deletions src/AlterNats/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -633,41 +633,69 @@ public void PostDirectWrite(DirectWriteCommand command)
}
}

// TODO:Auto Connect.

public ValueTask<TResponse?> RequestAsync<TRequest, TResponse>(string key, TRequest request)
{
return RequestAsync<TRequest, TResponse>(new NatsKey(key, true), request);
}

public ValueTask<IDisposable> SubscribeRequestAsync<TRequest, TResponse>(in NatsKey key, Func<TRequest, TResponse> requestHandler)
{
return subscriptionManager.AddRequestHandlerAsync(key.Key, requestHandler);
return SubscribeRequestAsync(key.Key, requestHandler);
}

public ValueTask<IDisposable> SubscribeRequestAsync<TRequest, TResponse>(string key, Func<TRequest, TResponse> requestHandler)
{
return subscriptionManager.AddRequestHandlerAsync(key, requestHandler);
if (ConnectionState == NatsConnectionState.Open)
{
return subscriptionManager.AddRequestHandlerAsync(key, requestHandler);
}
else
{
return WithConnectAsync(key, requestHandler, static (self, key, requestHandler) =>
{
return self.subscriptionManager.AddRequestHandlerAsync(key, requestHandler);
});
}
}

public ValueTask<IDisposable> SubscribeRequestAsync<TRequest, TResponse>(in NatsKey key, Func<TRequest, Task<TResponse>> requestHandler)
{
return subscriptionManager.AddRequestHandlerAsync(key.Key, requestHandler);
return SubscribeRequestAsync(key.Key, requestHandler);
}

public ValueTask<IDisposable> SubscribeRequestAsync<TRequest, TResponse>(string key, Func<TRequest, Task<TResponse>> requestHandler)
{
return subscriptionManager.AddRequestHandlerAsync(key, requestHandler);
if (ConnectionState == NatsConnectionState.Open)
{
return subscriptionManager.AddRequestHandlerAsync(key, requestHandler);
}
else
{
return WithConnectAsync(key, requestHandler, static (self, key, requestHandler) =>
{
return self.subscriptionManager.AddRequestHandlerAsync(key, requestHandler);
});
}
}

public ValueTask<IDisposable> SubscribeAsync<T>(in NatsKey key, Action<T> handler)
{
return subscriptionManager.AddAsync(key.Key, null, handler);
return SubscribeAsync(key.Key, handler);
}

public ValueTask<IDisposable> SubscribeAsync<T>(string key, Action<T> handler)
{
return subscriptionManager.AddAsync(key, null, handler);
if (ConnectionState == NatsConnectionState.Open)
{
return subscriptionManager.AddAsync(key, null, handler);
}
else
{
return WithConnectAsync(key, handler, static (self, key, handler) =>
{
return self.subscriptionManager.AddAsync(key, null, handler);
});
}
}

public ValueTask<IDisposable> SubscribeAsync<T>(in NatsKey key, Func<T, Task> asyncHandler)
Expand All @@ -677,57 +705,137 @@ public ValueTask<IDisposable> SubscribeAsync<T>(in NatsKey key, Func<T, Task> as

public ValueTask<IDisposable> SubscribeAsync<T>(string key, Func<T, Task> asyncHandler)
{
return subscriptionManager.AddAsync<T>(key, null, async x =>
if (ConnectionState == NatsConnectionState.Open)
{
try
return subscriptionManager.AddAsync<T>(key, null, async x =>
{
await asyncHandler(x).ConfigureAwait(false);
}
catch (Exception ex)
try
{
await asyncHandler(x).ConfigureAwait(false);
}
catch (Exception ex)
{
logger.LogError(ex, "Error occured during subscribe message.");
}
});
}
else
{
return WithConnectAsync(key, asyncHandler, static (self, key, asyncHandler) =>
{
logger.LogError(ex, "Error occured during subscribe message.");
}
});
return self.subscriptionManager.AddAsync<T>(key, null, async x =>
{
try
{
await asyncHandler(x).ConfigureAwait(false);
}
catch (Exception ex)
{
self.logger.LogError(ex, "Error occured during subscribe message.");
}
});
});
}
}

public ValueTask<IDisposable> SubscribeAsync<T>(in NatsKey key, in NatsKey queueGroup, Action<T> handler)
{
return subscriptionManager.AddAsync(key.Key, queueGroup, handler);
if (ConnectionState == NatsConnectionState.Open)
{
return subscriptionManager.AddAsync(key.Key, queueGroup, handler);
}
else
{
return WithConnectAsync(key, queueGroup, handler, static (self, key, queueGroup, handler) =>
{
return self.subscriptionManager.AddAsync(key.Key, queueGroup, handler);
});
}
}

public ValueTask<IDisposable> SubscribeAsync<T>(string key, string queueGroup, Action<T> handler)
{
return subscriptionManager.AddAsync(key, new NatsKey(queueGroup, true), handler);
if (ConnectionState == NatsConnectionState.Open)
{
return subscriptionManager.AddAsync(key, new NatsKey(queueGroup, true), handler);
}
else
{
return WithConnectAsync(key, queueGroup, handler, static (self, key, queueGroup, handler) =>
{
return self.subscriptionManager.AddAsync(key, new NatsKey(queueGroup, true), handler);
});
}
}

public ValueTask<IDisposable> SubscribeAsync<T>(in NatsKey key, in NatsKey queueGroup, Func<T, Task> asyncHandler)
{
return subscriptionManager.AddAsync<T>(key.Key, queueGroup, async x =>
if (ConnectionState == NatsConnectionState.Open)
{
try
return subscriptionManager.AddAsync<T>(key.Key, queueGroup, async x =>
{
await asyncHandler(x).ConfigureAwait(false);
}
catch (Exception ex)
try
{
await asyncHandler(x).ConfigureAwait(false);
}
catch (Exception ex)
{
logger.LogError(ex, "Error occured during subscribe message.");
}
});
}
else
{
return WithConnectAsync(key, queueGroup, asyncHandler, static (self, key, queueGroup, asyncHandler) =>
{
logger.LogError(ex, "Error occured during subscribe message.");
}
});
return self.subscriptionManager.AddAsync<T>(key.Key, queueGroup, async x =>
{
try
{
await asyncHandler(x).ConfigureAwait(false);
}
catch (Exception ex)
{
self.logger.LogError(ex, "Error occured during subscribe message.");
}
});
});
}
}

public ValueTask<IDisposable> SubscribeAsync<T>(string key, string queueGroup, Func<T, Task> asyncHandler)
{
return subscriptionManager.AddAsync<T>(key, new NatsKey(queueGroup, true), async x =>
if (ConnectionState == NatsConnectionState.Open)
{
try
return subscriptionManager.AddAsync<T>(key, new NatsKey(queueGroup, true), async x =>
{
await asyncHandler(x).ConfigureAwait(false);
}
catch (Exception ex)
try
{
await asyncHandler(x).ConfigureAwait(false);
}
catch (Exception ex)
{
logger.LogError(ex, "Error occured during subscribe message.");
}
});
}
else
{
return WithConnectAsync(key, queueGroup, asyncHandler, static (self, key, queueGroup, asyncHandler) =>
{
logger.LogError(ex, "Error occured during subscribe message.");
}
});
return self.subscriptionManager.AddAsync<T>(key, new NatsKey(queueGroup, true), async x =>
{
try
{
await asyncHandler(x).ConfigureAwait(false);
}
catch (Exception ex)
{
self.logger.LogError(ex, "Error occured during subscribe message.");
}
});
});
}
}

public IObservable<T> AsObservable<T>(string key)
Expand Down Expand Up @@ -843,12 +951,6 @@ async void WithConnect<T1, T2>(T1 item1, T2 item2, Action<NatsConnection, T1, T2
core(this, item1, item2);
}

async ValueTask WithConnectAsync(Func<NatsConnection, ValueTask> coreAsync)
{
await ConnectAsync().ConfigureAwait(false);
await coreAsync(this).ConfigureAwait(false);
}

async ValueTask WithConnectAsync<T1>(T1 item1, Func<NatsConnection, T1, ValueTask> coreAsync)
{
await ConnectAsync().ConfigureAwait(false);
Expand All @@ -867,15 +969,15 @@ async ValueTask<T> WithConnectAsync<T>(Func<NatsConnection, ValueTask<T>> coreAs
return await coreAsync(this).ConfigureAwait(false);
}

async ValueTask<TResult> WithConnectAsync<T1, TResult>(T1 item1, Func<NatsConnection, T1, ValueTask<TResult>> coreAsync)
async ValueTask<TResult> WithConnectAsync<T1, T2, TResult>(T1 item1, T2 item2, Func<NatsConnection, T1, T2, ValueTask<TResult>> coreAsync)
{
await ConnectAsync().ConfigureAwait(false);
return await coreAsync(this, item1).ConfigureAwait(false);
return await coreAsync(this, item1, item2).ConfigureAwait(false);
}

async ValueTask<TResult> WithConnectAsync<T1, T2, TResult>(T1 item1, T2 item2, Func<NatsConnection, T1, T2, ValueTask<TResult>> coreAsync)
async ValueTask<TResult> WithConnectAsync<T1, T2, T3, TResult>(T1 item1, T2 item2, T3 item3, Func<NatsConnection, T1, T2, T3, ValueTask<TResult>> coreAsync)
{
await ConnectAsync().ConfigureAwait(false);
return await coreAsync(this, item1, item2).ConfigureAwait(false);
return await coreAsync(this, item1, item2, item3).ConfigureAwait(false);
}
}

0 comments on commit a700f61

Please sign in to comment.