diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 2a89b9429..d2c913653 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -56,6 +56,9 @@ jobs:
- name: Test Object Store
run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj
+ - name: Test Services
+ run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj
+
memory_test:
name: memory test
strategy:
diff --git a/NATS.Client.sln b/NATS.Client.sln
index 930a9f0c9..8f57c7bd6 100644
--- a/NATS.Client.sln
+++ b/NATS.Client.sln
@@ -79,6 +79,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.ObjectStore.Tes
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.ObjectStore", "sandbox\Example.ObjectStore\Example.ObjectStore.csproj", "{51882883-A66E-4F95-A1AB-CFCBF71B4376}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Services", "src\NATS.Client.Services\NATS.Client.Services.csproj", "{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Services.Tests", "tests\NATS.Client.Services.Tests\NATS.Client.Services.Tests.csproj", "{749CAE39-4C1E-4627-9E31-A36B987BC453}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Services", "sandbox\Example.Services\Example.Services.csproj", "{DD0AB72A-D6CD-4054-A9C9-0DCA3EDBA00F}"
+EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.TlsFirst", "sandbox\Example.TlsFirst\Example.TlsFirst.csproj", "{88625045-978F-417F-9F51-A4E3A9718945}"
EndProject
Global
@@ -203,6 +209,18 @@ Global
{51882883-A66E-4F95-A1AB-CFCBF71B4376}.Debug|Any CPU.Build.0 = Debug|Any CPU
{51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.ActiveCfg = Release|Any CPU
{51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.Build.0 = Release|Any CPU
+ {050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {749CAE39-4C1E-4627-9E31-A36B987BC453}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {749CAE39-4C1E-4627-9E31-A36B987BC453}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {749CAE39-4C1E-4627-9E31-A36B987BC453}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {749CAE39-4C1E-4627-9E31-A36B987BC453}.Release|Any CPU.Build.0 = Release|Any CPU
+ {DD0AB72A-D6CD-4054-A9C9-0DCA3EDBA00F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {DD0AB72A-D6CD-4054-A9C9-0DCA3EDBA00F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {DD0AB72A-D6CD-4054-A9C9-0DCA3EDBA00F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {DD0AB72A-D6CD-4054-A9C9-0DCA3EDBA00F}.Release|Any CPU.Build.0 = Release|Any CPU
{88625045-978F-417F-9F51-A4E3A9718945}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{88625045-978F-417F-9F51-A4E3A9718945}.Debug|Any CPU.Build.0 = Debug|Any CPU
{88625045-978F-417F-9F51-A4E3A9718945}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -242,6 +260,9 @@ Global
{3F8840BA-4F91-4359-AA53-6B26823E7F55} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{51882883-A66E-4F95-A1AB-CFCBF71B4376} = {95A69671-16CA-4133-981C-CC381B7AAA30}
+ {050C63EE-8F1C-4535-9C6C-E12E62A1FF1D} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
+ {749CAE39-4C1E-4627-9E31-A36B987BC453} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
+ {DD0AB72A-D6CD-4054-A9C9-0DCA3EDBA00F} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{88625045-978F-417F-9F51-A4E3A9718945} = {95A69671-16CA-4133-981C-CC381B7AAA30}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
diff --git a/NATS.Client.sln.DotSettings b/NATS.Client.sln.DotSettings
index e0bb2621b..a5b44166a 100644
--- a/NATS.Client.sln.DotSettings
+++ b/NATS.Client.sln.DotSettings
@@ -8,4 +8,4 @@
True
True
True
- True
\ No newline at end of file
+ True
diff --git a/docs/documentation/intro.md b/docs/documentation/intro.md
index f8ff7b6c1..c15af3763 100644
--- a/docs/documentation/intro.md
+++ b/docs/documentation/intro.md
@@ -17,3 +17,5 @@ these docs. You can also create a Pull Request using the Edit on GitHub link on
[Key/Value Store](key-value-store/intro.md) is the built-in distributed persistent associative arrays built on top of JetStream.
[Object Store](object-store/intro.md) is the built-in distributed persistent objects of arbitrary size built on top of JetStream.
+
+[Services](services/intro.md) is the services protocol built on top of core NATS enabling discovery and monitoring of services you develop.
diff --git a/docs/documentation/object-store/intro.md b/docs/documentation/object-store/intro.md
index ec32ee011..5759a139c 100644
--- a/docs/documentation/object-store/intro.md
+++ b/docs/documentation/object-store/intro.md
@@ -33,7 +33,7 @@ Let's create our store first. In Object Store, a bucket is simply a storage for
var store = await obj.CreateObjectStore("test-bucket");
```
-Now that we have a KV bucket in our stream, let's see its status using the [NATS command
+Now that we have a bucket in our stream, let's see its status using the [NATS command
line client](https://github.com/nats-io/natscli):
```shell
diff --git a/docs/documentation/services/intro.md b/docs/documentation/services/intro.md
new file mode 100644
index 000000000..d807e88b5
--- /dev/null
+++ b/docs/documentation/services/intro.md
@@ -0,0 +1,102 @@
+# Services
+
+[Services](https://docs.nats.io/using-nats/developer/services) is a protocol that provides first-class services support
+for NATS clients and it's supported by NATS tooling. This services protocol is an agreement between clients and tooling and
+doesn't require any special functionality from the NATS server or JetStream.
+
+To be able to use Services you need to running the `nats-server`.
+
+## Services Quick Start
+
+[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it:
+
+```shell
+$ nats-server
+```
+
+Install `NATS.Client.Services` preview from Nuget.
+
+Before we can do anything, we need a Services context:
+
+```csharp
+await using var nats = new NatsConnection();
+var svc = new NatsSvcContext(nats);
+```
+
+Let's create our first service:
+
+```csharp
+await using var testService = await svc.AddServiceAsync("test", "1.0.0");
+```
+
+Now that we have a service in our stream, let's see its status using the [NATS command
+line client](https://github.com/nats-io/natscli) (make sure you have at least v0.1.1):
+
+```shell
+$ nats --version
+0.1.1
+```
+
+```shell
+$ nats micro info test
+Service Information
+
+ Service: test (Bw6eqhVYs3dbNzZecuuFOV)
+ Description:
+ Version: 1.0.0
+
+Endpoints:
+
+Statistics for 0 Endpoint(s):
+```
+
+Now we can add endpoints to our service:
+
+```csharp
+await testService.AddEndPointAsync(name: "divide42", handler: async m =>
+{
+ if (m.Data == 0)
+ {
+ await m.ReplyErrorAsync(400, "Division by zero");
+ return;
+ }
+
+ await m.ReplyAsync(42 / m.Data);
+});
+```
+
+We can also confirm that our endpoint is registered by using the NATS command line:
+
+```shell
+$ nats req divide42 2
+11:34:03 Sending request on "divide42"
+11:34:03 Received with rtt 9.5823ms
+21
+
+$ nats micro stats test
+╭──────────────────────────────────────────────────────────────────────────────────────────────────────╮
+│ test Service Statistics │
+├────────────────────────┬──────────┬──────────┬─────────────┬────────┬─────────────────┬──────────────┤
+│ ID │ Endpoint │ Requests │ Queue Group │ Errors │ Processing Time │ Average Time │
+├────────────────────────┼──────────┼──────────┼─────────────┼────────┼─────────────────┼──────────────┤
+│ RH6q9Y6qM8em8m6lG2yN34 │ divide42 │ 1 │ q │ 0 │ 1ms │ 1ms │
+├────────────────────────┼──────────┼──────────┼─────────────┼────────┼─────────────────┼──────────────┤
+│ │ │ 1 │ │ 0 │ 1MS │ 1MS │
+╰────────────────────────┴──────────┴──────────┴─────────────┴────────┴─────────────────┴──────────────╯
+```
+
+## Groups
+
+A group is a collection of endpoints. These are optional and can provide a logical association between endpoints
+as well as an optional common subject prefix for all endpoints.
+
+You can group your endpoints optionally in different [queue groups](https://docs.nats.io/nats-concepts/core-nats/queue):
+
+```csharp
+var grp1 = await testService.AddGroupAsync("grp1");
+
+await grp1.AddEndPointAsync(name: "ep1", handler: async m =>
+{
+ // handle message
+});
+```
diff --git a/docs/index.md b/docs/index.md
index 577b2a0ab..bfb43d241 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -12,7 +12,7 @@ The NATS.NET V2 client is in preview and not recommended for production use yet.
- [x] JetStream initial support
- [x] KV initial support
- [x] Object Store initial support
-- [ ] Service API initial support
+- [x] Service API initial support
- [ ] .NET 8.0 support (e.g. Native AOT)
- [ ] Beta phase
diff --git a/sandbox/Example.Services/Example.Services.csproj b/sandbox/Example.Services/Example.Services.csproj
new file mode 100644
index 000000000..b67798a56
--- /dev/null
+++ b/sandbox/Example.Services/Example.Services.csproj
@@ -0,0 +1,15 @@
+
+
+
+ Exe
+ net6.0
+ enable
+ enable
+ false
+
+
+
+
+
+
+
diff --git a/sandbox/Example.Services/Program.cs b/sandbox/Example.Services/Program.cs
new file mode 100644
index 000000000..a06051132
--- /dev/null
+++ b/sandbox/Example.Services/Program.cs
@@ -0,0 +1,77 @@
+using System.Text;
+using Microsoft.Extensions.Logging;
+using NATS.Client.Core;
+using NATS.Client.Services;
+
+var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
+
+var nats = new NatsConnection(opts);
+var svc = new NatsSvcContext(nats);
+
+var qg = args.Length > 0 ? args[0] : "q";
+
+await using var testService = await svc.AddServiceAsync("test", "1.0.0", qg);
+
+await testService.AddEndpointAsync(name: "bla", handler: async m =>
+{
+ if (m.Exception is { } e)
+ {
+ Console.WriteLine($"[MSG] Error: {e.GetBaseException().Message}");
+ await m.ReplyErrorAsync(999, e.GetBaseException().Message, Encoding.UTF8.GetBytes(e.ToString()));
+ }
+
+ Console.WriteLine($"[MSG] {m.Subject}: {m.Data}");
+
+ if (m.Data == 0)
+ {
+ throw new Exception("Data can't be 0");
+ }
+
+ if (m.Data == 1)
+ {
+ throw new NatsSvcEndpointException(1, "Data can't be 1", "More info ...");
+ }
+
+ if (m.Data == 2)
+ {
+ await m.ReplyErrorAsync(2, "Data can't be 2");
+ return;
+ }
+
+ await Task.Delay(Random.Shared.Next(10, 100));
+ await m.ReplyAsync(42);
+});
+
+var grp1 = await testService.AddGroupAsync("grp1");
+
+await grp1.AddEndpointAsync(name: "bla", handler: async m =>
+{
+ if (m.Exception is { } e)
+ {
+ Console.WriteLine($"[MSG] Error: {e.GetBaseException().Message}");
+ await m.ReplyErrorAsync(999, e.GetBaseException().Message, Encoding.UTF8.GetBytes(e.ToString()));
+ }
+
+ Console.WriteLine($"[MSG] {m.Subject}: {m.Data}");
+
+ if (m.Data == 0)
+ {
+ throw new Exception("Data can't be 0");
+ }
+
+ if (m.Data == 1)
+ {
+ throw new NatsSvcEndpointException(1, "Data can't be 1", "More info ...");
+ }
+
+ if (m.Data == 2)
+ {
+ await m.ReplyErrorAsync(2, "Data can't be 2");
+ return;
+ }
+
+ await Task.Delay(Random.Shared.Next(10, 100));
+ await m.ReplyAsync(42);
+});
+
+Console.ReadLine();
diff --git a/src/NATS.Client.Core/Internal/NuidWriter.cs b/src/NATS.Client.Core/Internal/NuidWriter.cs
index 3783c22a9..9513703a8 100644
--- a/src/NATS.Client.Core/Internal/NuidWriter.cs
+++ b/src/NATS.Client.Core/Internal/NuidWriter.cs
@@ -40,6 +40,17 @@ public static bool TryWriteNuid(Span nuidBuffer)
return InitAndWrite(nuidBuffer);
}
+ public static string NewNuid()
+ {
+ Span buffer = stackalloc char[22];
+ if (TryWriteNuid(buffer))
+ {
+ return new string(buffer);
+ }
+
+ throw new InvalidOperationException("Internal error: can't generate nuid");
+ }
+
private static bool TryWriteNuidCore(Span buffer, Span prefix, ulong sequential)
{
if ((uint)buffer.Length < NuidLength || prefix.Length != PrefixLength)
diff --git a/src/NATS.Client.Core/NATS.Client.Core.csproj b/src/NATS.Client.Core/NATS.Client.Core.csproj
index db709f6e2..7ced64843 100644
--- a/src/NATS.Client.Core/NATS.Client.Core.csproj
+++ b/src/NATS.Client.Core/NATS.Client.Core.csproj
@@ -1,35 +1,37 @@
-
- net6.0
- enable
- enable
- true
+
+ net6.0
+ enable
+ enable
+ true
-
- pubsub;messaging
- An alternative high performance NATS client for .NET.
- true
-
+
+ pubsub;messaging
+ NATS client for .NET.
+ true
+
-
-
-
-
- all
- runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/NATS.Client.Hosting/NATS.Client.Hosting.csproj b/src/NATS.Client.Hosting/NATS.Client.Hosting.csproj
index 0acdf843e..a412f3a4d 100644
--- a/src/NATS.Client.Hosting/NATS.Client.Hosting.csproj
+++ b/src/NATS.Client.Hosting/NATS.Client.Hosting.csproj
@@ -1,22 +1,23 @@
-
- net6.0
- enable
- enable
+
+ net6.0
+ enable
+ enable
+ true
-
- pubsub;messaging
- ASP.NET Core and Generic Host support for NATS.Client.
- true
-
+
+ pubsub;messaging
+ ASP.NET Core and Generic Host support for NATS.Client.
+ true
+
-
-
-
+
+
+
-
-
-
+
+
+
diff --git a/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj b/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj
index 62a3ceef3..474feaa55 100644
--- a/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj
+++ b/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj
@@ -4,20 +4,21 @@
net6.0
enable
enable
+ true
- pubsub;messaging
+ pubsub;messaging;persistance
JetStream support for NATS.Client.
-
+ true
-
-
-
-
-
-
+
+
+
+
+
+
diff --git a/src/NATS.Client.KeyValueStore/NATS.Client.KeyValueStore.csproj b/src/NATS.Client.KeyValueStore/NATS.Client.KeyValueStore.csproj
index 0e8344a26..ad3673fb9 100644
--- a/src/NATS.Client.KeyValueStore/NATS.Client.KeyValueStore.csproj
+++ b/src/NATS.Client.KeyValueStore/NATS.Client.KeyValueStore.csproj
@@ -1,20 +1,20 @@
-
- net6.0
- enable
- enable
+
+ net6.0
+ enable
+ enable
+ true
-
- pubsub;messaging
- JetStream Key/Value Store support for NATS.Client.
- true
-
-
-
-
-
-
+
+ pubsub;messaging;persistance;key-value;storage
+ JetStream Key/Value Store support for NATS.Client.
+ true
+
+
+
+
+
diff --git a/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj b/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj
index 39282a010..d0ce22d99 100644
--- a/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj
+++ b/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj
@@ -7,7 +7,7 @@
true
- pubsub;messaging
+ pubsub;messaging;persistance;storage
JetStream Object Store support for NATS.Client.
true
diff --git a/src/NATS.Client.Services/Internal/SvcListener.cs b/src/NATS.Client.Services/Internal/SvcListener.cs
new file mode 100644
index 000000000..cda8ce9ad
--- /dev/null
+++ b/src/NATS.Client.Services/Internal/SvcListener.cs
@@ -0,0 +1,49 @@
+using System.Threading.Channels;
+using NATS.Client.Core;
+
+namespace NATS.Client.Services.Internal;
+
+internal class SvcListener : IAsyncDisposable
+{
+ private readonly NatsConnection _nats;
+ private readonly Channel _channel;
+ private readonly SvcMsgType _type;
+ private readonly string _subject;
+ private readonly string _queueGroup;
+ private readonly CancellationToken _cancellationToken;
+ private INatsSub>? _sub;
+ private Task? _readLoop;
+
+ public SvcListener(NatsConnection nats, Channel channel, SvcMsgType type, string subject, string queueGroup, CancellationToken cancellationToken)
+ {
+ _nats = nats;
+ _channel = channel;
+ _type = type;
+ _subject = subject;
+ _queueGroup = queueGroup;
+ _cancellationToken = cancellationToken;
+ }
+
+ public async ValueTask StartAsync()
+ {
+ _sub = await _nats.SubscribeAsync>(_subject, queueGroup: _queueGroup, cancellationToken: _cancellationToken);
+ _readLoop = Task.Run(async () =>
+ {
+ while (await _sub.Msgs.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
+ {
+ while (_sub.Msgs.TryRead(out var msg))
+ {
+ await _channel.Writer.WriteAsync(new SvcMsg(_type, msg), _cancellationToken).ConfigureAwait(false);
+ }
+ }
+ });
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (_sub != null)
+ await _sub.DisposeAsync();
+ if (_readLoop != null)
+ await _readLoop;
+ }
+}
diff --git a/src/NATS.Client.Services/Internal/SvcMsg.cs b/src/NATS.Client.Services/Internal/SvcMsg.cs
new file mode 100644
index 000000000..7ebecc2b2
--- /dev/null
+++ b/src/NATS.Client.Services/Internal/SvcMsg.cs
@@ -0,0 +1,12 @@
+using NATS.Client.Core;
+
+namespace NATS.Client.Services.Internal;
+
+internal enum SvcMsgType
+{
+ Ping,
+ Info,
+ Stats,
+}
+
+internal readonly record struct SvcMsg(SvcMsgType MsgType, NatsMsg> Msg);
diff --git a/src/NATS.Client.Services/Models/InfoResponse.cs b/src/NATS.Client.Services/Models/InfoResponse.cs
new file mode 100644
index 000000000..c6f33912e
--- /dev/null
+++ b/src/NATS.Client.Services/Models/InfoResponse.cs
@@ -0,0 +1,45 @@
+using System.Text.Json.Serialization;
+
+namespace NATS.Client.Services.Models;
+
+public record InfoResponse
+{
+ [JsonPropertyName("type")]
+ public string Type { get; set; } = "io.nats.micro.v1.info_response";
+
+ [JsonPropertyName("name")]
+ public string Name { get; set; } = default!;
+
+ [JsonPropertyName("id")]
+ public string Id { get; set; } = default!;
+
+ [JsonPropertyName("version")]
+ public string Version { get; set; } = default!;
+
+ [JsonPropertyName("metadata")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public IDictionary Metadata { get; set; } = default!;
+
+ [JsonPropertyName("description")]
+ public string Description { get; set; } = default!;
+
+ [JsonPropertyName("endpoints")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public ICollection Endpoints { get; set; } = default!;
+}
+
+public record EndpointInfo
+{
+ [JsonPropertyName("name")]
+ public string Name { get; set; } = default!;
+
+ [JsonPropertyName("subject")]
+ public string Subject { get; set; } = default!;
+
+ [JsonPropertyName("queue_group")]
+ public string QueueGroup { get; set; } = default!;
+
+ [JsonPropertyName("metadata")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public IDictionary Metadata { get; set; } = default!;
+}
diff --git a/src/NATS.Client.Services/Models/PingResponse.cs b/src/NATS.Client.Services/Models/PingResponse.cs
new file mode 100644
index 000000000..01df5341a
--- /dev/null
+++ b/src/NATS.Client.Services/Models/PingResponse.cs
@@ -0,0 +1,21 @@
+using System.Text.Json.Serialization;
+
+namespace NATS.Client.Services.Models;
+
+public record PingResponse
+{
+ [JsonPropertyName("type")]
+ public string Type { get; set; } = "io.nats.micro.v1.ping_response";
+
+ [JsonPropertyName("name")]
+ public string Name { get; set; } = default!;
+
+ [JsonPropertyName("id")]
+ public string Id { get; set; } = default!;
+
+ [JsonPropertyName("version")]
+ public string Version { get; set; } = default!;
+
+ [JsonPropertyName("metadata")]
+ public IDictionary Metadata { get; set; } = default!;
+}
diff --git a/src/NATS.Client.Services/Models/StatsResponse.cs b/src/NATS.Client.Services/Models/StatsResponse.cs
new file mode 100644
index 000000000..13c1677bc
--- /dev/null
+++ b/src/NATS.Client.Services/Models/StatsResponse.cs
@@ -0,0 +1,62 @@
+using System.Text.Json.Nodes;
+using System.Text.Json.Serialization;
+
+namespace NATS.Client.Services.Models;
+
+public record StatsResponse
+{
+ [JsonPropertyName("type")]
+ public string Type { get; set; } = "io.nats.micro.v1.stats_response";
+
+ [JsonPropertyName("name")]
+ public string Name { get; set; } = default!;
+
+ [JsonPropertyName("id")]
+ public string Id { get; set; } = default!;
+
+ [JsonPropertyName("version")]
+ public string Version { get; set; } = default!;
+
+ [JsonPropertyName("metadata")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public IDictionary Metadata { get; set; } = default!;
+
+ [JsonPropertyName("endpoints")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public ICollection Endpoints { get; set; } = default!;
+
+ [JsonPropertyName("started")]
+ public string Started { get; set; } = default!;
+}
+
+public record EndpointStats
+{
+ [JsonPropertyName("name")]
+ public string Name { get; set; } = default!;
+
+ [JsonPropertyName("subject")]
+ public string Subject { get; set; } = default!;
+
+ [JsonPropertyName("queue_group")]
+ public string QueueGroup { get; set; } = default!;
+
+ [JsonPropertyName("num_requests")]
+ public long NumRequests { get; set; }
+
+ [JsonPropertyName("num_errors")]
+ public long NumErrors { get; set; }
+
+ [JsonPropertyName("last_error")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public string LastError { get; set; } = default!;
+
+ [JsonPropertyName("data")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public JsonNode Data { get; set; } = default!;
+
+ [JsonPropertyName("processing_time")]
+ public long ProcessingTime { get; set; }
+
+ [JsonPropertyName("average_processing_time")]
+ public long AverageProcessingTime { get; set; }
+}
diff --git a/src/NATS.Client.Services/NATS.Client.Services.csproj b/src/NATS.Client.Services/NATS.Client.Services.csproj
new file mode 100644
index 000000000..5a9b4940f
--- /dev/null
+++ b/src/NATS.Client.Services/NATS.Client.Services.csproj
@@ -0,0 +1,19 @@
+
+
+
+ net6.0
+ enable
+ enable
+ true
+
+
+ pubsub;messaging;microservices;services
+ Service API support for NATS.Client.
+ true
+
+
+
+
+
+
+
diff --git a/src/NATS.Client.Services/NatsSvcConfig.cs b/src/NATS.Client.Services/NatsSvcConfig.cs
new file mode 100644
index 000000000..fc357cf30
--- /dev/null
+++ b/src/NATS.Client.Services/NatsSvcConfig.cs
@@ -0,0 +1,66 @@
+using System.Text.Json.Nodes;
+using System.Text.RegularExpressions;
+
+namespace NATS.Client.Services;
+
+///
+/// NATS service configuration.
+///
+public record NatsSvcConfig
+{
+ private static readonly Regex NameRegex = new(@"^[a-zA-Z0-9_-]+$", RegexOptions.Compiled);
+ private static readonly Regex VersionRegex = new(@"^(?0|[1-9]\d*)\.(?0|[1-9]\d*)\.(?0|[1-9]\d*)(?:-(?(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$", RegexOptions.Compiled);
+
+ ///
+ /// Creates a new instance of .
+ ///
+ /// Service name.
+ /// Service SemVer version.
+ /// Name or version is invalid.
+ public NatsSvcConfig(string name, string version)
+ {
+ if (!NameRegex.IsMatch(name))
+ {
+ throw new ArgumentException("Invalid service name (name can only have A-Z, a-z, 0-9, dash and underscore).", nameof(name));
+ }
+
+ if (!VersionRegex.IsMatch(version))
+ {
+ throw new ArgumentException("Invalid service version (must use Semantic Versioning).", nameof(version));
+ }
+
+ Name = name;
+ Version = version;
+ }
+
+ ///
+ /// Service name.
+ ///
+ public string Name { get; }
+
+ ///
+ /// Service version. Must be a valid Semantic Versioning string.
+ ///
+ public string Version { get; }
+
+ ///
+ /// Service description.
+ ///
+ public string? Description { get; init; }
+
+ ///
+ /// Service metadata. This will be included in the service info.
+ ///
+ public Dictionary? Metadata { get; init; }
+
+ ///
+ /// Queue group name. (default: "q")
+ ///
+ public string QueueGroup { get; init; } = "q";
+
+ ///
+ /// Stats handler. JSON object returned by this handler will be included in
+ /// the service stats data property.
+ ///
+ public Func? StatsHandler { get; init; }
+}
diff --git a/src/NATS.Client.Services/NatsSvcContext.cs b/src/NATS.Client.Services/NatsSvcContext.cs
new file mode 100644
index 000000000..b91f695bf
--- /dev/null
+++ b/src/NATS.Client.Services/NatsSvcContext.cs
@@ -0,0 +1,41 @@
+using NATS.Client.Core;
+
+namespace NATS.Client.Services;
+
+///
+/// NATS service context.
+///
+public class NatsSvcContext
+{
+ private readonly NatsConnection _nats;
+
+ ///
+ /// Creates a new instance of .
+ ///
+ /// NATS connection.
+ public NatsSvcContext(NatsConnection nats) => _nats = nats;
+
+ ///
+ /// Adds a new service.
+ ///
+ /// Service name.
+ /// Service SemVer version.
+ /// Optional queue group (default: "q")
+ /// A used to cancel the API call.
+ /// NATS Service instance.
+ public ValueTask AddServiceAsync(string name, string version, string queueGroup = "q", CancellationToken cancellationToken = default) =>
+ AddServiceAsync(new NatsSvcConfig(name, version) { QueueGroup = queueGroup }, cancellationToken);
+
+ ///
+ /// Adds a new service.
+ ///
+ /// Service configuration.
+ /// A used to cancel the API call.
+ /// NATS Service instance.
+ public async ValueTask AddServiceAsync(NatsSvcConfig config, CancellationToken cancellationToken = default)
+ {
+ var service = new NatsSvcServer(_nats, config, cancellationToken);
+ await service.StartAsync().ConfigureAwait(false);
+ return service;
+ }
+}
diff --git a/src/NATS.Client.Services/NatsSvcEndPoint.cs b/src/NATS.Client.Services/NatsSvcEndPoint.cs
new file mode 100644
index 000000000..cf126a877
--- /dev/null
+++ b/src/NATS.Client.Services/NatsSvcEndPoint.cs
@@ -0,0 +1,259 @@
+using System.Buffers;
+using System.Diagnostics;
+using System.Text;
+using System.Threading.Channels;
+using Microsoft.Extensions.Logging;
+using NATS.Client.Core;
+using NATS.Client.Core.Internal;
+
+namespace NATS.Client.Services;
+
+///
+/// NATS service endpoint.
+///
+public interface INatsSvcEndpoint : IAsyncDisposable
+{
+ ///
+ /// Number of requests received.
+ ///
+ long Requests { get; }
+
+ ///
+ /// Total processing time in nanoseconds.
+ ///
+ long ProcessingTime { get; }
+
+ ///
+ /// Number of errors.
+ ///
+ long Errors { get; }
+
+ ///
+ /// Last error message.
+ ///
+ string? LastError { get; }
+
+ ///
+ /// Average processing time in nanoseconds.
+ ///
+ long AverageProcessingTime { get; }
+
+ ///
+ /// Endpoint metadata.
+ ///
+ IDictionary? Metadata { get; }
+
+ ///
+ /// The subject name to subscribe to.
+ ///
+ string Subject { get; }
+
+ ///
+ /// Endpoint queue group.
+ ///
+ ///
+ /// If specified, the subscriber will join this queue group. Subscribers with the same queue group name,
+ /// become a queue group, and only one randomly chosen subscriber of the queue group will
+ /// consume a message each time a message is received by the queue group.
+ ///
+ string? QueueGroup { get; }
+}
+
+///
+/// Endpoint base class exposing general stats.
+///
+public abstract class NatsSvcEndpointBase : NatsSubBase, INatsSvcEndpoint
+{
+ protected NatsSvcEndpointBase(NatsConnection connection, ISubscriptionManager manager, string subject, string? queueGroup, NatsSubOpts? opts)
+ : base(connection, manager, subject, queueGroup, opts)
+ {
+ }
+
+ ///
+ public abstract long Requests { get; }
+
+ ///
+ public abstract long ProcessingTime { get; }
+
+ ///
+ public abstract long Errors { get; }
+
+ ///
+ public abstract string? LastError { get; }
+
+ ///
+ public abstract long AverageProcessingTime { get; }
+
+ ///
+ public abstract IDictionary? Metadata { get; }
+
+ internal abstract void IncrementErrors();
+
+ internal abstract void SetLastError(string error);
+}
+
+///
+/// NATS service endpoint.
+///
+/// Serialized type to use when receiving data.
+public class NatsSvcEndpoint : NatsSvcEndpointBase
+{
+ private readonly ILogger _logger;
+ private readonly Func, ValueTask> _handler;
+ private readonly NatsConnection _nats;
+ private readonly string _name;
+ private readonly CancellationToken _cancellationToken;
+ private readonly Channel> _channel;
+ private readonly INatsSerializer _serializer;
+ private readonly Task _handlerTask;
+
+ private long _requests;
+ private long _errors;
+ private long _processingTime;
+ private string? _lastError;
+
+ ///
+ /// Creates a new instance of .
+ ///
+ /// NATS connection.
+ /// Queue group.
+ /// Optional endpoint name.
+ /// Callback function to handle messages received.
+ /// Optional subject name.
+ /// Endpoint metadata.
+ /// Subscription options.
+ /// A used to cancel the API call.
+ public NatsSvcEndpoint(NatsConnection nats, string? queueGroup, string name, Func, ValueTask> handler, string subject, IDictionary? metadata, NatsSubOpts? opts, CancellationToken cancellationToken)
+ : base(nats, nats.SubscriptionManager, subject, queueGroup, opts)
+ {
+ _logger = nats.Opts.LoggerFactory.CreateLogger>();
+ _handler = handler;
+ _nats = nats;
+ _name = name;
+ Metadata = metadata;
+ _cancellationToken = cancellationToken;
+ _serializer = opts?.Serializer ?? _nats.Opts.Serializer;
+ _channel = Channel.CreateBounded>(128);
+ _handlerTask = Task.Run(HandlerLoop);
+ }
+
+ ///
+ public override long Requests => Volatile.Read(ref _requests);
+
+ ///
+ public override long ProcessingTime => Volatile.Read(ref _processingTime);
+
+ ///
+ public override long Errors => Volatile.Read(ref _errors);
+
+ ///
+ public override string? LastError => Volatile.Read(ref _lastError);
+
+ ///
+ public override long AverageProcessingTime => Requests == 0 ? 0 : ProcessingTime / Requests;
+
+ ///
+ public override IDictionary? Metadata { get; }
+
+ ///
+ public override async ValueTask DisposeAsync()
+ {
+ await base.DisposeAsync();
+ await _handlerTask;
+ }
+
+ internal override void IncrementErrors() => Interlocked.Increment(ref _errors);
+
+ internal override void SetLastError(string error) => Interlocked.Exchange(ref _lastError, error);
+
+ internal ValueTask StartAsync(CancellationToken cancellationToken) =>
+ _nats.SubAsync(this, cancellationToken);
+
+ protected override ValueTask ReceiveInternalAsync(
+ string subject,
+ string? replyTo,
+ ReadOnlySequence? headersBuffer,
+ ReadOnlySequence payloadBuffer)
+ {
+ NatsMsg msg;
+ Exception? exception;
+ try
+ {
+ msg = NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _nats.HeaderParser, _serializer);
+ exception = null;
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, "Endpoint {Name} error building message", _name);
+ exception = e;
+
+ // Most likely a serialization error.
+ // Make sure we have a valid message
+ // so handler can reply with an error.
+ msg = new NatsMsg(subject, replyTo, subject.Length + (replyTo?.Length ?? 0), default, default, _nats);
+ }
+
+ return _channel.Writer.WriteAsync(new NatsSvcMsg(msg, this, exception), _cancellationToken);
+ }
+
+ protected override void TryComplete() => _channel.Writer.TryComplete();
+
+ private async Task HandlerLoop()
+ {
+ var stopwatch = new Stopwatch();
+ await foreach (var svcMsg in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
+ {
+ Interlocked.Increment(ref _requests);
+ stopwatch.Restart();
+ try
+ {
+ await _handler(svcMsg).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ int code;
+ string message;
+ string body;
+ if (e is NatsSvcEndpointException epe)
+ {
+ code = epe.Code;
+ message = epe.Message;
+ body = epe.Body;
+ }
+ else
+ {
+ // Do not expose exceptions unless explicitly
+ // thrown as NatsSvcEndpointException
+ code = 999;
+ message = "Handler error";
+ body = string.Empty;
+
+ // Only log unknown exceptions
+ _logger.LogError(e, "Endpoint {Name} error processing message", _name);
+ }
+
+ try
+ {
+ if (string.IsNullOrWhiteSpace(body))
+ {
+ await svcMsg.ReplyErrorAsync(code, message, cancellationToken: _cancellationToken);
+ }
+ else
+ {
+ await svcMsg.ReplyErrorAsync(code, message, data: Encoding.UTF8.GetBytes(body), cancellationToken: _cancellationToken);
+ }
+ }
+ catch (Exception e1)
+ {
+ _logger.LogError(e1, "Endpoint {Name} error responding", _name);
+ }
+ }
+ finally
+ {
+ Interlocked.Add(ref _processingTime, ToNanos(stopwatch.Elapsed));
+ }
+ }
+ }
+
+ private long ToNanos(TimeSpan timeSpan) => (long)(timeSpan.TotalMilliseconds * 1_000_000);
+}
diff --git a/src/NATS.Client.Services/NatsSvcException.cs b/src/NATS.Client.Services/NatsSvcException.cs
new file mode 100644
index 000000000..b9b418941
--- /dev/null
+++ b/src/NATS.Client.Services/NatsSvcException.cs
@@ -0,0 +1,47 @@
+using NATS.Client.Core;
+
+namespace NATS.Client.Services;
+
+///
+/// NATS service exception.
+///
+public class NatsSvcException : NatsException
+{
+ ///
+ /// Creates a new instance of .
+ ///
+ /// Exception message.
+ public NatsSvcException(string message)
+ : base(message)
+ {
+ }
+}
+
+///
+/// NATS service endpoint exception.
+///
+public class NatsSvcEndpointException : NatsException
+{
+ ///
+ /// Creates a new instance of .
+ ///
+ /// Error code.
+ /// Error message
+ /// Optional error body.
+ public NatsSvcEndpointException(int code, string message, string? body = default)
+ : base(message)
+ {
+ Code = code;
+ Body = body ?? string.Empty;
+ }
+
+ ///
+ /// Error code.
+ ///
+ public int Code { get; }
+
+ ///
+ /// Error body.
+ ///
+ public string Body { get; }
+}
diff --git a/src/NATS.Client.Services/NatsSvcMsg.cs b/src/NATS.Client.Services/NatsSvcMsg.cs
new file mode 100644
index 000000000..d4aa8ac5b
--- /dev/null
+++ b/src/NATS.Client.Services/NatsSvcMsg.cs
@@ -0,0 +1,119 @@
+using NATS.Client.Core;
+
+namespace NATS.Client.Services;
+
+///
+/// NATS service exception.
+///
+///
+public readonly struct NatsSvcMsg
+{
+ private readonly NatsMsg _msg;
+ private readonly NatsSvcEndpointBase? _endPoint;
+
+ ///
+ /// Creates a new instance of .
+ ///
+ /// NATS message.
+ /// Service endpoint.
+ /// Optional exception if there were any errors.
+ public NatsSvcMsg(NatsMsg msg, NatsSvcEndpointBase? endPoint, Exception? exception)
+ {
+ Exception = exception;
+ _msg = msg;
+ _endPoint = endPoint;
+ }
+
+ ///
+ /// Optional exception if there were any errors.
+ ///
+ ///
+ /// Check this property to see if there were any errors before processing the message.
+ ///
+ public Exception? Exception { get; }
+
+ ///
+ /// Message subject.
+ ///
+ public string Subject => _msg.Subject;
+
+ ///
+ /// Message data.
+ ///
+ public T? Data => _msg.Data;
+
+ ///
+ /// Message reply-to subject.
+ ///
+ public string? ReplyTo => _msg.ReplyTo;
+
+ ///
+ /// Send a reply with an empty message body.
+ ///
+ /// Optional message headers.
+ /// Optional reply-to subject.
+ /// Optional publishing options.
+ /// A used to cancel the API call.
+ /// A representing the asynchronous operation.
+ public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) =>
+ _msg.ReplyAsync(headers, replyTo, opts, cancellationToken);
+
+ ///
+ /// Send a reply with a message body.
+ ///
+ /// Data to be sent.
+ /// Optional message headers.
+ /// Optional reply-to subject.
+ /// Optional publishing options.
+ /// A used to cancel the API call.
+ /// A serializable type as data.
+ /// A representing the asynchronous operation.
+ public ValueTask ReplyAsync(TReply data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) =>
+ _msg.ReplyAsync(data, headers, replyTo, opts, cancellationToken);
+
+ ///
+ /// Reply with an error and additional data as error body.
+ ///
+ /// Error code.
+ /// Error message.
+ /// Error body.
+ /// Optional additional headers.
+ /// Optional reply-to subject.
+ /// Optional publishing options.
+ /// A used to cancel the API call.
+ /// A serializable type as data.
+ /// A representing the asynchronous operation.
+ public ValueTask ReplyErrorAsync(int code, string message, TReply data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
+ {
+ headers ??= new NatsHeaders();
+ headers.Add("Nats-Service-Error-Code", $"{code}");
+ headers.Add("Nats-Service-Error", $"{message}");
+
+ _endPoint?.IncrementErrors();
+ _endPoint?.SetLastError($"{message} ({code})");
+
+ return ReplyAsync(data, headers, replyTo, opts, cancellationToken);
+ }
+
+ ///
+ /// Reply with an error.
+ ///
+ /// Error code.
+ /// Error message.
+ /// Optional additional headers.
+ /// Optional reply-to subject.
+ /// Optional publishing options.
+ /// A used to cancel the API call.
+ /// A representing the asynchronous operation.
+ public ValueTask ReplyErrorAsync(int code, string message, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
+ {
+ headers ??= new NatsHeaders();
+ headers.Add("Nats-Service-Error-Code", $"{code}");
+ headers.Add("Nats-Service-Error", $"{message}");
+
+ _endPoint?.IncrementErrors();
+ _endPoint?.SetLastError($"{message} ({code})");
+
+ return ReplyAsync(headers: headers, replyTo: replyTo, opts: opts, cancellationToken: cancellationToken);
+ }
+}
diff --git a/src/NATS.Client.Services/NatsSvcServer.cs b/src/NATS.Client.Services/NatsSvcServer.cs
new file mode 100644
index 000000000..b77d656aa
--- /dev/null
+++ b/src/NATS.Client.Services/NatsSvcServer.cs
@@ -0,0 +1,328 @@
+using System.Collections.Concurrent;
+using System.Text.Json.Nodes;
+using System.Threading.Channels;
+using Microsoft.Extensions.Logging;
+using NATS.Client.Core;
+using NATS.Client.Core.Internal;
+using NATS.Client.Services.Internal;
+using NATS.Client.Services.Models;
+
+namespace NATS.Client.Services;
+
+///
+/// NATS service server.
+///
+public class NatsSvcServer : IAsyncDisposable
+{
+ private readonly ILogger _logger;
+ private readonly string _id;
+ private readonly NatsConnection _nats;
+ private readonly NatsSvcConfig _config;
+ private readonly CancellationToken _cancellationToken;
+ private readonly Channel _channel;
+ private readonly Task _taskMsgLoop;
+ private readonly List _svcListeners = new();
+ private readonly ConcurrentDictionary _endPoints = new();
+ private readonly string _started;
+ private readonly CancellationTokenSource _cts;
+
+ ///
+ /// Creates a new instance of .
+ ///
+ /// NATS connection.
+ /// Service configuration.
+ /// A used to cancel the service creation requests.
+ public NatsSvcServer(NatsConnection nats, NatsSvcConfig config, CancellationToken cancellationToken)
+ {
+ _logger = nats.Opts.LoggerFactory.CreateLogger();
+ _id = NuidWriter.NewNuid();
+ _nats = nats;
+ _config = config;
+ _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ _cancellationToken = _cts.Token;
+ _channel = Channel.CreateBounded(32);
+ _taskMsgLoop = Task.Run(MsgLoop);
+ _started = DateTimeOffset.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ");
+ }
+
+ ///
+ /// Stop the service.
+ ///
+ /// A used to cancel the stop operation.
+ /// A representing the asynchronous operation.
+ public async ValueTask StopAsync(CancellationToken cancellationToken = default)
+ {
+ foreach (var listener in _svcListeners)
+ {
+ await listener.DisposeAsync();
+ }
+
+ // Drain buffers
+ await _nats.PingAsync(cancellationToken);
+
+ foreach (var ep in _endPoints.Values)
+ {
+ await ep.DisposeAsync();
+ }
+
+ _channel.Writer.TryComplete();
+
+ _cts.Cancel();
+
+ await _taskMsgLoop;
+ }
+
+ ///
+ /// Adds a new endpoint.
+ ///
+ /// Callback for handling incoming messages.
+ /// Optional endpoint name.
+ /// Optional endpoint subject.
+ /// Optional endpoint metadata.
+ /// A used to stop the endpoint.
+ /// Serialization type for messages received.
+ /// A representing the asynchronous operation.
+ ///
+ /// One of name or subject must be specified.
+ ///
+ public ValueTask AddEndpointAsync(Func, ValueTask> handler, string? name = default, string? subject = default, IDictionary? metadata = default, CancellationToken cancellationToken = default) =>
+ AddEndpointInternalAsync(handler, name, subject, _config.QueueGroup, metadata, cancellationToken);
+
+ ///
+ /// Adds a new service group with optional queue group.
+ ///
+ /// Name of the group.
+ /// Queue group name.
+ /// A may be used to cancel th call in the future.
+ /// A representing the asynchronous operation.
+ public ValueTask AddGroupAsync(string name, string? queueGroup = default, CancellationToken cancellationToken = default)
+ {
+ var group = new Group(this, name, queueGroup, cancellationToken);
+ return ValueTask.FromResult(group);
+ }
+
+ ///
+ /// Stop the service.
+ ///
+ public async ValueTask DisposeAsync()
+ {
+ await StopAsync(_cancellationToken);
+ GC.SuppressFinalize(this);
+ }
+
+ internal async ValueTask StartAsync()
+ {
+ var name = _config.Name;
+
+ foreach (var svcType in new[] { SvcMsgType.Ping, SvcMsgType.Info, SvcMsgType.Stats })
+ {
+ var type = svcType.ToString().ToUpper();
+ foreach (var subject in new[] { $"$SRV.{type}", $"$SRV.{type}.{name}", $"$SRV.{type}.{name}.{_id}" })
+ {
+ var svcListener = new SvcListener(_nats, _channel, svcType, subject, _config.QueueGroup, _cancellationToken);
+ await svcListener.StartAsync();
+ _svcListeners.Add(svcListener);
+ }
+ }
+ }
+
+ private async ValueTask AddEndpointInternalAsync(Func, ValueTask> handler, string? name, string? subject, string? queueGroup, IDictionary? metadata, CancellationToken cancellationToken)
+ {
+ var epSubject = subject ?? name ?? throw new NatsSvcException("Either name or subject must be specified");
+ var epName = name ?? epSubject;
+
+ var ep = new NatsSvcEndpoint(_nats, queueGroup, epName, handler, epSubject, metadata, opts: default, cancellationToken);
+
+ if (!_endPoints.TryAdd(epName, ep))
+ {
+ await using (ep)
+ {
+ throw new NatsSvcException($"Endpoint '{name}' already exists");
+ }
+ }
+
+ await ep.StartAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ private async Task MsgLoop()
+ {
+ await foreach (var svcMsg in _channel.Reader.ReadAllAsync(_cancellationToken))
+ {
+ try
+ {
+ var type = svcMsg.MsgType;
+ var data = svcMsg.Msg.Data;
+
+ if (type == SvcMsgType.Ping)
+ {
+ using (data)
+ {
+ // empty request payload
+ }
+
+ await svcMsg.Msg.ReplyAsync(
+ new PingResponse { Name = _config.Name, Id = _id, Version = _config.Version, },
+ cancellationToken: _cancellationToken);
+ }
+ else if (type == SvcMsgType.Info)
+ {
+ using (data)
+ {
+ // empty request payload
+ }
+
+ var endPoints = _endPoints.Select(ep => new EndpointInfo
+ {
+ Name = ep.Key,
+ Subject = ep.Value.Subject,
+ QueueGroup = ep.Value.QueueGroup!,
+ Metadata = ep.Value.Metadata!,
+ }).ToList();
+
+ await svcMsg.Msg.ReplyAsync(
+ new InfoResponse
+ {
+ Name = _config.Name,
+ Id = _id,
+ Version = _config.Version,
+ Description = _config.Description!,
+ Metadata = _config.Metadata!,
+ Endpoints = endPoints,
+ },
+ cancellationToken: _cancellationToken);
+ }
+ else if (type == SvcMsgType.Stats)
+ {
+ using (data)
+ {
+ // empty request payload
+ }
+
+ var endPoints = _endPoints.Select(ep =>
+ {
+ JsonNode? statsData;
+ try
+ {
+ statsData = _config.StatsHandler?.Invoke();
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error calling stats handler for {Endpoint}", ep.Key);
+ statsData = null;
+ }
+
+ return new EndpointStats
+ {
+ Name = ep.Key,
+ Subject = ep.Value.Subject,
+ QueueGroup = ep.Value.QueueGroup!,
+ Data = statsData!,
+ ProcessingTime = ep.Value.ProcessingTime,
+ NumRequests = ep.Value.Requests,
+ NumErrors = ep.Value.Errors,
+ LastError = ep.Value.LastError!,
+ AverageProcessingTime = ep.Value.AverageProcessingTime,
+ };
+ }).ToList();
+
+ var response = new StatsResponse
+ {
+ Name = _config.Name,
+ Id = _id,
+ Version = _config.Version,
+ Metadata = _config.Metadata!,
+ Endpoints = endPoints,
+ Started = _started,
+ };
+
+ await svcMsg.Msg.ReplyAsync(
+ response,
+ cancellationToken: _cancellationToken);
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Message loop error");
+ }
+ }
+ }
+
+ ///
+ /// NATS service group.
+ ///
+ public class Group
+ {
+ private readonly NatsSvcServer _server;
+ private readonly CancellationToken _cancellationToken;
+ private readonly string _dot;
+
+ ///
+ /// Creates a new instance of .
+ ///
+ /// Service instance.
+ /// Group name.
+ /// Optional queue group.
+ /// A may be used to cancel th call in the future.
+ public Group(NatsSvcServer server, string groupName, string? queueGroup = default, CancellationToken cancellationToken = default)
+ {
+ ValidateGroupName(groupName);
+ _server = server;
+ GroupName = groupName;
+ QueueGroup = queueGroup;
+ _cancellationToken = cancellationToken;
+ _dot = GroupName.Length == 0 ? string.Empty : ".";
+ }
+
+ public string GroupName { get; }
+
+ public string? QueueGroup { get; }
+
+ ///
+ /// Adds a new endpoint.
+ ///
+ /// Callback for handling incoming messages.
+ /// Optional endpoint name.
+ /// Optional endpoint subject.
+ /// Optional endpoint metadata.
+ /// A used to stop the endpoint.
+ /// Serialization type for messages received.
+ /// A representing the asynchronous operation.
+ ///
+ /// One of name or subject must be specified.
+ ///
+ public ValueTask AddEndpointAsync(Func, ValueTask> handler, string? name = default, string? subject = default, IDictionary? metadata = default, CancellationToken cancellationToken = default)
+ {
+ var epName = name != null ? $"{GroupName}{_dot}{name}" : null;
+ var epSubject = subject != null ? $"{GroupName}{_dot}{subject}" : null;
+ var queueGroup = QueueGroup ?? _server._config.QueueGroup;
+ return _server.AddEndpointInternalAsync(handler, epName, epSubject, queueGroup, metadata, cancellationToken);
+ }
+
+ ///
+ /// Adds a new service group with optional queue group.
+ ///
+ /// Name of the group.
+ /// Optional queue group name.
+ /// A may be used to cancel th call in the future.
+ /// A representing the asynchronous operation.
+ public ValueTask AddGroupAsync(string name, string? queueGroup = default, CancellationToken cancellationToken = default)
+ {
+ var groupName = $"{GroupName}{_dot}{name}";
+ return _server.AddGroupAsync(groupName, queueGroup, cancellationToken);
+ }
+
+ private void ValidateGroupName(string groupName)
+ {
+ foreach (var c in groupName)
+ {
+ switch (c)
+ {
+ case '>':
+ throw new NatsSvcException("Invalid group name (can't have '>' wildcard in group name)");
+ case '\r' or '\n' or ' ':
+ throw new NatsSvcException("Invalid group name (must be a valid NATS subject)");
+ }
+ }
+ }
+ }
+}
diff --git a/tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj b/tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj
new file mode 100644
index 000000000..6100af0a8
--- /dev/null
+++ b/tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj
@@ -0,0 +1,35 @@
+
+
+
+ net6.0
+ enable
+ enable
+
+ false
+ true
+
+
+
+
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/NATS.Client.Services.Tests/ServicesTests.cs b/tests/NATS.Client.Services.Tests/ServicesTests.cs
new file mode 100644
index 000000000..ebb777fc2
--- /dev/null
+++ b/tests/NATS.Client.Services.Tests/ServicesTests.cs
@@ -0,0 +1,245 @@
+using System.Text.Json.Nodes;
+using NATS.Client.Core.Tests;
+using NATS.Client.Services.Models;
+
+namespace NATS.Client.Services.Tests;
+
+public class ServicesTests
+{
+ private readonly ITestOutputHelper _output;
+
+ public ServicesTests(ITestOutputHelper output) => _output = output;
+
+ [Fact]
+ public async Task Add_service_listeners_ping_info_and_stats()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ await using var server = NatsServer.Start();
+ await using var nats = server.CreateClientConnection();
+ var svc = new NatsSvcContext(nats);
+
+ await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken);
+
+ var pingsTask = FindServices(server, "$SRV.PING", 1, cancellationToken);
+ var infosTask = FindServices(server, "$SRV.INFO", 1, cancellationToken);
+ var statsTask = FindServices(server, "$SRV.STATS", 1, cancellationToken);
+
+ var pings = await pingsTask;
+ pings.ForEach(x => _output.WriteLine($"{x}"));
+ Assert.Single(pings);
+ Assert.Equal("s1", pings[0].Name);
+ Assert.Equal("1.0.0", pings[0].Version);
+
+ var infos = await infosTask;
+ infos.ForEach(x => _output.WriteLine($"{x}"));
+ Assert.Single(infos);
+ Assert.Equal("s1", infos[0].Name);
+ Assert.Equal("1.0.0", infos[0].Version);
+
+ var stats = await statsTask;
+ stats.ForEach(x => _output.WriteLine($"{x}"));
+ Assert.Single(stats);
+ Assert.Equal("s1", stats[0].Name);
+ Assert.Equal("1.0.0", stats[0].Version);
+ }
+
+ [Fact]
+ public async Task Add_end_point()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ await using var server = NatsServer.Start();
+ await using var nats = server.CreateClientConnection();
+ var svc = new NatsSvcContext(nats);
+
+ await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken);
+
+ await s1.AddEndpointAsync(
+ name: "e1",
+ handler: async m =>
+ {
+ if (m.Data == 7)
+ {
+ await m.ReplyErrorAsync(m.Data, $"Error{m.Data}", cancellationToken: cancellationToken);
+ return;
+ }
+
+ if (m.Data == 8)
+ {
+ throw new NatsSvcEndpointException(m.Data, $"Error{m.Data}");
+ }
+
+ if (m.Data == 9)
+ {
+ throw new Exception("this won't be exposed");
+ }
+
+ await m.ReplyAsync(m.Data * m.Data, cancellationToken: cancellationToken);
+ },
+ cancellationToken: cancellationToken);
+
+ var info = (await FindServices(server, "$SRV.INFO", 1, cancellationToken)).First();
+ Assert.Single(info.Endpoints);
+ var endpointInfo = info.Endpoints.First();
+ Assert.Equal("e1", endpointInfo.Name);
+
+ for (var i = 0; i < 10; i++)
+ {
+ var response = await nats.RequestAsync(endpointInfo.Subject, i, cancellationToken: cancellationToken);
+ if (i is 7 or 8)
+ {
+ Assert.Equal($"{i}", response?.Headers?["Nats-Service-Error-Code"]);
+ Assert.Equal($"Error{i}", response?.Headers?["Nats-Service-Error"]);
+ }
+ else if (i is 9)
+ {
+ Assert.Equal("999", response?.Headers?["Nats-Service-Error-Code"]);
+ Assert.Equal("Handler error", response?.Headers?["Nats-Service-Error"]);
+ }
+ else
+ {
+ Assert.Equal(i * i, response?.Data);
+ Assert.Null(response?.Headers);
+ }
+ }
+
+ var stat = (await FindServices(server, "$SRV.STATS", 1, cancellationToken)).First();
+ Assert.Single(stat.Endpoints);
+ var endpointStats = stat.Endpoints.First();
+ Assert.Equal("e1", endpointStats.Name);
+ Assert.Equal(10, endpointStats.NumRequests);
+ Assert.Equal(3, endpointStats.NumErrors);
+ Assert.Equal("Handler error (999)", endpointStats.LastError);
+ Assert.True(endpointStats.ProcessingTime > 0);
+ Assert.True(endpointStats.AverageProcessingTime > 0);
+ }
+
+ [Fact]
+ public async Task Add_groups_metadata_and_stats()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ await using var server = NatsServer.Start();
+ await using var nats = server.CreateClientConnection();
+ var svc = new NatsSvcContext(nats);
+
+ await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken);
+
+ await s1.AddEndpointAsync(
+ name: "baz",
+ subject: "foo.baz",
+ handler: m => ValueTask.CompletedTask,
+ cancellationToken: cancellationToken);
+
+ await s1.AddEndpointAsync(
+ subject: "foo.bar1",
+ handler: m => ValueTask.CompletedTask,
+ cancellationToken: cancellationToken);
+
+ var grp1 = await s1.AddGroupAsync("grp1", cancellationToken: cancellationToken);
+
+ await grp1.AddEndpointAsync(
+ name: "e1",
+ handler: m => ValueTask.CompletedTask,
+ cancellationToken: cancellationToken);
+
+ await grp1.AddEndpointAsync(
+ name: "e2",
+ subject: "foo.bar2",
+ handler: m => ValueTask.CompletedTask,
+ cancellationToken: cancellationToken);
+
+ var grp2 = await s1.AddGroupAsync(string.Empty, queueGroup: "q_empty", cancellationToken: cancellationToken);
+
+ await grp2.AddEndpointAsync(
+ name: "empty1",
+ subject: "foo.empty1",
+ handler: m => ValueTask.CompletedTask,
+ cancellationToken: cancellationToken);
+
+ // Check that the endpoints are registered correctly
+ {
+ var info = (await FindServices(server, "$SRV.INFO.s1", 1, cancellationToken)).First();
+ Assert.Equal(5, info.Endpoints.Count);
+ var endpoints = info.Endpoints.ToList();
+
+ Assert.Equal("foo.baz", info.Endpoints.First(e => e.Name == "baz").Subject);
+ Assert.Equal("q", info.Endpoints.First(e => e.Name == "baz").QueueGroup);
+
+ Assert.Equal("foo.bar1", info.Endpoints.First(e => e.Name == "foo.bar1").Subject);
+ Assert.Equal("q", info.Endpoints.First(e => e.Name == "foo.bar1").QueueGroup);
+
+ Assert.Equal("grp1.e1", info.Endpoints.First(e => e.Name == "grp1.e1").Subject);
+ Assert.Equal("q", info.Endpoints.First(e => e.Name == "grp1.e1").QueueGroup);
+
+ Assert.Equal("grp1.foo.bar2", info.Endpoints.First(e => e.Name == "grp1.e2").Subject);
+ Assert.Equal("q", info.Endpoints.First(e => e.Name == "grp1.e2").QueueGroup);
+
+ Assert.Equal("foo.empty1", info.Endpoints.First(e => e.Name == "empty1").Subject);
+ Assert.Equal("q_empty", info.Endpoints.First(e => e.Name == "empty1").QueueGroup);
+ }
+
+ await using var s2 = await svc.AddServiceAsync(
+ new NatsSvcConfig("s2", "2.0.0")
+ {
+ Description = "es-two",
+ QueueGroup = "q2",
+ Metadata = new Dictionary { { "k1", "v1" }, { "k2", "v2" }, },
+ StatsHandler = () => JsonNode.Parse("{\"stat-k1\":\"stat-v1\",\"stat-k2\":\"stat-v2\"}")!,
+ },
+ cancellationToken: cancellationToken);
+
+ await s2.AddEndpointAsync(
+ name: "s2baz",
+ subject: "s2foo.baz",
+ handler: m => ValueTask.CompletedTask,
+ metadata: new Dictionary { { "ep-k1", "ep-v1" } },
+ cancellationToken: cancellationToken);
+
+ // Check default queue group and stats handler
+ {
+ var info = (await FindServices(server, "$SRV.INFO.s2", 1, cancellationToken)).First();
+ Assert.Single(info.Endpoints);
+ var epi = info.Endpoints.First();
+
+ Assert.Equal("s2baz", epi.Name);
+ Assert.Equal("s2foo.baz", epi.Subject);
+ Assert.Equal("q2", epi.QueueGroup);
+ Assert.Equal("ep-v1", epi.Metadata["ep-k1"]);
+
+ var stat = (await FindServices(server, "$SRV.STATS.s2", 1, cancellationToken)).First();
+ Assert.Equal("v1", stat.Metadata["k1"]);
+ Assert.Equal("v2", stat.Metadata["k2"]);
+ Assert.Single(stat.Endpoints);
+ var eps = stat.Endpoints.First();
+ Assert.Equal("stat-v1", eps.Data["stat-k1"]?.GetValue());
+ Assert.Equal("stat-v2", eps.Data["stat-k2"]?.GetValue());
+ }
+ }
+
+ private static async Task> FindServices(NatsServer server, string subject, int limit, CancellationToken ct)
+ {
+ await using var nats = server.CreateClientConnection();
+ var replyOpts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(2) };
+ var responses = new List();
+
+ var count = 0;
+ await foreach (var msg in nats.RequestManyAsync