From 4ac9672653a3dd20c1bd6bddb580b6a6a16d034e Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 6 Nov 2023 23:22:32 +0000 Subject: [PATCH] Subscribe docs updates --- docs/documentation/core/intro.md | 25 ++++++++++++++++++++++--- docs/documentation/core/pub-sub.md | 19 ++++++++++++++----- docs/documentation/core/queue.md | 12 ++++-------- docs/documentation/core/req-rep.md | 4 +--- docs/index.md | 4 ++-- 5 files changed, 43 insertions(+), 21 deletions(-) diff --git a/docs/documentation/core/intro.md b/docs/documentation/core/intro.md index 48904a44c..d28d069de 100644 --- a/docs/documentation/core/intro.md +++ b/docs/documentation/core/intro.md @@ -33,9 +33,11 @@ Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/sub ```csharp await using var nats = new NatsConnection(); -await using sub = await nats.SubscribeAsync("bar.>"); -await foreach (var msg in sub.Msgs.ReadAllAsync()) +await foreach (var msg in nats.Subscription("bar.>")) { + if (msg.Subject == "bar.exit") + break; + Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n"); } ``` @@ -49,6 +51,8 @@ for (int i = 0; i < 10; i++) Console.WriteLine($" Publishing {i}..."); await nats.PublishAsync($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" }); } + +await nats.PublishAsync("bar.exit"); ``` ## Logging @@ -57,7 +61,22 @@ You should also hook your logger to `NatsConnection` to make sure all is working to get help diagnosing any issues you might have: ```csharp -var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +// First add Nuget package Microsoft.Extensions.Logging.Console +using Microsoft.Extensions.Logging; + +using var loggerFactory = LoggerFactory.Create(configure: builder => +{ + builder + .SetMinimumLevel(LogLevel.Information) + .AddSimpleConsole(options => + { + options.SingleLine = true; + options.TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff zzz "; + }); +}); + +var opts = NatsOpts.Default with { LoggerFactory = loggerFactory }; + await using var nats = new NatsConnection(otps); ``` diff --git a/docs/documentation/core/pub-sub.md b/docs/documentation/core/pub-sub.md index 1d3a76a79..89a9729bd 100644 --- a/docs/documentation/core/pub-sub.md +++ b/docs/documentation/core/pub-sub.md @@ -7,16 +7,25 @@ receives the message. ```csharp await using var nats = new NatsConnection(); -await using sub = await nats.SubscribeAsync("foo"); +var sub = Task.Run(async () => +{ + await foreach(var msg in nats.SubscribeAsync("foo")) + { + Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n"); + + if (msg.Data == -1) + break; + } +}); for (int i = 0; i < 10; i++) { Console.WriteLine($" Publishing {i}..."); await nats.PublishAsync("foo", i); + await Task.Delay(1000); } -await foreach (var msg in sub.Msgs.ReadAllAsync()) -{ - Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n"); -} +await nats.PublishAsync("foo", -1); + +await sub; ``` diff --git a/docs/documentation/core/queue.md b/docs/documentation/core/queue.md index 9ed3d56fa..4072641eb 100644 --- a/docs/documentation/core/queue.md +++ b/docs/documentation/core/queue.md @@ -17,20 +17,17 @@ await using var nats = new NatsConnection(); var subs = new List(); var replyTasks = new List(); +var cts = new CancellationTokenSource(); for (int i = 0; i < 3; i++) { // Create three subscriptions all on the same queue group - var sub = await nats.SubscribeAsync("math.double", queueGroup: "maths-service"); - - subs.Add(sub); - // Create a background message loop for every subscription var replyTaskId = i; replyTasks.Add(Task.Run(async () => { // Retrieve messages until unsubscribed - await foreach (var msg in sub.Msgs.ReadAllAsync()) + await foreach (var msg in nats.SubscribeAsync("math.double", queueGroup: "maths-service", cancellationToken: cts.Token)) { Console.WriteLine($"[{replyTaskId}] Received request: {msg.Data}"); await msg.ReplyAsync($"Answer is: {2 * msg.Data}"); @@ -49,9 +46,8 @@ for (int i = 0; i < 10; i++) Console.WriteLine("Stopping..."); -// Unsubscribing or disposing will complete the message loops -foreach (var sub in subs) - await sub.UnsubscribeAsync(); +// Cancellation token will unsubcribe and complete the message loops +cts.Cancel(); // Make sure all tasks finished cleanly await Task.WhenAll(replyTasks); diff --git a/docs/documentation/core/req-rep.md b/docs/documentation/core/req-rep.md index d4cc642ac..a5c90b4aa 100644 --- a/docs/documentation/core/req-rep.md +++ b/docs/documentation/core/req-rep.md @@ -8,9 +8,7 @@ Create a service that will be responding to requests: ```csharp await using var nats = new NatsConnection(); -await using var sub = await conn.SubscribeAsync("math.double"); - -await foreach (var msg in sub.Msgs.ReadAllAsync()) +await foreach (var msg in conn.SubscribeAsync("math.double")) { Console.WriteLine($"Received request: {msg.Data}"); diff --git a/docs/index.md b/docs/index.md index 69568ccf8..efaae62bb 100644 --- a/docs/index.md +++ b/docs/index.md @@ -14,8 +14,8 @@ The NATS.NET V2 client is in preview and not recommended for production use yet. - [x] Object Store initial support - [x] Service API initial support - [x] .NET 8.0 support (Native AOT) -- [ ] Implementation of missing major features (e.g. JetStream ordered consumers) -- [ ] Beta phase +- [x] Implementation of missing major features (e.g. JetStream ordered consumers) +- [x] Beta phase - [ ] Testing and bug fixing - [ ] General Availability