Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JetStream Ordered consumer #169

Merged
merged 11 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/documentation/jetstream/consume.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fast it can process messages without overwhelming the application process.
while (!cancellationToken.IsCancellationRequested)
{
// Consume a batch of messages (1000 by default)
await foreach (var msg in consumer.FetchAllAsync<Order>())
await foreach (var msg in consumer.FetchAsync<Order>())
{
// Process message
await msg.AckAsync();
Expand All @@ -59,7 +59,7 @@ overlapped so that there is a constant flow of messages from the JetStream serve
or `MaxBytes` and respective thresholds to not overwhelm the application and to not waste server resources.

```csharp
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
await foreach (var msg in consumer.ConsumeAsync<Order>())
{
// Process message
await msg.AckAsync();
Expand Down Expand Up @@ -88,7 +88,7 @@ while (!cancellationToken.IsCancellationRequested)
try
{
await consumer.RefreshAsync(); // or try to recreate consumer
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
await foreach (var msg in consumer.ConsumeAsync<Order>())
{
// Process message
await msg.AckAsync();
Expand Down
2 changes: 1 addition & 1 deletion docs/documentation/jetstream/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Check out [JetStream documentation](https://docs.nats.io/nats-concepts/jetstream
Finally, we're ready to consume the messages we persisted in `shop_orders` stream:

```csharp
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
await foreach (var msg in consumer.ConsumeAsync<Order>())
{
var order = msg.Data;
Console.WriteLine($"Processing {msg.Subject} {order}...");
Expand Down
82 changes: 7 additions & 75 deletions sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,39 +65,7 @@ void Report(int i, Stopwatch sw, string data)

try
{
if (cmd == "fetch")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await consumer.RefreshAsync(cts.Token);
await using var sub = await consumer.FetchAsync<NatsMemoryOwner<byte>>(fetchOpts, cts.Token);
await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
{
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}
}
else if (cmd == "fetch-all-no-wait")
if (cmd == "fetch-no-wait")
{
while (!cts.Token.IsCancellationRequested)
{
Expand All @@ -110,7 +78,7 @@ void Report(int i, Stopwatch sw, string data)
var fetchNoWaitOpts = new NatsJSFetchOpts { MaxMsgs = max };
var fetchMsgCount = 0;

await foreach (var msg in consumer.FetchAllNoWaitAsync<NatsMemoryOwner<byte>>(fetchNoWaitOpts, cts.Token))
await foreach (var msg in consumer.FetchNoWaitAsync<NatsMemoryOwner<byte>>(fetchNoWaitOpts, cts.Token))
{
fetchMsgCount++;
using (msg.Data)
Expand Down Expand Up @@ -140,15 +108,15 @@ void Report(int i, Stopwatch sw, string data)
}
}
}
else if (cmd == "fetch-all")
else if (cmd == "fetch")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await consumer.RefreshAsync(cts.Token);
await foreach (var msg in consumer.FetchAllAsync<NatsMemoryOwner<byte>>(fetchOpts, cts.Token))
await foreach (var msg in consumer.FetchAsync<NatsMemoryOwner<byte>>(fetchOpts, cts.Token))
{
using (msg.Data)
{
Expand Down Expand Up @@ -209,15 +177,9 @@ void Report(int i, Stopwatch sw, string data)
try
{
Console.WriteLine("___\nCONSUME");
await using var sub = await consumer.ConsumeAsync<NatsMemoryOwner<byte>>(consumeOpts);

cts.Token.Register(() =>
{
sub.DisposeAsync().GetAwaiter().GetResult();
});

var stopped = false;
await foreach (var msg in sub.Msgs.ReadAllAsync())
var consumeStop = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
await foreach (var msg in consumer.ConsumeAsync<NatsMemoryOwner<byte>>(consumeOpts, consumeStop.Token))
{
using (msg.Data)
{
Expand All @@ -226,7 +188,7 @@ void Report(int i, Stopwatch sw, string data)
if (message == "stop")
{
Console.WriteLine("Stopping consumer...");
sub.Stop();
consumeStop.Cancel();
stopped = true;
}
}
Expand Down Expand Up @@ -257,36 +219,6 @@ void Report(int i, Stopwatch sw, string data)
}
}
}
else if (cmd == "consume-all")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine("___\nCONSUME-ALL");
await foreach (var msg in consumer.ConsumeAllAsync<NatsMemoryOwner<byte>>(consumeOpts, cts.Token))
{
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}
}
else
{
Console.WriteLine("Usage: dotnet run -- <consume|consume-all|fetch|fetch-all|next>");
Expand Down
29 changes: 0 additions & 29 deletions src/NATS.Client.JetStream/INatsJSConsume.cs

This file was deleted.

14 changes: 0 additions & 14 deletions src/NATS.Client.JetStream/INatsJSFetch.cs

This file was deleted.

12 changes: 4 additions & 8 deletions src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ internal struct PullRequest
public string Origin { get; init; }
}

internal class NatsJSConsume<TMsg> : NatsSubBase, INatsJSConsume<TMsg>
internal class NatsJSConsume<TMsg> : NatsSubBase
{
private readonly ILogger _logger;
private readonly bool _debug;
private readonly CancellationTokenSource _cts;
private readonly Channel<NatsJSMsg<TMsg?>> _userMsgs;
private readonly Channel<PullRequest> _pullRequests;
private readonly NatsJSContext _context;
Expand Down Expand Up @@ -60,8 +59,7 @@ public NatsJSConsume(
CancellationToken cancellationToken)
: base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_cancellationToken = _cts.Token;
_cancellationToken = cancellationToken;
_logger = Connection.Opts.LoggerFactory.CreateLogger<NatsJSConsume<TMsg>>();
_debug = _logger.IsEnabled(LogLevel.Debug);
_context = context;
Expand Down Expand Up @@ -139,8 +137,6 @@ public NatsJSConsume(

public ChannelReader<NatsJSMsg<TMsg?>> Msgs { get; }

public void Stop() => _cts.Cancel();

public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, CancellationToken cancellationToken = default)
{
if (_cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -286,7 +282,7 @@ protected override async ValueTask ReceiveInternalAsync(
}
else if (headers.HasTerminalJSError())
{
_userMsgs.Writer.TryComplete(new NatsJSProtocolException($"JetStream server error: {headers.Code} {headers.MessageText}"));
_userMsgs.Writer.TryComplete(new NatsJSProtocolException(headers.Code, headers.Message, headers.MessageText));
EndSubscription(NatsSubEndReason.JetStreamError);
}
else
Expand Down Expand Up @@ -425,7 +421,7 @@ private void Pull(string origin, long batch, long maxBytes) => _pullRequests.Wri

private async Task PullLoop()
{
await foreach (var pr in _pullRequests.Reader.ReadAllAsync())
await foreach (var pr in _pullRequests.Reader.ReadAllAsync().ConfigureAwait(false))
{
var origin = $"pull-loop({pr.Origin})";
await CallMsgNextAsync(origin, pr.Request).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public static class NatsJSExtensionsInternal

public static bool HasTerminalJSError(this NatsHeaders headers) => headers
is { Code: 400 }
or { Code: 404 }
or { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted }
or { Code: 409, Message: NatsHeaders.Messages.ConsumerIsPushBased };
}
4 changes: 2 additions & 2 deletions src/NATS.Client.JetStream/Internal/NatsJSFetch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace NATS.Client.JetStream.Internal;

internal class NatsJSFetch<TMsg> : NatsSubBase, INatsJSFetch<TMsg>
internal class NatsJSFetch<TMsg> : NatsSubBase
{
private readonly ILogger _logger;
private readonly bool _debug;
Expand Down Expand Up @@ -189,7 +189,7 @@ protected override async ValueTask ReceiveInternalAsync(
}
else if (headers.HasTerminalJSError())
{
_userMsgs.Writer.TryComplete(new NatsJSProtocolException($"JetStream server error: {headers.Code} {headers.MessageText}"));
_userMsgs.Writer.TryComplete(new NatsJSProtocolException(headers.Code, headers.Message, headers.MessageText));
EndSubscription(NatsSubEndReason.JetStreamError);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ internal class NatsJSOrderedPushConsumer<T>
private readonly Task _commandTask;
private readonly long _ackWaitNanos;

private long _sequenceStream;
private long _sequenceConsumer;
private ulong _sequenceStream;
private ulong _sequenceConsumer;
private string _consumer;
private volatile NatsJSOrderedPushConsumerSub<T>? _sub;
private int _done;
Expand Down Expand Up @@ -218,7 +218,7 @@ private async Task CommandLoop()

var sequence = Interlocked.Increment(ref _sequenceConsumer);

if (sequence != (long)metadata.Sequence.Consumer)
if (sequence != metadata.Sequence.Consumer)
{
CreateSub("sequence-mismatch");
_logger.LogWarning("Missed messages, recreating consumer");
Expand All @@ -228,7 +228,7 @@ private async Task CommandLoop()
// Increment the sequence before writing to the channel in case the channel is full
// and the writer is waiting for the reader to read the message. This way the sequence
// will be correctly incremented in case the timeout kicks in and recreated the consumer.
Interlocked.Exchange(ref _sequenceStream, (long)metadata.Sequence.Stream);
Interlocked.Exchange(ref _sequenceStream, metadata.Sequence.Stream);

if (!IsDone)
{
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public record ConsumerConfiguration
[System.Text.Json.Serialization.JsonPropertyName("opt_start_seq")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
[System.ComponentModel.DataAnnotations.Range(0D, 18446744073709552000D)]
public long OptStartSeq { get; set; } = default!;
public ulong OptStartSeq { get; set; } = default!;

[System.Text.Json.Serialization.JsonPropertyName("opt_start_time")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
Expand Down
Loading