-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
JetStream Ordered consumer #169
Conversation
Simplified consumer api to just consume, fetch and next methods removing the *All* versions. Now consume and fetch methods return IAsyncEnumerable only. This reduces the API foot-print as well as making it consistent between 'Ordered Consumer' and 'Consumer'. * Removed consume and fetch methods exposing channel reader. * Renamed *All* methods e.g. FetchAllAsync() to FetchAsync(). * Also removed the interfaces exposing the channel readers.
# Conflicts: # tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs # tests/NATS.Client.JetStream.Tests/JetStreamTest.cs
What can we do about the disparity with core subscribe? Subscribe call on core on the other hand still returns an interface public interface INatsSub<T> : IAsyncDisposable
{
ChannelReader<NatsMsg<T>> Msgs { get; }
string Subject { get; }
string? QueueGroup { get; }
public ValueTask UnsubscribeAsync();
} An idea here maybe to introduce a different call to match JetStream calls: // New matching call
IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, ...);
// Existing call renamed
ValueTask<INatsSub<T>> SubscribeCoreAsync<T>(string subject, ...); |
if (!string.IsNullOrWhiteSpace(request.Config.Name)) | ||
{ | ||
subject += $".{request.Config.Name}"; | ||
request.Config.Name = default!; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why mutate name here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spot on! no need for it.
if (!string.IsNullOrWhiteSpace(request.Config.FilterSubject)) | ||
{ | ||
subject += $".{request.Config.FilterSubject}"; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the name was not set, but the filter subject was set? With this logic the subject token for name is not present, and the filter subject token is appended to where the name subject should go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FilterSubject
is never set actually. Dead code. I think it might be my copy & paste error. Sorry about that!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Ordered consumer implementation based on https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-17.md
API change discussion:
Simplified consumer api to just consume, fetch and next methods removing the
*All*
versions. Now consume and fetch methods (renamed from the*All*
versions) returnIAsyncEnumerable
only. This reduces the API footprint as well as making it consistent between 'Ordered Consumer' and 'Consumer'.*All*
methods e.g.FetchAllAsync()
toFetchAsync()
.resolves #32