Skip to content

Commit

Permalink
Subscriber cancellation fix
Browse files Browse the repository at this point in the history
The only way to stop the subscription in the async enumerable
model is to use a cancellation token. When we use the token
we must make sure the channel is completed cleanly as well.
  • Loading branch information
mtmk committed Nov 7, 2023
1 parent 4ac9672 commit 3a3c70f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
7 changes: 5 additions & 2 deletions src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ public partial class NatsConnection
public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var sub = await SubscribeInternalAsync(subject, queueGroup, serializer, opts, cancellationToken).ConfigureAwait(false);
while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false))

// We don't cancel the channel reader here because we want to keep reading until the subscription
// channel writer completes so that messages left in the channel can be consumed before exit the loop.
while (await sub.Msgs.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false))
{
while (sub.Msgs.TryRead(out var msg))
{
Expand All @@ -20,7 +23,7 @@ public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, stri
public async ValueTask<NatsSub<T>> SubscribeInternalAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();
var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer);
var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await SubAsync(sub, cancellationToken).ConfigureAwait(false);
return sub;
}
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Services.Tests/ServicesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public async Task Add_service_listeners_ping_info_and_stats()
[Fact]
public async Task Add_end_point()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10_0000));
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var server = NatsServer.Start();
Expand Down

0 comments on commit 3a3c70f

Please sign in to comment.