Skip to content

Commit

Permalink
Service api docs
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Oct 26, 2023
1 parent e22d1bc commit 62dd895
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 31 deletions.
2 changes: 2 additions & 0 deletions docs/documentation/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ these docs. You can also create a Pull Request using the Edit on GitHub link on
[Key/Value Store](key-value-store/intro.md) is the built-in distributed persistent associative arrays built on top of JetStream.

[Object Store](object-store/intro.md) is the built-in distributed persistent objects of arbitrary size built on top of JetStream.

[Services](services/intro.md) is the services protocol built on top of core NATS enabling discovery and monitoring of services you develop.
2 changes: 1 addition & 1 deletion docs/documentation/object-store/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Let's create our store first. In Object Store, a bucket is simply a storage for
var store = await obj.CreateObjectStore("test-bucket");
```

Now that we have a KV bucket in our stream, let's see its status using the [NATS command
Now that we have a bucket in our stream, let's see its status using the [NATS command
line client](https://github.com/nats-io/natscli):

```shell
Expand Down
102 changes: 102 additions & 0 deletions docs/documentation/services/intro.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Services

[Services](https://docs.nats.io/using-nats/developer/services) is a protocol that provides first-class services support
for NATS clients and it's supported by NATS tooling. This services protocol is an agreement between clients and tooling and
doesn't require any special functionality from the NATS server or JetStream.

To be able to use Services you need to running the `nats-server`.

## Services Quick Start

[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it:

```shell
$ nats-server
```

Install `NATS.Client.Services` preview from Nuget.

Before we can do anything, we need a Services context:

```csharp
await using var nats = new NatsConnection();
var svc = new NatsSvcContext(nats);
```

Let's create our first service:

```csharp
await using var testService = await svc.AddServiceAsync("test", "1.0.0");
```

Now that we have a service in our stream, let's see its status using the [NATS command
line client](https://github.com/nats-io/natscli) (make sure you have at least v0.1.1):

```shell
$ nats --version
0.1.1
```

```shell
$ nats micro info test
Service Information

Service: test (Bw6eqhVYs3dbNzZecuuFOV)
Description:
Version: 1.0.0

Endpoints:

Statistics for 0 Endpoint(s):
```

Now we can add endpoints to our service:

```csharp
await testService.AddEndPointAsync<int>(name: "divide42", handler: async m =>
{
if (m.Data == 0)
{
await m.ReplyErrorAsync(400, "Division by zero");
return;
}

await m.ReplyAsync(42 / m.Data);
});
```

We can also confirm that our endpoint is registered by using the NATS command line:

```shell
$ nats req divide42 2
11:34:03 Sending request on "divide42"
11:34:03 Received with rtt 9.5823ms
21

$ nats micro stats test
╭──────────────────────────────────────────────────────────────────────────────────────────────────────╮
test Service Statistics │
├────────────────────────┬──────────┬──────────┬─────────────┬────────┬─────────────────┬──────────────┤
│ ID │ Endpoint │ Requests │ Queue Group │ Errors │ Processing Time │ Average Time │
├────────────────────────┼──────────┼──────────┼─────────────┼────────┼─────────────────┼──────────────┤
│ RH6q9Y6qM8em8m6lG2yN34 │ divide42 │ 1 │ q │ 0 │ 1ms │ 1ms │
├────────────────────────┼──────────┼──────────┼─────────────┼────────┼─────────────────┼──────────────┤
│ │ │ 1 │ │ 0 │ 1MS │ 1MS │
╰────────────────────────┴──────────┴──────────┴─────────────┴────────┴─────────────────┴──────────────╯
```

## Groups

A group is a collection of endpoints. These are optional and can provide a logical association between endpoints
as well as an optional common subject prefix for all endpoints.

You can group your endpoints optionally in different [queue groups](https://docs.nats.io/nats-concepts/core-nats/queue):

```csharp
var grp1 = await testService.AddGroupAsync("grp1");

await grp1.AddEndPointAsync<int>(name: "ep1", handler: async m =>
{
// handle message
});
```
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The NATS.NET V2 client is in preview and not recommended for production use yet.
- [x] JetStream initial support
- [x] KV initial support
- [x] Object Store initial support
- [ ] Service API initial support
- [x] Service API initial support
- [ ] .NET 8.0 support (e.g. Native AOT)
- [ ] Beta phase

Expand Down
6 changes: 0 additions & 6 deletions src/NATS.Client.Services/Models/StatsResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,13 @@ public record EndpointStats
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string LastError { get; set; } = default!;

/**
* A field that can be customized with any data as returned by stats handler see {@link ServiceConfig}
*/
[JsonPropertyName("data")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public JsonNode Data { get; set; } = default!;

[JsonPropertyName("processing_time")]
public long ProcessingTime { get; set; }

/**
* Average processing_time is the total processing_time divided by the num_requests
*/
[JsonPropertyName("average_processing_time")]
public long AverageProcessingTime { get; set; }
}
28 changes: 28 additions & 0 deletions src/NATS.Client.Services/NatsSvcConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@

namespace NATS.Client.Services;

/// <summary>
/// NATS service configuration.
/// </summary>
public record NatsSvcConfig
{
private static readonly Regex NameRegex = new(@"^[a-zA-Z0-9_-]+$", RegexOptions.Compiled);
private static readonly Regex VersionRegex = new(@"^(?<major>0|[1-9]\d*)\.(?<minor>0|[1-9]\d*)\.(?<patch>0|[1-9]\d*)(?:-(?<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$", RegexOptions.Compiled);

/// <summary>
/// Creates a new instance of <see cref="NatsSvcConfig"/>.
/// </summary>
/// <param name="name">Service name.</param>
/// <param name="version">Service SemVer version.</param>
/// <exception cref="ArgumentException">Name or version is invalid.</exception>
public NatsSvcConfig(string name, string version)
{
if (!NameRegex.IsMatch(name))
Expand All @@ -24,15 +33,34 @@ public NatsSvcConfig(string name, string version)
Version = version;
}

/// <summary>
/// Service name.
/// </summary>
public string Name { get; }

/// <summary>
/// Service version. Must be a valid Semantic Versioning string.
/// </summary>
public string Version { get; }

/// <summary>
/// Service description.
/// </summary>
public string? Description { get; init; }

/// <summary>
/// Service metadata. This will be included in the service info.
/// </summary>
public Dictionary<string, string>? Metadata { get; init; }

/// <summary>
/// Queue group name. (default: "q")
/// </summary>
public string QueueGroup { get; init; } = "q";

/// <summary>
/// Stats handler. JSON object returned by this handler will be included in
/// the service stats <c>data</c> property.
/// </summary>
public Func<JsonNode>? StatsHandler { get; init; }
}
21 changes: 21 additions & 0 deletions src/NATS.Client.Services/NatsSvcContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,36 @@

namespace NATS.Client.Services;

/// <summary>
/// NATS service context.
/// </summary>
public class NatsSvcContext
{
private readonly NatsConnection _nats;

/// <summary>
/// Creates a new instance of <see cref="NatsSvcContext"/>.
/// </summary>
/// <param name="nats">NATS connection.</param>
public NatsSvcContext(NatsConnection nats) => _nats = nats;

/// <summary>
/// Adds a new service.
/// </summary>
/// <param name="name">Service name.</param>
/// <param name="version">Service SemVer version.</param>
/// <param name="queueGroup">Optional queue group (default: "q")</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>NATS Service instance.</returns>
public ValueTask<NatsSvcService> AddServiceAsync(string name, string version, string queueGroup = "q", CancellationToken cancellationToken = default) =>
AddServiceAsync(new NatsSvcConfig(name, version) { QueueGroup = queueGroup }, cancellationToken);

/// <summary>
/// Adds a new service.
/// </summary>
/// <param name="config">Service configuration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>NATS Service instance.</returns>
public async ValueTask<NatsSvcService> AddServiceAsync(NatsSvcConfig config, CancellationToken cancellationToken = default)
{
var service = new NatsSvcService(_nats, config, cancellationToken);
Expand Down
58 changes: 57 additions & 1 deletion src/NATS.Client.Services/NatsSvcEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,40 @@

namespace NATS.Client.Services;

/// <summary>
/// NATS service endpoint.
/// </summary>
public interface INatsSvcEndPoint : IAsyncDisposable
{
/// <summary>
/// Number of requests received.
/// </summary>
long Requests { get; }

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 20 in src/NATS.Client.Services/NatsSvcEndPoint.cs

View workflow job for this annotation

GitHub Actions / memory test (main)


/// <summary>
/// Total processing time in nanoseconds.
/// </summary>
long ProcessingTime { get; }

/// <summary>
/// Number of errors.
/// </summary>
long Errors { get; }

/// <summary>
/// Last error message.
/// </summary>
string? LastError { get; }

/// <summary>
/// Average processing time in nanoseconds.
/// </summary>
long AverageProcessingTime { get; }

/// <summary>
/// Endpoint metadata.
/// </summary>
IDictionary<string, string>? Metadata { get; }

/// <summary>
Expand All @@ -28,37 +50,53 @@ public interface INatsSvcEndPoint : IAsyncDisposable
string Subject { get; }

/// <summary>
/// Endpoint queue group.
/// </summary>
/// <remarks>
/// If specified, the subscriber will join this queue group. Subscribers with the same queue group name,
/// become a queue group, and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </summary>
/// </remarks>
string? QueueGroup { get; }
}

/// <summary>
/// Endpoint base class exposing general stats.
/// </summary>
public abstract class NatsSvcEndPointBase : NatsSubBase, INatsSvcEndPoint
{
protected NatsSvcEndPointBase(NatsConnection connection, ISubscriptionManager manager, string subject, string? queueGroup, NatsSubOpts? opts)
: base(connection, manager, subject, queueGroup, opts)
{
}

/// <inheritdoc/>
public abstract long Requests { get; }

/// <inheritdoc/>
public abstract long ProcessingTime { get; }

/// <inheritdoc/>
public abstract long Errors { get; }

/// <inheritdoc/>
public abstract string? LastError { get; }

/// <inheritdoc/>
public abstract long AverageProcessingTime { get; }

/// <inheritdoc/>
public abstract IDictionary<string, string>? Metadata { get; }

internal abstract void IncrementErrors();

internal abstract void SetLastError(string error);
}

/// <summary>
/// NATS service endpoint.
/// </summary>
/// <typeparam name="T">Serialized type to use when receiving data.</typeparam>
public class NatsSvcEndPoint<T> : NatsSvcEndPointBase
{
private readonly ILogger _logger;
Expand All @@ -75,6 +113,17 @@ public class NatsSvcEndPoint<T> : NatsSvcEndPointBase
private long _processingTime;
private string? _lastError;

/// <summary>
/// Creates a new instance of <see cref="NatsSvcEndPoint{T}"/>.
/// </summary>
/// <param name="nats">NATS connection.</param>
/// <param name="queueGroup">Queue group.</param>
/// <param name="name">Optional endpoint name.</param>
/// <param name="handler">Callback function to handle messages received.</param>
/// <param name="subject">Optional subject name.</param>
/// <param name="metadata">Endpoint metadata.</param>
/// <param name="opts">Subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
public NatsSvcEndPoint(NatsConnection nats, string? queueGroup, string name, Func<NatsSvcMsg<T>, ValueTask> handler, string subject, IDictionary<string, string>? metadata, NatsSubOpts? opts, CancellationToken cancellationToken)
: base(nats, nats.SubscriptionManager, subject, queueGroup, opts)
{
Expand All @@ -89,18 +138,25 @@ public NatsSvcEndPoint(NatsConnection nats, string? queueGroup, string name, Fun
_handlerTask = Task.Run(HandlerLoop);
}

/// <inheritdoc/>
public override long Requests => Volatile.Read(ref _requests);

/// <inheritdoc/>
public override long ProcessingTime => Volatile.Read(ref _processingTime);

/// <inheritdoc/>
public override long Errors => Volatile.Read(ref _errors);

/// <inheritdoc/>
public override string? LastError => Volatile.Read(ref _lastError);

/// <inheritdoc/>
public override long AverageProcessingTime => Requests == 0 ? 0 : ProcessingTime / Requests;

/// <inheritdoc/>
public override IDictionary<string, string>? Metadata { get; }

/// <inheritdoc/>
public override async ValueTask DisposeAsync()
{
await base.DisposeAsync();
Expand Down
Loading

0 comments on commit 62dd895

Please sign in to comment.