Skip to content

Commit

Permalink
Merge branch 'main' into svc-initial-impl
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Oct 25, 2023
2 parents 5b8c16d + ffac7ca commit e22d1bc
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 13 deletions.
44 changes: 43 additions & 1 deletion sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Buffers;
using System.Diagnostics;
using System.Text;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -98,6 +97,49 @@ void Report(int i, Stopwatch sw, string data)
}
}
}
else if (cmd == "fetch-all-no-wait")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
const int max = 10;
Console.WriteLine($"___\nFETCH-NO-WAIT {max}");
await consumer.RefreshAsync(cts.Token);

var fetchNoWaitOpts = new NatsJSFetchOpts { MaxMsgs = max };
var fetchMsgCount = 0;

await foreach (var msg in consumer.FetchAllNoWaitAsync<NatsMemoryOwner<byte>>(fetchNoWaitOpts, cts.Token))
{
fetchMsgCount++;
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}

if (fetchMsgCount < fetchNoWaitOpts.MaxMsgs)
{
Console.WriteLine("No more messages. Pause for more...");
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}
}
else if (cmd == "fetch-all")
{
while (!cts.Token.IsCancellationRequested)
Expand Down
11 changes: 8 additions & 3 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public async ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancell
throw new NatsException("Inbox subscriptions don't support queue groups");
}

await SubscribeInboxAsync(sub.Subject, sub.Opts, sub, cancellationToken).ConfigureAwait(false);
await SubscribeInboxAsync(sub, cancellationToken).ConfigureAwait(false);
}
else
{
Expand Down Expand Up @@ -171,7 +171,7 @@ public ISubscriptionManager GetManagerFor(string subject)
return this;
}

private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken)
{
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
{
Expand All @@ -181,7 +181,12 @@ private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, N
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
{
var inboxSubject = $"{_inboxPrefix}.*";
_inboxSub = InboxSubBuilder.Build(inboxSubject, opts, _connection, manager: this);

// We need to subscribe to the real inbox subject before we can register the internal subject.
// We use 'default' options here since options provided by the user are for the internal subscription.
// For example if the user provides a timeout, we don't want to timeout the real inbox subscription
// since it must live duration of the connection.
_inboxSub = InboxSubBuilder.Build(inboxSubject, opts: default, _connection, manager: this);
await SubscribeInternalAsync(
inboxSubject,
queueGroup: default,
Expand Down
46 changes: 45 additions & 1 deletion src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,51 @@ await sub.CallMsgNextAsync(
}
}

public async IAsyncEnumerable<NatsJSMsg<T?>> FetchNoWait<T>(
/// <summary>
/// Consume a set number of messages from the stream using this consumer.
/// Returns immediately if no messages are available.
/// </summary>
/// <param name="opts">Fetch options. (default: <c>MaxMsgs</c> 1,000 and timeout is ignored)</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the call.</param>
/// <typeparam name="T">Message type to deserialize.</typeparam>
/// <returns>Async enumerable of messages which can be used in a <c>await foreach</c> loop.</returns>
/// <exception cref="NatsJSProtocolException">Consumer is deleted, it's push based or request sent to server is invalid.</exception>
/// <exception cref="NatsJSException">There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <remarks>
/// <para>
/// This method will return immediately if no messages are available.
/// </para>
/// <para>
/// Using this method is discouraged because it might create an unnecessary load on your cluster.
/// Use <c>Consume</c> or <c>Fetch</c> instead.
/// </para>
/// </remarks>
/// <example>
/// <para>
/// However, there are scenarios where this method is useful. For example if your application is
/// processing messages in batches infrequently (for example every 5 minutes) you might want to
/// consider <c>FetchNoWait</c>. You must make sure to count your messages and stop fetching
/// if you received all of them in one call, meaning when <c>count &lt; MaxMsgs</c>.
/// </para>
/// <code>
/// const int max = 10;
/// var count = 0;
///
/// await foreach (var msg in consumer.FetchAllNoWaitAsync&lt;int&gt;(new NatsJSFetchOpts { MaxMsgs = max }))
/// {
/// count++;
/// Process(msg);
/// await msg.AckAsync();
/// }
///
/// if (count &lt; max)
/// {
/// // No more messages. Pause for more.
/// await Task.Delay(TimeSpan.FromMinutes(5));
/// }
/// </code>
/// </example>
public async IAsyncEnumerable<NatsJSMsg<T?>> FetchAllNoWaitAsync<T>(
NatsJSFetchOpts? opts = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Expand Down
6 changes: 4 additions & 2 deletions src/NATS.Client.JetStream/NatsJSContext.Streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,19 @@ public async ValueTask<StreamPurgeResponse> PurgeStreamAsync(
/// Get stream information from the server and creates a NATS JetStream stream object <see cref="NatsJSStream"/>.
/// </summary>
/// <param name="stream">Name of the stream to retrieve.</param>
/// <param name="request">Stream info request options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
public async ValueTask<NatsJSStream> GetStreamAsync(
string stream,
StreamInfoRequest? request = null,
CancellationToken cancellationToken = default)
{
var response = await JSRequestResponseAsync<object, StreamInfoResponse>(
var response = await JSRequestResponseAsync<StreamInfoRequest, StreamInfoResponse>(
subject: $"{Opts.Prefix}.STREAM.INFO.{stream}",
request: null,
request: request,
cancellationToken);
return new NatsJSStream(this, response);
}
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.KeyValueStore/NatsKVContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public async ValueTask<NatsKVStore> GetStoreAsync(string bucket, CancellationTok
{
ValidateBucketName(bucket);

var stream = await _context.GetStreamAsync(BucketToStream(bucket), cancellationToken);
var stream = await _context.GetStreamAsync(BucketToStream(bucket), cancellationToken: cancellationToken);

if (stream.Info.Config.MaxMsgsPerSubject < 1)
{
Expand Down
64 changes: 64 additions & 0 deletions tests/NATS.Client.Core.Tests/RequestReplyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,68 @@ static string ToStr(ReadOnlyMemory<byte> input)
await sub.DisposeAsync();
await reg;
}

[Fact]
public async Task Request_reply_many_multiple_with_timeout_test()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

const string subject = "foo";
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var cancellationToken = cts.Token;

var sub = await nats.SubscribeAsync<int>(subject, cancellationToken: cancellationToken);
var reg = sub.Register(async msg =>
{
await msg.ReplyAsync(msg.Data * 2, cancellationToken: cancellationToken);
});

var opts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(2) };

// Make sure timeout isn't affecting the real inbox subscription
// by waiting double the timeout period (by calling RequestMany twice)
// which should be enough
for (var i = 1; i <= 2; i++)
{
var data = -1;
await foreach (var msg in nats.RequestManyAsync<int, int>(subject, i * 100, replyOpts: opts, cancellationToken: cancellationToken))
{
data = msg.Data;
}

Assert.Equal(i * 200, data);
}

// Run a bunch more RequestMany calls with timeout for good measure
List<Task<(int index, int data)>> tasks = new();

for (var i = 0; i < 10; i++)
{
var index = i;

tasks.Add(Task.Run(
async () =>
{
var data = -1;

await foreach (var msg in nats.RequestManyAsync<int, int>(subject, index, replyOpts: opts, cancellationToken: cancellationToken))
{
data = msg.Data;
}

return (index, data);
},
cancellationToken));
}

foreach (var task in tasks)
{
var (index, data) = await task;
Assert.Equal(index * 2, data);
}

await sub.DisposeAsync();
await reg;
}
}
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public async Task FetchNoWait_test()

var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token);
var count = 0;
await foreach (var msg in consumer.FetchNoWait<TestData>(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token))
await foreach (var msg in consumer.FetchAllNoWaitAsync<TestData>(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token))
{
await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token);
Assert.Equal(count, msg.Data!.Test);
Expand Down
6 changes: 3 additions & 3 deletions tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@ public async Task Account_info_create_get_update_stream()

// Get
{
var stream = await js.GetStreamAsync("events", cancellationToken);
var stream = await js.GetStreamAsync("events", cancellationToken: cancellationToken);
Assert.Equal("events", stream.Info.Config.Name);
Assert.Equal(new[] { "events.*" }, stream.Info.Config.Subjects);
}

// Update
{
var stream1 = await js.GetStreamAsync("events", cancellationToken);
var stream1 = await js.GetStreamAsync("events", cancellationToken: cancellationToken);
Assert.Equal(-1, stream1.Info.Config.MaxMsgs);

var stream2 = await js.UpdateStreamAsync(new StreamUpdateRequest { Name = "events", MaxMsgs = 10 }, cancellationToken);
Assert.Equal(10, stream2.Info.Config.MaxMsgs);

var stream3 = await js.GetStreamAsync("events", cancellationToken);
var stream3 = await js.GetStreamAsync("events", cancellationToken: cancellationToken);
Assert.Equal(10, stream3.Info.Config.MaxMsgs);
}
}
Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.0-alpha.5
2.0.0-alpha.6

0 comments on commit e22d1bc

Please sign in to comment.